[CalendarServer-changes] [6492] CalendarServer/branches/users/glyph/sharedpool

source_changes at macosforge.org source_changes at macosforge.org
Mon Nov 1 14:12:00 PDT 2010


Revision: 6492
          http://trac.macosforge.org/projects/calendarserver/changeset/6492
Author:   glyph at apple.com
Date:     2010-11-01 14:11:58 -0700 (Mon, 01 Nov 2010)
Log Message:
-----------
refactor data store to take an async SQL transaction instead of a synchronous one

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/util.py
    CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py
    CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py
    CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/test/util.py

Modified: CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/util.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/util.py	2010-11-01 21:11:10 UTC (rev 6491)
+++ CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/util.py	2010-11-01 21:11:58 UTC (rev 6492)
@@ -94,7 +94,7 @@
             maxConnections=config.Postgres.MaxConnections,
             options=config.Postgres.Options,
         )
-        return CommonSQLDataStore(postgresService.produceConnection,
+        return CommonSQLDataStore(postgresService.produceLocalTransaction,
             notifierFactory, dbRoot.child("attachments"),
             config.EnableCalDAV, config.EnableCardDAV)
     else:

Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py	2010-11-01 21:11:10 UTC (rev 6491)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/base/datastore/subpostgres.py	2010-11-01 21:11:58 UTC (rev 6492)
@@ -20,10 +20,15 @@
 """
 import os
 import pwd
+import sys
 #import thread
 
+from Queue import Queue
+
 from hashlib import md5
 
+from zope.interface import implements
+
 from twisted.python.procutils import which
 from twisted.internet.protocol import ProcessProtocol
 
@@ -35,6 +40,9 @@
 
 from twisted.protocols.basic import LineReceiver
 from twisted.internet import reactor
+from twisted.python.failure import Failure
+from txdav.idav import IAsyncTransaction
+from txdav.idav import AlreadyFinishedError
 from twisted.internet.defer import Deferred
 
 from twisted.application.service import MultiService
@@ -167,6 +175,7 @@
         self.completionDeferred.callback(None)
 
 
+
 class ErrorOutput(Exception):
     """
     The process produced some error output and exited with a non-zero exit
@@ -174,6 +183,7 @@
     """
 
 
+
 class CapturingProcessProtocol(ProcessProtocol):
     """
     A L{ProcessProtocol} that captures its output and error.
@@ -226,6 +236,183 @@
         self.deferred.callback(''.join(self.output))
 
 
+
+_DONE = object()
+
+_STATE_STOPPED = 'STOPPED'
+_STATE_RUNNING = 'RUNNING'
+_STATE_STOPPING = 'STOPPING'
+
+class ThreadHolder(object):
+    """
+    A queue which will hold a reactor threadpool thread open until all of the
+    work in that queue is done.
+    """
+
+    def __init__(self, reactor):
+        self._reactor = reactor
+        self._state = _STATE_STOPPED
+        self._stopper = None
+        self._q = None
+
+
+    def _run(self):
+        """
+        Worker function which runs in a non-reactor thread.
+        """
+        while True:
+            work = self._q.get()
+            if work is _DONE:
+                def finishStopping():
+                    self._state = _STATE_STOPPED
+                    self._q = None
+                    s = self._stopper
+                    self._stopper = None
+                    s.callback(None)
+                self._reactor.callFromThread(finishStopping)
+                return
+            self._oneWorkUnit(*work)
+
+
+    def _oneWorkUnit(self, deferred, instruction):
+        try: 
+            result = instruction()
+        except:
+            etype, evalue, etb = sys.exc_info()
+            def relayFailure():
+                f = Failure(evalue, etype, etb)
+                deferred.errback(f)
+            self._reactor.callFromThread(relayFailure)
+        else:
+            self._reactor.callFromThread(deferred.callback, result)
+
+
+    def submit(self, work):
+        """
+        Submit some work to be run.
+
+        @param work: a 0-argument callable, which will be run in a thread.
+
+        @return: L{Deferred} that fires with the result of C{work}
+        """
+        d = Deferred()
+        self._q.put((d, work))
+        return d
+
+
+    def start(self):
+        """
+        Start this thing, if it's stopped.
+        """
+        if self._state != _STATE_STOPPED:
+            raise RuntimeError("Not stopped.")
+        self._state = _STATE_RUNNING
+        self._q = Queue(0)
+        self._reactor.callInThread(self._run)
+
+
+    def stop(self):
+        """
+        Stop this thing and release its thread, if it's running.
+        """
+        if self._state != _STATE_RUNNING:
+            raise RuntimeError("Not running.")
+        s = self._stopper = Deferred()
+        self._state = _STATE_STOPPING
+        self._q.put(_DONE)
+        return s
+
+
+
+class ThisProcessSqlTxn(object):
+    """
+    L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
+    current process.
+    """
+    implements(IAsyncTransaction)
+
+    def __init__(self, connectionFactory):
+        """
+        @param connectionFactory: A 0-argument callable which returns a DB-API
+            2.0 connection.
+        """
+        self._completed = False
+        self._holder = ThreadHolder(reactor)
+        self._holder.start()
+        def initCursor():
+            # support threadlevel=1; we can't necessarily cursor() in a
+            # different thread than we do transactions in.
+
+            # FIXME: may need to be pooling ThreadHolders along with
+            # connections, if threadlevel=1 requires connect() be called in the
+            # same thread as cursor() et al.
+            self._connection = connectionFactory()
+            self._cursor = self._connection.cursor()
+        self._holder.submit(initCursor)
+
+
+    def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None):
+        self._cursor.execute(sql, args)
+        if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
+            raise raiseOnZeroRowCount()
+        if self._cursor.description:
+            return self._cursor.fetchall()
+        else:
+            return None
+
+
+    noisy = False
+
+    def execSQL(self, *args, **kw):
+        result = self._holder.submit(
+            lambda : self._reallyExecSQL(*args, **kw)
+        )
+        if self.noisy:
+            def reportResult(results):
+                sys.stdout.write("\n".join([
+                    "",
+                    "SQL: %r %r" % (args, kw),
+                    "Results: %r" % (results,),
+                    "",
+                    ]))
+                return results
+            result.addBoth(reportResult)
+        return result
+
+
+    def commit(self):
+        if not self._completed:
+            self._completed = True
+            def reallyCommit():
+                self._connection.commit()
+                self._connection.close()
+            result = self._holder.submit(reallyCommit)
+            self._holder.stop()
+            return result
+        else:
+            raise AlreadyFinishedError()
+
+
+    def abort(self):
+        if not self._completed:
+            def reallyAbort():
+                self._connection.rollback()
+                self._connection.close()
+            self._completed = True
+            result = self._holder.submit(reallyAbort)
+            self._holder.stop()
+            return result
+        else:
+            raise AlreadyFinishedError()
+
+
+    def __del__(self):
+        if not self._completed:
+            print 'CommonStoreTransaction.__del__: OK'
+            self.abort()
+
+
+
 class PostgresService(MultiService):
 
     def __init__(self, dataStoreDirectory, subServiceFactory,
@@ -290,6 +477,7 @@
         self.monitor = None
         self.openConnections = []
 
+
     def activateDelayedShutdown(self):
         """
         Call this when starting database initialization code to protect against
@@ -301,6 +489,7 @@
         """
         self.delayedShutdown = True
 
+
     def deactivateDelayedShutdown(self):
         """
         Call this when database initialization code has completed so that the
@@ -310,6 +499,7 @@
         if self.shutdownDeferred:
             self.shutdownDeferred.callback(None)
 
+
     def produceConnection(self, label="<unlabeled>", databaseName=None):
         """
         Produce a DB-API 2.0 connection pointed at this database.
@@ -345,6 +535,13 @@
         return w
 
 
+    def produceLocalTransaction(self, label="<unlabeled>"):
+        """
+        Create an L{IAsyncTransaction} based on a thread in the current process.
+        """
+        return ThisProcessSqlTxn(lambda : self.produceConnection(label))
+
+
     def ready(self):
         """
         Subprocess is ready.  Time to initialize the subservice.
@@ -389,6 +586,7 @@
             # Only continue startup if we've not begun shutdown
             self.subServiceFactory(self.produceConnection).setServiceParent(self)
 
+
     def pauseMonitor(self):
         """
         Pause monitoring.  This is a testing hook for when (if) we are
@@ -417,9 +615,11 @@
         monitor = _PostgresMonitor(self)
         pg_ctl = which("pg_ctl")[0]
         # check consistency of initdb and postgres?
-        
+
         options = []
-        options.append("-c listen_addresses='%s'" % (",".join(self.listenAddresses)))
+        options.append(
+            "-c listen_addresses='%s'" % (",".join(self.listenAddresses))
+        )
         if self.socketDir:
             options.append("-k '%s'" % (self.socketDir.path,))
         options.append("-c shared_buffers=%d" % (self.sharedBuffers,))

Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py	2010-11-01 21:11:10 UTC (rev 6491)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/sql.py	2010-11-01 21:11:58 UTC (rev 6492)
@@ -25,26 +25,23 @@
     "CommonHome",
 ]
 
