[CalendarServer-changes] [6858] CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Thu Feb 3 11:00:31 PST 2011


Revision: 6858
          http://trac.macosforge.org/projects/calendarserver/changeset/6858
Author:   glyph at apple.com
Date:     2011-02-03 11:00:31 -0800 (Thu, 03 Feb 2011)
Log Message:
-----------
Do re-pooling internally in BaseSqlTxn now, fixing an issue where bogus transactions (ones with no backend) were being kept around in the busy list.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
    CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py

Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-02-03 19:00:27 UTC (rev 6857)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-02-03 19:00:31 UTC (rev 6858)
@@ -72,11 +72,12 @@
     # See DEFAULT_PARAM_STYLE FIXME above.
     paramstyle = DEFAULT_PARAM_STYLE
 
-    def __init__(self, threadHolder, connection, cursor):
-        self._completed = True
-        self._cursor = cursor
+    def __init__(self, pool, threadHolder, connection, cursor):
+        self._pool       = pool
+        self._completed  = True
+        self._cursor     = cursor
         self._connection = connection
-        self._holder = threadHolder
+        self._holder     = threadHolder
 
 
     def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
@@ -110,30 +111,29 @@
         return result
 
 
-    def commit(self):
+    def _end(self, really):
+        """
+        Common logic for commit or abort.
+        """
         if not self._completed:
             self._completed = True
-            def reallyCommit():
+            def reallySomething():
                 if self._cursor is None:
                     return
-                self._connection.commit()
-            result = self._holder.submit(reallyCommit)
+                really()
+            result = self._holder.submit(reallySomething)
+            self._pool._repoolAfter(self, result)
             return result
         else:
             raise AlreadyFinishedError()
 
 
+    def commit(self):
+        return self._end(self._connection.commit)
+
+
     def abort(self):
-        if not self._completed:
-            self._completed = True
-            def reallyAbort():
-                if self._cursor is None:
-                    return
-                self._connection.rollback()
-            result = self._holder.submit(reallyAbort)
-            return result
-        else:
-            raise AlreadyFinishedError()
+        return self._end(self._connection.rollback)
 
 
     def __del__(self):
@@ -269,11 +269,18 @@
         self._complete = False
 
 
+    def __repr__(self):
+        """
+        Reveal the backend in the string representation.
+        """
+        return 'PooledSqlTxn(%r)' % (self._baseTxn,)
+
+
     def _unspoolOnto(self, baseTxn):
         """
         Replace my C{_baseTxn}, currently a L{SpooledTxn}, with a L{BaseSqlTxn}.
         """
-        spooledBase = self._baseTxn
+        spooledBase   = self._baseTxn
         self._baseTxn = baseTxn
         spooledBase._unspool(baseTxn)
 
@@ -285,20 +292,19 @@
 
     def commit(self):
         self._markComplete()
-        return self._pool._repoolAfter(self, super(PooledSqlTxn, self).commit())
+        return super(PooledSqlTxn, self).commit()
 
 
     def abort(self):
         self._markComplete()
         if self in self._pool.waiting:
-            return self.stopWaiting()
-        else:
-            return self._pool._repoolAfter(self, super(PooledSqlTxn, self).abort())
+            return self._stopWaiting()
+        return super(PooledSqlTxn, self).abort()
 
 
-    def stopWaiting(self):
+    def _stopWaiting(self):
         """
-        Stop waiting for a transaction and fail.
+        Stop waiting for a free transaction and fail.
         """
         self._pool.waiting.remove(self)
         self._unspoolOnto(FailedTxn())
@@ -327,7 +333,7 @@
     _retry = None
 
     def __init__(self, pool, holder):
-        self._pool = pool
+        self._pool   = pool
         self._holder = holder
 
 
@@ -366,8 +372,8 @@
         attached to a L{PooledSqlTxn} object, and have active connections ready
         for processing a new transaction.
 
