<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[13750] twext/trunk/twext/enterprise</title>
</head>
<body>
<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; }
#msg dl a { font-weight: bold}
#msg dl a:link { color:#fc3; }
#msg dl a:active { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
#patch .lines, .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org/changeset/13750">13750</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2014-07-11 11:03:57 -0700 (Fri, 11 Jul 2014)</dd>
</dl>
<h3>Log Message</h3>
<pre>Back-off job polling interval when there is nothing to do.</pre>
<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterprisejobqueuepy">twext/trunk/twext/enterprise/jobqueue.py</a></li>
<li><a href="#twexttrunktwextenterprisetesttest_jobqueuepy">twext/trunk/twext/enterprise/test/test_jobqueue.py</a></li>
</ul>
</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextenterprisejobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobqueue.py (13749 => 13750)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobqueue.py        2014-07-11 15:47:56 UTC (rev 13749)
+++ twext/trunk/twext/enterprise/jobqueue.py        2014-07-11 18:03:57 UTC (rev 13750)
</span><span class="lines">@@ -23,9 +23,8 @@
</span><span class="cx"> database instance, that want to defer and parallelize work that involves
</span><span class="cx"> storing the results of computation.
</span><span class="cx">
</span><del>-By enqueuing with L{twisted.enterprise.queue}, you may guarantee that the work
-will I{eventually} be done, and reliably commit to doing it in the future, but
-defer it if it does not need to be done I{now}.
</del><ins>+By enqueuing with L{twisted.enterprise.jobqueue}, you may guarantee that the work
+will I{eventually} be done, and reliably commit to doing it in the future.
</ins><span class="cx">
</span><span class="cx"> To pick a hypothetical example, let's say that you have a store which wants to
</span><span class="cx"> issue a promotional coupon based on a customer loyalty program, in response to
</span><span class="lines">@@ -81,31 +80,48 @@
</span><span class="cx">
</span><span class="cx"> More details:
</span><span class="cx">
</span><del>- A master process in each node (host in a multi-host setup) has a:
</del><ins>+ Terminology:
+
+ node: a host in a multi-host setup. Each node will contain a
+ "controller" process and a set of "worker" processes.
+ Nodes communicate with each other to allow load balancing
+ of jobs across the entire cluster.
+
+ controller: a process running in a node that is in charge of
+ managing "workers" as well as connections to other nodes. The
+ controller polls the job queue and dispatches outstanding jobs
+ to its "workers".
+
+ worker: a process running in a node that is responsible for
+ executing jobs sent to it by the "controller". It also
+ handles enqueuing of jobs as dictated by operations it
+ is doing.
+
+ A controller has a:
</ins><span class="cx">
</span><del>- L{WorkerConnectionPool}: this maintains a list of child processes that
- have connected to the master over AMP. It is responsible for
- dispatching work that is to be performed locally on that node.
- The child process is identified by an L{ConnectionFromWorker}
- object which maintains the child AMP connection. The
- L{ConnectionFromWorker} tracks the load on its child so that
- work can be distributed evenly or halted if the node is too busy.
</del><ins>+ L{WorkerConnectionPool}: this maintains a list of worker processes that
+ have connected to the controller over AMP. It is responsible for
+ dispatching jobs that are to be performed locally on that node.
+ The worker process is identified by an L{ConnectionFromWorker}
+ object which maintains the AMP connection. The
+ L{ConnectionFromWorker} tracks the load on its workers so that
+ jobs can be distributed evenly or halted if the node is too busy.
</ins><span class="cx">
</span><span class="cx"> L{PeerConnectionPool}: this is an AMP based service that connects a node
</span><span class="cx"> to all the other nodes in the cluster. It also runs the main job
</span><del>- queue loop to dispatch enqueued work when it becomes due. The master
</del><ins>+ queue loop to dispatch enqueued work when it becomes due. The controller
</ins><span class="cx"> maintains a list of other nodes via L{ConnectionFromPeerNode} objects,
</span><span class="cx"> which maintain the AMP connections. L{ConnectionFromPeerNode} can
</span><del>- report its load to others, and can receive work which it must perform
- locally (via a dispatch to a child).
</del><ins>+ report its load to others, and can receive jobs which it must perform
+ locally (via a dispatch to a worker).
</ins><span class="cx">
</span><del>- A child process has:
</del><ins>+ A worker process has:
</ins><span class="cx">
</span><del>- L{ConnectionFromController}: an AMP connection to the master. The master
- will use this to dispatch work to the child. The child can also
- use this to tell the master that work needs to be performed - that
- can be used when the work needs to be distributed evenly to any
- child.
</del><ins>+ L{ConnectionFromController}: an AMP connection to the controller which
+ is managed by an L{ConnectionFromWorker} object in the controller. The
+ controller will dispatch jobs to the worker using this connection. The
+ worker uses this object to enqueue jobs which the controller will pick up
+ at the appropriate time in its job queue polling.
</ins><span class="cx"> """
</span><span class="cx">
</span><span class="cx"> from functools import wraps
</span><span class="lines">@@ -120,6 +136,7 @@
</span><span class="cx"> inlineCallbacks, returnValue, Deferred, passthru, succeed
</span><span class="cx"> )
</span><span class="cx"> from twisted.internet.endpoints import TCP4ClientEndpoint
</span><ins>+from twisted.internet.error import AlreadyCalled, AlreadyCancelled
</ins><span class="cx"> from twisted.protocols.amp import AMP, Command, Integer, String, Argument
</span><span class="cx"> from twisted.python.reflect import qual
</span><span class="cx"> from twext.python.log import Logger
</span><span class="lines">@@ -1114,6 +1131,17 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><ins>+class EnqueuedJob(Command):
+ """
+ Notify the controller process that a worker enqueued some work. This is used to "wake up"
+ the controller if it has slowed its polling loop due to it being idle.
+ """
+
+ arguments = []
+ response = []
+
+
+
</ins><span class="cx"> class ReportLoad(Command):
</span><span class="cx"> """
</span><span class="cx"> Notify another node of the total, current load for this whole node (all of
</span><span class="lines">@@ -1466,7 +1494,19 @@
</span><span class="cx"> return d
</span><span class="cx">
</span><span class="cx">
</span><ins>+ @EnqueuedJob.responder
+ def enqueuedJob(self):
+ """
+ A worker enqueued a job and is letting us know. We need to "ping" the
+ L{PeerConnectionPool} to ensure it is polling the job queue at its
+ normal "fast" rate, as opposed to slower idle rates.
+ """
</ins><span class="cx">
</span><ins>+ self.peerPool.enqueuedJob()
+ return {}
+
+
+
</ins><span class="cx"> class ConnectionFromController(AMP):
</span><span class="cx"> """
</span><span class="cx"> A L{ConnectionFromController} is the connection to a node-controller
</span><span class="lines">@@ -1535,6 +1575,7 @@
</span><span class="cx"> """
</span><span class="cx"> wp = WorkProposal(self, txn, workItemType, kw)
</span><span class="cx"> yield wp._start()
</span><ins>+ self.callRemote(EnqueuedJob)
</ins><span class="cx"> returnValue(wp)
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -1690,10 +1731,18 @@
</span><span class="cx"> yield wp._start()
</span><span class="cx"> for callback in self.proposalCallbacks:
</span><span class="cx"> callback(wp)
</span><ins>+ self.enqueuedJob()
</ins><span class="cx"> returnValue(wp)
</span><span class="cx">
</span><span class="cx">
</span><ins>+ def enqueuedJob(self):
+ """
+ Work has been enqueued
+ """
+ pass
</ins><span class="cx">
</span><ins>+
+
</ins><span class="cx"> class PeerConnectionPool(_BaseQueuer, MultiService, object):
</span><span class="cx"> """
</span><span class="cx"> Each node has a L{PeerConnectionPool} connecting it to all the other nodes
</span><span class="lines">@@ -1742,6 +1791,7 @@
</span><span class="cx">
</span><span class="cx"> queuePollInterval = 0.1 # How often to poll for new work
</span><span class="cx"> queueOverdueTimeout = 5.0 * 60.0 # How long before assigned work is possibly overdue
</span><ins>+ queuePollingBackoff = ((60.0, 60.0), (5.0, 1.0),) # Polling backoffs
</ins><span class="cx">
</span><span class="cx"> overloadLevel = 95 # Percentage load level above which job queue processing stops
</span><span class="cx"> highPriorityLevel = 80 # Percentage load level above which only high priority jobs are processed
</span><span class="lines">@@ -1776,6 +1826,8 @@
</span><span class="cx"> self._lastSeenTotalNodes = 1
</span><span class="cx"> self._lastSeenNodeIndex = 1
</span><span class="cx"> self._lastMinPriority = WORK_PRIORITY_LOW
</span><ins>+ self._timeOfLastWork = time.time()
+ self._actualPollInterval = self.queuePollInterval
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> def addPeerConnection(self, peer):
</span><span class="lines">@@ -1896,6 +1948,7 @@
</span><span class="cx"> if self._lastMinPriority != WORK_PRIORITY_HIGH + 1:
</span><span class="cx"> log.error("workCheck: jobqueue is overloaded")
</span><span class="cx"> self._lastMinPriority = WORK_PRIORITY_HIGH + 1
</span><ins>+ self._timeOfLastWork = time.time()
</ins><span class="cx"> break
</span><span class="cx"> elif level > self.highPriorityLevel:
</span><span class="cx"> minPriority = WORK_PRIORITY_HIGH
</span><span class="lines">@@ -1947,6 +2000,7 @@
</span><span class="cx">
</span><span class="cx"> # Always assign as a new job even when it is an orphan
</span><span class="cx"> yield nextJob.assign(nowTime, self.queueOverdueTimeout)
</span><ins>+ self._timeOfLastWork = time.time()
</ins><span class="cx"> loopCounter += 1
</span><span class="cx">
</span><span class="cx"> except Exception as e:
</span><span class="lines">@@ -1993,13 +2047,48 @@
</span><span class="cx"> self._currentWorkDeferred = None
</span><span class="cx"> if not self.running:
</span><span class="cx"> return
</span><ins>+
+ # Check for adjustment to poll interval - if the workCheck is idle for certain
+ # periods of time we will gradually increase the poll interval to avoid consuming
+ # excessive power when there is nothing to do
+ interval = self.queuePollInterval
+ idle = time.time() - self._timeOfLastWork
+ for threshold, poll in self.queuePollingBackoff:
+ if idle > threshold:
+ interval = poll
+ break
+ if self._actualPollInterval != interval:
+ log.debug("workCheckLoop: interval set to {interval}s", interval=interval)
+ self._actualPollInterval = interval
</ins><span class="cx"> self._workCheckCall = self.reactor.callLater(
</span><del>- self.queuePollInterval, self._workCheckLoop
</del><ins>+ self._actualPollInterval, self._workCheckLoop
</ins><span class="cx"> )
</span><span class="cx">
</span><span class="cx"> self._currentWorkDeferred = scheduleNext
</span><span class="cx">
</span><span class="cx">
</span><ins>+ def enqueuedJob(self):
+ """
+ Reschedule the work check loop to run right now. This should be called in response to "external" activity that
+ might want to "speed up" the job queue polling because new work may have been added.
+ """
+
+ # Only need to do this if the actual poll interval is greater than the default rapid value
+ if self._actualPollInterval == self.queuePollInterval:
+ return
+
+ # Bump time of last work so that we go back to the rapid (default) polling interval
+ self._timeOfLastWork = time.time()
+
+ # Reschedule the outstanding delayed call (handle exceptions by ignoring if its already running or
+ # just finished)
+ try:
+ if self._workCheckCall is not None:
+ self._workCheckCall.reset(0)
+ except (AlreadyCalled, AlreadyCancelled):
+ pass
+
+
</ins><span class="cx"> def startService(self):
</span><span class="cx"> """
</span><span class="cx"> Register ourselves with the database and establish all outgoing
</span><span class="lines">@@ -2062,9 +2151,11 @@
</span><span class="cx">
</span><span class="cx"> if self._workCheckCall is not None:
</span><span class="cx"> self._workCheckCall.cancel()
</span><ins>+ self._workCheckCall = None
</ins><span class="cx">
</span><span class="cx"> if self._currentWorkDeferred is not None:
</span><span class="cx"> self._currentWorkDeferred.cancel()
</span><ins>+ self._currentWorkDeferred = None
</ins><span class="cx">
</span><span class="cx"> for connector in self._connectingToPeer:
</span><span class="cx"> d = Deferred()
</span></span></pre></div>
<a id="twexttrunktwextenterprisetesttest_jobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py (13749 => 13750)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-07-11 15:47:56 UTC (rev 13749)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-07-11 18:03:57 UTC (rev 13750)
</span><span class="lines">@@ -1445,7 +1445,53 @@
</span><span class="cx"> self.assertEqual(failed[0], 1)
</span><span class="cx">
</span><span class="cx">
</span><ins>+ @inlineCallbacks
+ def test_pollingBackoff(self):
+ """
+ Check that an idle queue backs off its polling and goes back to rapid polling
+ when a worker enqueues a job.
+ """
</ins><span class="cx">
</span><ins>+ # Speed up the backoff process
+ self.patch(PeerConnectionPool, "queuePollingBackoff", ((1.0, 60.0),))
+
+ # Wait for backoff
+ while self.node1._actualPollInterval == self.node1.queuePollInterval:
+ d = Deferred()
+ reactor.callLater(1.0, lambda : d.callback(None))
+ yield d
+
+ self.assertEqual(self.node1._actualPollInterval, 60.0)
+
+ # TODO: this exact test should run against LocalQueuer as well.
+ def operation(txn):
+ # TODO: how does "enqueue" get associated with the transaction?
+ # This is not the fact with a raw t.w.enterprise transaction.
+ # Should probably do something with components.
+ return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
+ notBefore=datetime.datetime.utcnow())
+ yield inTransaction(self.store.newTransaction, operation)
+
+ # Backoff terminated
+ while self.node1._actualPollInterval != self.node1.queuePollInterval:
+ d = Deferred()
+ reactor.callLater(0.1, lambda : d.callback(None))
+ yield d
+ self.assertEqual(self.node1._actualPollInterval, self.node1.queuePollInterval)
+
+ # Wait for it to be executed. Hopefully this does not time out :-\.
+ yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+ # Wait for backoff
+ while self.node1._actualPollInterval == self.node1.queuePollInterval:
+ d = Deferred()
+ reactor.callLater(1.0, lambda : d.callback(None))
+ yield d
+
+ self.assertEqual(self.node1._actualPollInterval, 60.0)
+
+
+
</ins><span class="cx"> class DummyProposal(object):
</span><span class="cx">
</span><span class="cx"> def __init__(self, *ignored):
</span></span></pre>
</div>
</div>
</body>
</html>