[CalendarServer-changes] [6825] CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Fri Jan 28 17:39:23 PST 2011


Revision: 6825
          http://trac.macosforge.org/projects/calendarserver/changeset/6825
Author:   glyph at apple.com
Date:     2011-01-28 17:39:23 -0800 (Fri, 28 Jan 2011)
Log Message:
-----------
Test which begins to demonstrate the issue (neatly sidestepping thread-interaction issues for now)

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
    CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py

Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-01-29 01:38:04 UTC (rev 6824)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-01-29 01:39:23 UTC (rev 6825)
@@ -68,14 +68,14 @@
     # FIXME: this should *really* be 
     paramstyle = DEFAULT_PARAM_STYLE
 
-    def __init__(self, connectionFactory, reactor=_reactor):
+    def __init__(self, connectionFactory, threadHolder):
         """
         @param connectionFactory: A 0-argument callable which returns a DB-API
             2.0 connection.
         """
         self._completed = False
         self._cursor = None
-        self._holder = ThreadHolder(reactor)
+        self._holder = threadHolder
         self._holder.start()
 
         def initCursor():
@@ -315,7 +315,6 @@
     @type maxConnections: C{int}
     """
 
-    reactor = _reactor
 
     def __init__(self, connectionFactory, maxConnections=10):
         super(ConnectionPool, self).__init__()
@@ -346,8 +345,17 @@
         # have put them there.
         for free in self.free:
             yield free.stop()
+        self.busy = []
+        self.free = []
 
 
+    def _createHolder(self):
+        """
+        Create a L{ThreadHolder}.  (Test hook.)
+        """
+        return ThreadHolder(_reactor)
+
+
     def connection(self, label="<unlabeled>"):
         """
         Find a transaction; either retrieve a free one from the list or
@@ -355,14 +363,13 @@
 
         @return: an L{IAsyncTransaction}
         """
-
         overload = False
         if self.free:
             basetxn = self.free.pop(0)
         elif len(self.busy) < self.maxConnections:
             basetxn = BaseSqlTxn(
                 connectionFactory=self.connectionFactory,
-                reactor=self.reactor
+                threadHolder=self._createHolder()
             )
         else:
             basetxn = SpooledTxn()

Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py	2011-01-29 01:38:04 UTC (rev 6824)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py	2011-01-29 01:39:23 UTC (rev 6825)
@@ -24,6 +24,7 @@
 
 from twisted.internet.defer import inlineCallbacks
 
+from twisted.internet.defer import execute
 from twext.enterprise.adbapi2 import ConnectionPool
 
 
@@ -110,10 +111,15 @@
 
 
 class ConnectionFactory(Parent):
+
+
     def __init__(self):
         Parent.__init__(self)
         self.idcounter = count(1)
+        self._resultQueue = []
+        self.defaultConnect()
 
+
     @property
     def connections(self):
         "Alias to make tests more readable."
@@ -121,12 +127,119 @@
 
 
     def connect(self):
-        return FakeConnection(self)
+        """
+        Implement the C{ConnectionFactory} callable expected by
+        L{ConnectionPool}.
+        """
+        if self._resultQueue:
+            thunk = self._resultQueue.pop(0)
+        else:
+            thunk = self._default
+        return thunk()
 
 
+    def willConnect(self):
+        """
+        Used by tests to queue a successful result for connect().
+        """
+        def thunk():
+            return FakeConnection(self)
+        self._resultQueue.append(thunk)
 
+
+    def willFail(self):
+        """
+        Used by tests to queue a failing result for connect().
+        """
+        def thunk():
+            raise FakeConnectionError()
+        self._resultQueue.append(thunk)
+
+
+    def defaultConnect(self):
+        """
+        By default, connection attempts will succeed.
+        """
+        self.willConnect()
+        self._default = self._resultQueue.pop()
+
+
+    def defaultFail(self):
+        """
+        By default, connection attempts will fail.
+        """
+        self.willFail()
+        self._default = self._resultQueue.pop()
+
+
+
+class FakeConnectionError(Exception):
+    """
+    Synthetic error that might occur during connection.
+    """
+
+
+
+class FakeThreadHolder(object):
+    """
+    Run things submitted to this ThreadHolder on the main thread, so that
+    execution is easier to control.
+    """
+
+    def __init__(self):
+        self.started = False
+        self.stopped = False
+
+
+    def start(self):
+        """
+        Mark this L{FakeThreadHolder} as started.
+        """
+        self.started = True
+
+
+    def stop(self):
+        """
+        Mark this L{FakeThreadHolder} as stopped.
+        """
+        self.stopped = True
+
+
+    def submit(self, work):
+        """
+        Call the function.
+        """
+        return execute(work)
+
+
+
 class ConnectionPoolTests(TestCase):
+    """
+    Tests for L{ConnectionPool}.
+    """
 
+    def setUp(self):
+        """
+        Create a L{ConnectionPool} attached to a C{ConnectionFactory}.  Start
+        the L{ConnectionPool}.
+        """
+        self.holders = []
+        self.factory = ConnectionFactory()
+        self.pool = ConnectionPool(self.factory.connect, maxConnections=2)
+        self.pool._createHolder = self.makeAHolder
+        self.pool.startService()
+        self.addCleanup(self.pool.stopService)
+
+
+    def makeAHolder(self):
+        """
+        Make a ThreadHolder-alike.
+        """
+        fth = FakeThreadHolder()
+        self.holders.append(fth)
+        return fth
+
+
     @inlineCallbacks
     def test_tooManyConnections(self):
         """
