[CalendarServer-changes] [8160] CalendarServer/branches/users/glyph/shared-pool-take2/twext/ enterprise

source_changes at macosforge.org source_changes at macosforge.org
Mon Oct 10 08:02:51 PDT 2011


Revision: 8160
          http://trac.macosforge.org/projects/calendarserver/changeset/8160
Author:   glyph at apple.com
Date:     2011-10-10 08:02:51 -0700 (Mon, 10 Oct 2011)
Log Message:
-----------
retrofit existing tests so that all existing functionality is covered in the ConnectionPoolClient

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py
    CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py

Modified: CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py	2011-10-10 15:02:40 UTC (rev 8159)
+++ CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py	2011-10-10 15:02:51 UTC (rev 8160)
@@ -1154,7 +1154,9 @@
         """
         Successfully complete the given transaction.
         """
-        return self._complete(transactionID, lambda x: x.commit())
+        def commitme(x):
+            return x.commit()
+        return self._complete(transactionID, commitme)
 
 
     @Abort.responder
@@ -1162,7 +1164,9 @@
         """
         Roll back the given transaction.
         """
-        return self._complete(transactionID, lambda x: x.abort())
+        def abortme(x):
+            return x.abort()
+        return self._complete(transactionID, abortme)
 
 
 
@@ -1170,6 +1174,11 @@
     """
     A client which can execute SQL.
     """
+
+    # See DEFAULT_PARAM_STYLE FIXME above.
+    paramstyle = DEFAULT_PARAM_STYLE
+    dialect = POSTGRES_DIALECT
+
     def __init__(self):
         super(ConnectionPoolClient, self).__init__()
         self._nextID  = count().next
@@ -1243,9 +1252,6 @@
 
     implements(IAsyncTransaction)
 
-    # See DEFAULT_PARAM_STYLE FIXME above.
-    paramstyle = DEFAULT_PARAM_STYLE
-
     def __init__(self, client, transactionID):
         """
         Initialize a transaction with a L{ConnectionPoolClient} and a unique
@@ -1256,14 +1262,35 @@
         self._completed     = False
 
 
+    @property
+    def paramstyle(self):
+        """
+        Forward 'paramstyle' attribute to the client.
+        """
+        return self._client.paramstyle
+
+
+    @property
+    def dialect(self):
+        """
+        Forward 'dialect' attribute to the client.
+        """
+        return self._client.dialect
+
+
     def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
         if args is None:
             args = []
         queryID = str(self._client._nextID())
         query = self._client._queries[queryID] = _Query(raiseOnZeroRowCount)
-        self._client.callRemote(ExecSQL, queryID=queryID, sql=sql, args=args,
-                                transactionID=self._transactionID)
-        return query.deferred
+        result = (
+            self._client.callRemote(
+                ExecSQL, queryID=queryID, sql=sql, args=args,
+                transactionID=self._transactionID
+            )
+            .addCallback(lambda nothing: query.deferred)
+        )
+        return result
 
 
     def _complete(self, command):
@@ -1272,7 +1299,7 @@
         self._completed = True
         return self._client.callRemote(
             command, transactionID=self._transactionID
-            ).addCallback(lambda x: None)
+        ).addCallback(lambda x: None)
 
 
     def commit(self):
@@ -1283,3 +1310,8 @@
         return self._complete(Abort)
 
 
+    def commandBlock(self):
+        raise NotImplementedError()
+
+
+

Modified: CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py	2011-10-10 15:02:40 UTC (rev 8159)
+++ CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py	2011-10-10 15:02:51 UTC (rev 8160)
@@ -20,19 +20,28 @@
 
 from itertools import count
 
-from zope.interface.verify import verifyClass
+from zope.interface.verify import verifyClass, verifyObject
+from zope.interface.declarations import implements
 
 from twisted.python.threadpool import ThreadPool
+
+from twisted.protocols.amp import UnknownRemoteError
+
 from twisted.trial.unittest import TestCase
 
 from twisted.internet.defer import execute
 from twisted.internet.task import Clock
 
+from twisted.internet.interfaces import IReactorThreads
 from twisted.internet.defer import Deferred
+
+from twisted.test.proto_helpers import StringTransport
+
 from twext.enterprise.ienterprise import ConnectionError
 from twext.enterprise.ienterprise import AlreadyFinishedError
-from twisted.internet.interfaces import IReactorThreads
-from zope.interface.declarations import implements
+from twext.enterprise.adbapi2 import ConnectionPoolClient
+from twext.enterprise.adbapi2 import ConnectionPoolConnection
+from twext.enterprise.ienterprise import IAsyncTransaction
 from twext.enterprise.adbapi2 import ConnectionPool
 
 
@@ -421,9 +430,9 @@
 
 
 
-class ConnectionPoolTests(TestCase):
+class ConnectionPoolBase(TestCase):
     """