-import sys
 import datetime
-from Queue import Queue
 
-from zope.interface.declarations import implements, directlyProvides
+from zope.interface import implements, directlyProvides
 
+from twext.python.log import Logger, LoggingMixIn
+from twext.web2.dav.element.rfc2518 import ResourceType
+from twext.web2.http_headers import MimeType
+
 from twisted.python import hashlib
 from twisted.python.modules import getModule
 from twisted.python.util import FancyEqMixin
-from twisted.python.failure import Failure
 
-from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from twisted.internet.defer import inlineCallbacks, returnValue
 
 from twisted.application.service import Service
 
-from twext.python.log import Logger, LoggingMixIn
 from twext.internet.decorate import memoizedKey
-from twext.web2.dav.element.rfc2518 import ResourceType
-from twext.web2.http_headers import MimeType
 
 from txdav.common.datastore.sql_legacy import PostgresLegacyNotificationsEmulator
 from txdav.caldav.icalendarstore import ICalendarTransaction, ICalendarStore
@@ -60,10 +57,7 @@
     NoSuchObjectResourceError
 from txdav.common.inotifications import INotificationCollection, \
     INotificationObject
-from txdav.idav import IAsyncTransaction
-from txdav.idav import AlreadyFinishedError
 
-
 from txdav.base.propertystore.sql import PropertyStore
 from txdav.base.propertystore.base import PropertyName
 
