<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[13640] twext/trunk/twext/enterprise</title>
</head>
<body>
<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; }
#msg dl a { font-weight: bold}
#msg dl a:link { color:#fc3; }
#msg dl a:active { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
#patch .lines, .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org//changeset/13640">13640</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2014-06-16 13:21:42 -0700 (Mon, 16 Jun 2014)</dd>
</dl>
<h3>Log Message</h3>
<pre>Revised jobqueue locking to eliminate contention, deadlocks and NamedLock use.</pre>
<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterprisedalrecordpy">twext/trunk/twext/enterprise/dal/record.py</a></li>
<li><a href="#twexttrunktwextenterprisedaltesttest_recordpy">twext/trunk/twext/enterprise/dal/test/test_record.py</a></li>
<li><a href="#twexttrunktwextenterprisefixturespy">twext/trunk/twext/enterprise/fixtures.py</a></li>
<li><a href="#twexttrunktwextenterprisejobqueuepy">twext/trunk/twext/enterprise/jobqueue.py</a></li>
<li><a href="#twexttrunktwextenterprisetesttest_jobqueuepy">twext/trunk/twext/enterprise/test/test_jobqueue.py</a></li>
</ul>
</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextenterprisedalrecordpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/record.py (13639 => 13640)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/record.py        2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/dal/record.py        2014-06-16 20:21:42 UTC (rev 13640)
</span><span class="lines">@@ -31,7 +31,7 @@
</span><span class="cx">
</span><span class="cx"> from twisted.internet.defer import inlineCallbacks, returnValue
</span><span class="cx"> from twext.enterprise.dal.syntax import (
</span><del>- Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete
</del><ins>+ Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete, SavepointAction
</ins><span class="cx"> )
</span><span class="cx"> from twext.enterprise.util import parseSQLTimestamp
</span><span class="cx"> # from twext.enterprise.dal.syntax import ExpressionSyntax
</span><span class="lines">@@ -330,6 +330,65 @@
</span><span class="cx"> self.__dict__.update(kw)
</span><span class="cx">
</span><span class="cx">
</span><ins>+ @inlineCallbacks
+ def lock(self, where=None):
+ """
+ Lock with a select for update.
+
+ @param where: SQL expression used to match the rows to lock, by default this is just an expression
+ that matches the primary key of this L{Record}, but it can be used to lock multiple L{Records}
+ matching the expression in one go. If it is an L{str}, then all rows will be matched.
+ @type where: L{SQLExpression} or L{None}
+ @return: a L{Deferred} that fires when the lock has been acquired.
+ """
+ if where is None:
+ where = self._primaryKeyComparison(self._primaryKeyValue())
+ elif isinstance(where, str):
+ where = None
+ yield Select(
+ list(self.table),
+ From=self.table,
+ Where=where,
+ ForUpdate=True,
+ ).on(self.transaction)
+
+
+ @inlineCallbacks
+ def trylock(self, where=None):
+ """
+ Try to lock with a select for update no wait. If it fails, rollback to
+ a savepoint and return L{False}, else return L{True}.
+
+ @param where: SQL expression used to match the rows to lock, by default this is just an expression
+ that matches the primary key of this L{Record}, but it can be used to lock multiple L{Records}
+ matching the expression in one go. If it is an L{str}, then all rows will be matched.
+ @type where: L{SQLExpression} or L{None}
+ @return: a L{Deferred} that fires when the updates have been sent to
+ the database.
+ """
+
+ if where is None:
+ where = self._primaryKeyComparison(self._primaryKeyValue())
+ elif isinstance(where, str):
+ where = None
+ savepoint = SavepointAction("Record_trylock_{}".format(self.__class__.__name__))
+ yield savepoint.acquire(self.transaction)
+ try:
+ yield Select(
+ list(self.table),
+ From=self.table,
+ Where=where,
+ ForUpdate=True,
+ NoWait=True,
+ ).on(self.transaction)
+ except:
+ yield savepoint.rollback(self.transaction)
+ returnValue(False)
+ else:
+ yield savepoint.release(self.transaction)
+ returnValue(True)
+
+
</ins><span class="cx"> @classmethod
</span><span class="cx"> def pop(cls, transaction, *primaryKey):
</span><span class="cx"> """
</span></span></pre></div>
<a id="twexttrunktwextenterprisedaltesttest_recordpy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/dal/test/test_record.py (13639 => 13640)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/dal/test/test_record.py        2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/dal/test/test_record.py        2014-06-16 20:21:42 UTC (rev 13640)
</span><span class="lines">@@ -415,3 +415,47 @@
</span><span class="cx"> Record.namingConvention(u"LIKE_THIS_ID"),
</span><span class="cx"> "likeThisID"
</span><span class="cx"> )
</span><ins>+
+
+ @inlineCallbacks
+ def test_lock(self):
+ """
+ A L{Record} may be locked, with L{Record.lock}.
+ """
+ txn = self.pool.connection()
+ for beta, gamma in [
+ (123, u"one"),
+ (234, u"two"),
+ (345, u"three"),
+ (356, u"three"),
+ (456, u"four"),
+ ]:
+ yield txn.execSQL(
+ "insert into ALPHA values (:1, :2)", [beta, gamma]
+ )
+
+ rec = yield TestRecord.load(txn, 234)
+ yield rec.lock()
+ self.assertEqual(rec.gamma, u'two')
+
+
+ @inlineCallbacks
+ def test_trylock(self):
+ """
+ A L{Record} may be locked, with L{Record.trylock}.
+ """
+ txn = self.pool.connection()
+ for beta, gamma in [
+ (123, u"one"),
+ (234, u"two"),
+ (345, u"three"),
+ (356, u"three"),
+ (456, u"four"),
+ ]:
+ yield txn.execSQL(
+ "insert into ALPHA values (:1, :2)", [beta, gamma]
+ )
+
+ rec = yield TestRecord.load(txn, 234)
+ result = yield rec.trylock()
+ self.assertTrue(result)
</ins></span></pre></div>
<a id="twexttrunktwextenterprisefixturespy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/fixtures.py (13639 => 13640)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/fixtures.py        2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/fixtures.py        2014-06-16 20:21:42 UTC (rev 13640)
</span><span class="lines">@@ -59,7 +59,7 @@
</span><span class="cx"> seqs = {}
</span><span class="cx">
</span><span class="cx"> def connectionFactory(label=testCase.id()):
</span><del>- conn = sqlite3.connect(sqlitename)
</del><ins>+ conn = sqlite3.connect(sqlitename, isolation_level=None)
</ins><span class="cx">
</span><span class="cx"> def nextval(seq):
</span><span class="cx"> result = seqs[seq] = seqs.get(seq, 0) + 1
</span><span class="lines">@@ -326,7 +326,7 @@
</span><span class="cx"> tmpdb = test.mktemp()
</span><span class="cx">
</span><span class="cx"> def connect():
</span><del>- return sqlite3.connect(tmpdb)
</del><ins>+ return sqlite3.connect(tmpdb, isolation_level=None)
</ins><span class="cx">
</span><span class="cx"> return connect
</span><span class="cx">
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobqueue.py (13639 => 13640)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobqueue.py        2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/jobqueue.py        2014-06-16 20:21:42 UTC (rev 13640)
</span><span class="lines">@@ -136,7 +136,6 @@
</span><span class="cx"> from twisted.internet.endpoints import TCP4ServerEndpoint
</span><span class="cx"> from twext.enterprise.ienterprise import IQueuer
</span><span class="cx"> from zope.interface.interface import Interface
</span><del>-from twext.enterprise.locking import NamedLock
</del><span class="cx">
</span><span class="cx"> import collections
</span><span class="cx"> import time
</span><span class="lines">@@ -219,6 +218,7 @@
</span><span class="cx"> JobTable.addColumn("WEIGHT", SQLType("integer", 0), default=0)
</span><span class="cx"> JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None), notNull=True)
</span><span class="cx"> JobTable.addColumn("ASSIGNED", SQLType("timestamp", None), default=None)
</span><ins>+ JobTable.addColumn("OVERDUE", SQLType("timestamp", None), default=None)
</ins><span class="cx"> JobTable.addColumn("FAILED", SQLType("integer", 0), default=0)
</span><span class="cx">
</span><span class="cx"> return inSchema
</span><span class="lines">@@ -309,43 +309,97 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><ins>+class JobRunningError(Exception):
+ """
+ A job is already running.
+ """
+ pass
+
+
+
</ins><span class="cx"> class JobItem(Record, fromTable(JobInfoSchema.JOB)):
</span><span class="cx"> """
</span><span class="cx"> An item in the job table. This is typically not directly used by code
</span><span class="cx"> creating work items, but rather is used for internal book keeping of jobs
</span><span class="cx"> associated with work items.
</span><ins>+
+ The JOB table has some important columns that determine how a job is being scheduled:
+
+ NOT_BEFORE - this is a timestamp indicating when the job is expected to run. It will not
+ run before this time, but may run quite some time after (if the service is busy).
+
+ ASSIGNED - this is a timestamp that is initially NULL but set when the job processing loop
+ assigns the job to a child process to be executed. Thus, if the value is not NULL, then the
+ job is (probably) being executed. The child process is supposed to delete the L{JobItem}
+ when it is done, however if the child dies without executing the job, then the job
+ processing loop needs to detect it.
+
+ OVERDUE - this is a timestamp initially set when an L{JobItem} is assigned. It represents
+ a point in the future when the job is expected to be finished. The job processing loop skips
+ jobs that have a non-NULL ASSIGNED value and whose OVERDUE value has not been passed. If
+ OVERDUE is in the past, then the job processing loop checks to see if the job is still
+ running - which is determined by whether a row lock exists on the work item (see
+ L{isRunning}. If the job is still running then OVERDUE is bumped up to a new point in the
+ future, if it is not still running the job is marked as failed - which will reschedule it.
+
+ FAILED - a count of the number of times a job has failed or had its overdue count bumped.
+
+ The above behavior depends on some important locking behavior: when an L{JobItem} is run,
+ it locks the L{WorkItem} row corresponding to the job (it may lock other associated
+ rows - e.g., other L{WorkItem}'s in the same group). It does not lock the L{JobItem}
+ row corresponding to the job because the job processing loop may need to update the
+ OVERDUE value of that row if the work takes a long time to complete.
</ins><span class="cx"> """
</span><span class="cx">
</span><span class="cx"> _workTypes = None
</span><span class="cx"> _workTypeMap = None
</span><span class="cx">
</span><ins>+ failureRescheduleInterval = 60 # When a job fails, reschedule it this number of seconds in the future
+ lockRescheduleInterval = 5 # When a job is locked, reschedule it this number of seconds in the future
+
</ins><span class="cx"> def descriptor(self):
</span><span class="cx"> return JobDescriptor(self.jobID, self.weight, self.workType)
</span><span class="cx">
</span><span class="cx">
</span><del>- def assign(self, now):
</del><ins>+ def assign(self, when, overdue):
</ins><span class="cx"> """
</span><span class="cx"> Mark this job as assigned to a worker by setting the assigned column to the current,
</span><del>- or provided, timestamp.
</del><ins>+ or provided, timestamp. Also set the overdue value to help determine if a job is orphaned.
</ins><span class="cx">
</span><del>- @param now: current timestamp
- @type now: L{datetime.datetime}
- @param when: explicitly set the assigned time - typically only used in tests
- @type when: L{datetime.datetime} or L{None}
</del><ins>+ @param when: current timestamp
+ @type when: L{datetime.datetime}
+ @param overdue: number of seconds after assignment that the job will be considered overdue
+ @type overdue: L{int}
</ins><span class="cx"> """
</span><del>- return self.update(assigned=now)
</del><ins>+ return self.update(assigned=when, overdue=when + timedelta(seconds=overdue))
</ins><span class="cx">
</span><span class="cx">
</span><del>- def failedToRun(self):
</del><ins>+ def bumpOverdue(self, bump):
</ins><span class="cx"> """
</span><ins>+ Increment the overdue value by the specified number of seconds. Used when an overdue job
+ is still running in a child process but the job processing loop has detected it as overdue.
+
+ @param bump: number of seconds to increment overdue by
+ @type bump: L{int}
+ """
+ return self.update(overdue=self.overdue + timedelta(seconds=bump))
+
+
+ def failedToRun(self, delay=None):
+ """
</ins><span class="cx"> The attempt to run the job failed. Leave it in the queue, but mark it
</span><span class="cx"> as unassigned, bump the failure count and set to run at some point in
</span><span class="cx"> the future.
</span><ins>+
+ @param delay: the number of seconds in the future at which to reschedule the
+ next execution of the job. If L{None} use the default class property value.
+ @type delay: L{int}
</ins><span class="cx"> """
</span><span class="cx"> return self.update(
</span><span class="cx"> assigned=None,
</span><ins>+ overdue=None,
</ins><span class="cx"> failed=self.failed + 1,
</span><del>- notBefore=datetime.utcnow() + timedelta(seconds=60)
</del><ins>+ notBefore=datetime.utcnow() + timedelta(seconds=self.failureRescheduleInterval if delay is None else delay)
</ins><span class="cx"> )
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -354,15 +408,15 @@
</span><span class="cx"> def ultimatelyPerform(cls, txnFactory, jobID):
</span><span class="cx"> """
</span><span class="cx"> Eventually, after routing the job to the appropriate place, somebody
</span><del>- actually has to I{do} it.
</del><ins>+ actually has to I{do} it. This method basically calls L{JobItem.run}
+ but it does a bunch of "booking" to track the transaction and log failures
+ and timing information.
</ins><span class="cx">
</span><span class="cx"> @param txnFactory: a 0- or 1-argument callable that creates an
</span><span class="cx"> L{IAsyncTransaction}
</span><span class="cx"> @type txnFactory: L{callable}
</span><del>-
</del><span class="cx"> @param jobID: the ID of the job to be performed
</span><span class="cx"> @type jobID: L{int}
</span><del>-
</del><span class="cx"> @return: a L{Deferred} which fires with C{None} when the job has been
</span><span class="cx"> performed, or fails if the job can't be performed.
</span><span class="cx"> """
</span><span class="lines">@@ -396,8 +450,9 @@
</span><span class="cx"> tm=_tm(),
</span><span class="cx"> )
</span><span class="cx">
</span><del>- except JobFailedError:
</del><ins>+ except (JobFailedError, JobRunningError) as e:
</ins><span class="cx"> # Job failed: abort with cleanup, but pretend this method succeeded
</span><ins>+ delay = job.lockRescheduleInterval if isinstance(e, JobRunningError) else job.failureRescheduleInterval
</ins><span class="cx"> def _cleanUp():
</span><span class="cx"> @inlineCallbacks
</span><span class="cx"> def _cleanUp2(txn2):
</span><span class="lines">@@ -408,16 +463,17 @@
</span><span class="cx"> count=job.failed + 1,
</span><span class="cx"> tm=_tm(),
</span><span class="cx"> )
</span><del>- yield job.failedToRun()
</del><ins>+ yield job.failedToRun(delay=delay)
</ins><span class="cx"> return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._cleanUp")
</span><del>- txn.postAbort(_cleanUp)
- yield txn.abort()
</del><span class="cx"> log.debug(
</span><del>- "JobItem: {jobid} failed {work} t={tm}",
</del><ins>+ "JobItem: {jobid} {desc} {work} t={tm}",
</ins><span class="cx"> jobid=jobID,
</span><ins>+ desc="failed" if isinstance(e, JobFailedError) else "locked",
</ins><span class="cx"> work=job.workType,
</span><span class="cx"> tm=_tm(),
</span><span class="cx"> )
</span><ins>+ txn.postAbort(_cleanUp)
+ yield txn.abort()
</ins><span class="cx">
</span><span class="cx"> except:
</span><span class="cx"> f = Failure()
</span><span class="lines">@@ -445,27 +501,25 @@
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><span class="cx"> @inlineCallbacks
</span><del>- def nextjob(cls, txn, now, minPriority, overdue):
</del><ins>+ def nextjob(cls, txn, now, minPriority):
</ins><span class="cx"> """
</span><span class="cx"> Find the next available job based on priority, also return any that are overdue. This
</span><del>- method relies on there being a nextjob() SQL stored procedure to enable skipping over
- items which are row locked to help avoid contention when multiple nodes are operating
- on the job queue simultaneously.
</del><ins>+ method uses an SQL query to find the matching jobs, and sorts based on the NOT_BEFORE
+ value and priority..
</ins><span class="cx">
</span><span class="cx"> @param txn: the transaction to use
</span><span class="cx"> @type txn: L{IAsyncTransaction}
</span><del>- @param now: current timestamp
</del><ins>+ @param now: current timestamp - needed for unit tests that might use their
+ own clock.
</ins><span class="cx"> @type now: L{datetime.datetime}
</span><span class="cx"> @param minPriority: lowest priority level to query for
</span><span class="cx"> @type minPriority: L{int}
</span><del>- @param overdue: how long before an assigned item is considered overdue
- @type overdue: L{datetime.datetime}
</del><span class="cx">
</span><span class="cx"> @return: the job record
</span><span class="cx"> @rtype: L{JobItem}
</span><span class="cx"> """
</span><span class="cx">
</span><del>- jobs = yield cls.nextjobs(txn, now, minPriority, overdue, limit=1)
</del><ins>+ jobs = yield cls.nextjobs(txn, now, minPriority, limit=1)
</ins><span class="cx">
</span><span class="cx"> # Must only be one or zero
</span><span class="cx"> if jobs and len(jobs) > 1:
</span><span class="lines">@@ -476,7 +530,7 @@
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><span class="cx"> @inlineCallbacks
</span><del>- def nextjobs(cls, txn, now, minPriority, overdue, limit=1):
</del><ins>+ def nextjobs(cls, txn, now, minPriority, limit=1):
</ins><span class="cx"> """
</span><span class="cx"> Find the next available job based on priority, also return any that are overdue. This
</span><span class="cx"> method relies on there being a nextjob() SQL stored procedure to enable skipping over
</span><span class="lines">@@ -489,8 +543,6 @@
</span><span class="cx"> @type now: L{datetime.datetime}
</span><span class="cx"> @param minPriority: lowest priority level to query for
</span><span class="cx"> @type minPriority: L{int}
</span><del>- @param overdue: how long before an assigned item is considered overdue
- @type overdue: L{datetime.datetime}
</del><span class="cx"> @param limit: limit on number of jobs to return
</span><span class="cx"> @type limit: L{int}
</span><span class="cx">
</span><span class="lines">@@ -501,7 +553,7 @@
</span><span class="cx"> jobs = yield cls.query(
</span><span class="cx"> txn,
</span><span class="cx"> (cls.notBefore <= now).And
</span><del>- (((cls.priority >= minPriority).And(cls.assigned == None)).Or(cls.assigned < overdue)),
</del><ins>+ (((cls.priority >= minPriority).And(cls.assigned == None)).Or(cls.overdue < now)),
</ins><span class="cx"> order=(cls.assigned, cls.priority),
</span><span class="cx"> ascending=False,
</span><span class="cx"> forUpdate=True,
</span><span class="lines">@@ -516,17 +568,20 @@
</span><span class="cx"> def run(self):
</span><span class="cx"> """
</span><span class="cx"> Run this job item by finding the appropriate work item class and
</span><del>- running that.
</del><ins>+ running that, with appropriate locking.
</ins><span class="cx"> """
</span><span class="cx">
</span><del>- # TODO: Move group into the JOB table.
- # Do a select * where group = X for update nowait to lock
- # all rows in the group - on exception raise JobFailed
- # with a rollback to allow the job to be re-assigned to a later
- # date.
</del><span class="cx"> workItem = yield self.workItem()
</span><span class="cx"> if workItem is not None:
</span><ins>+
+ # First we lock the L{WorkItem}
+ locked = yield workItem.runlock()
+ if not locked:
+ raise JobRunningError()
+
</ins><span class="cx"> try:
</span><ins>+ # Run in three steps, allowing for before/after hooks that sub-classes
+ # may override
</ins><span class="cx"> okToGo = yield workItem.beforeWork()
</span><span class="cx"> if okToGo:
</span><span class="cx"> yield workItem.doWork()
</span><span class="lines">@@ -541,7 +596,8 @@
</span><span class="cx"> raise JobFailedError(e)
</span><span class="cx">
</span><span class="cx"> try:
</span><del>- # Once the work is done we delete ourselves
</del><ins>+ # Once the work is done we delete ourselves - NB this must be the last thing done
+ # to ensure the L{JobItem} row is not locked for very long.
</ins><span class="cx"> yield self.delete()
</span><span class="cx"> except NoSuchRecord:
</span><span class="cx"> # The record has already been removed
</span><span class="lines">@@ -549,7 +605,23 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><ins>+ def isRunning(self):
+ """
+ Return L{True} if the job is currently running (its L{WorkItem} is locked).
+ """
+ workItem = yield self.workItem()
+ if workItem is not None:
+ locked = yield workItem.trylock()
+ returnValue(not locked)
+ else:
+ returnValue(False)
+
+
+ @inlineCallbacks
</ins><span class="cx"> def workItem(self):
</span><ins>+ """
+ Return the L{WorkItem} corresponding to this L{JobItem}.
+ """
</ins><span class="cx"> workItemClass = self.workItemForType(self.workType)
</span><span class="cx"> workItems = yield workItemClass.loadForJob(
</span><span class="cx"> self.transaction, self.jobID
</span><span class="lines">@@ -559,6 +631,12 @@
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><span class="cx"> def workItemForType(cls, workType):
</span><ins>+ """
+ Return the class of the L{WorkItem} associated with this L{JobItem}.
+
+ @param workType: the name of the L{WorkItem}'s table
+ @type workType: L{str}
+ """
</ins><span class="cx"> if cls._workTypeMap is None:
</span><span class="cx"> cls.workTypes()
</span><span class="cx"> return cls._workTypeMap[workType]
</span><span class="lines">@@ -567,6 +645,8 @@
</span><span class="cx"> @classmethod
</span><span class="cx"> def workTypes(cls):
</span><span class="cx"> """
</span><ins>+ Map all L{WorkItem} sub-classes table names to the class type.
+
</ins><span class="cx"> @return: All of the work item types.
</span><span class="cx"> @rtype: iterable of L{WorkItem} subclasses
</span><span class="cx"> """
</span><span class="lines">@@ -651,7 +731,7 @@
</span><span class="cx">
</span><span class="cx"> class JobDescriptorArg(Argument):
</span><span class="cx"> """
</span><del>- Comma-separated.
</del><ins>+ Comma-separated representation of an L{JobDescriptor} for AMP-serialization.
</ins><span class="cx"> """
</span><span class="cx"> def toString(self, inObject):
</span><span class="cx"> return ",".join(map(str, inObject))
</span><span class="lines">@@ -826,6 +906,30 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><ins>+ def runlock(self):
+ """
+ Used to lock an L{WorkItem} before it is run. The L{WorkItem}'s row MUST be
+ locked via SELECT FOR UPDATE to ensure the job queue knows it is being worked
+ on so that it can detect when an overdue job needs to be restarted or not.
+
+ Note that the locking used here may cause deadlocks if not done in the correct
+ order. In particular anything that might cause locks across multiple LWorkItem}s,
+ such as group locks, multi-row locks, etc, MUST be done first.
+
+ @return: an L{Deferred} that fires with L{True} if the L{WorkItem} was locked,
+ L{False} if not.
+ @rtype: L{Deferred}
+ """
+
+ # Do the group lock first since this can impact multiple rows and thus could
+ # cause deadlocks if done in the wrong order
+
+ # Row level lock on this item
+ locked = yield self.trylock(self.group)
+ returnValue(locked)
+
+
+ @inlineCallbacks
</ins><span class="cx"> def beforeWork(self):
</span><span class="cx"> """
</span><span class="cx"> A hook that gets called before the L{WorkItem} does its real work. This can be used
</span><span class="lines">@@ -836,18 +940,6 @@
</span><span class="cx"> should continue, L{False} if it should be skipped without error.
</span><span class="cx"> @rtype: L{Deferred}
</span><span class="cx"> """
</span><del>- if self.group is not None:
- try:
- yield NamedLock.acquire(self.transaction, self.group)
- except Exception as e:
- log.error(
- "JobItem: {jobid}, WorkItem: {workid} lock failed: {exc}",
- jobid=self.jobID,
- workid=self.workID,
- exc=e,
- )
- raise JobFailedError(e)
-
</del><span class="cx"> try:
</span><span class="cx"> # Work item is deleted before doing work - but someone else may have
</span><span class="cx"> # done it whilst we waited on the lock so handle that by simply
</span><span class="lines">@@ -942,7 +1034,7 @@
</span><span class="cx"> @inlineCallbacks
</span><span class="cx"> def beforeWork(self):
</span><span class="cx"> """
</span><del>- No need to lock - for safety just delete any others.
</del><ins>+ For safety just delete any others.
</ins><span class="cx"> """
</span><span class="cx">
</span><span class="cx"> # Delete all other work items
</span><span class="lines">@@ -1629,7 +1721,7 @@
</span><span class="cx"> getpid = staticmethod(getpid)
</span><span class="cx">
</span><span class="cx"> queuePollInterval = 0.1 # How often to poll for new work
</span><del>- queueOrphanTimeout = 5.0 * 60.0 # How long before assigned work is possibly orphaned
</del><ins>+ queueOverdueTimeout = 5.0 * 60.0 # How long before assigned work is possibly overdue
</ins><span class="cx">
</span><span class="cx"> overloadLevel = 95 # Percentage load level above which job queue processing stops
</span><span class="cx"> highPriorityLevel = 80 # Percentage load level above which only high priority jobs are processed
</span><span class="lines">@@ -1802,21 +1894,39 @@
</span><span class="cx"> # TODO: here is where we should iterate over the unlocked items
</span><span class="cx"> # that are due, ordered by priority, notBefore etc
</span><span class="cx"> nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
</span><del>- orphanTime = nowTime - timedelta(seconds=self.queueOrphanTimeout)
</del><span class="cx">
</span><span class="cx"> txn = self.transactionFactory(label="jobqueue.workCheck")
</span><span class="cx"> try:
</span><del>- nextJob = yield JobItem.nextjob(txn, nowTime, minPriority, orphanTime)
</del><ins>+ nextJob = yield JobItem.nextjob(txn, nowTime, minPriority)
</ins><span class="cx"> if nextJob is None:
</span><span class="cx"> break
</span><span class="cx">
</span><del>- # If it is now assigned but not earlier than the orphan time, ignore as this may have
- # been returned after another txn just assigned it
- if nextJob.assigned is not None and nextJob.assigned > orphanTime:
- continue
</del><ins>+ if nextJob.assigned is not None:
+ if nextJob.overdue > nowTime:
+ # If it is now assigned but not overdue, ignore as this may have
+ # been returned after another txn just assigned it
+ continue
+ else:
+ # It is overdue - check to see whether the work item is currently locked - if so no
+ # need to re-assign
+ running = yield nextJob.isRunning()
+ if running:
+ # Change the overdue to further in the future whilst we wait for
+ # the running job to complete
+ yield nextJob.bumpOverdue(self.queueOverdueTimeout)
+ log.debug(
+ "workCheck: bumped overdue timeout on jobid={jobid}",
+ jobid=nextJob.jobID,
+ )
+ continue
+ else:
+ log.debug(
+ "workCheck: overdue re-assignment for jobid={jobid}",
+ jobid=nextJob.jobID,
+ )
</ins><span class="cx">
</span><span class="cx"> # Always assign as a new job even when it is an orphan
</span><del>- yield nextJob.assign(nowTime)
</del><ins>+ yield nextJob.assign(nowTime, self.queueOverdueTimeout)
</ins><span class="cx"> loopCounter += 1
</span><span class="cx">
</span><span class="cx"> except Exception as e:
</span></span></pre></div>
<a id="twexttrunktwextenterprisetesttest_jobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py (13639 => 13640)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py        2014-06-16 20:21:42 UTC (rev 13640)
</span><span class="lines">@@ -32,7 +32,7 @@
</span><span class="cx"> from twisted.protocols.amp import Command, AMP, Integer
</span><span class="cx"> from twisted.application.service import Service, MultiService
</span><span class="cx">
</span><del>-from twext.enterprise.dal.syntax import SchemaSyntax
</del><ins>+from twext.enterprise.dal.syntax import SchemaSyntax, Delete
</ins><span class="cx"> from twext.enterprise.dal.record import fromTable
</span><span class="cx"> from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
</span><span class="cx"> from twext.enterprise.fixtures import buildConnectionPool
</span><span class="lines">@@ -206,6 +206,7 @@
</span><span class="cx"> WEIGHT integer default 0,
</span><span class="cx"> NOT_BEFORE timestamp not null,
</span><span class="cx"> ASSIGNED timestamp default null,
</span><ins>+ OVERDUE timestamp default null,
</ins><span class="cx"> FAILED integer default 0
</span><span class="cx"> );
</span><span class="cx"> """
</span><span class="lines">@@ -219,6 +220,24 @@
</span><span class="cx"> A integer, B integer,
</span><span class="cx"> DELETE_ON_LOAD integer default 0
</span><span class="cx"> );
</span><ins>+ create table DUMMY_WORK_SINGLETON_ITEM (
+ WORK_ID integer primary key,
+ JOB_ID integer references JOB,
+ A integer, B integer,
+ DELETE_ON_LOAD integer default 0
+ );
+ create table DUMMY_WORK_PAUSE_ITEM (
+ WORK_ID integer primary key,
+ JOB_ID integer references JOB,
+ A integer, B integer,
+ DELETE_ON_LOAD integer default 0
+ );
+ create table AGGREGATOR_WORK_ITEM (
+ WORK_ID integer primary key,
+ JOB_ID integer references JOB,
+ A integer, B integer,
+ DELETE_ON_LOAD integer default 0
+ );
</ins><span class="cx"> """
</span><span class="cx"> )
</span><span class="cx">
</span><span class="lines">@@ -227,13 +246,24 @@
</span><span class="cx">
</span><span class="cx"> dropSQL = [
</span><span class="cx"> "drop table {name} cascade".format(name=table)
</span><del>- for table in ("DUMMY_WORK_ITEM",)
</del><ins>+ for table in (
+ "DUMMY_WORK_ITEM",
+ "DUMMY_WORK_SINGLETON_ITEM",
+ "DUMMY_WORK_PAUSE_ITEM",
+ "AGGREGATOR_WORK_ITEM"
+ )
</ins><span class="cx"> ] + ["delete from job"]
</span><span class="cx"> except SkipTest as e:
</span><span class="cx"> DummyWorkItemTable = object
</span><ins>+ DummyWorkSingletonItemTable = object
+ DummyWorkPauseItemTable = object
+ AggregatorWorkItemTable = object
</ins><span class="cx"> skip = e
</span><span class="cx"> else:
</span><span class="cx"> DummyWorkItemTable = fromTable(schema.DUMMY_WORK_ITEM)
</span><ins>+ DummyWorkSingletonItemTable = fromTable(schema.DUMMY_WORK_SINGLETON_ITEM)
+ DummyWorkPauseItemTable = fromTable(schema.DUMMY_WORK_PAUSE_ITEM)
+ AggregatorWorkItemTable = fromTable(schema.AGGREGATOR_WORK_ITEM)
</ins><span class="cx"> skip = False
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -271,7 +301,7 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><del>-class DummyWorkSingletonItem(SingletonWorkItem, DummyWorkItemTable):
</del><ins>+class DummyWorkSingletonItem(SingletonWorkItem, DummyWorkSingletonItemTable):
</ins><span class="cx"> """
</span><span class="cx"> Sample L{SingletonWorkItem} subclass that adds two integers together and stores them
</span><span class="cx"> in another table.
</span><span class="lines">@@ -287,6 +317,42 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><ins>+class DummyWorkPauseItem(WorkItem, DummyWorkPauseItemTable):
+ """
+ Sample L{WorkItem} subclass that pauses until a Deferred is fired.
+ """
+
+ workStarted = None
+ unpauseWork = None
+
+ def doWork(self):
+ self.workStarted.callback(None)
+ return self.unpauseWork
+
+
+
+class AggregatorWorkItem(WorkItem, AggregatorWorkItemTable):
+ """
+ Sample L{WorkItem} subclass that deletes others with the same
+ value and than pauses for a bit.
+ """
+
+ group = property(lambda self: (self.table.B == self.b))
+
+ @inlineCallbacks
+ def doWork(self):
+ # Delete the work items we match
+ yield Delete(
+ From=self.table,
+ Where=(self.table.A == self.a)
+ ).on(self.transaction)
+
+ d = Deferred()
+ reactor.callLater(2.0, lambda: d.callback(None))
+ yield d
+
+
+
</ins><span class="cx"> class AMPTests(TestCase):
</span><span class="cx"> """
</span><span class="cx"> Tests for L{AMP} faithfully relaying ids across the wire.
</span><span class="lines">@@ -396,7 +462,7 @@
</span><span class="cx"> @inlineCallbacks
</span><span class="cx"> def test_assign(self):
</span><span class="cx"> """
</span><del>- L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
</del><ins>+ L{JobItem.assign} will mark a job as assigned.
</ins><span class="cx"> """
</span><span class="cx"> dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
</span><span class="cx"> yield self._enqueue(dbpool, 1, 2)
</span><span class="lines">@@ -412,7 +478,7 @@
</span><span class="cx"> @inlineCallbacks
</span><span class="cx"> def assignJob(txn):
</span><span class="cx"> job = yield JobItem.load(txn, jobs[0].jobID)
</span><del>- yield job.assign(datetime.datetime.utcnow())
</del><ins>+ yield job.assign(datetime.datetime.utcnow(), PeerConnectionPool.queueOverdueTimeout)
</ins><span class="cx"> yield inTransaction(dbpool.connection, assignJob)
</span><span class="cx">
</span><span class="cx"> jobs = yield inTransaction(dbpool.connection, checkJob)
</span><span class="lines">@@ -423,7 +489,7 @@
</span><span class="cx"> @inlineCallbacks
</span><span class="cx"> def test_nextjob(self):
</span><span class="cx"> """
</span><del>- L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
</del><ins>+ L{JobItem.nextjob} returns the correct job based on priority.
</ins><span class="cx"> """
</span><span class="cx">
</span><span class="cx"> dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
</span><span class="lines">@@ -432,7 +498,7 @@
</span><span class="cx"> # Empty job queue
</span><span class="cx"> @inlineCallbacks
</span><span class="cx"> def _next(txn, priority=WORK_PRIORITY_LOW):
</span><del>- job = yield JobItem.nextjob(txn, now, priority, now - datetime.timedelta(seconds=PeerConnectionPool.queueOrphanTimeout))
</del><ins>+ job = yield JobItem.nextjob(txn, now, priority)
</ins><span class="cx"> if job is not None:
</span><span class="cx"> work = yield job.workItem()
</span><span class="cx"> else:
</span><span class="lines">@@ -459,7 +525,7 @@
</span><span class="cx"> @inlineCallbacks
</span><span class="cx"> def assignJob(txn, when=None):
</span><span class="cx"> assignee = yield JobItem.load(txn, assignID)
</span><del>- yield assignee.assign(now if when is None else when)
</del><ins>+ yield assignee.assign(now if when is None else when, PeerConnectionPool.queueOverdueTimeout)
</ins><span class="cx"> yield inTransaction(dbpool.connection, assignJob)
</span><span class="cx"> job, work = yield inTransaction(dbpool.connection, _next)
</span><span class="cx"> self.assertTrue(job is None)
</span><span class="lines">@@ -1184,7 +1250,201 @@
</span><span class="cx"> self.assertEquals(DummyWorkItem.results, {})
</span><span class="cx">
</span><span class="cx">
</span><ins>+ @inlineCallbacks
+ def test_locked(self):
+ """
+ L{JobItem.run} locks the work item.
+ """
</ins><span class="cx">
</span><ins>+ DummyWorkPauseItem.workStarted = Deferred()
+ DummyWorkPauseItem.unpauseWork = Deferred()
+
+ @transactionally(self.store.newTransaction)
+ def _enqueue(txn):
+ return txn.enqueue(
+ DummyWorkPauseItem, a=30, b=40, workID=1
+ )
+ yield _enqueue
+
+ # Make sure we have one JOB and one DUMMY_WORK_ITEM
+ def checkJob(txn):
+ return JobItem.all(txn)
+
+ jobs = yield inTransaction(self.store.newTransaction, checkJob)
+ self.assertTrue(len(jobs) == 1)
+
+ yield DummyWorkPauseItem.workStarted
+
+ @transactionally(self.store.newTransaction)
+ def _trylock(txn):
+ job = yield JobItem.load(txn, jobs[0].jobID)
+ work = yield job.workItem()
+ locked = yield work.trylock()
+ self.assertFalse(locked)
+ yield _trylock
+
+ DummyWorkPauseItem.unpauseWork.callback(None)
+ yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+ jobs = yield inTransaction(self.store.newTransaction, checkJob)
+ self.assertTrue(len(jobs) == 0)
+
+
+ @inlineCallbacks
+ def test_overdue(self):
+ """
+ L{JobItem.run} locks the work item.
+ """
+
+ # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
+ # they are called.
+ assigned = [0]
+ _oldAssign = JobItem.assign
+ def _newAssign(self, when, overdue):
+ assigned[0] += 1
+ return _oldAssign(self, when, 1)
+ self.patch(JobItem, "assign", _newAssign)
+
+ bumped = [0]
+ _oldBumped = JobItem.bumpOverdue
+ def _newBump(self, bump):
+ bumped[0] += 1
+ return _oldBumped(self, 100)
+ self.patch(JobItem, "bumpOverdue", _newBump)
+
+ DummyWorkPauseItem.workStarted = Deferred()
+ DummyWorkPauseItem.unpauseWork = Deferred()
+
+ @transactionally(self.store.newTransaction)
+ def _enqueue(txn):
+ return txn.enqueue(
+ DummyWorkPauseItem, a=30, b=40, workID=1
+ )
+ yield _enqueue
+
+ # Make sure we have one JOB and one DUMMY_WORK_ITEM
+ def checkJob(txn):
+ return JobItem.all(txn)
+
+ jobs = yield inTransaction(self.store.newTransaction, checkJob)
+ self.assertTrue(len(jobs) == 1)
+ self.assertTrue(assigned[0] == 0)
+ self.assertTrue(bumped[0] == 0)
+
+ yield DummyWorkPauseItem.workStarted
+
+ jobs = yield inTransaction(self.store.newTransaction, checkJob)
+ self.assertTrue(len(jobs) == 1)
+ self.assertTrue(assigned[0] == 1)
+ self.assertTrue(bumped[0] == 0)
+
+ d = Deferred()
+ reactor.callLater(2, lambda: d.callback(None))
+ yield d
+
+ jobs = yield inTransaction(self.store.newTransaction, checkJob)
+ self.assertTrue(len(jobs) == 1)
+ self.assertTrue(assigned[0] == 1)
+ if bumped[0] != 1:
+ pass
+ self.assertTrue(bumped[0] == 1)
+
+ DummyWorkPauseItem.unpauseWork.callback(None)
+ yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+ jobs = yield inTransaction(self.store.newTransaction, checkJob)
+ self.assertTrue(len(jobs) == 0)
+ self.assertTrue(assigned[0] == 1)
+ self.assertTrue(bumped[0] == 1)
+
+
+ @inlineCallbacks
+ def test_aggregator_lock(self):
+ """
+ L{JobItem.run} fails an aggregated work item and then ignores it.
+ """
+
+ # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
+ # they are called.
+ failed = [0]
+ _oldFailed = JobItem.failedToRun
+ def _newFailed(self, delay):
+ failed[0] += 1
+ return _oldFailed(self, delay)
+ self.patch(JobItem, "failedToRun", _newFailed)
+
+ @transactionally(self.store.newTransaction)
+ def _enqueue1(txn):
+ return txn.enqueue(
+ AggregatorWorkItem, a=1, b=1, workID=1
+ )
+ yield _enqueue1
+
+ @transactionally(self.store.newTransaction)
+ def _enqueue2(txn):
+ return txn.enqueue(
+ AggregatorWorkItem, a=1, b=2, workID=2
+ )
+ yield _enqueue2
+
+ # Make sure we have one JOB and one DUMMY_WORK_ITEM
+ def checkJob(txn):
+ return JobItem.all(txn)
+
+ jobs = yield inTransaction(self.store.newTransaction, checkJob)
+ self.assertTrue(len(jobs) == 2)
+
+ yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+ jobs = yield inTransaction(self.store.newTransaction, checkJob)
+ self.assertEqual(len(jobs), 0)
+ self.assertEqual(failed[0], 1)
+
+
+ @inlineCallbacks
+ def test_aggregator_no_deadlock(self):
+ """
+ L{JobItem.run} fails an aggregated work item and then ignores it.
+ """
+
+ # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
+ # they are called.
+ failed = [0]
+ _oldFailed = JobItem.failedToRun
+ def _newFailed(self, delay):
+ failed[0] += 1
+ return _oldFailed(self, delay)
+ self.patch(JobItem, "failedToRun", _newFailed)
+
+ @transactionally(self.store.newTransaction)
+ def _enqueue1(txn):
+ return txn.enqueue(
+ AggregatorWorkItem, a=1, b=1, workID=1
+ )
+ yield _enqueue1
+
+ @transactionally(self.store.newTransaction)
+ def _enqueue2(txn):
+ return txn.enqueue(
+ AggregatorWorkItem, a=1, b=1, workID=2
+ )
+ yield _enqueue2
+
+ # Make sure we have one JOB and one DUMMY_WORK_ITEM
+ def checkJob(txn):
+ return JobItem.all(txn)
+
+ jobs = yield inTransaction(self.store.newTransaction, checkJob)
+ self.assertTrue(len(jobs) == 2)
+
+ yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+ jobs = yield inTransaction(self.store.newTransaction, checkJob)
+ self.assertTrue(len(jobs) == 0)
+ self.assertEqual(failed[0], 1)
+
+
+
</ins><span class="cx"> class DummyProposal(object):
</span><span class="cx">
</span><span class="cx"> def __init__(self, *ignored):
</span></span></pre>
</div>
</div>
</body>
</html>