-    Tests for L{ConnectionPool}.
+    Common functionality for testing L{ConnectionPool}.
     """
 
     def setUp(self):
@@ -439,20 +448,15 @@
         self.pool._createHolder = self.makeAHolder
         self.clock              = self.pool.reactor = ClockWithThreads()
         self.pool.startService()
+        self.addCleanup(self.flushHolders)
 
 
-    def tearDown(self):
+    def flushHolders(self):
         """
-        Make sure the service is stopped and the fake ThreadHolders are all
+        Flush all pending C{submit}s since C{pauseHolders} was called.  This
+        makes sure the service is stopped and the fake ThreadHolders are all
         executing their queues so failed tests can exit cleanly.
         """
-        self.flushHolders()
-
-
-    def flushHolders(self):
-        """
-        Flush all pending C{submit}s since C{pauseHolders} was called.
-        """
         self.paused = False
         for holder in self.holders:
             holder.flush()
@@ -475,6 +479,24 @@
         return fth
 
 
+    def resultOf(self, it):
+        return resultOf(it)
+
+
+    def createTransaction(self):
+        return self.pool.connection()
+
+
+    def translateError(self, err):
+        return err
+
+
+
+class ConnectionPoolTests(ConnectionPoolBase):
+    """
+    Tests for L{ConnectionPool}.
+    """
+
     def test_tooManyConnections(self):
         """
         When the number of outstanding busy transactions exceeds the number of
@@ -483,29 +505,29 @@
         not backed by any real database connection; this object will queue its
         SQL statements until an existing connection becomes available.
         """
-        a = self.pool.connection()
+        a = self.createTransaction()
 
-        alphaResult = resultOf(a.execSQL("alpha"))
+        alphaResult = self.resultOf(a.execSQL("alpha"))
         [[counter, echo]] = alphaResult[0]
 
-        b = self.pool.connection()
+        b = self.createTransaction()
         # 'b' should have opened a connection.
         self.assertEquals(len(self.factory.connections), 2)
-        betaResult = resultOf(b.execSQL("beta"))
+        betaResult = self.resultOf(b.execSQL("beta"))
         [[bcounter, becho]] = betaResult[0]
 
         # both 'a' and 'b' are holding open a connection now; let's try to open
         # a third one.  (The ordering will be deterministic even if this fails,
         # because those threads are already busy.)
-        c = self.pool.connection()
-        gammaResult = resultOf(c.execSQL("gamma"))
+        c = self.createTransaction()
+        gammaResult = self.resultOf(c.execSQL("gamma"))
 
         # Did 'c' open a connection?  Let's hope not...
         self.assertEquals(len(self.factory.connections), 2)
         # SQL shouldn't be executed too soon...
         self.assertEquals(gammaResult, [])
 
-        commitResult = resultOf(b.commit())
+        commitResult = self.resultOf(b.commit())
 
         # Now that 'b' has committed, 'c' should be able to complete.
         [[ccounter, cecho]] = gammaResult[0]
@@ -523,8 +545,8 @@
         L{ConnectionPool.stopService} stops all the associated L{ThreadHolder}s
         and thereby frees up the resources it is holding.
         """
-        a = self.pool.connection()
-        alphaResult = resultOf(a.execSQL("alpha"))
+        a = self.createTransaction()
+        alphaResult = self.resultOf(a.execSQL("alpha"))
         [[[counter, echo]]] = alphaResult
         self.assertEquals(len(self.factory.connections), 1)
         self.assertEquals(len(self.holders), 1)
