[CalendarServer-changes] [13750] twext/trunk/twext/enterprise
source_changes at macosforge.org
Fri Jul 11 11:03:57 PDT 2014
Revision: 13750
http://trac.calendarserver.org//changeset/13750
Author: cdaboo at apple.com
Date: 2014-07-11 11:03:57 -0700 (Fri, 11 Jul 2014)
Log Message:
-----------
Back-off job polling interval when there is nothing to do.
Modified Paths:
--------------
twext/trunk/twext/enterprise/jobqueue.py
twext/trunk/twext/enterprise/test/test_jobqueue.py
Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py 2014-07-11 15:47:56 UTC (rev 13749)
+++ twext/trunk/twext/enterprise/jobqueue.py 2014-07-11 18:03:57 UTC (rev 13750)
@@ -23,9 +23,8 @@
database instance, that want to defer and parallelize work that involves
storing the results of computation.
-By enqueuing with L{twisted.enterprise.queue}, you may guarantee that the work
-will I{eventually} be done, and reliably commit to doing it in the future, but
-defer it if it does not need to be done I{now}.
+By enqueuing with L{twext.enterprise.jobqueue}, you may guarantee that the work
+will I{eventually} be done, and reliably commit to doing it in the future.
To pick a hypothetical example, let's say that you have a store which wants to
issue a promotional coupon based on a customer loyalty program, in response to
@@ -81,31 +80,48 @@
More details:
- A master process in each node (host in a multi-host setup) has a:
+ Terminology:
+
+ node: a host in a multi-host setup. Each node will contain a
+ "controller" process and a set of "worker" processes.
+ Nodes communicate with each other to allow load balancing
+ of jobs across the entire cluster.
+
+ controller: a process running in a node that is in charge of
+ managing "workers" as well as connections to other nodes. The
+ controller polls the job queue and dispatches outstanding jobs
+ to its "workers".
+
+ worker: a process running in a node that is responsible for
+ executing jobs sent to it by the "controller". It also
+ enqueues new jobs as dictated by the operations it
+ performs.
+
+ A controller has a:
- L{WorkerConnectionPool}: this maintains a list of child processes that
- have connected to the master over AMP. It is responsible for
- dispatching work that is to be performed locally on that node.
- The child process is identified by an L{ConnectionFromWorker}
- object which maintains the child AMP connection. The
- L{ConnectionFromWorker} tracks the load on its child so that
- work can be distributed evenly or halted if the node is too busy.
+ L{WorkerConnectionPool}: this maintains a list of worker processes that
+ have connected to the controller over AMP. It is responsible for
+ dispatching jobs that are to be performed locally on that node.
+ The worker process is identified by a L{ConnectionFromWorker}
+ object which maintains the AMP connection. The
+ L{ConnectionFromWorker} tracks the load on its worker so that
+ jobs can be distributed evenly or halted if the node is too busy.
L{PeerConnectionPool}: this is an AMP based service that connects a node
to all the other nodes in the cluster. It also runs the main job
- queue loop to dispatch enqueued work when it becomes due. The master
+ queue loop to dispatch enqueued work when it becomes due. The controller
maintains a list of other nodes via L{ConnectionFromPeerNode} objects,
which maintain the AMP connections. L{ConnectionFromPeerNode} can
- report its load to others, and can receive work which it must perform
- locally (via a dispatch to a child).
+ report its load to others, and can receive jobs which it must perform
+ locally (via a dispatch to a worker).
- A child process has:
+ A worker process has:
- L{ConnectionFromController}: an AMP connection to the master. The master
- will use this to dispatch work to the child. The child can also
- use this to tell the master that work needs to be performed - that
- can be used when the work needs to be distributed evenly to any
- child.
+ L{ConnectionFromController}: an AMP connection to the controller, which
+ is managed by a L{ConnectionFromWorker} object in the controller. The
+ controller will dispatch jobs to the worker using this connection. The
+ worker uses this object to enqueue jobs which the controller will pick up
+ at the appropriate time in its job queue polling.
"""
from functools import wraps
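The controller's job-queue polling described in this docstring comes down to a
reactor.callLater() loop, and the rest of this change is about how that delay is
chosen. A minimal sketch, assuming a hypothetical PollerSketch class rather than
the module's actual PeerConnectionPool:

    from twisted.internet import reactor

    class PollerSketch(object):
        # Hypothetical stand-in for the controller's work-check loop:
        # run a check for due jobs, then reschedule the next poll.
        pollInterval = 0.1

        def __init__(self, check):
            self.check = check          # callable that dispatches due jobs
            self._call = None

        def start(self):
            self._call = reactor.callLater(self.pollInterval, self._loop)

        def _loop(self):
            self.check()
            self.start()                # schedule the next poll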
@@ -120,6 +136,7 @@
inlineCallbacks, returnValue, Deferred, passthru, succeed
)
from twisted.internet.endpoints import TCP4ClientEndpoint
+from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from twisted.protocols.amp import AMP, Command, Integer, String, Argument
from twisted.python.reflect import qual
from twext.python.log import Logger
@@ -1114,6 +1131,17 @@
+class EnqueuedJob(Command):
+ """
+ Notify the controller process that a worker enqueued some work. This is used to "wake up"
+ the controller if it has slowed its polling loop because it was idle.
+ """
+
+ arguments = []
+ response = []
+
+
+
class ReportLoad(Command):
"""
Notify another node of the total, current load for this whole node (all of
@@ -1466,7 +1494,19 @@
return d
+ @EnqueuedJob.responder
+ def enqueuedJob(self):
+ """
+ A worker enqueued a job and is letting us know. We need to "ping" the
+ L{PeerConnectionPool} to ensure it is polling the job queue at its
+ normal "fast" rate, as opposed to slower idle rates.
+ """
+ self.peerPool.enqueuedJob()
+ return {}
+
+
+
class ConnectionFromController(AMP):
"""
A L{ConnectionFromController} is the connection to a node-controller
@@ -1535,6 +1575,7 @@
"""
wp = WorkProposal(self, txn, workItemType, kw)
yield wp._start()
+ self.callRemote(EnqueuedJob)
returnValue(wp)
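The callRemote(EnqueuedJob) above is fire-and-forget from the worker's point of
view: the command carries no payload and the responder returns an empty box. A
condensed, self-contained sketch of that round trip (EnqueuedJobDemo and
ControllerProtocol are illustrative names, not the module's classes):

    from twisted.protocols.amp import AMP, Command

    class EnqueuedJobDemo(Command):
        # Like EnqueuedJob above: the command's arrival is the whole message.
        arguments = []
        response = []

    class ControllerProtocol(AMP):
        def __init__(self, pool):
            AMP.__init__(self)
            self.pool = pool

        @EnqueuedJobDemo.responder
        def enqueuedJobDemo(self):
            self.pool.enqueuedJob()   # wake the polling loop
            return {}

    # Worker side, given a connected AMP instance `conn`:
    #     conn.callRemote(EnqueuedJobDemo)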
@@ -1690,10 +1731,18 @@
yield wp._start()
for callback in self.proposalCallbacks:
callback(wp)
+ self.enqueuedJob()
returnValue(wp)
+ def enqueuedJob(self):
+ """
+ Work has been enqueued. This is a no-op hook; subclasses override it.
+ """
+ pass
+
+
class PeerConnectionPool(_BaseQueuer, MultiService, object):
"""
Each node has a L{PeerConnectionPool} connecting it to all the other nodes
@@ -1742,6 +1791,7 @@
queuePollInterval = 0.1 # How often to poll for new work
queueOverdueTimeout = 5.0 * 60.0 # How long before assigned work is possibly overdue
+ queuePollingBackoff = ((60.0, 60.0), (5.0, 1.0),) # (idle threshold secs, poll interval secs) pairs, longest first
overloadLevel = 95 # Percentage load level above which job queue processing stops
highPriorityLevel = 80 # Percentage load level above which only high priority jobs are processed
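queuePollingBackoff is consulted in _workCheckLoop below: the pairs are checked
in order, longest idle threshold first, and the first threshold exceeded wins,
with queuePollInterval as the fallback. The same lookup as a standalone sketch
(pollIntervalFor is a hypothetical helper, not part of the module):

    def pollIntervalFor(idleSeconds,
                        backoff=((60.0, 60.0), (5.0, 1.0)),
                        default=0.1):
        # Return the poll interval for the first idle threshold exceeded,
        # or the rapid default when the queue has been busy recently.
        for threshold, interval in backoff:
            if idleSeconds > threshold:
                return interval
        return default

    # pollIntervalFor(0.5)    -> 0.1   (recently busy: rapid polling)
    # pollIntervalFor(10.0)   -> 1.0   (idle for a few seconds)
    # pollIntervalFor(120.0)  -> 60.0  (long idle: slowest polling)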
@@ -1776,6 +1826,8 @@
self._lastSeenTotalNodes = 1
self._lastSeenNodeIndex = 1
self._lastMinPriority = WORK_PRIORITY_LOW
+ self._timeOfLastWork = time.time()
+ self._actualPollInterval = self.queuePollInterval
def addPeerConnection(self, peer):
@@ -1896,6 +1948,7 @@
if self._lastMinPriority != WORK_PRIORITY_HIGH + 1:
log.error("workCheck: jobqueue is overloaded")
self._lastMinPriority = WORK_PRIORITY_HIGH + 1
+ self._timeOfLastWork = time.time()
break
elif level > self.highPriorityLevel:
minPriority = WORK_PRIORITY_HIGH
@@ -1947,6 +2000,7 @@
# Always assign as a new job even when it is an orphan
yield nextJob.assign(nowTime, self.queueOverdueTimeout)
+ self._timeOfLastWork = time.time()
loopCounter += 1
except Exception as e:
@@ -1993,13 +2047,48 @@
self._currentWorkDeferred = None
if not self.running:
return
+
+ # Adjust the poll interval: if the workCheck has been idle for a while,
+ # gradually lengthen the interval to avoid consuming excessive power
+ # when there is nothing to do
+ interval = self.queuePollInterval
+ idle = time.time() - self._timeOfLastWork
+ for threshold, poll in self.queuePollingBackoff:
+ if idle > threshold:
+ interval = poll
+ break
+ if self._actualPollInterval != interval:
+ log.debug("workCheckLoop: interval set to {interval}s", interval=interval)
+ self._actualPollInterval = interval
self._workCheckCall = self.reactor.callLater(
- self.queuePollInterval, self._workCheckLoop
+ self._actualPollInterval, self._workCheckLoop
)
self._currentWorkDeferred = scheduleNext
+ def enqueuedJob(self):
+ """
+ Reschedule the work check loop to run right now. This should be called in response to "external" activity that
+ might want to "speed up" the job queue polling because new work may have been added.
+ """
+
+ # Only need to do this if the actual poll interval is greater than the default rapid value
+ if self._actualPollInterval == self.queuePollInterval:
+ return
+
+ # Bump time of last work so that we go back to the rapid (default) polling interval
+ self._timeOfLastWork = time.time()
+
+ # Reschedule the outstanding delayed call, ignoring the errors raised
+ # if it has already run or been cancelled
+ try:
+ if self._workCheckCall is not None:
+ self._workCheckCall.reset(0)
+ except (AlreadyCalled, AlreadyCancelled):
+ pass
+
+
def startService(self):
"""
Register ourselves with the database and establish all outgoing
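The reset(0) in enqueuedJob above pulls the pending IDelayedCall forward so the
next poll happens immediately; AlreadyCalled and AlreadyCancelled cover the races
where the call fired or was cancelled in the meantime. The same pattern in
isolation (wakeUp is a hypothetical helper):

    from twisted.internet.error import AlreadyCalled, AlreadyCancelled

    def wakeUp(delayedCall):
        # Make a pending reactor.callLater() fire on the next reactor
        # iteration; tolerate it having already run or been cancelled.
        if delayedCall is None:
            return
        try:
            delayedCall.reset(0)
        except (AlreadyCalled, AlreadyCancelled):
            pass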
@@ -2062,9 +2151,11 @@
if self._workCheckCall is not None:
self._workCheckCall.cancel()
+ self._workCheckCall = None
if self._currentWorkDeferred is not None:
self._currentWorkDeferred.cancel()
+ self._currentWorkDeferred = None
for connector in self._connectingToPeer:
d = Deferred()
Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py 2014-07-11 15:47:56 UTC (rev 13749)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py 2014-07-11 18:03:57 UTC (rev 13750)
@@ -1445,7 +1445,53 @@
self.assertEqual(failed[0], 1)
+ @inlineCallbacks
+ def test_pollingBackoff(self):
+ """
+ Check that an idle queue backs off its polling and goes back to rapid polling
+ when a worker enqueues a job.
+ """
+ # Speed up the backoff process
+ self.patch(PeerConnectionPool, "queuePollingBackoff", ((1.0, 60.0),))
+
+ # Wait for backoff
+ while self.node1._actualPollInterval == self.node1.queuePollInterval:
+ d = Deferred()
+ reactor.callLater(1.0, lambda: d.callback(None))
+ yield d
+
+ self.assertEqual(self.node1._actualPollInterval, 60.0)
+
+ # TODO: this exact test should run against LocalQueuer as well.
+ def operation(txn):
+ # TODO: how does "enqueue" get associated with the transaction?
+ # This is not the fact with a raw t.w.enterprise transaction.
+ # Should probably do something with components.
+ return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
+ notBefore=datetime.datetime.utcnow())
+ yield inTransaction(self.store.newTransaction, operation)
+
+ # Backoff terminated
+ while self.node1._actualPollInterval != self.node1.queuePollInterval:
+ d = Deferred()
+ reactor.callLater(0.1, lambda: d.callback(None))
+ yield d
+ self.assertEqual(self.node1._actualPollInterval, self.node1.queuePollInterval)
+
+ # Wait for it to be executed. Hopefully this does not time out :-\.
+ yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+ # Wait for backoff
+ while self.node1._actualPollInterval == self.node1.queuePollInterval:
+ d = Deferred()
+ reactor.callLater(1.0, lambda: d.callback(None))
+ yield d
+
+ self.assertEqual(self.node1._actualPollInterval, 60.0)
+
+
+
class DummyProposal(object):
def __init__(self, *ignored):
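The waits in test_pollingBackoff above are hand-rolled sleep loops (a Deferred
fired by reactor.callLater). The same pattern could be factored into a helper; a
sketch using twisted.internet.task.deferLater (waitUntil is hypothetical, not
part of the test suite):

    from twisted.internet import reactor, task
    from twisted.internet.defer import inlineCallbacks

    @inlineCallbacks
    def waitUntil(predicate, poll=0.1):
        # Re-check predicate() every `poll` seconds until it returns true.
        while not predicate():
            yield task.deferLater(reactor, poll, lambda: None)

    # e.g. yield waitUntil(
    #     lambda: self.node1._actualPollInterval == 60.0)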