[CalendarServer-changes] [11693] CalendarServer/branches/users/glyph/log-cleanups/twext/enterprise/ adbapi2.py

source_changes at macosforge.org source_changes at macosforge.org
Tue Sep 17 16:02:13 PDT 2013


Revision: 11693
          http://trac.calendarserver.org//changeset/11693
Author:   glyph at apple.com
Date:     2013-09-17 16:02:13 -0700 (Tue, 17 Sep 2013)
Log Message:
-----------
Coding standard fixes.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/log-cleanups/twext/enterprise/adbapi2.py

Modified: CalendarServer/branches/users/glyph/log-cleanups/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/log-cleanups/twext/enterprise/adbapi2.py	2013-09-17 23:02:12 UTC (rev 11692)
+++ CalendarServer/branches/users/glyph/log-cleanups/twext/enterprise/adbapi2.py	2013-09-17 23:02:13 UTC (rev 11693)
@@ -18,10 +18,10 @@
 """
 Asynchronous multi-process connection pool.
 
-This is similar to L{twisted.enterprise.adbapi}, but can hold a transaction (and
-thereby a thread) open across multiple asynchronous operations, rather than
-forcing the transaction to be completed entirely in a thread and/or entirely in
-a single SQL statement.
+This is similar to L{twisted.enterprise.adbapi}, but can hold a transaction
+(and thereby a thread) open across multiple asynchronous operations, rather
+than forcing the transaction to be completed entirely in a thread and/or
+entirely in a single SQL statement.
 
 Also, this module includes an AMP protocol for multiplexing connections through
 a single choke-point host.  This is not currently in use, however, as AMP needs
@@ -118,6 +118,7 @@
     return derived
 
 
+
 def _deriveQueryEnded(cursor, derived):
     """
     A query which involved some L{IDerivedParameter}s just ended.  Execute any
@@ -142,6 +143,8 @@
     """
     implements(IAsyncTransaction)
 
+    noisy = False
+
     def __init__(self, pool, threadHolder, connection, cursor):
         self._pool       = pool
         self._completed  = "idle"
@@ -169,33 +172,31 @@
         """
         Execute the given SQL on a thread, using a DB-API 2.0 cursor.
 
-        This method is invoked internally on a non-reactor thread, one dedicated
-        to and associated with the current cursor.  It executes the given SQL,
-        re-connecting first if necessary, re-cycling the old connection if
-        necessary, and then, if there are results from the statement (as
-        determined by the DB-API 2.0 'description' attribute) it will fetch all
-        the rows and return them, leaving them to be relayed to
+        This method is invoked internally on a non-reactor thread, one
+        dedicated to and associated with the current cursor.  It executes the
+        given SQL, re-connecting first if necessary, re-cycling the old
+        connection if necessary, and then, if there are results from the
+        statement (as determined by the DB-API 2.0 'description' attribute) it
+        will fetch all the rows and return them, leaving them to be relayed to
         L{_ConnectedTxn.execSQL} via the L{ThreadHolder}.
 
         The rules for possibly reconnecting automatically are: if this is the
         very first statement being executed in this transaction, and an error
         occurs in C{execute}, close the connection and try again.  We will
-        ignore any errors from C{close()} (or C{rollback()}) and log them during
-        this process.  This is OK because adbapi2 always enforces transaction
-        discipline: connections are never in autocommit mode, so if the first
-        statement in a transaction fails, nothing can have happened to the
-        database; as per the ADBAPI spec, a lost connection is a rolled-back
-        transaction.  In the cases where some databases fail to enforce
-        transaction atomicity (i.e. schema manipulations), re-executing the same
-        statement will result, at worst, in a spurious and harmless error (like
-        "table already exists"), not corruption.
+        ignore any errors from C{close()} (or C{rollback()}) and log them
+        during this process.  This is OK because adbapi2 always enforces
+        transaction discipline: connections are never in autocommit mode, so if
+        the first statement in a transaction fails, nothing can have happened
+        to the database; as per the ADBAPI spec, a lost connection is a
+        rolled-back transaction.  In the cases where some databases fail to
+        enforce transaction atomicity (i.e.  schema manipulations),
+        re-executing the same statement will result, at worst, in a spurious
+        and harmless error (like "table already exists"), not corruption.
 
         @param sql: The SQL string to execute.
