<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[13750] twext/trunk/twext/enterprise</title>
</head>
<body>

<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org//changeset/13750">13750</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2014-07-11 11:03:57 -0700 (Fri, 11 Jul 2014)</dd>
</dl>

<h3>Log Message</h3>
<pre>Back-off job polling interval when there is nothing to do.</pre>
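<p>For context on the change: the new queuePollingBackoff class attribute pairs an idle threshold (seconds since the controller last assigned a job) with the poll interval to switch to once that threshold is exceeded, falling back to the rapid queuePollInterval otherwise. Below is a minimal standalone sketch of that selection step; the values and attribute names are taken from the diff, but the pollInterval helper itself is an illustrative restatement, not code from the patch.</p>
<pre>
import time

# Defaults copied from the diff below.
QUEUE_POLL_INTERVAL = 0.1                            # rapid polling while work is flowing
QUEUE_POLLING_BACKOFF = ((60.0, 60.0), (5.0, 1.0))   # (idle threshold secs, poll interval secs)

def pollInterval(timeOfLastWork, now=None):
    """
    Pick the poll interval from how long the queue has been idle: the first
    threshold the idle time exceeds wins, otherwise use the rapid default.
    """
    now = time.time() if now is None else now
    idle = now - timeOfLastWork
    for threshold, poll in QUEUE_POLLING_BACKOFF:
        if idle &gt; threshold:
            return poll
    return QUEUE_POLL_INTERVAL

assert pollInterval(timeOfLastWork=0, now=2) == 0.1     # busy: keep polling rapidly
assert pollInterval(timeOfLastWork=0, now=10) == 1.0    # idle for more than 5s: back off to 1s
assert pollInterval(timeOfLastWork=0, now=120) == 60.0  # idle for more than 60s: back off to 60s
</pre>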

<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterprisejobqueuepy">twext/trunk/twext/enterprise/jobqueue.py</a></li>
<li><a href="#twexttrunktwextenterprisetesttest_jobqueuepy">twext/trunk/twext/enterprise/test/test_jobqueue.py</a></li>
</ul>

</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextenterprisejobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobqueue.py (13749 => 13750)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobqueue.py        2014-07-11 15:47:56 UTC (rev 13749)
+++ twext/trunk/twext/enterprise/jobqueue.py        2014-07-11 18:03:57 UTC (rev 13750)
</span><span class="lines">@@ -23,9 +23,8 @@
</span><span class="cx"> database instance, that want to defer and parallelize work that involves
</span><span class="cx"> storing the results of computation.
</span><span class="cx"> 
</span><del>-By enqueuing with L{twisted.enterprise.queue}, you may guarantee that the work
-will I{eventually} be done, and reliably commit to doing it in the future, but
-defer it if it does not need to be done I{now}.
</del><ins>+By enqueuing with L{twext.enterprise.jobqueue}, you may guarantee that the work
+will I{eventually} be done, and reliably commit to doing it in the future.
</ins><span class="cx"> 
</span><span class="cx"> To pick a hypothetical example, let's say that you have a store which wants to
</span><span class="cx"> issue a promotional coupon based on a customer loyalty program, in response to
</span><span class="lines">@@ -81,31 +80,48 @@
</span><span class="cx"> 
</span><span class="cx"> More details:
</span><span class="cx"> 
</span><del>-    A master process in each node (host in a multi-host setup) has a:
</del><ins>+    Terminology:
+    
+        node: a host in a multi-host setup. Each node will contain a
+            &quot;controller&quot; process and a set of &quot;worker&quot; processes.
+            Nodes communicate with each other to allow load balancing
+            of jobs across the entire cluster.
+        
+        controller: a process running in a node that is in charge of
+            managing &quot;workers&quot; as well as connections to other nodes. The
+            controller polls the job queue and dispatches outstanding jobs
+            to its &quot;workers&quot;.
+        
+        worker: a process running in a node that is responsible for
+            executing jobs sent to it by the &quot;controller&quot;. It also
+            handles enqueuing of jobs as required by the operations
+            it performs.
+                
+    A controller has a:
</ins><span class="cx"> 
</span><del>-    L{WorkerConnectionPool}: this maintains a list of child processes that
-        have connected to the master over AMP. It is responsible for
-        dispatching work that is to be performed locally on that node.
-        The child process is identified by an L{ConnectionFromWorker}
-        object which maintains the child AMP connection. The
-        L{ConnectionFromWorker} tracks the load on its child so that
-        work can be distributed evenly or halted if the node is too busy.
</del><ins>+    L{WorkerConnectionPool}: this maintains a list of worker processes that
+        have connected to the controller over AMP. It is responsible for
+        dispatching jobs that are to be performed locally on that node.
+        The worker process is identified by a L{ConnectionFromWorker}
+        object which maintains the AMP connection. The
+        L{ConnectionFromWorker} tracks the load on its worker so that
+        jobs can be distributed evenly or halted if the node is too busy.
</ins><span class="cx"> 
</span><span class="cx">     L{PeerConnectionPool}: this is an AMP based service that connects a node
</span><span class="cx">         to all the other nodes in the cluster. It also runs the main job
</span><del>-        queue loop to dispatch enqueued work when it becomes due. The master
</del><ins>+        queue loop to dispatch enqueued work when it becomes due. The controller
</ins><span class="cx">         maintains a list of other nodes via L{ConnectionFromPeerNode} objects,
</span><span class="cx">         which maintain the AMP connections. L{ConnectionFromPeerNode} can
</span><del>-        report its load to others, and can receive work which it must perform
-        locally (via a dispatch to a child).
</del><ins>+        report its load to others, and can receive jobs which it must perform
+        locally (via a dispatch to a worker).
</ins><span class="cx"> 
</span><del>-    A child process has:
</del><ins>+    A worker process has:
</ins><span class="cx"> 
</span><del>-    L{ConnectionFromController}: an AMP connection to the master. The master
-        will use this to dispatch work to the child. The child can also
-        use this to tell the master that work needs to be performed - that
-        can be used when the work needs to be distributed evenly to any
-        child.
</del><ins>+    L{ConnectionFromController}: an AMP connection to the controller,
+        managed on the controller side by a L{ConnectionFromWorker} object. The
+        controller will dispatch jobs to the worker using this connection. The
+        worker uses this object to enqueue jobs which the controller will pick up
+        at the appropriate time in its job queue polling.
</ins><span class="cx"> &quot;&quot;&quot;
</span><span class="cx"> 
</span><span class="cx"> from functools import wraps
</span><span class="lines">@@ -120,6 +136,7 @@
</span><span class="cx">     inlineCallbacks, returnValue, Deferred, passthru, succeed
</span><span class="cx"> )
</span><span class="cx"> from twisted.internet.endpoints import TCP4ClientEndpoint
</span><ins>+from twisted.internet.error import AlreadyCalled, AlreadyCancelled
</ins><span class="cx"> from twisted.protocols.amp import AMP, Command, Integer, String, Argument
</span><span class="cx"> from twisted.python.reflect import qual
</span><span class="cx"> from twext.python.log import Logger
</span><span class="lines">@@ -1114,6 +1131,17 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+class EnqueuedJob(Command):
+    &quot;&quot;&quot;
+    Notify the controller process that a worker enqueued some work. This is used to &quot;wake up&quot;
+    the controller if it has slowed its polling loop while idle.
+    &quot;&quot;&quot;
+
+    arguments = []
+    response = []
+
+
+
</ins><span class="cx"> class ReportLoad(Command):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     Notify another node of the total, current load for this whole node (all of
</span><span class="lines">@@ -1466,7 +1494,19 @@
</span><span class="cx">         return d
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    @EnqueuedJob.responder
+    def enqueuedJob(self):
+        &quot;&quot;&quot;
+        A worker enqueued a job and is letting us know. We need to &quot;ping&quot; the
+        L{PeerConnectionPool} to ensure it is polling the job queue at its
+        normal &quot;fast&quot; rate, as opposed to slower idle rates. 
+        &quot;&quot;&quot;
</ins><span class="cx"> 
</span><ins>+        self.peerPool.enqueuedJob()
+        return {}
+
+
+
</ins><span class="cx"> class ConnectionFromController(AMP):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     A L{ConnectionFromController} is the connection to a node-controller
</span><span class="lines">@@ -1535,6 +1575,7 @@
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         wp = WorkProposal(self, txn, workItemType, kw)
</span><span class="cx">         yield wp._start()
</span><ins>+        self.callRemote(EnqueuedJob)
</ins><span class="cx">         returnValue(wp)
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="lines">@@ -1690,10 +1731,18 @@
</span><span class="cx">         yield wp._start()
</span><span class="cx">         for callback in self.proposalCallbacks:
</span><span class="cx">             callback(wp)
</span><ins>+        self.enqueuedJob()
</ins><span class="cx">         returnValue(wp)
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    def enqueuedJob(self):
+        &quot;&quot;&quot;
+        Work has been enqueued; subclasses may override this hook to react.
+        &quot;&quot;&quot;
+        pass
</ins><span class="cx"> 
</span><ins>+
+
</ins><span class="cx"> class PeerConnectionPool(_BaseQueuer, MultiService, object):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     Each node has a L{PeerConnectionPool} connecting it to all the other nodes
</span><span class="lines">@@ -1742,6 +1791,7 @@
</span><span class="cx"> 
</span><span class="cx">     queuePollInterval = 0.1             # How often to poll for new work
</span><span class="cx">     queueOverdueTimeout = 5.0 * 60.0    # How long before assigned work is possibly overdue
</span><ins>+    queuePollingBackoff = ((60.0, 60.0), (5.0, 1.0),)   # Polling backoffs: ((idle threshold secs, poll interval secs), ...)
</ins><span class="cx"> 
</span><span class="cx">     overloadLevel = 95          # Percentage load level above which job queue processing stops
</span><span class="cx">     highPriorityLevel = 80      # Percentage load level above which only high priority jobs are processed
</span><span class="lines">@@ -1776,6 +1826,8 @@
</span><span class="cx">         self._lastSeenTotalNodes = 1
</span><span class="cx">         self._lastSeenNodeIndex = 1
</span><span class="cx">         self._lastMinPriority = WORK_PRIORITY_LOW
</span><ins>+        self._timeOfLastWork = time.time()
+        self._actualPollInterval = self.queuePollInterval
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     def addPeerConnection(self, peer):
</span><span class="lines">@@ -1896,6 +1948,7 @@
</span><span class="cx">                 if self._lastMinPriority != WORK_PRIORITY_HIGH + 1:
</span><span class="cx">                     log.error(&quot;workCheck: jobqueue is overloaded&quot;)
</span><span class="cx">                 self._lastMinPriority = WORK_PRIORITY_HIGH + 1
</span><ins>+                self._timeOfLastWork = time.time()
</ins><span class="cx">                 break
</span><span class="cx">             elif level &gt; self.highPriorityLevel:
</span><span class="cx">                 minPriority = WORK_PRIORITY_HIGH
</span><span class="lines">@@ -1947,6 +2000,7 @@
</span><span class="cx"> 
</span><span class="cx">                 # Always assign as a new job even when it is an orphan
</span><span class="cx">                 yield nextJob.assign(nowTime, self.queueOverdueTimeout)
</span><ins>+                self._timeOfLastWork = time.time()
</ins><span class="cx">                 loopCounter += 1
</span><span class="cx"> 
</span><span class="cx">             except Exception as e:
</span><span class="lines">@@ -1993,13 +2047,48 @@
</span><span class="cx">             self._currentWorkDeferred = None
</span><span class="cx">             if not self.running:
</span><span class="cx">                 return
</span><ins>+
+            # Check for adjustment to poll interval - if the workCheck is idle for certain
+            # periods of time we will gradually increase the poll interval to avoid consuming
+            # excessive power when there is nothing to do
+            interval = self.queuePollInterval
+            idle = time.time() - self._timeOfLastWork
+            for threshold, poll in self.queuePollingBackoff:
+                if idle &gt; threshold:
+                    interval = poll
+                    break
+            if self._actualPollInterval != interval:
+                log.debug(&quot;workCheckLoop: interval set to {interval}s&quot;, interval=interval)
+            self._actualPollInterval = interval
</ins><span class="cx">             self._workCheckCall = self.reactor.callLater(
</span><del>-                self.queuePollInterval, self._workCheckLoop
</del><ins>+                self._actualPollInterval, self._workCheckLoop
</ins><span class="cx">             )
</span><span class="cx"> 
</span><span class="cx">         self._currentWorkDeferred = scheduleNext
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    def enqueuedJob(self):
+        &quot;&quot;&quot;
+        Reschedule the work check loop to run right now. This should be called in response to &quot;external&quot; activity that
+        might want to &quot;speed up&quot; the job queue polling because new work may have been added.
+        &quot;&quot;&quot;
+
+        # Only need to do this if the actual poll interval is greater than the default rapid value
+        if self._actualPollInterval == self.queuePollInterval:
+            return
+
+        # Bump time of last work so that we go back to the rapid (default) polling interval
+        self._timeOfLastWork = time.time()
+
+        # Reschedule the outstanding delayed call (handle exceptions by ignoring if it's already running or
+        # just finished)
+        try:
+            if self._workCheckCall is not None:
+                self._workCheckCall.reset(0)
+        except (AlreadyCalled, AlreadyCancelled):
+            pass
+
+
</ins><span class="cx">     def startService(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Register ourselves with the database and establish all outgoing
</span><span class="lines">@@ -2062,9 +2151,11 @@
</span><span class="cx"> 
</span><span class="cx">         if self._workCheckCall is not None:
</span><span class="cx">             self._workCheckCall.cancel()
</span><ins>+            self._workCheckCall = None
</ins><span class="cx"> 
</span><span class="cx">         if self._currentWorkDeferred is not None:
</span><span class="cx">             self._currentWorkDeferred.cancel()
</span><ins>+            self._currentWorkDeferred = None
</ins><span class="cx"> 
</span><span class="cx">         for connector in self._connectingToPeer:
</span><span class="cx">             d = Deferred()
</span></span></pre></div>
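<p>The wake-up half of the change works end to end like this: a worker's L{ConnectionFromController} now sends the EnqueuedJob AMP command right after it proposes new work, the controller-side responder forwards that to PeerConnectionPool.enqueuedJob(), and that method bumps _timeOfLastWork and resets the pending reactor.callLater so the next work check runs immediately instead of waiting out a slow idle interval. Below is a hedged sketch of just that reset step, driven by twisted.internet.task.Clock in place of the real reactor; the wakeUp helper and the workCheckCall/fired names are illustrative stand-ins, not the actual attributes.</p>
<pre>
# Sketch of the reset performed by PeerConnectionPool.enqueuedJob(), using a
# deterministic Clock so the behaviour can be observed without real delays.
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from twisted.internet.task import Clock

clock = Clock()
fired = []

# Pretend the controller has backed off to a 60s idle poll.
workCheckCall = clock.callLater(60.0, lambda: fired.append(True))

def wakeUp(call):
    # Mirror the try/except in the patch: ignore the races where the
    # delayed call has already run or has already been cancelled.
    try:
        if call is not None:
            call.reset(0)       # run the work check right away
    except (AlreadyCalled, AlreadyCancelled):
        pass

wakeUp(workCheckCall)
clock.advance(0)                # fires immediately instead of in 60 seconds
assert fired == [True]
</pre>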
<a id="twexttrunktwextenterprisetesttest_jobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py (13749 => 13750)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-07-11 15:47:56 UTC (rev 13749)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-07-11 18:03:57 UTC (rev 13750)
</span><span class="lines">@@ -1445,7 +1445,53 @@
</span><span class="cx">         self.assertEqual(failed[0], 1)
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    @inlineCallbacks
+    def test_pollingBackoff(self):
+        &quot;&quot;&quot;
+        Check that an idle queue backs off its polling and goes back to rapid polling
+        when a worker enqueues a job.
+        &quot;&quot;&quot;
</ins><span class="cx"> 
</span><ins>+        # Speed up the backoff process
+        self.patch(PeerConnectionPool, &quot;queuePollingBackoff&quot;, ((1.0, 60.0),))
+
+        # Wait for backoff
+        while self.node1._actualPollInterval == self.node1.queuePollInterval:
+            d = Deferred()
+            reactor.callLater(1.0, lambda : d.callback(None))
+            yield d
+
+        self.assertEqual(self.node1._actualPollInterval, 60.0)
+
+        # TODO: this exact test should run against LocalQueuer as well.
+        def operation(txn):
+            # TODO: how does &quot;enqueue&quot; get associated with the transaction?
+            # This is not the case with a raw t.w.enterprise transaction.
+            # Should probably do something with components.
+            return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
+                               notBefore=datetime.datetime.utcnow())
+        yield inTransaction(self.store.newTransaction, operation)
+
+        # Backoff terminated
+        while self.node1._actualPollInterval != self.node1.queuePollInterval:
+            d = Deferred()
+            reactor.callLater(0.1, lambda : d.callback(None))
+            yield d
+        self.assertEqual(self.node1._actualPollInterval, self.node1.queuePollInterval)
+
+        # Wait for it to be executed.  Hopefully this does not time out :-\.
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+        # Wait for backoff
+        while self.node1._actualPollInterval == self.node1.queuePollInterval:
+            d = Deferred()
+            reactor.callLater(1.0, lambda : d.callback(None))
+            yield d
+
+        self.assertEqual(self.node1._actualPollInterval, 60.0)
+
+
+
</ins><span class="cx"> class DummyProposal(object):
</span><span class="cx"> 
</span><span class="cx">     def __init__(self, *ignored):
</span></span></pre>
</div>
</div>
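<p>The new test patches queuePollingBackoff on the class to a single (1.0, 60.0) tier so the idle back-off kicks in after one second; the same class attributes are the knobs available for tuning. A hedged sketch follows, assuming that overriding them on a PeerConnectionPool subclass is an acceptable way to configure a deployment; the revision itself only shows the class-level defaults.</p>
<pre>
# Hedged sketch: the attribute names and defaults come from the diff above;
# overriding them on a subclass is an assumption about deployment tuning,
# not something this revision prescribes.
from twext.enterprise.jobqueue import PeerConnectionPool

class TunedPeerConnectionPool(PeerConnectionPool):
    queuePollInterval = 0.1                             # rapid polling while busy
    # After 5s idle, poll every 1s; after 60s idle, poll every 60s.
    queuePollingBackoff = ((60.0, 60.0), (5.0, 1.0),)
</pre>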

</body>
</html>