[CalendarServer-changes] [6843] CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Tue Feb 1 16:08:55 PST 2011


Revision: 6843
          http://trac.macosforge.org/projects/calendarserver/changeset/6843
Author:   glyph at apple.com
Date:     2011-02-01 16:08:55 -0800 (Tue, 01 Feb 2011)
Log Message:
-----------
fix for the abort()-a-pooled-transaction case.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
    CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py

Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-02-02 00:08:51 UTC (rev 6842)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-02-02 00:08:55 UTC (rev 6843)
@@ -50,6 +50,7 @@
 from twisted.python.components import proxyForInterface
 
 from twext.internet.threadutils import ThreadHolder
+from twisted.internet.defer import succeed
 from twext.enterprise.ienterprise import AlreadyFinishedError, IAsyncTransaction
 
 
@@ -135,7 +136,6 @@
 
     def __del__(self):
         if not self._completed:
-            print 'BaseSqlTxn.__del__: OK'
             self.abort()
 
 
@@ -230,7 +230,7 @@
 
 
     def abort(self):
-        return self._enspool('abort')
+        return succeed(None)
 
 
 
@@ -249,6 +249,15 @@
         self._complete = False
 
 
+    def _unspoolOnto(self, baseTxn):
+        """
+        Replace my C{_baseTxn}, currently a L{SpooledTxn}, with a L{BaseSqlTxn}.
+        """
+        spooledBase = self._baseTxn
+        self._baseTxn = baseTxn
+        spooledBase._unspool(baseTxn)
+
+
     def execSQL(self, *a, **kw):
         self._checkComplete()
         return super(PooledSqlTxn, self).execSQL(*a, **kw)
@@ -256,12 +265,18 @@
 
     def commit(self):
         self._markComplete()
-        return self._repoolAfter(super(PooledSqlTxn, self).commit())
+        return self._pool._repoolAfter(self, super(PooledSqlTxn, self).commit())
 
 
     def abort(self):
         self._markComplete()
-        return self._repoolAfter(super(PooledSqlTxn, self).abort())
+        waiting = self._pool.waiting
+        if self in waiting:
+            waiting.remove(self)
+            # FIXME: waiting.remove()
+            return succeed(None)
+        else:
+            return self._pool._repoolAfter(self, super(PooledSqlTxn, self).abort())
 
 
     def _checkComplete(self):
@@ -280,33 +295,16 @@
         self._complete = True
 
 
-    def _repoolAfter(self, d):
-        # FIXME: if abort() is called (say, by an HTTP connection terminating
-        # because the reactor is shutting down), the transaction will not
-        # immediately be moved to the 'stopped' list or removed from the 'busy'
-        # list; instead, it will be re-pooled only after the first abort()
-        # concludes, and stopService will sit in a loop calling abort() over and
-        # over again (and logging the traceback from _checkComplete()) until the
-        # thread actually doing the aborting finishes what it's doing.  This
-        # needs to be fixed to represent an intermediary state where completion
-        # has started and not stopped - note that this is *only* for interaction
-        # with stopService, there's no other legitimate cause for two abort()s
-        # to be called.
-        def repool(result):
-            self._pool.reclaim(self)
-            return result
-        return d.addCallback(repool)
 
+class _ConnectingPseudoTxn(object):
 
-
-class _ConnectingPsuedoTxn(object):
-
     _retry = None
 
     def __init__(self, pool, holder):
         self._pool = pool
         self._holder = holder
 
+
     def abort(self):
         # not implemented yet, but let's fail rather than break the test
         # raise NotImplementedError()
@@ -339,6 +337,22 @@
         for failed connect() attempts.
 
     @type reactor: L{IReactorTime} and L{IReactorThreads} provider.