@@ -89,15 +83,17 @@
 
     implements(ICalendarStore)
 
-    def __init__(self, connectionFactory, notifierFactory, attachmentsPath,
-                 enableCalendars=True, enableAddressBooks=True):
+    def __init__(self, sqlTxnFactory, notifierFactory, attachmentsPath,
+                 enableCalendars=True, enableAddressBooks=True,
+                 label="unlabeled"):
         assert enableCalendars or enableAddressBooks
 
-        self.connectionFactory = connectionFactory
+        self.sqlTxnFactory = sqlTxnFactory
         self.notifierFactory = notifierFactory
         self.attachmentsPath = attachmentsPath
         self.enableCalendars = enableCalendars
         self.enableAddressBooks = enableAddressBooks
+        self.label = label
 
 
     def eachCalendarHome(self):
@@ -117,7 +113,7 @@
     def newTransaction(self, label="unlabeled", migrating=False):
         return CommonStoreTransaction(
             self,
-            self.connectionFactory,
+            self.sqlTxnFactory(),
             self.enableCalendars,
             self.enableAddressBooks,
             self.notifierFactory,
@@ -126,188 +122,7 @@
         )
 
 
-_DONE = object()
 
-_STATE_STOPPED = "STOPPED"
-_STATE_RUNNING = "RUNNING"
-_STATE_STOPPING = "STOPPING"
-
-class ThreadHolder(object):
-    """
-    A queue which will hold a reactor threadpool thread open until all of the
-    work in that queue is done.
-    """
-
-    def __init__(self, reactor):
-        self._reactor = reactor
-        self._state = _STATE_STOPPED
-        self._stopper = None
-        self._q = None
-
-
-    def _run(self):
-        """
-        Worker function which runs in a non-reactor thread.
-        """
-        while True:
-            work = self._q.get()
-            if work is _DONE:
-                def finishStopping():
-                    self._state = _STATE_STOPPED
-                    self._q = None
-                    s = self._stopper
-                    self._stopper = None
-                    s.callback(None)
-                self._reactor.callFromThread(finishStopping)
-                return
-            self._oneWorkUnit(*work)
-
-
-    def _oneWorkUnit(self, deferred, instruction):
-        try: 
-            result = instruction()
-        except:
-            etype, evalue, etb = sys.exc_info()
-            def relayFailure():
-                f = Failure(evalue, etype, etb)
-                deferred.errback(f)
-            self._reactor.callFromThread(relayFailure)
-        else:
-            self._reactor.callFromThread(deferred.callback, result)
-
-
-    def submit(self, work):
-        """
-        Submit some work to be run.
-
-        @param work: a 0-argument callable, which will be run in a thread.
-
-        @return: L{Deferred} that fires with the result of L{work}
-        """
-        d = Deferred()
-        self._q.put((d, work))
-        return d
-
-
-    def start(self):
-        """
-        Start this thing, if it's stopped.
-        """
-        if self._state != _STATE_STOPPED:
-            raise RuntimeError("Not stopped.")
-        self._state = _STATE_RUNNING
-        self._q = Queue(0)
-        self._reactor.callInThread(self._run)
-
-
-    def stop(self):
-        """
-        Stop this thing and release its thread, if it's running.
-        """
-        if self._state != _STATE_RUNNING:
-            raise RuntimeError("Not running.")
-        s = self._stopper = Deferred()
-        self._state = _STATE_STOPPING
-        self._q.put(_DONE)
-        return s
-
-
-
-class ThisProcessSqlTxn(object):
-    """
-    L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
-    current process.
-    """
-    implements(IAsyncTransaction)
-
-    def __init__(self, connectionFactory):
-        """
-        @param connectionFactory: A 0-argument callable which returns a DB-API
-            2.0 connection.
-        """
-        self._completed = False
-        self._holder = ThreadHolder(reactor)
-        self._holder.start()
-        def initCursor():
-            # support threadlevel=1; we can't necessarily cursor() in a
-            # different thread than we do transactions in.
-
-            # FIXME: may need to be pooling ThreadHolders along with
-            # connections, if threadlevel=1 requires connect() be called in the
-            # same thread as cursor() et. al.
-            self._connection = connectionFactory()
-            self._cursor = self._connection.cursor()
-        self._holder.submit(initCursor)
-
-
-    def store(self):
-        return self._store
-
-
-    def __repr__(self):
-        return "PG-TXN<%s>" % (self._label,)
-
-
-    def _reallyExecSQL(self, sql, args=[], raiseOnZeroRowCount=None):
-        self._cursor.execute(sql, args)
-        if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
-            raise raiseOnZeroRowCount()
-        if self._cursor.description:
-            return self._cursor.fetchall()
-        else:
-            return None
-
-
-    def execSQL(self, *args, **kw):
-        result = self._holder.submit(
-            lambda : self._reallyExecSQL(*args, **kw)
-        )
-        if self.noisy:
-            def reportResult(results):
-                sys.stdout.write("\n".join([
-                    "",
-                    "SQL (%d): %r %r" % (self._txid, args, kw),
-                    "Results (%d): %r" % (self._txid, results,),
-                    "",
-                    ]))
-                return results
-            result.addBoth(reportResult)
-        return result
-
-
-    def commit(self):
-        if not self._completed:
-            self._completed = True
-            def reallyCommit():
-                self._connection.commit()
-                self._connection.close()
-            result = self._holder.submit(reallyCommit)
-            self._holder.stop()
-            return result
-        else:
-            raise AlreadyFinishedError()
-
-
-    def abort(self):
-        if not self._completed:
-            def reallyAbort():
-                self._connection.rollback()
-                self._connection.close()
-            self._completed = True
-            result = self._holder.submit(reallyAbort)
-            self._holder.stop()
-            return result
-        else:
-            raise AlreadyFinishedError()
-
-
-    def __del__(self):
-        if not self._completed:
-            print "CommonStoreTransaction.__del__: OK"
-            self.abort()
-
-
-
 class CommonStoreTransaction(object):
     """
     Transaction implementation for SQL database.
@@ -317,7 +132,7 @@
 
     id = 0
 
-    def __init__(self, store, cursorFactory,
+    def __init__(self, store, sqlTxn,
                  enableCalendars, enableAddressBooks,
                  notifierFactory, label, migrating=False):
         self._store = store
@@ -345,7 +160,7 @@
         CommonStoreTransaction._homeClass[EADDRESSBOOKTYPE] = AddressBookHome
         CommonStoreTransaction._homeTable[ECALENDARTYPE] = CALENDAR_HOME_TABLE
         CommonStoreTransaction._homeTable[EADDRESSBOOKTYPE] = ADDRESSBOOK_HOME_TABLE
-        self._sqlTxn = ThisProcessSqlTxn(cursorFactory)
+        self._sqlTxn = sqlTxn
 
 
     def store(self):

Modified: CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/test/util.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/test/util.py	2010-11-01 21:11:10 UTC (rev 6491)
+++ CalendarServer/branches/users/glyph/sharedpool/txdav/common/datastore/test/util.py	2010-11-01 21:11:58 UTC (rev 6492)
@@ -28,7 +28,7 @@
 from twext.python.vcomponent import VComponent
 
 from twisted.internet import reactor
-from twisted.internet.defer import Deferred, succeed, inlineCallbacks
+from twisted.internet.defer import Deferred, inlineCallbacks
 from twisted.internet.task import deferLater
 from twisted.python import log
 
@@ -52,6 +52,8 @@
         print connection.label, connection.state
     print '--- CONNECTIONS END ---'
 
+
+
 class SQLStoreBuilder(object):
     """
     Test-fixture-builder which can construct a PostgresStore.
