[CalendarServer-changes] [8133] CalendarServer/trunk/twext/enterprise

source_changes at macosforge.org
Wed Sep 28 21:26:08 PDT 2011


Revision: 8133
          http://trac.macosforge.org/projects/calendarserver/changeset/8133
Author:   glyph at apple.com
Date:     2011-09-28 21:26:08 -0700 (Wed, 28 Sep 2011)
Log Message:
-----------
Fix _reallyExecSQL so that if connection.close() raises an exception during automatic database reconnection, the reconnection still takes place and the error is not reported to application code.  Also, log the error in the case where the first execute() in a transaction triggers a reconnect, with an appropriately qualified message, so that we don't mask more serious or intermittent errors but hopefully don't unduly alarm administrators.
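
For readers skimming the diff, the retry rule described above can be sketched roughly as follows. This is a minimal illustration, not the adbapi2 code: the class name Txn, the method exec_sql, and the use of the standard logging module (in place of Twisted's log.err) are all assumptions made for the example.

```python
import logging

log = logging.getLogger("reconnect-sketch")


class Txn:
    """Hypothetical stand-in for a transaction bound to one DB-API connection."""

    def __init__(self, connection_factory):
        self._factory = connection_factory
        self._connection = connection_factory()
        self._cursor = self._connection.cursor()
        self._first = True

    def exec_sql(self, sql, args=()):
        was_first = self._first
        # Mark the transaction as used up front, so the retry below cannot
        # recurse more than once.
        self._first = False
        try:
            self._cursor.execute(sql, args)
        except Exception:
            if not was_first:
                raise
            # Log before doing anything else, so the traceback is not lost.
            log.exception("execute() failed on first statement; reconnecting")
            try:
                self._connection.close()
            except Exception:
                # Most likely "already closed"; log it and carry on.
                log.exception("close() failed while reconnecting")
            # Failures from here on cannot be worked around, so they propagate.
            self._connection = self._factory()
            self._cursor = self._connection.cursor()
            return self.exec_sql(sql, args)
        if self._cursor.description:
            return self._cursor.fetchall()
        return None
```

The sketch catches Exception rather than using a bare except, which is a deliberate simplification; the committed code uses a catch-all because bindings differ in what they raise.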

Finally, add extensive documentation, and factor out the slightly distracting IDerivedParameter stuff, so that other maintainers will have a thorough explanation handy if they need to touch this somewhat subtle and mysterious code.
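
As a rough picture of what the factored-out IDerivedParameter handling does, the sketch below mirrors the preQuery/postQuery flow using duck typing and sqlite3. The helper names, the UpperCased class, and the hasattr check are assumptions for illustration only; the real code tests IDerivedParameter.providedBy(arg) and exists to support cx_Oracle host variables.

```python
import sqlite3


def derive_parameters(cursor, args):
    """Replace hook-bearing args in place; return those needing postQuery."""
    derived = None
    for n, arg in enumerate(args):
        # Assumption: duck typing stands in for the interface check.
        if hasattr(arg, "preQuery") and hasattr(arg, "postQuery"):
            if derived is None:
                # Allocate the list only when it is actually needed.
                derived = []
            derived.append(arg)
            args[n] = arg.preQuery(cursor)
    return derived


def derive_query_ended(cursor, derived):
    """Give each derived parameter its post-query notification."""
    for arg in derived:
        arg.postQuery(cursor)


class UpperCased:
    """Hypothetical parameter that computes its bound value before the query."""

    def __init__(self, text):
        self.text = text
        self.finished = False

    def preQuery(self, cursor):
        return self.text.upper()

    def postQuery(self, cursor):
        self.finished = True


cursor = sqlite3.connect(":memory:").cursor()
param = UpperCased("hello")
args = [param, 1]
derived = derive_parameters(cursor, args)
cursor.execute("select ?, ?", args)
if derived is not None:
    derive_query_ended(cursor, derived)
row = cursor.fetchone()
```

Returning None when nothing was derived keeps the common case allocation-free, which is the trade-off the original comment calls out.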

Modified Paths:
--------------
    CalendarServer/trunk/twext/enterprise/adbapi2.py
    CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py

Modified: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py	2011-09-28 22:52:10 UTC (rev 8132)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py	2011-09-29 04:26:08 UTC (rev 8133)
@@ -82,6 +82,57 @@
 
 
 
+def _deriveParameters(cursor, args):
+    """
+    Some DB-API extensions need to call special extension methods on
+    the cursor itself before executing.
+
+    @param cursor: The DB-API cursor object to derive parameters from.
+
+    @param args: the parameters being specified to C{execSQL}.  This list will
+        be modified to present parameters suitable to pass to the C{cursor}'s
+        C{execute} method.
+
+    @return: a list of L{IDerivedParameter} providers which had C{preQuery}
+        executed on them, so that after the query they may have C{postQuery}
+        executed.  This may also be C{None} if no parameters were derived.
+
+    @see: L{IDerivedParameter}
+    """
+    # TODO: have the underlying connection report whether it has any
+    # IDerivedParameters that it knows about, so we can skip even inspecting
+    # the arguments if none of them could possibly provide
+    # IDerivedParameter.
+    derived = None
+    for n, arg in enumerate(args):
+        if IDerivedParameter.providedBy(arg):
+            if derived is None:
+                # Be as sparing as possible with extra allocations, as this
+                # usually isn't needed, and we're doing a ton of extra work to
+                # support it.
+                derived = []
+            derived.append(arg)
+            args[n] = arg.preQuery(cursor)
+    return derived
+
+
+def _deriveQueryEnded(cursor, derived):
+    """
+    A query which involved some L{IDerivedParameter}s just ended.  Execute any
+    post-query cleanup or tasks that those parameters have to do.
+
+    @param cursor: The DB-API object that derived the query.
+
+    @param derived: The L{IDerivedParameter} providers that were given
+        C{preQuery} notifications when the query started.
+
+    @return: C{None}
+    """
+    for arg in derived:
+        arg.postQuery(cursor)
+
+
+
 class _ConnectedTxn(object):
     """
     L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
@@ -113,33 +164,119 @@
 
 
     def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+        """
+        Execute the given SQL on a thread, using a DB-API 2.0 cursor.
+
+        This method is invoked internally on a non-reactor thread, one dedicated
+        to and associated with the current cursor.  It executes the given SQL,
+        re-connecting first if necessary, re-cycling the old connection if
+        necessary, and then, if there are results from the statement (as
+        determined by the DB-API 2.0 'description' attribute) it will fetch all
+        the rows and return them, leaving them to be relayed to
+        L{_ConnectedTxn.execSQL} via the L{ThreadHolder}.
+
+        The rules for possibly reconnecting automatically are: if this is the
+        very first statement being executed in this transaction, and an error
+        occurs in C{execute}, close the connection and try again.  We will
+        ignore any errors from C{close()} (or C{rollback()}) and log them during
+        this process.  This is OK because adbapi2 always enforces transaction
+        discipline: connections are never in autocommit mode, so if the first
+        statement in a transaction fails, nothing can have happened to the
+        database; as per the ADBAPI spec, a lost connection is a rolled-back
+        transaction.  In the cases where some databases fail to enforce
+        transaction atomicity (i.e. schema manipulations), re-executing the same
+        statement will result, at worst, in a spurious and harmless error (like
+        "table already exists"), not corruption.
+
+        @param sql: The SQL string to execute.
+
+        @type sql: C{str}
+
+        @param args: The bind parameters to pass to adbapi, if any.
+
+        @type args: C{list} or C{None}
+
+        @param raiseOnZeroRowCount: If specified, an exception to raise when no
+            rows are found.
+
+        @return: all the rows that resulted from execution of the given C{sql},
+            or C{None}, if the statement is one which does not produce results.
+
+        @rtype: C{list} of C{tuple}, or C{NoneType}
+
+        @raise Exception: this function may raise any exception raised by the
+            underlying C{dbapi.connect}, C{cursor.execute},
+            L{IDerivedParameter.preQuery}, C{connection.cursor}, or
+            C{cursor.fetchall}.
+
+        @raise raiseOnZeroRowCount: if the argument was specified and no rows
+            were returned by the executed statement.
+        """
         wasFirst = self._first
+        # If this is the first time this cursor has been used in this
+        # transaction, remember that, but mark it as now used.
         self._first = False
         if args is None:
             args = []
-        derived = None
-        for n, arg in enumerate(args):
-            if IDerivedParameter.providedBy(arg):
-                if derived is None:
-                    # Be sparing with extra allocations, as this usually isn't
-                    # needed, and we're doing a ton of extra work to support it.
-                    derived = []
-                derived.append(arg)
-                args[n] = arg.preQuery(self._cursor)
+        # Note: as of this writing, derived parameters are only used to support
+        # cx_Oracle's "host variable" feature (i.e. cursor.var()), and creating
+        # a host variable will never be a connection-oriented error (a
+        # disconnected cursor can happily create variables of all types).
+        # However, this may need to move into the 'try' below if other database
+        # features need to compute database arguments based on runtime state.
+        derived = _deriveParameters(self._cursor, args)
         try:
             self._cursor.execute(sql, args)
         except:
+            # If execute() raised an exception, and this was the first thing to
+            # happen in the transaction, then the connection has probably gone
+            # bad in the meanwhile, and we should try again.
             if wasFirst:
-                self._connection.close()
+                # Report the error before doing anything else, since doing other
+                # things may cause the traceback stack to be eliminated if they
+                # raise exceptions (even internally).
+                log.err(
+                    Failure(),
+                    "Exception from execute() on first statement in "
+                    "transaction.  Possibly caused by a database server "
+                    "restart.  Automatically reconnecting now."
+                )
+                try:
+                    self._connection.close()
+                except:
+                    # close() may raise an exception to alert us of an error as
+                    # well.  Right now the only type of error we know about is
+                    # "the connection is already closed", which obviously
+                    # doesn't need to be handled specially. Unfortunately the
+                    # reporting of this type of error is not consistent or
+                    # predictable across different databases, or even different
+                    # bindings to the same database, so we have to do a
+                    # catch-all here.  While I can't imagine another type of
+                    # error at the moment, bare 'except:'s are notorious for
+                    # making debugging surprising error conditions very
+                    # difficult, so let's make sure that the error is logged
+                    # just in case.
+                    log.err(
+                        Failure(),
+                        "Exception from close() while automatically "
+                        "reconnecting. (Probably not serious.)"
+                    )
+
+                # Now, if either of *these* things fail, there's an error here
+                # that we cannot work around or address automatically, so no
+                # try:except: for them.
                 self._connection = self._pool.connectionFactory()
                 self._cursor     = self._connection.cursor()
+
+                # Note that although this method is being invoked recursively,
+                # the '_first' flag is re-set at the very top, so we will _not_
+                # be re-entering it more than once.
                 result = self._reallyExecSQL(sql, args, raiseOnZeroRowCount)
                 return result
             else:
                 raise
         if derived is not None:
-            for arg in derived:
-                arg.postQuery(self._cursor)
+            _deriveQueryEnded(self._cursor, derived)
         if raiseOnZeroRowCount is not None and self._cursor.rowcount == 0:
             raise raiseOnZeroRowCount()
         if self._cursor.description:

Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2011-09-28 22:52:10 UTC (rev 8132)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2011-09-29 04:26:08 UTC (rev 8133)
@@ -63,7 +63,10 @@
         self.parent = parent
         self.parent.children.append(self)
 
+
     def close(self):
+        if self.parent._closeFailQueue:
+            raise self.parent._closeFailQueue.pop(0)
         self.closed = True
 
 
@@ -75,9 +78,19 @@
 
     def __init__(self):
         self.children = []
+        self._closeFailQueue = []
 
 
+    def childCloseWillFail(self, exception):
+        """
+        Closing children of this object will result in the given exception.
 
+        @see: L{ConnectionFactory}
+        """
+        self._closeFailQueue.append(exception)
+
+
+
 class FakeConnection(Parent, Child):
     """
     Fake Stand-in for DB-API 2.0 connection.
@@ -867,7 +880,11 @@
         txn = self.pool.connection()
         self.assertEquals(len(self.factory.connections), 1,
                           "Sanity check failed.")
-        self.factory.connections[0].executeWillFail(RuntimeError)
+        class CustomExecuteFailed(Exception):
+            """
+            Custom 'execute-failed' exception.
+            """
+        self.factory.connections[0].executeWillFail(CustomExecuteFailed)
         results = resultOf(txn.execSQL("hello, world!"))
         [[[counter, echo]]] = results
         self.assertEquals("hello, world!", echo)
@@ -881,8 +898,13 @@
         self.assertEquals(self.factory.connections[0].closed, True)
         self.assertEquals(self.factory.connections[1].closed, False)
 
+        # Nevertheless, since there is currently no classification of 'safe'
+        # errors, we should probably log these messages when they occur.
+        self.assertEquals(len(self.flushLoggedErrors(CustomExecuteFailed)), 1)
 
-    def test_reConnectWhenFirstExecOnExistingConnectionFails(self):
+
+    def test_reConnectWhenFirstExecOnExistingConnectionFails(
+            self, moreFailureSetup=lambda factory: None):
         """
         Another situation that might arise is that a connection will be
         successfully connected, executed and recycled into the connection pool;
@@ -890,6 +912,7 @@
         but we will be none the wiser until we try to use them.
         """
         txn = self.pool.connection()
+        moreFailureSetup(self.factory)
         self.assertEquals(len(self.factory.connections), 1,
                           "Sanity check failed.")
         results = resultOf(txn.execSQL("hello, world!"))
@@ -899,13 +922,42 @@
         txn2 = self.pool.connection()
         self.assertEquals(len(self.factory.connections), 1,
                           "Sanity check failed.")
-        self.factory.connections[0].executeWillFail(RuntimeError)
+        class CustomExecFail(Exception):
+            """
+            Custom 'execute()' failure.
+            """
+        self.factory.connections[0].executeWillFail(CustomExecFail)
         results = resultOf(txn2.execSQL("second try!"))
         txn2.commit()
         [[[counter, echo]]] = results
         self.assertEquals("second try!", echo)
+        self.assertEquals(len(self.flushLoggedErrors(CustomExecFail)), 1)
 
 
+    def test_closeExceptionDoesntHinderReconnection(self):
+        """
+        In some database bindings, if the server closes the connection,
+        C{close()} will fail.  If C{close} fails, there's not much that could
+        mean except that the connection is already closed, so similar to the
+        condition described in
+        L{test_reConnectWhenFirstExecOnExistingConnectionFails}, the
+        failure should be logged, but transparent to application code.
+        """
+        class BindingSpecificException(Exception):
+            """
+            Exception that's a placeholder for something that a database binding
+            might raise.
+            """
+        def alsoFailClose(factory):
+            factory.childCloseWillFail(BindingSpecificException())
+        t = self.test_reConnectWhenFirstExecOnExistingConnectionFails(
+            alsoFailClose
+        )
+        errors = self.flushLoggedErrors(BindingSpecificException)
+        self.assertEquals(len(errors), 1)
+        return t
+
+
     def test_noOpCommitDoesntHinderReconnection(self):
         """
         Until you've executed a query or performed a statement on an ADBAPI