[CalendarServer-changes] [6831] CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jan 28 17:40:26 PST 2011
Revision: 6831
http://trac.macosforge.org/projects/calendarserver/changeset/6831
Author: glyph at apple.com
Date: 2011-01-28 17:40:26 -0800 (Fri, 28 Jan 2011)
Log Message:
-----------
test for stopService at another point, make sure we don't lose track of the connection attempt in progress
Modified Paths:
--------------
CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py 2011-01-29 01:40:16 UTC (rev 6830)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py 2011-01-29 01:40:26 UTC (rev 6831)
@@ -293,7 +293,8 @@
_retry = None
- def __init__(self, holder):
+ def __init__(self, pool, holder):
+ self._pool = pool
self._holder = holder
def abort(self):
@@ -301,8 +302,12 @@
# raise NotImplementedError()
if self._retry is not None:
self._retry.cancel()
- # deferred, should be returned
- self._holder.stop()
+ d = self._holder.stop()
+ def removeme(ignored):
+ if self in self._pool.busy:
+ self._pool.busy.remove(self)
+ d.addCallback(removeme)
+ return d
@@ -351,7 +356,8 @@
"""
Forcibly abort any outstanding transactions.
"""
- for busy in self.busy[:]:
+ while self.busy:
+ busy = self.busy[0]
try:
yield busy.abort()
except:
@@ -398,16 +404,12 @@
holder = self._createHolder()
holder.start()
# FIXME: attach the holder to the txn so it can be aborted.
- txn = _ConnectingPsuedoTxn(holder)
+ txn = _ConnectingPsuedoTxn(self, holder)
# take up a slot in the 'busy' list, sit there so we can be aborted.
self.busy.append(txn)
def initCursor():
# support threadlevel=1; we can't necessarily cursor() in a
# different thread than we do transactions in.
-
- # TODO: Re-try connect when it fails. Specify a timeout. That
- # should happen in this layer because we need to be able to stop
- # the reconnect attempt if it's hanging.
connection = self.connectionFactory()
cursor = connection.cursor()
return (connection, cursor)
Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py 2011-01-29 01:40:16 UTC (rev 6830)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py 2011-01-29 01:40:26 UTC (rev 6831)
@@ -27,6 +27,7 @@
from twisted.internet.defer import execute
from twisted.internet.task import Clock
+from twisted.internet.defer import Deferred
from twext.enterprise.adbapi2 import ConnectionPool
@@ -188,9 +189,11 @@
execution is easier to control.
"""
- def __init__(self):
+ def __init__(self, test):
self.started = False
self.stopped = False
+ self.test = test
+ self.queue = []
def start(self):
@@ -204,17 +207,38 @@
"""
Mark this L{FakeThreadHolder} as stopped.
"""
- self.stopped = True
+ def stopped(nothing):
+ self.stopped = True
+ return self.submit(lambda : None).addCallback(stopped)
def submit(self, work):
"""
- Call the function.
+ Call the function (or queue it)
"""
- return execute(work)
+ if self.test.paused:
+ d = Deferred()
+ self.queue.append((d, work))
+ return d
+ else:
+ return execute(work)
+ def flush(self):
+ """
+ Fire all deferreds previously returned from submit.
+ """
+ self.queue, queue = [], self.queue
+ for (d, work) in queue:
+ try:
+ result = work()
+ except:
+ d.errback()
+ else:
+ d.callback(result)
+
+
class ConnectionPoolTests(TestCase):
"""
Tests for L{ConnectionPool}.
@@ -225,6 +249,7 @@
Create a L{ConnectionPool} attached to a C{ConnectionFactory}. Start
the L{ConnectionPool}.
"""
+ self.paused = False
self.holders = []
self.factory = ConnectionFactory()
self.pool = ConnectionPool(self.factory.connect, maxConnections=2)
@@ -234,11 +259,28 @@
self.addCleanup(self.pool.stopService)
+ def flushHolders(self):
+ """
+ Flush all pending C{submit}s since C{pauseHolders} was called.
+ """
+ self.paused = False
+ for holder in self.holders:
+ holder.flush()
+
+
+ def pauseHolders(self):
+ """
+ Pause all L{FakeThreadHolder}s, causing C{submit} to return an unfired
+ L{Deferred}.
+ """
+ self.paused = True
+
+
def makeAHolder(self):
"""
Make a ThreadHolder-alike.
"""
- fth = FakeThreadHolder()
+ fth = FakeThreadHolder(self)
self.holders.append(fth)
return fth
@@ -344,3 +386,23 @@
self.assertEquals(holder.started, True)
self.assertEquals(holder.stopped, True)
+
+ def test_shutdownDuringAttemptSuccess(self):
+ """
+ If L{ConnectionPool.stopService} is called while a connection attempt is
+ outstanding, the resulting L{Deferred} won't be fired until the
+ connection attempt has succeeded.
+ """
+ self.pauseHolders()
+ self.pool.connection()
+ stopd = []
+ self.pool.stopService().addBoth(stopd.append)
+ self.assertEquals(stopd, [])
+ self.flushHolders()
+ self.assertEquals(stopd, [None])
+ [holder] = self.holders
+ self.assertEquals(holder.started, True)
+ self.assertEquals(holder.stopped, True)
+ # FIXME: next, 'failed' case.
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110128/bbb61801/attachment.html>
More information about the calendarserver-changes
mailing list