[CalendarServer-changes] [7249] CalendarServer/branches/users/glyph/subtransactions/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Thu Mar 24 13:35:15 PDT 2011
Revision: 7249
http://trac.macosforge.org/projects/calendarserver/changeset/7249
Author: glyph at apple.com
Date: 2011-03-24 13:35:13 -0700 (Thu, 24 Mar 2011)
Log Message:
-----------
basic command block (low-level prerequisite for subtransactions) implementation
Modified Paths:
--------------
CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py
CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py
Modified: CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py 2011-03-24 20:27:07 UTC (rev 7248)
+++ CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/adbapi2.py 2011-03-24 20:35:13 UTC (rev 7249)
@@ -166,12 +166,24 @@
def _end(self, really):
"""
- Common logic for commit or abort. Executed in the cursor main thread.
+ Common logic for commit or abort. Executed in the main reactor thread.
+
+ @param really: the callable to execute in the cursor thread to actually
+ do the commit or rollback.
+
+ @return: a L{Deferred} which fires when the database logic has
+ completed.
+
+ @raise: L{AlreadyFinishedError} if the transaction has already been
+ committed or aborted.
"""
if not self._completed:
self._completed = True
def reallySomething():
- # Executed in the cursor thread.
+ """
+ Do the database work and set appropriate flags. Executed in the
+ cursor thread.
+ """
if self._cursor is None:
return
really()
@@ -264,6 +276,11 @@
implements(IAsyncTransaction)
def __init__(self, pool):
+ """
+ Initialize a L{_WaitingTxn} based on a L{ConnectionPool}. (The C{pool}
+ is used only to reflect C{dialect} and C{paramstyle} attributes; not
+ remembered or modified in any way.)
+ """
self._spool = []
self.paramstyle = pool.paramstyle
self.dialect = pool.dialect
@@ -335,9 +352,13 @@
"""
def __init__(self, pool, baseTxn):
- self._pool = pool
- self._baseTxn = baseTxn
- self._complete = False
+ self._pool = pool
+ self._baseTxn = baseTxn
+ self._complete = False
+ self._currentBlock = None
+ self._pendingBlocks = []
+ self._allPending = []
+ self._blockedQueue = None
def __repr__(self):
@@ -358,11 +379,56 @@
spooledBase._unspool(baseTxn)
- def execSQL(self, *a, **kw):
+ def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+ return self._execSQLForBlock(sql, args, raiseOnZeroRowCount, None)
+
+
+ def _execSQLForBlock(self, sql, args, raiseOnZeroRowCount, block):
+ """
+ Execute some SQL for a particular L{CommandBlock}; or, if the given
+ C{block} is C{None}, execute it in the outermost transaction context.
+ """
self._checkComplete()
- return super(_SingleTxn, self).execSQL(*a, **kw)
+ if block is None and self._blockedQueue is not None:
+ return self._blockedQueue.execSQL(sql, args, raiseOnZeroRowCount)
+ d = super(_SingleTxn, self).execSQL(sql, args, raiseOnZeroRowCount)
+ self._allPending.append(d)
+ def itsDone(result):
+ self._allPending.remove(d)
+ self._checkNextBlock()
+ return result
+ d.addBoth(itsDone)
+ return d
+ def _checkNextBlock(self):
+ """
+ Check to see if there are any blocks with pending statements waiting to
+ execute, and execute the next one if there are no outstanding execute
+ calls.
+ """
+ if self._allPending:
+ return
+
+ if self._currentBlock is not None:
+ if self._currentBlock._ended:
+ # ???: is this necessary? We can immediately forget the block
+ # when it ends, we don't need to keep it around until all of its
+ # statements have completed.
+ self._currentBlock = None
+ bq = self._blockedQueue
+ self._blockedQueue = None
+ bq._unspool(self)
+ else:
+ return
+
+ if self._pendingBlocks:
+ block = self._pendingBlocks.pop(0)
+ self._currentBlock = block
+ self._blockedQueue = _WaitingTxn(self._pool)
+ block._startExecuting()
+
+
def commit(self):
self._markComplete()
return super(_SingleTxn, self).commit()
@@ -400,7 +466,66 @@
self._complete = True
+ def commandBlock(self):
+ """
+ Create an IAsyncTransaction that will wait for all currently spooled
+ commands to complete before executing its own.
+ """
+ cb = CommandBlock(self)
+ # FIXME: test the case where it's ready immediately.
+ self._checkNextBlock()
+ return cb
+
+
+class CommandBlock(object):
+ """
+ A partial implementation of L{IAsyncTransaction} that will group execSQL
+ calls together.
+
+ Does not implement commit() or abort(), because this will simply group
+ commands. In order to implement sub-transactions or checkpoints, some
+ understanding of the SQL dialect in use by the underlying connection is
+ required. Instead, it provides 'end'.
+ """
+
+ def __init__(self, singleTxn):
+ self._singleTxn = singleTxn
+ self.paramstyle = singleTxn.paramstyle
+ self.dialect = singleTxn.dialect
+ self._spool = _WaitingTxn(singleTxn._pool)
+ self._ended = False
+ singleTxn._pendingBlocks.append(self)
+
+
+ def _startExecuting(self):
+ self._spool._unspool(self)
+
+
+ def execSQL(self, sql, args=None, raiseOnZeroRowCount=None):
+ """
+ Execute some SQL within this command block.
+ """
+ # FIXME: check 'ended'
+ if self._singleTxn._currentBlock is self:
+ return self._singleTxn._execSQLForBlock(
+ sql, args, raiseOnZeroRowCount, self)
+ else:
+ return self._spool.execSQL(sql, args, raiseOnZeroRowCount)
+
+
+ def end(self):
+ """
+ The block of commands has completed. Allow other SQL to run on the
+ underlying L{IAsyncTransaction}.
+ """
+ # FIXME: test the case where end() is called when it's not the current
+ # executing block.
+ self._ended = True
+ self._singleTxn._checkNextBlock()
+
+
+
class _ConnectingPseudoTxn(object):
_retry = None
Modified: CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py 2011-03-24 20:27:07 UTC (rev 7248)
+++ CalendarServer/branches/users/glyph/subtransactions/twext/enterprise/test/test_adbapi2.py 2011-03-24 20:35:13 UTC (rev 7249)
@@ -146,6 +146,7 @@
# not entirely correct, but all we care about is its truth value.
self.description = False
self.variables = []
+ self.allExecutions = []
@property
@@ -158,6 +159,7 @@
self.connection.executions += 1
if self.connection._executeFailQueue:
raise self.connection._executeFailQueue.pop(0)()
+ self.allExecutions.append((sql, args))
self.sql = sql
self.description = True
self.rowcount = 1
@@ -424,7 +426,8 @@
and thereby frees up the resources it is holding.
"""
a = self.pool.connection()
- [[[counter, echo]]] = resultOf(a.execSQL("alpha"))
+ alphaResult = resultOf(a.execSQL("alpha"))
+ [[[counter, echo]]] = alphaResult
self.assertEquals(len(self.factory.connections), 1)
self.assertEquals(len(self.holders), 1)
[holder] = self.holders
@@ -847,3 +850,26 @@
self.assertEquals(self.factory.connections[0].closed, True)
self.assertEquals(self.factory.connections[1].closed, False)
+
+ def test_commandBlock(self):
+ """
+ L{IAsyncTransaction.commandBlock} returns an L{IAsyncTransaction}
+ provider which ensures that a block of commands is executed together.
+ """
+ txn = self.pool.connection()
+ a = resultOf(txn.execSQL("a"))
+ cb = txn.commandBlock()
+ b = resultOf(cb.execSQL("b"))
+ d = resultOf(txn.execSQL("d"))
+ c = resultOf(cb.execSQL("c"))
+ cb.end()
+ e = resultOf(txn.execSQL("e"))
+ self.assertEquals(self.factory.connections[0].cursors[0].allExecutions,
+ [("a", []), ("b", []), ("c", []), ("d", []),
+ ("e", [])])
+ self.assertEquals(len(a), 1)
+ self.assertEquals(len(b), 1)
+ self.assertEquals(len(c), 1)
+ self.assertEquals(len(d), 1)
+ self.assertEquals(len(e), 1)
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110324/3cefb2a8/attachment-0001.html>
More information about the calendarserver-changes
mailing list