[CalendarServer-changes] [9634] CalendarServer/branches/users/glyph/q
source_changes at macosforge.org
source_changes at macosforge.org
Sat Aug 11 01:55:09 PDT 2012
Revision: 9634
http://trac.macosforge.org/projects/calendarserver/changeset/9634
Author: glyph at apple.com
Date: 2012-08-11 01:55:09 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
explode out work execution into a structure that allows for access to all its phases of execution
Modified Paths:
--------------
CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/q/
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:08 UTC (rev 9633)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:09 UTC (rev 9634)
@@ -376,7 +376,116 @@
"""
+class WorkProposal(object):
+    """
+    A L{WorkProposal} is a proposal for work that will be executed, perhaps on
+    another node, perhaps in the future.
+
+    A proposal passes through three phases, each of which can be observed
+    independently: it is I{proposed} when the work item has been created in
+    the transaction (L{WorkProposal.whenProposed}), I{committed} when that
+    transaction commits (L{WorkProposal.whenCommitted}), and I{executed} when
+    a peer reports that it has performed the work
+    (L{WorkProposal.whenExecuted}).
+
+    @ivar pool: the connection pool which this L{WorkProposal} will use to
+        submit its work.
+    @type pool: L{PeerConnectionPool}
+
+    @ivar txn: The transaction where the work will be enqueued.
+    @type txn: L{IAsyncTransaction}
+
+    @ivar workItemType: The type of work to be enqueued by this L{WorkProposal}
+    @type workItemType: L{WorkItem} subclass
+
+    @ivar kw: The keyword arguments to pass to C{self.workItemType} to
+        construct it.
+    @type kw: L{dict}
+    """
+
+    def __init__(self, pool, txn, workItemType, kw):
+        self.pool = pool
+        self.txn = txn
+        self.workItemType = workItemType
+        self.kw = kw
+        # One internal Deferred per phase.  These are never handed out
+        # directly; callers receive independent observers (see _observe) so
+        # that one caller's callbacks cannot alter what another caller sees.
+        self._whenProposed = Deferred()
+        self._whenExecuted = Deferred()
+        self._whenCommitted = Deferred()
+
+
+    @staticmethod
+    def _observe(source):
+        """
+        Create a new L{Deferred} that receives the same result (or failure) as
+        C{source}, leaving the result of C{source} itself untouched so that
+        any number of observers may be attached.
+
+        @param source: one of this proposal's internal phase L{Deferred}s.
+        @type source: L{Deferred}
+
+        @return: a fresh L{Deferred} mirroring the outcome of C{source}.
+        @rtype: L{Deferred}
+        """
+        observer = Deferred()
+        def relayResult(result):
+            observer.callback(result)
+            # Pass the value through so later observers see it as well.
+            return result
+        def relayFailure(why):
+            observer.errback(why)
+            # Likewise keep the failure in place for later observers.
+            return why
+        source.addCallbacks(relayResult, relayFailure)
+        return observer
+
+
+    def _start(self):
+        """
+        Execute this L{WorkProposal}: create the work item in C{self.txn},
+        and, once that transaction has committed, choose a peer from
+        C{self.pool} and ask it to perform the work.  Each phase fires the
+        corresponding internal L{Deferred}.
+        """
+        def created(workItem):
+            self._whenProposed.callback(None)
+            @self.txn.postCommit
+            def whenDone():
+                self._whenCommitted.callback(None)
+                peerChosen = self.pool.choosePeer()
+                @peerChosen.addCallback
+                def gotPeer(peer):
+                    working = peer.performWork(workItem.__tbl__,
+                                               workItem.workID)
+                    @working.addCallback
+                    def performed(result):
+                        self._whenExecuted.callback(None)
+                    @working.addErrback
+                    def notPerformed(why):
+                        self._whenExecuted.errback(why)
+                @peerChosen.addErrback
+                def notChosen(whyNot):
+                    self._whenExecuted.errback(whyNot)
+            @self.txn.postAbort
+            def whenFailed():
+                # errback() needs an exception instance (or a Failure); given
+                # the bare class, the failure cannot be trapped as
+                # TransactionFailed.
+                self._whenCommitted.errback(TransactionFailed())
+        def notCreated(why):
+            # Without a work item there is nothing to commit or execute, so
+            # fail every phase rather than leaving observers waiting forever.
+            self._whenProposed.errback(why)
+            self._whenCommitted.errback(why)
+            self._whenExecuted.errback(why)
+        # addCallbacks (rather than addCallback + addErrback) so that
+        # notCreated only handles a failed creation, never an error raised by
+        # 'created' after the proposal phase has already fired.
+        creation = self.workItemType.create(self.txn, **self.kw)
+        creation.addCallbacks(created, notCreated)
+
+
+    def whenExecuted(self):
+        """
+        Let the caller know when the proposed work has been fully executed.
+
+        @note: The L{Deferred} returned by C{whenExecuted} should be used with
+            extreme caution.  If an application decides to do any
+            database-persistent work as a result of this L{Deferred} firing,
+            that work I{may be lost} as a result of a service being normally
+            shut down between the time that the work is scheduled and the time
+            that it is executed.  So, the only things that should be added as
+            callbacks to this L{Deferred} are those which are ephemeral, in
+            memory, and reflect only presentation state associated with the
+            user's perception of the completion of work, not logical chains of
+            work which need to be completed in sequence; those should all be
+            completed within the transaction of the L{WorkItem.doWork} that
+            gets executed.
+
+        @return: a L{Deferred} that fires with C{None} when the work has been
+            completed remotely, and fails if the work item could not be
+            created, no peer could be chosen, or the peer failed to perform
+            the work.
+        """
+        return self._observe(self._whenExecuted)
+
+
+    def whenProposed(self):
+        """
+        Let the caller know when the work has been proposed; i.e. when the work
+        is first transmitted to the database.
+
+        @return: a L{Deferred} that fires with C{None} when the relevant
+            commands have been sent to the database to create the L{WorkItem},
+            and fails if those commands do not succeed for some reason.
+        """
+        # XXX should this actually fire the WorkItem rather than None?
+        return self._observe(self._whenProposed)
+
+
+    def whenCommitted(self):
+        """
+        Let the caller know when the work has been committed to; i.e. when the
+        transaction where the work was proposed has been committed to the
+        database.
+
+        @return: a L{Deferred} that fires with C{None} when the relevant
+            transaction has been committed, or fails if the transaction is not
+            committed for any reason.
+        """
+        return self._observe(self._whenCommitted)
+
+
+
class PeerConnectionPool(Service, object):
"""
Each node has a L{PeerConnectionPool} connecting it to all the other nodes
@@ -457,18 +566,6 @@
Later, let the caller know that the work has been completed by firing a
L{Deferred}.
- @note: The L{Deferred} returned by C{enqueueWork} should be used with
- caution. If an application decides to do any database-persistent
- work as a result of this L{Deferred} firing, that work I{may be
- lost} as a result of a service being normally shut down between the
- time that the work is scheduled and the time that it is executed.
- So, the only things that should be added as callbacks to this
- L{Deferred} are those which are ephemeral, in memory, and reflect
- only presentation state associated with the user's perception of
- the completion of work, not logical chains of work which need to be
- completed in sequence; those should all be completed within the
- transaction of the L{WorkItem.doWork} that gets executed.
-
@param workItemType: The type of work item to be enqueued.
@type workItemType: A subtype of L{WorkItem}
@@ -476,25 +573,13 @@
@type kw: keyword parameters to C{workItemType.create}, i.e.
C{workItemType.__init__}
- @return: a L{Deferred} that fires when the work has been completed, or
- fails if the work does not take place due to an error later in the
- transaction.
- @rtype: L{Deferred} firing L{None} or failing L{TransactionFailed}
+ @return: an object that can track the enqueuing and remote execution of
+ this work.
+ @rtype: L{WorkProposal}
"""
- @passthru(workItemType.create(txn, **kw).addCallback)
- def created(result):
- d = Deferred()
- @txn.postCommit
- @inlineCallbacks
- def whenDone():
- peer = yield self.choosePeer()
- peer.performWork(result.__tbl__, result.workID)
- d.callback(None)
- @txn.postAbort
- def whenFailed():
- d.errback(TransactionFailed)
- return d
- return created
+ wp = WorkProposal(self, txn, workItemType, kw)
+ wp._start()
+ return wp
def allWorkItemTypes(self):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/20f447f0/attachment-0001.html>
More information about the calendarserver-changes
mailing list