-
         @type sql: C{str}
 
         @param args: The bind parameters to pass to adbapi, if any.
-
         @type args: C{list} or C{None}
 
         @param raiseOnZeroRowCount: If specified, an exception to raise when no
@@ -203,7 +204,6 @@
 
         @return: all the rows that resulted from execution of the given C{sql},
             or C{None}, if the statement is one which does not produce results.
-
         @rtype: C{list} of C{tuple}, or C{NoneType}
 
         @raise Exception: this function may raise any exception raised by the
@@ -234,9 +234,9 @@
             # happen in the transaction, then the connection has probably gone
             # bad in the meanwhile, and we should try again.
             if wasFirst:
-                # Report the error before doing anything else, since doing other
-                # things may cause the traceback stack to be eliminated if they
-                # raise exceptions (even internally).
+                # Report the error before doing anything else, since doing
+                # other things may cause the traceback stack to be eliminated
+                # if they raise exceptions (even internally).
                 log.err(
                     Failure(),
                     "Exception from execute() on first statement in "
@@ -292,11 +292,9 @@
             return None
 
 
-    noisy = False
-
     def execSQL(self, *args, **kw):
         result = self._holder.submit(
-            lambda : self._reallyExecSQL(*args, **kw)
+            lambda: self._reallyExecSQL(*args, **kw)
         )
         if self.noisy:
             def reportResult(results):
@@ -305,7 +303,7 @@
                     "SQL: %r %r" % (args, kw),
                     "Results: %r" % (results,),
                     "",
-                    ]))
+                ]))
                 return results
             result.addBoth(reportResult)
         return result
@@ -328,8 +326,8 @@
             self._completed = "ended"
             def reallySomething():
                 """
-                Do the database work and set appropriate flags.  Executed in the
-                cursor thread.
+                Do the database work and set appropriate flags.  Executed in
+                the cursor thread.
                 """
                 if self._cursor is None or self._first:
                     return
@@ -384,8 +382,8 @@
 class _NoTxn(object):
     """
     An L{IAsyncTransaction} that indicates a local failure before we could even
-    communicate any statements (or possibly even any connection attempts) to the
-    server.
+    communicate any statements (or possibly even any connection attempts) to
+    the server.
     """
     implements(IAsyncTransaction)
 
@@ -401,7 +399,6 @@
         """
         return fail(ConnectionError(self.reason))
 
-
     execSQL = _everything
     commit  = _everything
     abort   = _everything
@@ -411,9 +408,9 @@
 class _WaitingTxn(object):
     """
     A L{_WaitingTxn} is an implementation of L{IAsyncTransaction} which cannot
-    yet actually execute anything, so it waits and spools SQL requests for later
-    execution.  When a L{_ConnectedTxn} becomes available later, it can be
-    unspooled onto that.
+    yet actually execute anything, so it waits and spools SQL requests for
+    later execution.  When a L{_ConnectedTxn} becomes available later, it can
+    be unspooled onto that.
     """
 
     implements(IAsyncTransaction)
@@ -639,9 +636,9 @@
             d = self._currentBlock._startExecuting()
             d.addCallback(self._finishExecuting)
         elif self._blockedQueue is not None:
-            # If there aren't any pending blocks any more, and there are spooled
-            # statements that aren't part of a block, unspool all the statements
-            # that have been held up until this point.
+            # If there aren't any pending blocks any more, and there are
+            # spooled statements that aren't part of a block, unspool all the
+            # statements that have been held up until this point.
             bq = self._blockedQueue
             self._blockedQueue = None
             bq._unspool(self)
@@ -649,8 +646,8 @@
 
     def _finishExecuting(self, result):
         """
