[CalendarServer-changes] [5978] CalendarServer/branches/users/glyph/sql-store

source_changes at macosforge.org source_changes at macosforge.org
Wed Aug 4 13:17:08 PDT 2010


Revision: 5978
          http://trac.macosforge.org/projects/calendarserver/changeset/5978
Author:   glyph at apple.com
Date:     2010-08-04 13:17:07 -0700 (Wed, 04 Aug 2010)
Log Message:
-----------
use a common database across tests to avoid expensive initialization, and have a less race-prone test setup

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/sql-store/txcaldav/calendarstore/test/test_postgres.py
    CalendarServer/branches/users/glyph/sql-store/txdav/datastore/subpostgres.py
    CalendarServer/branches/users/glyph/sql-store/txdav/datastore/test/test_subpostgres.py

Modified: CalendarServer/branches/users/glyph/sql-store/txcaldav/calendarstore/test/test_postgres.py
===================================================================
--- CalendarServer/branches/users/glyph/sql-store/txcaldav/calendarstore/test/test_postgres.py	2010-08-04 20:12:53 UTC (rev 5977)
+++ CalendarServer/branches/users/glyph/sql-store/txcaldav/calendarstore/test/test_postgres.py	2010-08-04 20:17:07 UTC (rev 5978)
@@ -14,20 +14,45 @@
 # limitations under the License.
 ##
 
+"""
+Tests for txcaldav.calendarstore.postgres, mostly based on
+L{txcaldav.calendarstore.test.common}.
+"""
+
+
 from txcaldav.calendarstore.test.common import CommonTests
 
 from twisted.trial import unittest
-from txdav.datastore.subpostgres import PostgresService
+from txdav.datastore.subpostgres import PostgresService, \
+    DiagnosticConnectionWrapper
 from txcaldav.calendarstore.postgres import PostgresStore, v1_schema
 from twisted.internet.defer import Deferred
 from twisted.internet import reactor
 from twext.python.filepath import CachingFilePath
 from twext.python.vcomponent import VComponent
 from twisted.internet.task import deferLater
+from twisted.python import log
+import gc
 
 
 
+def allInstancesOf(cls):
+    for o in gc.get_referrers(cls):
+        if isinstance(o, cls):
+            yield o
+
+
+
+def dumpConnectionStatus():
+    print '+++ ALL CONNECTIONS +++'
+    for connection in allInstancesOf(DiagnosticConnectionWrapper):
+        print connection.label, connection.state
+    print '--- CONNECTIONS END ---'
+
+
+
 sharedService = None
+currentTestID = None
 
 class SQLStorageTests(CommonTests, unittest.TestCase):
     """
@@ -36,54 +61,76 @@
 
     def setUp(self):
         global sharedService
+        global currentTestID
+        currentTestID = self.id()
         if sharedService is None:
             ready = Deferred()
             def getReady(connectionFactory):
                 global calendarStore
-                calendarStore = PostgresStore(connectionFactory)
-                self.populate()
-                ready.callback(None)
+                try:
+                    calendarStore = PostgresStore(
+                        lambda label=None: connectionFactory(
+                            label or currentTestID
+                        )
+                    )
+                except:
+                    ready.errback()
+                    raise
+                else:
+                    self.cleanAndPopulate().chainDeferred(ready)
                 return calendarStore
             sharedService = PostgresService(
-                CachingFilePath("pg"), getReady, v1_schema, "caldav"
+                CachingFilePath("../_test_postgres_db"),
+                getReady, v1_schema, "caldav"
             )
             sharedService.startService()
             def startStopping():
-                for pipe in sharedService.monitor.transport.pipes.values():
-                    pipe.startReading()
-                    pipe.startWriting()
-                sharedService.stopService()
-            reactor.addSystemEventTrigger(
+                log.msg("Starting stopping.")
+                sharedService.unpauseMonitor()
+                dumpConnectionStatus()
+                return sharedService.stopService()
+            reactor.addSystemEventTrigger(#@UndefinedVariable
                 "before", "shutdown", startStopping)
             return ready
         else:
-            cleanupConn = calendarStore.connectionFactory()
-            cursor = cleanupConn.cursor()
-            cursor.execute("delete from RESOURCE_PROPERTY")
-            cursor.execute("delete from ATTACHMENT")
-            cursor.execute("delete from CALENDAR_OBJECT")
-            cursor.execute("delete from CALENDAR_BIND")
-            cursor.execute("delete from CALENDAR")
-            cursor.execute("delete from CALENDAR_HOME")
-            cleanupConn.commit()
-            cleanupConn.close()
-            self.populate()
-            for pipe in sharedService.monitor.transport.pipes.values():
-                pipe.startReading()
-                pipe.startWriting()
-            # I need to allow the log buffer to unspool.
-            return deferLater(reactor, 0.1, lambda : None)
+            return self.cleanAndPopulate()
 
 
+    def cleanAndPopulate(self):
+        """
+        Delete everything from the database, then clean it up.
+        """
+        dumpConnectionStatus()
+        cleanupConn = calendarStore.connectionFactory(
+            "%s schema-cleanup" % (self.id(),)
+        )
+        cursor = cleanupConn.cursor()
+        cursor.execute("delete from RESOURCE_PROPERTY")
+        cleanupConn.commit()
+        cursor.execute("delete from ATTACHMENT")
+        cleanupConn.commit()
+        cursor.execute("delete from CALENDAR_OBJECT")
+        cleanupConn.commit()
+        cursor.execute("delete from CALENDAR_BIND")
+        cleanupConn.commit()
+        cursor.execute("delete from CALENDAR")
+        cleanupConn.commit()
+        cursor.execute("delete from CALENDAR_HOME")
+        cleanupConn.commit()
+        cleanupConn.close()
+        self.populate()
+        sharedService.unpauseMonitor()
+        # I need to allow the log buffer to unspool.
+        return deferLater(reactor, 0.1, lambda : None)
+
+
     def tearDown(self):
+        super(SQLStorageTests, self).tearDown()
         def stopit():
-            for pipe in sharedService.monitor.transport.pipes.values():
-                pipe.stopReading()
-                pipe.stopWriting()
+            sharedService.pauseMonitor()
         return deferLater(reactor, 0.1, stopit)
 
 
-
     def populate(self):
         populateTxn = calendarStore.newTransaction()
         for homeUID in self.requirements:

Modified: CalendarServer/branches/users/glyph/sql-store/txdav/datastore/subpostgres.py
===================================================================
--- CalendarServer/branches/users/glyph/sql-store/txdav/datastore/subpostgres.py	2010-08-04 20:12:53 UTC (rev 5977)
+++ CalendarServer/branches/users/glyph/sql-store/txdav/datastore/subpostgres.py	2010-08-04 20:17:07 UTC (rev 5978)
@@ -19,28 +19,95 @@
 Run and manage PostgreSQL as a subprocess.
 """
 import os
