[CalendarServer-changes] [11810] CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py
source_changes at macosforge.org
source_changes at macosforge.org
Sun Oct 13 07:57:55 PDT 2013
Revision: 11810
http://trac.calendarserver.org//changeset/11810
Author: cdaboo at apple.com
Date: 2013-10-13 07:57:55 -0700 (Sun, 13 Oct 2013)
Log Message:
-----------
Keep track of created work items for logging purposes.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py 2013-10-12 01:05:34 UTC (rev 11809)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py 2013-10-13 14:57:55 UTC (rev 11810)
@@ -17,7 +17,7 @@
"""
L{twext.enterprise.queue} is an U{eventually consistent
-<https://en.wikipedia.org/wiki/Eventual_consistency>} task-queueing system for
+<https://en.wikipedia.org/wiki/Eventual_consistency>} task-queuing system for
use by applications with multiple front-end servers talking to a single
database instance, that want to defer and parallelize work that involves
storing the results of computation.
@@ -35,7 +35,7 @@
would also be acceptable to have that happen at a later time, outside the
critical path.
-Such an application might be implemented with this queueing system like so::
+Such an application might be implemented with this queuing system like so::
from twext.enterprise.queue import WorkItem, queueFromTransaction
from twext.enterprise.dal.parseschema import addSQLToSchema
@@ -115,7 +115,7 @@
(in the worst case) pass from worker->controller->controller->worker.
"""
- def performWork(table, workID):
+ def performWork(table, workID): #@NoSelf
"""
@param table: The table where work is waiting.
@type table: L{TableSyntax}
@@ -284,7 +284,7 @@
the exact timing and location of the work execution may differ.
L{WorkItem}s may be constrained in the ordering and timing of their
- execution, to control concurrency and for performance reasons repsectively.
+ execution, to control concurrency and for performance reasons respectively.
Although all the usual database mutual-exclusion rules apply to work
executed in L{WorkItem.doWork}, implicit database row locking is not always
@@ -305,7 +305,7 @@
cheap operations all have to wait for the expensive ones to complete,
but continue to consume whatever database resources they were using.
- In order to ameliorate these problems with potentiallly concurrent work
+ In order to ameliorate these problems with potentially concurrent work
that uses the same resources, L{WorkItem} provides a database-wide mutex
that is automatically acquired at the beginning of the transaction and
released at the end. To use it, simply L{align
@@ -367,10 +367,9 @@
This method will be invoked in a worker process.
This method does I{not} need to delete the row referencing it; that
- will be taken care of by the job queueing machinery.
+ will be taken care of by the job queuing machinery.
"""
-
@classmethod
def forTable(cls, table):
"""
@@ -431,7 +430,7 @@
necessary because we don't want to rely on DNS; if reverse DNS weren't set
up perfectly, the listener would not be able to identify its peer, and it
is easier to modify local configuration so that L{socket.getfqdn} returns
- the right value than to ensure that DNS doesself.
+ the right value than to ensure that DNS does self.
"""
arguments = [
@@ -460,7 +459,7 @@
A connection to a peer node. Symmetric; since the 'client' and the
'server' both serve the same role, the logic is the same in every node.
- @ivar localWorkerPool: the pool of local worker procesess that can process
+ @ivar localWorkerPool: the pool of local worker processes that can process
queue work.
@type localWorkerPool: L{WorkerConnectionPool}
@@ -865,9 +864,6 @@
-
-
-
class WorkerFactory(Factory, object):
"""
Factory, to be used as the client to connect from the worker to the
@@ -942,6 +938,7 @@
self._whenProposed = Deferred()
self._whenExecuted = Deferred()
self._whenCommitted = Deferred()
+ self.workItem = None
def _start(self):
@@ -952,6 +949,7 @@
"""
@passthru(self.workItemType.create(self.txn, **self.kw).addCallback)
def whenCreated(item):
+ self.workItem = item
self._whenProposed.callback(self)
@self.txn.postCommit
def whenDone():
@@ -1023,6 +1021,8 @@
"""
return _cloneDeferred(self._whenCommitted)
+
+
class _BaseQueuer(object):
implements(IQueuer)
@@ -1030,13 +1030,16 @@
super(_BaseQueuer, self).__init__()
self.proposalCallbacks = set()
+
def callWithNewProposals(self, callback):
- self.proposalCallbacks.add(callback);
+ self.proposalCallbacks.add(callback)
+
def transferProposalCallbacks(self, newQueuer):
newQueuer.proposalCallbacks = self.proposalCallbacks
return newQueuer
+
def enqueueWork(self, txn, workItemType, **kw):
"""
There is some work to do. Do it, someplace else, ideally in parallel.
@@ -1061,6 +1064,7 @@
return wp
+
class PeerConnectionPool(_BaseQueuer, MultiService, object):
"""
Each node has a L{PeerConnectionPool} connecting it to all the other nodes
@@ -1277,7 +1281,6 @@
overdueItem.workID)
return inTransaction(self.transactionFactory, workCheck)
-
_currentWorkDeferred = None
_lostWorkCheckCall = None
@@ -1430,8 +1433,6 @@
-
-
class LocalQueuer(_BaseQueuer):
"""
When work is enqueued with this queuer, it is just executed locally.
@@ -1469,6 +1470,7 @@
return succeed(None)
+
class NonPerformingQueuer(_BaseQueuer):
"""
When work is enqueued with this queuer, it is never executed locally.
@@ -1487,4 +1489,4 @@
"""
Choose to perform the work locally.
"""
- return NonPerformer()
\ No newline at end of file
+ return NonPerformer()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20131013/e4725fbc/attachment.html>
More information about the calendarserver-changes
mailing list