[CalendarServer-changes] [7189] CalendarServer/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Mon Mar 14 05:28:45 PDT 2011


Revision: 7189
          http://trac.macosforge.org/projects/calendarserver/changeset/7189
Author:   glyph at apple.com
Date:     2011-03-14 05:28:45 -0700 (Mon, 14 Mar 2011)
Log Message:
-----------
Re-connect to the database when connections are lost, and re-start the transaction where it's possible to do that.

Modified Paths:
--------------
    CalendarServer/trunk/twext/enterprise/adbapi2.py
    CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py

Modified: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py	2011-03-13 04:33:13 UTC (rev 7188)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py	2011-03-14 12:28:45 UTC (rev 7189)
@@ -92,6 +92,7 @@
         self._cursor     = cursor
         self._connection = connection
         self._holder     = threadHolder
+        self._first      = True
 
 
     @_forward
@@ -109,6 +110,8 @@
 
 
     def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+        wasFirst = self._first
+        self._first = False
         if args is None:
             args = []
         derived = None
@@ -120,7 +123,17 @@
                     derived = []
                 derived.append(arg)
                 args[n] = arg.preQuery(self._cursor)
-        self._cursor.execute(sql, args)
+        try:
+            self._cursor.execute(sql, args)
+        except:
+            if wasFirst:
+                self._connection.close()
+                self._connection = self._pool.connectionFactory()
+                self._cursor     = self._connection.cursor()
+                result = self._reallyExecSQL(sql, args, raiseOnZeroRowCount)
+                return result
+            else:
+                raise
         if derived is not None:
             for arg in derived:
                 arg.postQuery(self._cursor)
@@ -153,14 +166,16 @@
 
     def _end(self, really):
         """
-        Common logic for commit or abort.
+        Common logic for commit or abort.  Executed in the main thread.
         """
         if not self._completed:
             self._completed = True
             def reallySomething():
+                # Executed in the cursor thread.
                 if self._cursor is None:
                     return
                 really()
+                self._first = True
             result = self._holder.submit(reallySomething)
             self._pool._repoolAfter(self, result)
             return result
@@ -173,7 +188,7 @@
 
 
     def abort(self):
-        return self._end(self._connection.rollback)
+        return self._end(self._connection.rollback).addErrback(log.err)
 
 
     def __del__(self):
@@ -510,11 +525,7 @@
         # Phase 3: All of the busy transactions must be aborted first.  As each
         # one is aborted, it will remove itself from the list.
         while self._busy:
-            d = self._busy[0].abort()
-            try:
-                yield d
-            except:
-                log.err()
+            yield self._busy[0].abort()
 
         # Phase 4: All transactions should now be in the free list, since
         # 'abort()' will have put them there.  Shut down all the associated
@@ -611,7 +622,12 @@
             self._finishing.remove(finishRecord)
             self._repoolNow(txn)
             return result
-        return d.addBoth(repool)
+        def discard(result):
+            self._finishing.remove(finishRecord)
+            txn._releaseConnection()
+            self._startOneMore()
+            return result
+        return d.addCallbacks(repool, discard)
 
 
     def _repoolNow(self, txn):

Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2011-03-13 04:33:13 UTC (rev 7188)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2011-03-14 12:28:45 UTC (rev 7189)
@@ -75,8 +75,12 @@
 class FakeConnection(Parent, Child):
     """
     Fake Stand-in for DB-API 2.0 connection.
+
+    @ivar executions: the number of statements which have been executed.
     """
 
+    executions = 0
+
     def __init__(self, factory):
         """
         Initialize list of cursors
@@ -84,8 +88,17 @@
         Parent.__init__(self)
         Child.__init__(self, factory)
         self.id = factory.idcounter.next()
+        self._executeFailQueue = []
 
 
+    def executeWillFail(self, thunk):
+        """
+        The next call to L{FakeCursor.execute} will fail with an exception
+        returned from the given callable.
+        """
+        self._executeFailQueue.append(thunk)
+
+
     @property
     def cursors(self):
         "Alias to make tests more readable."
@@ -108,6 +121,7 @@
             raise RollbackFail()
 
 
+
 class RollbackFail(Exception):
     """
     Sample rollback-failure exception.
@@ -141,6 +155,9 @@
 
 
     def execute(self, sql, args=()):
+        self.connection.executions += 1
+        if self.connection._executeFailQueue:
+            raise self.connection._executeFailQueue.pop(0)()
         self.sql = sql
         self.description = True
         self.rowcount = 1
@@ -184,7 +201,7 @@
     def __init__(self):
         Parent.__init__(self)
         self.idcounter = count(1)
-        self._resultQueue = []
+        self._connectResultQueue = []
         self.defaultConnect()
 
 
@@ -199,8 +216,8 @@
         Implement the C{ConnectionFactory} callable expected by
         L{ConnectionPool}.
         """
-        if self._resultQueue:
-            thunk = self._resultQueue.pop(0)
+        if self._connectResultQueue:
+            thunk = self._connectResultQueue.pop(0)
         else:
             thunk = self._default
         return thunk()
@@ -212,7 +229,7 @@
         """
         def thunk():
             return FakeConnection(self)
-        self._resultQueue.append(thunk)
+        self._connectResultQueue.append(thunk)
 
 
     def willFail(self):
@@ -221,7 +238,7 @@
         """
         def thunk():
             raise FakeConnectionError()
-        self._resultQueue.append(thunk)
+        self._connectResultQueue.append(thunk)
 
 
     def defaultConnect(self):
@@ -229,7 +246,7 @@
         By default, connection attempts will succeed.
         """
         self.willConnect()
