[CalendarServer-changes] [6877] CalendarServer/trunk

source_changes at macosforge.org
Thu Feb 3 16:42:52 PST 2011


Revision: 6877
          http://trac.macosforge.org/projects/calendarserver/changeset/6877
Author:   glyph at apple.com
Date:     2011-02-03 16:42:52 -0800 (Thu, 03 Feb 2011)
Log Message:
-----------
Fix various issues with database connectivity and service shutdown: retry failed
connection attempts in ConnectionPool, make its stopService() wait for in-flight
work and release every thread and connection, and ensure
DelayedStartupProcessMonitor.stopService() has a Deferred to fire for each
monitored process.
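
For context, a minimal sketch of how the reworked pool is meant to be driven
follows; it is not part of this changeset, and the sqlite3 connection factory
and reactor wiring are assumptions chosen only to keep the example
self-contained (any 0-argument callable returning a DB-API 2.0 connection
works; the deployment itself currently uses pgdb):

    import sqlite3

    from twisted.internet import reactor
    from twisted.internet.defer import inlineCallbacks

    from twext.enterprise.adbapi2 import ConnectionPool

    def connectionFactory():
        # Stand-in DB-API 2.0 connection, for illustration only.
        return sqlite3.connect(':memory:')

    pool = ConnectionPool(connectionFactory, maxConnections=10)
    pool.startService()

    @inlineCallbacks
    def work():
        # connection() returns immediately; statements are spooled until a
        # real database connection is available, and failed connection
        # attempts are retried every ConnectionPool.RETRY_TIMEOUT seconds.
        txn = pool.connection()
        yield txn.execSQL("select 1")
        yield txn.commit()          # recycles the underlying connection
        yield pool.stopService()    # aborts stragglers, stops all threads
        reactor.stop()

    reactor.callWhenRunning(work)
    reactor.run()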

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/twext/enterprise/adbapi2.py
    CalendarServer/trunk/twext/enterprise/ienterprise.py
    CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py

Property Changed:
----------------
    CalendarServer/trunk/


Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
   + /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593

Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2011-02-03 23:42:00 UTC (rev 6876)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2011-02-04 00:42:52 UTC (rev 6877)
@@ -39,7 +39,7 @@
 from twisted.python.usage import Options, UsageError
 from twisted.python.reflect import namedClass
 from twisted.plugin import IPlugin
-from twisted.internet.defer import gatherResults
+from twisted.internet.defer import gatherResults, Deferred
 from twisted.internet import reactor as _reactor
 from twisted.internet.reactor import addSystemEventTrigger
 from twisted.internet.process import ProcessExitedAlready
@@ -1428,6 +1428,8 @@
         """
         self.stopping = True
         self.deferreds = {}
+        for name in self.processes:
+            self.deferreds[name] = Deferred()
         super(DelayedStartupProcessMonitor, self).stopService()
 
         # Cancel any outstanding restarts
@@ -1503,7 +1505,7 @@
                                                          self.startProcess,
                                                          name)
         if self.stopping:
-            deferred = self.deferreds.get(name, None)
+            deferred = self.deferreds.pop(name, None)
             if deferred is not None:
                 deferred.callback(None)
 

Modified: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py	2011-02-03 23:42:00 UTC (rev 6876)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py	2011-02-04 00:42:52 UTC (rev 6877)
@@ -1,4 +1,4 @@
-# -*- test-case-name: twext.enterprise.test. -*-
+# -*- test-case-name: twext.enterprise.test.test_adbapi2 -*-
 ##
 # Copyright (c) 2010 Apple Inc. All rights reserved.
 #
@@ -50,50 +50,36 @@
 from twisted.python.components import proxyForInterface
 
 from twext.internet.threadutils import ThreadHolder
+from twisted.internet.defer import succeed
+from twext.enterprise.ienterprise import ConnectionError
+from twisted.internet.defer import fail
 from twext.enterprise.ienterprise import AlreadyFinishedError, IAsyncTransaction
 
 
-# FIXME: there should be no default, it should be discovered dynamically
-# everywhere.  Right now we're only using pgdb so we only support that.
+# FIXME: there should be no default for DEFAULT_PARAM_STYLE, it should be
+# discovered dynamically everywhere.  Right now we're only using pgdb so we only
+# support that.
 
 DEFAULT_PARAM_STYLE = 'pyformat'
 
-class BaseSqlTxn(object):
+class _ConnectedTxn(object):
     """
     L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
     current process.
     """
     implements(IAsyncTransaction)
 
-    # FIXME: this should *really* be 
+    # See DEFAULT_PARAM_STYLE FIXME above.
     paramstyle = DEFAULT_PARAM_STYLE
 
-    def __init__(self, connectionFactory, reactor=_reactor):
-        """
-        @param connectionFactory: A 0-argument callable which returns a DB-API
-            2.0 connection.
-        """
-        self._completed = False
-        self._cursor = None
-        self._holder = ThreadHolder(reactor)
-        self._holder.start()
+    def __init__(self, pool, threadHolder, connection, cursor):
+        self._pool       = pool
+        self._completed  = True
+        self._cursor     = cursor
+        self._connection = connection
+        self._holder     = threadHolder
 
-        def initCursor():
-            # support threadlevel=1; we can't necessarily cursor() in a
-            # different thread than we do transactions in.
 
-            # TODO: Re-try connect when it fails.  Specify a timeout.  That
-            # should happen in this layer because we need to be able to stop
-            # the reconnect attempt if it's hanging.
-            self._connection = connectionFactory()
-            self._cursor = self._connection.cursor()
-
-        # Note: no locking necessary here; since this gets submitted first, all
-        # subsequent submitted work-units will be in line behind it and the
-        # cursor will already have been initialized.
-        self._holder.submit(initCursor).addErrback(log.err)
-
-
     def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
         if args is None:
             args = []
@@ -125,31 +111,33 @@
         return result
 
 
-    def commit(self):
+    def _end(self, really):
+        """
+        Common logic for commit or abort.
+        """
         if not self._completed:
             self._completed = True
-            def reallyCommit():
-                self._connection.commit()
-            result = self._holder.submit(reallyCommit)
+            def reallySomething():
+                if self._cursor is None:
+                    return
+                really()
+            result = self._holder.submit(reallySomething)
+            self._pool._repoolAfter(self, result)
             return result
         else:
             raise AlreadyFinishedError()
 
 
+    def commit(self):
+        return self._end(self._connection.commit)
+
+
     def abort(self):
-        if not self._completed:
-            self._completed = True
-            def reallyAbort():
-                self._connection.rollback()
-            result = self._holder.submit(reallyAbort)
-            return result
-        else:
-            raise AlreadyFinishedError()
+        return self._end(self._connection.rollback)
 
 
     def __del__(self):
         if not self._completed:
-            print 'BaseSqlTxn.__del__: OK'
             self.abort()
 
 
@@ -165,33 +153,55 @@
         self._completed = False
 
 
-    def stop(self):
+    def _releaseConnection(self):
         """
         Release the thread and database connection associated with this
         transaction.
         """
         self._completed = True
-        self._stopped = True
-        holder = self._holder
-        self._holder = None
-        holder.submit(self._connection.close)
+        self._stopped   = True
+        holder          = self._holder
+        self._holder    = None
+
+        def _reallyClose():
+            if self._cursor is None:
+                return
+            self._connection.close()
+        holder.submit(_reallyClose)
         return holder.stop()
 
 
+class _NoTxn(object):
+    """
+    An L{IAsyncTransaction} that indicates a local failure before we could even
+    communicate any statements (or possibly even any connection attempts) to the
+    server.
+    """
+    implements(IAsyncTransaction)
 
-class SpooledTxn(object):
+    def _everything(self, *a, **kw):
+        """
+        Everything fails with a L{ConnectionError}.
+        """
+        return fail(ConnectionError())
+
+    execSQL = _everything
+    commit  = _everything
+    abort   = _everything
+
+
+
+class _WaitingTxn(object):
     """
-    A L{SpooledTxn} is an implementation of L{IAsyncTransaction} which cannot
-    yet actually execute anything, so it spools SQL reqeusts for later
-    execution.  When a L{BaseSqlTxn} becomes available later, it can be
+    A L{_WaitingTxn} is an implementation of L{IAsyncTransaction} which cannot
+    yet actually execute anything, so it waits and spools SQL requests for later
+    execution.  When a L{_ConnectedTxn} becomes available later, it can be
     unspooled onto that.
     """
 
     implements(IAsyncTransaction)
 
-    # FIXME: this should be relayed from the connection factory of the thing
-    # creating the spooled transaction.
-
+    # See DEFAULT_PARAM_STYLE FIXME above.
     paramstyle = DEFAULT_PARAM_STYLE
 
     def __init__(self):
@@ -242,17 +252,25 @@
 
 
     def abort(self):
-        return self._enspool('abort')
+        return succeed(None)
 
 
 
-class PooledSqlTxn(proxyForInterface(iface=IAsyncTransaction,
+class _SingleTxn(proxyForInterface(iface=IAsyncTransaction,
                                      originalAttribute='_baseTxn')):
     """
-    This is a temporary throw-away wrapper for the longer-lived BaseSqlTxn, so
-    that if a badly-behaved API client accidentally hangs on to one of these
-    and, for example C{.abort()}s it multiple times once another client is
-    using that connection, it will get some harmless tracebacks.
+    A L{_SingleTxn} is a single-use wrapper for the longer-lived
+    L{_ConnectedTxn}, so that if a badly-behaved API client accidentally hangs
+    on to one of these and, for example C{.abort()}s it multiple times once
+    another client is using that connection, it will get some harmless
+    tracebacks.
+
+    It's a wrapper around a "real" implementation; either a L{_ConnectedTxn},
+    L{_NoTxn}, or L{_WaitingTxn} depending on the availability of real
+    underlying database connections.
+
+    This is the only L{IAsyncTransaction} implementation exposed to application
+    code.
     """
 
     def __init__(self, pool, baseTxn):
@@ -261,21 +279,50 @@
         self._complete = False
 
 
+    def __repr__(self):
+        """
+        Reveal the backend in the string representation.
+        """
+        return '_SingleTxn(%r)' % (self._baseTxn,)
+
+
+    def _unspoolOnto(self, baseTxn):
+        """
+        Replace my C{_baseTxn}, currently a L{_WaitingTxn}, with a new
+        implementation of L{IAsyncTransaction} that will actually do the work;
+        either a L{_ConnectedTxn} or a L{_NoTxn}.
+        """
+        spooledBase   = self._baseTxn
+        self._baseTxn = baseTxn
+        spooledBase._unspool(baseTxn)
+
+
     def execSQL(self, *a, **kw):
         self._checkComplete()
-        return super(PooledSqlTxn, self).execSQL(*a, **kw)
+        return super(_SingleTxn, self).execSQL(*a, **kw)
 
 
     def commit(self):
         self._markComplete()
-        return self._repoolAfter(super(PooledSqlTxn, self).commit())
+        return super(_SingleTxn, self).commit()
 
 
     def abort(self):
         self._markComplete()
-        return self._repoolAfter(super(PooledSqlTxn, self).abort())
+        if self in self._pool._waiting:
+            self._stopWaiting()
+            return succeed(None)
+        return super(_SingleTxn, self).abort()
 
 
+    def _stopWaiting(self):
+        """
+        Stop waiting for a free transaction and fail.
+        """
+        self._pool._waiting.remove(self)
+        self._unspoolOnto(_NoTxn())
+
+
     def _checkComplete(self):
         """
         If the transaction is complete, raise L{AlreadyFinishedError}
@@ -292,14 +339,43 @@
         self._complete = True
 
 
-    def _repoolAfter(self, d):
-        def repool(result):
-            self._pool.reclaim(self)
-            return result
-        return d.addCallback(repool)
 
+class _ConnectingPseudoTxn(object):
 
+    _retry = None
 
+    def __init__(self, pool, holder):
+        self._pool   = pool
+        self._holder = holder
+
+
+    def abort(self):
+        if self._retry is not None:
+            self._retry.cancel()
+        d = self._holder.stop()
+        def removeme(ignored):
+            if self in self._pool._busy:
+                self._pool._busy.remove(self)
+        d.addCallback(removeme)
+        return d
+
+
+
+def _fork(x):
+    """
+    Produce a L{Deferred} that will fire when another L{Deferred} fires without
+    disturbing its results.
+    """
+    d = Deferred()
+    def fired(result):
+        d.callback(result)
+        return result
+
+    x.addBoth(fired)
+    return d
+
+
+
 class ConnectionPool(Service, object):
     """
     This is a central service that has a threadpool and executes SQL statements
@@ -313,19 +389,50 @@
         than this many concurrent connections to the database.
 
     @type maxConnections: C{int}
+
+    @ivar reactor: The reactor used for scheduling threads as well as retries
+        for failed connect() attempts.
+
+    @type reactor: L{IReactorTime} and L{IReactorThreads} provider.
+
+    @ivar _free: The list of free L{_ConnectedTxn} objects which are not
+        currently attached to a L{_SingleTxn} object, and have active
+        connections ready for processing a new transaction.
+
+    @ivar _busy: The list of busy L{_ConnectedTxn} objects; those currently
+        servicing an unfinished L{_SingleTxn} object.
+
+    @ivar _finishing: The list of 2-tuples pairing each L{_ConnectedTxn} that
+        has had C{abort} or C{commit} called on it but has not finished
+        executing that method with the L{Deferred} returned from that method,
+        which will fire when its execution has completed.
+
+    @ivar _waiting: The list of L{_SingleTxn} objects attached to a
+        L{_WaitingTxn}; i.e. those which are awaiting a connection to become
+        free so that they can be executed.
+
+    @ivar _stopping: Is this L{ConnectionPool} in the process of shutting down?
+        (If so, new connections will not be established.)
     """
 
     reactor = _reactor
 
+    RETRY_TIMEOUT = 10.0
+
+
     def __init__(self, connectionFactory, maxConnections=10):
+
         super(ConnectionPool, self).__init__()
-        self.free = []
-        self.busy = []
-        self.waiting = []
         self.connectionFactory = connectionFactory
         self.maxConnections = maxConnections
 
+        self._free       = []
+        self._busy       = []
+        self._waiting    = []
+        self._finishing  = []
+        self._stopping   = False
 
+
     def startService(self):
         """
         No startup necessary.
@@ -335,66 +442,140 @@
     @inlineCallbacks
     def stopService(self):
         """
-        Forcibly abort any outstanding transactions.
+        Forcibly abort any outstanding transactions, and release all resources
+        (notably, threads).
         """
-        for busy in self.busy[:]:
+        self._stopping = True
+
+        # Phase 1: Cancel any transactions that are waiting so they won't try to
+        # eagerly acquire new connections as they flow into the free-list.
+        while self._waiting:
+            waiting = self._waiting[0]
+            waiting._stopWaiting()
+
+        # Phase 2: Wait for all the Deferreds from the L{_ConnectedTxn}s that
+        # have *already* been stopped.
+        while self._finishing:
+            yield _fork(self._finishing[0][1])
+
+        # Phase 3: All of the busy transactions must be aborted first.  As each
+        # one is aborted, it will remove itself from the list.
+        while self._busy:
+            d = self._busy[0].abort()
             try:
-                yield busy.abort()
+                yield d
             except:
                 log.err()
-        # all transactions should now be in the free list, since 'abort()' will
-        # have put them there.
-        for free in self.free:
-            yield free.stop()
 
+        # Phase 4: All transactions should now be in the free list, since
+        # 'abort()' will have put them there.  Shut down all the associated
+        # ThreadHolders.
+        while self._free:
+            # Releasing a L{_ConnectedTxn} doesn't automatically recycle it /
+            # remove it the way aborting a _SingleTxn does, so we need to .pop()
+            # here.  L{_ConnectedTxn._releaseConnection} really shouldn't be
+            # able to fail, as it's just stopping the thread, and the holder's
+            # stop() is independently submitted from .abort() / .close().
+            yield self._free.pop()._releaseConnection()
 
+
+    def _createHolder(self):
+        """
+        Create a L{ThreadHolder}.  (Test hook.)
+        """
+        return ThreadHolder(self.reactor)
+
+
     def connection(self, label="<unlabeled>"):
         """
-        Find a transaction; either retrieve a free one from the list or
-        allocate a new one if no free ones are available.
+        Find and immediately return an L{IAsyncTransaction} object.  Execution
+        of statements, commit and abort on that transaction may be delayed until
+        a real underlying database connection is available.
 
         @return: an L{IAsyncTransaction}
         """
-
-        overload = False
-        if self.free:
-            basetxn = self.free.pop(0)
-        elif len(self.busy) < self.maxConnections:
-            basetxn = BaseSqlTxn(
-                connectionFactory=self.connectionFactory,
-                reactor=self.reactor
-            )
+        if self._stopping:
+            return _NoTxn()
+        if self._free:
+            basetxn = self._free.pop(0)
+            self._busy.append(basetxn)
+            txn = _SingleTxn(self, basetxn)
         else:
-            basetxn = SpooledTxn()
-            overload = True
-        txn = PooledSqlTxn(self, basetxn)
-        if overload:
-            self.waiting.append(txn)
-        else:
-            self.busy.append(txn)
+            txn = _SingleTxn(self, _WaitingTxn())
+            self._waiting.append(txn)
+            # FIXME/TESTME: should be len(self._busy) + len(self._finishing)
+            # (free doesn't need to be considered, as it's tested above)
+            if self._activeConnectionCount() < self.maxConnections:
+                self._startOneMore()
         return txn
 
 
-    def reclaim(self, txn):
+    def _activeConnectionCount(self):
         """
-        Shuck the L{PooledSqlTxn} wrapper off, and recycle the underlying
-        BaseSqlTxn into the free list.
+        @return: the number of active outgoing connections to the database.
         """
-        baseTxn = txn._baseTxn
-        baseTxn.reset()
-        self.busy.remove(txn)
-        if self.waiting:
-            waiting = self.waiting.pop(0)
-            waiting._baseTxn._unspool(baseTxn)
-            # Note: although commit() may already have been called, we don't
-            # have to handle it specially here.  It only unspools after the
-            # deferred returned by commit() has actually been called, and since
-            # that occurs in a callFromThread, that won't happen until the next
-            # iteration of the mainloop, when the _baseTxn is safely correct.
-            waiting._baseTxn = baseTxn
-            self.busy.append(waiting)
+        return len(self._busy) + len(self._finishing)
+
+
+    def _startOneMore(self):
+        """
+        Start one more _ConnectedTxn.
+        """
+        holder = self._createHolder()
+        holder.start()
+        txn = _ConnectingPseudoTxn(self, holder)
+        # take up a slot in the 'busy' list, sit there so we can be aborted.
+        self._busy.append(txn)
+        def initCursor():
+            # support threadlevel=1; we can't necessarily cursor() in a
+            # different thread than we do transactions in.
+            connection = self.connectionFactory()
+            cursor     = connection.cursor()
+            return (connection, cursor)
+        def finishInit((connection, cursor)):
+            baseTxn = _ConnectedTxn(
+                pool=self,
+                threadHolder=holder,
+                connection=connection,
+                cursor=cursor
+            )
+            self._busy.remove(txn)
+            self._repoolNow(baseTxn)
+        def maybeTryAgain(f):
+            log.err(f, "Re-trying connection due to connection failure")
+            txn._retry = self.reactor.callLater(self.RETRY_TIMEOUT, resubmit)
+        def resubmit():
+            d = holder.submit(initCursor)
+            d.addCallbacks(finishInit, maybeTryAgain)
+        resubmit()
+
+
+    def _repoolAfter(self, txn, d):
+        """
+        Re-pool the given L{_ConnectedTxn} after the given L{Deferred} has
+        fired.
+        """
+        self._busy.remove(txn)
+        finishRecord = (txn, d)
+        self._finishing.append(finishRecord)
+        def repool(result):
+            self._finishing.remove(finishRecord)
+            self._repoolNow(txn)
+            return result
+        return d.addBoth(repool)
+
+
+    def _repoolNow(self, txn):
+        """
+        Recycle a L{_ConnectedTxn} into the free list.
+        """
+        txn.reset()
+        if self._waiting:
+            waiting = self._waiting.pop(0)
+            self._busy.append(txn)
+            waiting._unspoolOnto(txn)
         else:
-            self.free.append(baseTxn)
+            self._free.append(txn)
 
 
 
@@ -483,7 +664,7 @@
 
 class QueryComplete(Command):
     """
-    A query issued with ExecSQL is complete.
+    A query issued with L{ExecSQL} is complete.
     """
 
     arguments = [('queryID', String()),
@@ -518,7 +699,7 @@
         Initialize a mapping of transaction IDs to transaction objects.
         """
         super(ConnectionPoolConnection, self).__init__()
-        self.pool = pool
+        self.pool  = pool
         self._txns = {}
 
 
@@ -574,16 +755,24 @@
     """
     def __init__(self):
         super(ConnectionPoolClient, self).__init__()
-        self._nextID = count().next
-        self._txns = {}
+        self._nextID  = count().next
+        self._txns    = {}
         self._queries = {}
 
 
     def newTransaction(self):
-        txnid = str(self._nextID())
+        """
+        Create a new networked provider of L{IAsyncTransaction}.
+
+        (This will ultimately call L{ConnectionPool.connection} on the other end
+        of the wire.)
+
+        @rtype: L{IAsyncTransaction}
+        """
+        txnid             = str(self._nextID())
+        txn               = _NetTransaction(client=self, transactionID=txnid)
+        self._txns[txnid] = txn
         self.callRemote(StartTxn, transactionID=txnid)
-        txn = Transaction(client=self, transactionID=txnid)
-        self._txns[txnid] = txn
         return txn
 
 
@@ -602,8 +791,8 @@
 
 class _Query(object):
     def __init__(self, raiseOnZeroRowCount):
-        self.results = []
-        self.deferred = Deferred()
+        self.results             = []
+        self.deferred            = Deferred()
         self.raiseOnZeroRowCount = raiseOnZeroRowCount
 
 
@@ -628,15 +817,16 @@
 
 
 
-class Transaction(object):
+class _NetTransaction(object):
     """
-    Async protocol-based transaction implementation.
+    A L{_NetTransaction} is an L{AMP}-protocol-based provider of the
+    L{IAsyncTransaction} interface.  It sends SQL statements, query results, and
+    commit/abort commands via an AMP socket to a pooling process.
     """
 
     implements(IAsyncTransaction)
 
-    # FIXME: this needs to come from the other end of the wire.
-
+    # See DEFAULT_PARAM_STYLE FIXME above.
     paramstyle = DEFAULT_PARAM_STYLE
 
     def __init__(self, client, transactionID):
@@ -644,9 +834,9 @@
         Initialize a transaction with a L{ConnectionPoolClient} and a unique
         transaction identifier.
         """
-        self._client = client
+        self._client        = client
         self._transactionID = transactionID
-        self._completed = False
+        self._completed     = False
 
 
     def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):

Modified: CalendarServer/trunk/twext/enterprise/ienterprise.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/ienterprise.py	2011-02-03 23:42:00 UTC (rev 6876)
+++ CalendarServer/trunk/twext/enterprise/ienterprise.py	2011-02-04 00:42:52 UTC (rev 6877)
@@ -33,6 +33,13 @@
 
 
 
+class ConnectionError(Exception):
+    """
+    An error occurred with the underlying database connection.
+    """
+
+
+
 class IAsyncTransaction(Interface):
     """
     Asynchronous execution of SQL.

Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2011-02-03 23:42:00 UTC (rev 6876)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2011-02-04 00:42:52 UTC (rev 6877)
@@ -22,22 +22,50 @@
 
 from twisted.trial.unittest import TestCase
 
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import execute
+from twisted.internet.task import Clock
 
+from twisted.internet.defer import Deferred
+from twext.enterprise.ienterprise import ConnectionError
 from twext.enterprise.adbapi2 import ConnectionPool
 
 
+def resultOf(deferred, propagate=False):
+    """
+    Add a callback and errback which will capture the result of a L{Deferred} in
+    a list, and return that list.  If 'propagate' is True, pass through the
+    results.
+    """
+    results = []
+    if propagate:
+        def cb(r):
+            results.append(r)
+            return r
+    else:
+        cb = results.append
+    deferred.addBoth(cb)
+    return results
+
+
+
 class Child(object):
+    """
+    An object with a L{Parent}, in its list of C{children}.
+    """
     def __init__(self, parent):
+        self.closed = False
         self.parent = parent
         self.parent.children.append(self)
 
     def close(self):
-        self.parent.children.remove(self)
+        self.closed = True
 
 
 
 class Parent(object):
+    """
+    An object with a list of L{Child}ren.
+    """
 
     def __init__(self):
         self.children = []
@@ -69,14 +97,31 @@
 
 
     def commit(self):
-        return
+        if self.parent.commitFail:
+            self.parent.commitFail = False
+            raise CommitFail()
 
 
     def rollback(self):
-        return
+        if self.parent.rollbackFail:
+            self.parent.rollbackFail = False
+            raise RollbackFail()
 
 
+class RollbackFail(Exception):
+    """
+    Sample rollback-failure exception.
+    """
 
+
+
+class CommitFail(Exception):
+    """
+    Sample commit-failure exception.
+    """
+
+
+
 class FakeCursor(Child):
     """
     Fake stand-in for a DB-API 2.0 cursor.
@@ -110,10 +155,17 @@
 
 
 class ConnectionFactory(Parent):
+
+    rollbackFail = False
+    commitFail = False
+
     def __init__(self):
         Parent.__init__(self)
         self.idcounter = count(1)
+        self._resultQueue = []
+        self.defaultConnect()
 
+
     @property
     def connections(self):
         "Alias to make tests more readable."
@@ -121,55 +173,488 @@
 
 
     def connect(self):
-        return FakeConnection(self)
+        """
+        Implement the C{ConnectionFactory} callable expected by
+        L{ConnectionPool}.
+        """
+        if self._resultQueue:
+            thunk = self._resultQueue.pop(0)
+        else:
+            thunk = self._default
+        return thunk()
 
 
+    def willConnect(self):
+        """
+        Used by tests to queue a successful result for connect().
+        """
+        def thunk():
+            return FakeConnection(self)
+        self._resultQueue.append(thunk)
 
+
+    def willFail(self):
+        """
+        Used by tests to queue a failed result for connect().
+        """
+        def thunk():
+            raise FakeConnectionError()
+        self._resultQueue.append(thunk)
+
+
+    def defaultConnect(self):
+        """
+        By default, connection attempts will succeed.
+        """
+        self.willConnect()
+        self._default = self._resultQueue.pop()
+
+
+    def defaultFail(self):
+        """
+        By default, connection attempts will fail.
+        """
+        self.willFail()
+        self._default = self._resultQueue.pop()
+
+
+
+class FakeConnectionError(Exception):
+    """
+    Synthetic error that might occur during connection.
+    """
+
+
+
+class FakeThreadHolder(object):
+    """
+    Run things submitted to this ThreadHolder on the main thread, so that
+    execution is easier to control.
+    """
+
+    def __init__(self, test):
+        self.started = False
+        self.stopped = False
+        self.test = test
+        self.queue = []
+
+
+    def start(self):
+        """
+        Mark this L{FakeThreadHolder} as started.
+        """
+        self.started = True
+
+
+    def stop(self):
+        """
+        Mark this L{FakeThreadHolder} as stopped.
+        """
+        def stopped(nothing):
+            self.stopped = True
+        return self.submit(lambda : None).addCallback(stopped)
+
+
+    def submit(self, work):
+        """
+        Call the function (or queue it)
+        """
+        if self.test.paused:
+            d = Deferred()
+            self.queue.append((d, work))
+            return d
+        else:
+            return execute(work)
+
+
+    def flush(self):
+        """
+        Fire all deferreds previously returned from submit.
+        """
+        self.queue, queue = [], self.queue
+        for (d, work) in queue:
+            try:
+                result = work()
+            except:
+                d.errback()
+            else:
+                d.callback(result)
+
+
+
 class ConnectionPoolTests(TestCase):
+    """
+    Tests for L{ConnectionPool}.
+    """
 
-    @inlineCallbacks
+    def setUp(self):
+        """
+        Create a L{ConnectionPool} attached to a C{ConnectionFactory}.  Start
+        the L{ConnectionPool}.
+        """
+        self.paused             = False
+        self.holders            = []
+        self.factory            = ConnectionFactory()
+        self.pool               = ConnectionPool(self.factory.connect,
+                                                 maxConnections=2)
+        self.pool._createHolder = self.makeAHolder
+        self.clock              = self.pool.reactor = Clock()
+        self.pool.startService()
+
+
+    def tearDown(self):
+        """
+        Make sure the service is stopped and the fake ThreadHolders are all
+        executing their queues so failed tests can exit cleanly.
+        """
+        self.flushHolders()
+
+
+    def flushHolders(self):
+        """
+        Flush all pending C{submit}s since C{pauseHolders} was called.
+        """
+        self.paused = False
+        for holder in self.holders:
+            holder.flush()
+
+
+    def pauseHolders(self):
+        """
+        Pause all L{FakeThreadHolder}s, causing C{submit} to return an unfired
+        L{Deferred}.
+        """
+        self.paused = True
+
+
+    def makeAHolder(self):
+        """
+        Make a ThreadHolder-alike.
+        """
+        fth = FakeThreadHolder(self)
+        self.holders.append(fth)
+        return fth
+
+
     def test_tooManyConnections(self):
         """
         When the number of outstanding busy transactions exceeds the number of
         slots specified by L{ConnectionPool.maxConnections},
-        L{ConnectionPool.connection} will return a L{PooledSqlTxn} that is not
-        backed by any L{BaseSqlTxn}; this object will queue its SQL statements
-        until an existing connection becomes available.
+        L{ConnectionPool.connection} will return a pooled transaction that is
+        not backed by any real database connection; this object will queue its
+        SQL statements until an existing connection becomes available.
         """
-        cf = ConnectionFactory()
-        cp = ConnectionPool(cf.connect, maxConnections=2)
-        cp.startService()
-        self.addCleanup(cp.stopService)
-        a = cp.connection()
-        [[counter, echo]] = yield a.execSQL("alpha")
-        b = cp.connection()
-        [[bcounter, becho]] = yield b.execSQL("beta")
+        a = self.pool.connection()
 
+        alphaResult = resultOf(a.execSQL("alpha"))
+        [[counter, echo]] = alphaResult[0]
+
+        b = self.pool.connection()
+        # 'b' should have opened a connection.
+        self.assertEquals(len(self.factory.connections), 2)
+        betaResult = resultOf(b.execSQL("beta"))
+        [[bcounter, becho]] = betaResult[0]
+
         # both 'a' and 'b' are holding open a connection now; let's try to open
         # a third one.  (The ordering will be deterministic even if this fails,
         # because those threads are already busy.)
-        c = cp.connection()
-        enqueue = c.execSQL("gamma")
-        x = []
-        def addtox(it):
-            x.append(it)
-            return it
-        enqueue.addCallback(addtox)
+        c = self.pool.connection()
+        gammaResult = resultOf(c.execSQL("gamma"))
 
         # Did 'c' open a connection?  Let's hope not...
-        self.assertEquals(len(cf.connections), 2)
-        # This assertion is _not_ deterministic, unfortunately; it's unlikely
-        # that the implementation could be adjusted such that this assertion
-        # would fail and the others would succeed.  However, if it does fail,
-        # that's really bad, so I am leaving it regardless.
-        self.failIf(bool(x), "SQL executed too soon!")
-        yield b.commit()
+        self.assertEquals(len(self.factory.connections), 2)
+        # SQL shouldn't be executed too soon...
+        self.assertEquals(gammaResult, [])
 
+        commitResult = resultOf(b.commit())
+
         # Now that 'b' has committed, 'c' should be able to complete.
-        [[ccounter, cecho]] = yield enqueue
+        [[ccounter, cecho]] = gammaResult[0]
 
-        # The connection for 'a' ought to be busy, so let's make sure we're
-        # using the one for 'c'.
+        # The connection for 'a' ought to still be busy, so let's make sure
+        # we're using the one for 'c'.
         self.assertEquals(ccounter, bcounter)
 
+        # Sanity check: the commit should have succeeded!
+        self.assertEquals(commitResult, [None])
 
+
+    def test_stopService(self):
+        """
+        L{ConnectionPool.stopService} stops all the associated L{ThreadHolder}s
+        and thereby frees up the resources it is holding.
+        """
+        a = self.pool.connection()
+        [[[counter, echo]]] = resultOf(a.execSQL("alpha"))
+        self.assertEquals(len(self.factory.connections), 1)
+        self.assertEquals(len(self.holders), 1)
+        [holder] = self.holders
+        self.assertEquals(holder.started, True)
+        self.assertEquals(holder.stopped, False)
+        self.pool.stopService()
+        self.assertEquals(len(self.holders), 1)
+        self.assertEquals(holder.started, True)
+        self.assertEquals(holder.stopped, True)
+        # Closing fake connections removes them from the list.
+        self.assertEquals(len(self.factory.connections), 1)
+        self.assertEquals(self.factory.connections[0].closed, True)
+
+
+    def test_retryAfterConnectError(self):
+        """
+        When the C{connectionFactory} passed to L{ConnectionPool} raises an
+        exception, the L{ConnectionPool} will log the exception and delay
+        execution of a new connection's SQL methods until an attempt succeeds.
+        """
+        self.factory.willFail()
+        self.factory.willFail()
+        self.factory.willConnect()
+        c = self.pool.connection()
+        def checkOneFailure():
+            errors = self.flushLoggedErrors(FakeConnectionError)
+            self.assertEquals(len(errors), 1)
+        checkOneFailure()
+        d = c.execSQL("alpha")
+        happened = []
+        d.addBoth(happened.append)
+        self.assertEquals(happened, [])
+        self.clock.advance(self.pool.RETRY_TIMEOUT + 0.01)
+        checkOneFailure()
+        self.assertEquals(happened, [])
+        self.clock.advance(self.pool.RETRY_TIMEOUT + 0.01)
+        self.assertEquals(happened, [[[1, "alpha"]]])
+
+
+    def test_shutdownDuringRetry(self):
+        """
+        If a L{ConnectionPool} is attempting to shut down while it's in the
+        process of re-trying a connection attempt that received an error, the
+        connection attempt should be cancelled and the shutdown should complete
+        as normal.
+        """
+        self.factory.defaultFail()
+        self.pool.connection()
+        errors = self.flushLoggedErrors(FakeConnectionError)
+        self.assertEquals(len(errors), 1)
+        stopd = []
+        self.pool.stopService().addBoth(stopd.append)
+        self.assertEquals([None], stopd)
+        self.assertEquals(self.clock.calls, [])
+        [holder] = self.holders
+        self.assertEquals(holder.started, True)
+        self.assertEquals(holder.stopped, True)
+
+
+    def test_shutdownDuringAttemptSuccess(self):
+        """
+        If L{ConnectionPool.stopService} is called while a connection attempt is
+        outstanding, the resulting L{Deferred} won't be fired until the
+        connection attempt has finished; in this case, succeeded.
+        """
+        self.pauseHolders()
+        self.pool.connection()
+        stopd = []
+        self.pool.stopService().addBoth(stopd.append)
+        self.assertEquals(stopd, [])
+        self.flushHolders()
+        self.assertEquals(stopd, [None])
+        [holder] = self.holders
+        self.assertEquals(holder.started, True)
+        self.assertEquals(holder.stopped, True)
+
+
+    def test_shutdownDuringAttemptFailed(self):
+        """
+        If L{ConnectionPool.stopService} is called while a connection attempt is
+        outstanding, the resulting L{Deferred} won't be fired until the
+        connection attempt has finished; in this case, failed.
+        """
+        self.factory.defaultFail()
+        self.pauseHolders()
+        self.pool.connection()
+        stopd = []
+        self.pool.stopService().addBoth(stopd.append)
+        self.assertEquals(stopd, [])
+        self.flushHolders()
+        errors = self.flushLoggedErrors(FakeConnectionError)
+        self.assertEquals(len(errors), 1)
+        self.assertEquals(stopd, [None])
+        [holder] = self.holders
+        self.assertEquals(holder.started, True)
+        self.assertEquals(holder.stopped, True)
+
+
+    def test_stopServiceMidAbort(self):
+        """
+        When L{ConnectionPool.stopService} is called with deferreds from
+        C{abort} still outstanding, it will wait for the currently-aborting
+        transaction to fully abort before firing the L{Deferred} returned from
+        C{stopService}.
+        """
+        # TODO: commit() too?
+        self.pauseHolders()
+        c = self.pool.connection()
+        abortResult = resultOf(c.abort())
+        # Should abort instantly, as it hasn't managed to unspool anything yet.
+        # FIXME: kill all Deferreds associated with this thing, make sure that
+        # any outstanding query callback chains get nuked.
+        self.assertEquals(abortResult, [None])
+        stopResult = resultOf(self.pool.stopService())
+        self.assertEquals(stopResult, [])
+        self.flushHolders()
+        #self.assertEquals(abortResult, [None])
+        self.assertEquals(stopResult, [None])
+
+
+    def test_stopServiceWithSpooled(self):
+        """
+        When L{ConnectionPool.stopService} is called when spooled transactions
+        are outstanding, any pending L{Deferreds} returned by those transactions
+        will be failed with L{ConnectionError}.
+        """
+        # Use up the free slots so we have to spool.
+        hold = []
+        hold.append(self.pool.connection())
+        hold.append(self.pool.connection())
+
+        c = self.pool.connection()
+        se = resultOf(c.execSQL("alpha"))
+        ce = resultOf(c.commit())
+        self.assertEquals(se, [])
+        self.assertEquals(ce, [])
+        self.pool.stopService()
+        self.assertEquals(se[0].type, ConnectionError)
+        self.assertEquals(ce[0].type, ConnectionError)
+
+
+    def test_repoolSpooled(self):
+        """
+        Regression test for a somewhat tricky-to-explain bug: a spooled
+        transaction which has already had commit() called on it before it
+        receives a real connection to start executing on will not leave
+        behind any detritus that prevents stopService from working.
+        """
+        self.pauseHolders()
+        c = self.pool.connection()
+        c2 = self.pool.connection()
+        c3 = self.pool.connection()
+        c.commit()
+        c2.commit()
+        c3.commit()
+        self.flushHolders()
+        self.assertEquals(len(self.factory.connections), 2)
+        stopResult = resultOf(self.pool.stopService())
+        self.assertEquals(stopResult, [None])
+        self.assertEquals(len(self.factory.connections), 2)
+        self.assertEquals(self.factory.connections[0].closed, True)
+        self.assertEquals(self.factory.connections[1].closed, True)
+
+
+    def test_connectAfterStop(self):
+        """
+        Calls to connection() after stopService() result in transactions which
+        immediately fail all operations.
+        """
+        stopResults = resultOf(self.pool.stopService())
+        self.assertEquals(stopResults, [None])
+        self.pauseHolders()
+        postClose = self.pool.connection()
+        queryResult = resultOf(postClose.execSQL("hello"))
+        self.assertEquals(len(queryResult), 1)
+        self.assertEquals(queryResult[0].type, ConnectionError)
+
+
+    def test_connectAfterStartedStopping(self):
+        """
+        Calls to connection() after stopService() has been called but before it
+        has completed will result in transactions which immediately fail all
+        operations.
+        """
+        self.pauseHolders()
+        preClose = self.pool.connection()
+        preCloseResult = resultOf(preClose.execSQL('statement'))
+        stopResult = resultOf(self.pool.stopService())
+        postClose = self.pool.connection()
+        queryResult = resultOf(postClose.execSQL("hello"))
+        self.assertEquals(stopResult, [])
+        self.assertEquals(len(queryResult), 1)
+        self.assertEquals(queryResult[0].type, ConnectionError)
+        self.assertEquals(len(preCloseResult), 1)
+        self.assertEquals(preCloseResult[0].type, ConnectionError)
+
+
+    def test_abortFailsDuringStopService(self):
+        """
+        L{IAsyncTransaction.abort} might fail, most likely because the
+        underlying database connection has already been disconnected.  If this
+        happens, shutdown should continue.
+        """
+        txns = []
+        txns.append(self.pool.connection())
+        txns.append(self.pool.connection())
+        # Fail one (and only one) call to rollback().
+        self.factory.rollbackFail = True
+        stopResult = resultOf(self.pool.stopService())
+        self.assertEquals(stopResult, [None])
+        self.assertEquals(len(self.flushLoggedErrors(RollbackFail)), 1)
+        self.assertEquals(self.factory.connections[0].closed, True)
+        self.assertEquals(self.factory.connections[1].closed, True)
+
+
+    def test_abortRecycledTransaction(self):
+        """
+        L{ConnectionPool.stopService} will shut down if a recycled transaction
+        is still pending.
+        """
+        recycled = self.pool.connection()
+        recycled.commit()
+        remember = []
+        remember.append(self.pool.connection())
+        self.assertEquals(resultOf(self.pool.stopService()), [None])
+
+
+    def test_waitForAlreadyAbortedTransaction(self):
+        """
+        L{ConnectionPool.stopService} will wait for all transactions to shut
+        down before exiting, including those which have already been stopped.
+        """
+        it = self.pool.connection()
+        self.pauseHolders()
+        abortResult = resultOf(it.abort())
+
+        # steal it from the queue so we can do it out of order
+        d, work = self.holders[0].queue.pop()
+        # that should be the only work unit so don't continue if something else
+        # got in there
+        self.assertEquals(self.holders[0].queue, [])
+        self.assertEquals(len(self.holders), 1)
+        self.flushHolders()
+        stopResult = resultOf(self.pool.stopService())
+        # Sanity check that we haven't actually stopped it yet
+        self.assertEquals(abortResult, [])
+        # We haven't fired it yet, so the service had better not have stopped...
+        self.assertEquals(stopResult, [])
+        d.callback(None)
+        self.assertEquals(abortResult, [None])
+        self.assertEquals(stopResult, [None])
+
+
+    def test_tooManyConnectionsWhileOthersFinish(self):
+        """
+        L{ConnectionPool.connection} will not spawn more than the maximum
+        connections if there are finishing transactions outstanding.
+        """
+        a = self.pool.connection()
+        b = self.pool.connection()
+        self.pauseHolders()
+        a.abort()
+        b.abort()
+        # Remove the holders for the existing connections, so that the 'extra'
+        # connection() call wins the race and gets executed first.
+        self.holders[:] = []
+        self.pool.connection()
+        self.flushHolders()
+        self.assertEquals(len(self.factory.connections), 2)
+
+
+