<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[13483] twext/trunk/twext/enterprise</title>
</head>
<body>
<style type="text/css"><!--
/* ---- Commit metadata box (#msg): revision / author / date list ---- */
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; }
#msg dl a { font-weight: bold}
#msg dl a:link { color:#fc3; }
#msg dl a:active { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }

/* ---- Rich-text log message (#logmsg) ---- */
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
/* Hanging indent: the negative text-indent pulls the marker back into the padding. */
#logmsg ul { text-indent: -1em; padding-left: 1em; }
#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
/* U+00BB (right-pointing double angle quotation mark) prefixes each definition. */
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }

/* ---- Optional page header / footer banners ---- */
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }

/* ---- Rendered diff (#patch) ---- */
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff {padding:10px 0;}
/* Every diff segment (context span, ins, del) is a full-width block row. */
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
/* Hunk headers (.lines) and file headers (.info); both scoped to #patch so the
   .info rule cannot restyle unrelated elements elsewhere on the page. */
#patch .lines, #patch .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org/changeset/13483">13483</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2014-05-15 18:50:56 -0700 (Thu, 15 May 2014)</dd>
</dl>
<h3>Log Message</h3>
<pre>Optimize work item lookup. Tweak logging. Add additional tracking for dashboard.</pre>
<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterprisejobqueuepy">twext/trunk/twext/enterprise/jobqueue.py</a></li>
<li><a href="#twexttrunktwextenterprisetesttest_jobqueuepy">twext/trunk/twext/enterprise/test/test_jobqueue.py</a></li>
</ul>
</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextenterprisejobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobqueue.py (13482 => 13483)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobqueue.py        2014-05-16 01:44:14 UTC (rev 13482)
+++ twext/trunk/twext/enterprise/jobqueue.py        2014-05-16 01:50:56 UTC (rev 13483)
</span><span class="lines">@@ -78,6 +78,34 @@
</span><span class="cx"> # queuer is a provider of IQueuer, of which there are several
</span><span class="cx"> # implementations in this module.
</span><span class="cx"> queuer.enqueueWork(txn, CouponWork, customerID=customerID)
</span><ins>+
+More details:
+
+ A master process in each node (host in a multi-host setup) has a:
+
+ L{WorkerConnectionPool}: this maintains a list of child processes that
+ have connected to the master over AMP. It is responsible for
+ dispatching work that is to be performed locally on that node.
+ The child process is identified by an L{ConnectionFromWorker}
+ object which maintains the child AMP connection. The
+ L{ConnectionFromWorker} tracks the load on its child so that
+ work can be distributed evenly or halted if the node is too busy.
+
+ L{PeerConnectionPool}: this is an AMP based service that connects a node
+ to all the other nodes in the cluster. It also runs the main job
+ queue loop to dispatch enqueued work when it becomes due. The master
+ maintains a list of other nodes via L{ConnectionFromPeerNode} objects,
+ which maintain the AMP connections. L{ConnectionFromPeerNode} can
+ report its load to others, and can receive work which it must perform
+ locally (via a dispatch to a child).
+
+ A child process has:
+
+ L{ConnectionFromController}: an AMP connection to the master. The master
+ will use this to dispatch work to the child. The child can also
+ use this to tell the master that work needs to be performed - that
+ can be used when the work needs to be distributed evenly to any
+ child.
</ins><span class="cx"> """
</span><span class="cx">
</span><span class="cx"> from functools import wraps
</span><span class="lines">@@ -287,19 +315,13 @@
</span><span class="cx"> associated with work items.
</span><span class="cx"> """
</span><span class="cx">
</span><ins>+ _workTypes = None
+ _workTypeMap = None
+
</ins><span class="cx"> def descriptor(self):
</span><span class="cx"> return JobDescriptor(self.jobID, self.weight)
</span><span class="cx">
</span><span class="cx">
</span><del>- @inlineCallbacks
- def workItem(self):
- workItemClass = WorkItem.forTableName(self.workType)
- workItems = yield workItemClass.loadForJob(
- self.transaction, self.jobID
- )
- returnValue(workItems[0] if len(workItems) == 1 else None)
-
-
</del><span class="cx"> def assign(self, now):
</span><span class="cx"> """
</span><span class="cx"> Mark this job as assigned to a worker by setting the assigned column to the current,
</span><span class="lines">@@ -350,13 +372,13 @@
</span><span class="cx"> def _overtm(nb):
</span><span class="cx"> return "{:.0f}".format(1000 * (t - astimestamp(nb)))
</span><span class="cx">
</span><del>- log.debug("JobItem: starting to run {jobid}".format(jobid=jobID))
</del><ins>+ log.debug("JobItem: {jobid} starting to run".format(jobid=jobID))
</ins><span class="cx"> txn = txnFactory(label="ultimatelyPerform: {}".format(jobID))
</span><span class="cx"> try:
</span><span class="cx"> job = yield cls.load(txn, jobID)
</span><span class="cx"> if hasattr(txn, "_label"):
</span><span class="cx"> txn._label = "{} <{}>".format(txn._label, job.workType)
</span><del>- log.debug("JobItem: loaded {jobid} {work} t={tm}".format(
</del><ins>+ log.debug("JobItem: {jobid} loaded {work} t={tm}".format(
</ins><span class="cx"> jobid=jobID,
</span><span class="cx"> work=job.workType,
</span><span class="cx"> tm=_tm())
</span><span class="lines">@@ -366,7 +388,7 @@
</span><span class="cx"> except NoSuchRecord:
</span><span class="cx"> # The record has already been removed
</span><span class="cx"> yield txn.commit()
</span><del>- log.debug("JobItem: already removed {jobid} t={tm}".format(jobid=jobID, tm=_tm()))
</del><ins>+ log.debug("JobItem: {jobid} already removed t={tm}".format(jobid=jobID, tm=_tm()))
</ins><span class="cx">
</span><span class="cx"> except JobFailedError:
</span><span class="cx"> # Job failed: abort with cleanup, but pretend this method succeeded
</span><span class="lines">@@ -374,12 +396,12 @@
</span><span class="cx"> @inlineCallbacks
</span><span class="cx"> def _cleanUp2(txn2):
</span><span class="cx"> job = yield cls.load(txn2, jobID)
</span><del>- log.debug("JobItem: marking as failed {jobid}, failure count: {count} t={tm}".format(jobid=jobID, count=job.failed + 1, tm=_tm()))
</del><ins>+ log.debug("JobItem: {jobid} marking as failed {count} t={tm}".format(jobid=jobID, count=job.failed + 1, tm=_tm()))
</ins><span class="cx"> yield job.failedToRun()
</span><span class="cx"> return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._cleanUp")
</span><span class="cx"> txn.postAbort(_cleanUp)
</span><span class="cx"> yield txn.abort()
</span><del>- log.debug("JobItem: failed {jobid} {work} t={tm}".format(
</del><ins>+ log.debug("JobItem: {jobid} failed {work} t={tm}".format(
</ins><span class="cx"> jobid=jobID,
</span><span class="cx"> work=job.workType,
</span><span class="cx"> tm=_tm()
</span><span class="lines">@@ -387,7 +409,7 @@
</span><span class="cx">
</span><span class="cx"> except:
</span><span class="cx"> f = Failure()
</span><del>- log.error("JobItem: Unknown exception for {jobid} failed t={tm} {exc}".format(
</del><ins>+ log.error("JobItem: {jobid} unknown exception t={tm} {exc}".format(
</ins><span class="cx"> jobid=jobID,
</span><span class="cx"> tm=_tm(),
</span><span class="cx"> exc=f,
</span><span class="lines">@@ -397,7 +419,7 @@
</span><span class="cx">
</span><span class="cx"> else:
</span><span class="cx"> yield txn.commit()
</span><del>- log.debug("JobItem: completed {jobid} {work} t={tm} over={over}".format(
</del><ins>+ log.debug("JobItem: {jobid} completed {work} t={tm} over={over}".format(
</ins><span class="cx"> jobid=jobID,
</span><span class="cx"> work=job.workType,
</span><span class="cx"> tm=_tm(),
</span><span class="lines">@@ -482,10 +504,24 @@
</span><span class="cx"> Run this job item by finding the appropriate work item class and
</span><span class="cx"> running that.
</span><span class="cx"> """
</span><ins>+
+ # TODO: Move group into the JOB table.
+ # Do a select * where group = X for update nowait to lock
+ # all rows in the group - on exception raise JobFailed
+ # with a rollback to allow the job to be re-assigned to a later
+ # date.
</ins><span class="cx"> workItem = yield self.workItem()
</span><span class="cx"> if workItem is not None:
</span><span class="cx"> if workItem.group is not None:
</span><del>- yield NamedLock.acquire(self.transaction, workItem.group)
</del><ins>+ try:
+ yield NamedLock.acquire(self.transaction, workItem.group)
+ except Exception as e:
+ log.error("JobItem: {jobid}, WorkItem: {workid} lock failed: {exc}".format(
+ jobid=self.jobID,
+ workid=workItem.workID,
+ exc=e,
+ ))
+ raise JobFailedError(e)
</ins><span class="cx">
</span><span class="cx"> try:
</span><span class="cx"> # Once the work is done we delete ourselves
</span><span class="lines">@@ -512,16 +548,51 @@
</span><span class="cx"> pass
</span><span class="cx">
</span><span class="cx">
</span><ins>+ @inlineCallbacks
+ def workItem(self):
+ workItemClass = self.workItemForType(self.workType)
+ workItems = yield workItemClass.loadForJob(
+ self.transaction, self.jobID
+ )
+ returnValue(workItems[0] if len(workItems) == 1 else None)
+
+
</ins><span class="cx"> @classmethod
</span><ins>+ def workItemForType(cls, workType):
+ if cls._workTypeMap is None:
+ cls.workTypes()
+ return cls._workTypeMap[workType]
+
+
+ @classmethod
</ins><span class="cx"> def workTypes(cls):
</span><span class="cx"> """
</span><span class="cx"> @return: All of the work item types.
</span><span class="cx"> @rtype: iterable of L{WorkItem} subclasses
</span><span class="cx"> """
</span><del>- return WorkItem.__subclasses__()
</del><ins>+ if cls._workTypes is None:
+ cls._workTypes = []
+ def getWorkType(subcls, appendTo):
+ if hasattr(subcls, "table"):
+ appendTo.append(subcls)
+ else:
+ for subsubcls in subcls.__subclasses__():
+ getWorkType(subsubcls, appendTo)
+ getWorkType(WorkItem, cls._workTypes)
</ins><span class="cx">
</span><ins>+ cls._workTypeMap = {}
+ for subcls in cls._workTypes:
+ cls._workTypeMap[subcls.table.model.name] = subcls
</ins><span class="cx">
</span><ins>+ return cls._workTypes
+
+
</ins><span class="cx"> @classmethod
</span><ins>+ def numberOfWorkTypes(cls):
+ return len(cls.workTypes())
+
+
+ @classmethod
</ins><span class="cx"> @inlineCallbacks
</span><span class="cx"> def waitEmpty(cls, txnCreator, reactor, timeout):
</span><span class="cx"> """
</span><span class="lines">@@ -568,11 +639,6 @@
</span><span class="cx"> returnValue(results)
</span><span class="cx">
</span><span class="cx">
</span><del>- @classmethod
- def numberOfWorkTypes(cls):
- return len(cls.workTypes())
-
-
</del><span class="cx"> JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight"])
</span><span class="cx">
</span><span class="cx"> class JobDescriptorArg(Argument):
</span><span class="lines">@@ -703,8 +769,8 @@
</span><span class="cx"> group = None
</span><span class="cx"> default_priority = WORK_PRIORITY_LOW # Default - subclasses should override
</span><span class="cx"> default_weight = WORK_WEIGHT_5 # Default - subclasses should override
</span><ins>+ _tableNameMap = {}
</ins><span class="cx">
</span><del>-
</del><span class="cx"> @classmethod
</span><span class="cx"> @inlineCallbacks
</span><span class="cx"> def makeJob(cls, transaction, **kwargs):
</span><span class="lines">@@ -763,30 +829,7 @@
</span><span class="cx"> raise NotImplementedError
</span><span class="cx">
</span><span class="cx">
</span><del>- @classmethod
- def forTableName(cls, tableName):
- """
- Look up a work-item class given a particular table name. Factoring
- this correctly may place it into L{twext.enterprise.record.Record}
- instead; it is probably generally useful to be able to look up a mapped
- class from a table.
</del><span class="cx">
</span><del>- @param tableName: the name of the table to look up
- @type tableName: L{str}
-
- @return: the relevant subclass
- @rtype: L{type}
- """
- for subcls in cls.__subclasses__():
- clstable = getattr(subcls, "table", None)
- if tableName == clstable.model.name:
- return subcls
- raise KeyError("No mapped {0} class for {1}.".format(
- cls, tableName
- ))
-
-
-
</del><span class="cx"> class PerformJob(Command):
</span><span class="cx"> """
</span><span class="cx"> Notify another process that it must do a job that has been persisted to
</span><span class="lines">@@ -1024,7 +1067,7 @@
</span><span class="cx"> """
</span><span class="cx"> The load of all currently connected workers.
</span><span class="cx"> """
</span><del>- return [(worker.currentLoad, worker.totalCompleted) for worker in self.workers]
</del><ins>+ return [(worker.currentAssigned, worker.currentLoad, worker.totalCompleted) for worker in self.workers]
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> def allWorkerLoad(self):
</span><span class="lines">@@ -1072,11 +1115,20 @@
</span><span class="cx"> def __init__(self, peerPool, boxReceiver=None, locator=None):
</span><span class="cx"> super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
</span><span class="cx"> self.peerPool = peerPool
</span><ins>+ self._assigned = 0
</ins><span class="cx"> self._load = 0
</span><span class="cx"> self._completed = 0
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @property
</span><ins>+ def currentAssigned(self):
+ """
+ How many jobs currently assigned to this worker?
+ """
+ return self._assigned
+
+
+ @property
</ins><span class="cx"> def currentLoad(self):
</span><span class="cx"> """
</span><span class="cx"> What is the current load of this worker?
</span><span class="lines">@@ -1120,10 +1172,12 @@
</span><span class="cx"> L{ConnectionFromController.actuallyReallyExecuteJobHere}.
</span><span class="cx"> """
</span><span class="cx"> d = self.callRemote(PerformJob, job=job)
</span><ins>+ self._assigned += 1
</ins><span class="cx"> self._load += job.weight
</span><span class="cx">
</span><span class="cx"> @d.addBoth
</span><span class="cx"> def f(result):
</span><ins>+ self._assigned -= 1
</ins><span class="cx"> self._load -= job.weight
</span><span class="cx"> self._completed += 1
</span><span class="cx"> return result
</span></span></pre></div>
<a id="twexttrunktwextenterprisetesttest_jobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py (13482 => 13483)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-05-16 01:44:14 UTC (rev 13482)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-05-16 01:50:56 UTC (rev 13483)
</span><span class="lines">@@ -313,7 +313,7 @@
</span><span class="cx"> table.
</span><span class="cx"> """
</span><span class="cx"> self.assertIdentical(
</span><del>- WorkItem.forTableName(schema.DUMMY_WORK_ITEM.model.name), DummyWorkItem
</del><ins>+ JobItem.workItemForType(schema.DUMMY_WORK_ITEM.model.name), DummyWorkItem
</ins><span class="cx"> )
</span><span class="cx">
</span><span class="cx">
</span></span></pre>
</div>
</div>
</body>
</html>