-        self._default = self._resultQueue.pop()
+        self._default = self._connectResultQueue.pop()
 
 
     def defaultFail(self):
@@ -237,7 +254,7 @@
         By default, connection attempts will fail.
         """
         self.willFail()
-        self._default = self._resultQueue.pop()
+        self._default = self._connectResultQueue.pop()
 
 
 
@@ -718,3 +735,115 @@
         notxn = self.pool.connection()
         self.assertEquals(notxn.dialect, TEST_DIALECT)
 
+
+    def test_reConnectWhenFirstExecFails(self):
+        """
+        Generally speaking, DB-API 2.0 adapters do not provide information about
+        the cause of a failed 'execute' method; they definitely don't provide it
+        in a way which can be identified as related to the syntax of the query,
+        the state of the database itself, the state of the connection, etc.
+
+        Therefore the best general heuristic for whether the connection to the
+        database has been lost and needs to be re-established is to catch
+        exceptions which are raised by the I{first} statement executed in a
+        transaction.
+        """
+        # Allow 'connect' to succeed.  This should behave basically the same
+        # whether connect() happened to succeed in some previous transaction and
+        # it's recycling the underlying transaction, or connect() just
+        # succeeded.  Either way you just have a _SingleTxn wrapping a
+        # _ConnectedTxn.
+        txn = self.pool.connection()
+        self.assertEquals(len(self.factory.connections), 1,
+                          "Sanity check failed.")
+        self.factory.connections[0].executeWillFail(RuntimeError)
+        results = resultOf(txn.execSQL("hello, world!"))
+        [[[counter, echo]]] = results
+        self.assertEquals("hello, world!", echo)
+        # Two execution attempts should have been made, one on each connection.
+        # The first failed with a RuntimeError, but that is deliberately
+        # obscured, because then we tried again and it succeeded.
+        self.assertEquals(len(self.factory.connections), 2,
+                          "No new connection opened.")
+        self.assertEquals(self.factory.connections[0].executions, 1)
+        self.assertEquals(self.factory.connections[1].executions, 1)
+        self.assertEquals(self.factory.connections[0].closed, True)
+        self.assertEquals(self.factory.connections[1].closed, False)
+
+
+    def test_reConnectWhenSecondExecFailsThenFirstExecFails(self):
+        """
+        Other connection-oriented errors might raise exceptions if they occur in
+        the middle of a transaction, but that should cause the error to be
+        caught, the transaction to be aborted, and the (closed) connection to be
+        recycled, where the next transaction that attempts to do anything with
+        it will encounter the error immediately and discover it needs to be
+        recycled.
+
+        It would be better if this behavior were invisible, but that could only
+        be accomplished with more precise database exceptions.  We may come up
+        with support in the future for more precisely identifying exceptions,
+        but I{unknown} exceptions should continue to be treated in this manner,
+        relaying the exception back to application code but attempting a
+        re-connection on the next try.
+        """
+        txn = self.pool.connection()
+        [[[counter, echo]]] = resultOf(txn.execSQL("hello, world!", []))
+        self.factory.connections[0].executeWillFail(ZeroDivisionError)
+        [f] = resultOf(txn.execSQL("divide by zero", []))
+        f.trap(ZeroDivisionError)
+        self.assertEquals(self.factory.connections[0].executions, 2)
+        # Reconnection should work exactly as before.
+        self.assertEquals(self.factory.connections[0].closed, False)
+        # Application code has to roll back its transaction at this point, since
+        # it failed (and we don't necessarily know why it failed: not enough
+        # information).
+        txn.abort()
+        self.factory.connections[0].executions = 0 # re-set for next test
+        self.assertEquals(len(self.factory.connections), 1)
+        self.test_reConnectWhenFirstExecFails()
+
+
+    def test_disconnectOnFailedRollback(self):
+        """
+        When C{rollback} fails for any reason on a connection object, then we
+        don't know what state it's in.  Most likely, it's already been
+        disconnected, so the connection should be closed and the transaction
+        de-pooled instead of recycled.
+
+        Also, a new connection will immediately be established to keep the pool
+        size the same.
+        """
+        txn = self.pool.connection()
+        self.factory.rollbackFail = True
+        [x] = resultOf(txn.abort())
+        # Abort does not propagate the error on, the transaction merely gets
+        # disposed of.
+        self.assertIdentical(x, None)
+        self.assertEquals(len(self.pool._free), 1)
+        self.assertNotIn(txn._baseTxn, self.pool._free)
+        self.assertEquals(self.pool._finishing, [])
+        self.assertEquals(len(self.factory.connections), 2)
+        self.assertEquals(self.factory.connections[0].closed, True)
+        self.assertEquals(self.factory.connections[1].closed, False)
+        self.assertEquals(len(self.flushLoggedErrors(RollbackFail)), 1)
+
+
+    def test_exceptionPropagatesFailedCommit(self):
+        """
+        A failed C{rollback} is fine (the premature death of the connection
+        without C{commit} means that the changes are surely gone), but a failed
+        C{commit} has to be relayed to client code, since that actually means
+        some changes didn't hit the database.
+        """
+        txn = self.pool.connection()
+        self.factory.commitFail = True
+        [x] = resultOf(txn.commit())
+        x.trap(CommitFail)
+        self.assertEquals(len(self.pool._free), 1)
+        self.assertNotIn(txn._baseTxn, self.pool._free)
+        self.assertEquals(self.pool._finishing, [])
+        self.assertEquals(len(self.factory.connections), 2)
+        self.assertEquals(self.factory.connections[0].closed, True)
+        self.assertEquals(self.factory.connections[1].closed, False)
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110314/1a43591f/attachment-0001.html>


More information about the calendarserver-changes mailing list