[CalendarServer-changes] [9634] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org source_changes at macosforge.org
Sat Aug 11 01:55:09 PDT 2012


Revision: 9634
          http://trac.macosforge.org/projects/calendarserver/changeset/9634
Author:   glyph at apple.com
Date:     2012-08-11 01:55:09 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
explode out work execution into a structure that allows for access to all its phases of execution

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:08 UTC (rev 9633)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:09 UTC (rev 9634)
@@ -376,7 +376,116 @@
     """
 
 
+class WorkProposal(object):
+    """
+    A L{WorkProposal} is a proposal for work that will be executed, perhaps on
+    another node, perhaps in the future.
 
+    @ivar pool: The connection pool which this L{WorkProposal} will use to
+        submit its work.
+    @type pool: L{PeerConnectionPool}
+
+    @ivar txn: The transaction where the work will be enqueued.
+    @type txn: L{IAsyncTransaction}
+
+    @ivar workItemType: The type of work to be enqueued by this L{WorkProposal}.
+    @type workItemType: L{WorkItem} subclass
+
+    @ivar kw: The keyword arguments to pass to C{self.workItemType} to
+        construct it.
+    @type kw: L{dict}
+    """
+
+    def __init__(self, pool, txn, workItemType, kw):
+        self.pool = pool
+        self.txn = txn
+        self.workItemType = workItemType
+        self.kw = kw
+        self._whenProposed = Deferred()
+        self._whenExecuted = Deferred()
+        self._whenCommitted = Deferred()
+
+
+    def _start(self):
+        """
+        Execute this L{WorkProposal}.
+        """
+        creation = self.workItemType.create(self.txn, **self.kw)
+        @creation.addCallback
+        def created(workItem):
+            self._whenProposed.callback(None)
+            @self.txn.postCommit
+            def whenDone():
+                self._whenCommitted.callback(None)
+                peerChosen = self.pool.choosePeer()
+                @peerChosen.addCallback
+                def gotPeer(peer):
+                    performed = peer.performWork(workItem.__tbl__,
+                                                 workItem.workID)
+                    @performed.addCallback
+                    def performed(result):
+                        self._whenExecuted.callback(None)
+                    @performed.addErrback
+                    def notPerformed(why):
+                        self._whenExecuted.errback(why)
+                @peerChosen.addErrback
+                def notChosen(whyNot):
+                    self._whenExecuted.errback(whyNot)
+            @self.txn.postAbort
+            def whenFailed():
+                self._whenCommitted.errback(TransactionFailed)
+
+
+    def whenExecuted(self):
+        """
+        Let the caller know when the proposed work has been fully executed.
+
+        @note: The L{Deferred} returned by C{whenExecuted} should be used with
+            extreme caution.  If an application decides to do any
+            database-persistent work as a result of this L{Deferred} firing,
+            that work I{may be lost} as a result of a service being normally
+            shut down between the time that the work is scheduled and the time
+            that it is executed.  So, the only things that should be added as
+            callbacks to this L{Deferred} are those which are ephemeral, in
+            memory, and reflect only presentation state associated with the
+            user's perception of the completion of work, not logical chains of
+            work which need to be completed in sequence; those should all be
+            completed within the transaction of the L{WorkItem.doWork} that
+            gets executed.
+
+        @return: a L{Deferred} that fires with C{None} when the work has been
+            completed remotely.
+        """
+        # TODO: implement -- returns None for now, not the documented Deferred.
+
+
+    def whenProposed(self):
+        """
+        Let the caller know when the work has been proposed; i.e. when the work
+        is first transmitted to the database.
+
+        @return: a L{Deferred} that fires with C{None} when the relevant
+            commands have been sent to the database to create the L{WorkItem},
+            and fails if those commands do not succeed for some reason.
+        """
+        # TODO: implement -- returns None for now, not the documented Deferred.
+        # XXX should this actually fire the WorkItem rather than None?
+
+
+    def whenCommitted(self):
+        """
+        Let the caller know when the work has been committed to; i.e. when the
+        transaction where the work was proposed has been committed to the
+        database.
+
+        @return: a L{Deferred} that fires with C{None} when the relevant
+            transaction has been committed, or fails if the transaction is not
+            committed for any reason.
+        """
+        # TODO: implement -- returns None for now, not the documented Deferred.
+
+
+
 class PeerConnectionPool(Service, object):
     """
     Each node has a L{PeerConnectionPool} connecting it to all the other nodes
@@ -457,18 +566,6 @@
         Later, let the caller know that the work has been completed by firing a
         L{Deferred}.
 
-        @note: The L{Deferred} returned by C{enqueueWork} should be used with
-            caution.  If an application decides to do any database-persistent
-            work as a result of this L{Deferred} firing, that work I{may be
-            lost} as a result of a service being normally shut down between the
-            time that the work is scheduled and the time that it is executed.
-            So, the only things that should be added as callbacks to this
-            L{Deferred} are those which are ephemeral, in memory, and reflect
-            only presentation state associated with the user's perception of
-            the completion of work, not logical chains of work which need to be
-            completed in sequence; those should all be completed within the
-            transaction of the L{WorkItem.doWork} that gets executed.
-
         @param workItemType: The type of work item to be enqueued.
         @type workItemType: A subtype of L{WorkItem}
 
@@ -476,25 +573,13 @@
         @type kw: keyword parameters to C{workItemType.create}, i.e.
             C{workItemType.__init__}
 
-        @return: a L{Deferred} that fires when the work has been completed, or
-            fails if the work does not take place due to an error later in the
-            transaction.
-        @rtype: L{Deferred} firing L{None} or failing L{TransactionFailed}
+        @return: an object that can track the enqueuing and remote execution of
+            this work.
+        @rtype: L{WorkProposal}
         """
-        @passthru(workItemType.create(txn, **kw).addCallback)
-        def created(result):
-            d = Deferred()
-            @txn.postCommit
-            @inlineCallbacks
-            def whenDone():
-                peer = yield self.choosePeer()
-                peer.performWork(result.__tbl__, result.workID)
-                d.callback(None)
-            @txn.postAbort
-            def whenFailed():
-                d.errback(TransactionFailed)
-            return d
-        return created
+        wp = WorkProposal(self, txn, workItemType, kw)
+        wp._start()
+        return wp
 
 
     def allWorkItemTypes(self):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/20f447f0/attachment-0001.html>


More information about the calendarserver-changes mailing list