[CalendarServer-changes] [8175] CalendarServer/trunk

source_changes at macosforge.org
Mon Oct 10 08:11:56 PDT 2011


Revision: 8175
          http://trac.macosforge.org/projects/calendarserver/changeset/8175
Author:   glyph at apple.com
Date:     2011-10-10 08:11:56 -0700 (Mon, 10 Oct 2011)
Log Message:
-----------
Merge fix to make SharedConnectionPool work with commandBlock (i.e. to actually
function within recent versions of the server).  This will allow
SharedConnectionsPerPool to precisely configure the maximum number of database
connections from the whole server, rather than from each process.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/calendarserver/tap/util.py
    CalendarServer/trunk/twext/enterprise/adbapi2.py
    CalendarServer/trunk/twext/enterprise/dal/parseschema.py
    CalendarServer/trunk/twext/enterprise/dal/syntax.py
    CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
    CalendarServer/trunk/twext/enterprise/ienterprise.py
    CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py

Property Changed:
----------------
    CalendarServer/trunk/


Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593
   + /CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/wsanchez/transations:5515-5593

Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2011-10-10 15:05:54 UTC (rev 8174)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2011-10-10 15:11:56 UTC (rev 8175)
@@ -638,14 +638,20 @@
         L{makeService_Combined}, which does the work of actually handling
         CalDAV and CardDAV requests.
         """
+        if config.DBType == 'oracle':
+            dialect = ORACLE_DIALECT
+            paramstyle = 'numeric'
+        else:
+            dialect = POSTGRES_DIALECT
+            paramstyle = 'pyformat'
         pool = None
         if config.DBAMPFD:
-            txnFactory = transactionFactoryFromFD(int(config.DBAMPFD))
+            txnFactory = transactionFactoryFromFD(
+                int(config.DBAMPFD), dialect, paramstyle
+            )
         elif not config.UseDatabase:
             txnFactory = None
         elif not config.SharedConnectionPool:
-            dialect = POSTGRES_DIALECT
-            paramstyle = 'pyformat'
             if config.DBType == '':
                 # get a PostgresService to tell us what the local connection
                 # info is, but *don't* start it (that would start one postgres
@@ -655,8 +661,6 @@
             elif config.DBType == 'postgres':
                 connectionFactory = pgConnectorFromConfig(config)
             elif config.DBType == 'oracle':
-                dialect = ORACLE_DIALECT
-                paramstyle = 'numeric'
                 connectionFactory = oracleConnectorFromConfig(config)
             else:
                 raise UsageError("unknown DB type: %r" % (config.DBType,))

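The caldav.py hunks above hoist the dialect and paramstyle choice out of the non-shared branch so that the DBAMPFD path receives them too. A minimal sketch of that selection as a standalone function follows. The function name is hypothetical, and the two constants are placeholder strings standing in for the values imported from twext.enterprise.ienterprise.

```python
# Placeholder values (assumption); the server imports the real constants
# from twext.enterprise.ienterprise.
POSTGRES_DIALECT = "postgres-dialect"
ORACLE_DIALECT = "oracle-dialect"


def dialect_and_paramstyle(db_type):
    """Return (dialect, paramstyle) for a configured DBType string.

    Mirrors the branch in the diff: only 'oracle' selects the Oracle
    dialect with 'numeric' parameters; every other value falls back to
    the PostgreSQL dialect with 'pyformat' parameters.
    """
    if db_type == "oracle":
        return ORACLE_DIALECT, "numeric"
    return POSTGRES_DIALECT, "pyformat"


oracle_choice = dialect_and_paramstyle("oracle")
default_choice = dialect_and_paramstyle("")
```

Computing the pair once, before any branch on DBAMPFD or SharedConnectionPool, is what lets every code path hand the same values to its transaction factory.
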
Modified: CalendarServer/trunk/calendarserver/tap/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/util.py	2011-10-10 15:05:54 UTC (rev 8174)
+++ CalendarServer/trunk/calendarserver/tap/util.py	2011-10-10 15:11:56 UTC (rev 8175)
@@ -156,13 +156,13 @@
 
 
 
-def transactionFactoryFromFD(dbampfd):
+def transactionFactoryFromFD(dbampfd, dialect, paramstyle):
     """
     Create a transaction factory from an inherited file descriptor.
     """
     skt = fromfd(dbampfd, AF_UNIX, SOCK_STREAM)
     os.close(dbampfd)
-    protocol = ConnectionPoolClient()
+    protocol = ConnectionPoolClient(dialect=dialect, paramstyle=paramstyle)
     transport = ConnectionWithPeer(skt, protocol)
     protocol.makeConnection(transport)
     transport.startReading()

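transactionFactoryFromFD wraps an inherited descriptor with fromfd and then closes the original number. The sketch below shows only that descriptor-handling step in isolation, in Python 3 and on a Unix-like system. The helper name is hypothetical, and a socketpair stands in for the connection between the master and child processes.

```python
import os
import socket


def socket_from_inherited_fd(fd):
    """Wrap a raw descriptor in a socket object, then close the original."""
    # socket.fromfd() duplicates the descriptor, so closing the original
    # number afterwards does not affect the returned socket object.
    skt = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
    os.close(fd)
    return skt


# A socketpair stands in for the parent/child link (assumption for the demo).
parent_end, child_end = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
child_fd = child_end.detach()  # raw descriptor number, as a child would see it
child_socket = socket_from_inherited_fd(child_fd)

parent_end.sendall(b"ping")
received = child_socket.recv(4)

parent_end.close()
child_socket.close()
```

Closing the original descriptor after duplication avoids leaking a second handle to the same socket in the child process.
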
Modified: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py	2011-10-10 15:05:54 UTC (rev 8174)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py	2011-10-10 15:11:56 UTC (rev 8175)
@@ -57,8 +57,9 @@
 from twext.enterprise.ienterprise import IDerivedParameter
 
 from twisted.internet.defer import fail
+
 from twext.enterprise.ienterprise import (
-    AlreadyFinishedError, IAsyncTransaction, POSTGRES_DIALECT
+    AlreadyFinishedError, IAsyncTransaction, POSTGRES_DIALECT, ICommandBlock
 )
 
 
@@ -670,6 +671,7 @@
     understanding of the SQL dialect in use by the underlying connection is
     required.  Instead, it provides 'end'.
     """
+    implements(ICommandBlock)
 
     def __init__(self, singleTxn):
         self._singleTxn = singleTxn
@@ -1048,12 +1050,49 @@
 
 
 
+class FailsafeException(Exception):
+    """
+    Exception raised by all responders.
+    """
 
+
+
+quashErrors = {
+    FailsafeException: "SOMETHING_UNKNOWN"
+}
+
+
+
+def failsafeResponder(command):
+    """
+    Wrap an AMP command responder in some fail-safe logic, to make it so that
+    unknown errors won't drop the connection, as AMP's default behavior would.
+    """
+    def wrap(inner):
+        @inlineCallbacks
+        def innerinner(*a, **k):
+            try:
+                val = yield inner(*a, **k)
+            except:
+                # FIXME: if this were a general thing, it should probably allow
+                # known errors through; look at the command's 'errors' attribute
+                # before collapsing into FailsafeException.
+                log.err(Failure(),
+                        "shared database connection pool encountered error")
+                raise FailsafeException()
+            else:
+                returnValue(val)
+        return command.responder(innerinner)
+    return wrap
+
+
+
 class StartTxn(Command):
     """
     Start a transaction, identified with an ID generated by the client.
     """
     arguments = txnarg()
+    errors = quashErrors
 
 
 
@@ -1063,10 +1102,30 @@
     """
     arguments = [('sql', String()),
                  ('queryID', String()),
-                 ('args', Pickle())] + txnarg()
+                 ('args', Pickle()),
+                 ('blockID', String())] + txnarg()
+    errors = quashErrors
 
 
 
+class StartBlock(Command):
+    """
+    Create a new SQL command block.
+    """
+    arguments = [("blockID", String())] + txnarg()
+    errors = quashErrors
+
+
+
+class EndBlock(Command):
+    """
+    End an existing SQL command block.
+    """
+    arguments = [("blockID", String())] + txnarg()
+    errors = quashErrors
+
+
+
 class Row(Command):
     """
     A row has been returned.  Sent from server to client in response to
@@ -1075,6 +1134,7 @@
 
     arguments = [('queryID', String()),
                  ('row', Pickle())]
+    errors = quashErrors
 
 
 
@@ -1084,17 +1144,21 @@
     """
 
     arguments = [('queryID', String()),
-                 ('norows', Boolean())]
+                 ('norows', Boolean()),
+                 ('derived', Pickle())]
+    errors = quashErrors
 
 
 
 class Commit(Command):
     arguments = txnarg()
+    errors = quashErrors
 
 
 
 class Abort(Command):
     arguments = txnarg()
+    errors = quashErrors
 
 
 
@@ -1107,7 +1171,8 @@
 class ConnectionPoolConnection(AMP):
     """
     A L{ConnectionPoolConnection} is a single connection to a
-    L{ConnectionPool}.
+    L{ConnectionPool}.  This is the server side of the connection-pool-sharing
+    protocol; it implements all the AMP responders necessary.
     """
 
     def __init__(self, pool):
@@ -1117,19 +1182,54 @@
         super(ConnectionPoolConnection, self).__init__()
         self.pool  = pool
         self._txns = {}
+        self._blocks = {}
 
 
-    @StartTxn.responder
+    def stopReceivingBoxes(self, why):
+        log.msg("(S) Stopped receiving boxes: " + why.getTraceback())
+
+
+    def unhandledError(self, failure):
+        """
+        An unhandled error has occurred.  Since we can't really classify errors
+        well on this protocol, log it and forget it.
+        """
+        log.err(failure, "Shared connection pool server encountered an error.")
+
+
+    @failsafeResponder(StartTxn)
     def start(self, transactionID):
         self._txns[transactionID] = self.pool.connection()
         return {}
 
 
-    @ExecSQL.responder
+    @failsafeResponder(StartBlock)
+    def startBlock(self, transactionID, blockID):
+        self._blocks[blockID] = self._txns[transactionID].commandBlock()
+        return {}
+
+
+    @failsafeResponder(EndBlock)
+    def endBlock(self, transactionID, blockID):
+        self._blocks[blockID].end()
+        return {}
+
+
+    @failsafeResponder(ExecSQL)
     @inlineCallbacks
-    def receivedSQL(self, transactionID, queryID, sql, args):
+    def receivedSQL(self, transactionID, queryID, sql, args, blockID):
+        derived = None
+        for param in args:
+            if IDerivedParameter.providedBy(param):
+                if derived is None:
+                    derived = []
+                derived.append(param)
+        if blockID:
+            txn = self._blocks[blockID]
+        else:
+            txn = self._txns[transactionID]
         try:
-            rows = yield self._txns[transactionID].execSQL(sql, args, _NoRows)
+            rows = yield txn.execSQL(sql, args, _NoRows)
         except _NoRows:
             norows = True
         else:
@@ -1139,7 +1239,9 @@
                     # Either this should be yielded or it should be
                     # requiresAnswer=False
                     self.callRemote(Row, queryID=queryID, row=row)
-        self.callRemote(QueryComplete, queryID=queryID, norows=norows)
+
+        self.callRemote(QueryComplete, queryID=queryID, norows=norows,
+                        derived=derived)
         returnValue({})
 
 
@@ -1148,20 +1250,24 @@
         return thunk(txn).addCallback(lambda ignored: {})
 
 
-    @Commit.responder
+    @failsafeResponder(Commit)
     def commit(self, transactionID):
         """
         Successfully complete the given transaction.
         """
-        return self._complete(transactionID, lambda x: x.commit())
+        def commitme(x):
+            return x.commit()
+        return self._complete(transactionID, commitme)
 
 
-    @Abort.responder
+    @failsafeResponder(Abort)
     def abort(self, transactionID):
         """
         Roll back the given transaction.
         """
-        return self._complete(transactionID, lambda x: x.abort())
+        def abortme(x):
+            return x.abort()
+        return self._complete(transactionID, abortme)
 
 
 
@@ -1169,13 +1275,29 @@
     """
     A client which can execute SQL.
     """
-    def __init__(self):
+
+    def __init__(self, dialect=POSTGRES_DIALECT, paramstyle=DEFAULT_PARAM_STYLE):
+        # See DEFAULT_PARAM_STYLE FIXME above.
         super(ConnectionPoolClient, self).__init__()
-        self._nextID  = count().next
-        self._txns    = {}
-        self._queries = {}
+        self._nextID    = count().next
+        self._txns      = {}
+        self._queries   = {}
+        self.dialect    = dialect
+        self.paramstyle = paramstyle
 
 
+    def unhandledError(self, failure):
+        """
+        An unhandled error has occurred.  Since we can't really classify errors
+        well on this protocol, log it and forget it.
+        """
+        log.err(failure, "Shared connection pool client encountered an error.")
+
+
+    def stopReceivingBoxes(self, why):
+        log.msg("(C) Stopped receiving boxes: " + why.getTraceback())
+
+
     def newTransaction(self):
         """
         Create a new networked provider of L{IAsyncTransaction}.
@@ -1192,21 +1314,22 @@
         return txn
 
 
-    @Row.responder
+    @failsafeResponder(Row)
     def row(self, queryID, row):
         self._queries[queryID].row(row)
         return {}
 
 
-    @QueryComplete.responder
-    def complete(self, queryID, norows):
-        self._queries.pop(queryID).done(norows)
+    @failsafeResponder(QueryComplete)
+    def complete(self, queryID, norows, derived):
+        self._queries.pop(queryID).done(norows, derived)
         return {}
 
 
 
 class _Query(object):
-    def __init__(self, raiseOnZeroRowCount):
+    def __init__(self, raiseOnZeroRowCount, args):
+        self.args                = args
         self.results             = []
         self.deferred            = Deferred()
         self.raiseOnZeroRowCount = raiseOnZeroRowCount
@@ -1219,12 +1342,28 @@
         self.results.append(row)
 
 
-    def done(self, norows):
+    def done(self, norows, derived):
         """
         The query is complete.
 
         @param norows: A boolean.  True if there were not any rows.
+
+        @param derived: either C{None} or a C{list} of L{IDerivedParameter}
+            providers initially passed into the C{execSQL} that started this
+            query.  The values of these objects will mutate the original input
+            parameters to resemble them.  Although L{IDerivedParameter.preQuery}
+            and L{IDerivedParameter.postQuery} are invoked on the other end of
+            the wire, the local objects will be made to appear as though they
+            were called here.
         """
+        if derived is not None:
+            # 1) Bleecchh.
+            # 2) FIXME: add some direct tests in test_adbapi2, the unit test for
+            # this crosses some abstraction boundaries so it's a little
+            # integration-y and in the tests for twext.enterprise.dal
+            for remote, local in zip(derived, self._deriveDerived()):
+                local.__dict__ = remote.__dict__
+
         if norows and (self.raiseOnZeroRowCount is not None):
             exc = self.raiseOnZeroRowCount()
             self.deferred.errback(Failure(exc))
@@ -1232,7 +1371,17 @@
             self.deferred.callback(self.results)
 
 
+    def _deriveDerived(self):
+        derived = None
+        for param in self.args:
+            if IDerivedParameter.providedBy(param):
+                if derived is None:
+                    derived = []
+                derived.append(param)
+        return derived
 
+
+
 class _NetTransaction(object):
     """
     A L{_NetTransaction} is an L{AMP}-protocol-based provider of the
@@ -1242,9 +1391,6 @@
 
     implements(IAsyncTransaction)
 
-    # See DEFAULT_PARAM_STYLE FIXME above.
-    paramstyle = DEFAULT_PARAM_STYLE
-
     def __init__(self, client, transactionID):
         """
         Initialize a transaction with a L{ConnectionPoolClient} and a unique
@@ -1253,16 +1399,43 @@
         self._client        = client
         self._transactionID = transactionID
         self._completed     = False
+        self._committing    = False
+        self._committed     = False
 
 
-    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+    @property
+    def paramstyle(self):
+        """
+        Forward 'paramstyle' attribute to the client.
+        """
+        return self._client.paramstyle
+
+
+    @property
+    def dialect(self):
+        """
+        Forward 'dialect' attribute to the client.
+        """
+        return self._client.dialect
+
+
+    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None, blockID=""):
+        if not blockID:
+            if self._completed:
+                raise AlreadyFinishedError()
         if args is None:
             args = []
-        queryID = str(self._client._nextID())
-        query = self._client._queries[queryID] = _Query(raiseOnZeroRowCount)
-        self._client.callRemote(ExecSQL, queryID=queryID, sql=sql, args=args,
-                                transactionID=self._transactionID)
-        return query.deferred
+        client = self._client
+        queryID = str(client._nextID())
+        query = client._queries[queryID] = _Query(raiseOnZeroRowCount, args)
+        result = (
+            client.callRemote(
+                ExecSQL, queryID=queryID, sql=sql, args=args,
+                transactionID=self._transactionID, blockID=blockID
+            )
+            .addCallback(lambda nothing: query.deferred)
+        )
+        return result
 
 
     def _complete(self, command):
@@ -1271,14 +1444,84 @@
         self._completed = True
         return self._client.callRemote(
             command, transactionID=self._transactionID
-            ).addCallback(lambda x: None)
+        ).addCallback(lambda x: None)
 
 
     def commit(self):
-        return self._complete(Commit)
+        self._committing = True
+        def done(whatever):
+            self._committed = True
+            return whatever
+        return self._complete(Commit).addBoth(done)
 
 
     def abort(self):
         return self._complete(Abort)
 
 
+    def commandBlock(self):
+        if self._completed:
+            raise AlreadyFinishedError()
+        blockID = str(self._client._nextID())
+        self._client.callRemote(
+            StartBlock, blockID=blockID, transactionID=self._transactionID
+        )
+        return _NetCommandBlock(self, blockID)
+
+
+
+class _NetCommandBlock(object):
+    """
+    Net command block.
+    """
+
+    implements(ICommandBlock)
+
+    def __init__(self, transaction, blockID):
+        self._transaction = transaction
+        self._blockID = blockID
+        self._ended = False
+
+
+    @property
+    def paramstyle(self):
+        """
+        Forward 'paramstyle' attribute to the transaction.
+        """
+        return self._transaction.paramstyle
+
+
+    @property
+    def dialect(self):
+        """
+        Forward 'dialect' attribute to the transaction.
+        """
+        return self._transaction.dialect
+
+
+
+    def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+        """
+        Execute some SQL on this command block.
+        """
+        if  (self._ended or
+             self._transaction._completed and
+             not self._transaction._committing or
+             self._transaction._committed):
+            raise AlreadyFinishedError()
+        return self._transaction.execSQL(sql, args, raiseOnZeroRowCount,
+                                         self._blockID)
+
+
+    def end(self):
+        """
+        End this block.
+        """
+        if self._ended:
+            raise AlreadyFinishedError()
+        self._ended = True
+        self._transaction._client.callRemote(
+            EndBlock, blockID=self._blockID,
+            transactionID=self._transaction._transactionID
+        )
+

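The failsafeResponder docstring above describes wrapping a responder so that an unknown error is logged and collapsed into a single declared exception type instead of dropping the connection. The sketch below shows the same wrapping idea in plain synchronous Python 3, without Twisted or AMP. The decorator name, logger name, and lookup_block function are hypothetical, and it catches Exception rather than using the bare except of the original.

```python
import functools
import logging

log = logging.getLogger("failsafe-sketch")  # hypothetical logger name


class FailsafeException(Exception):
    """Single catch-all error reported to the caller."""


def failsafe(func):
    """Log any exception raised by func, then re-raise FailsafeException."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # Record the real traceback locally; the caller only ever sees
            # the one exception type it has been told to expect.
            log.exception("shared database connection pool encountered error")
            raise FailsafeException()
    return wrapper


@failsafe
def lookup_block(blocks, block_id):
    """Hypothetical responder body: fails with KeyError on an unknown ID."""
    return blocks[block_id]


try:
    lookup_block({}, "missing")
    outcome = "no error"
except FailsafeException:
    outcome = "collapsed"
```

As the FIXME in the diff notes, a more general version would let errors already declared by the command pass through unchanged.
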
Modified: CalendarServer/trunk/twext/enterprise/dal/parseschema.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/parseschema.py	2011-10-10 15:05:54 UTC (rev 8174)
+++ CalendarServer/trunk/twext/enterprise/dal/parseschema.py	2011-10-10 15:11:56 UTC (rev 8175)
@@ -448,7 +448,7 @@
     Retrieve a value from an iterator and check its properties.  Same signature
     as L{expectSingle}, except it takes an iterator instead of a value.
 
-    @see L{expectSingle}
+    @see: L{expectSingle}
     """
     nextval = iterator.next()
     return expectSingle(nextval, **kw)

Modified: CalendarServer/trunk/twext/enterprise/dal/syntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/syntax.py	2011-10-10 15:05:54 UTC (rev 8174)
+++ CalendarServer/trunk/twext/enterprise/dal/syntax.py	2011-10-10 15:11:56 UTC (rev 8175)
@@ -1020,6 +1020,7 @@
 
     def postQuery(self, cursor):
         self.value = mapOracleOutputType(self.var.getvalue())
+        self.var = None
 
 
 

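The _Query.done change in adbapi2.py above copies the state of each derived parameter returned by the server onto the caller's original object with local.__dict__ = remote.__dict__. The sketch below illustrates that copy-back in Python 3. The OutParam class is a hypothetical stand-in for an IDerivedParameter provider, and copy.deepcopy stands in for the pickle round trip over the wire. Clearing var in postQuery plausibly keeps the returned object free of a database handle, though the commit does not state that motive.

```python
import copy


class OutParam(object):
    """Hypothetical stand-in for an IDerivedParameter provider."""

    def __init__(self):
        self.value = None


def copy_back(local_params, remote_params):
    """Make each local parameter look like its remote counterpart."""
    for remote, local in zip(remote_params, local_params):
        local.__dict__ = remote.__dict__


# Caller side: the objects originally passed to execSQL.
local_params = [OutParam(), OutParam()]

# "Server" side: separate copies are filled in after the query runs.
remote_params = copy.deepcopy(local_params)
for param, value in zip(remote_params, [300, 301]):
    param.value = value

copy_back(local_params, remote_params)
values = [param.value for param in local_params]
```

Rebinding __dict__ keeps the identity of the caller's objects intact, so code holding a reference to the original parameter sees the value filled in remotely.
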
Modified: CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py	2011-10-10 15:05:54 UTC (rev 8174)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_sqlsyntax.py	2011-10-10 15:11:56 UTC (rev 8175)
@@ -18,23 +18,24 @@
 Tests for L{twext.enterprise.dal.syntax}
 """
 
-from twext.enterprise.dal.model import Schema
 from twext.enterprise.dal.parseschema import addSQLToSchema
 from twext.enterprise.dal import syntax
 from twext.enterprise.dal.syntax import (
-    SchemaSyntax, Select, Insert, Update, Delete, Lock, SQLFragment,
-    TableMismatch, Parameter, Max, Len, NotEnoughValues
-, Savepoint, RollbackToSavepoint, ReleaseSavepoint, SavepointAction)
+    Select, Insert, Update, Delete, Lock, SQLFragment,
+    TableMismatch, Parameter, Max, Len, NotEnoughValues,
+    Savepoint, RollbackToSavepoint, ReleaseSavepoint, SavepointAction
+)
 
 from twext.enterprise.dal.syntax import Function
 
 from twext.enterprise.dal.syntax import FixedPlaceholder, NumericPlaceholder
 from twext.enterprise.ienterprise import POSTGRES_DIALECT, ORACLE_DIALECT
-from twext.enterprise.test.test_adbapi2 import ConnectionFactory
-from twext.enterprise.adbapi2 import ConnectionPool
 from twext.enterprise.test.test_adbapi2 import resultOf
-from twext.enterprise.test.test_adbapi2 import FakeThreadHolder
 from twisted.internet.defer import succeed
+from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
+from twext.enterprise.dal.syntax import SchemaSyntax
+from twext.enterprise.test.test_adbapi2 import ConnectionPoolHelper
+from twext.enterprise.test.test_adbapi2 import NetworkedPoolHelper
 from twisted.trial.unittest import TestCase
 
 
@@ -69,28 +70,36 @@
     def execSQL(self, text, params, exc):
         return succeed([[None, None]])
 
-class GenerationTests(TestCase):
+
+EXAMPLE_SCHEMA = """
+create sequence A_SEQ;
+create table FOO (BAR integer, BAZ varchar(255));
+create table BOZ (QUX integer, QUUX integer);
+create table OTHER (BAR integer,
+                    FOO_BAR integer not null);
+create table TEXTUAL (MYTEXT varchar(255));
+create table LEVELS (ACCESS integer,
+                     USERNAME varchar(255));
+create table NULLCHECK (ASTRING varchar(255) not null,
+                        ANUMBER integer);
+"""
+
+class ExampleSchemaHelper(SchemaTestHelper):
     """
-    Tests for syntactic helpers to generate SQL queries.
+    setUp implementor.
     """
 
     def setUp(self):
-        s = Schema(self.id())
-        addSQLToSchema(schema=s, schemaData="""
-                       create sequence A_SEQ;
-                       create table FOO (BAR integer, BAZ varchar(255));
-                       create table BOZ (QUX integer, QUUX integer);
-                       create table OTHER (BAR integer,
-                                           FOO_BAR integer not null);
-                       create table TEXTUAL (MYTEXT varchar(255));
-                       create table LEVELS (ACCESS integer,
-                                            USERNAME varchar(255));
-                       create table NULLCHECK (ASTRING varchar(255) not null,
-                                               ANUMBER integer);
-                       """)
-        self.schema = SchemaSyntax(s)
+        self.schema = SchemaSyntax(self.schemaFromString(EXAMPLE_SCHEMA))
 
 
+
+class GenerationTests(ExampleSchemaHelper, TestCase):
+    """
+    Tests for syntactic helpers to generate SQL queries.
+    """
+
+
     def test_simplestSelect(self):
         """
         L{Select} generates a 'select' statement, by default, asking for all
@@ -623,46 +632,6 @@
         )
 
 
-    def simulateOracleConnection(self):
-        """
-        Create a fake oracle-ish connection pool without using real threads or a
-        real database.
-
-        @return: a 3-tuple of L{IAsyncTransaction}, L{ConnectionPool},
-            L{ConnectionFactory}.
-        """
-        self.patch(syntax, 'cx_Oracle', FakeCXOracleModule)
-        factory    = ConnectionFactory()
-        pool       = ConnectionPool(factory.connect, maxConnections=2,
-                                    dialect=ORACLE_DIALECT,
-                                    paramstyle='numeric')
-        self.paused = False
-        pool._createHolder = lambda : FakeThreadHolder(self)
-        pool.startService()
-        conn = pool.connection()
-        return conn, pool, factory
-
-
-    def test_insertMultiReturnOnOracleTxn(self):
-        """
-        As described in L{test_insertMultiReturnOracle}, Oracle deals with
-        'returning' clauses by using out parameters.  However, this is not quite
-        enough, as the code needs to actually retrieve the values from the out
-        parameters.
-        """
-        conn, _ignore_pool, factory = self.simulateOracleConnection()
-        i = Insert({self.schema.FOO.BAR: 40,
-                    self.schema.FOO.BAZ: 50},
-                   Return=(self.schema.FOO.BAR, self.schema.FOO.BAZ))
-        # See fake result generation in test_adbapi2.py.
-        result = resultOf(i.on(conn))
-        self.assertEquals(result, [[[300, 301]]])
-        curvars = factory.connections[0].cursors[0].variables
-        self.assertEquals(len(curvars), 2)
-        self.assertEquals(curvars[0].type, FakeCXOracleModule.NUMBER)
-        self.assertEquals(curvars[1].type, FakeCXOracleModule.STRING)
-
-
     def test_insertMismatch(self):
         """
         L{Insert} raises L{TableMismatch} if the columns specified aren't all
@@ -985,26 +954,6 @@
         self.assertEquals(rows, [['', None]])
 
 
-    def test_rewriteOracleNULLs_Insert(self):
-        """
-        The behavior described in L{test_rewriteOracleNULLs_Select} applies to
-        other statement types as well, specifically those with 'returning'
-        clauses.
-        """
-        conn, _ignore_pool, factory = self.simulateOracleConnection()
-        # Add 2 cursor variable values so that these will be used by
-        # FakeVariable.getvalue.
-        factory.varvals.extend([None, None])
-        rows = resultOf(
-            Insert({self.schema.NULLCHECK.ASTRING: '',
-                    self.schema.NULLCHECK.ANUMBER: None},
-                   Return=[self.schema.NULLCHECK.ASTRING,
-                           self.schema.NULLCHECK.ANUMBER]
-                  ).on(conn))[0]
-
-        self.assertEquals(rows, [['', None]])
-
-
     def test_nestedLogicalExpressions(self):
         """
         Make sure that logical operator precedence inserts proper parenthesis
@@ -1151,3 +1100,75 @@
         self.assertEquals(values, {})
 
 
+
+class OracleConnectionMethods(object):
+    def test_rewriteOracleNULLs_Insert(self):
+        """
+        The behavior described in L{test_rewriteOracleNULLs_Select} applies to
+        other statement types as well, specifically those with 'returning'
+        clauses.
+        """
+        # Add 2 cursor variable values so that these will be used by
+        # FakeVariable.getvalue.
+        self.factory.varvals.extend([None, None])
+        rows = self.resultOf(
+            Insert({self.schema.NULLCHECK.ASTRING: '',
+                    self.schema.NULLCHECK.ANUMBER: None},
+                   Return=[self.schema.NULLCHECK.ASTRING,
+                           self.schema.NULLCHECK.ANUMBER]
+                  ).on(self.createTransaction()))[0]
+        self.assertEquals(rows, [['', None]])
+
+
+    def test_insertMultiReturnOnOracleTxn(self):
+        """
+        As described in L{test_insertMultiReturnOracle}, Oracle deals with
+        'returning' clauses by using out parameters.  However, this is not quite
+        enough, as the code needs to actually retrieve the values from the out
+        parameters.
+        """
+        i = Insert({self.schema.FOO.BAR: 40,
+                    self.schema.FOO.BAZ: 50},
+                   Return=(self.schema.FOO.BAR, self.schema.FOO.BAZ))
+        self.factory.varvals.extend(["first val!", "second val!"])
+        result = self.resultOf(i.on(self.createTransaction()))
+        self.assertEquals(result, [[["first val!", "second val!"]]])
+        curvars = self.factory.connections[0].cursors[0].variables
+        self.assertEquals(len(curvars), 2)
+        self.assertEquals(curvars[0].type, FakeCXOracleModule.NUMBER)
+        self.assertEquals(curvars[1].type, FakeCXOracleModule.STRING)
+
+
+
+class OracleConnectionTests(ConnectionPoolHelper, ExampleSchemaHelper,
+                            OracleConnectionMethods, TestCase):
+    """
+    Tests which use an oracle connection.
+    """
+
+    dialect = ORACLE_DIALECT
+
+    def setUp(self):
+        """
+        Create a fake oracle-ish connection pool without using real threads or a
+        real database.
+        """
+        self.patch(syntax, 'cx_Oracle', FakeCXOracleModule)
+        super(OracleConnectionTests, self).setUp()
+        ExampleSchemaHelper.setUp(self)
+
+
+
+
+class OracleNetConnectionTests(NetworkedPoolHelper, ExampleSchemaHelper,
+                               OracleConnectionMethods, TestCase):
+
+    dialect = ORACLE_DIALECT
+
+    def setUp(self):
+        self.patch(syntax, 'cx_Oracle', FakeCXOracleModule)
+        super(OracleNetConnectionTests, self).setUp()
+        ExampleSchemaHelper.setUp(self)
+        self.pump.client.dialect = ORACLE_DIALECT
+
+

Modified: CalendarServer/trunk/twext/enterprise/ienterprise.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/ienterprise.py	2011-10-10 15:05:54 UTC (rev 8174)
+++ CalendarServer/trunk/twext/enterprise/ienterprise.py	2011-10-10 15:11:56 UTC (rev 8175)
@@ -46,12 +46,9 @@
 
 
 
-class IAsyncTransaction(Interface):
+class ISQLExecutor(Interface):
     """
-    Asynchronous execution of SQL.
-
-    Note that there is no {begin()} method; if an L{IAsyncTransaction} exists,
-    it is assumed to have been started.
+    Base SQL-execution interface, for a group of commands or a transaction.
     """
 
     paramstyle = Attribute(
@@ -87,6 +84,15 @@
         """
 
 
+
+class IAsyncTransaction(ISQLExecutor):
+    """
+    Asynchronous execution of SQL.
+
+    Note that there is no C{begin()} method; if an L{IAsyncTransaction} exists at
+    all, it is assumed to have been started.
+    """
+
     def commit():
         """
         Commit changes caused by this transaction.
@@ -105,7 +111,73 @@
         """
 
 
+    def commandBlock():
+        """
+        Create an object which will cause the commands executed on it to be
+        grouped together.
 
+        This is useful when using database-specific features such as
+        sub-transactions where order of execution is important, but where
+        application code may need to perform I/O to determine what SQL, exactly,
+        it wants to execute.  Consider this fairly contrived example for an
+        imaginary database::
+
+            def storeWebPage(url, block):
+                block.execSQL("BEGIN SUB TRANSACTION")
+                got = getPage(url)
+                def gotPage(data):
+                    block.execSQL("INSERT INTO PAGES (TEXT) VALUES (?)", [data])
+                    block.execSQL("INSERT INTO INDEX (TOKENS) VALUES (?)",
+                                 [tokenize(data)])
+                    lastStmt = block.execSQL("END SUB TRANSACTION")
+                    block.end()
+                    return lastStmt
+                return got.addCallback(gotPage)
+            gatherResults([storeWebPage(url, txn.commandBlock())
+                          for url in urls]).addCallbacks(
+                            lambda x: txn.commit(), lambda f: txn.abort()
+                          )
+
+        This fires off all the C{getPage} requests in parallel, and prepares all
+        the necessary SQL immediately as the results arrive, but executes those
+        statements in order.  In the above example, this makes sure to store the
+        page and its tokens together; another use for this might be to store a
+        computed aggregate (such as a sum) at a particular point in a
+        transaction, without sacrificing parallelism.
+
+        @rtype: L{ICommandBlock}
+        """
+
+
+
+class ICommandBlock(ISQLExecutor):
+    """
+    This is a block of SQL commands that are grouped together.
+
+    @see: L{IAsyncTransaction.commandBlock}
+    """
+
+    def end():
+        """
+        End this command block, allowing other commands queued on the underlying
+        transaction to execute.
+
+        @note: This is I{not} the same as either L{IAsyncTransaction.commit} or
+            L{IAsyncTransaction.abort}, since it does not denote success or
+            failure; merely that the command block has completed and other
+            statements may now be executed.  Since sub-transactions are a
+            database-specific feature, they must be implemented at a
+            higher level than this facility provides (although this facility may
+            be useful in their implementation).  Also note that, unlike either
+            of those methods, this does I{not} return a Deferred: if you want to
+            know when the block has completed, simply add a callback to the last
+            L{ICommandBlock.execSQL} call executed on this L{ICommandBlock}.
+            (This may be changed in a future version for the sake of
+            convenience, however.)
+        """
+
+
+
 class IDerivedParameter(Interface):
     """
     A parameter which needs to be derived from the underlying DB-API cursor;
@@ -114,6 +186,11 @@
     C{args} argument to L{IAsyncTransaction.execSQL}, it will have its
     C{prequery} and C{postquery} methods invoked on it before and after
     executing the SQL query in question, respectively.
+
+    @note: L{IDerivedParameter} providers must also always be I{pickleable},
+        because in some cases the actual database cursor objects will be on the
+        other end of a network connection.  For an explanation of why this
+        might be, see L{twext.enterprise.adbapi2.ConnectionPoolConnection}.
     """
 
     def preQuery(cursor):

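The ordering guarantee described in the commandBlock docstring above can be
illustrated with a toy, synchronous model.  This is only a sketch, not the
adbapi2 implementation: ToyTransaction and ToyCommandBlock are hypothetical
names, there are no Deferreds, and only one open block at a time is supported.
It shows statements issued on the transaction being held back until the open
block has ended.

```python
class ToyCommandBlock(object):
    """Hypothetical stand-in for an ICommandBlock provider."""

    def __init__(self, txn):
        self._txn = txn
        self._ended = False

    def execSQL(self, sql):
        # Statements issued on the block run right away, in order.
        if self._ended:
            raise RuntimeError("command block already ended")
        self._txn.executed.append(sql)

    def end(self):
        # Ending the block releases whatever the transaction held back.
        if self._ended:
            raise RuntimeError("command block already ended")
        self._ended = True
        self._txn._blockEnded()


class ToyTransaction(object):
    """Hypothetical stand-in for an IAsyncTransaction provider."""

    def __init__(self):
        self.executed = []   # statements "run" so far, in execution order
        self._spooled = []   # statements waiting for the open block to end
        self._block = None

    def execSQL(self, sql):
        # While a block is open, statements on the transaction must wait.
        if self._block is not None:
            self._spooled.append(sql)
        else:
            self.executed.append(sql)

    def commandBlock(self):
        # Simplification: this toy allows only one open block at a time.
        if self._block is not None:
            raise RuntimeError("toy model supports one open block at a time")
        self._block = ToyCommandBlock(self)
        return self._block

    def _blockEnded(self):
        self._block = None
        spooled, self._spooled = self._spooled, []
        self.executed.extend(spooled)


txn = ToyTransaction()
txn.execSQL("a")
block = txn.commandBlock()
block.execSQL("b")
txn.execSQL("d")      # held back until the block ends
block.execSQL("c")
block.end()
print(txn.executed)
```

Here "d" is issued before "c" but runs after it, because it was issued on the
transaction while the block was still open.
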
Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2011-10-10 15:05:54 UTC (rev 8174)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py	2011-10-10 15:11:56 UTC (rev 8175)
@@ -20,19 +20,30 @@
 
 from itertools import count
 
-from zope.interface.verify import verifyClass
+from zope.interface.verify import verifyClass, verifyObject
+from zope.interface.declarations import implements
 
 from twisted.python.threadpool import ThreadPool
+
 from twisted.trial.unittest import TestCase
 
 from twisted.internet.defer import execute
 from twisted.internet.task import Clock
 
+from twisted.internet.interfaces import IReactorThreads
 from twisted.internet.defer import Deferred
+
+from twisted.test.proto_helpers import StringTransport
+
 from twext.enterprise.ienterprise import ConnectionError
 from twext.enterprise.ienterprise import AlreadyFinishedError
-from twisted.internet.interfaces import IReactorThreads
-from zope.interface.declarations import implements
+from twext.enterprise.adbapi2 import ConnectionPoolClient
+from twext.enterprise.adbapi2 import ConnectionPoolConnection
+from twext.enterprise.ienterprise import IAsyncTransaction
+from twext.enterprise.ienterprise import POSTGRES_DIALECT
+from twext.enterprise.ienterprise import ICommandBlock
+from twext.enterprise.adbapi2 import FailsafeException
+from twext.enterprise.adbapi2 import DEFAULT_PARAM_STYLE
 from twext.enterprise.adbapi2 import ConnectionPool
 
 
@@ -216,7 +227,11 @@
         return self.cursor.variables.index(self) + 300
 
 
+    def __reduce__(self):
+        raise RuntimeError("Not pickleable (since oracle vars aren't)")
 
+
+
 class ConnectionFactory(Parent):
 
     rollbackFail = False
@@ -421,11 +436,15 @@
 
 
 
-class ConnectionPoolTests(TestCase):
+class ConnectionPoolHelper(object):
     """
-    Tests for L{ConnectionPool}.
+    Connection pool setting-up facilities for tests that need a
+    L{ConnectionPool}.
     """
 
+    dialect = POSTGRES_DIALECT
+    paramstyle = DEFAULT_PARAM_STYLE
+
     def setUp(self):
         """
         Create a L{ConnectionPool} attached to a C{ConnectionFactory}.  Start
@@ -435,24 +454,21 @@
         self.holders            = []
         self.factory            = ConnectionFactory()
         self.pool               = ConnectionPool(self.factory.connect,
-                                                 maxConnections=2)
+                                                 maxConnections=2,
+                                                 dialect=self.dialect,
+                                                 paramstyle=self.paramstyle)
         self.pool._createHolder = self.makeAHolder
         self.clock              = self.pool.reactor = ClockWithThreads()
         self.pool.startService()
+        self.addCleanup(self.flushHolders)
 
 
-    def tearDown(self):
+    def flushHolders(self):
         """
-        Make sure the service is stopped and the fake ThreadHolders are all
+        Flush all pending C{submit}s since C{pauseHolders} was called.  This
+        makes sure the service is stopped and the fake ThreadHolders are all
         executing their queues so failed tests can exit cleanly.
         """
-        self.flushHolders()
-
-
-    def flushHolders(self):
-        """
-        Flush all pending C{submit}s since C{pauseHolders} was called.
-        """
         self.paused = False
         for holder in self.holders:
             holder.flush()
@@ -475,6 +491,24 @@
         return fth
 
 
+    def resultOf(self, it):
+        return resultOf(it)
+
+
+    def createTransaction(self):
+        return self.pool.connection()
+
+
+    def translateError(self, err):
+        return err
+
+
+
+class ConnectionPoolTests(ConnectionPoolHelper, TestCase):
+    """
+    Tests for L{ConnectionPool}.
+    """
+
     def test_tooManyConnections(self):
         """
         When the number of outstanding busy transactions exceeds the number of
@@ -483,29 +517,29 @@
         not backed by any real database connection; this object will queue its
         SQL statements until an existing connection becomes available.
         """
-        a = self.pool.connection()
+        a = self.createTransaction()
 
-        alphaResult = resultOf(a.execSQL("alpha"))
+        alphaResult = self.resultOf(a.execSQL("alpha"))
         [[counter, echo]] = alphaResult[0]
 
-        b = self.pool.connection()
+        b = self.createTransaction()
         # 'b' should have opened a connection.
         self.assertEquals(len(self.factory.connections), 2)
-        betaResult = resultOf(b.execSQL("beta"))
+        betaResult = self.resultOf(b.execSQL("beta"))
         [[bcounter, becho]] = betaResult[0]
 
         # both 'a' and 'b' are holding open a connection now; let's try to open
         # a third one.  (The ordering will be deterministic even if this fails,
         # because those threads are already busy.)
-        c = self.pool.connection()
-        gammaResult = resultOf(c.execSQL("gamma"))
+        c = self.createTransaction()
+        gammaResult = self.resultOf(c.execSQL("gamma"))
 
         # Did 'c' open a connection?  Let's hope not...
         self.assertEquals(len(self.factory.connections), 2)
         # SQL shouldn't be executed too soon...
         self.assertEquals(gammaResult, [])
 
-        commitResult = resultOf(b.commit())
+        commitResult = self.resultOf(b.commit())
 
         # Now that 'b' has committed, 'c' should be able to complete.
         [[ccounter, cecho]] = gammaResult[0]
@@ -523,8 +557,8 @@
         L{ConnectionPool.stopService} stops all the associated L{ThreadHolder}s
         and thereby frees up the resources it is holding.
         """
-        a = self.pool.connection()
-        alphaResult = resultOf(a.execSQL("alpha"))
+        a = self.createTransaction()
+        alphaResult = self.resultOf(a.execSQL("alpha"))
         [[[counter, echo]]] = alphaResult
         self.assertEquals(len(self.factory.connections), 1)
         self.assertEquals(len(self.holders), 1)
@@ -549,7 +583,7 @@
         self.factory.willFail()
         self.factory.willFail()
         self.factory.willConnect()
-        c = self.pool.connection()
+        c = self.createTransaction()
         def checkOneFailure():
             errors = self.flushLoggedErrors(FakeConnectionError)
             self.assertEquals(len(errors), 1)
@@ -562,6 +596,7 @@
         checkOneFailure()
         self.assertEquals(happened, [])
         self.clock.advance(self.pool.RETRY_TIMEOUT + 0.01)
+        self.flushHolders()
         self.assertEquals(happened, [[[1, "alpha"]]])
 
 
@@ -573,7 +608,7 @@
         as normal.
         """
         self.factory.defaultFail()
-        self.pool.connection()
+        self.createTransaction()
         errors = self.flushLoggedErrors(FakeConnectionError)
         self.assertEquals(len(errors), 1)
         stopd = []
@@ -592,7 +627,7 @@
         connection attempt has finished; in this case, succeeded.
         """
         self.pauseHolders()
-        self.pool.connection()
+        self.createTransaction()
         stopd = []
         self.pool.stopService().addBoth(stopd.append)
         self.assertEquals(stopd, [])
@@ -611,7 +646,7 @@
         """
         self.factory.defaultFail()
         self.pauseHolders()
-        self.pool.connection()
+        self.createTransaction()
         stopd = []
         self.pool.stopService().addBoth(stopd.append)
         self.assertEquals(stopd, [])
@@ -633,13 +668,13 @@
         """
         # TODO: commit() too?
         self.pauseHolders()
-        c = self.pool.connection()
-        abortResult = resultOf(c.abort())
+        c = self.createTransaction()
+        abortResult = self.resultOf(c.abort())
         # Should abort instantly, as it hasn't managed to unspool anything yet.
         # FIXME: kill all Deferreds associated with this thing, make sure that
         # any outstanding query callback chains get nuked.
         self.assertEquals(abortResult, [None])
-        stopResult = resultOf(self.pool.stopService())
+        stopResult = self.resultOf(self.pool.stopService())
         self.assertEquals(stopResult, [])
         self.flushHolders()
         #self.assertEquals(abortResult, [None])
@@ -654,17 +689,17 @@
         """
         # Use up the free slots so we have to spool.
         hold = []
-        hold.append(self.pool.connection())
-        hold.append(self.pool.connection())
+        hold.append(self.createTransaction())
+        hold.append(self.createTransaction())
 
-        c = self.pool.connection()
-        se = resultOf(c.execSQL("alpha"))
-        ce = resultOf(c.commit())
+        c = self.createTransaction()
+        se = self.resultOf(c.execSQL("alpha"))
+        ce = self.resultOf(c.commit())
         self.assertEquals(se, [])
         self.assertEquals(ce, [])
-        self.pool.stopService()
-        self.assertEquals(se[0].type, ConnectionError)
-        self.assertEquals(ce[0].type, ConnectionError)
+        self.resultOf(self.pool.stopService())
+        self.assertEquals(se[0].type, self.translateError(ConnectionError))
+        self.assertEquals(ce[0].type, self.translateError(ConnectionError))
 
 
     def test_repoolSpooled(self):
@@ -675,15 +710,15 @@
         behind any detritus that prevents stopService from working.
         """
         self.pauseHolders()
-        c = self.pool.connection()
-        c2 = self.pool.connection()
-        c3 = self.pool.connection()
+        c = self.createTransaction()
+        c2 = self.createTransaction()
+        c3 = self.createTransaction()
         c.commit()
         c2.commit()
         c3.commit()
         self.flushHolders()
         self.assertEquals(len(self.factory.connections), 2)
-        stopResult = resultOf(self.pool.stopService())
+        stopResult = self.resultOf(self.pool.stopService())
         self.assertEquals(stopResult, [None])
         self.assertEquals(len(self.factory.connections), 2)
         self.assertEquals(self.factory.connections[0].closed, True)
@@ -695,13 +730,14 @@
         Calls to connection() after stopService() result in transactions which
         immediately fail all operations.
         """
-        stopResults = resultOf(self.pool.stopService())
+        stopResults = self.resultOf(self.pool.stopService())
         self.assertEquals(stopResults, [None])
         self.pauseHolders()
-        postClose = self.pool.connection()
-        queryResult = resultOf(postClose.execSQL("hello"))
+        postClose = self.createTransaction()
+        queryResult = self.resultOf(postClose.execSQL("hello"))
         self.assertEquals(len(queryResult), 1)
-        self.assertEquals(queryResult[0].type, ConnectionError)
+        self.assertEquals(queryResult[0].type,
+                          self.translateError(ConnectionError))
 
 
     def test_connectAfterStartedStopping(self):
@@ -711,16 +747,18 @@
         operations.
         """
         self.pauseHolders()
-        preClose = self.pool.connection()
-        preCloseResult = resultOf(preClose.execSQL('statement'))
-        stopResult = resultOf(self.pool.stopService())
-        postClose = self.pool.connection()
-        queryResult = resultOf(postClose.execSQL("hello"))
+        preClose = self.createTransaction()
+        preCloseResult = self.resultOf(preClose.execSQL('statement'))
+        stopResult = self.resultOf(self.pool.stopService())
+        postClose = self.createTransaction()
+        queryResult = self.resultOf(postClose.execSQL("hello"))
         self.assertEquals(stopResult, [])
         self.assertEquals(len(queryResult), 1)
-        self.assertEquals(queryResult[0].type, ConnectionError)
+        self.assertEquals(queryResult[0].type,
+                          self.translateError(ConnectionError))
         self.assertEquals(len(preCloseResult), 1)
-        self.assertEquals(preCloseResult[0].type, ConnectionError)
+        self.assertEquals(preCloseResult[0].type,
+                          self.translateError(ConnectionError))
 
 
     def test_abortFailsDuringStopService(self):
@@ -730,16 +768,16 @@
         happens, shutdown should continue.
         """
         txns = []
-        txns.append(self.pool.connection())
-        txns.append(self.pool.connection())
+        txns.append(self.createTransaction())
+        txns.append(self.createTransaction())
         for txn in txns:
             # Make sure rollback will actually be executed.
-            results = resultOf(txn.execSQL("maybe change something!"))
+            results = self.resultOf(txn.execSQL("maybe change something!"))
             [[[counter, echo]]] = results
             self.assertEquals("maybe change something!", echo)
         # Fail one (and only one) call to rollback().
         self.factory.rollbackFail = True
-        stopResult = resultOf(self.pool.stopService())
+        stopResult = self.resultOf(self.pool.stopService())
         self.assertEquals(stopResult, [None])
         self.assertEquals(len(self.flushLoggedErrors(RollbackFail)), 1)
         self.assertEquals(self.factory.connections[0].closed, True)
@@ -751,11 +789,11 @@
         L{ConnectionPool.stopService} will shut down if a recycled transaction
         is still pending.
         """
-        recycled = self.pool.connection()
-        recycled.commit()
+        recycled = self.createTransaction()
+        self.resultOf(recycled.commit())
         remember = []
-        remember.append(self.pool.connection())
-        self.assertEquals(resultOf(self.pool.stopService()), [None])
+        remember.append(self.createTransaction())
+        self.assertEquals(self.resultOf(self.pool.stopService()), [None])
 
 
     def test_abortSpooled(self):
@@ -766,14 +804,14 @@
         """
         # Use up the available connections ...
         for i in xrange(self.pool.maxConnections):
-            self.pool.connection()
+            self.createTransaction()
         # ... so that this one has to be spooled.
-        spooled = self.pool.connection()
-        result = resultOf(spooled.execSQL("alpha"))
+        spooled = self.createTransaction()
+        result = self.resultOf(spooled.execSQL("alpha"))
         # sanity check, it would be bad if this actually executed.
         self.assertEqual(result, [])
-        spooled.abort()
-        self.assertEqual(result[0].type, ConnectionError)
+        self.resultOf(spooled.abort())
+        self.assertEqual(result[0].type, self.translateError(ConnectionError))
 
 
     def test_waitForAlreadyAbortedTransaction(self):
@@ -781,9 +819,9 @@
         L{ConnectionPool.stopService} will wait for all transactions to shut
         down before exiting, including those which have already been stopped.
         """
-        it = self.pool.connection()
+        it = self.createTransaction()
         self.pauseHolders()
-        abortResult = resultOf(it.abort())
+        abortResult = self.resultOf(it.abort())
 
         # steal it from the queue so we can do it out of order
         d, work = self.holders[0].queue.pop()
@@ -792,12 +830,13 @@
         self.assertEquals(self.holders[0].queue, [])
         self.assertEquals(len(self.holders), 1)
         self.flushHolders()
-        stopResult = resultOf(self.pool.stopService())
+        stopResult = self.resultOf(self.pool.stopService())
         # Sanity check that we haven't actually stopped it yet
         self.assertEquals(abortResult, [])
         # We haven't fired it yet, so the service had better not have stopped...
         self.assertEquals(stopResult, [])
         d.callback(None)
+        self.flushHolders()
         self.assertEquals(abortResult, [None])
         self.assertEquals(stopResult, [None])
 
@@ -807,56 +846,72 @@
         L{ConnectionPool.connection} will not spawn more than the maximum
         connections if there are finishing transactions outstanding.
         """
-        a = self.pool.connection()
-        b = self.pool.connection()
+        a = self.createTransaction()
+        b = self.createTransaction()
         self.pauseHolders()
         a.abort()
         b.abort()
         # Remove the holders for the existing connections, so that the 'extra'
         # connection() call wins the race and gets executed first.
         self.holders[:] = []
-        self.pool.connection()
+        self.createTransaction()
         self.flushHolders()
         self.assertEquals(len(self.factory.connections), 2)
 
 
+    def setParamstyle(self, paramstyle):
+        """
+        Change the paramstyle of the transaction under test.
+        """
+        self.pool.paramstyle = paramstyle
+
+
     def test_propagateParamstyle(self):
         """
-        Each different type of L{IAsyncTransaction} relays the C{paramstyle}
+        Each different type of L{ISQLExecutor} relays the C{paramstyle}
         attribute from the L{ConnectionPool}.
         """
         TEST_PARAMSTYLE = "justtesting"
-        self.pool.paramstyle = TEST_PARAMSTYLE
-        normaltxn = self.pool.connection()
+        self.setParamstyle(TEST_PARAMSTYLE)
+        normaltxn = self.createTransaction()
         self.assertEquals(normaltxn.paramstyle, TEST_PARAMSTYLE)
+        self.assertEquals(normaltxn.commandBlock().paramstyle, TEST_PARAMSTYLE)
         self.pauseHolders()
         extra = []
-        extra.append(self.pool.connection())
-        waitingtxn = self.pool.connection()
+        extra.append(self.createTransaction())
+        waitingtxn = self.createTransaction()
         self.assertEquals(waitingtxn.paramstyle, TEST_PARAMSTYLE)
         self.flushHolders()
         self.pool.stopService()
-        notxn = self.pool.connection()
+        notxn = self.createTransaction()
         self.assertEquals(notxn.paramstyle, TEST_PARAMSTYLE)
 
 
+    def setDialect(self, dialect):
+        """
+        Change the dialect of the transaction under test.
+        """
+        self.pool.dialect = dialect
+
+
     def test_propagateDialect(self):
         """
-        Each different type of L{IAsyncTransaction} relays the C{dialect}
+        Each different type of L{ISQLExecutor} relays the C{dialect}
         attribute from the L{ConnectionPool}.
         """
         TEST_DIALECT = "otherdialect"
-        self.pool.dialect = TEST_DIALECT
-        normaltxn = self.pool.connection()
+        self.setDialect(TEST_DIALECT)
+        normaltxn = self.createTransaction()
         self.assertEquals(normaltxn.dialect, TEST_DIALECT)
+        self.assertEquals(normaltxn.commandBlock().dialect, TEST_DIALECT)
         self.pauseHolders()
         extra = []
-        extra.append(self.pool.connection())
-        waitingtxn = self.pool.connection()
+        extra.append(self.createTransaction())
+        waitingtxn = self.createTransaction()
         self.assertEquals(waitingtxn.dialect, TEST_DIALECT)
         self.flushHolders()
         self.pool.stopService()
-        notxn = self.pool.connection()
+        notxn = self.createTransaction()
         self.assertEquals(notxn.dialect, TEST_DIALECT)
 
 
@@ -877,7 +932,7 @@
         # it's recycling the underlying transaction, or connect() just
         # succeeded.  Either way you just have a _SingleTxn wrapping a
         # _ConnectedTxn.
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         self.assertEquals(len(self.factory.connections), 1,
                           "Sanity check failed.")
         class CustomExecuteFailed(Exception):
@@ -885,7 +940,7 @@
             Custom 'execute-failed' exception.
             """
         self.factory.connections[0].executeWillFail(CustomExecuteFailed)
-        results = resultOf(txn.execSQL("hello, world!"))
+        results = self.resultOf(txn.execSQL("hello, world!"))
         [[[counter, echo]]] = results
         self.assertEquals("hello, world!", echo)
         # Two execution attempts should have been made, one on each connection.
@@ -911,15 +966,15 @@
         then, the database server will shut down and the connections will die,
         but we will be none the wiser until we try to use them.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         moreFailureSetup(self.factory)
         self.assertEquals(len(self.factory.connections), 1,
                           "Sanity check failed.")
-        results = resultOf(txn.execSQL("hello, world!"))
+        results = self.resultOf(txn.execSQL("hello, world!"))
         txn.commit()
         [[[counter, echo]]] = results
         self.assertEquals("hello, world!", echo)
-        txn2 = self.pool.connection()
+        txn2 = self.createTransaction()
         self.assertEquals(len(self.factory.connections), 1,
                           "Sanity check failed.")
         class CustomExecFail(Exception):
@@ -927,7 +982,7 @@
             Custom 'execute()' failure.
             """
         self.factory.connections[0].executeWillFail(CustomExecFail)
-        results = resultOf(txn2.execSQL("second try!"))
+        results = self.resultOf(txn2.execSQL("second try!"))
         txn2.commit()
         [[[counter, echo]]] = results
         self.assertEquals("second try!", echo)
@@ -970,16 +1025,15 @@
         statement transparently re-executed by the logic tested by
         L{test_reConnectWhenFirstExecFails}.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         self.factory.commitFail = True
         self.factory.rollbackFail = True
-        [x] = resultOf(txn.commit())
+        [x] = self.resultOf(txn.commit())
 
         # No statements have been executed, so 'commit' will *not* be executed.
         self.assertEquals(self.factory.commitFail, True)
         self.assertIdentical(x, None)
         self.assertEquals(len(self.pool._free), 1)
-        self.assertIn(txn._baseTxn, self.pool._free)
         self.assertEquals(self.pool._finishing, [])
         self.assertEquals(len(self.factory.connections), 1)
         self.assertEquals(self.factory.connections[0].closed, False)
@@ -1001,18 +1055,18 @@
         relaying the exception back to application code but attempting a
         re-connection on the next try.
         """
-        txn = self.pool.connection()
-        [[[counter, echo]]] = resultOf(txn.execSQL("hello, world!", []))
+        txn = self.createTransaction()
+        [[[counter, echo]]] = self.resultOf(txn.execSQL("hello, world!", []))
         self.factory.connections[0].executeWillFail(ZeroDivisionError)
-        [f] = resultOf(txn.execSQL("divide by zero", []))
-        f.trap(ZeroDivisionError)
+        [f] = self.resultOf(txn.execSQL("divide by zero", []))
+        f.trap(self.translateError(ZeroDivisionError))
         self.assertEquals(self.factory.connections[0].executions, 2)
         # Reconnection should work exactly as before.
         self.assertEquals(self.factory.connections[0].closed, False)
         # Application code has to roll back its transaction at this point, since
         # it failed (and we don't necessarily know why it failed: not enough
         # information).
-        txn.abort()
+        self.resultOf(txn.abort())
         self.factory.connections[0].executions = 0 # re-set for next test
         self.assertEquals(len(self.factory.connections), 1)
         self.test_reConnectWhenFirstExecFails()
@@ -1028,17 +1082,16 @@
         Also, a new connection will immediately be established to keep the pool
         size the same.
         """
-        txn = self.pool.connection()
-        results = resultOf(txn.execSQL("maybe change something!"))
+        txn = self.createTransaction()
+        results = self.resultOf(txn.execSQL("maybe change something!"))
         [[[counter, echo]]] = results
         self.assertEquals("maybe change something!", echo)
         self.factory.rollbackFail = True
-        [x] = resultOf(txn.abort())
+        [x] = self.resultOf(txn.abort())
         # Abort does not propagate the error on, the transaction merely gets
         # disposed of.
         self.assertIdentical(x, None)
         self.assertEquals(len(self.pool._free), 1)
-        self.assertNotIn(txn._baseTxn, self.pool._free)
         self.assertEquals(self.pool._finishing, [])
         self.assertEquals(len(self.factory.connections), 2)
         self.assertEquals(self.factory.connections[0].closed, True)
@@ -1053,15 +1106,14 @@
         C{commit} has to be relayed to client code, since that actually means
         some changes didn't hit the database.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         self.factory.commitFail = True
-        results = resultOf(txn.execSQL("maybe change something!"))
+        results = self.resultOf(txn.execSQL("maybe change something!"))
         [[[counter, echo]]] = results
         self.assertEquals("maybe change something!", echo)
-        [x] = resultOf(txn.commit())
-        x.trap(CommitFail)
+        [x] = self.resultOf(txn.commit())
+        x.trap(self.translateError(CommitFail))
         self.assertEquals(len(self.pool._free), 1)
-        self.assertNotIn(txn._baseTxn, self.pool._free)
         self.assertEquals(self.pool._finishing, [])
         self.assertEquals(len(self.factory.connections), 2)
         self.assertEquals(self.factory.connections[0].closed, True)
@@ -1073,14 +1125,15 @@
         L{IAsyncTransaction.commandBlock} returns an L{IAsyncTransaction}
         provider which ensures that a block of commands are executed together.
         """
-        txn = self.pool.connection()
-        a = resultOf(txn.execSQL("a"))
+        txn = self.createTransaction()
+        a = self.resultOf(txn.execSQL("a"))
         cb = txn.commandBlock()
-        b = resultOf(cb.execSQL("b"))
-        d = resultOf(txn.execSQL("d"))
-        c = resultOf(cb.execSQL("c"))
+        verifyObject(ICommandBlock, cb)
+        b = self.resultOf(cb.execSQL("b"))
+        d = self.resultOf(txn.execSQL("d"))
+        c = self.resultOf(cb.execSQL("c"))
         cb.end()
-        e = resultOf(txn.execSQL("e"))
+        e = self.resultOf(txn.execSQL("e"))
         self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
                           [("a", []), ("b", []), ("c", []), ("d", []),
                            ("e", [])])
@@ -1097,13 +1150,13 @@
         executing until all SQL statements scheduled before it have completed.
         """
         self.pauseHolders()
-        txn = self.pool.connection()
-        a = resultOf(txn.execSQL("a"))
-        b = resultOf(txn.execSQL("b"))
+        txn = self.createTransaction()
+        a = self.resultOf(txn.execSQL("a"))
+        b = self.resultOf(txn.execSQL("b"))
         cb = txn.commandBlock()
-        c = resultOf(cb.execSQL("c"))
-        d = resultOf(cb.execSQL("d"))
-        e = resultOf(txn.execSQL("e"))
+        c = self.resultOf(cb.execSQL("c"))
+        d = self.resultOf(cb.execSQL("d"))
+        e = self.resultOf(txn.execSQL("e"))
         cb.end()
         self.flushHolders()
 
@@ -1123,7 +1176,7 @@
         When execution of one command block is complete, it will proceed to the
         next queued block, then to regular SQL executed on the transaction.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         cb1 = txn.commandBlock()
         cb2 = txn.commandBlock()
         txn.execSQL("e")
@@ -1134,6 +1187,7 @@
         cb2.end()
         cb1.end()
         flush()
+        self.flushHolders()
         self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
                           [("a", []), ("b", []), ("c", []), ("d", []),
                            ("e", [])])
@@ -1152,7 +1206,7 @@
         L{CommandBlock.end} will raise L{AlreadyFinishedError} when called more
         than once.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         block = txn.commandBlock()
         block.end()
         self.assertRaises(AlreadyFinishedError, block.end)
@@ -1165,14 +1219,15 @@
         when you call {IAsyncTransaction.commit}(), it should not actually take
         effect if there are any pending command blocks.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         block = txn.commandBlock()
-        commitResult = resultOf(txn.commit())
-        block.execSQL("in block")
+        commitResult = self.resultOf(txn.commit())
+        self.resultOf(block.execSQL("in block"))
         self.assertEquals(commitResult, [])
         self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
                           [("in block", [])])
         block.end()
+        self.flushHolders()
         self.assertEquals(commitResult, [None])
 
 
@@ -1183,10 +1238,10 @@
         all outstanding C{execSQL}s will fail immediately, on both command
         blocks and on the transaction itself.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         block = txn.commandBlock()
         block2 = txn.commandBlock()
-        abortResult = resultOf(txn.abort())
+        abortResult = self.resultOf(txn.abort())
         self.assertEquals(abortResult, [None])
         self.assertRaises(AlreadyFinishedError, block2.execSQL, "bar")
         self.assertRaises(AlreadyFinishedError, block.execSQL, "foo")
@@ -1206,7 +1261,7 @@
         Attempting to execute SQL on a L{CommandBlock} which has had C{end}
         called on it will result in an L{AlreadyFinishedError}.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         block = txn.commandBlock()
         block.end()
         self.assertRaises(AlreadyFinishedError, block.execSQL, "hello")
@@ -1219,7 +1274,7 @@
         Once an L{IAsyncTransaction} has been committed, L{commandBlock} raises
         an exception.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         txn.commit()
         self.assertRaises(AlreadyFinishedError, txn.commandBlock)
 
@@ -1229,9 +1284,154 @@
         Once an L{IAsyncTransaction} has been committed, L{commandBlock} raises
         an exception.
         """
-        txn = self.pool.connection()
-        txn.abort()
+        txn = self.createTransaction()
+        self.resultOf(txn.abort())
         self.assertRaises(AlreadyFinishedError, txn.commandBlock)
 
 
 
+class IOPump(object):
+    """
+    Connect a client and a server.
+
+    @ivar client: a client protocol
+
+    @ivar server: a server protocol
+    """
+
+    def __init__(self, client, server):
+        self.client = client
+        self.server = server
+        self.clientTransport = StringTransport()
+        self.serverTransport = StringTransport()
+        self.client.makeConnection(self.clientTransport)
+        self.server.makeConnection(self.serverTransport)
+        self.c2s = [self.clientTransport, self.server]
+        self.s2c = [self.serverTransport, self.client]
+
+
+    def moveData(self, (outTransport, inProtocol)):
+        """
+        Move data from a L{StringTransport} to an L{IProtocol}.
+
+        @return: C{True} if any data was moved, C{False} if no data was moved.
+        """
+        data = outTransport.io.getvalue()
+        outTransport.io.seek(0)
+        outTransport.io.truncate()
+        if data:
+            inProtocol.dataReceived(data)
+            return True
+        else:
+            return False
+
+
+    def pump(self):
+        """
+        Deliver all input from the client to the server, then from the server to
+        the client.
+        """
+        a = self.moveData(self.c2s)
+        b = self.moveData(self.s2c)
+        return a or b
+
+
+    def flush(self, maxTurns=100):
+        """
+        Continue pumping until no more data is flowing.
+        """
+        turns = 0
+        while self.pump():
+            turns += 1
+            if turns > maxTurns:
+                raise RuntimeError("Ran too long!")
+
+
+
+class NetworkedPoolHelper(ConnectionPoolHelper):
+    """
+    An extension of L{ConnectionPoolHelper} that can set up a
+    L{ConnectionPoolClient} and L{ConnectionPoolConnection} attached to each
+    other.
+    """
+
+    def setUp(self):
+        """
+        Do the same setup from L{ConnectionPoolBase}, but also establish a
+        loopback connection between a L{ConnectionPoolConnection} and a
+        L{ConnectionPoolClient}.
+        """
+        super(NetworkedPoolHelper, self).setUp()
+        self.pump = IOPump(ConnectionPoolClient(dialect=self.dialect,
+                                                paramstyle=self.paramstyle),
+                           ConnectionPoolConnection(self.pool))
+
+
+    def flushHolders(self):
+        """
+        In addition to flushing the L{ThreadHolder} stubs, also flush any
+        pending network I/O.
+        """
+        self.pump.flush()
+        super(NetworkedPoolHelper, self).flushHolders()
+        self.pump.flush()
+
+
+    def createTransaction(self):
+        txn = self.pump.client.newTransaction()
+        self.pump.flush()
+        return txn
+
+
+    def translateError(self, err):
+        """
+        All errors raised locally will unfortunately be translated into
+        UnknownRemoteError, since AMP requires specific enumeration of all of
+        them.  Flush the locally logged error of the given type and return
+        L{UnknownRemoteError}.
+        """
+        self.flushLoggedErrors(err)
+        return FailsafeException
+
+
+    def resultOf(self, it):
+        result = resultOf(it)
+        self.pump.flush()
+        return result
+
+
+
+class NetworkedConnectionPoolTests(NetworkedPoolHelper, ConnectionPoolTests):
+    """
+    Tests for L{ConnectionPoolConnection} and L{ConnectionPoolClient}
+    interacting with each other.
+    """
+
+
+    def setParamstyle(self, paramstyle):
+        """
+        Change the paramstyle on both the pool and the client.
+        """
+        super(NetworkedConnectionPoolTests, self).setParamstyle(paramstyle)
+        self.pump.client.paramstyle = paramstyle
+
+
+    def setDialect(self, dialect):
+        """
+        Change the dialect on both the pool and the client.
+        """
+        super(NetworkedConnectionPoolTests, self).setDialect(dialect)
+        self.pump.client.dialect = dialect
+
+
+    def test_newTransaction(self):
+        """
+        L{ConnectionPoolClient.newTransaction} returns a provider of
+        L{IAsyncTransaction}, and creates a new transaction on the server side.
+        """
+        txn = self.pump.client.newTransaction()
+        verifyObject(IAsyncTransaction, txn)
+        self.pump.flush()
+        self.assertEquals(len(self.factory.connections), 1)
+
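
Editorial sketch (not part of changeset 8175): the IOPump helper added above
uses a tuple parameter in moveData, which is Python 2-only syntax, and depends
on Twisted's StringTransport. The following is a minimal, Twisted-free
illustration of the same loopback-pump idea, assuming Python 3. MemoryTransport
and Recorder are hypothetical stand-ins invented for this example, and having
flush() return the turn count is an addition that the original does not have.

```python
import io


class MemoryTransport:
    """Hypothetical stand-in for StringTransport: buffers written bytes."""

    def __init__(self):
        self.io = io.BytesIO()

    def write(self, data):
        self.io.write(data)


class Recorder:
    """Hypothetical protocol: records input and optionally replies once."""

    def __init__(self, reply=None):
        self.reply = reply
        self.received = []
        self.transport = None

    def makeConnection(self, transport):
        self.transport = transport

    def dataReceived(self, data):
        self.received.append(data)
        if self.reply is not None:
            self.transport.write(self.reply)
            self.reply = None


class IOPump:
    """Connect a client and a server through in-memory transports."""

    def __init__(self, client, server):
        self.client = client
        self.server = server
        self.clientTransport = MemoryTransport()
        self.serverTransport = MemoryTransport()
        client.makeConnection(self.clientTransport)
        server.makeConnection(self.serverTransport)

    def moveData(self, outTransport, inProtocol):
        # Drain everything buffered on the transport and hand it to the peer.
        data = outTransport.io.getvalue()
        outTransport.io.seek(0)
        outTransport.io.truncate()
        if data:
            inProtocol.dataReceived(data)
            return True
        return False

    def pump(self):
        # Client-to-server first, then server-to-client, as in the original.
        a = self.moveData(self.clientTransport, self.server)
        b = self.moveData(self.serverTransport, self.client)
        return a or b

    def flush(self, maxTurns=100):
        # Keep pumping until a full turn moves no data in either direction.
        turns = 0
        while self.pump():
            turns += 1
            if turns > maxTurns:
                raise RuntimeError("Ran too long!")
        return turns  # assumption: returned here only to make the sketch testable


client = Recorder()
server = Recorder(reply=b"pong")
pump = IOPump(client, server)
client.transport.write(b"ping")
turns = pump.flush()
```

Because pump() delivers client output before server output, a request and its
immediate reply are both delivered within a single turn; flush() then makes one
more pass to confirm that nothing else is pending.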
+