[CalendarServer-changes] [8161] CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Mon Oct 10 08:03:01 PDT 2011
Revision: 8161
http://trac.macosforge.org/projects/calendarserver/changeset/8161
Author: glyph at apple.com
Date: 2011-10-10 08:03:01 -0700 (Mon, 10 Oct 2011)
Log Message:
-----------
implementation of command blocks for shared connection pool, plus a few additional test tweaks to make it work
Modified Paths:
--------------
CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py
CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py
Modified: CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py 2011-10-10 15:02:51 UTC (rev 8160)
+++ CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/adbapi2.py 2011-10-10 15:03:01 UTC (rev 8161)
@@ -57,8 +57,9 @@
from twext.enterprise.ienterprise import IDerivedParameter
from twisted.internet.defer import fail
+
from twext.enterprise.ienterprise import (
- AlreadyFinishedError, IAsyncTransaction, POSTGRES_DIALECT
+ AlreadyFinishedError, IAsyncTransaction, POSTGRES_DIALECT, ICommandBlock
)
@@ -1063,10 +1064,27 @@
"""
arguments = [('sql', String()),
('queryID', String()),
- ('args', Pickle())] + txnarg()
+ ('args', Pickle()),
+ ('blockID', String())] + txnarg()
+class StartBlock(Command):
+ """
+ Create a new SQL command block.
+ """
+ arguments = [("blockID", String())] + txnarg()
+
+
+
+class EndBlock(Command):
+ """
+ End a previously started SQL command block.
+ """
+ arguments = [("blockID", String())] + txnarg()
+
+
+
class Row(Command):
"""
A row has been returned. Sent from server to client in response to
@@ -1118,6 +1136,7 @@
super(ConnectionPoolConnection, self).__init__()
self.pool = pool
self._txns = {}
+ self._blocks = {}
@StartTxn.responder
@@ -1126,11 +1145,27 @@
return {}
+ @StartBlock.responder
+ def startBlock(self, transactionID, blockID):
+ self._blocks[blockID] = self._txns[transactionID].commandBlock()
+ return {}
+
+
+ @EndBlock.responder
+ def endBlock(self, transactionID, blockID):
+ self._blocks[blockID].end()
+ return {}
+
+
@ExecSQL.responder
@inlineCallbacks
- def receivedSQL(self, transactionID, queryID, sql, args):
+ def receivedSQL(self, transactionID, queryID, sql, args, blockID):
+ if blockID:
+ txn = self._blocks[blockID]
+ else:
+ txn = self._txns[transactionID]
try:
- rows = yield self._txns[transactionID].execSQL(sql, args, _NoRows)
+ rows = yield txn.execSQL(sql, args, _NoRows)
except _NoRows:
norows = True
else:
@@ -1260,6 +1295,8 @@
self._client = client
self._transactionID = transactionID
self._completed = False
+ self._committing = False
+ self._committed = False
@property
@@ -1278,7 +1315,10 @@
return self._client.dialect
- def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+ def execSQL(self, sql, args=None, raiseOnZeroRowCount=None, blockID=""):
+ if not blockID:
+ if self._completed:
+ raise AlreadyFinishedError()
if args is None:
args = []
queryID = str(self._client._nextID())
@@ -1286,7 +1326,7 @@
result = (
self._client.callRemote(
ExecSQL, queryID=queryID, sql=sql, args=args,
- transactionID=self._transactionID
+ transactionID=self._transactionID, blockID=blockID
)
.addCallback(lambda nothing: query.deferred)
)
@@ -1303,7 +1343,11 @@
def commit(self):
- return self._complete(Commit)
+ self._committing = True
+ def done(whatever):
+ self._committed = True
+ return whatever
+ return self._complete(Commit).addBoth(done)
def abort(self):
@@ -1311,7 +1355,51 @@
def commandBlock(self):
- raise NotImplementedError()
+ if self._completed:
+ raise AlreadyFinishedError()
+ blockID = str(self._client._nextID())
+ self._client.callRemote(
+ StartBlock, blockID=blockID, transactionID=self._transactionID
+ )
+ return _NetCommandBlock(self, blockID)
+class _NetCommandBlock(object):
+ """
+ Net command block.
+ """
+
+ implements(ICommandBlock)
+
+ def __init__(self, transaction, blockID):
+ self._transaction = transaction
+ self._blockID = blockID
+ self._ended = False
+
+
+ def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+ """
+ Execute some SQL on this command block.
+ """
+ if (self._ended or
+ self._transaction._completed and
+ not self._transaction._committing or
+ self._transaction._committed):
+ raise AlreadyFinishedError()
+ return self._transaction.execSQL(sql, args, raiseOnZeroRowCount,
+ self._blockID)
+
+
+ def end(self):
+ """
+ End this block.
+ """
+ if self._ended:
+ raise AlreadyFinishedError()
+ self._ended = True
+ self._transaction._client.callRemote(
+ EndBlock, blockID=self._blockID,
+ transactionID=self._transaction._transactionID
+ )
+
Modified: CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py 2011-10-10 15:02:51 UTC (rev 8160)
+++ CalendarServer/branches/users/glyph/shared-pool-take2/twext/enterprise/test/test_adbapi2.py 2011-10-10 15:03:01 UTC (rev 8161)
@@ -1158,6 +1158,7 @@
cb2.end()
cb1.end()
flush()
+ self.flushHolders()
self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
[("a", []), ("b", []), ("c", []), ("d", []),
("e", [])])
@@ -1192,11 +1193,12 @@
txn = self.createTransaction()
block = txn.commandBlock()
commitResult = self.resultOf(txn.commit())
- block.execSQL("in block")
+ self.resultOf(block.execSQL("in block"))
self.assertEquals(commitResult, [])
self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
[("in block", [])])
block.end()
+ self.flushHolders()
self.assertEquals(commitResult, [None])
@@ -1254,7 +1256,7 @@
an exception.
"""
txn = self.createTransaction()
- txn.abort()
+ self.resultOf(txn.abort())
self.assertRaises(AlreadyFinishedError, txn.commandBlock)
@@ -1326,7 +1328,8 @@
# Don't run these tests.
def test_propagateDialect(self):
"""
- Paramstyle and dialect are configured differently.
+ Paramstyle and dialect are configured differently for
+ shared-connection-pool transactions.
"""
test_propagateParamstyle = test_propagateDialect
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111010/ce0f79c2/attachment.html>
More information about the calendarserver-changes mailing list