[CalendarServer-changes] [11810] CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py

source_changes at macosforge.org source_changes at macosforge.org
Sun Oct 13 07:57:55 PDT 2013


Revision: 11810
          http://trac.calendarserver.org//changeset/11810
Author:   cdaboo at apple.com
Date:     2013-10-13 07:57:55 -0700 (Sun, 13 Oct 2013)
Log Message:
-----------
Keep track of created work items for logging purposes.

Modified Paths:
--------------
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py	2013-10-12 01:05:34 UTC (rev 11809)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twext/enterprise/queue.py	2013-10-13 14:57:55 UTC (rev 11810)
@@ -17,7 +17,7 @@
 
 """
 L{twext.enterprise.queue} is an U{eventually consistent
-<https://en.wikipedia.org/wiki/Eventual_consistency>} task-queueing system for
+<https://en.wikipedia.org/wiki/Eventual_consistency>} task-queuing system for
 use by applications with multiple front-end servers talking to a single
 database instance, that want to defer and parallelize work that involves
 storing the results of computation.
@@ -35,7 +35,7 @@
 would also be acceptable to have that happen at a later time, outside the
 critical path.
 
-Such an application might be implemented with this queueing system like so::
+Such an application might be implemented with this queuing system like so::
 
     from twext.enterprise.queue import WorkItem, queueFromTransaction
     from twext.enterprise.dal.parseschema import addSQLToSchema
@@ -115,7 +115,7 @@
     (in the worst case) pass from worker->controller->controller->worker.
     """
 
-    def performWork(table, workID):
+    def performWork(table, workID): #@NoSelf
         """
         @param table: The table where work is waiting.
         @type table: L{TableSyntax}
@@ -284,7 +284,7 @@
     the exact timing and location of the work execution may differ.
 
     L{WorkItem}s may be constrained in the ordering and timing of their
-    execution, to control concurrency and for performance reasons repsectively.
+    execution, to control concurrency and for performance reasons respectively.
 
     Although all the usual database mutual-exclusion rules apply to work
     executed in L{WorkItem.doWork}, implicit database row locking is not always
@@ -305,7 +305,7 @@
           cheap operations all have to wait for the expensive ones to complete,
           but continue to consume whatever database resources they were using.
 
-    In order to ameliorate these problems with potentiallly concurrent work
+    In order to ameliorate these problems with potentially concurrent work
     that uses the same resources, L{WorkItem} provides a database-wide mutex
     that is automatically acquired at the beginning of the transaction and
     released at the end.  To use it, simply L{align
@@ -367,10 +367,9 @@
         This method will be invoked in a worker process.
 
         This method does I{not} need to delete the row referencing it; that
-        will be taken care of by the job queueing machinery.
+        will be taken care of by the job queuing machinery.
         """
 
-
     @classmethod
     def forTable(cls, table):
         """
@@ -431,7 +430,7 @@
     necessary because we don't want to rely on DNS; if reverse DNS weren't set
     up perfectly, the listener would not be able to identify its peer, and it
     is easier to modify local configuration so that L{socket.getfqdn} returns
-    the right value than to ensure that DNS doesself.
+    the right value than to ensure that DNS does self.
     """
 
     arguments = [
@@ -460,7 +459,7 @@
     A connection to a peer node.  Symmetric; since the 'client' and the
     'server' both serve the same role, the logic is the same in every node.
 
-    @ivar localWorkerPool: the pool of local worker procesess that can process
+    @ivar localWorkerPool: the pool of local worker processes that can process
         queue work.
     @type localWorkerPool: L{WorkerConnectionPool}
 
@@ -865,9 +864,6 @@
 
 
 
-
-
-
 class WorkerFactory(Factory, object):
     """
     Factory, to be used as the client to connect from the worker to the
@@ -942,6 +938,7 @@
         self._whenProposed = Deferred()
         self._whenExecuted = Deferred()
         self._whenCommitted = Deferred()
+        self.workItem = None
 
 
     def _start(self):
@@ -952,6 +949,7 @@
         """
         @passthru(self.workItemType.create(self.txn, **self.kw).addCallback)
         def whenCreated(item):
+            self.workItem = item
             self._whenProposed.callback(self)
             @self.txn.postCommit
             def whenDone():
@@ -1023,6 +1021,8 @@
         """
         return _cloneDeferred(self._whenCommitted)
 
+
+
 class _BaseQueuer(object):
     implements(IQueuer)
 
@@ -1030,13 +1030,16 @@
         super(_BaseQueuer, self).__init__()
         self.proposalCallbacks = set()
 
+
     def callWithNewProposals(self, callback):
-        self.proposalCallbacks.add(callback);
+        self.proposalCallbacks.add(callback)
 
+
     def transferProposalCallbacks(self, newQueuer):
         newQueuer.proposalCallbacks = self.proposalCallbacks
         return newQueuer
 
+
     def enqueueWork(self, txn, workItemType, **kw):
         """
         There is some work to do.  Do it, someplace else, ideally in parallel.
@@ -1061,6 +1064,7 @@
         return wp
 
 
+
 class PeerConnectionPool(_BaseQueuer, MultiService, object):
     """
     Each node has a L{PeerConnectionPool} connecting it to all the other nodes
@@ -1277,7 +1281,6 @@
                                            overdueItem.workID)
         return inTransaction(self.transactionFactory, workCheck)
 
-
     _currentWorkDeferred = None
     _lostWorkCheckCall = None
 
@@ -1430,8 +1433,6 @@
 
 
 
-
-
 class LocalQueuer(_BaseQueuer):
     """
     When work is enqueued with this queuer, it is just executed locally.
@@ -1469,6 +1470,7 @@
         return succeed(None)
 
 
+
 class NonPerformingQueuer(_BaseQueuer):
     """
     When work is enqueued with this queuer, it is never executed locally.
@@ -1487,4 +1489,4 @@
         """
         Choose to perform the work locally.
         """
-        return NonPerformer()
\ No newline at end of file
+        return NonPerformer()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20131013/e4725fbc/attachment.html>


More information about the calendarserver-changes mailing list