@@ -79,18 +81,18 @@
                     pass
                 try:
                     self.store = CommonDataStore(
-                        lambda label=None: connectionFactory(
-                            label or currentTestID
-                        ),
+                        self.sharedService.produceLocalTransaction,
                         notifierFactory,
                         attachmentRoot
                     )
+                    self.store.label = currentTestID
                 except:
                     ready.errback()
                     raise
                 else:
-                    self.cleanDatabase(testCase)
-                    ready.callback(self.store)
+                    def readyNow(ignored):
+                        ready.callback(self.store)
+                    return self.cleanDatabase(testCase).addCallback(readyNow)
                 return self.store
             self.sharedService = PostgresService(
                 dbRoot, getReady, v1_schema, resetSchema=True,
@@ -107,8 +109,9 @@
             result = ready
         else:
             self.store.notifierFactory = notifierFactory
-            self.cleanDatabase(testCase)
-            result = succeed(self.store)
+            result = self.cleanDatabase(testCase).addCallback(
+                lambda ignored: self.store
+            )
 
         def cleanUp():
             # FIXME: clean up any leaked connections and report them with an
@@ -120,11 +123,13 @@
         return result
 
 
+    @inlineCallbacks
     def cleanDatabase(self, testCase):
-        cleanupConn = self.store.connectionFactory(
+        cleanupTxn = self.store.sqlTxnFactory(
             "%s schema-cleanup" % (testCase.id(),)
         )
-        cursor = cleanupConn.cursor()
+        # TODO: should be getting these tables from a declaration of the schema
+        # somewhere.
         tables = ['INVITE',
                   'RESOURCE_PROPERTY',
                   'ATTACHMENT',
@@ -143,16 +148,16 @@
                   'NOTIFICATION_HOME']
         for table in tables:
             try:
-                cursor.execute("delete from "+table)
+                yield cleanupTxn.execSQL("delete from "+table, [])
             except:
                 log.err()
-        cleanupConn.commit()
-        cleanupConn.close()
+        yield cleanupTxn.commit()
 
 theStoreBuilder = SQLStoreBuilder()
 buildStore = theStoreBuilder.buildStore
 
 
+
 @inlineCallbacks
 def populateCalendarsFrom(requirements, store):
     """
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20101101/5f659bb9/attachment-0001.html>


More information about the calendarserver-changes mailing list