[CalendarServer-changes] [6857] CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/ adbapi2.py

source_changes at macosforge.org source_changes at macosforge.org
Thu Feb 3 11:00:27 PST 2011


Revision: 6857
          http://trac.macosforge.org/projects/calendarserver/changeset/6857
Author:   glyph at apple.com
Date:     2011-02-03 11:00:27 -0800 (Thu, 03 Feb 2011)
Log Message:
-----------
fix the outstanding-spooled-connection case.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py

Modified: CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-02-03 19:00:24 UTC (rev 6856)
+++ CalendarServer/branches/users/glyph/db-reconnect/twext/enterprise/adbapi2.py	2011-02-03 19:00:27 UTC (rev 6857)
@@ -51,6 +51,8 @@
 
 from twext.internet.threadutils import ThreadHolder
 from twisted.internet.defer import succeed
+from twext.enterprise.ienterprise import ConnectionError
+from twisted.internet.defer import fail
 from twext.enterprise.ienterprise import AlreadyFinishedError, IAsyncTransaction
 
 
@@ -168,7 +170,25 @@
         return holder.stop()
 
 
+class FailedTxn(object):
+    """
+    An L{IAsyncTransaction} that indicates a local failure before we could even
+    communicate any statements (or possibly even any connection attempts) to the
+    server.
+    """
 
+    def _everything(self, *a, **kw):
+        """
+        Everything fails with a L{ConnectionError}.
+        """
+        return fail(ConnectionError())
+
+    execSQL = _everything
+    commit = _everything
+    abort = _everything
+
+
+
 class SpooledTxn(object):
     """
     A L{SpooledTxn} is an implementation of L{IAsyncTransaction} which cannot
@@ -270,15 +290,21 @@
 
     def abort(self):
         self._markComplete()
-        waiting = self._pool.waiting
-        if self in waiting:
-            waiting.remove(self)
-            # FIXME: waiting.remove()
-            return succeed(None)
+        if self in self._pool.waiting:
+            return self.stopWaiting()
         else:
             return self._pool._repoolAfter(self, super(PooledSqlTxn, self).abort())
 
 
+    def stopWaiting(self):
+        """
+        Stop waiting for a transaction and fail.
+        """
+        self._pool.waiting.remove(self)
+        self._unspoolOnto(FailedTxn())
+        return succeed(None)
+
+
     def _checkComplete(self):
         """
         If the transaction is complete, raise L{AlreadyFinishedError}
@@ -306,8 +332,6 @@
 
 
     def abort(self):
-        # not implemented yet, but let's fail rather than break the test
-        # raise NotImplementedError()
         if self._retry is not None:
             self._retry.cancel()
         d = self._holder.stop()
@@ -353,6 +377,9 @@
     @ivar waiting: The list of L{PooledSqlTxn} objects attached to a
         L{SpooledTxn}; i.e. those which are awaiting a connection to become free
         so that they can be executed.
+
+    @ivar stopping: Is this L{ConnectionPool} in the process of shutting down?
+        (If so, new connections will not be established.)
     """
 
     reactor = _reactor
@@ -371,6 +398,7 @@
         self.waiting = []
         self.finishing = []
         self.connecting = []
+        self.stopping = False
 
 
     def startService(self):
@@ -385,6 +413,14 @@
         Forcibly abort any outstanding transactions, and release all resources
         (notably, threads).
         """
+        # FIXME: actually honor this flag
+        self.stopping = True
+        # Phase 1: Cancel any transactions that are waiting so they won't try to
+        # eagerly acquire new connections as they flow into the free-list.
+        while self.waiting:
+            waiting = self.waiting[0]
+            yield waiting.stopWaiting()
+        # FIXME: there should be tests for these 'yield's.
         # Phase 1: All of the busy transactions must be aborted first.  As each
         # one is aborted, it will remove itself from the list.
         while self.busy:
@@ -477,7 +513,10 @@
         L{Deferred} has fired.
         """
         if txn in self.busy:
-            # should only _not_ in self.busy if its underlying txn is a SpooledTxn
+            # should only _not_ in self.busy if its underlying txn is a
+            # SpooledTxn.  TODO: when the SpooledTxn unspools, does it properly
+            # go into 'finishing' once it's re-submitted its commit or abort?  I
+            # don't think so.
             self.busy.remove(txn)
         finishRecord = (txn, d)
         self.finishing.append(finishRecord)
@@ -495,6 +534,14 @@
         """
         baseTxn = txn._baseTxn
         txn._baseTxn = None
+        if isinstance(baseTxn, FailedTxn):
+            # A bit of a hack, maybe clean this up, but: if a SpooledTxn spools
+            # a commit(), PooledTxn() will repoolAfter(commit()).  If the
+            # service stops before that PooledTxn gets a real backend, then we
+            # will receive it here.  If that's the case we can give up and just
+            # throw it away.  (Maybe the pool-management logic needs to be moved
+            # to BaseSqlTxn to avoid this kind of layering violation?)
+            return
         baseTxn.reset()
         if self.waiting:
             waiting = self.waiting.pop(0)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110203/1d1d0f24/attachment-0001.html>


More information about the calendarserver-changes mailing list