@@ -549,7 +571,7 @@
         self.factory.willFail()
         self.factory.willFail()
         self.factory.willConnect()
-        c = self.pool.connection()
+        c = self.createTransaction()
         def checkOneFailure():
             errors = self.flushLoggedErrors(FakeConnectionError)
             self.assertEquals(len(errors), 1)
@@ -562,6 +584,7 @@
         checkOneFailure()
         self.assertEquals(happened, [])
         self.clock.advance(self.pool.RETRY_TIMEOUT + 0.01)
+        self.flushHolders()
         self.assertEquals(happened, [[[1, "alpha"]]])
 
 
@@ -573,7 +596,7 @@
         as normal.
         """
         self.factory.defaultFail()
-        self.pool.connection()
+        self.createTransaction()
         errors = self.flushLoggedErrors(FakeConnectionError)
         self.assertEquals(len(errors), 1)
         stopd = []
@@ -592,7 +615,7 @@
         connection attempt has finished; in this case, succeeded.
         """
         self.pauseHolders()
-        self.pool.connection()
+        self.createTransaction()
         stopd = []
         self.pool.stopService().addBoth(stopd.append)
         self.assertEquals(stopd, [])
@@ -611,7 +634,7 @@
         """
         self.factory.defaultFail()
         self.pauseHolders()
-        self.pool.connection()
+        self.createTransaction()
         stopd = []
         self.pool.stopService().addBoth(stopd.append)
         self.assertEquals(stopd, [])
@@ -633,13 +656,13 @@
         """
         # TODO: commit() too?
         self.pauseHolders()
-        c = self.pool.connection()
-        abortResult = resultOf(c.abort())
+        c = self.createTransaction()
+        abortResult = self.resultOf(c.abort())
         # Should abort instantly, as it hasn't managed to unspool anything yet.
         # FIXME: kill all Deferreds associated with this thing, make sure that
         # any outstanding query callback chains get nuked.
         self.assertEquals(abortResult, [None])
-        stopResult = resultOf(self.pool.stopService())
+        stopResult = self.resultOf(self.pool.stopService())
         self.assertEquals(stopResult, [])
         self.flushHolders()
         #self.assertEquals(abortResult, [None])
@@ -654,17 +677,17 @@
         """
         # Use up the free slots so we have to spool.
         hold = []
-        hold.append(self.pool.connection())
-        hold.append(self.pool.connection())
+        hold.append(self.createTransaction())
+        hold.append(self.createTransaction())
 
-        c = self.pool.connection()
-        se = resultOf(c.execSQL("alpha"))
-        ce = resultOf(c.commit())
+        c = self.createTransaction()
+        se = self.resultOf(c.execSQL("alpha"))
+        ce = self.resultOf(c.commit())
         self.assertEquals(se, [])
         self.assertEquals(ce, [])
-        self.pool.stopService()
-        self.assertEquals(se[0].type, ConnectionError)
-        self.assertEquals(ce[0].type, ConnectionError)
+        self.resultOf(self.pool.stopService())
+        self.assertEquals(se[0].type, self.translateError(ConnectionError))
+        self.assertEquals(ce[0].type, self.translateError(ConnectionError))
 
 
     def test_repoolSpooled(self):
@@ -675,15 +698,15 @@
         behind any detritus that prevents stopService from working.
         """
         self.pauseHolders()
-        c = self.pool.connection()
-        c2 = self.pool.connection()
-        c3 = self.pool.connection()
+        c = self.createTransaction()
+        c2 = self.createTransaction()
+        c3 = self.createTransaction()
         c.commit()
         c2.commit()
         c3.commit()
         self.flushHolders()
         self.assertEquals(len(self.factory.connections), 2)
-        stopResult = resultOf(self.pool.stopService())
+        stopResult = self.resultOf(self.pool.stopService())
         self.assertEquals(stopResult, [None])
         self.assertEquals(len(self.factory.connections), 2)
         self.assertEquals(self.factory.connections[0].closed, True)
