[CalendarServer-changes] [9674] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org source_changes at macosforge.org
Sat Aug 11 01:55:43 PDT 2012


Revision: 9674
          http://trac.macosforge.org/projects/calendarserver/changeset/9674
Author:   glyph at apple.com
Date:     2012-08-11 01:55:43 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
propagate the initiation of work-doing down to the worker, where it *also* has to be

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:42 UTC (rev 9673)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:43 UTC (rev 9674)
@@ -385,7 +385,7 @@
             complete.
         @rtype: L{Deferred} firing L{dict}
         """
-        d = self.callRemote(PerformWork, table=table.model.name, workID=workID)
+        d = self.callRemote(PerformWork, table=table, workID=workID)
         self._bonusLoad += 1
         @d.addBoth
         def performed(result):
@@ -545,7 +545,7 @@
         @see: The responder for this should always be
             L{ConnectionFromController.actuallyReallyExecuteWorkHere}.
         """
-        d = self.callRemote(PerformWork, table=table.model.name, workID=workID)
+        d = self.callRemote(PerformWork, table=table, workID=workID)
         self._load += 1
         @d.addBoth
         def f(result):
@@ -563,13 +563,59 @@
     L{ConnectionFromWorker}.
     """
 
-    def __init__(self, transactionFactory, schema,
+    def __init__(self, transactionFactory, schema, whenConnected,
                  boxReceiver=None, locator=None):
         super(ConnectionFromController, self).__init__(schema,
                                                        boxReceiver, locator)
         self.transactionFactory = transactionFactory
+        self.whenConnected = whenConnected
 
 
+    def startReceivingBoxes(self, sender):
+        super(ConnectionFromController, self).startReceivingBoxes(sender)
+        self.whenConnected(self)
+
+
+    def choosePerformer(self):
+        """
+        To conform with L{WorkProposal}'s expectations, which may run in either
+        a controller (against a L{PeerConnectionPool}) or in a worker (against
+        a L{ConnectionFromController}), this is implemented to always return
+        C{self}, since C{self} is also an object that has a C{performWork}
+        method.
+        """
+        return succeed(self)
+
+
+    def performWork(self, table, workID):
+        """
+        Ask the controller to perform some work on our behalf.
+        """
+        return self.callRemote(PerformWork, table=table, workID=workID)
+
+
+    def enqueueWork(self, txn, workItemType, **kw):
+        """
+        There is some work to do.  Do it, someplace else, ideally in parallel.
+        Later, let the caller know that the work has been completed by firing a
+        L{Deferred}.
+
+        @param workItemType: The type of work item to be enqueued.
+        @type workItemType: A subtype of L{WorkItem}
+
+        @param kw: The parameters to construct a work item.
+        @type kw: keyword parameters to C{workItemType.create}, i.e.
+            C{workItemType.__init__}
+
+        @return: an object that can track the enqueuing and remote execution of
+            this work.
+        @rtype: L{WorkProposal}
+        """
+        wp = WorkProposal(self, txn, workItemType, kw)
+        wp._start()
+        return wp
+
+
     @PerformWork.responder
     @inlineCallbacks
     def actuallyReallyExecuteWorkHere(self, table, workID):
@@ -604,16 +650,22 @@
     controller.
     """
 
-    def __init__(self, transactionFactory, schema):
+    def __init__(self, transactionFactory, schema, whenConnected):
         """
         Create a L{WorkerFactory} with a transaction factory and a schema.
         """
         self.transactionFactory = transactionFactory
         self.schema = schema
+        self.whenConnected = whenConnected
 
 
     def buildProtocol(self, addr):
-        return ConnectionFromController(self.transactionFactory, self.schema)
+        """
+        Create a L{ConnectionFromController} connected to the
+        transactionFactory and store.
+        """
+        return ConnectionFromController(self.transactionFactory, self.schema,
+                                        self.whenConnected)
 
 
 
@@ -679,15 +731,15 @@
             @self.txn.postCommit
             def whenDone():
                 self._whenCommitted.callback(None)
-                @passthru(self.pool.choosePeer().addCallback)
-                def peerChosen(peer):
-                    @passthru(peer.performWork(item.table, item.workID))
+                @passthru(self.pool.choosePerformer().addCallback)
+                def performerChosen(performer):
+                    @passthru(performer.performWork(item.table, item.workID))
                     def performed(result):
                         self._whenExecuted.callback(None)
                     @performed.addErrback
                     def notPerformed(why):
                         self._whenExecuted.errback(why)
-                @peerChosen.addErrback
+                @performerChosen.addErrback
                 def notChosen(whyNot):
                     self._whenExecuted.errback(whyNot)
             @self.txn.postAbort
@@ -836,7 +888,7 @@
         self.peers.remove(peer)
 
 
-    def choosePeer(self):
+    def choosePerformer(self):
         """
         Choose a peer to distribute work to based on the current known slot
         occupancy of the other nodes.  Note that this will prefer distributing
@@ -935,7 +987,7 @@
                         yield itemType.query(
                             txn, itemType.created > self.queueProcessTimeout
                     )):
-                    peer = yield self.choosePeer()
+                    peer = yield self.choosePerformer()
                     yield peer.performWork(overdueItem.table,
                                            overdueItem.workID)
         finally:
@@ -991,7 +1043,7 @@
         if self._startingUp is not None:
             yield self._startingUp
         if self._listeningPortObject is not None:
-            return self._listeningPortObject.stopListening()
+            yield self._listeningPortObject.stopListening()
         if self._lostWorkCheckCall is not None:
             self._lostWorkCheckCall.cancel()
         if self._currentWorkDeferred is not None:
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/d9208eb1/attachment-0001.html>


More information about the calendarserver-changes mailing list