[CalendarServer-changes] [7249] CalendarServer/branches/users/glyph/subtransactions/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Thu Mar 24 13:35:15 PDT 2011


Revision: 7249
          http://trac.macosforge.org/projects/calendarserver/changeset/7249
Author:   glyph at apple.com
Date:     2011-03-24 13:35:13 -0700 (Thu, 24 Mar 2011)
Log Message:
-----------
basic command block (low-level prerequisite for subtransactions) implementation

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py
    CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py

Modified: CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py	2011-03-24 20:27:07 UTC (rev 7248)
+++ CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py	2011-03-24 20:35:13 UTC (rev 7249)
@@ -166,12 +166,24 @@
 
     def _end(self, really):
         """
-        Common logic for commit or abort.  Executed in the cursor main thread.
+        Common logic for commit or abort.  Executed in the main reactor thread.
+
+        @param really: the callable to execute in the cursor thread to actually
+            do the commit or rollback.
+
+        @return: a L{Deferred} which fires when the database logic has
+            completed.
+
+        @raise: L{AlreadyFinishedError} if the transaction has already been
+            committed or aborted.
         """
         if not self._completed:
             self._completed = True
             def reallySomething():
-                # Executed in the cursor thread.
+                """
+                Do the database work and set appropriate flags.  Executed in the
+                cursor thread.
+                """
                 if self._cursor is None:
                     return
                 really()
@@ -264,6 +276,11 @@
     implements(IAsyncTransaction)
 
     def __init__(self, pool):
+        """
+        Initialize a L{_WaitingTxn} based on a L{ConnectionPool}.  (The C{pool}
+        is used only to reflect C{dialect} and C{paramstyle} attributes; not
+        remembered or modified in any way.)
+        """
         self._spool = []
         self.paramstyle = pool.paramstyle
         self.dialect = pool.dialect