@@ -695,13 +718,14 @@
         Calls to connection() after stopService() result in transactions which
         immediately fail all operations.
         """
-        stopResults = resultOf(self.pool.stopService())
+        stopResults = self.resultOf(self.pool.stopService())
         self.assertEquals(stopResults, [None])
         self.pauseHolders()
-        postClose = self.pool.connection()
-        queryResult = resultOf(postClose.execSQL("hello"))
+        postClose = self.createTransaction()
+        queryResult = self.resultOf(postClose.execSQL("hello"))
         self.assertEquals(len(queryResult), 1)
-        self.assertEquals(queryResult[0].type, ConnectionError)
+        self.assertEquals(queryResult[0].type,
+                          self.translateError(ConnectionError))
 
 
     def test_connectAfterStartedStopping(self):
@@ -711,16 +735,18 @@
         operations.
         """
         self.pauseHolders()
-        preClose = self.pool.connection()
-        preCloseResult = resultOf(preClose.execSQL('statement'))
-        stopResult = resultOf(self.pool.stopService())
-        postClose = self.pool.connection()
-        queryResult = resultOf(postClose.execSQL("hello"))
+        preClose = self.createTransaction()
+        preCloseResult = self.resultOf(preClose.execSQL('statement'))
+        stopResult = self.resultOf(self.pool.stopService())
+        postClose = self.createTransaction()
+        queryResult = self.resultOf(postClose.execSQL("hello"))
         self.assertEquals(stopResult, [])
         self.assertEquals(len(queryResult), 1)
-        self.assertEquals(queryResult[0].type, ConnectionError)
+        self.assertEquals(queryResult[0].type,
+                          self.translateError(ConnectionError))
         self.assertEquals(len(preCloseResult), 1)
-        self.assertEquals(preCloseResult[0].type, ConnectionError)
+        self.assertEquals(preCloseResult[0].type,
+                          self.translateError(ConnectionError))
 
 
     def test_abortFailsDuringStopService(self):
@@ -730,16 +756,16 @@
         happens, shutdown should continue.
         """
         txns = []
-        txns.append(self.pool.connection())
-        txns.append(self.pool.connection())
+        txns.append(self.createTransaction())
+        txns.append(self.createTransaction())
         for txn in txns:
             # Make sure rollback will actually be executed.
-            results = resultOf(txn.execSQL("maybe change something!"))
+            results = self.resultOf(txn.execSQL("maybe change something!"))
             [[[counter, echo]]] = results
             self.assertEquals("maybe change something!", echo)
         # Fail one (and only one) call to rollback().
         self.factory.rollbackFail = True
-        stopResult = resultOf(self.pool.stopService())
+        stopResult = self.resultOf(self.pool.stopService())
         self.assertEquals(stopResult, [None])
         self.assertEquals(len(self.flushLoggedErrors(RollbackFail)), 1)
         self.assertEquals(self.factory.connections[0].closed, True)
@@ -751,11 +777,11 @@
         L{ConnectionPool.stopService} will shut down if a recycled transaction
         is still pending.
         """
-        recycled = self.pool.connection()
-        recycled.commit()
+        recycled = self.createTransaction()
+        self.resultOf(recycled.commit())
         remember = []
-        remember.append(self.pool.connection())
-        self.assertEquals(resultOf(self.pool.stopService()), [None])
+        remember.append(self.createTransaction())
+        self.assertEquals(self.resultOf(self.pool.stopService()), [None])
 
 
     def test_abortSpooled(self):
@@ -766,14 +792,14 @@
         """
         # Use up the available connections ...
         for i in xrange(self.pool.maxConnections):
-            self.pool.connection()
+            self.createTransaction()
         # ... so that this one has to be spooled.
-        spooled = self.pool.connection()
-        result = resultOf(spooled.execSQL("alpha"))
+        spooled = self.createTransaction()
+        result = self.resultOf(spooled.execSQL("alpha"))
         # sanity check, it would be bad if this actually executed.
         self.assertEqual(result, [])
-        spooled.abort()
-        self.assertEqual(result[0].type, ConnectionError)
+        self.resultOf(spooled.abort())
+        self.assertEqual(result[0].type, self.translateError(ConnectionError))
 
 
     def test_waitForAlreadyAbortedTransaction(self):
@@ -781,9 +807,9 @@
         L{ConnectionPool.stopService} will wait for all transactions to shut
         down before exiting, including those which have already been stopped.
         """
