[CalendarServer-changes] [6877] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Thu Feb 3 16:42:52 PST 2011
Revision: 6877
http://trac.macosforge.org/projects/calendarserver/changeset/6877
Author: glyph at apple.com
Date: 2011-02-03 16:42:52 -0800 (Thu, 03 Feb 2011)
Log Message:
-----------
Fix various issues with database connectivity and service shutdown.
Modified Paths:
--------------
CalendarServer/trunk/calendarserver/tap/caldav.py
CalendarServer/trunk/twext/enterprise/adbapi2.py
CalendarServer/trunk/twext/enterprise/ienterprise.py
CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
Property Changed:
----------------
CalendarServer/trunk/
Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
- /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
+ /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py 2011-02-03 23:42:00 UTC (rev 6876)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py 2011-02-04 00:42:52 UTC (rev 6877)
@@ -39,7 +39,7 @@
from twisted.python.usage import Options, UsageError
from twisted.python.reflect import namedClass
from twisted.plugin import IPlugin
-from twisted.internet.defer import gatherResults
+from twisted.internet.defer import gatherResults, Deferred
from twisted.internet import reactor as _reactor
from twisted.internet.reactor import addSystemEventTrigger
from twisted.internet.process import ProcessExitedAlready
@@ -1428,6 +1428,8 @@
"""
self.stopping = True
self.deferreds = {}
+ for name in self.processes:
+ self.deferreds[name] = Deferred()
super(DelayedStartupProcessMonitor, self).stopService()
# Cancel any outstanding restarts
@@ -1503,7 +1505,7 @@
self.startProcess,
name)
if self.stopping:
- deferred = self.deferreds.get(name, None)
+ deferred = self.deferreds.pop(name, None)
if deferred is not None:
deferred.callback(None)
Modified: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py 2011-02-03 23:42:00 UTC (rev 6876)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py 2011-02-04 00:42:52 UTC (rev 6877)
@@ -1,4 +1,4 @@
-# -*- test-case-name: twext.enterprise.test. -*-
+# -*- test-case-name: twext.enterprise.test.test_adbapi2 -*-
##
# Copyright (c) 2010 Apple Inc. All rights reserved.
#
@@ -50,50 +50,36 @@
from twisted.python.components import proxyForInterface
from twext.internet.threadutils import ThreadHolder
+from twisted.internet.defer import succeed
+from twext.enterprise.ienterprise import ConnectionError
+from twisted.internet.defer import fail
from twext.enterprise.ienterprise import AlreadyFinishedError, IAsyncTransaction
-# FIXME: there should be no default, it should be discovered dynamically
-# everywhere. Right now we're only using pgdb so we only support that.
+# FIXME: there should be no default for DEFAULT_PARAM_STYLE, it should be
+# discovered dynamically everywhere. Right now we're only using pgdb so we only
+# support that.
DEFAULT_PARAM_STYLE = 'pyformat'
-class BaseSqlTxn(object):
+class _ConnectedTxn(object):
"""
L{IAsyncTransaction} implementation based on a L{ThreadHolder} in the
current process.
"""
implements(IAsyncTransaction)
- # FIXME: this should *really* be
+ # See DEFAULT_PARAM_STYLE FIXME above.
paramstyle = DEFAULT_PARAM_STYLE
- def __init__(self, connectionFactory, reactor=_reactor):
- """
- @param connectionFactory: A 0-argument callable which returns a DB-API
- 2.0 connection.
- """
- self._completed = False
- self._cursor = None
- self._holder = ThreadHolder(reactor)
- self._holder.start()
+ def __init__(self, pool, threadHolder, connection, cursor):
+ self._pool = pool
+ self._completed = True
+ self._cursor = cursor
+ self._connection = connection
+ self._holder = threadHolder
- def initCursor():
- # support threadlevel=1; we can't necessarily cursor() in a
- # different thread than we do transactions in.
- # TODO: Re-try connect when it fails. Specify a timeout. That
- # should happen in this layer because we need to be able to stop
- # the reconnect attempt if it's hanging.
- self._connection = connectionFactory()
- self._cursor = self._connection.cursor()
-
- # Note: no locking necessary here; since this gets submitted first, all
- # subsequent submitted work-units will be in line behind it and the
- # cursor will already have been initialized.
- self._holder.submit(initCursor).addErrback(log.err)
-
-
def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
if args is None:
args = []
@@ -125,31 +111,33 @@
return result
- def commit(self):
+ def _end(self, really):
+ """
+ Common logic for commit or abort.
+ """
if not self._completed:
self._completed = True
- def reallyCommit():
- self._connection.commit()
- result = self._holder.submit(reallyCommit)
+ def reallySomething():
+ if self._cursor is None:
+ return
+ really()
+ result = self._holder.submit(reallySomething)
+ self._pool._repoolAfter(self, result)
return result
else:
raise AlreadyFinishedError()
+ def commit(self):
+ return self._end(self._connection.commit)
+
+
def abort(self):
- if not self._completed:
- self._completed = True
- def reallyAbort():
- self._connection.rollback()
- result = self._holder.submit(reallyAbort)
- return result
- else:
- raise AlreadyFinishedError()
+ return self._end(self._connection.rollback)
def __del__(self):
if not self._completed:
- print 'BaseSqlTxn.__del__: OK'
self.abort()
@@ -165,33 +153,55 @@
self._completed = False
- def stop(self):
+ def _releaseConnection(self):
"""
Release the thread and database connection associated with this
transaction.
"""
self._completed = True
- self._stopped = True
- holder = self._holder
- self._holder = None
- holder.submit(self._connection.close)
+ self._stopped = True
+ holder = self._holder
+ self._holder = None
+
+ def _reallyClose():
+ if self._cursor is None:
+ return
+ self._connection.close()
+ holder.submit(_reallyClose)
return holder.stop()
+class _NoTxn(object):
+ """
+ An L{IAsyncTransaction} that indicates a local failure before we could even
+ communicate any statements (or possibly even any connection attempts) to the
+ server.
+ """
+ implements(IAsyncTransaction)
-class SpooledTxn(object):
+ def _everything(self, *a, **kw):
+ """
+ Everything fails with a L{ConnectionError}.
+ """
+ return fail(ConnectionError())
+
+ execSQL = _everything
+ commit = _everything
+ abort = _everything
+
+
+
+class _WaitingTxn(object):
"""
- A L{SpooledTxn} is an implementation of L{IAsyncTransaction} which cannot
- yet actually execute anything, so it spools SQL reqeusts for later
- execution. When a L{BaseSqlTxn} becomes available later, it can be
+ A L{_WaitingTxn} is an implementation of L{IAsyncTransaction} which cannot
+ yet actually execute anything, so it waits and spools SQL requests for later
+ execution. When a L{_ConnectedTxn} becomes available later, it can be
unspooled onto that.
"""
implements(IAsyncTransaction)
- # FIXME: this should be relayed from the connection factory of the thing
- # creating the spooled transaction.
-
+ # See DEFAULT_PARAM_STYLE FIXME above.
paramstyle = DEFAULT_PARAM_STYLE
def __init__(self):
@@ -242,17 +252,25 @@
def abort(self):
- return self._enspool('abort')
+ return succeed(None)
-class PooledSqlTxn(proxyForInterface(iface=IAsyncTransaction,
+class _SingleTxn(proxyForInterface(iface=IAsyncTransaction,
originalAttribute='_baseTxn')):
"""
- This is a temporary throw-away wrapper for the longer-lived BaseSqlTxn, so
- that if a badly-behaved API client accidentally hangs on to one of these
- and, for example C{.abort()}s it multiple times once another client is
- using that connection, it will get some harmless tracebacks.
+ A L{_SingleTxn} is a single-use wrapper for the longer-lived
+ L{_ConnectedTxn}, so that if a badly-behaved API client accidentally hangs
+ on to one of these and, for example C{.abort()}s it multiple times once
+ another client is using that connection, it will get some harmless
+ tracebacks.
+
+ It's a wrapper around a "real" implementation; either a L{_ConnectedTxn},
+ L{_NoTxn}, or L{_WaitingTxn} depending on the availability of real
+ underlying database connections.
+
+ This is the only L{IAsyncTransaction} implementation exposed to application
+ code.
"""
def __init__(self, pool, baseTxn):
@@ -261,21 +279,50 @@
self._complete = False
+ def __repr__(self):
+ """
+ Reveal the backend in the string representation.
+ """
+ return '_SingleTxn(%r)' % (self._baseTxn,)
+
+
+ def _unspoolOnto(self, baseTxn):
+ """
+ Replace my C{_baseTxn}, currently a L{_WaitingTxn}, with a new
+ implementation of L{IAsyncTransaction} that will actually do the work;
+ either a L{_ConnectedTxn} or a L{_NoTxn}.
+ """
+ spooledBase = self._baseTxn
+ self._baseTxn = baseTxn
+ spooledBase._unspool(baseTxn)
+
+
def execSQL(self, *a, **kw):
self._checkComplete()
- return super(PooledSqlTxn, self).execSQL(*a, **kw)
+ return super(_SingleTxn, self).execSQL(*a, **kw)
def commit(self):
self._markComplete()
- return self._repoolAfter(super(PooledSqlTxn, self).commit())
+ return super(_SingleTxn, self).commit()
def abort(self):
self._markComplete()
- return self._repoolAfter(super(PooledSqlTxn, self).abort())
+ if self in self._pool._waiting:
+ self._stopWaiting()
+ return succeed(None)
+ return super(_SingleTxn, self).abort()
+ def _stopWaiting(self):
+ """
+ Stop waiting for a free transaction and fail.
+ """
+ self._pool._waiting.remove(self)
+ self._unspoolOnto(_NoTxn())
+
+
def _checkComplete(self):
"""
If the transaction is complete, raise L{AlreadyFinishedError}
@@ -292,14 +339,43 @@
self._complete = True
- def _repoolAfter(self, d):
- def repool(result):
- self._pool.reclaim(self)
- return result
- return d.addCallback(repool)
+class _ConnectingPseudoTxn(object):
+ _retry = None
+ def __init__(self, pool, holder):
+ self._pool = pool
+ self._holder = holder
+
+
+ def abort(self):
+ if self._retry is not None:
+ self._retry.cancel()
+ d = self._holder.stop()
+ def removeme(ignored):
+ if self in self._pool._busy:
+ self._pool._busy.remove(self)
+ d.addCallback(removeme)
+ return d
+
+
+
+def _fork(x):
+ """
+ Produce a L{Deferred} that will fire when another L{Deferred} fires without
+ disturbing its results.
+ """
+ d = Deferred()
+ def fired(result):
+ d.callback(result)
+ return result
+
+ x.addBoth(fired)
+ return d
+
+
+
class ConnectionPool(Service, object):
"""
This is a central service that has a threadpool and executes SQL statements
@@ -313,19 +389,50 @@
than this many concurrent connections to the database.
@type maxConnections: C{int}
+
+ @ivar reactor: The reactor used for scheduling threads as well as retries
+ for failed connect() attempts.
+
+ @type reactor: L{IReactorTime} and L{IReactorThreads} provider.
+
+ @ivar _free: The list of free L{_ConnectedTxn} objects which are not
+ currently attached to a L{_SingleTxn} object, and have active
+ connections ready for processing a new transaction.
+
+ @ivar _busy: The list of busy L{_ConnectedTxn} objects; those currently
+ servicing an unfinished L{_SingleTxn} object.
+
+ @ivar _finishing: The list of 2-tuples of L{_ConnectedTxn} objects which
+ have had C{abort} or C{commit} called on them, but are not done
+ executing that method, and the L{Deferred} returned from that method
+ that will be fired when its execution has completed.
+
+ @ivar _waiting: The list of L{_SingleTxn} objects attached to a
+ L{_WaitingTxn}; i.e. those which are awaiting a connection to become
+ free so that they can be executed.
+
+ @ivar _stopping: Is this L{ConnectionPool} in the process of shutting down?
+ (If so, new connections will not be established.)
"""
reactor = _reactor
+ RETRY_TIMEOUT = 10.0
+
+
def __init__(self, connectionFactory, maxConnections=10):
+
super(ConnectionPool, self).__init__()
- self.free = []
- self.busy = []
- self.waiting = []
self.connectionFactory = connectionFactory
self.maxConnections = maxConnections
+ self._free = []
+ self._busy = []
+ self._waiting = []
+ self._finishing = []
+ self._stopping = False
+
def startService(self):
"""
No startup necessary.
@@ -335,66 +442,140 @@
@inlineCallbacks
def stopService(self):
"""
- Forcibly abort any outstanding transactions.
+ Forcibly abort any outstanding transactions, and release all resources
+ (notably, threads).
"""
- for busy in self.busy[:]:
+ self._stopping = True
+
+ # Phase 1: Cancel any transactions that are waiting so they won't try to
+ # eagerly acquire new connections as they flow into the free-list.
+ while self._waiting:
+ waiting = self._waiting[0]
+ waiting._stopWaiting()
+
+ # Phase 2: Wait for all the Deferreds from the L{_ConnectedTxn}s that
+ # have *already* been stopped.
+ while self._finishing:
+ yield _fork(self._finishing[0][1])
+
+ # Phase 3: All of the busy transactions must be aborted first. As each
+ # one is aborted, it will remove itself from the list.
+ while self._busy:
+ d = self._busy[0].abort()
try:
- yield busy.abort()
+ yield d
except:
log.err()
- # all transactions should now be in the free list, since 'abort()' will
- # have put them there.
- for free in self.free:
- yield free.stop()
+ # Phase 4: All transactions should now be in the free list, since
+ # 'abort()' will have put them there. Shut down all the associated
+ # ThreadHolders.
+ while self._free:
+ # Releasing a L{_ConnectedTxn} doesn't automatically recycle it /
+ # remove it the way aborting a _SingleTxn does, so we need to .pop()
+ # here. L{_ConnectedTxn._releaseConnection} really shouldn't be able
+ # to fail, as it's just stopping the thread, and the holder's stop() is
+ # independently submitted from .abort() / .close().
+ yield self._free.pop()._releaseConnection()
+
+ def _createHolder(self):
+ """
+ Create a L{ThreadHolder}. (Test hook.)
+ """
+ return ThreadHolder(self.reactor)
+
+
def connection(self, label="<unlabeled>"):
"""
- Find a transaction; either retrieve a free one from the list or
- allocate a new one if no free ones are available.
+ Find and immediately return an L{IAsyncTransaction} object. Execution
+ of statements, commit and abort on that transaction may be delayed until
+ a real underlying database connection is available.
@return: an L{IAsyncTransaction}
"""
-
- overload = False
- if self.free:
- basetxn = self.free.pop(0)
- elif len(self.busy) < self.maxConnections:
- basetxn = BaseSqlTxn(
- connectionFactory=self.connectionFactory,
- reactor=self.reactor
- )
+ if self._stopping:
+ return _NoTxn()
+ if self._free:
+ basetxn = self._free.pop(0)
+ self._busy.append(basetxn)
+ txn = _SingleTxn(self, basetxn)
else:
- basetxn = SpooledTxn()
- overload = True
- txn = PooledSqlTxn(self, basetxn)
- if overload:
- self.waiting.append(txn)
- else:
- self.busy.append(txn)
+ txn = _SingleTxn(self, _WaitingTxn())
+ self._waiting.append(txn)
+ # FIXME/TESTME: should be len(self._busy) + len(self._finishing)
+ # (free doesn't need to be considered, as it's tested above)
+ if self._activeConnectionCount() < self.maxConnections:
+ self._startOneMore()
return txn
- def reclaim(self, txn):
+ def _activeConnectionCount(self):
"""
- Shuck the L{PooledSqlTxn} wrapper off, and recycle the underlying
- BaseSqlTxn into the free list.
+ @return: the number of active outgoing connections to the database.
"""
- baseTxn = txn._baseTxn
- baseTxn.reset()
- self.busy.remove(txn)
- if self.waiting:
- waiting = self.waiting.pop(0)
- waiting._baseTxn._unspool(baseTxn)
- # Note: although commit() may already have been called, we don't
- # have to handle it specially here. It only unspools after the
- # deferred returned by commit() has actually been called, and since
- # that occurs in a callFromThread, that won't happen until the next
- # iteration of the mainloop, when the _baseTxn is safely correct.
- waiting._baseTxn = baseTxn
- self.busy.append(waiting)
+ return len(self._busy) + len(self._finishing)
+
+
+ def _startOneMore(self):
+ """
+ Start one more _ConnectedTxn.
+ """
+ holder = self._createHolder()
+ holder.start()
+ txn = _ConnectingPseudoTxn(self, holder)
+ # take up a slot in the 'busy' list, sit there so we can be aborted.
+ self._busy.append(txn)
+ def initCursor():
+ # support threadlevel=1; we can't necessarily cursor() in a
+ # different thread than we do transactions in.
+ connection = self.connectionFactory()
+ cursor = connection.cursor()
+ return (connection, cursor)
+ def finishInit((connection, cursor)):
+ baseTxn = _ConnectedTxn(
+ pool=self,
+ threadHolder=holder,
+ connection=connection,
+ cursor=cursor
+ )
+ self._busy.remove(txn)
+ self._repoolNow(baseTxn)
+ def maybeTryAgain(f):
+ log.err(f, "Re-trying connection due to connection failure")
+ txn._retry = self.reactor.callLater(self.RETRY_TIMEOUT, resubmit)
+ def resubmit():
+ d = holder.submit(initCursor)
+ d.addCallbacks(finishInit, maybeTryAgain)
+ resubmit()
+
+
+ def _repoolAfter(self, txn, d):
+ """
+ Re-pool the given L{_ConnectedTxn} after the given L{Deferred} has
+ fired.
+ """
+ self._busy.remove(txn)
+ finishRecord = (txn, d)
+ self._finishing.append(finishRecord)
+ def repool(result):
+ self._finishing.remove(finishRecord)
+ self._repoolNow(txn)
+ return result
+ return d.addBoth(repool)
+
+
+ def _repoolNow(self, txn):
+ """
+ Recycle a L{_ConnectedTxn} into the free list.
+ """
+ txn.reset()
+ if self._waiting:
+ waiting = self._waiting.pop(0)
+ self._busy.append(txn)
+ waiting._unspoolOnto(txn)
else:
- self.free.append(baseTxn)
+ self._free.append(txn)
@@ -483,7 +664,7 @@
class QueryComplete(Command):
"""
- A query issued with ExecSQL is complete.
+ A query issued with L{ExecSQL} is complete.
"""
arguments = [('queryID', String()),
@@ -518,7 +699,7 @@
Initialize a mapping of transaction IDs to transaction objects.
"""
super(ConnectionPoolConnection, self).__init__()
- self.pool = pool
+ self.pool = pool
self._txns = {}
@@ -574,16 +755,24 @@
"""
def __init__(self):
super(ConnectionPoolClient, self).__init__()
- self._nextID = count().next
- self._txns = {}
+ self._nextID = count().next
+ self._txns = {}
self._queries = {}
def newTransaction(self):
- txnid = str(self._nextID())
+ """
+ Create a new networked provider of L{IAsyncTransaction}.
+
+ (This will ultimately call L{ConnectionPool.connection} on the other end
+ of the wire.)
+
+ @rtype: L{IAsyncTransaction}
+ """
+ txnid = str(self._nextID())
+ txn = _NetTransaction(client=self, transactionID=txnid)
+ self._txns[txnid] = txn
self.callRemote(StartTxn, transactionID=txnid)
- txn = Transaction(client=self, transactionID=txnid)
- self._txns[txnid] = txn
return txn
@@ -602,8 +791,8 @@
class _Query(object):
def __init__(self, raiseOnZeroRowCount):
- self.results = []
- self.deferred = Deferred()
+ self.results = []
+ self.deferred = Deferred()
self.raiseOnZeroRowCount = raiseOnZeroRowCount
@@ -628,15 +817,16 @@
-class Transaction(object):
+class _NetTransaction(object):
"""
- Async protocol-based transaction implementation.
+ A L{_NetTransaction} is an L{AMP}-protocol-based provider of the
+ L{IAsyncTransaction} interface. It sends SQL statements, query results, and
+ commit/abort commands via an AMP socket to a pooling process.
"""
implements(IAsyncTransaction)
- # FIXME: this needs to come from the other end of the wire.
-
+ # See DEFAULT_PARAM_STYLE FIXME above.
paramstyle = DEFAULT_PARAM_STYLE
def __init__(self, client, transactionID):
@@ -644,9 +834,9 @@
Initialize a transaction with a L{ConnectionPoolClient} and a unique
transaction identifier.
"""
- self._client = client
+ self._client = client
self._transactionID = transactionID
- self._completed = False
+ self._completed = False
def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
Modified: CalendarServer/trunk/twext/enterprise/ienterprise.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/ienterprise.py 2011-02-03 23:42:00 UTC (rev 6876)
+++ CalendarServer/trunk/twext/enterprise/ienterprise.py 2011-02-04 00:42:52 UTC (rev 6877)
@@ -33,6 +33,13 @@
+class ConnectionError(Exception):
+ """
+ An error occurred with the underlying database connection.
+ """
+
+
+
class IAsyncTransaction(Interface):
"""
Asynchronous execution of SQL.
Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2011-02-03 23:42:00 UTC (rev 6876)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2011-02-04 00:42:52 UTC (rev 6877)
@@ -22,22 +22,50 @@
from twisted.trial.unittest import TestCase
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import execute
+from twisted.internet.task import Clock
+from twisted.internet.defer import Deferred
+from twext.enterprise.ienterprise import ConnectionError
from twext.enterprise.adbapi2 import ConnectionPool
+def resultOf(deferred, propagate=False):
+ """
+ Add a callback and errback which will capture the result of a L{Deferred} in
+ a list, and return that list. If 'propagate' is True, pass through the
+ results.
+ """
+ results = []
+ if propagate:
+ def cb(r):
+ results.append(r)
+ return r
+ else:
+ cb = results.append
+ deferred.addBoth(cb)
+ return results
+
+
+
class Child(object):
+ """
+ An object with a L{Parent}, in its list of C{children}.
+ """
def __init__(self, parent):
+ self.closed = False
self.parent = parent
self.parent.children.append(self)
def close(self):
- self.parent.children.remove(self)
+ self.closed = True
class Parent(object):
+ """
+ An object with a list of L{Child}ren.
+ """
def __init__(self):
self.children = []
@@ -69,14 +97,31 @@
def commit(self):
- return
+ if self.parent.commitFail:
+ self.parent.commitFail = False
+ raise CommitFail()
def rollback(self):
- return
+ if self.parent.rollbackFail:
+ self.parent.rollbackFail = False
+ raise RollbackFail()
+class RollbackFail(Exception):
+ """
+ Sample rollback-failure exception.
+ """
+
+
+class CommitFail(Exception):
+ """
+ Sample Commit-failure exception.
+ """
+
+
+
class FakeCursor(Child):
"""
Fake stand-in for a DB-API 2.0 cursor.
@@ -110,10 +155,17 @@
class ConnectionFactory(Parent):
+
+ rollbackFail = False
+ commitFail = False
+
def __init__(self):
Parent.__init__(self)
self.idcounter = count(1)
+ self._resultQueue = []
+ self.defaultConnect()
+
@property
def connections(self):
"Alias to make tests more readable."
@@ -121,55 +173,488 @@
def connect(self):
- return FakeConnection(self)
+ """
+ Implement the C{ConnectionFactory} callable expected by
+ L{ConnectionPool}.
+ """
+ if self._resultQueue:
+ thunk = self._resultQueue.pop(0)
+ else:
+ thunk = self._default
+ return thunk()
+ def willConnect(self):
+ """
+ Used by tests to queue a successful result for connect().
+ """
+ def thunk():
+ return FakeConnection(self)
+ self._resultQueue.append(thunk)
+
+ def willFail(self):
+ """
+ Used by tests to queue a failed result for connect().
+ """
+ def thunk():
+ raise FakeConnectionError()
+ self._resultQueue.append(thunk)
+
+
+ def defaultConnect(self):
+ """
+ By default, connection attempts will succeed.
+ """
+ self.willConnect()
+ self._default = self._resultQueue.pop()
+
+
+ def defaultFail(self):
+ """
+ By default, connection attempts will fail.
+ """
+ self.willFail()
+ self._default = self._resultQueue.pop()
+
+
+
+class FakeConnectionError(Exception):
+ """
+ Synthetic error that might occur during connection.
+ """
+
+
+
+class FakeThreadHolder(object):
+ """
+ Run things submitted to this ThreadHolder on the main thread, so that
+ execution is easier to control.
+ """
+
+ def __init__(self, test):
+ self.started = False
+ self.stopped = False
+ self.test = test
+ self.queue = []
+
+
+ def start(self):
+ """
+ Mark this L{FakeThreadHolder} as started.
+ """
+ self.started = True
+
+
+ def stop(self):
+ """
+ Mark this L{FakeThreadHolder} as stopped.
+ """
+ def stopped(nothing):
+ self.stopped = True
+ return self.submit(lambda : None).addCallback(stopped)
+
+
+ def submit(self, work):
+ """
+ Call the function (or queue it)
+ """
+ if self.test.paused:
+ d = Deferred()
+ self.queue.append((d, work))
+ return d
+ else:
+ return execute(work)
+
+
+ def flush(self):
+ """
+ Fire all deferreds previously returned from submit.
+ """
+ self.queue, queue = [], self.queue
+ for (d, work) in queue:
+ try:
+ result = work()
+ except:
+ d.errback()
+ else:
+ d.callback(result)
+
+
+
class ConnectionPoolTests(TestCase):
+ """
+ Tests for L{ConnectionPool}.
+ """
- @inlineCallbacks
+ def setUp(self):
+ """
+ Create a L{ConnectionPool} attached to a C{ConnectionFactory}. Start
+ the L{ConnectionPool}.
+ """
+ self.paused = False
+ self.holders = []
+ self.factory = ConnectionFactory()
+ self.pool = ConnectionPool(self.factory.connect,
+ maxConnections=2)
+ self.pool._createHolder = self.makeAHolder
+ self.clock = self.pool.reactor = Clock()
+ self.pool.startService()
+
+
+ def tearDown(self):
+ """
+ Make sure the service is stopped and the fake ThreadHolders are all
+ executing their queues so failed tests can exit cleanly.
+ """
+ self.flushHolders()
+
+
+ def flushHolders(self):
+ """
+ Flush all pending C{submit}s since C{pauseHolders} was called.
+ """
+ self.paused = False
+ for holder in self.holders:
+ holder.flush()
+
+
+ def pauseHolders(self):
+ """
+ Pause all L{FakeThreadHolder}s, causing C{submit} to return an unfired
+ L{Deferred}.
+ """
+ self.paused = True
+
+
+ def makeAHolder(self):
+ """
+ Make a ThreadHolder-alike.
+ """
+ fth = FakeThreadHolder(self)
+ self.holders.append(fth)
+ return fth
+
+
def test_tooManyConnections(self):
"""
When the number of outstanding busy transactions exceeds the number of
slots specified by L{ConnectionPool.maxConnections},
- L{ConnectionPool.connection} will return a L{PooledSqlTxn} that is not
- backed by any L{BaseSqlTxn}; this object will queue its SQL statements
- until an existing connection becomes available.
+ L{ConnectionPool.connection} will return a pooled transaction that is
+ not backed by any real database connection; this object will queue its
+ SQL statements until an existing connection becomes available.
"""
- cf = ConnectionFactory()
- cp = ConnectionPool(cf.connect, maxConnections=2)
- cp.startService()
- self.addCleanup(cp.stopService)
- a = cp.connection()
- [[counter, echo]] = yield a.execSQL("alpha")
- b = cp.connection()
- [[bcounter, becho]] = yield b.execSQL("beta")
+ a = self.pool.connection()
+ alphaResult = resultOf(a.execSQL("alpha"))
+ [[counter, echo]] = alphaResult[0]
+
+ b = self.pool.connection()
+ # 'b' should have opened a connection.
+ self.assertEquals(len(self.factory.connections), 2)
+ betaResult = resultOf(b.execSQL("beta"))
+ [[bcounter, becho]] = betaResult[0]
+
# both 'a' and 'b' are holding open a connection now; let's try to open
# a third one. (The ordering will be deterministic even if this fails,
# because those threads are already busy.)
- c = cp.connection()
- enqueue = c.execSQL("gamma")
- x = []
- def addtox(it):
- x.append(it)
- return it
- enqueue.addCallback(addtox)
+ c = self.pool.connection()
+ gammaResult = resultOf(c.execSQL("gamma"))
# Did 'c' open a connection? Let's hope not...
- self.assertEquals(len(cf.connections), 2)
- # This assertion is _not_ deterministic, unfortunately; it's unlikely
- # that the implementation could be adjusted such that this assertion
- # would fail and the others would succeed. However, if it does fail,
- # that's really bad, so I am leaving it regardless.
- self.failIf(bool(x), "SQL executed too soon!")
- yield b.commit()
+ self.assertEquals(len(self.factory.connections), 2)
+ # SQL shouldn't be executed too soon...
+ self.assertEquals(gammaResult, [])
+ commitResult = resultOf(b.commit())
+
# Now that 'b' has committed, 'c' should be able to complete.
- [[ccounter, cecho]] = yield enqueue
+ [[ccounter, cecho]] = gammaResult[0]
- # The connection for 'a' ought to be busy, so let's make sure we're
- # using the one for 'c'.
+ # The connection for 'a' ought to still be busy, so let's make sure
+ # we're using the one for 'c'.
self.assertEquals(ccounter, bcounter)
+ # Sanity check: the commit should have succeeded!
+ self.assertEquals(commitResult, [None])
+
+ def test_stopService(self):
+ """
+ L{ConnectionPool.stopService} stops all the associated L{ThreadHolder}s
+ and thereby frees up the resources it is holding.
+ """
+ a = self.pool.connection()
+ [[[counter, echo]]] = resultOf(a.execSQL("alpha"))
+ self.assertEquals(len(self.factory.connections), 1)
+ self.assertEquals(len(self.holders), 1)
+ [holder] = self.holders
+ self.assertEquals(holder.started, True)
+ self.assertEquals(holder.stopped, False)
+ self.pool.stopService()
+ self.assertEquals(len(self.holders), 1)
+ self.assertEquals(holder.started, True)
+ self.assertEquals(holder.stopped, True)
+ # Closing fake connections removes them from the list.
+ self.assertEquals(len(self.factory.connections), 1)
+ self.assertEquals(self.factory.connections[0].closed, True)
+
+
+ def test_retryAfterConnectError(self):
+ """
+ When the C{connectionFactory} passed to L{ConnectionPool} raises an
+ exception, the L{ConnectionPool} will log the exception and delay
+ execution of a new connection's SQL methods until an attempt succeeds.
+ """
+ self.factory.willFail()
+ self.factory.willFail()
+ self.factory.willConnect()
+ c = self.pool.connection()
+ def checkOneFailure():
+ errors = self.flushLoggedErrors(FakeConnectionError)
+ self.assertEquals(len(errors), 1)
+ checkOneFailure()
+ d = c.execSQL("alpha")
+ happened = []
+ d.addBoth(happened.append)
+ self.assertEquals(happened, [])
+ self.clock.advance(self.pool.RETRY_TIMEOUT + 0.01)
+ checkOneFailure()
+ self.assertEquals(happened, [])
+ self.clock.advance(self.pool.RETRY_TIMEOUT + 0.01)
+ self.assertEquals(happened, [[[1, "alpha"]]])
+
+
+ def test_shutdownDuringRetry(self):
+ """
+ If a L{ConnectionPool} is attempting to shut down while it's in the
+ process of re-trying a connection attempt that received an error, the
+ connection attempt should be cancelled and the shutdown should complete
+ as normal.
+ """
+ self.factory.defaultFail()
+ self.pool.connection()
+ errors = self.flushLoggedErrors(FakeConnectionError)
+ self.assertEquals(len(errors), 1)
+ stopd = []
+ self.pool.stopService().addBoth(stopd.append)
+ self.assertEquals([None], stopd)
+ self.assertEquals(self.clock.calls, [])
+ [holder] = self.holders
+ self.assertEquals(holder.started, True)
+ self.assertEquals(holder.stopped, True)
+
+
+ def test_shutdownDuringAttemptSuccess(self):
+ """
+ If L{ConnectionPool.stopService} is called while a connection attempt is
+ outstanding, the resulting L{Deferred} won't be fired until the
+ connection attempt has finished; in this case, succeeded.
+ """
+ self.pauseHolders()
+ self.pool.connection()
+ stopd = []
+ self.pool.stopService().addBoth(stopd.append)
+ self.assertEquals(stopd, [])
+ self.flushHolders()
+ self.assertEquals(stopd, [None])
+ [holder] = self.holders
+ self.assertEquals(holder.started, True)
+ self.assertEquals(holder.stopped, True)
+
+
+ def test_shutdownDuringAttemptFailed(self):
+ """
+ If L{ConnectionPool.stopService} is called while a connection attempt is
+ outstanding, the resulting L{Deferred} won't be fired until the
+ connection attempt has finished; in this case, failed.
+ """
+ self.factory.defaultFail()
+ self.pauseHolders()
+ self.pool.connection()
+ stopd = []
+ self.pool.stopService().addBoth(stopd.append)
+ self.assertEquals(stopd, [])
+ self.flushHolders()
+ errors = self.flushLoggedErrors(FakeConnectionError)
+ self.assertEquals(len(errors), 1)
+ self.assertEquals(stopd, [None])
+ [holder] = self.holders
+ self.assertEquals(holder.started, True)
+ self.assertEquals(holder.stopped, True)
+
+
+ def test_stopServiceMidAbort(self):
+ """
+ When L{ConnectionPool.stopService} is called with deferreds from
+ C{abort} still outstanding, it will wait for the currently-aborting
+ transaction to fully abort before firing the L{Deferred} returned from
+ C{stopService}.
+ """
+ # TODO: commit() too?
+ self.pauseHolders()
+ c = self.pool.connection()
+ abortResult = resultOf(c.abort())
+ # Should abort instantly, as it hasn't managed to unspool anything yet.
+ # FIXME: kill all Deferreds associated with this thing, make sure that
+ # any outstanding query callback chains get nuked.
+ self.assertEquals(abortResult, [None])
+ stopResult = resultOf(self.pool.stopService())
+ self.assertEquals(stopResult, [])
+ self.flushHolders()
+ #self.assertEquals(abortResult, [None])
+ self.assertEquals(stopResult, [None])
+
+
+ def test_stopServiceWithSpooled(self):
+ """
+ When L{ConnectionPool.stopService} is called when spooled transactions
+ are outstanding, any pending L{Deferred}s returned by those transactions
+ will be failed with L{ConnectionError}.
+ """
+ # Use up the free slots so we have to spool.
+ hold = []
+ hold.append(self.pool.connection())
+ hold.append(self.pool.connection())
+
+ c = self.pool.connection()
+ se = resultOf(c.execSQL("alpha"))
+ ce = resultOf(c.commit())
+ self.assertEquals(se, [])
+ self.assertEquals(ce, [])
+ self.pool.stopService()
+ self.assertEquals(se[0].type, ConnectionError)
+ self.assertEquals(ce[0].type, ConnectionError)
+
+
+ def test_repoolSpooled(self):
+ """
+ Regression test for a somewhat tricky-to-explain bug: when a spooled
+ transaction has already had commit() called on it before it has
+ received a real connection to start executing on, it will not leave
+ behind any detritus that prevents stopService from working.
+ """
+ self.pauseHolders()
+ c = self.pool.connection()
+ c2 = self.pool.connection()
+ c3 = self.pool.connection()
+ c.commit()
+ c2.commit()
+ c3.commit()
+ self.flushHolders()
+ self.assertEquals(len(self.factory.connections), 2)
+ stopResult = resultOf(self.pool.stopService())
+ self.assertEquals(stopResult, [None])
+ self.assertEquals(len(self.factory.connections), 2)
+ self.assertEquals(self.factory.connections[0].closed, True)
+ self.assertEquals(self.factory.connections[1].closed, True)
+
+
+ def test_connectAfterStop(self):
+ """
+ Calls to connection() after stopService() result in transactions which
+ immediately fail all operations.
+ """
+ stopResults = resultOf(self.pool.stopService())
+ self.assertEquals(stopResults, [None])
+ self.pauseHolders()
+ postClose = self.pool.connection()
+ queryResult = resultOf(postClose.execSQL("hello"))
+ self.assertEquals(len(queryResult), 1)
+ self.assertEquals(queryResult[0].type, ConnectionError)
+
+
+ def test_connectAfterStartedStopping(self):
+ """
+ Calls to connection() after stopService() has been called but before it
+ has completed will result in transactions which immediately fail all
+ operations.
+ """
+ self.pauseHolders()
+ preClose = self.pool.connection()
+ preCloseResult = resultOf(preClose.execSQL('statement'))
+ stopResult = resultOf(self.pool.stopService())
+ postClose = self.pool.connection()
+ queryResult = resultOf(postClose.execSQL("hello"))
+ self.assertEquals(stopResult, [])
+ self.assertEquals(len(queryResult), 1)
+ self.assertEquals(queryResult[0].type, ConnectionError)
+ self.assertEquals(len(preCloseResult), 1)
+ self.assertEquals(preCloseResult[0].type, ConnectionError)
+
+
+ def test_abortFailsDuringStopService(self):
+ """
+ L{IAsyncTransaction.abort} might fail, most likely because the
+ underlying database connection has already been disconnected. If this
+ happens, shutdown should continue.
+ """
+ txns = []
+ txns.append(self.pool.connection())
+ txns.append(self.pool.connection())
+ # Fail one (and only one) call to rollback().
+ self.factory.rollbackFail = True
+ stopResult = resultOf(self.pool.stopService())
+ self.assertEquals(stopResult, [None])
+ self.assertEquals(len(self.flushLoggedErrors(RollbackFail)), 1)
+ self.assertEquals(self.factory.connections[0].closed, True)
+ self.assertEquals(self.factory.connections[1].closed, True)
+
+
+ def test_abortRecycledTransaction(self):
+ """
+ L{ConnectionPool.stopService} will shut down if a recycled transaction
+ is still pending.
+ """
+ recycled = self.pool.connection()
+ recycled.commit()
+ remember = []
+ remember.append(self.pool.connection())
+ self.assertEquals(resultOf(self.pool.stopService()), [None])
+
+
+ def test_waitForAlreadyAbortedTransaction(self):
+ """
+ L{ConnectionPool.stopService} will wait for all transactions to shut
+ down before exiting, including those which have already been stopped.
+ """
+ it = self.pool.connection()
+ self.pauseHolders()
+ abortResult = resultOf(it.abort())
+
+ # steal it from the queue so we can do it out of order
+ d, work = self.holders[0].queue.pop()
+ # that should be the only work unit so don't continue if something else
+ # got in there
+ self.assertEquals(self.holders[0].queue, [])
+ self.assertEquals(len(self.holders), 1)
+ self.flushHolders()
+ stopResult = resultOf(self.pool.stopService())
+ # Sanity check that we haven't actually stopped it yet
+ self.assertEquals(abortResult, [])
+ # We haven't fired it yet, so the service had better not have stopped...
+ self.assertEquals(stopResult, [])
+ d.callback(None)
+ self.assertEquals(abortResult, [None])
+ self.assertEquals(stopResult, [None])
+
+
+ def test_tooManyConnectionsWhileOthersFinish(self):
+ """
+ L{ConnectionPool.connection} will not spawn more than the maximum
+ connections if there are finishing transactions outstanding.
+ """
+ a = self.pool.connection()
+ b = self.pool.connection()
+ self.pauseHolders()
+ a.abort()
+ b.abort()
+ # Remove the holders for the existing connections, so that the 'extra'
+ # connection() call wins the race and gets executed first.
+ self.holders[:] = []
+ self.pool.connection()
+ self.flushHolders()
+ self.assertEquals(len(self.factory.connections), 2)
+
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110203/d973bdac/attachment-0001.html>
More information about the calendarserver-changes
mailing list