[CalendarServer-changes] [10291] CalendarServer/trunk/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jan 4 18:40:37 PST 2013
Revision: 10291
http://trac.calendarserver.org//changeset/10291
Author: glyph at apple.com
Date: 2013-01-04 18:40:37 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Abort transactions when they're garbage collected.
Also improve error-handling behavior and reduce log spew of shared connection
pool a bit, by dealing with known exceptions when possible.
Modified Paths:
--------------
CalendarServer/trunk/twext/enterprise/adbapi2.py
CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
Modified: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py 2013-01-05 00:51:01 UTC (rev 10290)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py 2013-01-05 02:40:37 UTC (rev 10291)
@@ -30,6 +30,7 @@
"""
import sys
+import weakref
from cStringIO import StringIO
from cPickle import dumps, loads
@@ -143,7 +144,7 @@
def __init__(self, pool, threadHolder, connection, cursor):
self._pool = pool
- self._completed = True
+ self._completed = "idle"
self._cursor = cursor
self._connection = connection
self._holder = threadHolder
@@ -324,7 +325,7 @@
committed or aborted.
"""
if not self._completed:
- self._completed = True
+ self._completed = "ended"
def reallySomething():
"""
Do the database work and set appropriate flags. Executed in the
@@ -338,7 +339,7 @@
self._pool._repoolAfter(self, result)
return result
else:
- raise AlreadyFinishedError()
+ raise AlreadyFinishedError(self._completed)
def commit(self):
@@ -349,11 +350,6 @@
return self._end(self._connection.rollback).addErrback(log.err)
- def __del__(self):
- if not self._completed:
- self.abort()
-
-
def reset(self):
"""
Call this when placing this transaction back into the pool.
@@ -371,7 +367,7 @@
Release the thread and database connection associated with this
transaction.
"""
- self._completed = True
+ self._completed = "released"
self._stopped = True
holder = self._holder
self._holder = None
@@ -393,16 +389,17 @@
"""
implements(IAsyncTransaction)
- def __init__(self, pool):
+ def __init__(self, pool, reason):
self.paramstyle = pool.paramstyle
self.dialect = pool.dialect
+ self.reason = reason
def _everything(self, *a, **kw):
"""
Everything fails with a L{ConnectionError}.
"""
- return fail(ConnectionError())
+ return fail(ConnectionError(self.reason))
execSQL = _everything
@@ -685,7 +682,10 @@
Stop waiting for a free transaction and fail.
"""
self._pool._waiting.remove(self)
- self._unspoolOnto(_NoTxn(self._pool))
+ self._completed = True
+ self._unspoolOnto(_NoTxn(self._pool,
+ "connection pool shut down while txn "
+ "waiting for database connection."))
def _checkComplete(self):
@@ -718,7 +718,20 @@
return block
+ def __del__(self):
+ """
+ When garbage collected, a L{_SingleTxn} recycles itself.
+ """
+ try:
+ if not self._completed:
+ self.abort()
+ except AlreadyFinishedError:
+ # The underlying transaction might already be completed without us
+ # knowing; for example if the service shuts down.
+ pass
+
+
class _Unspooler(object):
def __init__(self, orig):
self.orig = orig
@@ -1006,7 +1019,7 @@
if self._stopping:
# FIXME: should be wrapping a _SingleTxn around this to get
# .commandBlock()
- return _NoTxn(self)
+ return _NoTxn(self, "txn created while DB pool shutting down")
if self._free:
basetxn = self._free.pop(0)
self._busy.append(basetxn)
@@ -1157,8 +1170,10 @@
-quashErrors = {
- FailsafeException: "SOMETHING_UNKNOWN"
+_quashErrors = {
+ FailsafeException: "SOMETHING_UNKNOWN",
+ AlreadyFinishedError: "ALREADY_FINISHED",
+ ConnectionError: "CONNECTION_ERROR",
}
@@ -1174,12 +1189,13 @@
try:
val = yield inner(*a, **k)
except:
- # FIXME: if this were a general thing, it should probably allow
- # known errors through; look at the command's 'errors' attribute
- # before collapsing into FailsafeException.
- log.err(Failure(),
- "shared database connection pool encountered error")
- raise FailsafeException()
+ f = Failure()
+ if f.type in command.errors:
+ returnValue(f)
+ else:
+ log.err(Failure(),
+ "shared database connection pool encountered error")
+ raise FailsafeException()
else:
returnValue(val)
return command.responder(innerinner)
@@ -1192,7 +1208,7 @@
Start a transaction, identified with an ID generated by the client.
"""
arguments = txnarg()
- errors = quashErrors
+ errors = _quashErrors
@@ -1205,7 +1221,7 @@
('args', Pickle()),
('blockID', String()),
('reportZeroRowCount', Boolean())] + txnarg()
- errors = quashErrors
+ errors = _quashErrors
@@ -1214,7 +1230,7 @@
Create a new SQL command block.
"""
arguments = [("blockID", String())] + txnarg()
- errors = quashErrors
+ errors = _quashErrors
@@ -1223,7 +1239,7 @@
Create a new SQL command block.
"""
arguments = [("blockID", String())] + txnarg()
- errors = quashErrors
+ errors = _quashErrors
@@ -1235,7 +1251,7 @@
arguments = [('queryID', String()),
('row', Pickle())]
- errors = quashErrors
+ errors = _quashErrors
@@ -1248,19 +1264,19 @@
('norows', Boolean()),
('derived', Pickle()),
('noneResult', Boolean())]
- errors = quashErrors
+ errors = _quashErrors
class Commit(Command):
arguments = txnarg()
- errors = quashErrors
+ errors = _quashErrors
class Abort(Command):
arguments = txnarg()
- errors = quashErrors
+ errors = _quashErrors
@@ -1390,7 +1406,7 @@
# See DEFAULT_PARAM_STYLE FIXME above.
super(ConnectionPoolClient, self).__init__()
self._nextID = count().next
- self._txns = {}
+ self._txns = weakref.WeakValueDictionary()
self._queries = {}
self.dialect = dialect
self.paramstyle = paramstyle
@@ -1591,7 +1607,16 @@
return _NetCommandBlock(self, blockID)
+ def __del__(self):
+ """
+ When a L{_NetTransaction} is garabage collected, it aborts itself.
+ """
+ if not self._completed:
+ def shush(f):
+ f.trap(ConnectionError, AlreadyFinishedError)
+ self.abort().addErrback(shush)
+
class _NetCommandBlock(object):
"""
Net command block.
Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2013-01-05 00:51:01 UTC (rev 10290)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2013-01-05 02:40:37 UTC (rev 10291)
@@ -47,6 +47,7 @@
from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
from twext.enterprise.adbapi2 import ConnectionPool
from twext.internet.threadutils import ThreadHolder
+from twext.enterprise.adbapi2 import Commit
def resultOf(deferred, propagate=False):
@@ -151,6 +152,8 @@
Child.__init__(self, factory)
self.id = factory.idcounter.next()
self._executeFailQueue = []
+ self._commitCount = 0
+ self._rollbackCount = 0
def executeWillFail(self, thunk):
@@ -172,12 +175,14 @@
def commit(self):
+ self._commitCount += 1
if self.parent.commitFail:
self.parent.commitFail = False
raise CommitFail()
def rollback(self):
+ self._rollbackCount += 1
if self.parent.rollbackFail:
self.parent.rollbackFail = False
raise RollbackFail()
@@ -883,9 +888,10 @@
executed) will result in all of its Deferreds immediately failing and
none of the queued statements being executed.
"""
+ active = []
# Use up the available connections ...
for i in xrange(self.pool.maxConnections):
- self.createTransaction()
+ active.append(self.createTransaction())
# ... so that this one has to be spooled.
spooled = self.createTransaction()
result = self.resultOf(spooled.execSQL("alpha"))
@@ -922,6 +928,25 @@
self.assertEquals(stopResult, [None])
+ def test_garbageCollectedTransactionAborts(self):
+ """
+ When an L{IAsyncTransaction} is garbage collected, it ought to abort
+ itself.
+ """
+ t = self.createTransaction()
+ self.resultOf(t.execSQL("echo", []))
+ import gc
+ conns = self.factory.connections
+ self.assertEquals(len(conns), 1)
+ self.assertEquals(conns[0]._rollbackCount, 0)
+ del t
+ gc.collect()
+ self.flushHolders()
+ self.assertEquals(len(conns), 1)
+ self.assertEquals(conns[0]._rollbackCount, 1)
+ self.assertEquals(conns[0]._commitCount, 0)
+
+
def test_tooManyConnectionsWhileOthersFinish(self):
"""
L{ConnectionPool.connection} will not spawn more than the maximum
@@ -1585,6 +1610,8 @@
them. Flush the locally logged error of the given type and return
L{UnknownRemoteError}.
"""
+ if err in Commit.errors:
+ return err
self.flushLoggedErrors(err)
return FailsafeException
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/156befc3/attachment-0001.html>
More information about the calendarserver-changes
mailing list