-    @ivar busy: The list of busy (currently bound to a L{BaseSqlTxn})
-        L{PooledSqlTxn} objects.
+    @ivar busy: The list of busy L{BaseSqlTxn} objects; those currently
+        servicing an unfinished L{PooledSqlTxn} object.
 
     @ivar finishing: The list of 2-tuples of L{PooledSqlTxn} objects which have
         had C{abort} or C{commit} called on them, but are not done executing
@@ -393,12 +399,12 @@
         self.connectionFactory = connectionFactory
         self.maxConnections = maxConnections
 
-        self.free = []
-        self.busy = []
-        self.waiting = []
-        self.finishing = []
+        self.free       = []
+        self.busy       = []
+        self.waiting    = []
+        self.finishing  = []
         self.connecting = []
-        self.stopping = False
+        self.stopping   = False
 
 
     def startService(self):
@@ -419,7 +425,7 @@
         # eagerly acquire new connections as they flow into the free-list.
         while self.waiting:
             waiting = self.waiting[0]
-            yield waiting.stopWaiting()
+            yield waiting._stopWaiting()
         # FIXME: there should be tests for these 'yield's.
         # Phase 1: All of the busy transactions must be aborted first.  As each
         # one is aborted, it will remove itself from the list.
@@ -487,17 +493,17 @@
             # support threadlevel=1; we can't necessarily cursor() in a
             # different thread than we do transactions in.
             connection = self.connectionFactory()
-            cursor = connection.cursor()
+            cursor     = connection.cursor()
             return (connection, cursor)
         def finishInit((connection, cursor)):
             baseTxn = BaseSqlTxn(
+                pool=self,
                 threadHolder=holder,
                 connection=connection,
                 cursor=cursor
             )
             self.busy.remove(txn)
-            txn._baseTxn = baseTxn
-            self._repoolNow(txn)
+            self._repoolNow(baseTxn)
         def maybeTryAgain(f):
             log.err(f, "Re-trying connection due to connection failure")
             txn._retry = self.reactor.callLater(self.RETRY_TIMEOUT, resubmit)
@@ -509,15 +515,9 @@
 
     def _repoolAfter(self, txn, d):
         """
-        Re-pool the back-end for the given L{PooledSqlTxn} after the given
-        L{Deferred} has fired.
+        Re-pool the given L{BaseSqlTxn} after the given L{Deferred} has fired.
         """
-        if txn in self.busy:
-            # should only _not_ in self.busy if its underlying txn is a
-            # SpooledTxn.  TODO: when the SpooledTxn unspools, does it properly
-            # go into 'finishing' once it's re-submitted its commit or abort?  I
-            # don't think so.
-            self.busy.remove(txn)
+        self.busy.remove(txn)
         finishRecord = (txn, d)
         self.finishing.append(finishRecord)
         def repool(result):
@@ -529,26 +529,15 @@
 
     def _repoolNow(self, txn):
         """
-        Shuck the L{PooledSqlTxn} wrapper off, and recycle the underlying
-        L{BaseSqlTxn} into the free list.
+        Recycle a L{BaseSqlTxn} into the free list.
         """
-        baseTxn = txn._baseTxn
-        txn._baseTxn = None
-        if isinstance(baseTxn, FailedTxn):
-            # A bit of a hack, maybe clean this up, but: if a SpooledTxn spools
-            # a commit(), PooledTxn() will repoolAfter(commit()).  If the
-            # service stops before that PooledTxn gets a real backend, then we
-            # will receive it here.  If that's the case we can give up and just
-            # throw it away.  (Maybe the pool-management logic needs to be moved
-            # to BaseSqlTxn to avoid this kind of layering violation?)
-            return
-        baseTxn.reset()
+        txn.reset()
         if self.waiting:
             waiting = self.waiting.pop(0)
-            self.busy.append(waiting)
-            waiting._unspoolOnto(baseTxn)
+            self.busy.append(txn)
+            waiting._unspoolOnto(txn)
         else:
-            self.free.append(baseTxn)
+            self.free.append(txn)
 
 
 
