[CalendarServer-changes] [7251] CalendarServer/branches/users/glyph/subtransactions/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Thu Mar 24 13:35:56 PDT 2011


Revision: 7251
          http://trac.macosforge.org/projects/calendarserver/changeset/7251
Author:   glyph at apple.com
Date:     2011-03-24 13:35:56 -0700 (Thu, 24 Mar 2011)
Log Message:
-----------
basic coverage of case where some latency is mixed in

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py
    CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py

Modified: CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py	2011-03-24 20:35:44 UTC (rev 7250)
+++ CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py	2011-03-24 20:35:56 UTC (rev 7251)
@@ -40,6 +40,7 @@
 
 from twisted.internet.defer import inlineCallbacks
 from twisted.internet.defer import returnValue
+from twisted.internet.defer import DeferredList
 from twisted.internet.defer import Deferred
 from twisted.protocols.amp import Boolean
 from twisted.python.failure import Failure
@@ -414,25 +415,26 @@
         if self._allPending:
             return
 
-        if self._currentBlock is not None:
-            if self._currentBlock._ended:
-                # ???: is this necessary?  We can immediately forget the block
-                # when it ends, we don't need to keep it around until all of its
-                # statements have completed.
-                self._currentBlock = None
-                bq = self._blockedQueue
-                self._blockedQueue = None
-                bq._unspool(self)
-            else:
-                return
+        if self._pendingBlocks and self._currentBlock is None:
+            self._currentBlock = self._pendingBlocks.pop(0)
 
-        if self._pendingBlocks:
-            block = self._pendingBlocks.pop(0)
-            self._currentBlock = block
-            self._blockedQueue = _WaitingTxn(self._pool)
-            block._startExecuting()
+        if self._currentBlock is not None and not self._currentBlock._started:
+            self._currentBlock._startExecuting().addCallback(
+                self._finishExecuting)
 
 
+    def _finishExecuting(self, result):
+        """
+        The active block just finished executing.
+        """
+        self._currentBlock = None
+        bq = self._blockedQueue
+        self._blockedQueue = None
+        bq._unspool(self)
+        # XXX need to check next block, there might have been nothing in the
+        # blocked executing queue.
+
+
     def commit(self):
         self._markComplete()
         return super(_SingleTxn, self).commit()
@@ -475,13 +477,27 @@
         Create an IAsyncTransaction that will wait for all currently spooled
         commands to complete before executing its own.
         """
-        cb = CommandBlock(self)
+        self._currentBlock = CommandBlock(self)
+        self._blockedQueue = _WaitingTxn(self._pool)
         # FIXME: test the case where it's ready immediately.
         self._checkNextBlock()
-        return cb
+        return self._currentBlock
 
 
 
+class _Unspooler(object):
+    def __init__(self, orig):
+        self.orig = orig
+
+
+    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+        """
+        Execute some SQL, but don't track a new Deferred.
+        """
+        return self.orig.execSQL(sql, args, raiseOnZeroRowCount, False)
+
+
+
 class CommandBlock(object):
     """
     A partial implementation of L{IAsyncTransaction} that will group execSQL
@@ -498,26 +514,42 @@
         self.paramstyle = singleTxn.paramstyle
         self.dialect = singleTxn.dialect
         self._spool = _WaitingTxn(singleTxn._pool)
+        self._started = False
         self._ended = False
+        self._waitingForEnd = []
+        self._endDeferred = Deferred()
         singleTxn._pendingBlocks.append(self)
 
 
     def _startExecuting(self):
-        self._spool._unspool(self)
+        self._started = True
+        self._spool._unspool(_Unspooler(self))
+        return self._endDeferred
 
 
-    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None, track=True):
         """
         Execute some SQL within this command block.
         """
         # FIXME: check 'ended'
-        if self._singleTxn._currentBlock is self:
-            return self._singleTxn._execSQLForBlock(
+        if self._singleTxn._currentBlock is self and self._started:
+            d = self._singleTxn._execSQLForBlock(
                 sql, args, raiseOnZeroRowCount, self)
         else:
-            return self._spool.execSQL(sql, args, raiseOnZeroRowCount)
+            d = self._spool.execSQL(sql, args, raiseOnZeroRowCount)
+        if track:
+            self._trackForEnd(d)
+        return d
 
 
+    def _trackForEnd(self, d):
+        """
+        Remember the given L{Deferred}; C{end} is not considered done until
+        every L{Deferred} remembered this way has fired.
+        """
+        self._waitingForEnd.append(d)
+
+
     def end(self):
         """
         The block of commands has completed.  Allow other SQL to run on the
@@ -527,6 +559,7 @@
         # executing block.
         self._ended = True
         self._singleTxn._checkNextBlock()
+        DeferredList(self._waitingForEnd).chainDeferred(self._endDeferred)
 
 
 

Modified: CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py	2011-03-24 20:35:44 UTC (rev 7250)
+++ CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py	2011-03-24 20:35:56 UTC (rev 7251)
@@ -891,3 +891,30 @@
         self.assertEquals(len(d), 1)
         self.assertEquals(len(e), 1)
 
+
+    def test_commandBlockWithLatency(self):
+        """
+        A block returned by L{IAsyncTransaction.commandBlock} won't start
+        executing until all SQL statements scheduled before it have completed.
+        """
+        self.pauseHolders()
+        txn = self.pool.connection()
+        a = resultOf(txn.execSQL("a"))
+        b = resultOf(txn.execSQL("b"))
+        cb = txn.commandBlock()
+        c = resultOf(cb.execSQL("c"))
+        d = resultOf(cb.execSQL("d"))
+        e = resultOf(txn.execSQL("e"))
+        cb.end()
+        self.flushHolders()
+
+        self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
+                          [("a", []), ("b", []), ("c", []), ("d", []),
+                           ("e", [])])
+
+        self.assertEquals(len(a), 1)
+        self.assertEquals(len(b), 1)
+        self.assertEquals(len(c), 1)
+        self.assertEquals(len(d), 1)
+        self.assertEquals(len(e), 1)
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110324/06800ba7/attachment-0001.html>


More information about the calendarserver-changes mailing list