[CalendarServer-changes] [8161] CalendarServer/branches/users/glyph/shared-pool-take2/twext/ enterprise

source_changes at macosforge.org source_changes at macosforge.org
Mon Oct 10 08:03:01 PDT 2011


Revision: 8161
          http://trac.macosforge.org/projects/calendarserver/changeset/8161
Author:   glyph at apple.com
Date:     2011-10-10 08:03:01 -0700 (Mon, 10 Oct 2011)
Log Message:
-----------
implementation of command blocks for shared connection pool, plus a few additional test tweaks to make it work

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py
    CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py

Modified: CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py	2011-10-10 15:02:51 UTC (rev 8160)
+++ CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py	2011-10-10 15:03:01 UTC (rev 8161)
@@ -57,8 +57,9 @@
 from twext.enterprise.ienterprise import IDerivedParameter
 
 from twisted.internet.defer import fail
+
 from twext.enterprise.ienterprise import (
-    AlreadyFinishedError, IAsyncTransaction, POSTGRES_DIALECT
+    AlreadyFinishedError, IAsyncTransaction, POSTGRES_DIALECT, ICommandBlock
 )
 
 
@@ -1063,10 +1064,27 @@
     """
     arguments = [('sql', String()),
                  ('queryID', String()),
-                 ('args', Pickle())] + txnarg()
+                 ('args', Pickle()),
+                 ('blockID', String())] + txnarg()
 
 
 
+class StartBlock(Command):
+    """
+    Create a new SQL command block.
+    """
+    arguments = [("blockID", String())] + txnarg()
+
+
+
+class EndBlock(Command):
+    """
+    End a previously started SQL command block.
+    """
+    arguments = [("blockID", String())] + txnarg()
+
+
+
 class Row(Command):
     """
     A row has been returned.  Sent from server to client in response to
@@ -1118,6 +1136,7 @@
         super(ConnectionPoolConnection, self).__init__()
         self.pool  = pool
         self._txns = {}
+        self._blocks = {}
 
 
     @StartTxn.responder
@@ -1126,11 +1145,27 @@
         return {}
 
 
+    @StartBlock.responder
+    def startBlock(self, transactionID, blockID):
+        self._blocks[blockID] = self._txns[transactionID].commandBlock()
+        return {}
+
+
+    @EndBlock.responder
+    def endBlock(self, transactionID, blockID):
+        self._blocks[blockID].end()
+        return {}
+
+
     @ExecSQL.responder
     @inlineCallbacks
-    def receivedSQL(self, transactionID, queryID, sql, args):
+    def receivedSQL(self, transactionID, queryID, sql, args, blockID):
+        if blockID:
+            txn = self._blocks[blockID]
+        else:
+            txn = self._txns[transactionID]
         try:
-            rows = yield self._txns[transactionID].execSQL(sql, args, _NoRows)
+            rows = yield txn.execSQL(sql, args, _NoRows)
         except _NoRows:
             norows = True
         else:
@@ -1260,6 +1295,8 @@
         self._client        = client
         self._transactionID = transactionID
         self._completed     = False
+        self._committing    = False
+        self._committed     = False
 
 
     @property
@@ -1278,7 +1315,10 @@
         return self._client.dialect
 
 
-    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None, blockID=""):
+        if not blockID:
+            if self._completed:
+                raise AlreadyFinishedError()
         if args is None:
             args = []
         queryID = str(self._client._nextID())
@@ -1286,7 +1326,7 @@
         result = (
             self._client.callRemote(
                 ExecSQL, queryID=queryID, sql=sql, args=args,
-                transactionID=self._transactionID
+                transactionID=self._transactionID, blockID=blockID
             )
             .addCallback(lambda nothing: query.deferred)
         )
@@ -1303,7 +1343,11 @@
 
 
     def commit(self):
-        return self._complete(Commit)
+        self._committing = True
+        def done(whatever):
+            self._committed = True
+            return whatever
+        return self._complete(Commit).addBoth(done)
 
 
     def abort(self):
@@ -1311,7 +1355,51 @@
 
 
     def commandBlock(self):
-        raise NotImplementedError()
+        if self._completed:
+            raise AlreadyFinishedError()
+        blockID = str(self._client._nextID())
+        self._client.callRemote(
+            StartBlock, blockID=blockID, transactionID=self._transactionID
+        )
+        return _NetCommandBlock(self, blockID)
 
 
 
+class _NetCommandBlock(object):
+    """
+    Client-side command block; relays SQL via its transaction by block ID.
+    """
+
+    implements(ICommandBlock)
+
+    def __init__(self, transaction, blockID):
+        self._transaction = transaction
+        self._blockID = blockID
+        self._ended = False
+
+
+    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+        """
+        Execute some SQL on this command block.
+        """
+        if  (self._ended or
+             self._transaction._completed and
+             not self._transaction._committing or
+             self._transaction._committed):
+            raise AlreadyFinishedError()
+        return self._transaction.execSQL(sql, args, raiseOnZeroRowCount,
+                                         self._blockID)
+
+
+    def end(self):
+        """
+        End this block.
+        """
+        if self._ended:
+            raise AlreadyFinishedError()
+        self._ended = True
+        self._transaction._client.callRemote(
+            EndBlock, blockID=self._blockID,
+            transactionID=self._transaction._transactionID
+        )
+

Modified: CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py	2011-10-10 15:02:51 UTC (rev 8160)
+++ CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py	2011-10-10 15:03:01 UTC (rev 8161)
@@ -1158,6 +1158,7 @@
         cb2.end()
         cb1.end()
         flush()
+        self.flushHolders()
         self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
                           [("a", []), ("b", []), ("c", []), ("d", []),
                            ("e", [])])
@@ -1192,11 +1193,12 @@
         txn = self.createTransaction()
         block = txn.commandBlock()
         commitResult = self.resultOf(txn.commit())
-        block.execSQL("in block")
+        self.resultOf(block.execSQL("in block"))
         self.assertEquals(commitResult, [])
         self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
                           [("in block", [])])
         block.end()
+        self.flushHolders()
         self.assertEquals(commitResult, [None])
 
 
@@ -1254,7 +1256,7 @@
         an exception.
         """
         txn = self.createTransaction()
-        txn.abort()
+        self.resultOf(txn.abort())
         self.assertRaises(AlreadyFinishedError, txn.commandBlock)
 
 
@@ -1326,7 +1328,8 @@
     # Don't run these tests.
     def test_propagateDialect(self):
         """
-        Paramstyle and dialect are configured differently.
+        Paramstyle and dialect are configured differently for
+        shared-connection-pool transactions.
         """
 
     test_propagateParamstyle = test_propagateDialect
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111010/ce0f79c2/attachment.html>


More information about the calendarserver-changes mailing list