[CalendarServer-changes] [6843] CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Tue Feb 1 16:08:55 PST 2011
Revision: 6843
http://trac.macosforge.org/projects/calendarserver/changeset/6843
Author: glyph at apple.com
Date: 2011-02-01 16:08:55 -0800 (Tue, 01 Feb 2011)
Log Message:
-----------
fix for the abort()-a-pooled-transaction case.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py 2011-02-02 00:08:51 UTC (rev 6842)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py 2011-02-02 00:08:55 UTC (rev 6843)
@@ -50,6 +50,7 @@
from twisted.python.components import proxyForInterface
from twext.internet.threadutils import ThreadHolder
+from twisted.internet.defer import succeed
from twext.enterprise.ienterprise import AlreadyFinishedError, IAsyncTransaction
@@ -135,7 +136,6 @@
def __del__(self):
if not self._completed:
- print 'BaseSqlTxn.__del__: OK'
self.abort()
@@ -230,7 +230,7 @@
def abort(self):
- return self._enspool('abort')
+ return succeed(None)
@@ -249,6 +249,15 @@
self._complete = False
+ def _unspoolOnto(self, baseTxn):
+ """
+ Replace my C{_baseTxn}, currently a L{SpooledTxn}, with a L{BaseSqlTxn}.
+ """
+ spooledBase = self._baseTxn
+ self._baseTxn = baseTxn
+ spooledBase._unspool(baseTxn)
+
+
def execSQL(self, *a, **kw):
self._checkComplete()
return super(PooledSqlTxn, self).execSQL(*a, **kw)
@@ -256,12 +265,18 @@
def commit(self):
self._markComplete()
- return self._repoolAfter(super(PooledSqlTxn, self).commit())
+ return self._pool._repoolAfter(self, super(PooledSqlTxn, self).commit())
def abort(self):
self._markComplete()
- return self._repoolAfter(super(PooledSqlTxn, self).abort())
+ waiting = self._pool.waiting
+ if self in waiting:
+ waiting.remove(self)
+ # FIXME: waiting.remove()
+ return succeed(None)
+ else:
+ return self._pool._repoolAfter(self, super(PooledSqlTxn, self).abort())
def _checkComplete(self):
@@ -280,33 +295,16 @@
self._complete = True
- def _repoolAfter(self, d):
- # FIXME: if abort() is called (say, by an HTTP connection terminating
- # because the reactor is shutting down), the transaction will not
- # immediately be moved to the 'stopped' list or removed from the 'busy'
- # list; instead, it will be re-pooled only after the first abort()
- # concludes, and stopService will sit in a loop calling abort() over and
- # over again (and logging the traceback from _checkComplete()) until the
- # thread actually doing the aborting finishes what it's doing. This
- # needs to be fixed to represent an intermediary state where completion
- # has started and not stopped - note that this is *only* for interaction
- # with stopService, there's no other legitimate cause for two abort()s
- # to be called.
- def repool(result):
- self._pool.reclaim(self)
- return result
- return d.addCallback(repool)
+class _ConnectingPseudoTxn(object):
-
-class _ConnectingPsuedoTxn(object):
-
_retry = None
def __init__(self, pool, holder):
self._pool = pool
self._holder = holder
+
def abort(self):
# not implemented yet, but let's fail rather than break the test
# raise NotImplementedError()
@@ -339,6 +337,22 @@
for failed connect() attempts.
@type reactor: L{IReactorTime} and L{IReactorThreads} provider.
+
+ @ivar free: The list of free L{BaseSqlTxn} objects which are not currently
+ attached to a L{PooledSqlTxn} object, and have active connections ready
+ for processing a new transaction.
+
+ @ivar busy: The list of busy (currently bound to a L{BaseSqlTxn})
+ L{PooledSqlTxn} objects.
+
+ @ivar finishing: The list of 2-tuples of L{PooledSqlTxn} objects which have
+ had C{abort} or C{commit} called on them, but are not done executing
+ that method, and the L{Deferred} returned from that method that will be
+ fired when its execution has completed.
+
+ @ivar waiting: The list of L{PooledSqlTxn} objects attached to a
+ L{SpooledTxn}; i.e. those which are awaiting a connection to become free
+ so that they can be executed.
"""
reactor = _reactor
@@ -347,12 +361,16 @@
def __init__(self, connectionFactory, maxConnections=10):
+
super(ConnectionPool, self).__init__()
+ self.connectionFactory = connectionFactory
+ self.maxConnections = maxConnections
+
self.free = []
self.busy = []
self.waiting = []
- self.connectionFactory = connectionFactory
- self.maxConnections = maxConnections
+ self.finishing = []
+ self.connecting = []
def startService(self):
@@ -371,10 +389,13 @@
# one is aborted, it will remove itself from the list.
while self.busy:
busy = self.busy[0]
- try:
- yield busy.abort()
- except:
- log.err()
+# try:
+ # FIXME: abort() might fail.
+ yield busy.abort()
+# except:
+# log.err()
+# if self.busy and busy is self.busy[0]:
+# raise RuntimeError("this will result in an infinite loop.")
# Phase 2: All transactions should now be in the free list, since
# 'abort()' will have put them there. Shut down all the associated
# ThreadHolders.
@@ -383,6 +404,8 @@
# it the way aborting a PooledSqlTxn does, so we need to .pop()
# here.)
free = self.free.pop()
+ # stop() really shouldn't be able to fail, as it's just stopping the
+ # thread, and the holder's stop() is independently submitted.
yield free.stop()
@@ -408,6 +431,8 @@
tracking = self.waiting
txn = PooledSqlTxn(self, basetxn)
tracking.append(txn)
+ # FIXME/TESTME: should be len(self.busy) + len(self.finishing) (free
+ # doesn't need to be considered, as it's tested above)
if tracking is self.waiting and len(self.busy) < self.maxConnections:
self._startOneMore()
return txn
@@ -419,7 +444,7 @@
"""
holder = self._createHolder()
holder.start()
- txn = _ConnectingPsuedoTxn(self, holder)
+ txn = _ConnectingPseudoTxn(self, holder)
# take up a slot in the 'busy' list, sit there so we can be aborted.
self.busy.append(txn)
def initCursor():
@@ -434,36 +459,47 @@
connection=connection,
cursor=cursor
)
+ self.busy.remove(txn)
txn._baseTxn = baseTxn
- self.reclaim(txn)
+ self._repoolNow(txn)
def maybeTryAgain(f):
- log.err(f)
+ log.err(f, "Re-trying connection due to connection failure")
txn._retry = self.reactor.callLater(self.RETRY_TIMEOUT, resubmit)
def resubmit():
d = holder.submit(initCursor)
d.addCallbacks(finishInit, maybeTryAgain)
resubmit()
- return txn
- def reclaim(self, txn):
+ def _repoolAfter(self, txn, d):
"""
+ Re-pool the back-end for the given L{PooledSqlTxn} after the given
+ L{Deferred} has fired.
+ """
+ if txn in self.busy:
+ # should only _not_ be in self.busy if its underlying txn is a SpooledTxn
+ self.busy.remove(txn)
+ finishRecord = (txn, d)
+ self.finishing.append(finishRecord)
+ def repool(result):
+ self.finishing.remove(finishRecord)
+ self._repoolNow(txn)
+ return result
+ return d.addBoth(repool)
+
+
+ def _repoolNow(self, txn):
+ """
Shuck the L{PooledSqlTxn} wrapper off, and recycle the underlying
L{BaseSqlTxn} into the free list.
"""
- self.busy.remove(txn)
baseTxn = txn._baseTxn
+ txn._baseTxn = None
baseTxn.reset()
if self.waiting:
waiting = self.waiting.pop(0)
- waiting._baseTxn._unspool(baseTxn)
- # Note: although commit() may already have been called, we don't
- # have to handle it specially here. It only unspools after the
- # deferred returned by commit() has actually been called, and since
- # that occurs in a callFromThread, that won't happen until the next
- # iteration of the mainloop, when the _baseTxn is safely correct.
- waiting._baseTxn = baseTxn
self.busy.append(waiting)
+ waiting._unspoolOnto(baseTxn)
else:
self.free.append(baseTxn)
Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py 2011-02-02 00:08:51 UTC (rev 6842)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py 2011-02-02 00:08:55 UTC (rev 6843)
@@ -22,8 +22,6 @@
from twisted.trial.unittest import TestCase
-from twisted.internet.defer import inlineCallbacks
-
from twisted.internet.defer import execute
from twisted.internet.task import Clock
@@ -31,6 +29,24 @@
from twext.enterprise.adbapi2 import ConnectionPool
+def resultOf(deferred, propagate=False):
+ """
+ Add a callback and errback which will capture the result of a L{Deferred} in
+ a list, and return that list. If 'propagate' is True, pass through the
+ results.
+ """
+ results = []
+ if propagate:
+ def cb(r):
+ results.append(r)
+ return r
+ else:
+ cb = results.append
+ deferred.addBoth(cb)
+ return results
+
+
+
class Child(object):
"""
An object with a L{Parent}, in its list of C{children}.
@@ -263,14 +279,12 @@
self.pool.startService()
- @inlineCallbacks
def tearDown(self):
"""
Make sure the service is stopped and the fake ThreadHolders are all
executing their queues so failed tests can exit cleanly.
"""
self.flushHolders()
- yield self.pool.stopService()
def flushHolders(self):
@@ -299,7 +313,6 @@
return fth
- @inlineCallbacks
def test_tooManyConnections(self):
"""
When the number of outstanding busy transactions exceeds the number of
@@ -309,48 +322,52 @@
until an existing connection becomes available.
"""
a = self.pool.connection()
- [[counter, echo]] = yield a.execSQL("alpha")
+
+ alphaResult = resultOf(a.execSQL("alpha"))
+ [[counter, echo]] = alphaResult[0]
+
b = self.pool.connection()
- [[bcounter, becho]] = yield b.execSQL("beta")
+ # 'b' should have opened a connection.
+ self.assertEquals(len(self.factory.connections), 2)
+ betaResult = resultOf(b.execSQL("beta"))
+ [[bcounter, becho]] = betaResult[0]
# both 'a' and 'b' are holding open a connection now; let's try to open
# a third one. (The ordering will be deterministic even if this fails,
# because those threads are already busy.)
c = self.pool.connection()
- enqueue = c.execSQL("gamma")
- x = []
- def addtox(it):
- x.append(it)
- return it
- enqueue.addCallback(addtox)
+ gammaResult = resultOf(c.execSQL("gamma"))
# Did 'c' open a connection? Let's hope not...
self.assertEquals(len(self.factory.connections), 2)
+ # SQL shouldn't be executed too soon...
+ self.assertEquals(gammaResult, [])
- self.failIf(bool(x), "SQL executed too soon!")
- yield b.commit()
+ commitResult = resultOf(b.commit())
# Now that 'b' has committed, 'c' should be able to complete.
- [[ccounter, cecho]] = yield enqueue
+ [[ccounter, cecho]] = gammaResult[0]
- # The connection for 'a' ought to be busy, so let's make sure we're
- # using the one for 'c'.
+ # The connection for 'a' ought to still be busy, so let's make sure
+ # we're using the one for 'c'.
self.assertEquals(ccounter, bcounter)
+ # Sanity check: the commit should have succeeded!
+ self.assertEquals(commitResult, [None])
- @inlineCallbacks
+
def test_stopService(self):
"""
L{ConnectionPool.stopService} stops all the associated L{ThreadHolder}s
and thereby frees up the resources it is holding.
"""
a = self.pool.connection()
- [[counter, echo]] = yield a.execSQL("alpha")
+ [[[counter, echo]]] = resultOf(a.execSQL("alpha"))
self.assertEquals(len(self.holders), 1)
[holder] = self.holders
self.assertEquals(holder.started, True)
self.assertEquals(holder.stopped, False)
- yield self.pool.stopService()
+ self.pool.stopService()
self.assertEquals(self.pool.busy, [])
self.assertEquals(self.pool.free, [])
self.assertEquals(len(self.holders), 1)
@@ -442,4 +459,26 @@
self.assertEquals(holder.stopped, True)
+ def test_stopServiceMidAbort(self):
+ """
+ When L{ConnectionPool.stopService} is called with deferreds from
+ C{abort} still outstanding, it will wait for the currently-aborting
+ transaction to fully abort before firing the L{Deferred} returned from
+ C{stopService}.
+ """
+ # TODO: commit() too?
+ self.pauseHolders()
+ c = self.pool.connection()
+ abortResult = resultOf(c.abort())
+ # Should abort instantly, as it hasn't managed to unspool anything yet.
+ # FIXME: kill all Deferreds associated with this thing, make sure that
+ # any outstanding query callback chains get nuked.
+ self.assertEquals(abortResult, [None])
+ stopResult = resultOf(self.pool.stopService())
+ self.assertEquals(stopResult, [])
+ self.flushHolders()
+ #self.assertEquals(abortResult, [None])
+ self.assertEquals(stopResult, [None])
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110201/7fe7a941/attachment-0001.html>
More information about the calendarserver-changes mailing list