-        it = self.pool.connection()
+        it = self.createTransaction()
         self.pauseHolders()
-        abortResult = resultOf(it.abort())
+        abortResult = self.resultOf(it.abort())
 
         # steal it from the queue so we can do it out of order
         d, work = self.holders[0].queue.pop()
@@ -792,12 +818,13 @@
         self.assertEquals(self.holders[0].queue, [])
         self.assertEquals(len(self.holders), 1)
         self.flushHolders()
-        stopResult = resultOf(self.pool.stopService())
+        stopResult = self.resultOf(self.pool.stopService())
         # Sanity check that we haven't actually stopped it yet
         self.assertEquals(abortResult, [])
         # We haven't fired it yet, so the service had better not have stopped...
         self.assertEquals(stopResult, [])
         d.callback(None)
+        self.flushHolders()
         self.assertEquals(abortResult, [None])
         self.assertEquals(stopResult, [None])
 
@@ -807,15 +834,15 @@
         L{ConnectionPool.connection} will not spawn more than the maximum
         connections if there are finishing transactions outstanding.
         """
-        a = self.pool.connection()
-        b = self.pool.connection()
+        a = self.createTransaction()
+        b = self.createTransaction()
         self.pauseHolders()
         a.abort()
         b.abort()
         # Remove the holders for the existing connections, so that the 'extra'
         # connection() call wins the race and gets executed first.
         self.holders[:] = []
-        self.pool.connection()
+        self.createTransaction()
         self.flushHolders()
         self.assertEquals(len(self.factory.connections), 2)
 
@@ -827,16 +854,16 @@
         """
         TEST_PARAMSTYLE = "justtesting"
         self.pool.paramstyle = TEST_PARAMSTYLE
-        normaltxn = self.pool.connection()
+        normaltxn = self.createTransaction()
         self.assertEquals(normaltxn.paramstyle, TEST_PARAMSTYLE)
         self.pauseHolders()
         extra = []
-        extra.append(self.pool.connection())
-        waitingtxn = self.pool.connection()
+        extra.append(self.createTransaction())
+        waitingtxn = self.createTransaction()
         self.assertEquals(waitingtxn.paramstyle, TEST_PARAMSTYLE)
         self.flushHolders()
         self.pool.stopService()
-        notxn = self.pool.connection()
+        notxn = self.createTransaction()
         self.assertEquals(notxn.paramstyle, TEST_PARAMSTYLE)
 
 
@@ -847,16 +874,16 @@
         """
         TEST_DIALECT = "otherdialect"
         self.pool.dialect = TEST_DIALECT
-        normaltxn = self.pool.connection()
+        normaltxn = self.createTransaction()
         self.assertEquals(normaltxn.dialect, TEST_DIALECT)
         self.pauseHolders()
         extra = []
-        extra.append(self.pool.connection())
-        waitingtxn = self.pool.connection()
+        extra.append(self.createTransaction())
+        waitingtxn = self.createTransaction()
         self.assertEquals(waitingtxn.dialect, TEST_DIALECT)
         self.flushHolders()
         self.pool.stopService()
-        notxn = self.pool.connection()
+        notxn = self.createTransaction()
         self.assertEquals(notxn.dialect, TEST_DIALECT)
 
 
@@ -877,7 +904,7 @@
         # it's recycling the underlying transaction, or connect() just
         # succeeded.  Either way you just have a _SingleTxn wrapping a
         # _ConnectedTxn.
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         self.assertEquals(len(self.factory.connections), 1,
                           "Sanity check failed.")
         class CustomExecuteFailed(Exception):
@@ -885,7 +912,7 @@
             Custom 'execute-failed' exception.
             """
         self.factory.connections[0].executeWillFail(CustomExecuteFailed)
-        results = resultOf(txn.execSQL("hello, world!"))
+        results = self.resultOf(txn.execSQL("hello, world!"))
         [[[counter, echo]]] = results
         self.assertEquals("hello, world!", echo)
         # Two execution attempts should have been made, one on each connection.
@@ -911,15 +938,15 @@
         then, the database server will shut down and the connections will die,
         but we will be none the wiser until we try to use them.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         moreFailureSetup(self.factory)
         self.assertEquals(len(self.factory.connections), 1,
                           "Sanity check failed.")