-        The active block just finished executing.  Clear it and see if there are
-        more blocks to execute, or if all the blocks are done and we should
+        The active block just finished executing.  Clear it and see if there
+        are more blocks to execute, or if all the blocks are done and we should
         execute any queued free statements.
         """
         self._currentBlock = None
@@ -659,8 +656,9 @@
 
     def commit(self):
         if self._blockedQueue is not None:
-            # We're in the process of executing a block of commands.  Wait until
-            # they're done.  (Commit will be repeated in _checkNextBlock.)
+            # We're in the process of executing a block of commands.  Wait
+            # until they're done.  (Commit will be repeated in
+            # _checkNextBlock.)
             return self._blockedQueue.commit()
         def reallyCommit():
             self._markComplete()
@@ -785,9 +783,9 @@
 
         @param raiseOnZeroRowCount: see L{IAsyncTransaction.execSQL}
 
-        @param track: an internal parameter; was this called by application code
-            or as part of unspooling some previously-queued requests?  True if
-            application code, False if unspooling.
+        @param track: an internal parameter; was this called by application
+            code or as part of unspooling some previously-queued requests?
+            True if application code, False if unspooling.
         """
         if track and self._ended:
             raise AlreadyFinishedError()
@@ -970,8 +968,8 @@
         super(ConnectionPool, self).stopService()
         self._stopping = True
 
-        # Phase 1: Cancel any transactions that are waiting so they won't try to
-        # eagerly acquire new connections as they flow into the free-list.
+        # Phase 1: Cancel any transactions that are waiting so they won't try
+        # to eagerly acquire new connections as they flow into the free-list.
         while self._waiting:
             waiting = self._waiting[0]
             waiting._stopWaiting()
@@ -991,10 +989,10 @@
         # ThreadHolders.
         while self._free:
             # Releasing a L{_ConnectedTxn} doesn't automatically recycle it /
-            # remove it the way aborting a _SingleTxn does, so we need to .pop()
-            # here.  L{_ConnectedTxn.stop} really shouldn't be able to fail, as
-            # it's just stopping the thread, and the holder's stop() is
-            # independently submitted from .abort() / .close().
+            # remove it the way aborting a _SingleTxn does, so we need to
+            # .pop() here.  L{_ConnectedTxn.stop} really shouldn't be able to
+            # fail, as it's just stopping the thread, and the holder's stop()
+            # is independently submitted from .abort() / .close().
             yield self._free.pop()._releaseConnection()
 
         tp = self.reactor.getThreadPool()
@@ -1011,8 +1009,8 @@
     def connection(self, label="<unlabeled>"):
         """
         Find and immediately return an L{IAsyncTransaction} object.  Execution
-        of statements, commit and abort on that transaction may be delayed until
-        a real underlying database connection is available.
+        of statements, commit and abort on that transaction may be delayed
+        until a real underlying database connection is available.
 
         @return: an L{IAsyncTransaction}
         """
@@ -1158,6 +1156,7 @@
     def toString(self, inObject):
         return dumps(inObject)
 
+
     def fromString(self, inString):
         return loads(inString)
 
@@ -1193,8 +1192,7 @@
                 if f.type in command.errors:
                     returnValue(f)
                 else:
-                    log.err(Failure(),
-                            "shared database connection pool encountered error")
+                    log.err(Failure(), "shared database connection pool error")
                     raise FailsafeException()
             else:
                 returnValue(val)
@@ -1286,6 +1284,7 @@
     """
 
 
+
 class ConnectionPoolConnection(AMP):
     """
     A L{ConnectionPoolConnection} is a single connection to a