-import pgdb
 
 from twisted.python.procutils import which
 from twisted.internet.utils import getProcessOutput
 from twisted.internet.protocol import ProcessProtocol
+from twisted.python.reflect import namedAny
+from twisted.python import log
 
+pgdb = namedAny("pgdb")
+
 from twisted.protocols.basic import LineReceiver
 from twisted.internet import reactor
-from twisted.internet.defer import Deferred, succeed
+from twisted.internet.defer import Deferred
 
 from twisted.application.service import MultiService
 
 
+# This appears in the postgres log to indicate that it is accepting
+# connections.
 _MAGIC_READY_COOKIE = "database system is ready to accept connections"
 
 
+class DiagnosticCursorWrapper(object):
+    """
+    Diagnostic wrapper around a DB-API 2.0 cursor for debugging connection
+    status.
+    """
+
+    def __init__(self, realCursor, connectionWrapper):
+        self.realCursor = realCursor
+        self.connectionWrapper = connectionWrapper
+
+
+    @property
+    def rowcount(self):
+        return self.realCursor.rowcount
+
+
+    def execute(self, sql, args=()):
+        self.connectionWrapper.state = 'executing %r' % (sql,)
+        self.realCursor.execute(sql, args)
+
+
+    def close(self):
+        self.realCursor.close()
+
+
+    def fetchall(self):
+        return self.realCursor.fetchall()
+
+
+
+class DiagnosticConnectionWrapper(object):
+    """
+    Diagnostic wrapper around a DB-API 2.0 connection for debugging connection
+    status.
+    """
+
+    def __init__(self, realConnection, label):
+        self.realConnection = realConnection
+        self.label = label
+        self.state = 'idle (start)'
+
+
+    def cursor(self):
+        return DiagnosticCursorWrapper(self.realConnection.cursor(), self)
+
+
+    def close(self):
+        self.realConnection.close()
+        self.state = 'closed'
+
+
+    def commit(self):
+        self.realConnection.commit()
+        self.state = 'idle (after commit)'
+
+
+    def rollback(self):
+        self.realConnection.rollback()
+        self.state = 'idle (after rollback)'
+
+
+
 class _PostgresMonitor(ProcessProtocol):
     """
     A monitoring protocol which watches the postgres subprocess.
     """
 
-    def __init__(self, svc):
+    def __init__(self, svc=None):
         self.lineReceiver = LineReceiver()
         self.lineReceiver.delimiter = '\n'
         self.lineReceiver.lineReceived = self.lineReceived
@@ -50,10 +117,11 @@
 
 
     def lineReceived(self, line):
