[CalendarServer-changes] [7251] CalendarServer/branches/users/glyph/subtransactions/twext/enterprise
source_changes at macosforge.org
Thu Mar 24 13:35:56 PDT 2011
Revision: 7251
http://trac.macosforge.org/projects/calendarserver/changeset/7251
Author: glyph at apple.com
Date: 2011-03-24 13:35:56 -0700 (Thu, 24 Mar 2011)
Log Message:
-----------
basic coverage of case where some latency is mixed in
Modified Paths:
--------------
CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py
CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py
Modified: CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py 2011-03-24 20:35:44 UTC (rev 7250)
+++ CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py 2011-03-24 20:35:56 UTC (rev 7251)
@@ -40,6 +40,7 @@
from twisted.internet.defer import inlineCallbacks
from twisted.internet.defer import returnValue
+from twisted.internet.defer import DeferredList
from twisted.internet.defer import Deferred
from twisted.protocols.amp import Boolean
from twisted.python.failure import Failure
@@ -414,25 +415,26 @@
if self._allPending:
return
- if self._currentBlock is not None:
- if self._currentBlock._ended:
- # ???: is this necessary? We can immediately forget the block
- # when it ends, we don't need to keep it around until all of its
- # statements have completed.
- self._currentBlock = None
- bq = self._blockedQueue
- self._blockedQueue = None
- bq._unspool(self)
- else:
- return
+ if self._pendingBlocks and self._currentBlock is None:
+ self._currentBlock = self._pendingBlocks.pop(0)
- if self._pendingBlocks:
- block = self._pendingBlocks.pop(0)
- self._currentBlock = block
- self._blockedQueue = _WaitingTxn(self._pool)
- block._startExecuting()
+ if self._currentBlock is not None and not self._currentBlock._started:
+ self._currentBlock._startExecuting().addCallback(
+ self._finishExecuting)
+ def _finishExecuting(self, result):
+ """
+ The active block just finished executing.
+ """
+ self._currentBlock = None
+ bq = self._blockedQueue
+ self._blockedQueue = None
+ bq._unspool(self)
+ # XXX need to check next block, there might have been nothing in the
+ # blocked executing queue.
+
+
def commit(self):
self._markComplete()
return super(_SingleTxn, self).commit()
@@ -475,13 +477,27 @@
Create an IAsyncTransaction that will wait for all currently spooled
commands to complete before executing its own.
"""
- cb = CommandBlock(self)
+ self._currentBlock = CommandBlock(self)
+ self._blockedQueue = _WaitingTxn(self._pool)
# FIXME: test the case where it's ready immediately.
self._checkNextBlock()
- return cb
+ return self._currentBlock
+class _Unspooler(object):
+ def __init__(self, orig):
+ self.orig = orig
+
+
+ def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+ """
+ Execute some SQL, but don't track a new Deferred.
+ """
+ return self.orig.execSQL(sql, args, raiseOnZeroRowCount, False)
+
+
+
class CommandBlock(object):
"""
A partial implementation of L{IAsyncTransaction} that will group execSQL
@@ -498,26 +514,42 @@
self.paramstyle = singleTxn.paramstyle
self.dialect = singleTxn.dialect
self._spool = _WaitingTxn(singleTxn._pool)
+ self._started = False
self._ended = False
+ self._waitingForEnd = []
+ self._endDeferred = Deferred()
singleTxn._pendingBlocks.append(self)
def _startExecuting(self):
- self._spool._unspool(self)
+ self._started = True
+ self._spool._unspool(_Unspooler(self))
+ return self._endDeferred
- def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+ def execSQL(self, sql, args=None, raiseOnZeroRowCount=None, track=True):
"""
Execute some SQL within this command block.
"""
# FIXME: check 'ended'
- if self._singleTxn._currentBlock is self:
- return self._singleTxn._execSQLForBlock(
+ if self._singleTxn._currentBlock is self and self._started:
+ d = self._singleTxn._execSQLForBlock(
sql, args, raiseOnZeroRowCount, self)
else:
- return self._spool.execSQL(sql, args, raiseOnZeroRowCount)
+ d = self._spool.execSQL(sql, args, raiseOnZeroRowCount)
+ if track:
+ self._trackForEnd(d)
+ return d
+ def _trackForEnd(self, d):
+ """
+ Watch the following L{Deferred}, since we need to watch it to determine
+ when C{end} should be considered done.
+ """
+ self._waitingForEnd.append(d)
+
+
def end(self):
"""
The block of commands has completed. Allow other SQL to run on the
@@ -527,6 +559,7 @@
# executing block.
self._ended = True
self._singleTxn._checkNextBlock()
+ DeferredList(self._waitingForEnd).chainDeferred(self._endDeferred)
Modified: CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py 2011-03-24 20:35:44 UTC (rev 7250)
+++ CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py 2011-03-24 20:35:56 UTC (rev 7251)
@@ -891,3 +891,30 @@
self.assertEquals(len(d), 1)
self.assertEquals(len(e), 1)
+
+ def test_commandBlockWithLatency(self):
+ """
+ A block returned by L{IAsyncTransaction.commandBlock} won't start
+ executing until all SQL statements scheduled before it have completed.
+ """
+ self.pauseHolders()
+ txn = self.pool.connection()
+ a = resultOf(txn.execSQL("a"))
+ b = resultOf(txn.execSQL("b"))
+ cb = txn.commandBlock()
+ c = resultOf(cb.execSQL("c"))
+ d = resultOf(cb.execSQL("d"))
+ e = resultOf(txn.execSQL("e"))
+ cb.end()
+ self.flushHolders()
+
+ self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
+ [("a", []), ("b", []), ("c", []), ("d", []),
+ ("e", [])])
+
+ self.assertEquals(len(a), 1)
+ self.assertEquals(len(b), 1)
+ self.assertEquals(len(c), 1)
+ self.assertEquals(len(d), 1)
+ self.assertEquals(len(e), 1)
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110324/06800ba7/attachment-0001.html>
More information about the calendarserver-changes mailing list