[CalendarServer-changes] [11693] CalendarServer/branches/users/glyph/log-cleanups/twext/enterprise/ adbapi2.py
source_changes at macosforge.org
source_changes at macosforge.org
Tue Sep 17 16:02:13 PDT 2013
Revision: 11693
http://trac.calendarserver.org//changeset/11693
Author: glyph at apple.com
Date: 2013-09-17 16:02:13 -0700 (Tue, 17 Sep 2013)
Log Message:
-----------
Coding standard fixes.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/log-cleanups/twext/enterprise/adbapi2.py
Modified: CalendarServer/branches/users/glyph/log-cleanups/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/log-cleanups/twext/enterprise/adbapi2.py 2013-09-17 23:02:12 UTC (rev 11692)
+++ CalendarServer/branches/users/glyph/log-cleanups/twext/enterprise/adbapi2.py 2013-09-17 23:02:13 UTC (rev 11693)
@@ -18,10 +18,10 @@
"""
Asynchronous multi-process connection pool.
-This is similar to L{twisted.enterprise.adbapi}, but can hold a transaction (and
-thereby a thread) open across multiple asynchronous operations, rather than
-forcing the transaction to be completed entirely in a thread and/or entirely in
-a single SQL statement.
+This is similar to L{twisted.enterprise.adbapi}, but can hold a transaction
+(and thereby a thread) open across multiple asynchronous operations, rather
+than forcing the transaction to be completed entirely in a thread and/or
+entirely in a single SQL statement.
Also, this module includes an AMP protocol for multiplexing connections through
a single choke-point host. This is not currently in use, however, as AMP needs
@@ -118,6 +118,7 @@
return derived
+
def _deriveQueryEnded(cursor, derived):
"""
A query which involved some L{IDerivedParameter}s just ended. Execute any
@@ -142,6 +143,8 @@
"""
implements(IAsyncTransaction)
+ noisy = False
+
def __init__(self, pool, threadHolder, connection, cursor):
self._pool = pool
self._completed = "idle"
@@ -169,33 +172,31 @@
"""
Execute the given SQL on a thread, using a DB-API 2.0 cursor.
- This method is invoked internally on a non-reactor thread, one dedicated
- to and associated with the current cursor. It executes the given SQL,
- re-connecting first if necessary, re-cycling the old connection if
- necessary, and then, if there are results from the statement (as
- determined by the DB-API 2.0 'description' attribute) it will fetch all
- the rows and return them, leaving them to be relayed to
+ This method is invoked internally on a non-reactor thread, one
+ dedicated to and associated with the current cursor. It executes the
+ given SQL, re-connecting first if necessary, re-cycling the old
+ connection if necessary, and then, if there are results from the
+ statement (as determined by the DB-API 2.0 'description' attribute) it
+ will fetch all the rows and return them, leaving them to be relayed to
L{_ConnectedTxn.execSQL} via the L{ThreadHolder}.
The rules for possibly reconnecting automatically are: if this is the
very first statement being executed in this transaction, and an error
occurs in C{execute}, close the connection and try again. We will
- ignore any errors from C{close()} (or C{rollback()}) and log them during
- this process. This is OK because adbapi2 always enforces transaction
- discipline: connections are never in autocommit mode, so if the first
- statement in a transaction fails, nothing can have happened to the
- database; as per the ADBAPI spec, a lost connection is a rolled-back
- transaction. In the cases where some databases fail to enforce
- transaction atomicity (i.e. schema manipulations), re-executing the same
- statement will result, at worst, in a spurious and harmless error (like
- "table already exists"), not corruption.
+ ignore any errors from C{close()} (or C{rollback()}) and log them
+ during this process. This is OK because adbapi2 always enforces
+ transaction discipline: connections are never in autocommit mode, so if
+ the first statement in a transaction fails, nothing can have happened
+ to the database; as per the ADBAPI spec, a lost connection is a
+ rolled-back transaction. In the cases where some databases fail to
+ enforce transaction atomicity (i.e. schema manipulations),
+ re-executing the same statement will result, at worst, in a spurious
+ and harmless error (like "table already exists"), not corruption.
@param sql: The SQL string to execute.
-
@type sql: C{str}
@param args: The bind parameters to pass to adbapi, if any.
-
@type args: C{list} or C{None}
@param raiseOnZeroRowCount: If specified, an exception to raise when no
@@ -203,7 +204,6 @@
@return: all the rows that resulted from execution of the given C{sql},
or C{None}, if the statement is one which does not produce results.
-
@rtype: C{list} of C{tuple}, or C{NoneType}
@raise Exception: this function may raise any exception raised by the
@@ -234,9 +234,9 @@
# happen in the transaction, then the connection has probably gone
# bad in the meanwhile, and we should try again.
if wasFirst:
- # Report the error before doing anything else, since doing other
- # things may cause the traceback stack to be eliminated if they
- # raise exceptions (even internally).
+ # Report the error before doing anything else, since doing
+ # other things may cause the traceback stack to be eliminated
+ # if they raise exceptions (even internally).
log.err(
Failure(),
"Exception from execute() on first statement in "
@@ -292,11 +292,9 @@
return None
- noisy = False
-
def execSQL(self, *args, **kw):
result = self._holder.submit(
- lambda : self._reallyExecSQL(*args, **kw)
+ lambda: self._reallyExecSQL(*args, **kw)
)
if self.noisy:
def reportResult(results):
@@ -305,7 +303,7 @@
"SQL: %r %r" % (args, kw),
"Results: %r" % (results,),
"",
- ]))
+ ]))
return results
result.addBoth(reportResult)
return result
@@ -328,8 +326,8 @@
self._completed = "ended"
def reallySomething():
"""
- Do the database work and set appropriate flags. Executed in the
- cursor thread.
+ Do the database work and set appropriate flags. Executed in
+ the cursor thread.
"""
if self._cursor is None or self._first:
return
@@ -384,8 +382,8 @@
class _NoTxn(object):
"""
An L{IAsyncTransaction} that indicates a local failure before we could even
- communicate any statements (or possibly even any connection attempts) to the
- server.
+ communicate any statements (or possibly even any connection attempts) to
+ the server.
"""
implements(IAsyncTransaction)
@@ -401,7 +399,6 @@
"""
return fail(ConnectionError(self.reason))
-
execSQL = _everything
commit = _everything
abort = _everything
@@ -411,9 +408,9 @@
class _WaitingTxn(object):
"""
A L{_WaitingTxn} is an implementation of L{IAsyncTransaction} which cannot
- yet actually execute anything, so it waits and spools SQL requests for later
- execution. When a L{_ConnectedTxn} becomes available later, it can be
- unspooled onto that.
+ yet actually execute anything, so it waits and spools SQL requests for
+ later execution. When a L{_ConnectedTxn} becomes available later, it can
+ be unspooled onto that.
"""
implements(IAsyncTransaction)
@@ -639,9 +636,9 @@
d = self._currentBlock._startExecuting()
d.addCallback(self._finishExecuting)
elif self._blockedQueue is not None:
- # If there aren't any pending blocks any more, and there are spooled
- # statements that aren't part of a block, unspool all the statements
- # that have been held up until this point.
+ # If there aren't any pending blocks any more, and there are
+ # spooled statements that aren't part of a block, unspool all the
+ # statements that have been held up until this point.
bq = self._blockedQueue
self._blockedQueue = None
bq._unspool(self)
@@ -649,8 +646,8 @@
def _finishExecuting(self, result):
"""
- The active block just finished executing. Clear it and see if there are
- more blocks to execute, or if all the blocks are done and we should
+ The active block just finished executing. Clear it and see if there
+ are more blocks to execute, or if all the blocks are done and we should
execute any queued free statements.
"""
self._currentBlock = None
@@ -659,8 +656,9 @@
def commit(self):
if self._blockedQueue is not None:
- # We're in the process of executing a block of commands. Wait until
- # they're done. (Commit will be repeated in _checkNextBlock.)
+ # We're in the process of executing a block of commands. Wait
+ # until they're done. (Commit will be repeated in
+ # _checkNextBlock.)
return self._blockedQueue.commit()
def reallyCommit():
self._markComplete()
@@ -785,9 +783,9 @@
@param raiseOnZeroRowCount: see L{IAsyncTransaction.execSQL}
- @param track: an internal parameter; was this called by application code
- or as part of unspooling some previously-queued requests? True if
- application code, False if unspooling.
+ @param track: an internal parameter; was this called by application
+ code or as part of unspooling some previously-queued requests?
+ True if application code, False if unspooling.
"""
if track and self._ended:
raise AlreadyFinishedError()
@@ -970,8 +968,8 @@
super(ConnectionPool, self).stopService()
self._stopping = True
- # Phase 1: Cancel any transactions that are waiting so they won't try to
- # eagerly acquire new connections as they flow into the free-list.
+ # Phase 1: Cancel any transactions that are waiting so they won't try
+ # to eagerly acquire new connections as they flow into the free-list.
while self._waiting:
waiting = self._waiting[0]
waiting._stopWaiting()
@@ -991,10 +989,10 @@
# ThreadHolders.
while self._free:
# Releasing a L{_ConnectedTxn} doesn't automatically recycle it /
- # remove it the way aborting a _SingleTxn does, so we need to .pop()
- # here. L{_ConnectedTxn.stop} really shouldn't be able to fail, as
- # it's just stopping the thread, and the holder's stop() is
- # independently submitted from .abort() / .close().
+ # remove it the way aborting a _SingleTxn does, so we need to
+ # .pop() here. L{_ConnectedTxn.stop} really shouldn't be able to
+ # fail, as it's just stopping the thread, and the holder's stop()
+ # is independently submitted from .abort() / .close().
yield self._free.pop()._releaseConnection()
tp = self.reactor.getThreadPool()
@@ -1011,8 +1009,8 @@
def connection(self, label="<unlabeled>"):
"""
Find and immediately return an L{IAsyncTransaction} object. Execution
- of statements, commit and abort on that transaction may be delayed until
- a real underlying database connection is available.
+ of statements, commit and abort on that transaction may be delayed
+ until a real underlying database connection is available.
@return: an L{IAsyncTransaction}
"""
@@ -1158,6 +1156,7 @@
def toString(self, inObject):
return dumps(inObject)
+
def fromString(self, inString):
return loads(inString)
@@ -1193,8 +1192,7 @@
if f.type in command.errors:
returnValue(f)
else:
- log.err(Failure(),
- "shared database connection pool encountered error")
+ log.err(Failure(), "shared database connection pool error")
raise FailsafeException()
else:
returnValue(val)
@@ -1286,6 +1284,7 @@
"""
+
class ConnectionPoolConnection(AMP):
"""
A L{ConnectionPoolConnection} is a single connection to a
@@ -1402,7 +1401,8 @@
A client which can execute SQL.
"""
- def __init__(self, dialect=POSTGRES_DIALECT, paramstyle=DEFAULT_PARAM_STYLE):
+ def __init__(self, dialect=POSTGRES_DIALECT,
+ paramstyle=DEFAULT_PARAM_STYLE):
# See DEFAULT_PARAM_STYLE FIXME above.
super(ConnectionPoolClient, self).__init__()
self._nextID = count().next
@@ -1428,8 +1428,8 @@
"""
Create a new networked provider of L{IAsyncTransaction}.
- (This will ultimately call L{ConnectionPool.connection} on the other end
- of the wire.)
+ (This will ultimately call L{ConnectionPool.connection} on the other
+ end of the wire.)
@rtype: L{IAsyncTransaction}
"""
@@ -1478,12 +1478,12 @@
@param derived: either C{None} or a C{list} of L{IDerivedParameter}
providers initially passed into the C{execSQL} that started this
query. The values of these object swill mutate the original input
- parameters to resemble them. Although L{IDerivedParameter.preQuery}
- and L{IDerivedParameter.postQuery} are invoked on the other end of
- the wire, the local objects will be made to appear as though they
- were called here.
+ parameters to resemble them. Although
+ L{IDerivedParameter.preQuery} and L{IDerivedParameter.postQuery}
+ are invoked on the other end of the wire, the local objects will be
+ made to appear as though they were called here.
- @param noneResult: should the result of the query be C{None} (i.e. did
+ @param noneResult: should the result of the query be C{None} (i.e. did
it not have a C{description} on the cursor).
"""
if noneResult and not self.results:
@@ -1492,8 +1492,8 @@
results = self.results
if derived is not None:
# 1) Bleecchh.
- # 2) FIXME: add some direct tests in test_adbapi2, the unit test for
- # this crosses some abstraction boundaries so it's a little
+ # 2) FIXME: add some direct tests in test_adbapi2, the unit test
+ # for this crosses some abstraction boundaries so it's a little
# integration-y and in the tests for twext.enterprise.dal
for remote, local in zip(derived, self._deriveDerived()):
local.__dict__ = remote.__dict__
@@ -1519,8 +1519,8 @@
class _NetTransaction(_CommitAndAbortHooks):
"""
A L{_NetTransaction} is an L{AMP}-protocol-based provider of the
- L{IAsyncTransaction} interface. It sends SQL statements, query results, and
- commit/abort commands via an AMP socket to a pooling process.
+ L{IAsyncTransaction} interface. It sends SQL statements, query results,
+ and commit/abort commands via an AMP socket to a pooling process.
"""
implements(IAsyncTransaction)
@@ -1562,7 +1562,8 @@
args = []
client = self._client
queryID = str(client._nextID())
- query = client._queries[queryID] = _Query(sql, raiseOnZeroRowCount, args)
+ query = client._queries[queryID] = _Query(sql, raiseOnZeroRowCount,
+ args)
result = (
client.callRemote(
ExecSQL, queryID=queryID, sql=sql, args=args,
@@ -1617,6 +1618,7 @@
self.abort().addErrback(shush)
+
class _NetCommandBlock(object):
"""
Net command block.
@@ -1650,10 +1652,10 @@
"""
Execute some SQL on this command block.
"""
- if (self._ended or
- self._transaction._completed and
- not self._transaction._committing or
- self._transaction._committed):
+ if (
+ self._ended or self._transaction._completed and
+ not self._transaction._committing or self._transaction._committed
+ ):
raise AlreadyFinishedError()
return self._transaction.execSQL(sql, args, raiseOnZeroRowCount,
self._blockID)
@@ -1670,4 +1672,3 @@
EndBlock, blockID=self._blockID,
transactionID=self._transaction._transactionID
)
-
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130917/c45fe425/attachment-0001.html>
More information about the calendarserver-changes
mailing list