<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[15096] twext/trunk/twext/enterprise/jobs</title>
</head>
<body>

<style type="text/css"><!--
/* Commit metadata box: revision, author and date as a definition list. */
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; }
#msg dl a { font-weight: bold}
/* Link states must be declared in link, visited, (hover), active order.
   These rules share the same specificity, so the later one wins; :active
   therefore has to come after :visited or it never shows on a link that
   has already been visited. */
#msg dl a:link    { color:#fc3; }
#msg dl a:visited { color:#cc6; }
#msg dl a:active  { color:#ff0; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
/* Log message box and the rich-text elements that may appear inside it. */
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
/* Negative text-indent plus matching padding gives a hanging indent for list markers. */
#logmsg ul { text-indent: -1em; padding-left: 1em; }
#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
/* Diff section: one bordered box per changed file; added lines use ins,
   removed lines use del, and context lines are plain block-level spans. */
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff  {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
/* Hunk headers and file-info lines. .info is scoped to #patch like every
   other diff rule so that a generic "info" class elsewhere is not restyled. */
#patch .lines, #patch .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org//changeset/15096">15096</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2015-09-03 13:49:06 -0700 (Thu, 03 Sep 2015)</dd>
</dl>

<h3>Log Message</h3>
<pre>Split the orphan work check out into its own loop to help speed up the main work loop. Improve some logging.</pre>

<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterprisejobsjobitempy">twext/trunk/twext/enterprise/jobs/jobitem.py</a></li>
<li><a href="#twexttrunktwextenterprisejobsqueuepy">twext/trunk/twext/enterprise/jobs/queue.py</a></li>
<li><a href="#twexttrunktwextenterprisejobstesttest_jobspy">twext/trunk/twext/enterprise/jobs/test/test_jobs.py</a></li>
</ul>

</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextenterprisejobsjobitempy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/jobitem.py (15095 => 15096)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/jobitem.py        2015-09-03 20:40:30 UTC (rev 15095)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py        2015-09-03 20:49:06 UTC (rev 15096)
</span><span class="lines">@@ -18,7 +18,8 @@
</span><span class="cx"> from twext.enterprise.dal.model import Sequence
</span><span class="cx"> from twext.enterprise.dal.model import Table, Schema, SQLType
</span><span class="cx"> from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
</span><del>-from twext.enterprise.dal.syntax import SchemaSyntax
</del><ins>+from twext.enterprise.dal.syntax import SchemaSyntax, Count, NullIf, Constant, \
+    Sum
</ins><span class="cx"> from twext.enterprise.ienterprise import ORACLE_DIALECT
</span><span class="cx"> from twext.enterprise.jobs.utils import inTransaction, astimestamp
</span><span class="cx"> from twext.python.log import Logger
</span><span class="lines">@@ -107,6 +108,12 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+# Priority for work - used to order work items in the job queue
+JOB_PRIORITY_LOW = 0
+JOB_PRIORITY_MEDIUM = 1
+JOB_PRIORITY_HIGH = 2
+
+
</ins><span class="cx"> class JobItem(Record, fromTable(JobInfoSchema.JOB)):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     @DynamicAttrs
</span><span class="lines">@@ -165,6 +172,13 @@
</span><span class="cx">         return self.update(assigned=when, overdue=when + timedelta(seconds=overdue))
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    def unassign(self):
+        &quot;&quot;&quot;
+        Mark this job as unassigned by setting the assigned and overdue columns to L{None}.
+        &quot;&quot;&quot;
+        return self.update(assigned=None, overdue=None)
+
+
</ins><span class="cx">     def bumpOverdue(self, bump):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Increment the overdue value by the specified number of seconds. Used when an overdue job
</span><span class="lines">@@ -219,7 +233,7 @@
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span><span class="cx">     @inlineCallbacks
</span><del>-    def ultimatelyPerform(cls, txnFactory, jobID):
</del><ins>+    def ultimatelyPerform(cls, txnFactory, jobDescriptor):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Eventually, after routing the job to the appropriate place, somebody
</span><span class="cx">         actually has to I{do} it. This method basically calls L{JobItem.run}
</span><span class="lines">@@ -229,8 +243,8 @@
</span><span class="cx">         @param txnFactory: a 0- or 1-argument callable that creates an
</span><span class="cx">             L{IAsyncTransaction}
</span><span class="cx">         @type txnFactory: L{callable}
</span><del>-        @param jobID: the ID of the job to be performed
-        @type jobID: L{int}
</del><ins>+        @param jobDescriptor: the job descriptor
+        @type jobID: L{JobDescriptor}
</ins><span class="cx">         @return: a L{Deferred} which fires with C{None} when the job has been
</span><span class="cx">             performed, or fails if the job can't be performed.
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="lines">@@ -246,32 +260,35 @@
</span><span class="cx">             @inlineCallbacks
</span><span class="cx">             def _cleanUp2(txn2):
</span><span class="cx">                 try:
</span><del>-                    job = yield cls.load(txn2, jobID)
</del><ins>+                    job = yield cls.load(txn2, jobDescriptor.jobID)
</ins><span class="cx">                 except NoSuchRecord:
</span><span class="cx">                     log.debug(
</span><del>-                        &quot;JobItem: {jobid} disappeared t={tm}&quot;,
-                        jobid=jobID,
</del><ins>+                        &quot;JobItem: {workType} {jobid} disappeared t={tm}&quot;,
+                        workType=jobDescriptor.workType,
+                        jobid=jobDescriptor.jobID,
</ins><span class="cx">                         tm=_tm(),
</span><span class="cx">                     )
</span><span class="cx">                 else:
</span><span class="cx">                     log.debug(
</span><del>-                        &quot;JobItem: {jobid} marking as failed {count} t={tm}&quot;,
-                        jobid=jobID,
</del><ins>+                        &quot;JobItem: {workType} {jobid} marking as failed {count} t={tm}&quot;,
+                        workType=jobDescriptor.workType,
+                        jobid=jobDescriptor.jobID,
</ins><span class="cx">                         count=job.failed + 1,
</span><span class="cx">                         tm=_tm(),
</span><span class="cx">                     )
</span><span class="cx">                     yield job.failedToRun(locked=isinstance(e, JobRunningError), delay=delay)
</span><span class="cx">             return inTransaction(txnFactory, _cleanUp2, &quot;ultimatelyPerform._failureCleanUp&quot;)
</span><span class="cx"> 
</span><del>-        log.debug(&quot;JobItem: {jobid} starting to run&quot;, jobid=jobID)
-        txn = txnFactory(label=&quot;ultimatelyPerform: {}&quot;.format(jobID))
</del><ins>+        log.debug(&quot;JobItem: {workType} {jobid} starting to run&quot;, workType=jobDescriptor.workType, jobid=jobDescriptor.jobID)
+        txn = txnFactory(label=&quot;ultimatelyPerform: {workType} {jobid}&quot;.format(workType=jobDescriptor.workType, jobid=jobDescriptor.jobID))
</ins><span class="cx">         try:
</span><del>-            job = yield cls.load(txn, jobID)
</del><ins>+            job = yield cls.load(txn, jobDescriptor.jobID)
</ins><span class="cx">             if hasattr(txn, &quot;_label&quot;):
</span><span class="cx">                 txn._label = &quot;{} &lt;{}&gt;&quot;.format(txn._label, job.workType)
</span><span class="cx">             log.debug(
</span><del>-                &quot;JobItem: {jobid} loaded {work} t={tm}&quot;,
-                jobid=jobID,
</del><ins>+                &quot;JobItem: {workType} {jobid} loaded {work} t={tm}&quot;,
+                workType=jobDescriptor.workType,
+                jobid=jobDescriptor.jobID,
</ins><span class="cx">                 work=job.workType,
</span><span class="cx">                 tm=_tm(),
</span><span class="cx">             )
</span><span class="lines">@@ -281,8 +298,9 @@
</span><span class="cx">             # The record has already been removed
</span><span class="cx">             yield txn.commit()
</span><span class="cx">             log.debug(
</span><del>-                &quot;JobItem: {jobid} already removed t={tm}&quot;,
-                jobid=jobID,
</del><ins>+                &quot;JobItem: {workType} {jobid} already removed t={tm}&quot;,
+                workType=jobDescriptor.workType,
+                jobid=jobDescriptor.jobID,
</ins><span class="cx">                 tm=_tm(),
</span><span class="cx">             )
</span><span class="cx"> 
</span><span class="lines">@@ -292,10 +310,10 @@
</span><span class="cx">             def _temporaryFailure():
</span><span class="cx">                 return _failureCleanUp(delay=e.delay * (job.failed + 1))
</span><span class="cx">             log.debug(
</span><del>-                &quot;JobItem: {jobid} {desc} {work} t={tm}&quot;,
-                jobid=jobID,
</del><ins>+                &quot;JobItem: {workType} {jobid} {desc} t={tm}&quot;,
+                workType=jobDescriptor.workType,
+                jobid=jobDescriptor.jobID,
</ins><span class="cx">                 desc=&quot;temporary failure #{}&quot;.format(job.failed + 1),
</span><del>-                work=job.workType,
</del><span class="cx">                 tm=_tm(),
</span><span class="cx">             )
</span><span class="cx">             txn.postAbort(_temporaryFailure)
</span><span class="lines">@@ -305,10 +323,10 @@
</span><span class="cx"> 
</span><span class="cx">             # Permanent failure
</span><span class="cx">             log.debug(
</span><del>-                &quot;JobItem: {jobid} {desc} {work} t={tm}&quot;,
-                jobid=jobID,
</del><ins>+                &quot;JobItem: {workType} {jobid} {desc} t={tm}&quot;,
+                workType=jobDescriptor.workType,
+                jobid=jobDescriptor.jobID,
</ins><span class="cx">                 desc=&quot;failed&quot; if isinstance(e, JobFailedError) else &quot;locked&quot;,
</span><del>-                work=job.workType,
</del><span class="cx">                 tm=_tm(),
</span><span class="cx">             )
</span><span class="cx">             txn.postAbort(_failureCleanUp)
</span><span class="lines">@@ -317,8 +335,9 @@
</span><span class="cx">         except:
</span><span class="cx">             f = Failure()
</span><span class="cx">             log.error(
</span><del>-                &quot;JobItem: {jobid} unknown exception t={tm} {exc}&quot;,
-                jobid=jobID,
</del><ins>+                &quot;JobItem: {workType} {jobid} exception t={tm} {exc}&quot;,
+                workType=jobDescriptor.workType,
+                jobid=jobDescriptor.jobID,
</ins><span class="cx">                 tm=_tm(),
</span><span class="cx">                 exc=f,
</span><span class="cx">             )
</span><span class="lines">@@ -328,9 +347,9 @@
</span><span class="cx">         else:
</span><span class="cx">             yield txn.commit()
</span><span class="cx">             log.debug(
</span><del>-                &quot;JobItem: {jobid} completed {work} t={tm} over={over}&quot;,
-                jobid=jobID,
-                work=job.workType,
</del><ins>+                &quot;JobItem: {workType} {jobid} completed t={tm} over={over}&quot;,
+                workType=jobDescriptor.workType,
+                jobid=jobDescriptor.jobID,
</ins><span class="cx">                 tm=_tm(),
</span><span class="cx">                 over=_overtm(job.notBefore),
</span><span class="cx">             )
</span><span class="lines">@@ -344,7 +363,7 @@
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Find the next available job based on priority, also return any that are overdue. This
</span><span class="cx">         method uses an SQL query to find the matching jobs, and sorts based on the NOT_BEFORE
</span><del>-        value and priority..
</del><ins>+        value and priority.
</ins><span class="cx"> 
</span><span class="cx">         @param txn: the transaction to use
</span><span class="cx">         @type txn: L{IAsyncTransaction}
</span><span class="lines">@@ -362,7 +381,7 @@
</span><span class="cx"> 
</span><span class="cx">         # Must only be one or zero
</span><span class="cx">         if jobs and len(jobs) &gt; 1:
</span><del>-            raise AssertionError(&quot;next_job() returned more than one row&quot;)
</del><ins>+            raise AssertionError(&quot;nextjob() returned more than one row&quot;)
</ins><span class="cx"> 
</span><span class="cx">         returnValue(jobs[0] if jobs else None)
</span><span class="cx"> 
</span><span class="lines">@@ -386,10 +405,24 @@
</span><span class="cx">         @rtype: L{JobItem}
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx"> 
</span><del>-        queryExpr = (cls.notBefore &lt;= now).And(cls.priority &gt;= minPriority).And(cls.pause == 0).And(
-            (cls.assigned == None).Or(cls.overdue &lt; now)
-        )
</del><ins>+        # Only add the PRIORITY term if minimum is greater than zero
+        queryExpr = (cls.assigned == None).And(cls.pause == 0).And(cls.notBefore &lt;= now)
</ins><span class="cx"> 
</span><ins>+        # PRIORITY can only be 0, 1, or 2. So we can convert an inequality into
+        # an equality test as follows:
+        #
+        # PRIORITY &gt;= 0 - no test needed all values match all the time
+        # PRIORITY &gt;= 1 === PRIORITY != 0
+        # PRIORITY &gt;= 2 === PRIORITY == 2
+        #
+        # Doing this allows use of the PRIORITY column in an index since we already
+        # have one inequality in the index (NOT_BEFORE)
+
+        if minPriority == JOB_PRIORITY_MEDIUM:
+            queryExpr = (cls.priority != JOB_PRIORITY_LOW).And(queryExpr)
+        elif minPriority == JOB_PRIORITY_HIGH:
+            queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
+
</ins><span class="cx">         if txn.dialect == ORACLE_DIALECT:
</span><span class="cx">             # Oracle does not support a &quot;for update&quot; clause with &quot;order by&quot;. So do the
</span><span class="cx">             # &quot;for update&quot; as a second query right after the first. Will need to check
</span><span class="lines">@@ -397,7 +430,7 @@
</span><span class="cx">             jobs = yield cls.query(
</span><span class="cx">                 txn,
</span><span class="cx">                 queryExpr,
</span><del>-                order=(cls.assigned, cls.priority),
</del><ins>+                order=cls.priority,
</ins><span class="cx">                 ascending=False,
</span><span class="cx">                 limit=limit,
</span><span class="cx">             )
</span><span class="lines">@@ -412,7 +445,7 @@
</span><span class="cx">             jobs = yield cls.query(
</span><span class="cx">                 txn,
</span><span class="cx">                 queryExpr,
</span><del>-                order=(cls.assigned, cls.priority),
</del><ins>+                order=cls.priority,
</ins><span class="cx">                 ascending=False,
</span><span class="cx">                 forUpdate=True,
</span><span class="cx">                 noWait=False,
</span><span class="lines">@@ -422,7 +455,37 @@
</span><span class="cx">         returnValue(jobs)
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    @classmethod
</ins><span class="cx">     @inlineCallbacks
</span><ins>+    def overduejobs(cls, txn, now, limit=None):
+        &quot;&quot;&quot;
+        Find the next overdue job based on priority.
+
+        @param txn: the transaction to use
+        @type txn: L{IAsyncTransaction}
+        @param now: current timestamp
+        @type now: L{datetime.datetime}
+        @param limit: limit on number of jobs to return
+        @type limit: L{int}
+
+        @return: the job record
+        @rtype: L{JobItem}
+        &quot;&quot;&quot;
+
+        queryExpr = (cls.assigned != None).And(cls.overdue &lt; now)
+
+        jobs = yield cls.query(
+            txn,
+            queryExpr,
+            forUpdate=True,
+            noWait=False,
+            limit=limit,
+        )
+
+        returnValue(jobs)
+
+
+    @inlineCallbacks
</ins><span class="cx">     def run(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Run this job item by finding the appropriate work item class and
</span><span class="lines">@@ -613,6 +676,10 @@
</span><span class="cx">         Generate a histogram of work items currently in the queue.
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         from twext.enterprise.jobs.queue import WorkerConnectionPool
</span><ins>+
+        # Fill out an empty set of results for all the known work types. The SQL
+        # query will only return work types that are currently queued, but we want
+        # results for all possible work.
</ins><span class="cx">         results = {}
</span><span class="cx">         now = datetime.utcnow()
</span><span class="cx">         for workItemType in cls.workTypes():
</span><span class="lines">@@ -626,22 +693,32 @@
</span><span class="cx">                 &quot;time&quot;: WorkerConnectionPool.timing.get(workType, 0.0)
</span><span class="cx">             })
</span><span class="cx"> 
</span><del>-        jobs = yield cls.all(txn)
</del><ins>+        # Use an aggregate query to get the results for each currently queued
+        # work type.
+        jobs = yield cls.queryExpr(
+            expr=None,
+            attributes=(
+                cls.workType,
+                Count(cls.workType),
+                Count(cls.assigned),
+                Count(NullIf(cls.assigned is not None and cls.notBefore &lt; now, Constant(False))),
+                Sum(cls.failed),
+            ),
+            group=cls.workType
+        ).on(txn)
</ins><span class="cx"> 
</span><del>-        for job in jobs:
-            r = results[job.workType]
-            r[&quot;queued&quot;] += 1
-            if job.assigned is not None:
-                r[&quot;assigned&quot;] += 1
-            if job.assigned is None and job.notBefore &lt; now:
-                r[&quot;late&quot;] += 1
-            if job.failed:
-                r[&quot;failed&quot;] += 1
</del><ins>+        for workType, queued, assigned, late, failed in jobs:
+            results[workType].update({
+                &quot;queued&quot;: queued,
+                &quot;assigned&quot;: assigned,
+                &quot;late&quot;: late,
+                &quot;failed&quot;: failed,
+            })
</ins><span class="cx"> 
</span><span class="cx">         returnValue(results)
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-JobDescriptor = namedtuple(&quot;JobDescriptor&quot;, [&quot;jobID&quot;, &quot;weight&quot;, &quot;type&quot;])
</del><ins>+JobDescriptor = namedtuple(&quot;JobDescriptor&quot;, [&quot;jobID&quot;, &quot;weight&quot;, &quot;workType&quot;])
</ins><span class="cx"> 
</span><span class="cx"> class JobDescriptorArg(Argument):
</span><span class="cx">     &quot;&quot;&quot;
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobsqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/queue.py (15095 => 15096)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/queue.py        2015-09-03 20:40:30 UTC (rev 15095)
+++ twext/trunk/twext/enterprise/jobs/queue.py        2015-09-03 20:49:06 UTC (rev 15096)
</span><span class="lines">@@ -24,7 +24,7 @@
</span><span class="cx"> from twext.python.log import Logger
</span><span class="cx"> 
</span><span class="cx"> from twisted.application.service import MultiService
</span><del>-from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, passthru, succeed
</del><ins>+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, succeed
</ins><span class="cx"> from twisted.internet.error import AlreadyCalled, AlreadyCancelled
</span><span class="cx"> from twisted.internet.protocol import Factory
</span><span class="cx"> from twisted.protocols.amp import AMP, Command
</span><span class="lines">@@ -181,9 +181,12 @@
</span><span class="cx">         @return: a worker connection with the lowest current load.
</span><span class="cx">         @rtype: L{ConnectionFromWorker}
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        return sorted(self.workers[:], key=lambda w: w.currentLoad)[0]
</del><span class="cx"> 
</span><ins>+        # Stable sort based on worker load
+        self.workers.sort(key=lambda w: w.currentLoad)
+        return self.workers[0]
</ins><span class="cx"> 
</span><ins>+
</ins><span class="cx">     @inlineCallbacks
</span><span class="cx">     def performJob(self, job):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="lines">@@ -203,8 +206,8 @@
</span><span class="cx">         try:
</span><span class="cx">             result = yield preferredWorker.performJob(job)
</span><span class="cx">         finally:
</span><del>-            self.completed[job.type] += 1
-            self.timing[job.type] += time.time() - t
</del><ins>+            self.completed[job.workType] += 1
+            self.timing[job.workType] += time.time() - t
</ins><span class="cx">         returnValue(result)
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="lines">@@ -352,7 +355,7 @@
</span><span class="cx">         process has instructed this worker to do it; so, look up the data in
</span><span class="cx">         the row, and do it.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        d = JobItem.ultimatelyPerform(self.transactionFactory, job.jobID)
</del><ins>+        d = JobItem.ultimatelyPerform(self.transactionFactory, job)
</ins><span class="cx">         d.addCallback(lambda ignored: {})
</span><span class="cx">         return d
</span><span class="cx"> 
</span><span class="lines">@@ -452,6 +455,7 @@
</span><span class="cx">     implements(IQueuer)
</span><span class="cx"> 
</span><span class="cx">     queuePollInterval = 0.1             # How often to poll for new work
</span><ins>+    queueOverduePollInterval = 60.0     # How often to poll for overdue work
</ins><span class="cx">     queueOverdueTimeout = 5.0 * 60.0    # How long before assigned work is possibly overdue
</span><span class="cx">     queuePollingBackoff = ((60.0, 60.0), (5.0, 1.0),)   # Polling backoffs
</span><span class="cx"> 
</span><span class="lines">@@ -478,6 +482,7 @@
</span><span class="cx">         self._timeOfLastWork = time.time()
</span><span class="cx">         self._actualPollInterval = self.queuePollInterval
</span><span class="cx">         self._inWorkCheck = False
</span><ins>+        self._inOverdueCheck = False
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     def enable(self):
</span><span class="lines">@@ -576,30 +581,6 @@
</span><span class="cx">                 if nextJob is None:
</span><span class="cx">                     break
</span><span class="cx"> 
</span><del>-                if nextJob.assigned is not None:
-                    if nextJob.overdue &gt; nowTime:
-                        # If it is now assigned but not overdue, ignore as this may have
-                        # been returned after another txn just assigned it
-                        continue
-                    else:
-                        # It is overdue - check to see whether the work item is currently locked - if so no
-                        # need to re-assign
-                        running = yield nextJob.isRunning()
-                        if running:
-                            # Change the overdue to further in the future whilst we wait for
-                            # the running job to complete
-                            yield nextJob.bumpOverdue(self.queueOverdueTimeout)
-                            log.debug(
-                                &quot;workCheck: bumped overdue timeout on jobid={jobid}&quot;,
-                                jobid=nextJob.jobID,
-                            )
-                            continue
-                        else:
-                            log.debug(
-                                &quot;workCheck: overdue re-assignment for jobid={jobid}&quot;,
-                                jobid=nextJob.jobID,
-                            )
-
</del><span class="cx">                 # Always assign as a new job even when it is an orphan
</span><span class="cx">                 yield nextJob.assign(nowTime, self.queueOverdueTimeout)
</span><span class="cx">                 self._timeOfLastWork = time.time()
</span><span class="lines">@@ -664,46 +645,169 @@
</span><span class="cx">         if loopCounter:
</span><span class="cx">             log.debug(&quot;workCheck: processed {ctr} jobs in one loop&quot;, ctr=loopCounter)
</span><span class="cx"> 
</span><del>-    _currentWorkDeferred = None
</del><span class="cx">     _workCheckCall = None
</span><span class="cx"> 
</span><ins>+    @inlineCallbacks
</ins><span class="cx">     def _workCheckLoop(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        While the service is running, keep checking for any overdue / lost work
-        items and re-submit them to the cluster for processing.
</del><ins>+        While the service is running, keep check for work items and execute
+        them. Use a back-off strategy for polling to avoid using too much CPU
+        when there is not a lot to do.
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         self._workCheckCall = None
</span><span class="cx"> 
</span><span class="cx">         if not self.running:
</span><del>-            return
</del><ins>+            returnValue(None)
</ins><span class="cx"> 
</span><del>-        @passthru(
-            self._workCheck().addErrback(lambda result: log.error(&quot;_workCheckLoop: {exc}&quot;, exc=result)).addCallback
</del><ins>+        try:
+            yield self._workCheck()
+        except Exception as e:
+            log.error(&quot;_workCheckLoop: {exc}&quot;, exc=e)
+
+        if not self.running:
+            returnValue(None)
+
+        # Check for adjustment to poll interval - if the workCheck is idle for certain
+        # periods of time we will gradually increase the poll interval to avoid consuming
+        # excessive power when there is nothing to do
+        interval = self.queuePollInterval
+        idle = time.time() - self._timeOfLastWork
+        for threshold, poll in self.queuePollingBackoff:
+            if idle &gt; threshold:
+                interval = poll
+                break
+        if self._actualPollInterval != interval:
+            log.debug(&quot;workCheckLoop: interval set to {interval}s&quot;, interval=interval)
+        self._actualPollInterval = interval
+        self._workCheckCall = self.reactor.callLater(
+            self._actualPollInterval, self._workCheckLoop
</ins><span class="cx">         )
</span><del>-        def scheduleNext(result):
-            self._currentWorkDeferred = None
-            if not self.running:
-                return
</del><span class="cx"> 
</span><del>-            # Check for adjustment to poll interval - if the workCheck is idle for certain
-            # periods of time we will gradually increase the poll interval to avoid consuming
-            # excessive power when there is nothing to do
-            interval = self.queuePollInterval
-            idle = time.time() - self._timeOfLastWork
-            for threshold, poll in self.queuePollingBackoff:
-                if idle &gt; threshold:
-                    interval = poll
</del><ins>+
+    @inlineCallbacks
+    def _overdueCheck(self):
+        &quot;&quot;&quot;
+        Every controller will periodically check for any overdue work and unassign that
+        work so that it gets execute during the next regular work check.
+        &quot;&quot;&quot;
+
+        loopCounter = 0
+        while True:
+            if not self.running or self.disableWorkProcessing:
+                returnValue(None)
+
+            # Determine what the timestamp cutoff
+            # TODO: here is where we should iterate over the unlocked items
+            # that are due, ordered by priority, notBefore etc
+            nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
+
+            self._inOverdueCheck = True
+            txn = overdueJob = None
+            try:
+                txn = self.transactionFactory(label=&quot;jobqueue.overdueCheck&quot;)
+                overdueJobs = yield JobItem.overduejobs(txn, nowTime, limit=1)
+                if overdueJobs:
+                    overdueJob = overdueJobs[0]
+                else:
</ins><span class="cx">                     break
</span><del>-            if self._actualPollInterval != interval:
-                log.debug(&quot;workCheckLoop: interval set to {interval}s&quot;, interval=interval)
-            self._actualPollInterval = interval
-            self._workCheckCall = self.reactor.callLater(
-                self._actualPollInterval, self._workCheckLoop
-            )
</del><span class="cx"> 
</span><del>-        self._currentWorkDeferred = scheduleNext
</del><ins>+                # It is overdue - check to see whether the work item is currently locked - if so no
+                # need to re-assign
+                running = yield overdueJob.isRunning()
+                if running:
+                    # Change the overdue to further in the future whilst we wait for
+                    # the running job to complete
+                    yield overdueJob.bumpOverdue(self.queueOverdueTimeout)
+                    log.debug(
+                        &quot;overdueCheck: bumped overdue timeout on jobid={jobid}&quot;,
+                        jobid=overdueJob.jobID,
+                    )
+                else:
+                    # Unassign the job so it is picked up by the next L{_workCheck}
+                    yield overdueJob.unassign()
+                    log.debug(
+                        &quot;overdueCheck: overdue unassigned for jobid={jobid}&quot;,
+                        jobid=overdueJob.jobID,
+                    )
+                loopCounter += 1
</ins><span class="cx"> 
</span><ins>+            except Exception as e:
+                log.error(
+                    &quot;Failed to process overdue job: {jobID}, {exc}&quot;,
+                    jobID=overdueJob.jobID if overdueJob else &quot;?&quot;,
+                    exc=e,
+                )
+                if txn is not None:
+                    yield txn.abort()
+                    txn = None
</ins><span class="cx"> 
</span><ins>+                # If we can identify the problem job, try and set it to failed so that it
+                # won't block other jobs behind it (it will be picked again when the failure
+                # interval is exceeded - but that has a back off so a permanently stuck item
+                # should fade away. We probably want to have some additional logic to simply
+                # remove something that is permanently failing.
+                if overdueJob is not None:
+                    txn = self.transactionFactory(label=&quot;jobqueue.overdueCheck.failed&quot;)
+                    try:
+                        failedJob = yield JobItem.load(txn, overdueJob.jobID)
+                        yield failedJob.failedToRun()
+                    except Exception as e:
+                        # Could not mark as failed - break out of the overdue job loop
+                        log.error(
+                            &quot;Failed to mark failed overdue job:{}, {exc}&quot;,
+                            jobID=overdueJob.jobID,
+                            exc=e,
+                        )
+                        yield txn.abort()
+                        txn = None
+                        overdueJob = None
+                        break
+                    else:
+                        # Marked the problem one as failed, so keep going and get the next overdue job
+                        log.error(&quot;Marked failed overdue job: {jobID}&quot;, jobID=overdueJob.jobID)
+                        yield txn.commit()
+                        txn = None
+                        overdueJob = None
+                else:
+                    # Cannot mark anything as failed - break out of overdue job loop
+                    log.error(&quot;Cannot mark failed overdue job&quot;)
+                    break
+            finally:
+                if txn is not None:
+                    yield txn.commit()
+                    txn = None
+                self._inOverdueCheck = False
+
+        if loopCounter:
+            # Make sure the regular work check loop runs immediately if we processed any overdue items
+            yield self.enqueuedJob()
+            log.debug(&quot;overdueCheck: processed {ctr} jobs in one loop&quot;, ctr=loopCounter)
+
+    _overdueCheckCall = None
+
+    @inlineCallbacks
+    def _overdueCheckLoop(self):
+        &quot;&quot;&quot;
+        While the service is running, keep checking for any overdue items.
+        &quot;&quot;&quot;
+        self._overdueCheckCall = None
+
+        if not self.running:
+            returnValue(None)
+
+        try:
+            yield self._overdueCheck()
+        except Exception as e:
+            log.error(&quot;_overdueCheckLoop: {exc}&quot;, exc=e)
+
+        if not self.running:
+            returnValue(None)
+
+        self._overdueCheckCall = self.reactor.callLater(
+            self.queueOverduePollInterval, self._overdueCheckLoop
+        )
+
+
</ins><span class="cx">     def enqueuedJob(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Reschedule the work check loop to run right now. This should be called in response to &quot;external&quot; activity that
</span><span class="lines">@@ -733,6 +837,7 @@
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         super(ControllerQueue, self).startService()
</span><span class="cx">         self._workCheckLoop()
</span><ins>+        self._overdueCheckLoop()
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @inlineCallbacks
</span><span class="lines">@@ -747,13 +852,13 @@
</span><span class="cx">             self._workCheckCall.cancel()
</span><span class="cx">             self._workCheckCall = None
</span><span class="cx"> 
</span><del>-        if self._currentWorkDeferred is not None:
-            self._currentWorkDeferred.cancel()
-            self._currentWorkDeferred = None
</del><ins>+        if self._overdueCheckCall is not None:
+            self._overdueCheckCall.cancel()
+            self._overdueCheckCall = None
</ins><span class="cx"> 
</span><span class="cx">         # Wait for any active work check to finish (but no more than 1 minute)
</span><span class="cx">         start = time.time()
</span><del>-        while self._inWorkCheck:
</del><ins>+        while self._inWorkCheck and self._inOverdueCheck:
</ins><span class="cx">             d = Deferred()
</span><span class="cx">             self.reactor.callLater(0.5, lambda : d.callback(None))
</span><span class="cx">             yield d
</span><span class="lines">@@ -780,7 +885,7 @@
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Perform the given job right now.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        return JobItem.ultimatelyPerform(self.txnFactory, job.jobID)
</del><ins>+        return JobItem.ultimatelyPerform(self.txnFactory, job)
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobstesttest_jobspy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/test/test_jobs.py (15095 => 15096)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/test/test_jobs.py        2015-09-03 20:40:30 UTC (rev 15095)
+++ twext/trunk/twext/enterprise/jobs/test/test_jobs.py        2015-09-03 20:49:06 UTC (rev 15096)
</span><span class="lines">@@ -573,11 +573,11 @@
</span><span class="cx">         self.assertTrue(job is None)
</span><span class="cx">         self.assertTrue(work is None)
</span><span class="cx"> 
</span><del>-        # Assigned job with past notBefore, but overdue is returned
</del><ins>+        # Assigned job with past notBefore, but overdue is not returned
</ins><span class="cx">         yield inTransaction(dbpool.connection, assignJob, when=now + datetime.timedelta(days=-1))
</span><span class="cx">         job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
</span><del>-        self.assertTrue(job is not None)
-        self.assertTrue(work.a == 2)
</del><ins>+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @inlineCallbacks
</span><span class="lines">@@ -1481,6 +1481,8 @@
</span><span class="cx">             return _oldBumped(self, 100)
</span><span class="cx">         self.patch(JobItem, &quot;bumpOverdue&quot;, _newBump)
</span><span class="cx"> 
</span><ins>+        self.patch(ControllerQueue, &quot;queueOverduePollInterval&quot;, 0.5)
+
</ins><span class="cx">         DummyWorkPauseItem.workStarted = Deferred()
</span><span class="cx">         DummyWorkPauseItem.unpauseWork = Deferred()
</span><span class="cx"> 
</span><span class="lines">@@ -1567,6 +1569,8 @@
</span><span class="cx">         DummyWorkPauseItem.workStarted = Deferred()
</span><span class="cx">         self.patch(DummyWorkPauseItem, &quot;doWork&quot;, _newDoWorkRaise)
</span><span class="cx"> 
</span><ins>+        self.patch(ControllerQueue, &quot;queueOverduePollInterval&quot;, 0.5)
+
</ins><span class="cx">         @transactionally(self.store.newTransaction)
</span><span class="cx">         def _enqueue(txn):
</span><span class="cx">             return txn.enqueue(
</span><span class="lines">@@ -1635,6 +1639,8 @@
</span><span class="cx">         DummyWorkPauseItem.workStarted = Deferred()
</span><span class="cx">         self.patch(DummyWorkPauseItem, &quot;doWork&quot;, _newDoWorkRaise)
</span><span class="cx"> 
</span><ins>+        self.patch(ControllerQueue, &quot;queueOverduePollInterval&quot;, 0.5)
+
</ins><span class="cx">         @transactionally(self.store.newTransaction)
</span><span class="cx">         def _enqueue(txn):
</span><span class="cx">             return txn.enqueue(
</span></span></pre>
</div>
</div>

</body>
</html>