[CalendarServer-changes] [10291] CalendarServer/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Fri Jan 4 18:40:37 PST 2013


Revision: 10291
          http://trac.calendarserver.org//changeset/10291
Author:   glyph at apple.com
Date:     2013-01-04 18:40:37 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Abort transactions when they're garbage collected.

Also improve error-handling behavior and reduce the log spew of the shared
connection pool a bit, by dealing with known exceptions when possible.

Modified Paths:
--------------
    CalendarServer/trunk/twext/enterprise/adbapi2.py
    CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py

Modified: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py	2013-01-05 00:51:01 UTC (rev 10290)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py	2013-01-05 02:40:37 UTC (rev 10291)
@@ -30,6 +30,7 @@
 """
 
 import sys
+import weakref
 
 from cStringIO import StringIO
 from cPickle import dumps, loads
@@ -143,7 +144,7 @@
 
     def __init__(self, pool, threadHolder, connection, cursor):
         self._pool       = pool
-        self._completed  = True
+        self._completed  = "idle"
         self._cursor     = cursor
         self._connection = connection
         self._holder     = threadHolder
@@ -324,7 +325,7 @@
             committed or aborted.
         """
         if not self._completed:
-            self._completed = True
+            self._completed = "ended"
             def reallySomething():
                 """
                 Do the database work and set appropriate flags.  Executed in the
@@ -338,7 +339,7 @@
             self._pool._repoolAfter(self, result)
             return result
         else:
-            raise AlreadyFinishedError()
+            raise AlreadyFinishedError(self._completed)
 
 
     def commit(self):
@@ -349,11 +350,6 @@
         return self._end(self._connection.rollback).addErrback(log.err)
 
 
-    def __del__(self):
-        if not self._completed:
-            self.abort()
-
-
     def reset(self):
         """
         Call this when placing this transaction back into the pool.
@@ -371,7 +367,7 @@
         Release the thread and database connection associated with this
         transaction.
         """
-        self._completed = True
+        self._completed = "released"
         self._stopped   = True
         holder          = self._holder
         self._holder    = None
@@ -393,16 +389,17 @@
     """
     implements(IAsyncTransaction)
 
-    def __init__(self, pool):
+    def __init__(self, pool, reason):
         self.paramstyle = pool.paramstyle
         self.dialect = pool.dialect
+        self.reason = reason
 
 
     def _everything(self, *a, **kw):
         """
         Everything fails with a L{ConnectionError}.
         """
-        return fail(ConnectionError())
+        return fail(ConnectionError(self.reason))
 
 
     execSQL = _everything
@@ -685,7 +682,10 @@
         Stop waiting for a free transaction and fail.
         """
         self._pool._waiting.remove(self)
-        self._unspoolOnto(_NoTxn(self._pool))
+        self._completed = True
+        self._unspoolOnto(_NoTxn(self._pool,
+                                 "connection pool shut down while txn "
+                                 "waiting for database connection."))
 
 
     def _checkComplete(self):
@@ -718,7 +718,20 @@
         return block
 
 
+    def __del__(self):
+        """
+        When garbage collected, a L{_SingleTxn} recycles itself.
+        """
+        try:
+            if not self._completed:
+                self.abort()
+        except AlreadyFinishedError:
+            # The underlying transaction might already be completed without us
+            # knowing; for example if the service shuts down.
+            pass
 
+
+
 class _Unspooler(object):
     def __init__(self, orig):
         self.orig = orig
@@ -1006,7 +1019,7 @@
         if self._stopping:
             # FIXME: should be wrapping a _SingleTxn around this to get
             # .commandBlock()
-            return _NoTxn(self)
+            return _NoTxn(self, "txn created while DB pool shutting down")
         if self._free:
             basetxn = self._free.pop(0)
             self._busy.append(basetxn)
@@ -1157,8 +1170,10 @@
 
 
 
-quashErrors = {
-    FailsafeException: "SOMETHING_UNKNOWN"
+_quashErrors = {
+    FailsafeException: "SOMETHING_UNKNOWN",
+    AlreadyFinishedError: "ALREADY_FINISHED",
+    ConnectionError: "CONNECTION_ERROR",
 }
 
 
@@ -1174,12 +1189,13 @@
             try:
                 val = yield inner(*a, **k)
             except:
-                # FIXME: if this were a general thing, it should probably allow
-                # known errors through; look at the command's 'errors' attribute
-                # before collapsing into FailsafeException.
-                log.err(Failure(),
-                        "shared database connection pool encountered error")
-                raise FailsafeException()
+                f = Failure()
+                if f.type in command.errors:
+                    returnValue(f)
+                else:
+                    log.err(Failure(),
+                            "shared database connection pool encountered error")
+                    raise FailsafeException()
             else:
                 returnValue(val)
         return command.responder(innerinner)
@@ -1192,7 +1208,7 @@
     Start a transaction, identified with an ID generated by the client.
     """
     arguments = txnarg()
