[CalendarServer-changes] [6866] CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Thu Feb 3 11:00:56 PST 2011


Revision: 6866
          http://trac.macosforge.org/projects/calendarserver/changeset/6866
Author:   glyph at apple.com
Date:     2011-02-03 11:00:55 -0800 (Thu, 03 Feb 2011)
Log Message:
-----------
Rename PooledSqlTxn -> _SingleTxn, and fix abort()-raises-exception shutdown bug.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
    CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py

Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-02-03 19:00:52 UTC (rev 6865)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-02-03 19:00:55 UTC (rev 6866)
@@ -255,14 +255,21 @@
 
 
 
-class PooledSqlTxn(proxyForInterface(iface=IAsyncTransaction,
+class _SingleTxn(proxyForInterface(iface=IAsyncTransaction,
                                      originalAttribute='_baseTxn')):
     """
-    This is a temporary throw-away wrapper for the longer-lived
+    A L{_SingleTxn} is a single-use wrapper for the longer-lived
     L{_ConnectedTxn}, so that if a badly-behaved API client accidentally hangs
     on to one of these and, for example C{.abort()}s it multiple times once
     another client is using that connection, it will get some harmless
     tracebacks.
+
+    It's a wrapper around a "real" implementation; either a L{_ConnectedTxn},
+    L{_NoTxn}, or L{_WaitingTxn} depending on the availability of real
+    underlying database connections.
+
+    This is the only L{IAsyncTransaction} implementation exposed to application
+    code.
     """
 
     def __init__(self, pool, baseTxn):
@@ -275,7 +282,7 @@
         """
         Reveal the backend in the string representation.
         """
-        return 'PooledSqlTxn(%r)' % (self._baseTxn,)
+        return '_SingleTxn(%r)' % (self._baseTxn,)
 
 
     def _unspoolOnto(self, baseTxn):
@@ -291,19 +298,19 @@
 
     def execSQL(self, *a, **kw):
         self._checkComplete()
-        return super(PooledSqlTxn, self).execSQL(*a, **kw)
+        return super(_SingleTxn, self).execSQL(*a, **kw)
 
 
     def commit(self):
         self._markComplete()
-        return super(PooledSqlTxn, self).commit()
+        return super(_SingleTxn, self).commit()
 
 
     def abort(self):
         self._markComplete()
         if self in self._pool._waiting:
             return self._stopWaiting()
-        return super(PooledSqlTxn, self).abort()
+        return super(_SingleTxn, self).abort()
 
 
     def _stopWaiting(self):
@@ -373,18 +380,18 @@
     @type reactor: L{IReactorTime} and L{IReactorThreads} provider.
 
     @ivar _free: The list of free L{_ConnectedTxn} objects which are not
-        currently attached to a L{PooledSqlTxn} object, and have active
+        currently attached to a L{_SingleTxn} object, and have active
         connections ready for processing a new transaction.
 
     @ivar _busy: The list of busy L{_ConnectedTxn} objects; those currently
-        servicing an unfinished L{PooledSqlTxn} object.
+        servicing an unfinished L{_SingleTxn} object.
 
-    @ivar _finishing: The list of 2-tuples of L{PooledSqlTxn} objects which have
+    @ivar _finishing: The list of 2-tuples of L{_SingleTxn} objects which have
         had C{abort} or C{commit} called on them, but are not done executing
         that method, and the L{Deferred} returned from that method that will be
         fired when its execution has completed.
 
-    @ivar _waiting: The list of L{PooledSqlTxn} objects attached to a
+    @ivar _waiting: The list of L{_SingleTxn} objects attached to a
         L{_WaitingTxn}; i.e. those which are awaiting a connection to become
         free so that they can be executed.
 
@@ -434,11 +441,11 @@
         # one is aborted, it will remove itself from the list.
         while self._busy:
             busy = self._busy[0]
-#            try:
+            try:
             # FIXME: abort() might fail.
-            yield busy.abort()
-#            except:
-#                log.err()
+                yield busy.abort()
+            except:
+                log.err()
 #            if self._busy and busy is self._busy[0]:
 #                raise RuntimeError("this will result in an infinite loop.")
         # Phase 2: All transactions should now be in the free list, since
@@ -446,11 +453,12 @@
         # ThreadHolders.
         while self._free:
             # (Stopping a _ConnectedTxn doesn't automatically recycle it /
-            # remove it the way aborting a PooledSqlTxn does, so we need to
-            # .pop() here.)
+            # remove it the way aborting a _SingleTxn does, so we need to .pop()
+            # here.)
             free = self._free.pop()
             # stop() really shouldn't be able to fail, as it's just stopping the
-            # thread, and the holder's stop() is independently submitted.
+            # thread, and the holder's stop() is independently submitted from
+            # .abort() / .close().
             yield free.stop()
 
 
@@ -463,8 +471,9 @@
 
     def connection(self, label="<unlabeled>"):
         """
-        Find a transaction; either retrieve a free one from the list or
-        allocate a new one if no free ones are available.
+        Find and immediately return an L{IAsyncTransaction} object.  Execution
+        of statements, commit and abort on that transaction may be delayed until
+        a real underlying database connection is available.
 
         @return: an L{IAsyncTransaction}
         """
@@ -477,7 +486,7 @@
         else:
             basetxn = _WaitingTxn()
             tracking = self._waiting
-        txn = PooledSqlTxn(self, basetxn)
+        txn = _SingleTxn(self, basetxn)
         tracking.append(txn)
         # FIXME/TESTME: should be len(self._busy) + len(self._finishing) (free
         # doesn't need to be considered, as it's tested above)
@@ -730,8 +739,16 @@
 
 
     def newTransaction(self):
+        """
+        Create a new networked provider of L{IAsyncTransaction}.
+
+        (This will ultimately call L{ConnectionPool.connection} on the other end
+        of the wire.)
+
+        @rtype: L{IAsyncTransaction}
+        """
         txnid             = str(self._nextID())
-        txn               = Transaction(client=self, transactionID=txnid)
+        txn               = _NetTransaction(client=self, transactionID=txnid)
         self._txns[txnid] = txn
         self.callRemote(StartTxn, transactionID=txnid)
         return txn
@@ -778,9 +795,11 @@
 
 
 
-class Transaction(object):
+class _NetTransaction(object):
     """
-    Async protocol-based transaction implementation.
+    A L{_NetTransaction} is an L{AMP}-protocol-based provider of the
+    L{IAsyncTransaction} interface.  It sends SQL statements, query results, and
+    commit/abort commands via an AMP socket to a pooling process.
     """
 
     implements(IAsyncTransaction)

Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py	2011-02-03 19:00:52 UTC (rev 6865)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py	2011-02-03 19:00:55 UTC (rev 6866)
@@ -97,14 +97,31 @@
 
 
     def commit(self):
-        return
+        if self.parent.commitFail:
+            self.parent.commitFail = False
+            raise CommitFail()
 
 
     def rollback(self):
-        return
+        if self.parent.rollbackFail:
+            self.parent.rollbackFail = False
+            raise RollbackFail()
 
 
+class RollbackFail(Exception):
+    """
+    Sample rollback-failure exception.
+    """
 
+
+
+class CommitFail(Exception):
+    """
+    Sample commit-failure exception.
+    """
+
+
+
 class FakeCursor(Child):
     """
     Fake stand-in for a DB-API 2.0 cursor.
@@ -139,6 +156,9 @@
 
 class ConnectionFactory(Parent):
 
+    rollbackFail = False
+    commitFail = False
+
     def __init__(self):
         Parent.__init__(self)
         self.idcounter = count(1)
@@ -563,3 +583,21 @@
         self.assertEquals(preCloseResult[0].type, ConnectionError)
 
 
+    def test_abortFailsDuringStopService(self):
+        """
+        L{IAsyncTransaction.abort} might fail, most likely because the
+        underlying database connection has already been disconnected.  If this
+        happens, shutdown should continue.
+        """
+        txns = []
+        txns.append(self.pool.connection())
+        txns.append(self.pool.connection())
+        # Fail one (and only one) call to rollback().
+        self.factory.rollbackFail = True
+        stopResult = resultOf(self.pool.stopService())
+        self.assertEquals(stopResult, [None])
+        self.assertEquals(len(self.flushLoggedErrors(RollbackFail)), 1)
+        self.assertEquals(self.factory.connections[0].closed, True)
+        self.assertEquals(self.factory.connections[1].closed, True)
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110203/8d9dbee7/attachment-0001.html>


More information about the calendarserver-changes mailing list