<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[13640] twext/trunk/twext/enterprise</title>
</head>
<body>

<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt;  }
#msg dl a { font-weight: bold}
#msg dl a:link    { color:#fc3; }
#msg dl a:active  { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff  {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
#patch .lines, .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org/changeset/13640">13640</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2014-06-16 13:21:42 -0700 (Mon, 16 Jun 2014)</dd>
</dl>

<h3>Log Message</h3>
<pre>Revised jobqueue locking to eliminate contention, deadlocks and NamedLock use.</pre>

<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterprisedalrecordpy">twext/trunk/twext/enterprise/dal/record.py</a></li>
<li><a href="#twexttrunktwextenterprisedaltesttest_recordpy">twext/trunk/twext/enterprise/dal/test/test_record.py</a></li>
<li><a href="#twexttrunktwextenterprisefixturespy">twext/trunk/twext/enterprise/fixtures.py</a></li>
<li><a href="#twexttrunktwextenterprisejobqueuepy">twext/trunk/twext/enterprise/jobqueue.py</a></li>
<li><a href="#twexttrunktwextenterprisetesttest_jobqueuepy">twext/trunk/twext/enterprise/test/test_jobqueue.py</a></li>
</ul>

</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextenterprisedalrecordpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/record.py (13639 => 13640)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/record.py        2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/dal/record.py        2014-06-16 20:21:42 UTC (rev 13640)
</span><span class="lines">@@ -31,7 +31,7 @@
</span><span class="cx"> 
</span><span class="cx"> from twisted.internet.defer import inlineCallbacks, returnValue
</span><span class="cx"> from twext.enterprise.dal.syntax import (
</span><del>-    Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete
</del><ins>+    Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete, SavepointAction
</ins><span class="cx"> )
</span><span class="cx"> from twext.enterprise.util import parseSQLTimestamp
</span><span class="cx"> # from twext.enterprise.dal.syntax import ExpressionSyntax
</span><span class="lines">@@ -330,6 +330,65 @@
</span><span class="cx">         self.__dict__.update(kw)
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    @inlineCallbacks
+    def lock(self, where=None):
+        &quot;&quot;&quot;
+        Lock with a select for update.
+
+        @param where: SQL expression used to match the rows to lock, by default this is just an expression
+            that matches the primary key of this L{Record}, but it can be used to lock multiple L{Records}
+            matching the expression in one go. If it is an L{str}, then all rows will be matched.
+        @type where: L{SQLExpression} or L{None}
+        @return: a L{Deferred} that fires when the lock has been acquired.
+        &quot;&quot;&quot;
+        if where is None:
+            where = self._primaryKeyComparison(self._primaryKeyValue())
+        elif isinstance(where, str):
+            where = None
+        yield Select(
+            list(self.table),
+            From=self.table,
+            Where=where,
+            ForUpdate=True,
+        ).on(self.transaction)
+
+
+    @inlineCallbacks
+    def trylock(self, where=None):
+        &quot;&quot;&quot;
+        Try to lock with a select for update no wait. If it fails, rollback to
+        a savepoint and return L{False}, else return L{True}.
+
+        @param where: SQL expression used to match the rows to lock, by default this is just an expression
+            that matches the primary key of this L{Record}, but it can be used to lock multiple L{Records}
+            matching the expression in one go. If it is an L{str}, then all rows will be matched.
+        @type where: L{SQLExpression} or L{None}
+        @return: a L{Deferred} that fires when the updates have been sent to
+            the database.
+        &quot;&quot;&quot;
+
+        if where is None:
+            where = self._primaryKeyComparison(self._primaryKeyValue())
+        elif isinstance(where, str):
+            where = None
+        savepoint = SavepointAction(&quot;Record_trylock_{}&quot;.format(self.__class__.__name__))
+        yield savepoint.acquire(self.transaction)
+        try:
+            yield Select(
+                list(self.table),
+                From=self.table,
+                Where=where,
+                ForUpdate=True,
+                NoWait=True,
+            ).on(self.transaction)
+        except:
+            yield savepoint.rollback(self.transaction)
+            returnValue(False)
+        else:
+            yield savepoint.release(self.transaction)
+            returnValue(True)
+
+
</ins><span class="cx">     @classmethod
</span><span class="cx">     def pop(cls, transaction, *primaryKey):
</span><span class="cx">         &quot;&quot;&quot;
</span></span></pre></div>
<a id="twexttrunktwextenterprisedaltesttest_recordpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/test/test_record.py (13639 => 13640)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/test/test_record.py        2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/dal/test/test_record.py        2014-06-16 20:21:42 UTC (rev 13640)
</span><span class="lines">@@ -415,3 +415,47 @@
</span><span class="cx">             Record.namingConvention(u&quot;LIKE_THIS_ID&quot;),
</span><span class="cx">             &quot;likeThisID&quot;
</span><span class="cx">         )
</span><ins>+
+
+    @inlineCallbacks
+    def test_lock(self):
+        &quot;&quot;&quot;
+        A L{Record} may be locked, with L{Record.lock}.
+        &quot;&quot;&quot;
+        txn = self.pool.connection()
+        for beta, gamma in [
+            (123, u&quot;one&quot;),
+            (234, u&quot;two&quot;),
+            (345, u&quot;three&quot;),
+            (356, u&quot;three&quot;),
+            (456, u&quot;four&quot;),
+        ]:
+            yield txn.execSQL(
+                &quot;insert into ALPHA values (:1, :2)&quot;, [beta, gamma]
+            )
+
+        rec = yield TestRecord.load(txn, 234)
+        yield rec.lock()
+        self.assertEqual(rec.gamma, u'two')
+
+
+    @inlineCallbacks
+    def test_trylock(self):
+        &quot;&quot;&quot;
+        A L{Record} may be locked, with L{Record.trylock}.
+        &quot;&quot;&quot;
+        txn = self.pool.connection()
+        for beta, gamma in [
+            (123, u&quot;one&quot;),
+            (234, u&quot;two&quot;),
+            (345, u&quot;three&quot;),
+            (356, u&quot;three&quot;),
+            (456, u&quot;four&quot;),
+        ]:
+            yield txn.execSQL(
+                &quot;insert into ALPHA values (:1, :2)&quot;, [beta, gamma]
+            )
+
+        rec = yield TestRecord.load(txn, 234)
+        result = yield rec.trylock()
+        self.assertTrue(result)
</ins></span></pre></div>
<a id="twexttrunktwextenterprisefixturespy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/fixtures.py (13639 => 13640)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/fixtures.py        2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/fixtures.py        2014-06-16 20:21:42 UTC (rev 13640)
</span><span class="lines">@@ -59,7 +59,7 @@
</span><span class="cx">     seqs = {}
</span><span class="cx"> 
</span><span class="cx">     def connectionFactory(label=testCase.id()):
</span><del>-        conn = sqlite3.connect(sqlitename)
</del><ins>+        conn = sqlite3.connect(sqlitename, isolation_level=None)
</ins><span class="cx"> 
</span><span class="cx">         def nextval(seq):
</span><span class="cx">             result = seqs[seq] = seqs.get(seq, 0) + 1
</span><span class="lines">@@ -326,7 +326,7 @@
</span><span class="cx">     tmpdb = test.mktemp()
</span><span class="cx"> 
</span><span class="cx">     def connect():
</span><del>-        return sqlite3.connect(tmpdb)
</del><ins>+        return sqlite3.connect(tmpdb, isolation_level=None)
</ins><span class="cx"> 
</span><span class="cx">     return connect
</span><span class="cx"> 
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobqueue.py (13639 => 13640)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobqueue.py        2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/jobqueue.py        2014-06-16 20:21:42 UTC (rev 13640)
</span><span class="lines">@@ -136,7 +136,6 @@
</span><span class="cx"> from twisted.internet.endpoints import TCP4ServerEndpoint
</span><span class="cx"> from twext.enterprise.ienterprise import IQueuer
</span><span class="cx"> from zope.interface.interface import Interface
</span><del>-from twext.enterprise.locking import NamedLock
</del><span class="cx"> 
</span><span class="cx"> import collections
</span><span class="cx"> import time
</span><span class="lines">@@ -219,6 +218,7 @@
</span><span class="cx">     JobTable.addColumn(&quot;WEIGHT&quot;, SQLType(&quot;integer&quot;, 0), default=0)
</span><span class="cx">     JobTable.addColumn(&quot;NOT_BEFORE&quot;, SQLType(&quot;timestamp&quot;, None), notNull=True)
</span><span class="cx">     JobTable.addColumn(&quot;ASSIGNED&quot;, SQLType(&quot;timestamp&quot;, None), default=None)
</span><ins>+    JobTable.addColumn(&quot;OVERDUE&quot;, SQLType(&quot;timestamp&quot;, None), default=None)
</ins><span class="cx">     JobTable.addColumn(&quot;FAILED&quot;, SQLType(&quot;integer&quot;, 0), default=0)
</span><span class="cx"> 
</span><span class="cx">     return inSchema
</span><span class="lines">@@ -309,43 +309,97 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+class JobRunningError(Exception):
+    &quot;&quot;&quot;
+    A job is already running.
+    &quot;&quot;&quot;
+    pass
+
+
+
</ins><span class="cx"> class JobItem(Record, fromTable(JobInfoSchema.JOB)):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     An item in the job table. This is typically not directly used by code
</span><span class="cx">     creating work items, but rather is used for internal book keeping of jobs
</span><span class="cx">     associated with work items.
</span><ins>+
+    The JOB table has some important columns that determine how a job is being scheduled:
+
+    NOT_BEFORE - this is a timestamp indicating when the job is expected to run. It will not
+    run before this time, but may run quite some time after (if the service is busy).
+
+    ASSIGNED - this is a timestamp that is initially NULL but set when the job processing loop
+    assigns the job to a child process to be executed. Thus, if the value is not NULL, then the
+    job is (probably) being executed. The child process is supposed to delete the L{JobItem}
+    when it is done, however if the child dies without executing the job, then the job
+    processing loop needs to detect it.
+
+    OVERDUE - this is a timestamp initially set when an L{JobItem} is assigned. It represents
+    a point in the future when the job is expected to be finished. The job processing loop skips
+    jobs that have a non-NULL ASSIGNED value and whose OVERDUE value has not been passed. If
+    OVERDUE is in the past, then the job processing loop checks to see if the job is still
+    running - which is determined by whether a row lock exists on the work item (see
+    L{isRunning}. If the job is still running then OVERDUE is bumped up to a new point in the
+    future, if it is not still running the job is marked as failed - which will reschedule it.
+
+    FAILED - a count of the number of times a job has failed or had its overdue count bumped.
+
+    The above behavior depends on some important locking behavior: when an L{JobItem} is run,
+    it locks the L{WorkItem} row corresponding to the job (it may lock other associated
+    rows - e.g., other L{WorkItem}'s in the same group). It does not lock the L{JobItem}
+    row corresponding to the job because the job processing loop may need to update the
+    OVERDUE value of that row if the work takes a long time to complete.
</ins><span class="cx">     &quot;&quot;&quot;
</span><span class="cx"> 
</span><span class="cx">     _workTypes = None
</span><span class="cx">     _workTypeMap = None
</span><span class="cx"> 
</span><ins>+    failureRescheduleInterval = 60  # When a job fails, reschedule it this number of seconds in the future
+    lockRescheduleInterval = 5      # When a job is locked, reschedule it this number of seconds in the future
+
</ins><span class="cx">     def descriptor(self):
</span><span class="cx">         return JobDescriptor(self.jobID, self.weight, self.workType)
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    def assign(self, now):
</del><ins>+    def assign(self, when, overdue):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Mark this job as assigned to a worker by setting the assigned column to the current,
</span><del>-        or provided, timestamp.
</del><ins>+        or provided, timestamp. Also set the overdue value to help determine if a job is orphaned.
</ins><span class="cx"> 
</span><del>-        @param now: current timestamp
-        @type now: L{datetime.datetime}
-        @param when: explicitly set the assigned time - typically only used in tests
-        @type when: L{datetime.datetime} or L{None}
</del><ins>+        @param when: current timestamp
+        @type when: L{datetime.datetime}
+        @param overdue: number of seconds after assignment that the job will be considered overdue
+        @type overdue: L{int}
</ins><span class="cx">         &quot;&quot;&quot;
</span><del>-        return self.update(assigned=now)
</del><ins>+        return self.update(assigned=when, overdue=when + timedelta(seconds=overdue))
</ins><span class="cx"> 
</span><span class="cx"> 
</span><del>-    def failedToRun(self):
</del><ins>+    def bumpOverdue(self, bump):
</ins><span class="cx">         &quot;&quot;&quot;
</span><ins>+        Increment the overdue value by the specified number of seconds. Used when an overdue job
+        is still running in a child process but the job processing loop has detected it as overdue.
+
+        @param bump: number of seconds to increment overdue by
+        @type bump: L{int}
+        &quot;&quot;&quot;
+        return self.update(overdue=self.overdue + timedelta(seconds=bump))
+
+
+    def failedToRun(self, delay=None):
+        &quot;&quot;&quot;
</ins><span class="cx">         The attempt to run the job failed. Leave it in the queue, but mark it
</span><span class="cx">         as unassigned, bump the failure count and set to run at some point in
</span><span class="cx">         the future.
</span><ins>+
+        @param delay: the number of seconds in the future at which to reschedule the
+            next execution of the job. If L{None} use the default class property value.
+        @type delay: L{int}
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         return self.update(
</span><span class="cx">             assigned=None,
</span><ins>+            overdue=None,
</ins><span class="cx">             failed=self.failed + 1,
</span><del>-            notBefore=datetime.utcnow() + timedelta(seconds=60)
</del><ins>+            notBefore=datetime.utcnow() + timedelta(seconds=self.failureRescheduleInterval if delay is None else delay)
</ins><span class="cx">         )
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="lines">@@ -354,15 +408,15 @@
</span><span class="cx">     def ultimatelyPerform(cls, txnFactory, jobID):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Eventually, after routing the job to the appropriate place, somebody
</span><del>-        actually has to I{do} it.
</del><ins>+        actually has to I{do} it. This method basically calls L{JobItem.run}
+        but it does a bunch of &quot;booking&quot; to track the transaction and log failures
+        and timing information.
</ins><span class="cx"> 
</span><span class="cx">         @param txnFactory: a 0- or 1-argument callable that creates an
</span><span class="cx">             L{IAsyncTransaction}
</span><span class="cx">         @type txnFactory: L{callable}
</span><del>-
</del><span class="cx">         @param jobID: the ID of the job to be performed
</span><span class="cx">         @type jobID: L{int}
</span><del>-
</del><span class="cx">         @return: a L{Deferred} which fires with C{None} when the job has been
</span><span class="cx">             performed, or fails if the job can't be performed.
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="lines">@@ -396,8 +450,9 @@
</span><span class="cx">                 tm=_tm(),
</span><span class="cx">             )
</span><span class="cx"> 
</span><del>-        except JobFailedError:
</del><ins>+        except (JobFailedError, JobRunningError) as e:
</ins><span class="cx">             # Job failed: abort with cleanup, but pretend this method succeeded
</span><ins>+            delay = job.lockRescheduleInterval if isinstance(e, JobRunningError) else job.failureRescheduleInterval
</ins><span class="cx">             def _cleanUp():
</span><span class="cx">                 @inlineCallbacks
</span><span class="cx">                 def _cleanUp2(txn2):
</span><span class="lines">@@ -408,16 +463,17 @@
</span><span class="cx">                         count=job.failed + 1,
</span><span class="cx">                         tm=_tm(),
</span><span class="cx">                     )
</span><del>-                    yield job.failedToRun()
</del><ins>+                    yield job.failedToRun(delay=delay)
</ins><span class="cx">                 return inTransaction(txnFactory, _cleanUp2, &quot;ultimatelyPerform._cleanUp&quot;)
</span><del>-            txn.postAbort(_cleanUp)
-            yield txn.abort()
</del><span class="cx">             log.debug(
</span><del>-                &quot;JobItem: {jobid} failed {work} t={tm}&quot;,
</del><ins>+                &quot;JobItem: {jobid} {desc} {work} t={tm}&quot;,
</ins><span class="cx">                 jobid=jobID,
</span><ins>+                desc=&quot;failed&quot; if isinstance(e, JobFailedError) else &quot;locked&quot;,
</ins><span class="cx">                 work=job.workType,
</span><span class="cx">                 tm=_tm(),
</span><span class="cx">             )
</span><ins>+            txn.postAbort(_cleanUp)
+            yield txn.abort()
</ins><span class="cx"> 
</span><span class="cx">         except:
</span><span class="cx">             f = Failure()
</span><span class="lines">@@ -445,27 +501,25 @@
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span><span class="cx">     @inlineCallbacks
</span><del>-    def nextjob(cls, txn, now, minPriority, overdue):
</del><ins>+    def nextjob(cls, txn, now, minPriority):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Find the next available job based on priority, also return any that are overdue. This
</span><del>-        method relies on there being a nextjob() SQL stored procedure to enable skipping over
-        items which are row locked to help avoid contention when multiple nodes are operating
-        on the job queue simultaneously.
</del><ins>+        method uses an SQL query to find the matching jobs, and sorts based on the NOT_BEFORE
+        value and priority..
</ins><span class="cx"> 
</span><span class="cx">         @param txn: the transaction to use
</span><span class="cx">         @type txn: L{IAsyncTransaction}
</span><del>-        @param now: current timestamp
</del><ins>+        @param now: current timestamp - needed for unit tests that might use their
+            own clock.
</ins><span class="cx">         @type now: L{datetime.datetime}
</span><span class="cx">         @param minPriority: lowest priority level to query for
</span><span class="cx">         @type minPriority: L{int}
</span><del>-        @param overdue: how long before an assigned item is considered overdue
-        @type overdue: L{datetime.datetime}
</del><span class="cx"> 
</span><span class="cx">         @return: the job record
</span><span class="cx">         @rtype: L{JobItem}
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx"> 
</span><del>-        jobs = yield cls.nextjobs(txn, now, minPriority, overdue, limit=1)
</del><ins>+        jobs = yield cls.nextjobs(txn, now, minPriority, limit=1)
</ins><span class="cx"> 
</span><span class="cx">         # Must only be one or zero
</span><span class="cx">         if jobs and len(jobs) &gt; 1:
</span><span class="lines">@@ -476,7 +530,7 @@
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span><span class="cx">     @inlineCallbacks
</span><del>-    def nextjobs(cls, txn, now, minPriority, overdue, limit=1):
</del><ins>+    def nextjobs(cls, txn, now, minPriority, limit=1):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Find the next available job based on priority, also return any that are overdue. This
</span><span class="cx">         method relies on there being a nextjob() SQL stored procedure to enable skipping over
</span><span class="lines">@@ -489,8 +543,6 @@
</span><span class="cx">         @type now: L{datetime.datetime}
</span><span class="cx">         @param minPriority: lowest priority level to query for
</span><span class="cx">         @type minPriority: L{int}
</span><del>-        @param overdue: how long before an assigned item is considered overdue
-        @type overdue: L{datetime.datetime}
</del><span class="cx">         @param limit: limit on number of jobs to return
</span><span class="cx">         @type limit: L{int}
</span><span class="cx"> 
</span><span class="lines">@@ -501,7 +553,7 @@
</span><span class="cx">         jobs = yield cls.query(
</span><span class="cx">             txn,
</span><span class="cx">             (cls.notBefore &lt;= now).And
</span><del>-            (((cls.priority &gt;= minPriority).And(cls.assigned == None)).Or(cls.assigned &lt; overdue)),
</del><ins>+            (((cls.priority &gt;= minPriority).And(cls.assigned == None)).Or(cls.overdue &lt; now)),
</ins><span class="cx">             order=(cls.assigned, cls.priority),
</span><span class="cx">             ascending=False,
</span><span class="cx">             forUpdate=True,
</span><span class="lines">@@ -516,17 +568,20 @@
</span><span class="cx">     def run(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Run this job item by finding the appropriate work item class and
</span><del>-        running that.
</del><ins>+        running that, with appropriate locking.
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx"> 
</span><del>-        # TODO: Move group into the JOB table.
-        # Do a select * where group = X for update nowait to lock
-        # all rows in the group - on exception raise JobFailed
-        # with a rollback to allow the job to be re-assigned to a later
-        # date.
</del><span class="cx">         workItem = yield self.workItem()
</span><span class="cx">         if workItem is not None:
</span><ins>+
+            # First we lock the L{WorkItem}
+            locked = yield workItem.runlock()
+            if not locked:
+                raise JobRunningError()
+
</ins><span class="cx">             try:
</span><ins>+                # Run in three steps, allowing for before/after hooks that sub-classes
+                # may override
</ins><span class="cx">                 okToGo = yield workItem.beforeWork()
</span><span class="cx">                 if okToGo:
</span><span class="cx">                     yield workItem.doWork()
</span><span class="lines">@@ -541,7 +596,8 @@
</span><span class="cx">                 raise JobFailedError(e)
</span><span class="cx"> 
</span><span class="cx">         try:
</span><del>-            # Once the work is done we delete ourselves
</del><ins>+            # Once the work is done we delete ourselves - NB this must be the last thing done
+            # to ensure the L{JobItem} row is not locked for very long.
</ins><span class="cx">             yield self.delete()
</span><span class="cx">         except NoSuchRecord:
</span><span class="cx">             # The record has already been removed
</span><span class="lines">@@ -549,7 +605,23 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @inlineCallbacks
</span><ins>+    def isRunning(self):
+        &quot;&quot;&quot;
+        Return L{True} if the job is currently running (its L{WorkItem} is locked).
+        &quot;&quot;&quot;
+        workItem = yield self.workItem()
+        if workItem is not None:
+            locked = yield workItem.trylock()
+            returnValue(not locked)
+        else:
+            returnValue(False)
+
+
+    @inlineCallbacks
</ins><span class="cx">     def workItem(self):
</span><ins>+        &quot;&quot;&quot;
+        Return the L{WorkItem} corresponding to this L{JobItem}.
+        &quot;&quot;&quot;
</ins><span class="cx">         workItemClass = self.workItemForType(self.workType)
</span><span class="cx">         workItems = yield workItemClass.loadForJob(
</span><span class="cx">             self.transaction, self.jobID
</span><span class="lines">@@ -559,6 +631,12 @@
</span><span class="cx"> 
</span><span class="cx">     @classmethod
</span><span class="cx">     def workItemForType(cls, workType):
</span><ins>+        &quot;&quot;&quot;
+        Return the class of the L{WorkItem} associated with this L{JobItem}.
+
+        @param workType: the name of the L{WorkItem}'s table
+        @type workType: L{str}
+        &quot;&quot;&quot;
</ins><span class="cx">         if cls._workTypeMap is None:
</span><span class="cx">             cls.workTypes()
</span><span class="cx">         return cls._workTypeMap[workType]
</span><span class="lines">@@ -567,6 +645,8 @@
</span><span class="cx">     @classmethod
</span><span class="cx">     def workTypes(cls):
</span><span class="cx">         &quot;&quot;&quot;
</span><ins>+        Map all L{WorkItem} sub-classes table names to the class type.
+
</ins><span class="cx">         @return: All of the work item types.
</span><span class="cx">         @rtype: iterable of L{WorkItem} subclasses
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="lines">@@ -651,7 +731,7 @@
</span><span class="cx"> 
</span><span class="cx"> class JobDescriptorArg(Argument):
</span><span class="cx">     &quot;&quot;&quot;
</span><del>-    Comma-separated.
</del><ins>+    Comma-separated representation of an L{JobDescriptor} for AMP-serialization.
</ins><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     def toString(self, inObject):
</span><span class="cx">         return &quot;,&quot;.join(map(str, inObject))
</span><span class="lines">@@ -826,6 +906,30 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     @inlineCallbacks
</span><ins>+    def runlock(self):
+        &quot;&quot;&quot;
+        Used to lock an L{WorkItem} before it is run. The L{WorkItem}'s row MUST be
+        locked via SELECT FOR UPDATE to ensure the job queue knows it is being worked
+        on so that it can detect when an overdue job needs to be restarted or not.
+
+        Note that the locking used here may cause deadlocks if not done in the correct
+        order. In particular anything that might cause locks across multiple LWorkItem}s,
+        such as group locks, multi-row locks, etc, MUST be done first.
+
+        @return: an L{Deferred} that fires with L{True} if the L{WorkItem} was locked,
+            L{False} if not.
+        @rtype: L{Deferred}
+        &quot;&quot;&quot;
+
+        # Do the group lock first since this can impact multiple rows and thus could
+        # cause deadlocks if done in the wrong order
+
+        # Row level lock on this item
+        locked = yield self.trylock(self.group)
+        returnValue(locked)
+
+
+    @inlineCallbacks
</ins><span class="cx">     def beforeWork(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         A hook that gets called before the L{WorkItem} does its real work. This can be used
</span><span class="lines">@@ -836,18 +940,6 @@
</span><span class="cx">             should continue, L{False} if it should be skipped without error.
</span><span class="cx">         @rtype: L{Deferred}
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        if self.group is not None:
-            try:
-                yield NamedLock.acquire(self.transaction, self.group)
-            except Exception as e:
-                log.error(
-                    &quot;JobItem: {jobid}, WorkItem: {workid} lock failed: {exc}&quot;,
-                    jobid=self.jobID,
-                    workid=self.workID,
-                    exc=e,
-                )
-                raise JobFailedError(e)
-
</del><span class="cx">         try:
</span><span class="cx">             # Work item is deleted before doing work - but someone else may have
</span><span class="cx">             # done it whilst we waited on the lock so handle that by simply
</span><span class="lines">@@ -942,7 +1034,7 @@
</span><span class="cx">     @inlineCallbacks
</span><span class="cx">     def beforeWork(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        No need to lock - for safety just delete any others.
</del><ins>+        For safety just delete any others.
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx"> 
</span><span class="cx">         # Delete all other work items
</span><span class="lines">@@ -1629,7 +1721,7 @@
</span><span class="cx">     getpid = staticmethod(getpid)
</span><span class="cx"> 
</span><span class="cx">     queuePollInterval = 0.1             # How often to poll for new work
</span><del>-    queueOrphanTimeout = 5.0 * 60.0     # How long before assigned work is possibly orphaned
</del><ins>+    queueOverdueTimeout = 5.0 * 60.0    # How long before assigned work is possibly overdue
</ins><span class="cx"> 
</span><span class="cx">     overloadLevel = 95          # Percentage load level above which job queue processing stops
</span><span class="cx">     highPriorityLevel = 80      # Percentage load level above which only high priority jobs are processed
</span><span class="lines">@@ -1802,21 +1894,39 @@
</span><span class="cx">             # TODO: here is where we should iterate over the unlocked items
</span><span class="cx">             # that are due, ordered by priority, notBefore etc
</span><span class="cx">             nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
</span><del>-            orphanTime = nowTime - timedelta(seconds=self.queueOrphanTimeout)
</del><span class="cx"> 
</span><span class="cx">             txn = self.transactionFactory(label=&quot;jobqueue.workCheck&quot;)
</span><span class="cx">             try:
</span><del>-                nextJob = yield JobItem.nextjob(txn, nowTime, minPriority, orphanTime)
</del><ins>+                nextJob = yield JobItem.nextjob(txn, nowTime, minPriority)
</ins><span class="cx">                 if nextJob is None:
</span><span class="cx">                     break
</span><span class="cx"> 
</span><del>-                # If it is now assigned but not earlier than the orphan time, ignore as this may have
-                # been returned after another txn just assigned it
-                if nextJob.assigned is not None and nextJob.assigned &gt; orphanTime:
-                    continue
</del><ins>+                if nextJob.assigned is not None:
+                    if nextJob.overdue &gt; nowTime:
+                        # If it is now assigned but not overdue, ignore as this may have
+                        # been returned after another txn just assigned it
+                        continue
+                    else:
+                        # It is overdue - check to see whether the work item is currently locked - if so no
+                        # need to re-assign
+                        running = yield nextJob.isRunning()
+                        if running:
+                            # Change the overdue to further in the future whilst we wait for
+                            # the running job to complete
+                            yield nextJob.bumpOverdue(self.queueOverdueTimeout)
+                            log.debug(
+                                &quot;workCheck: bumped overdue timeout on jobid={jobid}&quot;,
+                                jobid=nextJob.jobID,
+                            )
+                            continue
+                        else:
+                            log.debug(
+                                &quot;workCheck: overdue re-assignment for jobid={jobid}&quot;,
+                                jobid=nextJob.jobID,
+                            )
</ins><span class="cx"> 
</span><span class="cx">                 # Always assign as a new job even when it is an orphan
</span><del>-                yield nextJob.assign(nowTime)
</del><ins>+                yield nextJob.assign(nowTime, self.queueOverdueTimeout)
</ins><span class="cx">                 loopCounter += 1
</span><span class="cx"> 
</span><span class="cx">             except Exception as e:
</span></span></pre></div>
<a id="twexttrunktwextenterprisetesttest_jobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py (13639 => 13640)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-06-16 20:21:42 UTC (rev 13640)
</span><span class="lines">@@ -32,7 +32,7 @@
</span><span class="cx"> from twisted.protocols.amp import Command, AMP, Integer
</span><span class="cx"> from twisted.application.service import Service, MultiService
</span><span class="cx"> 
</span><del>-from twext.enterprise.dal.syntax import SchemaSyntax
</del><ins>+from twext.enterprise.dal.syntax import SchemaSyntax, Delete
</ins><span class="cx"> from twext.enterprise.dal.record import fromTable
</span><span class="cx"> from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
</span><span class="cx"> from twext.enterprise.fixtures import buildConnectionPool
</span><span class="lines">@@ -206,6 +206,7 @@
</span><span class="cx">       WEIGHT      integer default 0,
</span><span class="cx">       NOT_BEFORE  timestamp not null,
</span><span class="cx">       ASSIGNED    timestamp default null,
</span><ins>+      OVERDUE     timestamp default null,
</ins><span class="cx">       FAILED      integer default 0
</span><span class="cx">     );
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="lines">@@ -219,6 +220,24 @@
</span><span class="cx">       A integer, B integer,
</span><span class="cx">       DELETE_ON_LOAD integer default 0
</span><span class="cx">     );
</span><ins>+    create table DUMMY_WORK_SINGLETON_ITEM (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A integer, B integer,
+      DELETE_ON_LOAD integer default 0
+    );
+    create table DUMMY_WORK_PAUSE_ITEM (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A integer, B integer,
+      DELETE_ON_LOAD integer default 0
+    );
+    create table AGGREGATOR_WORK_ITEM (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A integer, B integer,
+      DELETE_ON_LOAD integer default 0
+    );
</ins><span class="cx">     &quot;&quot;&quot;
</span><span class="cx"> )
</span><span class="cx"> 
</span><span class="lines">@@ -227,13 +246,24 @@
</span><span class="cx"> 
</span><span class="cx">     dropSQL = [
</span><span class="cx">         &quot;drop table {name} cascade&quot;.format(name=table)
</span><del>-        for table in (&quot;DUMMY_WORK_ITEM&quot;,)
</del><ins>+        for table in (
+            &quot;DUMMY_WORK_ITEM&quot;,
+            &quot;DUMMY_WORK_SINGLETON_ITEM&quot;,
+            &quot;DUMMY_WORK_PAUSE_ITEM&quot;,
+            &quot;AGGREGATOR_WORK_ITEM&quot;
+        )
</ins><span class="cx">     ] + [&quot;delete from job&quot;]
</span><span class="cx"> except SkipTest as e:
</span><span class="cx">     DummyWorkItemTable = object
</span><ins>+    DummyWorkSingletonItemTable = object
+    DummyWorkPauseItemTable = object
+    AggregatorWorkItemTable = object
</ins><span class="cx">     skip = e
</span><span class="cx"> else:
</span><span class="cx">     DummyWorkItemTable = fromTable(schema.DUMMY_WORK_ITEM)
</span><ins>+    DummyWorkSingletonItemTable = fromTable(schema.DUMMY_WORK_SINGLETON_ITEM)
+    DummyWorkPauseItemTable = fromTable(schema.DUMMY_WORK_PAUSE_ITEM)
+    AggregatorWorkItemTable = fromTable(schema.AGGREGATOR_WORK_ITEM)
</ins><span class="cx">     skip = False
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="lines">@@ -271,7 +301,7 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-class DummyWorkSingletonItem(SingletonWorkItem, DummyWorkItemTable):
</del><ins>+class DummyWorkSingletonItem(SingletonWorkItem, DummyWorkSingletonItemTable):
</ins><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     Sample L{SingletonWorkItem} subclass that adds two integers together and stores them
</span><span class="cx">     in another table.
</span><span class="lines">@@ -287,6 +317,42 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+class DummyWorkPauseItem(WorkItem, DummyWorkPauseItemTable):
+    &quot;&quot;&quot;
+    Sample L{WorkItem} subclass that pauses until a Deferred is fired.
+    &quot;&quot;&quot;
+
+    workStarted = None
+    unpauseWork = None
+
+    def doWork(self):
+        self.workStarted.callback(None)
+        return self.unpauseWork
+
+
+
+class AggregatorWorkItem(WorkItem, AggregatorWorkItemTable):
+    &quot;&quot;&quot;
+    Sample L{WorkItem} subclass that deletes others with the same
+    value and then pauses for a bit.
+    &quot;&quot;&quot;
+
+    group = property(lambda self: (self.table.B == self.b))
+
+    @inlineCallbacks
+    def doWork(self):
+        # Delete the work items we match
+        yield Delete(
+            From=self.table,
+            Where=(self.table.A == self.a)
+        ).on(self.transaction)
+
+        d = Deferred()
+        reactor.callLater(2.0, lambda: d.callback(None))
+        yield d
+
+
+
</ins><span class="cx"> class AMPTests(TestCase):
</span><span class="cx">     &quot;&quot;&quot;
</span><span class="cx">     Tests for L{AMP} faithfully relaying ids across the wire.
</span><span class="lines">@@ -396,7 +462,7 @@
</span><span class="cx">     @inlineCallbacks
</span><span class="cx">     def test_assign(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
</del><ins>+        L{JobItem.assign} will mark a job as assigned.
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
</span><span class="cx">         yield self._enqueue(dbpool, 1, 2)
</span><span class="lines">@@ -412,7 +478,7 @@
</span><span class="cx">         @inlineCallbacks
</span><span class="cx">         def assignJob(txn):
</span><span class="cx">             job = yield JobItem.load(txn, jobs[0].jobID)
</span><del>-            yield job.assign(datetime.datetime.utcnow())
</del><ins>+            yield job.assign(datetime.datetime.utcnow(), PeerConnectionPool.queueOverdueTimeout)
</ins><span class="cx">         yield inTransaction(dbpool.connection, assignJob)
</span><span class="cx"> 
</span><span class="cx">         jobs = yield inTransaction(dbpool.connection, checkJob)
</span><span class="lines">@@ -423,7 +489,7 @@
</span><span class="cx">     @inlineCallbacks
</span><span class="cx">     def test_nextjob(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
</del><ins>+        L{JobItem.nextjob} returns the correct job based on priority.
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx"> 
</span><span class="cx">         dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
</span><span class="lines">@@ -432,7 +498,7 @@
</span><span class="cx">         # Empty job queue
</span><span class="cx">         @inlineCallbacks
</span><span class="cx">         def _next(txn, priority=WORK_PRIORITY_LOW):
</span><del>-            job = yield JobItem.nextjob(txn, now, priority, now - datetime.timedelta(seconds=PeerConnectionPool.queueOrphanTimeout))
</del><ins>+            job = yield JobItem.nextjob(txn, now, priority)
</ins><span class="cx">             if job is not None:
</span><span class="cx">                 work = yield job.workItem()
</span><span class="cx">             else:
</span><span class="lines">@@ -459,7 +525,7 @@
</span><span class="cx">         @inlineCallbacks
</span><span class="cx">         def assignJob(txn, when=None):
</span><span class="cx">             assignee = yield JobItem.load(txn, assignID)
</span><del>-            yield assignee.assign(now if when is None else when)
</del><ins>+            yield assignee.assign(now if when is None else when, PeerConnectionPool.queueOverdueTimeout)
</ins><span class="cx">         yield inTransaction(dbpool.connection, assignJob)
</span><span class="cx">         job, work = yield inTransaction(dbpool.connection, _next)
</span><span class="cx">         self.assertTrue(job is None)
</span><span class="lines">@@ -1184,7 +1250,201 @@
</span><span class="cx">         self.assertEquals(DummyWorkItem.results, {})
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+    @inlineCallbacks
+    def test_locked(self):
+        &quot;&quot;&quot;
+        L{JobItem.run} locks the work item.
+        &quot;&quot;&quot;
</ins><span class="cx"> 
</span><ins>+        DummyWorkPauseItem.workStarted = Deferred()
+        DummyWorkPauseItem.unpauseWork = Deferred()
+
+        @transactionally(self.store.newTransaction)
+        def _enqueue(txn):
+            return txn.enqueue(
+                DummyWorkPauseItem, a=30, b=40, workID=1
+            )
+        yield _enqueue
+
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+
+        yield DummyWorkPauseItem.workStarted
+
+        @transactionally(self.store.newTransaction)
+        def _trylock(txn):
+            job = yield JobItem.load(txn, jobs[0].jobID)
+            work = yield job.workItem()
+            locked = yield work.trylock()
+            self.assertFalse(locked)
+        yield _trylock
+
+        DummyWorkPauseItem.unpauseWork.callback(None)
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 0)
+
+
+    @inlineCallbacks
+    def test_overdue(self):
+        &quot;&quot;&quot;
+        An overdue L{JobItem} whose work item is still running has its overdue time bumped instead of being re-assigned.
+        &quot;&quot;&quot;
+
+        # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
+        # they are called.
+        assigned = [0]
+        _oldAssign = JobItem.assign
+        def _newAssign(self, when, overdue):
+            assigned[0] += 1
+            return _oldAssign(self, when, 1)
+        self.patch(JobItem, &quot;assign&quot;, _newAssign)
+
+        bumped = [0]
+        _oldBumped = JobItem.bumpOverdue
+        def _newBump(self, bump):
+            bumped[0] += 1
+            return _oldBumped(self, 100)
+        self.patch(JobItem, &quot;bumpOverdue&quot;, _newBump)
+
+        DummyWorkPauseItem.workStarted = Deferred()
+        DummyWorkPauseItem.unpauseWork = Deferred()
+
+        @transactionally(self.store.newTransaction)
+        def _enqueue(txn):
+            return txn.enqueue(
+                DummyWorkPauseItem, a=30, b=40, workID=1
+            )
+        yield _enqueue
+
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(assigned[0] == 0)
+        self.assertTrue(bumped[0] == 0)
+
+        yield DummyWorkPauseItem.workStarted
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(assigned[0] == 1)
+        self.assertTrue(bumped[0] == 0)
+
+        d = Deferred()
+        reactor.callLater(2, lambda: d.callback(None))
+        yield d
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(assigned[0] == 1)
+        if bumped[0] != 1:
+            pass
+        self.assertTrue(bumped[0] == 1)
+
+        DummyWorkPauseItem.unpauseWork.callback(None)
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 0)
+        self.assertTrue(assigned[0] == 1)
+        self.assertTrue(bumped[0] == 1)
+
+
+    @inlineCallbacks
+    def test_aggregator_lock(self):
+        &quot;&quot;&quot;
+        L{JobItem.run} fails an aggregated work item and then ignores it.
+        &quot;&quot;&quot;
+
+        # Patch JobItem.failedToRun to track how many times it is
+        # called.
+        failed = [0]
+        _oldFailed = JobItem.failedToRun
+        def _newFailed(self, delay):
+            failed[0] += 1
+            return _oldFailed(self, delay)
+        self.patch(JobItem, &quot;failedToRun&quot;, _newFailed)
+
+        @transactionally(self.store.newTransaction)
+        def _enqueue1(txn):
+            return txn.enqueue(
+                AggregatorWorkItem, a=1, b=1, workID=1
+            )
+        yield _enqueue1
+
+        @transactionally(self.store.newTransaction)
+        def _enqueue2(txn):
+            return txn.enqueue(
+                AggregatorWorkItem, a=1, b=2, workID=2
+            )
+        yield _enqueue2
+
+        # Make sure we have two JOBs, one per AggregatorWorkItem
+        def checkJob(txn):
+            return JobItem.all(txn)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 2)
+
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertEqual(len(jobs), 0)
+        self.assertEqual(failed[0], 1)
+
+
+    @inlineCallbacks
+    def test_aggregator_no_deadlock(self):
+        &quot;&quot;&quot;
+        L{JobItem.run} handles two aggregated work items in the same group without deadlocking: one is failed and then ignored.
+        &quot;&quot;&quot;
+
+        # Patch JobItem.failedToRun to track how many times it is
+        # called.
+        failed = [0]
+        _oldFailed = JobItem.failedToRun
+        def _newFailed(self, delay):
+            failed[0] += 1
+            return _oldFailed(self, delay)
+        self.patch(JobItem, &quot;failedToRun&quot;, _newFailed)
+
+        @transactionally(self.store.newTransaction)
+        def _enqueue1(txn):
+            return txn.enqueue(
+                AggregatorWorkItem, a=1, b=1, workID=1
+            )
+        yield _enqueue1
+
+        @transactionally(self.store.newTransaction)
+        def _enqueue2(txn):
+            return txn.enqueue(
+                AggregatorWorkItem, a=1, b=1, workID=2
+            )
+        yield _enqueue2
+
+        # Make sure we have two JOBs, one per AggregatorWorkItem
+        def checkJob(txn):
+            return JobItem.all(txn)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 2)
+
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 0)
+        self.assertEqual(failed[0], 1)
+
+
+
</ins><span class="cx"> class DummyProposal(object):
</span><span class="cx"> 
</span><span class="cx">     def __init__(self, *ignored):
</span></span></pre>
</div>
</div>

</body>
</html>