[CalendarServer-changes] [6825] CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jan 28 17:39:23 PST 2011
Revision: 6825
http://trac.macosforge.org/projects/calendarserver/changeset/6825
Author: glyph at apple.com
Date: 2011-01-28 17:39:23 -0800 (Fri, 28 Jan 2011)
Log Message:
-----------
Test which begins to demonstrate the issue (neatly sidestepping thread-interaction issues for now)
Modified Paths:
--------------
CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py 2011-01-29 01:38:04 UTC (rev 6824)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py 2011-01-29 01:39:23 UTC (rev 6825)
@@ -68,14 +68,14 @@
# FIXME: this should *really* be
paramstyle = DEFAULT_PARAM_STYLE
- def __init__(self, connectionFactory, reactor=_reactor):
+ def __init__(self, connectionFactory, threadHolder):
"""
@param connectionFactory: A 0-argument callable which returns a DB-API
2.0 connection.
"""
self._completed = False
self._cursor = None
- self._holder = ThreadHolder(reactor)
+ self._holder = threadHolder
self._holder.start()
def initCursor():
@@ -315,7 +315,6 @@
@type maxConnections: C{int}
"""
- reactor = _reactor
def __init__(self, connectionFactory, maxConnections=10):
super(ConnectionPool, self).__init__()
@@ -346,8 +345,17 @@
# have put them there.
for free in self.free:
yield free.stop()
+ self.busy = []
+ self.free = []
+ def _createHolder(self):
+ """
+ Create a L{ThreadHolder}. (Test hook.)
+ """
+ return ThreadHolder(_reactor)
+
+
def connection(self, label="<unlabeled>"):
"""
Find a transaction; either retrieve a free one from the list or
@@ -355,14 +363,13 @@
@return: an L{IAsyncTransaction}
"""
-
overload = False
if self.free:
basetxn = self.free.pop(0)
elif len(self.busy) < self.maxConnections:
basetxn = BaseSqlTxn(
connectionFactory=self.connectionFactory,
- reactor=self.reactor
+ threadHolder=self._createHolder()
)
else:
basetxn = SpooledTxn()
Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py 2011-01-29 01:38:04 UTC (rev 6824)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py 2011-01-29 01:39:23 UTC (rev 6825)
@@ -24,6 +24,7 @@
from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import execute
from twext.enterprise.adbapi2 import ConnectionPool
@@ -110,10 +111,15 @@
class ConnectionFactory(Parent):
+
+
def __init__(self):
Parent.__init__(self)
self.idcounter = count(1)
+ self._resultQueue = []
+ self.defaultConnect()
+
@property
def connections(self):
"Alias to make tests more readable."
@@ -121,12 +127,119 @@
def connect(self):
- return FakeConnection(self)
+ """
+ Implement the C{ConnectionFactory} callable expected by
+ L{ConnectionPool}.
+ """
+ if self._resultQueue:
+ thunk = self._resultQueue.pop(0)
+ else:
+ thunk = self._default
+ return thunk()
+ def willConnect(self):
+ """
+ Used by tests to queue a successful result for connect().
+ """
+ def thunk():
+ return FakeConnection(self)
+ self._resultQueue.append(thunk)
+
+ def willFail(self):
+ """
+ Used by tests to queue a failed result (a raised L{FakeConnectionError}) for connect().
+ """
+ def thunk():
+ raise FakeConnectionError()
+ self._resultQueue.append(thunk)
+
+
+ def defaultConnect(self):
+ """
+ By default, connection attempts will succeed.
+ """
+ self.willConnect()
+ self._default = self._resultQueue.pop()
+
+
+ def defaultFail(self):
+ """
+ By default, connection attempts will fail.
+ """
+ self.willFail()
+ self._default = self._resultQueue.pop()
+
+
+
+class FakeConnectionError(Exception):
+ """
+ Synthetic error that might occur during connection.
+ """
+
+
+
+class FakeThreadHolder(object):
+ """
+ Run things submitted to this ThreadHolder on the main thread, so that
+ execution is easier to control.
+ """
+
+ def __init__(self):
+ self.started = False
+ self.stopped = False
+
+
+ def start(self):
+ """
+ Mark this L{FakeThreadHolder} as started.
+ """
+ self.started = True
+
+
+ def stop(self):
+ """
+ Mark this L{FakeThreadHolder} as stopped.
+ """
+ self.stopped = True
+
+
+ def submit(self, work):
+ """
+ Call the function.
+ """
+ return execute(work)
+
+
+
class ConnectionPoolTests(TestCase):
+ """
+ Tests for L{ConnectionPool}.
+ """
+ def setUp(self):
+ """
+ Create a L{ConnectionPool} attached to a C{ConnectionFactory}. Start
+ the L{ConnectionPool}.
+ """
+ self.holders = []
+ self.factory = ConnectionFactory()
+ self.pool = ConnectionPool(self.factory.connect, maxConnections=2)
+ self.pool._createHolder = self.makeAHolder
+ self.pool.startService()
+ self.addCleanup(self.pool.stopService)
+
+
+ def makeAHolder(self):
+ """
+ Make a ThreadHolder-alike.
+ """
+ fth = FakeThreadHolder()
+ self.holders.append(fth)
+ return fth
+
+
@inlineCallbacks
def test_tooManyConnections(self):
"""
@@ -136,19 +249,15 @@
backed by any L{BaseSqlTxn}; this object will queue its SQL statements
until an existing connection becomes available.
"""
- cf = ConnectionFactory()
- cp = ConnectionPool(cf.connect, maxConnections=2)
- cp.startService()
- self.addCleanup(cp.stopService)
- a = cp.connection()
+ a = self.pool.connection()
[[counter, echo]] = yield a.execSQL("alpha")
- b = cp.connection()
+ b = self.pool.connection()
[[bcounter, becho]] = yield b.execSQL("beta")
# both 'a' and 'b' are holding open a connection now; let's try to open
# a third one. (The ordering will be deterministic even if this fails,
# because those threads are already busy.)
- c = cp.connection()
+ c = self.pool.connection()
enqueue = c.execSQL("gamma")
x = []
def addtox(it):
@@ -157,11 +266,8 @@
enqueue.addCallback(addtox)
# Did 'c' open a connection? Let's hope not...
- self.assertEquals(len(cf.connections), 2)
- # This assertion is _not_ deterministic, unfortunately; it's unlikely
- # that the implementation could be adjusted such that this assertion
- # would fail and the others would succeed. However, if it does fail,
- # that's really bad, so I am leaving it regardless.
+ self.assertEquals(len(self.factory.connections), 2)
+
self.failIf(bool(x), "SQL executed too soon!")
yield b.commit()
@@ -173,3 +279,37 @@
self.assertEquals(ccounter, bcounter)
+ @inlineCallbacks
+ def test_stopService(self):
+ """
+ L{ConnectionPool.stopService} stops all the associated L{ThreadHolder}s
+ and thereby frees up the resources it is holding.
+ """
+ a = self.pool.connection()
+ [[counter, echo]] = yield a.execSQL("alpha")
+ self.assertEquals(len(self.holders), 1)
+ [holder] = self.holders
+ self.assertEquals(holder.started, True)
+ self.assertEquals(holder.stopped, False)
+ yield self.pool.stopService()
+ self.assertEquals(len(self.holders), 1)
+ self.assertEquals(holder.started, True)
+ self.assertEquals(holder.stopped, True)
+
+
+ def test_retryAfterConnectError(self):
+ """
+ When the C{connectionFactory} passed to L{ConnectionPool} raises an
+ exception, the L{ConnectionPool} will log the exception and delay
+ execution of a new connection's SQL methods until an attempt succeeds.
+ """
+ self.factory.defaultFail()
+ c = self.pool.connection()
+ errors = self.flushLoggedErrors(FakeConnectionError)
+ self.assertEquals(len(errors), 1)
+ d = c.execSQL("alpha")
+ happened = []
+ d.addBoth(happened.append)
+ self.assertEquals(happened, [])
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110128/cfa627cd/attachment-0001.html>
More information about the calendarserver-changes
mailing list