@@ -136,19 +249,15 @@
         backed by any L{BaseSqlTxn}; this object will queue its SQL statements
         until an existing connection becomes available.
         """
-        cf = ConnectionFactory()
-        cp = ConnectionPool(cf.connect, maxConnections=2)
-        cp.startService()
-        self.addCleanup(cp.stopService)
-        a = cp.connection()
+        a = self.pool.connection()
         [[counter, echo]] = yield a.execSQL("alpha")
-        b = cp.connection()
+        b = self.pool.connection()
         [[bcounter, becho]] = yield b.execSQL("beta")
 
         # both 'a' and 'b' are holding open a connection now; let's try to open
         # a third one.  (The ordering will be deterministic even if this fails,
         # because those threads are already busy.)
-        c = cp.connection()
+        c = self.pool.connection()
         enqueue = c.execSQL("gamma")
         x = []
         def addtox(it):
@@ -157,11 +266,8 @@
         enqueue.addCallback(addtox)
 
         # Did 'c' open a connection?  Let's hope not...
-        self.assertEquals(len(cf.connections), 2)
-        # This assertion is _not_ deterministic, unfortunately; it's unlikely
-        # that the implementation could be adjusted such that this assertion
-        # would fail and the others would succeed.  However, if it does fail,
-        # that's really bad, so I am leaving it regardless.
+        self.assertEquals(len(self.factory.connections), 2)
+
         self.failIf(bool(x), "SQL executed too soon!")
         yield b.commit()
 
@@ -173,3 +279,37 @@
         self.assertEquals(ccounter, bcounter)
 
 
+    @inlineCallbacks
+    def test_stopService(self):
+        """
+        L{ConnectionPool.stopService} stops all the associated L{ThreadHolder}s
+        and thereby frees up the resources it is holding.
+        """
+        a = self.pool.connection()
+        [[counter, echo]] = yield a.execSQL("alpha")
+        self.assertEquals(len(self.holders), 1)
+        [holder] = self.holders
+        self.assertEquals(holder.started, True)
+        self.assertEquals(holder.stopped, False)
+        yield self.pool.stopService()
+        self.assertEquals(len(self.holders), 1)
+        self.assertEquals(holder.started, True)
+        self.assertEquals(holder.stopped, True)
+
+
+    def test_retryAfterConnectError(self):
+        """
+        When the C{connectionFactory} passed to L{ConnectionPool} raises an
+        exception, the L{ConnectionPool} will log the exception and delay
+        execution of a new connection's SQL methods until an attempt succeeds.
+        """
+        self.factory.defaultFail()
+        c = self.pool.connection()
+        errors = self.flushLoggedErrors(FakeConnectionError)
+        self.assertEquals(len(errors), 1)
+        d = c.execSQL("alpha")
+        happened = []
+        d.addBoth(happened.append)
+        self.assertEquals(happened, [])
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110128/cfa627cd/attachment-0001.html>


More information about the calendarserver-changes mailing list