-        results = resultOf(txn.execSQL("hello, world!"))
+        results = self.resultOf(txn.execSQL("hello, world!"))
         txn.commit()
         [[[counter, echo]]] = results
         self.assertEquals("hello, world!", echo)
-        txn2 = self.pool.connection()
+        txn2 = self.createTransaction()
         self.assertEquals(len(self.factory.connections), 1,
                           "Sanity check failed.")
         class CustomExecFail(Exception):
@@ -927,7 +954,7 @@
             Custom 'execute()' failure.
             """
         self.factory.connections[0].executeWillFail(CustomExecFail)
-        results = resultOf(txn2.execSQL("second try!"))
+        results = self.resultOf(txn2.execSQL("second try!"))
         txn2.commit()
         [[[counter, echo]]] = results
         self.assertEquals("second try!", echo)
@@ -970,16 +997,15 @@
         statement transparently re-executed by the logic tested by
         L{test_reConnectWhenFirstExecFails}.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         self.factory.commitFail = True
         self.factory.rollbackFail = True
-        [x] = resultOf(txn.commit())
+        [x] = self.resultOf(txn.commit())
 
         # No statements have been executed, so 'commit' will *not* be executed.
         self.assertEquals(self.factory.commitFail, True)
         self.assertIdentical(x, None)
         self.assertEquals(len(self.pool._free), 1)
-        self.assertIn(txn._baseTxn, self.pool._free)
         self.assertEquals(self.pool._finishing, [])
         self.assertEquals(len(self.factory.connections), 1)
         self.assertEquals(self.factory.connections[0].closed, False)
@@ -1001,18 +1027,18 @@
         relaying the exception back to application code but attempting a
         re-connection on the next try.
         """
-        txn = self.pool.connection()
-        [[[counter, echo]]] = resultOf(txn.execSQL("hello, world!", []))
+        txn = self.createTransaction()
+        [[[counter, echo]]] = self.resultOf(txn.execSQL("hello, world!", []))
         self.factory.connections[0].executeWillFail(ZeroDivisionError)
-        [f] = resultOf(txn.execSQL("divide by zero", []))
-        f.trap(ZeroDivisionError)
+        [f] = self.resultOf(txn.execSQL("divide by zero", []))
+        f.trap(self.translateError(ZeroDivisionError))
         self.assertEquals(self.factory.connections[0].executions, 2)
         # Reconnection should work exactly as before.
         self.assertEquals(self.factory.connections[0].closed, False)
         # Application code has to roll back its transaction at this point, since
         # it failed (and we don't necessarily know why it failed: not enough
         # information).
-        txn.abort()
+        self.resultOf(txn.abort())
         self.factory.connections[0].executions = 0 # re-set for next test
         self.assertEquals(len(self.factory.connections), 1)
         self.test_reConnectWhenFirstExecFails()
@@ -1028,17 +1054,16 @@
         Also, a new connection will immediately be established to keep the pool
         size the same.
         """
-        txn = self.pool.connection()
-        results = resultOf(txn.execSQL("maybe change something!"))
+        txn = self.createTransaction()
+        results = self.resultOf(txn.execSQL("maybe change something!"))
         [[[counter, echo]]] = results
         self.assertEquals("maybe change something!", echo)
         self.factory.rollbackFail = True
-        [x] = resultOf(txn.abort())
+        [x] = self.resultOf(txn.abort())
         # Abort does not propagate the error on, the transaction merely gets
         # disposed of.
         self.assertIdentical(x, None)
         self.assertEquals(len(self.pool._free), 1)
-        self.assertNotIn(txn._baseTxn, self.pool._free)
         self.assertEquals(self.pool._finishing, [])
         self.assertEquals(len(self.factory.connections), 2)
         self.assertEquals(self.factory.connections[0].closed, True)
@@ -1053,15 +1078,14 @@
         C{commit} has to be relayed to client code, since that actually means
         some changes didn't hit the database.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         self.factory.commitFail = True
-        results = resultOf(txn.execSQL("maybe change something!"))
+        results = self.resultOf(txn.execSQL("maybe change something!"))
         [[[counter, echo]]] = results
         self.assertEquals("maybe change something!", echo)
-        [x] = resultOf(txn.commit())
-        x.trap(CommitFail)
+        [x] = self.resultOf(txn.commit())
+        x.trap(self.translateError(CommitFail))
         self.assertEquals(len(self.pool._free), 1)
-        self.assertNotIn(txn._baseTxn, self.pool._free)
         self.assertEquals(self.pool._finishing, [])
         self.assertEquals(len(self.factory.connections), 2)
         self.assertEquals(self.factory.connections[0].closed, True)
@@ -1073,14 +1097,14 @@
         L{IAsyncTransaction.commandBlock} returns an L{IAsyncTransaction}
         provider which ensures that a block of commands are executed together.
         """
