[CalendarServer-changes] [13750] twext/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Fri Jul 11 11:03:57 PDT 2014


Revision: 13750
          http://trac.calendarserver.org//changeset/13750
Author:   cdaboo at apple.com
Date:     2014-07-11 11:03:57 -0700 (Fri, 11 Jul 2014)
Log Message:
-----------
Back-off job polling interval when there is nothing to do.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/jobqueue.py
    twext/trunk/twext/enterprise/test/test_jobqueue.py

Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py	2014-07-11 15:47:56 UTC (rev 13749)
+++ twext/trunk/twext/enterprise/jobqueue.py	2014-07-11 18:03:57 UTC (rev 13750)
@@ -23,9 +23,8 @@
 database instance, that want to defer and parallelize work that involves
 storing the results of computation.
 
-By enqueuing with L{twisted.enterprise.queue}, you may guarantee that the work
-will I{eventually} be done, and reliably commit to doing it in the future, but
-defer it if it does not need to be done I{now}.
+By enqueuing with L{twisted.enterprise.jobqueue}, you may guarantee that the work
+will I{eventually} be done, and reliably commit to doing it in the future.
 
 To pick a hypothetical example, let's say that you have a store which wants to
 issue a promotional coupon based on a customer loyalty program, in response to
@@ -81,31 +80,48 @@
 
 More details:
 
-    A master process in each node (host in a multi-host setup) has a:
+    Terminology:
+    
+        node: a host in a multi-host setup. Each node will contain a
+            "controller" process and a set of "worker" processes.
+            Nodes communicate with each other to allow load balancing
+            of jobs across the entire cluster.
+        
+        controller: a process running in a node that is in charge of
+            managing "workers" as well as connections to other nodes. The
+            controller polls the job queue and dispatches outstanding jobs
+            to its "workers".
+        
+        worker: a process running in a node that is responsible for
+            executing jobs sent to it by the "controller". It also
+            handles enqueuing of jobs as dictated by operations it
+            is doing.
+                
+    A controller has a:
 
-    L{WorkerConnectionPool}: this maintains a list of child processes that
-        have connected to the master over AMP. It is responsible for
-        dispatching work that is to be performed locally on that node.
-        The child process is identified by an L{ConnectionFromWorker}
-        object which maintains the child AMP connection. The
-        L{ConnectionFromWorker} tracks the load on its child so that
-        work can be distributed evenly or halted if the node is too busy.
+    L{WorkerConnectionPool}: this maintains a list of worker processes that
+        have connected to the controller over AMP. It is responsible for
+        dispatching jobs that are to be performed locally on that node.
+        The worker process is identified by an L{ConnectionFromWorker}
+        object which maintains the AMP connection. The
+        L{ConnectionFromWorker} tracks the load on its workers so that
+        jobs can be distributed evenly or halted if the node is too busy.
 
     L{PeerConnectionPool}: this is an AMP based service that connects a node
         to all the other nodes in the cluster. It also runs the main job
-        queue loop to dispatch enqueued work when it becomes due. The master
+        queue loop to dispatch enqueued work when it becomes due. The controller
         maintains a list of other nodes via L{ConnectionFromPeerNode} objects,
         which maintain the AMP connections. L{ConnectionFromPeerNode} can
-        report its load to others, and can receive work which it must perform
-        locally (via a dispatch to a child).
+        report its load to others, and can receive jobs which it must perform
+        locally (via a dispatch to a worker).
 
-    A child process has:
+    A worker process has:
 
