[CalendarServer-changes] [9674] CalendarServer/branches/users/glyph/q
source_changes at macosforge.org
source_changes at macosforge.org
Sat Aug 11 01:55:43 PDT 2012
Revision: 9674
http://trac.macosforge.org/projects/calendarserver/changeset/9674
Author: glyph at apple.com
Date: 2012-08-11 01:55:43 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
propagate the initiation of work-doing down to the worker, where it *also* has to be
Modified Paths:
--------------
CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/q/
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:42 UTC (rev 9673)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:43 UTC (rev 9674)
@@ -385,7 +385,7 @@
complete.
@rtype: L{Deferred} firing L{dict}
"""
- d = self.callRemote(PerformWork, table=table.model.name, workID=workID)
+ d = self.callRemote(PerformWork, table=table, workID=workID)
self._bonusLoad += 1
@d.addBoth
def performed(result):
@@ -545,7 +545,7 @@
@see: The responder for this should always be
L{ConnectionFromController.actuallyReallyExecuteWorkHere}.
"""
- d = self.callRemote(PerformWork, table=table.model.name, workID=workID)
+ d = self.callRemote(PerformWork, table=table, workID=workID)
self._load += 1
@d.addBoth
def f(result):
@@ -563,13 +563,59 @@
L{ConnectionFromWorker}.
"""
- def __init__(self, transactionFactory, schema,
+ def __init__(self, transactionFactory, schema, whenConnected,
boxReceiver=None, locator=None):
super(ConnectionFromController, self).__init__(schema,
boxReceiver, locator)
self.transactionFactory = transactionFactory
+ self.whenConnected = whenConnected
+ def startReceivingBoxes(self, sender):
+ super(ConnectionFromController, self).startReceivingBoxes(sender)
+ self.whenConnected(self)
+
+
+ def choosePerformer(self):
+ """
+ To conform with L{WorkProposal}'s expectations, which may run in either
+ a controller (against a L{PeerConnectionPool}) or in a worker (against
+ a L{ConnectionFromController}), this is implemented to always return
+ C{self}, since C{self} is also an object that has a C{performWork}
+ method.
+ """
+ return succeed(self)
+
+
+ def performWork(self, table, workID):
+ """
+ Ask the controller to perform some work on our behalf.
+ """
+ return self.callRemote(PerformWork, table=table, workID=workID)
+
+
+ def enqueueWork(self, txn, workItemType, **kw):
+ """
+ There is some work to do. Do it, someplace else, ideally in parallel.
+ Later, let the caller know that the work has been completed by firing a
+ L{Deferred}.
+
+ @param workItemType: The type of work item to be enqueued.
+ @type workItemType: A subtype of L{WorkItem}
+
+ @param kw: The parameters to construct a work item.
+ @type kw: keyword parameters to C{workItemType.create}, i.e.
+ C{workItemType.__init__}
+
+ @return: an object that can track the enqueuing and remote execution of
+ this work.
+ @rtype: L{WorkProposal}
+ """
+ wp = WorkProposal(self, txn, workItemType, kw)
+ wp._start()
+ return wp
+
+
@PerformWork.responder
@inlineCallbacks
def actuallyReallyExecuteWorkHere(self, table, workID):
@@ -604,16 +650,22 @@
controller.
"""
- def __init__(self, transactionFactory, schema):
+ def __init__(self, transactionFactory, schema, whenConnected):
"""
Create a L{WorkerFactory} with a transaction factory and a schema.
"""
self.transactionFactory = transactionFactory
self.schema = schema
+ self.whenConnected = whenConnected
def buildProtocol(self, addr):
- return ConnectionFromController(self.transactionFactory, self.schema)
+ """
+ Create a L{ConnectionFromController} connected to the
+ transactionFactory and store.
+ """
+ return ConnectionFromController(self.transactionFactory, self.schema,
+ self.whenConnected)
@@ -679,15 +731,15 @@
@self.txn.postCommit
def whenDone():
self._whenCommitted.callback(None)
- @passthru(self.pool.choosePeer().addCallback)
- def peerChosen(peer):
- @passthru(peer.performWork(item.table, item.workID))
+ @passthru(self.pool.choosePerformer().addCallback)
+ def performerChosen(performer):
+ @passthru(performer.performWork(item.table, item.workID))
def performed(result):
self._whenExecuted.callback(None)
@performed.addErrback
def notPerformed(why):
self._whenExecuted.errback(why)
- @peerChosen.addErrback
+ @performerChosen.addErrback
def notChosen(whyNot):
self._whenExecuted.errback(whyNot)
@self.txn.postAbort
@@ -836,7 +888,7 @@
self.peers.remove(peer)
- def choosePeer(self):
+ def choosePerformer(self):
"""
Choose a peer to distribute work to based on the current known slot
occupancy of the other nodes. Note that this will prefer distributing
@@ -935,7 +987,7 @@
yield itemType.query(
txn, itemType.created > self.queueProcessTimeout
)):
- peer = yield self.choosePeer()
+ peer = yield self.choosePerformer()
yield peer.performWork(overdueItem.table,
overdueItem.workID)
finally:
@@ -991,7 +1043,7 @@
if self._startingUp is not None:
yield self._startingUp
if self._listeningPortObject is not None:
- return self._listeningPortObject.stopListening()
+ yield self._listeningPortObject.stopListening()
if self._lostWorkCheckCall is not None:
self._lostWorkCheckCall.cancel()
if self._currentWorkDeferred is not None:
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/d9208eb1/attachment-0001.html>
More information about the calendarserver-changes
mailing list