[CalendarServer-changes] [8160] CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Mon Oct 10 08:02:51 PDT 2011
Revision: 8160
http://trac.macosforge.org/projects/calendarserver/changeset/8160
Author: glyph at apple.com
Date: 2011-10-10 08:02:51 -0700 (Mon, 10 Oct 2011)
Log Message:
-----------
retrofit existing tests so that all existing functionality is covered in the ConnectionPoolClient
Modified Paths:
--------------
CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py
CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py
Modified: CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py 2011-10-10 15:02:40 UTC (rev 8159)
+++ CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py 2011-10-10 15:02:51 UTC (rev 8160)
@@ -1154,7 +1154,9 @@
"""
Successfully complete the given transaction.
"""
- return self._complete(transactionID, lambda x: x.commit())
+ def commitme(x):
+ return x.commit()
+ return self._complete(transactionID, commitme)
@Abort.responder
@@ -1162,7 +1164,9 @@
"""
Roll back the given transaction.
"""
- return self._complete(transactionID, lambda x: x.abort())
+ def abortme(x):
+ return x.abort()
+ return self._complete(transactionID, abortme)
@@ -1170,6 +1174,11 @@
"""
A client which can execute SQL.
"""
+
+ # See DEFAULT_PARAM_STYLE FIXME above.
+ paramstyle = DEFAULT_PARAM_STYLE
+ dialect = POSTGRES_DIALECT
+
def __init__(self):
super(ConnectionPoolClient, self).__init__()
self._nextID = count().next
@@ -1243,9 +1252,6 @@
implements(IAsyncTransaction)
- # See DEFAULT_PARAM_STYLE FIXME above.
- paramstyle = DEFAULT_PARAM_STYLE
-
def __init__(self, client, transactionID):
"""
Initialize a transaction with a L{ConnectionPoolClient} and a unique
@@ -1256,14 +1262,35 @@
self._completed = False
+ @property
+ def paramstyle(self):
+ """
+ Forward 'paramstyle' attribute to the client.
+ """
+ return self._client.paramstyle
+
+
+ @property
+ def dialect(self):
+ """
+ Forward 'dialect' attribute to the client.
+ """
+ return self._client.dialect
+
+
def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
if args is None:
args = []
queryID = str(self._client._nextID())
query = self._client._queries[queryID] = _Query(raiseOnZeroRowCount)
- self._client.callRemote(ExecSQL, queryID=queryID, sql=sql, args=args,
- transactionID=self._transactionID)
- return query.deferred
+ result = (
+ self._client.callRemote(
+ ExecSQL, queryID=queryID, sql=sql, args=args,
+ transactionID=self._transactionID
+ )
+ .addCallback(lambda nothing: query.deferred)
+ )
+ return result
def _complete(self, command):
@@ -1272,7 +1299,7 @@
self._completed = True
return self._client.callRemote(
command, transactionID=self._transactionID
- ).addCallback(lambda x: None)
+ ).addCallback(lambda x: None)
def commit(self):
@@ -1283,3 +1310,8 @@
return self._complete(Abort)
+ def commandBlock(self):
+ raise NotImplementedError()
+
+
+
Modified: CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py 2011-10-10 15:02:40 UTC (rev 8159)
+++ CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py 2011-10-10 15:02:51 UTC (rev 8160)
@@ -20,19 +20,28 @@
from itertools import count
-from zope.interface.verify import verifyClass
+from zope.interface.verify import verifyClass, verifyObject
+from zope.interface.declarations import implements
from twisted.python.threadpool import ThreadPool
+
+from twisted.protocols.amp import UnknownRemoteError
+
from twisted.trial.unittest import TestCase
from twisted.internet.defer import execute
from twisted.internet.task import Clock
+from twisted.internet.interfaces import IReactorThreads
from twisted.internet.defer import Deferred
+
+from twisted.test.proto_helpers import StringTransport
+
from twext.enterprise.ienterprise import ConnectionError
from twext.enterprise.ienterprise import AlreadyFinishedError
-from twisted.internet.interfaces import IReactorThreads
-from zope.interface.declarations import implements
+from twext.enterprise.adbapi2 import ConnectionPoolClient
+from twext.enterprise.adbapi2 import ConnectionPoolConnection
+from twext.enterprise.ienterprise import IAsyncTransaction
from twext.enterprise.adbapi2 import ConnectionPool
@@ -421,9 +430,9 @@
-class ConnectionPoolTests(TestCase):
+class ConnectionPoolBase(TestCase):
"""
- Tests for L{ConnectionPool}.
+ Common functionality for testing L{ConnectionPool}.
"""
def setUp(self):
@@ -439,20 +448,15 @@
self.pool._createHolder = self.makeAHolder
self.clock = self.pool.reactor = ClockWithThreads()
self.pool.startService()
+ self.addCleanup(self.flushHolders)
- def tearDown(self):
+ def flushHolders(self):
"""
- Make sure the service is stopped and the fake ThreadHolders are all
+ Flush all pending C{submit}s since C{pauseHolders} was called. This
+ makes sure the service is stopped and the fake ThreadHolders are all
executing their queues so failed tests can exit cleanly.
"""
- self.flushHolders()
-
-
- def flushHolders(self):
- """
- Flush all pending C{submit}s since C{pauseHolders} was called.
- """
self.paused = False
for holder in self.holders:
holder.flush()
@@ -475,6 +479,24 @@
return fth
+ def resultOf(self, it):
+ return resultOf(it)
+
+
+ def createTransaction(self):
+ return self.pool.connection()
+
+
+ def translateError(self, err):
+ return err
+
+
+
+class ConnectionPoolTests(ConnectionPoolBase):
+ """
+ Tests for L{ConnectionPool}.
+ """
+
def test_tooManyConnections(self):
"""
When the number of outstanding busy transactions exceeds the number of
@@ -483,29 +505,29 @@
not backed by any real database connection; this object will queue its
SQL statements until an existing connection becomes available.
"""
- a = self.pool.connection()
+ a = self.createTransaction()
- alphaResult = resultOf(a.execSQL("alpha"))
+ alphaResult = self.resultOf(a.execSQL("alpha"))
[[counter, echo]] = alphaResult[0]
- b = self.pool.connection()
+ b = self.createTransaction()
# 'b' should have opened a connection.
self.assertEquals(len(self.factory.connections), 2)
- betaResult = resultOf(b.execSQL("beta"))
+ betaResult = self.resultOf(b.execSQL("beta"))
[[bcounter, becho]] = betaResult[0]
# both 'a' and 'b' are holding open a connection now; let's try to open
# a third one. (The ordering will be deterministic even if this fails,
# because those threads are already busy.)
- c = self.pool.connection()
- gammaResult = resultOf(c.execSQL("gamma"))
+ c = self.createTransaction()
+ gammaResult = self.resultOf(c.execSQL("gamma"))
# Did 'c' open a connection? Let's hope not...
self.assertEquals(len(self.factory.connections), 2)
# SQL shouldn't be executed too soon...
self.assertEquals(gammaResult, [])
- commitResult = resultOf(b.commit())
+ commitResult = self.resultOf(b.commit())
# Now that 'b' has committed, 'c' should be able to complete.
[[ccounter, cecho]] = gammaResult[0]
@@ -523,8 +545,8 @@
L{ConnectionPool.stopService} stops all the associated L{ThreadHolder}s
and thereby frees up the resources it is holding.
"""
- a = self.pool.connection()
- alphaResult = resultOf(a.execSQL("alpha"))
+ a = self.createTransaction()
+ alphaResult = self.resultOf(a.execSQL("alpha"))
[[[counter, echo]]] = alphaResult
self.assertEquals(len(self.factory.connections), 1)
self.assertEquals(len(self.holders), 1)
@@ -549,7 +571,7 @@
self.factory.willFail()
self.factory.willFail()
self.factory.willConnect()
- c = self.pool.connection()
+ c = self.createTransaction()
def checkOneFailure():
errors = self.flushLoggedErrors(FakeConnectionError)
self.assertEquals(len(errors), 1)
@@ -562,6 +584,7 @@
checkOneFailure()
self.assertEquals(happened, [])
self.clock.advance(self.pool.RETRY_TIMEOUT + 0.01)
+ self.flushHolders()
self.assertEquals(happened, [[[1, "alpha"]]])
@@ -573,7 +596,7 @@
as normal.
"""
self.factory.defaultFail()
- self.pool.connection()
+ self.createTransaction()
errors = self.flushLoggedErrors(FakeConnectionError)
self.assertEquals(len(errors), 1)
stopd = []
@@ -592,7 +615,7 @@
connection attempt has finished; in this case, succeeded.
"""
self.pauseHolders()
- self.pool.connection()
+ self.createTransaction()
stopd = []
self.pool.stopService().addBoth(stopd.append)
self.assertEquals(stopd, [])
@@ -611,7 +634,7 @@
"""
self.factory.defaultFail()
self.pauseHolders()
- self.pool.connection()
+ self.createTransaction()
stopd = []
self.pool.stopService().addBoth(stopd.append)
self.assertEquals(stopd, [])
@@ -633,13 +656,13 @@
"""
# TODO: commit() too?
self.pauseHolders()
- c = self.pool.connection()
- abortResult = resultOf(c.abort())
+ c = self.createTransaction()
+ abortResult = self.resultOf(c.abort())
# Should abort instantly, as it hasn't managed to unspool anything yet.
# FIXME: kill all Deferreds associated with this thing, make sure that
# any outstanding query callback chains get nuked.
self.assertEquals(abortResult, [None])
- stopResult = resultOf(self.pool.stopService())
+ stopResult = self.resultOf(self.pool.stopService())
self.assertEquals(stopResult, [])
self.flushHolders()
#self.assertEquals(abortResult, [None])
@@ -654,17 +677,17 @@
"""
# Use up the free slots so we have to spool.
hold = []
- hold.append(self.pool.connection())
- hold.append(self.pool.connection())
+ hold.append(self.createTransaction())
+ hold.append(self.createTransaction())
- c = self.pool.connection()
- se = resultOf(c.execSQL("alpha"))
- ce = resultOf(c.commit())
+ c = self.createTransaction()
+ se = self.resultOf(c.execSQL("alpha"))
+ ce = self.resultOf(c.commit())
self.assertEquals(se, [])
self.assertEquals(ce, [])
- self.pool.stopService()
- self.assertEquals(se[0].type, ConnectionError)
- self.assertEquals(ce[0].type, ConnectionError)
+ self.resultOf(self.pool.stopService())
+ self.assertEquals(se[0].type, self.translateError(ConnectionError))
+ self.assertEquals(ce[0].type, self.translateError(ConnectionError))
def test_repoolSpooled(self):
@@ -675,15 +698,15 @@
behind any detritus that prevents stopService from working.
"""
self.pauseHolders()
- c = self.pool.connection()
- c2 = self.pool.connection()
- c3 = self.pool.connection()
+ c = self.createTransaction()
+ c2 = self.createTransaction()
+ c3 = self.createTransaction()
c.commit()
c2.commit()
c3.commit()
self.flushHolders()
self.assertEquals(len(self.factory.connections), 2)
- stopResult = resultOf(self.pool.stopService())
+ stopResult = self.resultOf(self.pool.stopService())
self.assertEquals(stopResult, [None])
self.assertEquals(len(self.factory.connections), 2)
self.assertEquals(self.factory.connections[0].closed, True)
@@ -695,13 +718,14 @@
Calls to connection() after stopService() result in transactions which
immediately fail all operations.
"""
- stopResults = resultOf(self.pool.stopService())
+ stopResults = self.resultOf(self.pool.stopService())
self.assertEquals(stopResults, [None])
self.pauseHolders()
- postClose = self.pool.connection()
- queryResult = resultOf(postClose.execSQL("hello"))
+ postClose = self.createTransaction()
+ queryResult = self.resultOf(postClose.execSQL("hello"))
self.assertEquals(len(queryResult), 1)
- self.assertEquals(queryResult[0].type, ConnectionError)
+ self.assertEquals(queryResult[0].type,
+ self.translateError(ConnectionError))
def test_connectAfterStartedStopping(self):
@@ -711,16 +735,18 @@
operations.
"""
self.pauseHolders()
- preClose = self.pool.connection()
- preCloseResult = resultOf(preClose.execSQL('statement'))
- stopResult = resultOf(self.pool.stopService())
- postClose = self.pool.connection()
- queryResult = resultOf(postClose.execSQL("hello"))
+ preClose = self.createTransaction()
+ preCloseResult = self.resultOf(preClose.execSQL('statement'))
+ stopResult = self.resultOf(self.pool.stopService())
+ postClose = self.createTransaction()
+ queryResult = self.resultOf(postClose.execSQL("hello"))
self.assertEquals(stopResult, [])
self.assertEquals(len(queryResult), 1)
- self.assertEquals(queryResult[0].type, ConnectionError)
+ self.assertEquals(queryResult[0].type,
+ self.translateError(ConnectionError))
self.assertEquals(len(preCloseResult), 1)
- self.assertEquals(preCloseResult[0].type, ConnectionError)
+ self.assertEquals(preCloseResult[0].type,
+ self.translateError(ConnectionError))
def test_abortFailsDuringStopService(self):
@@ -730,16 +756,16 @@
happens, shutdown should continue.
"""
txns = []
- txns.append(self.pool.connection())
- txns.append(self.pool.connection())
+ txns.append(self.createTransaction())
+ txns.append(self.createTransaction())
for txn in txns:
# Make sure rollback will actually be executed.
- results = resultOf(txn.execSQL("maybe change something!"))
+ results = self.resultOf(txn.execSQL("maybe change something!"))
[[[counter, echo]]] = results
self.assertEquals("maybe change something!", echo)
# Fail one (and only one) call to rollback().
self.factory.rollbackFail = True
- stopResult = resultOf(self.pool.stopService())
+ stopResult = self.resultOf(self.pool.stopService())
self.assertEquals(stopResult, [None])
self.assertEquals(len(self.flushLoggedErrors(RollbackFail)), 1)
self.assertEquals(self.factory.connections[0].closed, True)
@@ -751,11 +777,11 @@
L{ConnectionPool.stopService} will shut down if a recycled transaction
is still pending.
"""
- recycled = self.pool.connection()
- recycled.commit()
+ recycled = self.createTransaction()
+ self.resultOf(recycled.commit())
remember = []
- remember.append(self.pool.connection())
- self.assertEquals(resultOf(self.pool.stopService()), [None])
+ remember.append(self.createTransaction())
+ self.assertEquals(self.resultOf(self.pool.stopService()), [None])
def test_abortSpooled(self):
@@ -766,14 +792,14 @@
"""
# Use up the available connections ...
for i in xrange(self.pool.maxConnections):
- self.pool.connection()
+ self.createTransaction()
# ... so that this one has to be spooled.
- spooled = self.pool.connection()
- result = resultOf(spooled.execSQL("alpha"))
+ spooled = self.createTransaction()
+ result = self.resultOf(spooled.execSQL("alpha"))
# sanity check, it would be bad if this actually executed.
self.assertEqual(result, [])
- spooled.abort()
- self.assertEqual(result[0].type, ConnectionError)
+ self.resultOf(spooled.abort())
+ self.assertEqual(result[0].type, self.translateError(ConnectionError))
def test_waitForAlreadyAbortedTransaction(self):
@@ -781,9 +807,9 @@
L{ConnectionPool.stopService} will wait for all transactions to shut
down before exiting, including those which have already been stopped.
"""
- it = self.pool.connection()
+ it = self.createTransaction()
self.pauseHolders()
- abortResult = resultOf(it.abort())
+ abortResult = self.resultOf(it.abort())
# steal it from the queue so we can do it out of order
d, work = self.holders[0].queue.pop()
@@ -792,12 +818,13 @@
self.assertEquals(self.holders[0].queue, [])
self.assertEquals(len(self.holders), 1)
self.flushHolders()
- stopResult = resultOf(self.pool.stopService())
+ stopResult = self.resultOf(self.pool.stopService())
# Sanity check that we haven't actually stopped it yet
self.assertEquals(abortResult, [])
# We haven't fired it yet, so the service had better not have stopped...
self.assertEquals(stopResult, [])
d.callback(None)
+ self.flushHolders()
self.assertEquals(abortResult, [None])
self.assertEquals(stopResult, [None])
@@ -807,15 +834,15 @@
L{ConnectionPool.connection} will not spawn more than the maximum
connections if there are finishing transactions outstanding.
"""
- a = self.pool.connection()
- b = self.pool.connection()
+ a = self.createTransaction()
+ b = self.createTransaction()
self.pauseHolders()
a.abort()
b.abort()
# Remove the holders for the existing connections, so that the 'extra'
# connection() call wins the race and gets executed first.
self.holders[:] = []
- self.pool.connection()
+ self.createTransaction()
self.flushHolders()
self.assertEquals(len(self.factory.connections), 2)
@@ -827,16 +854,16 @@
"""
TEST_PARAMSTYLE = "justtesting"
self.pool.paramstyle = TEST_PARAMSTYLE
- normaltxn = self.pool.connection()
+ normaltxn = self.createTransaction()
self.assertEquals(normaltxn.paramstyle, TEST_PARAMSTYLE)
self.pauseHolders()
extra = []
- extra.append(self.pool.connection())
- waitingtxn = self.pool.connection()
+ extra.append(self.createTransaction())
+ waitingtxn = self.createTransaction()
self.assertEquals(waitingtxn.paramstyle, TEST_PARAMSTYLE)
self.flushHolders()
self.pool.stopService()
- notxn = self.pool.connection()
+ notxn = self.createTransaction()
self.assertEquals(notxn.paramstyle, TEST_PARAMSTYLE)
@@ -847,16 +874,16 @@
"""
TEST_DIALECT = "otherdialect"
self.pool.dialect = TEST_DIALECT
- normaltxn = self.pool.connection()
+ normaltxn = self.createTransaction()
self.assertEquals(normaltxn.dialect, TEST_DIALECT)
self.pauseHolders()
extra = []
- extra.append(self.pool.connection())
- waitingtxn = self.pool.connection()
+ extra.append(self.createTransaction())
+ waitingtxn = self.createTransaction()
self.assertEquals(waitingtxn.dialect, TEST_DIALECT)
self.flushHolders()
self.pool.stopService()
- notxn = self.pool.connection()
+ notxn = self.createTransaction()
self.assertEquals(notxn.dialect, TEST_DIALECT)
@@ -877,7 +904,7 @@
# it's recycling the underlying transaction, or connect() just
# succeeded. Either way you just have a _SingleTxn wrapping a
# _ConnectedTxn.
- txn = self.pool.connection()
+ txn = self.createTransaction()
self.assertEquals(len(self.factory.connections), 1,
"Sanity check failed.")
class CustomExecuteFailed(Exception):
@@ -885,7 +912,7 @@
Custom 'execute-failed' exception.
"""
self.factory.connections[0].executeWillFail(CustomExecuteFailed)
- results = resultOf(txn.execSQL("hello, world!"))
+ results = self.resultOf(txn.execSQL("hello, world!"))
[[[counter, echo]]] = results
self.assertEquals("hello, world!", echo)
# Two execution attempts should have been made, one on each connection.
@@ -911,15 +938,15 @@
then, the database server will shut down and the connections will die,
but we will be none the wiser until we try to use them.
"""
- txn = self.pool.connection()
+ txn = self.createTransaction()
moreFailureSetup(self.factory)
self.assertEquals(len(self.factory.connections), 1,
"Sanity check failed.")
- results = resultOf(txn.execSQL("hello, world!"))
+ results = self.resultOf(txn.execSQL("hello, world!"))
txn.commit()
[[[counter, echo]]] = results
self.assertEquals("hello, world!", echo)
- txn2 = self.pool.connection()
+ txn2 = self.createTransaction()
self.assertEquals(len(self.factory.connections), 1,
"Sanity check failed.")
class CustomExecFail(Exception):
@@ -927,7 +954,7 @@
Custom 'execute()' failure.
"""
self.factory.connections[0].executeWillFail(CustomExecFail)
- results = resultOf(txn2.execSQL("second try!"))
+ results = self.resultOf(txn2.execSQL("second try!"))
txn2.commit()
[[[counter, echo]]] = results
self.assertEquals("second try!", echo)
@@ -970,16 +997,15 @@
statement transparently re-executed by the logic tested by
L{test_reConnectWhenFirstExecFails}.
"""
- txn = self.pool.connection()
+ txn = self.createTransaction()
self.factory.commitFail = True
self.factory.rollbackFail = True
- [x] = resultOf(txn.commit())
+ [x] = self.resultOf(txn.commit())
# No statements have been executed, so 'commit' will *not* be executed.
self.assertEquals(self.factory.commitFail, True)
self.assertIdentical(x, None)
self.assertEquals(len(self.pool._free), 1)
- self.assertIn(txn._baseTxn, self.pool._free)
self.assertEquals(self.pool._finishing, [])
self.assertEquals(len(self.factory.connections), 1)
self.assertEquals(self.factory.connections[0].closed, False)
@@ -1001,18 +1027,18 @@
relaying the exception back to application code but attempting a
re-connection on the next try.
"""
- txn = self.pool.connection()
- [[[counter, echo]]] = resultOf(txn.execSQL("hello, world!", []))
+ txn = self.createTransaction()
+ [[[counter, echo]]] = self.resultOf(txn.execSQL("hello, world!", []))
self.factory.connections[0].executeWillFail(ZeroDivisionError)
- [f] = resultOf(txn.execSQL("divide by zero", []))
- f.trap(ZeroDivisionError)
+ [f] = self.resultOf(txn.execSQL("divide by zero", []))
+ f.trap(self.translateError(ZeroDivisionError))
self.assertEquals(self.factory.connections[0].executions, 2)
# Reconnection should work exactly as before.
self.assertEquals(self.factory.connections[0].closed, False)
# Application code has to roll back its transaction at this point, since
# it failed (and we don't necessarily know why it failed: not enough
# information).
- txn.abort()
+ self.resultOf(txn.abort())
self.factory.connections[0].executions = 0 # re-set for next test
self.assertEquals(len(self.factory.connections), 1)
self.test_reConnectWhenFirstExecFails()
@@ -1028,17 +1054,16 @@
Also, a new connection will immediately be established to keep the pool
size the same.
"""
- txn = self.pool.connection()
- results = resultOf(txn.execSQL("maybe change something!"))
+ txn = self.createTransaction()
+ results = self.resultOf(txn.execSQL("maybe change something!"))
[[[counter, echo]]] = results
self.assertEquals("maybe change something!", echo)
self.factory.rollbackFail = True
- [x] = resultOf(txn.abort())
+ [x] = self.resultOf(txn.abort())
# Abort does not propagate the error on, the transaction merely gets
# disposed of.
self.assertIdentical(x, None)
self.assertEquals(len(self.pool._free), 1)
- self.assertNotIn(txn._baseTxn, self.pool._free)
self.assertEquals(self.pool._finishing, [])
self.assertEquals(len(self.factory.connections), 2)
self.assertEquals(self.factory.connections[0].closed, True)
@@ -1053,15 +1078,14 @@
C{commit} has to be relayed to client code, since that actually means
some changes didn't hit the database.
"""
- txn = self.pool.connection()
+ txn = self.createTransaction()
self.factory.commitFail = True
- results = resultOf(txn.execSQL("maybe change something!"))
+ results = self.resultOf(txn.execSQL("maybe change something!"))
[[[counter, echo]]] = results
self.assertEquals("maybe change something!", echo)
- [x] = resultOf(txn.commit())
- x.trap(CommitFail)
+ [x] = self.resultOf(txn.commit())
+ x.trap(self.translateError(CommitFail))
self.assertEquals(len(self.pool._free), 1)
- self.assertNotIn(txn._baseTxn, self.pool._free)
self.assertEquals(self.pool._finishing, [])
self.assertEquals(len(self.factory.connections), 2)
self.assertEquals(self.factory.connections[0].closed, True)
@@ -1073,14 +1097,14 @@
L{IAsyncTransaction.commandBlock} returns an L{IAsyncTransaction}
provider which ensures that a block of commands are executed together.
"""
- txn = self.pool.connection()
- a = resultOf(txn.execSQL("a"))
+ txn = self.createTransaction()
+ a = self.resultOf(txn.execSQL("a"))
cb = txn.commandBlock()
- b = resultOf(cb.execSQL("b"))
- d = resultOf(txn.execSQL("d"))
- c = resultOf(cb.execSQL("c"))
+ b = self.resultOf(cb.execSQL("b"))
+ d = self.resultOf(txn.execSQL("d"))
+ c = self.resultOf(cb.execSQL("c"))
cb.end()
- e = resultOf(txn.execSQL("e"))
+ e = self.resultOf(txn.execSQL("e"))
self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
[("a", []), ("b", []), ("c", []), ("d", []),
("e", [])])
@@ -1097,13 +1121,13 @@
executing until all SQL statements scheduled before it have completed.
"""
self.pauseHolders()
- txn = self.pool.connection()
- a = resultOf(txn.execSQL("a"))
- b = resultOf(txn.execSQL("b"))
+ txn = self.createTransaction()
+ a = self.resultOf(txn.execSQL("a"))
+ b = self.resultOf(txn.execSQL("b"))
cb = txn.commandBlock()
- c = resultOf(cb.execSQL("c"))
- d = resultOf(cb.execSQL("d"))
- e = resultOf(txn.execSQL("e"))
+ c = self.resultOf(cb.execSQL("c"))
+ d = self.resultOf(cb.execSQL("d"))
+ e = self.resultOf(txn.execSQL("e"))
cb.end()
self.flushHolders()
@@ -1123,7 +1147,7 @@
When execution of one command block is complete, it will proceed to the
next queued block, then to regular SQL executed on the transaction.
"""
- txn = self.pool.connection()
+ txn = self.createTransaction()
cb1 = txn.commandBlock()
cb2 = txn.commandBlock()
txn.execSQL("e")
@@ -1152,7 +1176,7 @@
L{CommandBlock.end} will raise L{AlreadyFinishedError} when called more
than once.
"""
- txn = self.pool.connection()
+ txn = self.createTransaction()
block = txn.commandBlock()
block.end()
self.assertRaises(AlreadyFinishedError, block.end)
@@ -1165,9 +1189,9 @@
when you call {IAsyncTransaction.commit}(), it should not actually take
effect if there are any pending command blocks.
"""
- txn = self.pool.connection()
+ txn = self.createTransaction()
block = txn.commandBlock()
- commitResult = resultOf(txn.commit())
+ commitResult = self.resultOf(txn.commit())
block.execSQL("in block")
self.assertEquals(commitResult, [])
self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
@@ -1183,10 +1207,10 @@
all outstanding C{execSQL}s will fail immediately, on both command
blocks and on the transaction itself.
"""
- txn = self.pool.connection()
+ txn = self.createTransaction()
block = txn.commandBlock()
block2 = txn.commandBlock()
- abortResult = resultOf(txn.abort())
+ abortResult = self.resultOf(txn.abort())
self.assertEquals(abortResult, [None])
self.assertRaises(AlreadyFinishedError, block2.execSQL, "bar")
self.assertRaises(AlreadyFinishedError, block.execSQL, "foo")
@@ -1206,7 +1230,7 @@
Attempting to execute SQL on a L{CommandBlock} which has had C{end}
called on it will result in an L{AlreadyFinishedError}.
"""
- txn = self.pool.connection()
+ txn = self.createTransaction()
block = txn.commandBlock()
block.end()
self.assertRaises(AlreadyFinishedError, block.execSQL, "hello")
@@ -1219,7 +1243,7 @@
Once an L{IAsyncTransaction} has been committed, L{commandBlock} raises
an exception.
"""
- txn = self.pool.connection()
+ txn = self.createTransaction()
txn.commit()
self.assertRaises(AlreadyFinishedError, txn.commandBlock)
@@ -1229,9 +1253,137 @@
Once an L{IAsyncTransaction} has been committed, L{commandBlock} raises
an exception.
"""
- txn = self.pool.connection()
+ txn = self.createTransaction()
txn.abort()
self.assertRaises(AlreadyFinishedError, txn.commandBlock)
+class IOPump(object):
+ """
+ Connect a client and a server.
+
+ @ivar client: a client protocol
+
+ @ivar server: a server protocol
+ """
+
+ def __init__(self, client, server):
+ self.client = client
+ self.server = server
+ self.clientTransport = StringTransport()
+ self.serverTransport = StringTransport()
+ self.client.makeConnection(self.clientTransport)
+ self.server.makeConnection(self.serverTransport)
+ self.c2s = [self.clientTransport, self.server]
+ self.s2c = [self.serverTransport, self.client]
+
+
+ def moveData(self, (outTransport, inProtocol)):
+ """
+ Move data from a L{StringTransport} to an L{IProtocol}.
+
+ @return: C{True} if any data was moved, C{False} if no data was moved.
+ """
+ data = outTransport.io.getvalue()
+ outTransport.io.seek(0)
+ outTransport.io.truncate()
+ if data:
+ inProtocol.dataReceived(data)
+ return True
+ else:
+ return False
+
+
+ def pump(self):
+ """
+ Deliver all input from the client to the server, then from the server to
+ the client.
+ """
+ a = self.moveData(self.c2s)
+ b = self.moveData(self.s2c)
+ return a or b
+
+
+ def flush(self, maxTurns=100):
+ """
+ Continue pumping until no more data is flowing.
+ """
+ turns = 0
+ while self.pump():
+ turns += 1
+ if turns > maxTurns:
+ raise RuntimeError("Ran too long!")
+
+
+
+class NetworkedConnectionPoolTests(ConnectionPoolTests):
+ """
+ Tests for L{ConnectionPoolConnection} and L{ConnectionPoolClient}
+ interacting with each other.
+ """
+
+ # Don't run these tests.
+ def test_propagateDialect(self):
+ """
+ Paramstyle and dialect are configured differently.
+ """
+
+ test_propagateParamstyle = test_propagateDialect
+ test_propagateParamstyle.skip = test_propagateParamstyle.__doc__.strip()
+
+ def setUp(self):
+ """
+ Do the same setup from L{ConnectionPoolBase}, but also establish a
+ loopback connection between a L{ConnectionPoolConnection} and a
+ L{ConnectionPoolClient}.
+ """
+ super(NetworkedConnectionPoolTests, self).setUp()
+ self.pump = IOPump(ConnectionPoolClient(),
+ ConnectionPoolConnection(self.pool))
+
+
+ def flushHolders(self):
+ """
+ In addition to flushing the L{ThreadHolder} stubs, also flush any
+ pending network I/O.
+ """
+ self.pump.flush()
+ super(NetworkedConnectionPoolTests, self).flushHolders()
+ self.pump.flush()
+
+
+ def createTransaction(self):
+ txn = self.pump.client.newTransaction()
+ self.pump.flush()
+ return txn
+
+
+ def translateError(self, err):
+ """
+ All errors raised locally will unfortunately be translated into
+ UnknownRemoteError, since AMP requires specific enumeration of all of
+ them. Flush the locally logged error of the given type and return
+ L{UnknownRemoteError}.
+ """
+ self.flushLoggedErrors(err)
+ return UnknownRemoteError
+
+
+ def resultOf(self, it):
+ result = resultOf(it)
+ self.pump.flush()
+ return result
+
+
+ def test_newTransaction(self):
+ """
+ L{ConnectionPoolClient.newTransaction} returns a provider of
+ L{IAsyncTransaction}, and creates a new transaction on the server side.
+ """
+ txn = self.pump.client.newTransaction()
+ verifyObject(IAsyncTransaction, txn)
+ self.pump.flush()
+ self.assertEquals(len(self.factory.connections), 1)
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111010/bc6da157/attachment-0001.html>
More information about the calendarserver-changes mailing list