-        txn = self.pool.connection()
-        a = resultOf(txn.execSQL("a"))
+        txn = self.createTransaction()
+        a = self.resultOf(txn.execSQL("a"))
         cb = txn.commandBlock()
-        b = resultOf(cb.execSQL("b"))
-        d = resultOf(txn.execSQL("d"))
-        c = resultOf(cb.execSQL("c"))
+        b = self.resultOf(cb.execSQL("b"))
+        d = self.resultOf(txn.execSQL("d"))
+        c = self.resultOf(cb.execSQL("c"))
         cb.end()
-        e = resultOf(txn.execSQL("e"))
+        e = self.resultOf(txn.execSQL("e"))
         self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
                           [("a", []), ("b", []), ("c", []), ("d", []),
                            ("e", [])])
@@ -1097,13 +1121,13 @@
         executing until all SQL statements scheduled before it have completed.
         """
         self.pauseHolders()
-        txn = self.pool.connection()
-        a = resultOf(txn.execSQL("a"))
-        b = resultOf(txn.execSQL("b"))
+        txn = self.createTransaction()
+        a = self.resultOf(txn.execSQL("a"))
+        b = self.resultOf(txn.execSQL("b"))
         cb = txn.commandBlock()
-        c = resultOf(cb.execSQL("c"))
-        d = resultOf(cb.execSQL("d"))
-        e = resultOf(txn.execSQL("e"))
+        c = self.resultOf(cb.execSQL("c"))
+        d = self.resultOf(cb.execSQL("d"))
+        e = self.resultOf(txn.execSQL("e"))
         cb.end()
         self.flushHolders()
 
@@ -1123,7 +1147,7 @@
         When execution of one command block is complete, it will proceed to the
         next queued block, then to regular SQL executed on the transaction.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         cb1 = txn.commandBlock()
         cb2 = txn.commandBlock()
         txn.execSQL("e")
@@ -1152,7 +1176,7 @@
         L{CommandBlock.end} will raise L{AlreadyFinishedError} when called more
         than once.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         block = txn.commandBlock()
         block.end()
         self.assertRaises(AlreadyFinishedError, block.end)
@@ -1165,9 +1189,9 @@
         when you call {IAsyncTransaction.commit}(), it should not actually take
         effect if there are any pending command blocks.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         block = txn.commandBlock()
-        commitResult = resultOf(txn.commit())
+        commitResult = self.resultOf(txn.commit())
         block.execSQL("in block")
         self.assertEquals(commitResult, [])
         self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
@@ -1183,10 +1207,10 @@
         all outstanding C{execSQL}s will fail immediately, on both command
         blocks and on the transaction itself.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         block = txn.commandBlock()
         block2 = txn.commandBlock()
-        abortResult = resultOf(txn.abort())
+        abortResult = self.resultOf(txn.abort())
         self.assertEquals(abortResult, [None])
         self.assertRaises(AlreadyFinishedError, block2.execSQL, "bar")
         self.assertRaises(AlreadyFinishedError, block.execSQL, "foo")
@@ -1206,7 +1230,7 @@
         Attempting to execute SQL on a L{CommandBlock} which has had C{end}
         called on it will result in an L{AlreadyFinishedError}.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         block = txn.commandBlock()
         block.end()
         self.assertRaises(AlreadyFinishedError, block.execSQL, "hello")
@@ -1219,7 +1243,7 @@
         Once an L{IAsyncTransaction} has been committed, L{commandBlock} raises
         an exception.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         txn.commit()
         self.assertRaises(AlreadyFinishedError, txn.commandBlock)
 
@@ -1229,9 +1253,137 @@
         Once an L{IAsyncTransaction} has been committed, L{commandBlock} raises
         an exception.
         """
