<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[13483] twext/trunk/twext/enterprise</title>
</head>
<body>

<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt;  }
#msg dl a { font-weight: bold}
#msg dl a:link    { color:#fc3; }
#msg dl a:active  { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff  {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
#patch .lines, .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org/changeset/13483">13483</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2014-05-15 18:50:56 -0700 (Thu, 15 May 2014)</dd>
</dl>

<h3>Log Message</h3>
<pre>Optimize work item lookup. Tweak logging. Add additional tracking for dashboard.</pre>

<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterprisejobqueuepy">twext/trunk/twext/enterprise/jobqueue.py</a></li>
<li><a href="#twexttrunktwextenterprisetesttest_jobqueuepy">twext/trunk/twext/enterprise/test/test_jobqueue.py</a></li>
</ul>

</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextenterprisejobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobqueue.py (13482 => 13483)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobqueue.py        2014-05-16 01:44:14 UTC (rev 13482)
+++ twext/trunk/twext/enterprise/jobqueue.py        2014-05-16 01:50:56 UTC (rev 13483)
</span><span class="lines">@@ -78,6 +78,34 @@
</span><span class="cx">             # queuer is a provider of IQueuer, of which there are several
</span><span class="cx">             # implementations in this module.
</span><span class="cx">             queuer.enqueueWork(txn, CouponWork, customerID=customerID)
</span><ins>+
+More details:
+
+    A master process in each node (host in a multi-host setup) has a:
+
+    L{WorkerConnectionPool}: this maintains a list of child processes that
+        have connected to the master over AMP. It is responsible for
+        dispatching work that is to be performed locally on that node.
+        The child process is identified by an L{ConnectionFromWorker}
+        object which maintains the child AMP connection. The
+        L{ConnectionFromWorker} tracks the load on its child so that
+        work can be distributed evenly or halted if the node is too busy.
+
+    L{PeerConnectionPool}: this is an AMP based service that connects a node
+        to all the other nodes in the cluster. It also runs the main job
+        queue loop to dispatch enqueued work when it becomes due. The master
+        maintains a list of other nodes via L{ConnectionFromPeerNode} objects,
+        which maintain the AMP connections. L{ConnectionFromPeerNode} can
+        report its load to others, and can receive work which it must perform
+        locally (via a dispatch to a child).
+
+    A child process has:
+
+    L{ConnectionFromController}: an AMP connection to the master. The master
+        will use this to dispatch work to the child. The child can also
+        use this to tell the master that work needs to be performed - that
+        can be used when the work needs to be distributed evenly to any
+        child.
</ins><span class="cx"> &quot;&quot;&quot;
</span><span class="cx"> 
</span><span class="cx"> from functools import wraps
</span><span class="lines">@@ -287,19 +315,13 @@
</span><span class="cx">     associated with work items.
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx"> 
</span><ins>+    _workTypes = None
+    _workTypeMap = None
+
</ins><span class="cx">     def descriptor(self):
</span><span class="cx">         return JobDescriptor(self.jobID, self.weight)
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    @inlineCallbacks
-    def workItem(self):
-        workItemClass = WorkItem.forTableName(self.workType)
-        workItems = yield workItemClass.loadForJob(
-            self.transaction, self.jobID
-        )
-        returnValue(workItems[0] if len(workItems) == 1 else None)
-
-
</del><span class="cx">     def assign(self, now):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Mark this job as assigned to a worker by setting the assigned column to the current,
</span><span class="lines">@@ -350,13 +372,13 @@
</span><span class="cx">         def _overtm(nb):
</span><span class="cx">             return &quot;{:.0f}&quot;.format(1000 * (t - astimestamp(nb)))
</span><span class="cx"> 
</span><del>-        log.debug(&quot;JobItem: starting to run {jobid}&quot;.format(jobid=jobID))
</del><ins>+        log.debug(&quot;JobItem: {jobid} starting to run&quot;.format(jobid=jobID))
</ins><span class="cx">         txn = txnFactory(label=&quot;ultimatelyPerform: {}&quot;.format(jobID))
</span><span class="cx">         try:
</span><span class="cx">             job = yield cls.load(txn, jobID)
</span><span class="cx">             if hasattr(txn, &quot;_label&quot;):
</span><span class="cx">                 txn._label = &quot;{} &lt;{}&gt;&quot;.format(txn._label, job.workType)
</span><del>-            log.debug(&quot;JobItem: loaded {jobid} {work} t={tm}&quot;.format(
</del><ins>+            log.debug(&quot;JobItem: {jobid} loaded {work} t={tm}&quot;.format(
</ins><span class="cx">                 jobid=jobID,
</span><span class="cx">                 work=job.workType,
</span><span class="cx">                 tm=_tm())
</span><span class="lines">@@ -366,7 +388,7 @@
</span><span class="cx">         except NoSuchRecord:
</span><span class="cx">             # The record has already been removed
</span><span class="cx">             yield txn.commit()
</span><del>-            log.debug(&quot;JobItem: already removed {jobid} t={tm}&quot;.format(jobid=jobID, tm=_tm()))
</del><ins>+            log.debug(&quot;JobItem: {jobid} already removed t={tm}&quot;.format(jobid=jobID, tm=_tm()))
</ins><span class="cx"> 
</span><span class="cx">         except JobFailedError:
</span><span class="cx">             # Job failed: abort with cleanup, but pretend this method succeeded
</span><span class="lines">@@ -374,12 +396,12 @@
</span><span class="cx">                 @inlineCallbacks
</span><span class="cx">                 def _cleanUp2(txn2):
</span><span class="cx">                     job = yield cls.load(txn2, jobID)
</span><del>-                    log.debug(&quot;JobItem: marking as failed {jobid}, failure count: {count} t={tm}&quot;.format(jobid=jobID, count=job.failed + 1, tm=_tm()))
</del><ins>+                    log.debug(&quot;JobItem: {jobid} marking as failed {count} t={tm}&quot;.format(jobid=jobID, count=job.failed + 1, tm=_tm()))
</ins><span class="cx">                     yield job.failedToRun()
</span><span class="cx">                 return inTransaction(txnFactory, _cleanUp2, &quot;ultimatelyPerform._cleanUp&quot;)
</span><span class="cx">             txn.postAbort(_cleanUp)
</span><span class="cx">             yield txn.abort()
</span><del>-            log.debug(&quot;JobItem: failed {jobid} {work} t={tm}&quot;.format(
</del><ins>+            log.debug(&quot;JobItem: {jobid} failed {work} t={tm}&quot;.format(
</ins><span class="cx">                 jobid=jobID,
</span><span class="cx">                 work=job.workType,
</span><span class="cx">                 tm=_tm()
</span><span class="lines">@@ -387,7 +409,7 @@
</span><span class="cx"> 
</span><span class="cx">         except:
</span><span class="cx">             f = Failure()
</span><del>-            log.error(&quot;JobItem: Unknown exception for {jobid} failed t={tm} {exc}&quot;.format(
</del><ins>+            log.error(&quot;JobItem: {jobid} unknown exception t={tm} {exc}&quot;.format(
</ins><span class="cx">                 jobid=jobID,
</span><span class="cx">                 tm=_tm(),
</span><span class="cx">                 exc=f,
</span><span class="lines">@@ -397,7 +419,7 @@
</span><span class="cx"> 
</span><span class="cx">         else:
</span><span class="cx">             yield txn.commit()
</span><del>-            log.debug(&quot;JobItem: completed {jobid} {work} t={tm} over={over}&quot;.format(
</del><ins>+            log.debug(&quot;JobItem: {jobid} completed {work} t={tm} over={over}&quot;.format(
</ins><span class="cx">                 jobid=jobID,
</span><span class="cx">                 work=job.workType,
</span><span class="cx">                 tm=_tm(),
</span><span class="lines">@@ -482,10 +504,24 @@
</span><span class="cx">         Run this job item by finding the appropriate work item class and
</span><span class="cx">         running that.
</span><span class="cx">         &quot;&quot;&quot;
</span><ins>+
+        # TODO: Move group into the JOB table.
+        # Do a select * where group = X for update nowait to lock
+        # all rows in the group - on exception raise JobFailed
+        # with a rollback to allow the job to be re-assigned to a later
+        # date.
</ins><span class="cx">         workItem = yield self.workItem()
</span><span class="cx">         if workItem is not None:
</span><span class="cx">             if workItem.group is not None:
</span><del>-                yield NamedLock.acquire(self.transaction, workItem.group)
</del><ins>+                try:
+                    yield NamedLock.acquire(self.transaction, workItem.group)
+                except Exception as e:
+                    log.error(&quot;JobItem: {jobid}, WorkItem: {workid} lock failed: {exc}&quot;.format(
+                        jobid=self.jobID,
+                        workid=workItem.workID,
+                        exc=e,
+                    ))
+                    raise JobFailedError(e)
</ins><span class="cx"> 
</span><span class="cx">             try:
</span><span class="cx">                 # Once the work is done we delete ourselves
</span><span class="lines">@@ -512,16 +548,51 @@
</span><span class="cx">             pass
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    @inlineCallbacks
+    def workItem(self):
+        workItemClass = self.workItemForType(self.workType)
+        workItems = yield workItemClass.loadForJob(
+            self.transaction, self.jobID
+        )
+        returnValue(workItems[0] if len(workItems) == 1 else None)
+
+
</ins><span class="cx">     @classmethod
</span><ins>+    def workItemForType(cls, workType):
+        if cls._workTypeMap is None:
+            cls.workTypes()
+        return cls._workTypeMap[workType]
+
+
+    @classmethod
</ins><span class="cx">     def workTypes(cls):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         @return: All of the work item types.
</span><span class="cx">         @rtype: iterable of L{WorkItem} subclasses
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        return WorkItem.__subclasses__()
</del><ins>+        if cls._workTypes is None:
+            cls._workTypes = []
+            def getWorkType(subcls, appendTo):
+                if hasattr(subcls, &quot;table&quot;):
+                    appendTo.append(subcls)
+                else:
+                    for subsubcls in subcls.__subclasses__():
+                        getWorkType(subsubcls, appendTo)
+            getWorkType(WorkItem, cls._workTypes)
</ins><span class="cx"> 
</span><ins>+            cls._workTypeMap = {}
+            for subcls in cls._workTypes:
+                cls._workTypeMap[subcls.table.model.name] = subcls
</ins><span class="cx"> 
</span><ins>+        return cls._workTypes
+
+
</ins><span class="cx">     @classmethod
</span><ins>+    def numberOfWorkTypes(cls):
+        return len(cls.workTypes())
+
+
+    @classmethod
</ins><span class="cx">     @inlineCallbacks
</span><span class="cx">     def waitEmpty(cls, txnCreator, reactor, timeout):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="lines">@@ -568,11 +639,6 @@
</span><span class="cx">         returnValue(results)
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    @classmethod
-    def numberOfWorkTypes(cls):
-        return len(cls.workTypes())
-
-
</del><span class="cx"> JobDescriptor = namedtuple(&quot;JobDescriptor&quot;, [&quot;jobID&quot;, &quot;weight&quot;])
</span><span class="cx"> 
</span><span class="cx"> class JobDescriptorArg(Argument):
</span><span class="lines">@@ -703,8 +769,8 @@
</span><span class="cx">     group = None
</span><span class="cx">     default_priority = WORK_PRIORITY_LOW    # Default - subclasses should override
</span><span class="cx">     default_weight = WORK_WEIGHT_5          # Default - subclasses should override
</span><ins>+    _tableNameMap = {}
</ins><span class="cx"> 
</span><del>-
</del><span class="cx">     @classmethod
</span><span class="cx">     @inlineCallbacks
</span><span class="cx">     def makeJob(cls, transaction, **kwargs):
</span><span class="lines">@@ -763,30 +829,7 @@
</span><span class="cx">         raise NotImplementedError
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    @classmethod
-    def forTableName(cls, tableName):
-        &quot;&quot;&quot;
-        Look up a work-item class given a particular table name.  Factoring
-        this correctly may place it into L{twext.enterprise.record.Record}
-        instead; it is probably generally useful to be able to look up a mapped
-        class from a table.
</del><span class="cx"> 
</span><del>-        @param tableName: the name of the table to look up
-        @type tableName: L{str}
-
-        @return: the relevant subclass
-        @rtype: L{type}
-        &quot;&quot;&quot;
-        for subcls in cls.__subclasses__():
-            clstable = getattr(subcls, &quot;table&quot;, None)
-            if tableName == clstable.model.name:
-                return subcls
-        raise KeyError(&quot;No mapped {0} class for {1}.&quot;.format(
-            cls, tableName
-        ))
-
-
-
</del><span class="cx"> class PerformJob(Command):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     Notify another process that it must do a job that has been persisted to
</span><span class="lines">@@ -1024,7 +1067,7 @@
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         The load of all currently connected workers.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        return [(worker.currentLoad, worker.totalCompleted) for worker in self.workers]
</del><ins>+        return [(worker.currentAssigned, worker.currentLoad, worker.totalCompleted) for worker in self.workers]
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     def allWorkerLoad(self):
</span><span class="lines">@@ -1072,11 +1115,20 @@
</span><span class="cx">     def __init__(self, peerPool, boxReceiver=None, locator=None):
</span><span class="cx">         super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
</span><span class="cx">         self.peerPool = peerPool
</span><ins>+        self._assigned = 0
</ins><span class="cx">         self._load = 0
</span><span class="cx">         self._completed = 0
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @property
</span><ins>+    def currentAssigned(self):
+        &quot;&quot;&quot;
+        How many jobs currently assigned to this worker?
+        &quot;&quot;&quot;
+        return self._assigned
+
+
+    @property
</ins><span class="cx">     def currentLoad(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         What is the current load of this worker?
</span><span class="lines">@@ -1120,10 +1172,12 @@
</span><span class="cx">             L{ConnectionFromController.actuallyReallyExecuteJobHere}.
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         d = self.callRemote(PerformJob, job=job)
</span><ins>+        self._assigned += 1
</ins><span class="cx">         self._load += job.weight
</span><span class="cx"> 
</span><span class="cx">         @d.addBoth
</span><span class="cx">         def f(result):
</span><ins>+            self._assigned -= 1
</ins><span class="cx">             self._load -= job.weight
</span><span class="cx">             self._completed += 1
</span><span class="cx">             return result
</span></span></pre></div>
<a id="twexttrunktwextenterprisetesttest_jobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py (13482 => 13483)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-05-16 01:44:14 UTC (rev 13482)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-05-16 01:50:56 UTC (rev 13483)
</span><span class="lines">@@ -313,7 +313,7 @@
</span><span class="cx">         table.
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         self.assertIdentical(
</span><del>-            WorkItem.forTableName(schema.DUMMY_WORK_ITEM.model.name), DummyWorkItem
</del><ins>+            JobItem.workItemForType(schema.DUMMY_WORK_ITEM.model.name), DummyWorkItem
</ins><span class="cx">         )
</span><span class="cx"> 
</span><span class="cx"> 
</span></span></pre>
</div>
</div>

</body>
</html>