-    errors = quashErrors
+    errors = _quashErrors
 
 
 
@@ -1205,7 +1221,7 @@
                  ('args', Pickle()),
                  ('blockID', String()),
                  ('reportZeroRowCount', Boolean())] + txnarg()
-    errors = quashErrors
+    errors = _quashErrors
 
 
 
@@ -1214,7 +1230,7 @@
     Create a new SQL command block.
     """
     arguments = [("blockID", String())] + txnarg()
-    errors = quashErrors
+    errors = _quashErrors
 
 
 
@@ -1223,7 +1239,7 @@
     Create a new SQL command block.
     """
     arguments = [("blockID", String())] + txnarg()
-    errors = quashErrors
+    errors = _quashErrors
 
 
 
@@ -1235,7 +1251,7 @@
 
     arguments = [('queryID', String()),
                  ('row', Pickle())]
-    errors = quashErrors
+    errors = _quashErrors
 
 
 
@@ -1248,19 +1264,19 @@
                  ('norows', Boolean()),
                  ('derived', Pickle()),
                  ('noneResult', Boolean())]
-    errors = quashErrors
+    errors = _quashErrors
 
 
 
 class Commit(Command):
     arguments = txnarg()
-    errors = quashErrors
+    errors = _quashErrors
 
 
 
 class Abort(Command):
     arguments = txnarg()
-    errors = quashErrors
+    errors = _quashErrors
 
 
 
@@ -1390,7 +1406,7 @@
         # See DEFAULT_PARAM_STYLE FIXME above.
         super(ConnectionPoolClient, self).__init__()
         self._nextID    = count().next
-        self._txns      = {}
+        self._txns      = weakref.WeakValueDictionary()
         self._queries   = {}
         self.dialect    = dialect
         self.paramstyle = paramstyle
@@ -1591,7 +1607,16 @@
         return _NetCommandBlock(self, blockID)
 
 
+    def __del__(self):
+        """
+        When a L{_NetTransaction} is garbage collected, it aborts itself.
+        """
+        if not self._completed:
+            def shush(f):
+                f.trap(ConnectionError, AlreadyFinishedError)
+            self.abort().addErrback(shush)
 
+
 class _NetCommandBlock(object):
     """
     Net command block.

Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2013-01-05 00:51:01 UTC (rev 10290)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2013-01-05 02:40:37 UTC (rev 10291)
@@ -47,6 +47,7 @@
 from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
 from twext.enterprise.adbapi2 import ConnectionPool
 from twext.internet.threadutils import ThreadHolder
+from twext.enterprise.adbapi2 import Commit
 
 
 def resultOf(deferred, propagate=False):
@@ -151,6 +152,8 @@
         Child.__init__(self, factory)
         self.id = factory.idcounter.next()
         self._executeFailQueue = []
+        self._commitCount = 0
+        self._rollbackCount = 0
 
 
     def executeWillFail(self, thunk):
@@ -172,12 +175,14 @@
 
 
     def commit(self):
+        self._commitCount += 1
         if self.parent.commitFail:
             self.parent.commitFail = False
             raise CommitFail()
 
 
     def rollback(self):
+        self._rollbackCount += 1
         if self.parent.rollbackFail:
             self.parent.rollbackFail = False
             raise RollbackFail()
@@ -883,9 +888,10 @@
         executed) will result in all of its Deferreds immediately failing and
         none of the queued statements being executed.
         """
+        active = []
         # Use up the available connections ...
         for i in xrange(self.pool.maxConnections):
-            self.createTransaction()
+            active.append(self.createTransaction())
         # ... so that this one has to be spooled.
         spooled = self.createTransaction()
         result = self.resultOf(spooled.execSQL("alpha"))
@@ -922,6 +928,25 @@
         self.assertEquals(stopResult, [None])
 
 
+    def test_garbageCollectedTransactionAborts(self):
+        """
+        When an L{IAsyncTransaction} is garbage collected, it ought to abort
+        itself.
+        """
+        t = self.createTransaction()
+        self.resultOf(t.execSQL("echo", []))
+        import gc
+        conns = self.factory.connections
+        self.assertEquals(len(conns), 1)
+        self.assertEquals(conns[0]._rollbackCount, 0)
+        del t
+        gc.collect()
+        self.flushHolders()
+        self.assertEquals(len(conns), 1)
+        self.assertEquals(conns[0]._rollbackCount, 1)
+        self.assertEquals(conns[0]._commitCount, 0)
+
+
     def test_tooManyConnectionsWhileOthersFinish(self):
         """
         L{ConnectionPool.connection} will not spawn more than the maximum
@@ -1585,6 +1610,8 @@
         them.  Flush the locally logged error of the given type and return
         L{UnknownRemoteError}.
         """
+        if err in Commit.errors:
+            return err
         self.flushLoggedErrors(err)
         return FailsafeException
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/156befc3/attachment-0001.html>


More information about the calendarserver-changes mailing list