-    L{ConnectionFromController}: an AMP connection to the master. The master
-        will use this to dispatch work to the child. The child can also
-        use this to tell the master that work needs to be performed - that
-        can be used when the work needs to be distributed evenly to any
-        child.
+    L{ConnectionFromController}: an AMP connection to the controller which
+        is managed by an L{ConnectionFromWorker} object in the controller. The
+        controller will dispatch jobs to the worker using this connection. The
+        worker uses this object to enqueue jobs which the controller will pick up
+        at the appropriate time in its job queue polling.
 """
 
 from functools import wraps
@@ -120,6 +136,7 @@
     inlineCallbacks, returnValue, Deferred, passthru, succeed
 )
 from twisted.internet.endpoints import TCP4ClientEndpoint
+from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 from twisted.protocols.amp import AMP, Command, Integer, String, Argument
 from twisted.python.reflect import qual
 from twext.python.log import Logger
@@ -1114,6 +1131,17 @@
 
 
 
+class EnqueuedJob(Command):
+    """
+    Notify the controller process that a worker enqueued some work. This is used to "wake up"
+    the controller if it has slowed its polling loop due to it being idle.
+    """
+
+    arguments = []
+    response = []
+
+
+
 class ReportLoad(Command):
     """
     Notify another node of the total, current load for this whole node (all of
@@ -1466,7 +1494,19 @@
         return d
 
 
+    @EnqueuedJob.responder
+    def enqueuedJob(self):
+        """
+        A worker enqueued a job and is letting us know. We need to "ping" the
+        L{PeerConnectionPool} to ensure it is polling the job queue at its
+        normal "fast" rate, as opposed to slower idle rates. 
+        """
 
+        self.peerPool.enqueuedJob()
+        return {}
+
+
+
 class ConnectionFromController(AMP):
     """
     A L{ConnectionFromController} is the connection to a node-controller
@@ -1535,6 +1575,7 @@
         """
         wp = WorkProposal(self, txn, workItemType, kw)
         yield wp._start()
+        self.callRemote(EnqueuedJob)
         returnValue(wp)
 
 
@@ -1690,10 +1731,18 @@
         yield wp._start()
         for callback in self.proposalCallbacks:
             callback(wp)
+        self.enqueuedJob()
         returnValue(wp)
 
 
+    def enqueuedJob(self):
+        """
+        Work has been enqueued
+        """
+        pass
 
+
+
 class PeerConnectionPool(_BaseQueuer, MultiService, object):
     """
     Each node has a L{PeerConnectionPool} connecting it to all the other nodes
@@ -1742,6 +1791,7 @@
 
     queuePollInterval = 0.1             # How often to poll for new work
     queueOverdueTimeout = 5.0 * 60.0    # How long before assigned work is possibly overdue
+    queuePollingBackoff = ((60.0, 60.0), (5.0, 1.0),)   # Polling backoffs
 
     overloadLevel = 95          # Percentage load level above which job queue processing stops
     highPriorityLevel = 80      # Percentage load level above which only high priority jobs are processed
@@ -1776,6 +1826,8 @@
         self._lastSeenTotalNodes = 1
         self._lastSeenNodeIndex = 1
         self._lastMinPriority = WORK_PRIORITY_LOW
+        self._timeOfLastWork = time.time()
+        self._actualPollInterval = self.queuePollInterval
 
 
     def addPeerConnection(self, peer):
@@ -1896,6 +1948,7 @@
                 if self._lastMinPriority != WORK_PRIORITY_HIGH + 1:
                     log.error("workCheck: jobqueue is overloaded")
                 self._lastMinPriority = WORK_PRIORITY_HIGH + 1
+                self._timeOfLastWork = time.time()
                 break
             elif level > self.highPriorityLevel:
                 minPriority = WORK_PRIORITY_HIGH
@@ -1947,6 +2000,7 @@
 
                 # Always assign as a new job even when it is an orphan
                 yield nextJob.assign(nowTime, self.queueOverdueTimeout)
+                self._timeOfLastWork = time.time()
                 loopCounter += 1
 
             except Exception as e:
@@ -1993,13 +2047,48 @@
             self._currentWorkDeferred = None
             if not self.running:
                 return
+
+            # Check for adjustment to poll interval - if the workCheck is idle for certain
+            # periods of time we will gradually increase the poll interval to avoid consuming
+            # excessive power when there is nothing to do
+            interval = self.queuePollInterval
+            idle = time.time() - self._timeOfLastWork
+            for threshold, poll in self.queuePollingBackoff:
+                if idle > threshold:
+                    interval = poll
+                    break
+            if self._actualPollInterval != interval:
+                log.debug("workCheckLoop: interval set to {interval}s", interval=interval)
+            self._actualPollInterval = interval
             self._workCheckCall = self.reactor.callLater(
-                self.queuePollInterval, self._workCheckLoop
+                self._actualPollInterval, self._workCheckLoop
             )
 
         self._currentWorkDeferred = scheduleNext
 
 
+    def enqueuedJob(self):
+        """
+        Reschedule the work check loop to run right now. This should be called in response to "external" activity that
+        might want to "speed up" the job queue polling because new work may have been added.
+        """
+
+        # Only need to do this if the actual poll interval is greater than the default rapid value
+        if self._actualPollInterval == self.queuePollInterval:
+            return
+
+        # Bump time of last work so that we go back to the rapid (default) polling interval
+        self._timeOfLastWork = time.time()
+
+        # Reschedule the outstanding delayed call (handle exceptions by ignoring if its already running or
+        # just finished)
+        try:
+            if self._workCheckCall is not None:
+                self._workCheckCall.reset(0)
+        except (AlreadyCalled, AlreadyCancelled):
+            pass
+
+
     def startService(self):
         """
         Register ourselves with the database and establish all outgoing
@@ -2062,9 +2151,11 @@
 
         if self._workCheckCall is not None:
             self._workCheckCall.cancel()
+            self._workCheckCall = None
 
         if self._currentWorkDeferred is not None:
             self._currentWorkDeferred.cancel()
+            self._currentWorkDeferred = None
 
         for connector in self._connectingToPeer:
             d = Deferred()

Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py	2014-07-11 15:47:56 UTC (rev 13749)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py	2014-07-11 18:03:57 UTC (rev 13750)
@@ -1445,7 +1445,53 @@
         self.assertEqual(failed[0], 1)
 
 
+    @inlineCallbacks
+    def test_pollingBackoff(self):
+        """
+        Check that an idle queue backs off its polling and goes back to rapid polling
+        when a worker enqueues a job.
+        """
 
+        # Speed up the backoff process
+        self.patch(PeerConnectionPool, "queuePollingBackoff", ((1.0, 60.0),))
+
+        # Wait for backoff
+        while self.node1._actualPollInterval == self.node1.queuePollInterval:
+            d = Deferred()
+            reactor.callLater(1.0, lambda : d.callback(None))
+            yield d
+
+        self.assertEqual(self.node1._actualPollInterval, 60.0)
+
+        # TODO: this exact test should run against LocalQueuer as well.
+        def operation(txn):
+            # TODO: how does "enqueue" get associated with the transaction?
+            # This is not the fact with a raw t.w.enterprise transaction.
+            # Should probably do something with components.
+            return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
+                               notBefore=datetime.datetime.utcnow())
+        yield inTransaction(self.store.newTransaction, operation)
+
+        # Backoff terminated
+        while self.node1._actualPollInterval != self.node1.queuePollInterval:
+            d = Deferred()
+            reactor.callLater(0.1, lambda : d.callback(None))
+            yield d
+        self.assertEqual(self.node1._actualPollInterval, self.node1.queuePollInterval)
+
+        # Wait for it to be executed.  Hopefully this does not time out :-\.
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+        # Wait for backoff
+        while self.node1._actualPollInterval == self.node1.queuePollInterval:
+            d = Deferred()
+            reactor.callLater(1.0, lambda : d.callback(None))
+            yield d
+
+        self.assertEqual(self.node1._actualPollInterval, 60.0)
+
+
+
 class DummyProposal(object):
 
     def __init__(self, *ignored):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140711/a2e740c3/attachment-0001.html>


More information about the calendarserver-changes mailing list