@@ -335,9 +352,13 @@
     """
 
     def __init__(self, pool, baseTxn):
-        self._pool     = pool
-        self._baseTxn  = baseTxn
-        self._complete = False
+        self._pool          = pool
+        self._baseTxn       = baseTxn
+        self._complete      = False
+        self._currentBlock  = None
+        self._pendingBlocks = []
+        self._allPending    = []
+        self._blockedQueue  = None
 
 
     def __repr__(self):
@@ -358,11 +379,56 @@
         spooledBase._unspool(baseTxn)
 
 
-    def execSQL(self, *a, **kw):
+    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+        return self._execSQLForBlock(sql, args, raiseOnZeroRowCount, None)
+
+
+    def _execSQLForBlock(self, sql, args, raiseOnZeroRowCount, block):
+        """
+        Execute some SQL for a particular L{CommandBlock}; or, if the given
+        C{block} is C{None}, execute it in the outermost transaction context.
+        """
         self._checkComplete()
-        return super(_SingleTxn, self).execSQL(*a, **kw)
+        if block is None and self._blockedQueue is not None:
+            return self._blockedQueue.execSQL(sql, args, raiseOnZeroRowCount)
+        d = super(_SingleTxn, self).execSQL(sql, args, raiseOnZeroRowCount)
+        self._allPending.append(d)
+        def itsDone(result):
+            self._allPending.remove(d)
+            self._checkNextBlock()
+            return result
+        d.addBoth(itsDone)
+        return d
 
 
+    def _checkNextBlock(self):
+        """
+        Check to see if there are any blocks pending statements waiting to
+        execute, and execute the next one if there are no outstanding execute
+        calls.
+        """
+        if self._allPending:
+            return
+
+        if self._currentBlock is not None:
+            if self._currentBlock._ended:
+                # ???: is this necessary?  We can immediately forget the block
+                # when it ends, we don't need to keep it around until all of its
+                # statements have completed.
+                self._currentBlock = None
+                bq = self._blockedQueue
+                self._blockedQueue = None
+                bq._unspool(self)
+            else:
+                return
+
+        if self._pendingBlocks:
+            block = self._pendingBlocks.pop(0)
+            self._currentBlock = block
+            self._blockedQueue = _WaitingTxn(self._pool)
+            block._startExecuting()
+
+
     def commit(self):
         self._markComplete()
         return super(_SingleTxn, self).commit()
@@ -400,7 +466,66 @@
         self._complete = True
 
 
+    def commandBlock(self):
+        """
+        Create an IAsyncTransaction that will wait for all currently spooled
+        commands to complete before executing its own.
+        """
+        cb = CommandBlock(self)
+        # FIXME: test the case where it's ready immediately.
+        self._checkNextBlock()
+        return cb
 
+
+
+class CommandBlock(object):
+    """
+    A partial implementation of L{IAsyncTransaction} that will group execSQL
+    calls together.
+
+    Does not implement commit() or abort(), because this will simply group
+    commands.  In order to implement sub-transactions or checkpoints, some
+    understanding of the SQL dialect in use by the underlying connection is
+    required.  Instead, it provides 'end'.
+    """
+
+    def __init__(self, singleTxn):
+        self._singleTxn = singleTxn
+        self.paramstyle = singleTxn.paramstyle
+        self.dialect = singleTxn.dialect
+        self._spool = _WaitingTxn(singleTxn._pool)
+        self._ended = False
+        singleTxn._pendingBlocks.append(self)
+
+
+    def _startExecuting(self):
+        self._spool._unspool(self)
+
+
+    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+        """
+        Execute some SQL within this command block.
+        """
+        # FIXME: check 'ended'
+        if self._singleTxn._currentBlock is self:
+            return self._singleTxn._execSQLForBlock(
+                sql, args, raiseOnZeroRowCount, self)
+        else:
+            return self._spool.execSQL(sql, args, raiseOnZeroRowCount)
+
+
+    def end(self):
+        """
+        The block of commands has completed.  Allow other SQL to run on the
+        underlying L{IAsyncTransaction}.
+        """
+        # FIXME: test the case where end() is called when it's not the current
+        # executing block.
+        self._ended = True
+        self._singleTxn._checkNextBlock()
+
+
+
 class _ConnectingPseudoTxn(object):
 
     _retry = None

Modified: CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py	2011-03-24 20:27:07 UTC (rev 7248)
+++ CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py	2011-03-24 20:35:13 UTC (rev 7249)
@@ -146,6 +146,7 @@
         # not entirely correct, but all we care about is its truth value.
         self.description = False
         self.variables = []
+        self.allExecutions = []
 
 
     @property
@@ -158,6 +159,7 @@
         self.connection.executions += 1
         if self.connection._executeFailQueue:
             raise self.connection._executeFailQueue.pop(0)()
+        self.allExecutions.append((sql, args))
         self.sql = sql
         self.description = True
         self.rowcount = 1
@@ -424,7 +426,8 @@
         and thereby frees up the resources it is holding.
         """
         a = self.pool.connection()
-        [[[counter, echo]]] = resultOf(a.execSQL("alpha"))
+        alphaResult = resultOf(a.execSQL("alpha"))
+        [[[counter, echo]]] = alphaResult
         self.assertEquals(len(self.factory.connections), 1)
         self.assertEquals(len(self.holders), 1)
         [holder] = self.holders
@@ -847,3 +850,26 @@
         self.assertEquals(self.factory.connections[0].closed, True)
         self.assertEquals(self.factory.connections[1].closed, False)
 
+
+    def test_commandBlock(self):
+        """
+        L{IAsyncTransaction.commandBlock} returns an L{IAsyncTransaction}
+        provider which ensures that a block of commands are executed together.
+        """
+        txn = self.pool.connection()
+        a = resultOf(txn.execSQL("a"))
+        cb = txn.commandBlock()
+        b = resultOf(cb.execSQL("b"))
+        d = resultOf(txn.execSQL("d"))
+        c = resultOf(cb.execSQL("c"))
+        cb.end()
+        e = resultOf(txn.execSQL("e"))
+        self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
+                          [("a", []), ("b", []), ("c", []), ("d", []),
+                           ("e", [])])
+        self.assertEquals(len(a), 1)
+        self.assertEquals(len(b), 1)
+        self.assertEquals(len(c), 1)
+        self.assertEquals(len(d), 1)
+        self.assertEquals(len(e), 1)
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110324/3cefb2a8/attachment-0001.html>


More information about the calendarserver-changes mailing list