[CalendarServer-changes] [6831] CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Fri Jan 28 17:40:26 PST 2011


Revision: 6831
          http://trac.macosforge.org/projects/calendarserver/changeset/6831
Author:   glyph at apple.com
Date:     2011-01-28 17:40:26 -0800 (Fri, 28 Jan 2011)
Log Message:
-----------
Add a test for stopService being called at another point (while a connection attempt is still outstanding), and make sure we don't lose track of the connection attempt in progress.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
    CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py

Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-01-29 01:40:16 UTC (rev 6830)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-01-29 01:40:26 UTC (rev 6831)
@@ -293,7 +293,8 @@
 
     _retry = None
 
-    def __init__(self, holder):
+    def __init__(self, pool, holder):
+        self._pool = pool
         self._holder = holder
 
     def abort(self):
@@ -301,8 +302,12 @@
         # raise NotImplementedError()
         if self._retry is not None:
             self._retry.cancel()
-        # deferred, should be returned
-        self._holder.stop()
+        d = self._holder.stop()
+        def removeme(ignored):
+            if self in self._pool.busy:
+                self._pool.busy.remove(self)
+        d.addCallback(removeme)
+        return d
 
 
 
@@ -351,7 +356,8 @@
         """
         Forcibly abort any outstanding transactions.
         """
-        for busy in self.busy[:]:
+        while self.busy:
+            busy = self.busy[0]
             try:
                 yield busy.abort()
             except:
@@ -398,16 +404,12 @@
         holder = self._createHolder()
         holder.start()
         # FIXME: attach the holder to the txn so it can be aborted.
-        txn = _ConnectingPsuedoTxn(holder)
+        txn = _ConnectingPsuedoTxn(self, holder)
         # take up a slot in the 'busy' list, sit there so we can be aborted.
         self.busy.append(txn)
         def initCursor():
             # support threadlevel=1; we can't necessarily cursor() in a
             # different thread than we do transactions in.
-
-            # TODO: Re-try connect when it fails.  Specify a timeout.  That
-            # should happen in this layer because we need to be able to stop
-            # the reconnect attempt if it's hanging.
             connection = self.connectionFactory()
             cursor = connection.cursor()
             return (connection, cursor)

Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py	2011-01-29 01:40:16 UTC (rev 6830)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/test/test_adbapi2.py	2011-01-29 01:40:26 UTC (rev 6831)
@@ -27,6 +27,7 @@
 from twisted.internet.defer import execute
 from twisted.internet.task import Clock
 
+from twisted.internet.defer import Deferred
 from twext.enterprise.adbapi2 import ConnectionPool
 
 
@@ -188,9 +189,11 @@
     execution is easier to control.
     """
 
-    def __init__(self):
+    def __init__(self, test):
         self.started = False
         self.stopped = False
+        self.test = test
+        self.queue = []
 
 
     def start(self):
@@ -204,17 +207,38 @@
         """
         Mark this L{FakeThreadHolder} as stopped.
         """
-        self.stopped = True
+        def stopped(nothing):
+            self.stopped = True
+        return self.submit(lambda : None).addCallback(stopped)
 
 
     def submit(self, work):
         """
-        Call the function.
+        Call the function (or queue it)
         """
-        return execute(work)
+        if self.test.paused:
+            d = Deferred()
+            self.queue.append((d, work))
+            return d
+        else:
+            return execute(work)
 
 
+    def flush(self):
+        """
+        Fire all deferreds previously returned from submit.
+        """
+        self.queue, queue = [], self.queue
+        for (d, work) in queue:
+            try:
+                result = work()
+            except:
+                d.errback()
+            else:
+                d.callback(result)
 
+
+
 class ConnectionPoolTests(TestCase):
     """
     Tests for L{ConnectionPool}.
@@ -225,6 +249,7 @@
         Create a L{ConnectionPool} attached to a C{ConnectionFactory}.  Start
         the L{ConnectionPool}.
         """
+        self.paused = False
         self.holders = []
         self.factory = ConnectionFactory()
         self.pool = ConnectionPool(self.factory.connect, maxConnections=2)
@@ -234,11 +259,28 @@
         self.addCleanup(self.pool.stopService)
 
 
+    def flushHolders(self):
+        """
+        Flush all pending C{submit}s since C{pauseHolders} was called.
+        """
+        self.paused = False
+        for holder in self.holders:
+            holder.flush()
+
+
+    def pauseHolders(self):
+        """
+        Pause all L{FakeThreadHolder}s, causing C{submit} to return an unfired
+        L{Deferred}.
+        """
+        self.paused = True
+
+
     def makeAHolder(self):
         """
         Make a ThreadHolder-alike.
         """
-        fth = FakeThreadHolder()
+        fth = FakeThreadHolder(self)
         self.holders.append(fth)
         return fth
 
@@ -344,3 +386,23 @@
         self.assertEquals(holder.started, True)
         self.assertEquals(holder.stopped, True)
 
+
+    def test_shutdownDuringAttemptSuccess(self):
+        """
+        If L{ConnectionPool.stopService} is called while a connection attempt is
+        outstanding, the resulting L{Deferred} won't be fired until the
+        connection attempt has succeeded.
+        """
+        self.pauseHolders()
+        self.pool.connection()
+        stopd = []
+        self.pool.stopService().addBoth(stopd.append)
+        self.assertEquals(stopd, [])
+        self.flushHolders()
+        self.assertEquals(stopd, [None])
+        [holder] = self.holders
+        self.assertEquals(holder.started, True)
+        self.assertEquals(holder.stopped, True)
+        # FIXME: next, 'failed' case.
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110128/bbb61801/attachment.html>


More information about the calendarserver-changes mailing list