@@ -672,7 +661,7 @@
         Initialize a mapping of transaction IDs to transaction objects.
         """
         super(ConnectionPoolConnection, self).__init__()
-        self.pool = pool
+        self.pool  = pool
         self._txns = {}
 
 
@@ -728,16 +717,16 @@
     """
     def __init__(self):
         super(ConnectionPoolClient, self).__init__()
-        self._nextID = count().next
-        self._txns = {}
+        self._nextID  = count().next
+        self._txns    = {}
         self._queries = {}
 
 
     def newTransaction(self):
-        txnid = str(self._nextID())
-        self.callRemote(StartTxn, transactionID=txnid)
-        txn = Transaction(client=self, transactionID=txnid)
+        txnid             = str(self._nextID())
+        txn               = Transaction(client=self, transactionID=txnid)
         self._txns[txnid] = txn
+        self.callRemote(StartTxn, transactionID=txnid)
         return txn
 
 
@@ -756,8 +745,8 @@
 
 class _Query(object):
     def __init__(self, raiseOnZeroRowCount):
-        self.results = []
-        self.deferred = Deferred()
+        self.results             = []
+        self.deferred            = Deferred()
         self.raiseOnZeroRowCount = raiseOnZeroRowCount
 
 
@@ -797,9 +786,9 @@
         Initialize a transaction with a L{ConnectionPoolClient} and a unique
         transaction identifier.
         """
-        self._client = client
+        self._client        = client
         self._transactionID = transactionID
-        self._completed = False
+        self._completed     = False
 
 
     def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):

Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py	2011-02-03 19:00:27 UTC (rev 6857)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py	2011-02-03 19:00:31 UTC (rev 6858)
@@ -271,12 +271,13 @@
         Create a L{ConnectionPool} attached to a C{ConnectionFactory}.  Start
         the L{ConnectionPool}.
         """
-        self.paused = False
-        self.holders = []
-        self.factory = ConnectionFactory()
-        self.pool = ConnectionPool(self.factory.connect, maxConnections=2)
+        self.paused             = False
+        self.holders            = []
+        self.factory            = ConnectionFactory()
+        self.pool               = ConnectionPool(self.factory.connect,
+                                                 maxConnections=2)
         self.pool._createHolder = self.makeAHolder
-        self.clock = self.pool.reactor = Clock()
+        self.clock              = self.pool.reactor = Clock()
         self.pool.startService()
 
 
@@ -364,6 +365,7 @@
         """
         a = self.pool.connection()
         [[[counter, echo]]] = resultOf(a.execSQL("alpha"))
+        self.assertEquals(len(self.factory.connections), 1)
         self.assertEquals(len(self.holders), 1)
         [holder] = self.holders
         self.assertEquals(holder.started, True)
@@ -374,6 +376,8 @@
         self.assertEquals(len(self.holders), 1)
         self.assertEquals(holder.started, True)
         self.assertEquals(holder.stopped, True)
+        # Closing fake connections removes them from the list.
+        self.assertEquals(len(self.factory.connections), 0)
 
 
     def test_retryAfterConnectError(self):
@@ -503,3 +507,25 @@
         self.assertEquals(ce[0].type, ConnectionError)
 
 
+    def test_repoolSpooled(self):
+        """
+        Regression test for a somewhat tricky-to-explain bug: when a spooled
+        transaction has had commit() called on it before it has received a
+        real connection to start executing on, it will not leave behind any
+        detritus that prevents stopService from working.
+        """
+        self.pauseHolders()
+        c = self.pool.connection()
+        c2 = self.pool.connection()
+        c3 = self.pool.connection()
+        c.commit()
+        c2.commit()
+        c3.commit()
+        self.flushHolders()
+        self.assertEquals(len(self.factory.connections), 2)
+        stopResult = resultOf(self.pool.stopService())
+        self.assertEquals(stopResult, [None])
+        self.assertEquals(len(self.factory.connections), 0)
+
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110203/5e0714f6/attachment-0001.html>


More information about the calendarserver-changes mailing list