@@ -1402,7 +1401,8 @@
     A client which can execute SQL.
     """
 
-    def __init__(self, dialect=POSTGRES_DIALECT, paramstyle=DEFAULT_PARAM_STYLE):
+    def __init__(self, dialect=POSTGRES_DIALECT,
+                 paramstyle=DEFAULT_PARAM_STYLE):
         # See DEFAULT_PARAM_STYLE FIXME above.
         super(ConnectionPoolClient, self).__init__()
         self._nextID    = count().next
@@ -1428,8 +1428,8 @@
         """
         Create a new networked provider of L{IAsyncTransaction}.
 
-        (This will ultimately call L{ConnectionPool.connection} on the other end
-        of the wire.)
+        (This will ultimately call L{ConnectionPool.connection} on the other
+        end of the wire.)
 
         @rtype: L{IAsyncTransaction}
         """
@@ -1478,12 +1478,12 @@
         @param derived: either C{None} or a C{list} of L{IDerivedParameter}
             providers initially passed into the C{execSQL} that started this
             query.  The values of these object swill mutate the original input
-            parameters to resemble them.  Although L{IDerivedParameter.preQuery}
-            and L{IDerivedParameter.postQuery} are invoked on the other end of
-            the wire, the local objects will be made to appear as though they
-            were called here.
+            parameters to resemble them.  Although
+            L{IDerivedParameter.preQuery} and L{IDerivedParameter.postQuery}
+            are invoked on the other end of the wire, the local objects will be
+            made to appear as though they were called here.
 
-        @param noneResult: should the result of the query be C{None} (i.e. did
+        @param noneResult: should the result of the query be C{None} (i.e.  did
             it not have a C{description} on the cursor).
         """
         if noneResult and not self.results:
@@ -1492,8 +1492,8 @@
             results = self.results
         if derived is not None:
             # 1) Bleecchh.
-            # 2) FIXME: add some direct tests in test_adbapi2, the unit test for
-            # this crosses some abstraction boundaries so it's a little
+            # 2) FIXME: add some direct tests in test_adbapi2, the unit test
+            # for this crosses some abstraction boundaries so it's a little
             # integration-y and in the tests for twext.enterprise.dal
             for remote, local in zip(derived, self._deriveDerived()):
                 local.__dict__ = remote.__dict__
@@ -1519,8 +1519,8 @@
 class _NetTransaction(_CommitAndAbortHooks):
     """
     A L{_NetTransaction} is an L{AMP}-protocol-based provider of the
-    L{IAsyncTransaction} interface.  It sends SQL statements, query results, and
-    commit/abort commands via an AMP socket to a pooling process.
+    L{IAsyncTransaction} interface.  It sends SQL statements, query results,
+    and commit/abort commands via an AMP socket to a pooling process.
     """
 
     implements(IAsyncTransaction)
@@ -1562,7 +1562,8 @@
             args = []
         client = self._client
         queryID = str(client._nextID())
-        query = client._queries[queryID] = _Query(sql, raiseOnZeroRowCount, args)
+        query = client._queries[queryID] = _Query(sql, raiseOnZeroRowCount,
+                                                  args)
         result = (
             client.callRemote(
                 ExecSQL, queryID=queryID, sql=sql, args=args,
@@ -1617,6 +1618,7 @@
             self.abort().addErrback(shush)
 
 
+
 class _NetCommandBlock(object):
     """
     Net command block.
@@ -1650,10 +1652,10 @@
         """
         Execute some SQL on this command block.
         """
-        if  (self._ended or
-             self._transaction._completed and
-             not self._transaction._committing or
-             self._transaction._committed):
+        if (
+            self._ended or self._transaction._completed and
+            not self._transaction._committing or self._transaction._committed
+        ):
             raise AlreadyFinishedError()
         return self._transaction.execSQL(sql, args, raiseOnZeroRowCount,
                                          self._blockID)
@@ -1670,4 +1672,3 @@
             EndBlock, blockID=self._blockID,
             transactionID=self._transaction._transactionID
         )
-
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130917/c45fe425/attachment-0001.html>


More information about the calendarserver-changes mailing list