+
+    @ivar free: The list of free L{BaseSqlTxn} objects which are not currently
+        attached to a L{PooledSqlTxn} object, and have active connections ready
+        for processing a new transaction.
+
+    @ivar busy: The list of busy (currently bound to a L{BaseSqlTxn})
+        L{PooledSqlTxn} objects.
+
+    @ivar finishing: The list of 2-tuples of C{(PooledSqlTxn, Deferred)}: each
+        L{PooledSqlTxn} has had C{abort} or C{commit} called on it but has not
+        finished executing that method, and each L{Deferred} is the one returned
+        from that method, which fires when its execution has completed.
+
+    @ivar waiting: The list of L{PooledSqlTxn} objects attached to a
+        L{SpooledTxn}; i.e. those which are awaiting a connection to become free
+        so that they can be executed.
     """
 
     reactor = _reactor
@@ -347,12 +361,16 @@
 
 
     def __init__(self, connectionFactory, maxConnections=10):
+
         super(ConnectionPool, self).__init__()
+        self.connectionFactory = connectionFactory
+        self.maxConnections = maxConnections
+
         self.free = []
         self.busy = []
         self.waiting = []
-        self.connectionFactory = connectionFactory
-        self.maxConnections = maxConnections
+        self.finishing = []
+        self.connecting = []
 
 
     def startService(self):
@@ -371,10 +389,13 @@
         # one is aborted, it will remove itself from the list.
         while self.busy:
             busy = self.busy[0]
-            try:
-                yield busy.abort()
-            except:
-                log.err()
+#            try:
+            # FIXME: abort() might fail.
+            yield busy.abort()
+#            except:
+#                log.err()
+#            if self.busy and busy is self.busy[0]:
+#                raise RuntimeError("this will result in an infinite loop.")
         # Phase 2: All transactions should now be in the free list, since
         # 'abort()' will have put them there.  Shut down all the associated
         # ThreadHolders.
@@ -383,6 +404,8 @@
             # it the way aborting a PooledSqlTxn does, so we need to .pop()
             # here.)
             free = self.free.pop()
+            # stop() really shouldn't be able to fail, as it's just stopping the
+            # thread, and the holder's stop() is independently submitted.
             yield free.stop()
 
 
@@ -408,6 +431,8 @@
             tracking = self.waiting
         txn = PooledSqlTxn(self, basetxn)
         tracking.append(txn)
+        # FIXME/TESTME: should be len(self.busy) + len(self.finishing) (free
+        # doesn't need to be considered, as it's tested above)
         if tracking is self.waiting and len(self.busy) < self.maxConnections:
             self._startOneMore()
         return txn
@@ -419,7 +444,7 @@
         """
         holder = self._createHolder()
         holder.start()
-        txn = _ConnectingPsuedoTxn(self, holder)
+        txn = _ConnectingPseudoTxn(self, holder)
         # take up a slot in the 'busy' list, sit there so we can be aborted.
         self.busy.append(txn)
         def initCursor():
@@ -434,36 +459,47 @@
                 connection=connection,
                 cursor=cursor
             )
+            self.busy.remove(txn)
             txn._baseTxn = baseTxn
-            self.reclaim(txn)
+            self._repoolNow(txn)
         def maybeTryAgain(f):
-            log.err(f)
+            log.err(f, "Re-trying connection due to connection failure")
             txn._retry = self.reactor.callLater(self.RETRY_TIMEOUT, resubmit)
         def resubmit():
             d = holder.submit(initCursor)
             d.addCallbacks(finishInit, maybeTryAgain)
         resubmit()
-        return txn
 
 
-    def reclaim(self, txn):
+    def _repoolAfter(self, txn, d):
         """
+        Re-pool the back-end for the given L{PooledSqlTxn} after the given
+        L{Deferred} has fired.
+        """
+        if txn in self.busy:
+            # should only _not_ be in self.busy if its underlying txn is a SpooledTxn
+            self.busy.remove(txn)
+        finishRecord = (txn, d)
+        self.finishing.append(finishRecord)
+        def repool(result):
+            self.finishing.remove(finishRecord)
+            self._repoolNow(txn)
+            return result
+        return d.addBoth(repool)
+
+
+    def _repoolNow(self, txn):
+        """
         Shuck the L{PooledSqlTxn} wrapper off, and recycle the underlying
         L{BaseSqlTxn} into the free list.
         """
-        self.busy.remove(txn)
         baseTxn = txn._baseTxn
+        txn._baseTxn = None
         baseTxn.reset()
         if self.waiting:
             waiting = self.waiting.pop(0)
-            waiting._baseTxn._unspool(baseTxn)
-            # Note: although commit() may already have been called, we don't
-            # have to handle it specially here.  It only unspools after the
-            # deferred returned by commit() has actually been called, and since
-            # that occurs in a callFromThread, that won't happen until the next
-            # iteration of the mainloop, when the _baseTxn is safely correct.
-            waiting._baseTxn = baseTxn
             self.busy.append(waiting)
+            waiting._unspoolOnto(baseTxn)
         else:
             self.free.append(baseTxn)
 

Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py	2011-02-02 00:08:51 UTC (rev 6842)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py	2011-02-02 00:08:55 UTC (rev 6843)
@@ -22,8 +22,6 @@
 
 from twisted.trial.unittest import TestCase
 
-from twisted.internet.defer import inlineCallbacks
-
 from twisted.internet.defer import execute
 from twisted.internet.task import Clock
 
@@ -31,6 +29,24 @@
 from twext.enterprise.adbapi2 import ConnectionPool
 
 