+        if self.svc is None:
+            return
         if not self.isReady:
             if _MAGIC_READY_COOKIE in line:
                 self.svc.ready()
-        print 'log output:', repr(line)
 
 
     disconnecting = False
@@ -62,16 +130,17 @@
 
 
     def outReceived(self, out):
-        print 'received postgres output', out
+        log.msg("received postgres stdout %r" % (out,))
         # self.lineReceiver.dataReceived(out)
 
 
     def errReceived(self, err):
-        print 'postgress err received', repr(err)
+        log.msg("received postgres stderr %r" % (err,))
         self.lineReceiver.dataReceived(err)
 
 
     def processEnded(self, reason):
+        log.msg("postgres process ended %r" % (reason,))
         self.lineReceiver.connectionLost(reason)
         self.completionDeferred.callback(None)
 
@@ -97,96 +166,126 @@
         self.databaseName = databaseName
         self.schema = schema
         self.monitor = None
+        self.openConnections = []
 
 
-    def produceConnection(self):
+    def produceConnection(self, label="<unlabeled>", databaseName=None):
         """
         Produce a DB-API 2.0 connection pointed at this database.
         """
-        return pgdb.connect(
-            "%s:dbname=%s" % (
-                self.socketDir.path,
-                self.databaseName
-            )
+        if databaseName is None:
+            databaseName = self.databaseName
+        connection = pgdb.connect(
+            "%s:dbname=%s" % (self.socketDir.path, databaseName)
         )
+        w = DiagnosticConnectionWrapper(connection, label)
+        c = w.cursor()
+        c.execute("set standard_conforming_strings=on")
+        w.commit()
+        c.close()
+        return w
 
 
     def ready(self):
         """
         Subprocess is ready.  Time to initialize the subservice.
         """
-        if self.firstTime:
-            createDatabaseConn = pgdb.connect(
-                self.socketDir.path + ":dbname=template1"
-            )
-            createDatabaseCursor = createDatabaseConn.cursor()
-            createDatabaseCursor.execute("commit")
+        createDatabaseConn = self.produceConnection(
+            'schema creation', 'postgres'
+        )
+        createDatabaseCursor = createDatabaseConn.cursor()
+        createDatabaseCursor.execute("commit")
+        try:
             createDatabaseCursor.execute(
                 "create database %s" % (self.databaseName)
             )
-            createDatabaseCursor.close()
-            createDatabaseConn.close()
-            print 'executing schema', repr(self.schema)
+        except:
+            execSchema = False
+        else:
+            execSchema = True
+        createDatabaseCursor.close()
+        createDatabaseConn.close()
+        if execSchema:
             connection = self.produceConnection()
             cursor = connection.cursor()
             cursor.execute(self.schema)
             connection.commit()
             connection.close()
-        print 'creating subservice'
         self.subServiceFactory(self.produceConnection).setServiceParent(self)
-        print 'subservice created'
 
 
+    def pauseMonitor(self):
+        """
+        Pause monitoring.  This is a testing hook for when (if) we are
+        continuously monitoring output from the 'postgres' process.
+        """
+#        for pipe in self.monitor.transport.pipes.values():
+#            pipe.stopReading()
+#            pipe.stopWriting()
+
+
+    def unpauseMonitor(self):
+        """
+        Unpause monitoring.
+        
+        @see: L{pauseMonitor} 
+        """
+#        for pipe in self.monitor.transport.pipes.values():
+#            pipe.startReading()
+#            pipe.startWriting()
+
+
     def startDatabase(self):
         """
         Start the database and initialize the subservice.
         """
         monitor = _PostgresMonitor(self)
-        postgres = which("postgres")[0]
+        pg_ctl = which("pg_ctl")[0]
         # check consistency of initdb and postgres?
         reactor.spawnProcess(
-            monitor, postgres,
+            monitor, pg_ctl,
             [
-                postgres,
-                "-k", self.socketDir.path,
-                # "-N", "5000",
+                pg_ctl,
+                "start",
+                "-l", "logfile",
+                "-w",
+                # XXX what are the quoting rules for '-o'?  do I need to repr()
+                # the path here?
+                "-o", "-k '%s' -c standard_conforming_strings=on"
+                % (self.socketDir.path,),
             ],
             self.env
         )
         self.monitor = monitor
+        def gotReady(result):
+            self.ready()
+        def reportit(f):
+            log.err(f)
+        self.monitor.completionDeferred.addCallback(
+            gotReady).addErrback(reportit)
 
 
     def startService(self):
         MultiService.startService(self)
-        self.dataStoreDirectory.createDirectory()
         clusterDir = self.dataStoreDirectory.child("cluster")
         self.socketDir = self.dataStoreDirectory.child("socket")
-        self.socketDir.createDirectory()
         workingDir = self.dataStoreDirectory.child("working")
         env = self.env = os.environ.copy()
-        env.update(PGDATA=clusterDir.path)
+        env.update(PGDATA=clusterDir.path,
+                   PGHOST=self.socketDir.path)
         initdb = which("initdb")[0]
         if clusterDir.isdir():
-            self.firstTime = False
             self.startDatabase()
         else:
+            self.dataStoreDirectory.createDirectory()
+            self.socketDir.createDirectory()
             workingDir.createDirectory()
-            self.firstTime = True
-            print 'Creating database'
             dbInited = getProcessOutput(
                 initdb, [], env, workingDir.path, errortoo=True
             )
             def doCreate(result):
-                print '--- initdb ---'
-                print result
-                print '/// initdb ///'
                 self.startDatabase()
-            dbInited.addCallback(
-                doCreate
-            )
-            def showme(result):
-                print 'SHOW ME:', result.getTraceback()
-            dbInited.addErrback(showme)
+            dbInited.addCallback(doCreate)
 
 
     def stopService(self):
@@ -194,10 +293,22 @@
         Stop all child services, then stop the subprocess, if it's running.
         """
         d = MultiService.stopService(self)
-        def maybeStopSubprocess(result):
-            if self.monitor is not None:
-                self.monitor.transport.signalProcess("INT")
-                return self.monitor.completionDeferred
-            return result
-        d.addCallback(maybeStopSubprocess)
-        return d
+        def superStopped(result):
+            # Probably want to stop and wait for startup if that hasn't
+            # completed yet...
+            monitor = _PostgresMonitor()
+            pg_ctl = which("pg_ctl")[0]
+            reactor.spawnProcess(monitor, pg_ctl,
+                [pg_ctl, '-l', 'logfile', 'stop'],
+                self.env
+            )
+            return monitor.completionDeferred
+        return d.addCallback(superStopped)
+
+#        def maybeStopSubprocess(result):
+#            if self.monitor is not None:
+#                self.monitor.transport.signalProcess("INT")
+#                return self.monitor.completionDeferred
+#            return result
+#        d.addCallback(maybeStopSubprocess)
+#        return d

Modified: CalendarServer/branches/users/glyph/sql-store/txdav/datastore/test/test_subpostgres.py
===================================================================
--- CalendarServer/branches/users/glyph/sql-store/txdav/datastore/test/test_subpostgres.py	2010-08-04 20:12:53 UTC (rev 5977)
+++ CalendarServer/branches/users/glyph/sql-store/txdav/datastore/test/test_subpostgres.py	2010-08-04 20:17:07 UTC (rev 5978)
@@ -13,18 +13,19 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ##
-from txdav.datastore.subpostgres import PostgresService
-from twisted.internet.defer import inlineCallbacks, Deferred
-from twisted.application.service import Service
 
 """
 Tests for txdav.datastore.subpostgres.
 """
 
-from twext.python.filepath import CachingFilePath
 from twisted.trial.unittest import TestCase
 
+from twext.python.filepath import CachingFilePath
 
+from txdav.datastore.subpostgres import PostgresService
+from twisted.internet.defer import inlineCallbacks, Deferred
+from twisted.application.service import Service
+
 class SubprocessStartup(TestCase):
     """
     Tests for starting and stopping the subprocess.
@@ -41,26 +42,33 @@
 
         test = self
         class SimpleService(Service):
+
             instances = []
             rows = []
             ready = Deferred()
+
             def __init__(self, connectionFactory):
                 self.connection = connectionFactory()
                 test.addCleanup(self.connection.close)
-                print 'CREATING simpleservice'
                 self.instances.append(self)
 
+
             def startService(self):
-                print 'STARTING simpleservice'
                 cursor = self.connection.cursor()
-                cursor.execute(
-                    "insert into test_dummy_table values ('dummy')"
-                )
-                cursor.close()
-                self.ready.callback(None)
+                try:
+                    cursor.execute(
+                        "insert into test_dummy_table values ('dummy')"
+                    )
+                except:
+                    self.ready.errback()
+                else:
+                    self.ready.callback(None)
+                finally:
+                    cursor.close()
 
+
         svc = PostgresService(
-            CachingFilePath("database"),
+            CachingFilePath("../_postgres_test_db"),
             SimpleService,
             "create table TEST_DUMMY_TABLE (stub varchar)",
             "dummy_db"
@@ -72,5 +80,5 @@
         connection = SimpleService.instances[0].connection
         cursor = connection.cursor()
         cursor.execute("select * from test_dummy_table")
-        values = list(cursor)
+        values = cursor.fetchall()
         self.assertEquals(values, [["dummy"]])
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100804/071f48a0/attachment-0001.html>


More information about the calendarserver-changes mailing list