-        txn = self.pool.connection()
+        txn = self.createTransaction()
         txn.abort()
         self.assertRaises(AlreadyFinishedError, txn.commandBlock)
 
 
 
+class IOPump(object):
+    """
+    Connect a client and a server.
+
+    @ivar client: a client protocol
+
+    @ivar server: a server protocol
+    """
+
+    def __init__(self, client, server):
+        self.client = client
+        self.server = server
+        self.clientTransport = StringTransport()
+        self.serverTransport = StringTransport()
+        self.client.makeConnection(self.clientTransport)
+        self.server.makeConnection(self.serverTransport)
+        self.c2s = [self.clientTransport, self.server]
+        self.s2c = [self.serverTransport, self.client]
+
+
+    def moveData(self, (outTransport, inProtocol)):
+        """
+        Move data from a L{StringTransport} to an L{IProtocol}.
+
+        @return: C{True} if any data was moved, C{False} if no data was moved.
+        """
+        data = outTransport.io.getvalue()
+        outTransport.io.seek(0)
+        outTransport.io.truncate()
+        if data:
+            inProtocol.dataReceived(data)
+            return True
+        else:
+            return False
+
+
+    def pump(self):
+        """
+        Deliver all input from the client to the server, then from the server to
+        the client.
+        """
+        a = self.moveData(self.c2s)
+        b = self.moveData(self.s2c)
+        return a or b
+
+
+    def flush(self, maxTurns=100):
+        """
+        Continue pumping until no more data is flowing.
+        """
+        turns = 0
+        while self.pump():
+            turns += 1
+            if turns > maxTurns:
+                raise RuntimeError("Ran too long!")
+
+
+
+class NetworkedConnectionPoolTests(ConnectionPoolTests):
+    """
+    Tests for L{ConnectionPoolConnection} and L{ConnectionPoolClient}
+    interacting with each other.
+    """
+
+    # Don't run these tests.
+    def test_propagateDialect(self):
+        """
+        Paramstyle and dialect are configured differently.
+        """
+
+    test_propagateParamstyle = test_propagateDialect
+    test_propagateParamstyle.skip = test_propagateParamstyle.__doc__.strip()
+
+    def setUp(self):
+        """
+        Do the same setup from L{ConnectionPoolBase}, but also establish a
+        loopback connection between a L{ConnectionPoolConnection} and a
+        L{ConnectionPoolClient}.
+        """
+        super(NetworkedConnectionPoolTests, self).setUp()
+        self.pump = IOPump(ConnectionPoolClient(),
+                           ConnectionPoolConnection(self.pool))
+
+
+    def flushHolders(self):
+        """
+        In addition to flushing the L{ThreadHolder} stubs, also flush any
+        pending network I/O.
+        """
+        self.pump.flush()
+        super(NetworkedConnectionPoolTests, self).flushHolders()
+        self.pump.flush()
+
+
+    def createTransaction(self):
+        txn = self.pump.client.newTransaction()
+        self.pump.flush()
+        return txn
+
+
+    def translateError(self, err):
+        """
+        All errors raised locally will unfortunately be translated into
+        UnknownRemoteError, since AMP requires specific enumeration of all of
+        them.  Flush the locally logged error of the given type and return
+        L{UnknownRemoteError}.
+        """
+        self.flushLoggedErrors(err)
+        return UnknownRemoteError
+
+
+    def resultOf(self, it):
+        result = resultOf(it)
+        self.pump.flush()
+        return result
+
+
+    def test_newTransaction(self):
+        """
+        L{ConnectionPoolClient.newTransaction} returns a provider of
+        L{IAsyncTransaction}, and creates a new transaction on the server side.
+        """
+        txn = self.pump.client.newTransaction()
+        verifyObject(IAsyncTransaction, txn)
+        self.pump.flush()
+        self.assertEquals(len(self.factory.connections), 1)
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111010/bc6da157/attachment-0001.html>


More information about the calendarserver-changes mailing list