+def resultOf(deferred, propagate=False):
+    """
+    Add a callback and errback which will capture the result of a L{Deferred} in
+    a list, and return that list.  If 'propagate' is True, pass through the
+    results.
+    """
+    results = []
+    if propagate:
+        def cb(r):
+            results.append(r)
+            return r
+    else:
+        cb = results.append
+    deferred.addBoth(cb)
+    return results
+
+
+
 class Child(object):
     """
     An object with a L{Parent}, in its list of C{children}.
@@ -263,14 +279,12 @@
         self.pool.startService()
 
 
-    @inlineCallbacks
     def tearDown(self):
         """
         Make sure the service is stopped and the fake ThreadHolders are all
         executing their queues so failed tests can exit cleanly.
         """
         self.flushHolders()
-        yield self.pool.stopService()
 
 
     def flushHolders(self):
@@ -299,7 +313,6 @@
         return fth
 
 
-    @inlineCallbacks
     def test_tooManyConnections(self):
         """
         When the number of outstanding busy transactions exceeds the number of
@@ -309,48 +322,52 @@
         until an existing connection becomes available.
         """
         a = self.pool.connection()
-        [[counter, echo]] = yield a.execSQL("alpha")
+
+        alphaResult = resultOf(a.execSQL("alpha"))
+        [[counter, echo]] = alphaResult[0]
+
         b = self.pool.connection()
-        [[bcounter, becho]] = yield b.execSQL("beta")
+        # 'b' should have opened a connection.
+        self.assertEquals(len(self.factory.connections), 2)
+        betaResult = resultOf(b.execSQL("beta"))
+        [[bcounter, becho]] = betaResult[0]
 
         # both 'a' and 'b' are holding open a connection now; let's try to open
         # a third one.  (The ordering will be deterministic even if this fails,
         # because those threads are already busy.)
         c = self.pool.connection()
-        enqueue = c.execSQL("gamma")
-        x = []
-        def addtox(it):
-            x.append(it)
-            return it
-        enqueue.addCallback(addtox)
+        gammaResult = resultOf(c.execSQL("gamma"))
 
         # Did 'c' open a connection?  Let's hope not...
         self.assertEquals(len(self.factory.connections), 2)
+        # SQL shouldn't be executed too soon...
+        self.assertEquals(gammaResult, [])
 
-        self.failIf(bool(x), "SQL executed too soon!")
-        yield b.commit()
+        commitResult = resultOf(b.commit())
 
         # Now that 'b' has committed, 'c' should be able to complete.
-        [[ccounter, cecho]] = yield enqueue
+        [[ccounter, cecho]] = gammaResult[0]
 
-        # The connection for 'a' ought to be busy, so let's make sure we're
-        # using the one for 'c'.
+        # The connection for 'a' ought to still be busy, so let's make sure
+        # we're using the one for 'c'.
         self.assertEquals(ccounter, bcounter)
 
+        # Sanity check: the commit should have succeeded!
+        self.assertEquals(commitResult, [None])
 
-    @inlineCallbacks
+
     def test_stopService(self):
         """
         L{ConnectionPool.stopService} stops all the associated L{ThreadHolder}s
         and thereby frees up the resources it is holding.
         """
         a = self.pool.connection()
-        [[counter, echo]] = yield a.execSQL("alpha")
+        [[[counter, echo]]] = resultOf(a.execSQL("alpha"))
         self.assertEquals(len(self.holders), 1)
         [holder] = self.holders
         self.assertEquals(holder.started, True)
         self.assertEquals(holder.stopped, False)
-        yield self.pool.stopService()
+        self.pool.stopService()
         self.assertEquals(self.pool.busy, [])
         self.assertEquals(self.pool.free, [])
         self.assertEquals(len(self.holders), 1)
@@ -442,4 +459,26 @@
         self.assertEquals(holder.stopped, True)
 
 
+    def test_stopServiceMidAbort(self):
+        """
+        When L{ConnectionPool.stopService} is called with deferreds from
+        C{abort} still outstanding, it will wait for the currently-aborting
+        transaction to fully abort before firing the L{Deferred} returned from
+        C{stopService}.
+        """
+        # TODO: commit() too?
+        self.pauseHolders()
+        c = self.pool.connection()
+        abortResult = resultOf(c.abort())
+        # Should abort instantly, as it hasn't managed to unspool anything yet.
+        # FIXME: kill all Deferreds associated with this thing, make sure that
+        # any outstanding query callback chains get nuked.
+        self.assertEquals(abortResult, [None])
+        stopResult = resultOf(self.pool.stopService())
+        self.assertEquals(stopResult, [])
+        self.flushHolders()
+        #self.assertEquals(abortResult, [None])
+        self.assertEquals(stopResult, [None])
 
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110201/7fe7a941/attachment-0001.html>


More information about the calendarserver-changes mailing list