<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en">
<head>
<meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[15096] twext/trunk/twext/enterprise/jobs</title>
<!--
  Commit notification page for changeset r15096. The stylesheet is placed in
  the head because the XHTML 1.1 DTD only permits the style element there.

  NOTE(review): the diff listing in the body is an archived record and is kept
  verbatim. Issues visible in the recorded code, to be confirmed upstream:
  * queue.py stopService: "while self._inWorkCheck and self._inOverdueCheck"
    only keeps waiting while BOTH checks are active, although the comment
    above it says to wait for any active check; "or" was presumably intended.
  * jobitem.py histogram: "cls.assigned is not None and cls.notBefore < now"
    is evaluated by Python rather than turned into SQL; the identity test on
    the column object is presumably always true, so only "notBefore < now"
    reaches the query. The removed code counted late jobs as unassigned
    ("job.assigned is None and job.notBefore < now").
  * queue.py _overdueCheck: the message "Failed to mark failed overdue
    job:{}, {exc}" uses a positional {} placeholder while only keyword
    arguments (jobID, exc) are supplied; "{jobID}" was presumably intended.
  * jobitem.py ultimatelyPerform docstring: "@type jobID" still names the
    old parameter; it should read "@type jobDescriptor".
-->
<style type="text/css">/*<![CDATA[*/
/* The CDATA markers are wrapped in CSS comments so this sheet parses the
   same way whether the page is handled as HTML or as XML. */
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; }
#msg dl a { font-weight: bold}
#msg dl a:link { color:#fc3; }
#msg dl a:active { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }
#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
#patch .lines, .info {color:#888;background:#fff;}
/*]]>*/</style>
</head>
<body>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org/changeset/15096">15096</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2015-09-03 13:49:06 -0700 (Thu, 03 Sep 2015)</dd>
</dl>
<h3>Log Message</h3>
<pre>Split the orphan work check out into its own loop to help speed up the main work loop. Improve some logging.</pre>
<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterprisejobsjobitempy">twext/trunk/twext/enterprise/jobs/jobitem.py</a></li>
<li><a href="#twexttrunktwextenterprisejobsqueuepy">twext/trunk/twext/enterprise/jobs/queue.py</a></li>
<li><a href="#twexttrunktwextenterprisejobstesttest_jobspy">twext/trunk/twext/enterprise/jobs/test/test_jobs.py</a></li>
</ul>
</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextenterprisejobsjobitempy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/jobitem.py (15095 => 15096)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/jobitem.py        2015-09-03 20:40:30 UTC (rev 15095)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py        2015-09-03 20:49:06 UTC (rev 15096)
</span><span class="lines">@@ -18,7 +18,8 @@
</span><span class="cx"> from twext.enterprise.dal.model import Sequence
</span><span class="cx"> from twext.enterprise.dal.model import Table, Schema, SQLType
</span><span class="cx"> from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
</span><del>-from twext.enterprise.dal.syntax import SchemaSyntax
</del><ins>+from twext.enterprise.dal.syntax import SchemaSyntax, Count, NullIf, Constant, \
+ Sum
</ins><span class="cx"> from twext.enterprise.ienterprise import ORACLE_DIALECT
</span><span class="cx"> from twext.enterprise.jobs.utils import inTransaction, astimestamp
</span><span class="cx"> from twext.python.log import Logger
</span><span class="lines">@@ -107,6 +108,12 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><ins>+# Priority for work - used to order work items in the job queue
+JOB_PRIORITY_LOW = 0
+JOB_PRIORITY_MEDIUM = 1
+JOB_PRIORITY_HIGH = 2
+
+
</ins><span class="cx"> class JobItem(Record, fromTable(JobInfoSchema.JOB)):
</span><span class="cx"> """
</span><span class="cx"> @DynamicAttrs
</span><span class="lines">@@ -165,6 +172,13 @@
</span><span class="cx"> return self.update(assigned=when, overdue=when + timedelta(seconds=overdue))
</span><span class="cx">
</span><span class="cx">
</span><ins>+ def unassign(self):
+ """
+ Mark this job as unassigned by setting the assigned and overdue columns to L{None}.
+ """
+ return self.update(assigned=None, overdue=None)
+
+
</ins><span class="cx"> def bumpOverdue(self, bump):
</span><span class="cx"> """
</span><span class="cx"> Increment the overdue value by the specified number of seconds. Used when an overdue job
</span><span class="lines">@@ -219,7 +233,7 @@
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><span class="cx"> @inlineCallbacks
</span><del>- def ultimatelyPerform(cls, txnFactory, jobID):
</del><ins>+ def ultimatelyPerform(cls, txnFactory, jobDescriptor):
</ins><span class="cx"> """
</span><span class="cx"> Eventually, after routing the job to the appropriate place, somebody
</span><span class="cx"> actually has to I{do} it. This method basically calls L{JobItem.run}
</span><span class="lines">@@ -229,8 +243,8 @@
</span><span class="cx"> @param txnFactory: a 0- or 1-argument callable that creates an
</span><span class="cx"> L{IAsyncTransaction}
</span><span class="cx"> @type txnFactory: L{callable}
</span><del>- @param jobID: the ID of the job to be performed
- @type jobID: L{int}
</del><ins>+ @param jobDescriptor: the job descriptor
+ @type jobID: L{JobDescriptor}
</ins><span class="cx"> @return: a L{Deferred} which fires with C{None} when the job has been
</span><span class="cx"> performed, or fails if the job can't be performed.
</span><span class="cx"> """
</span><span class="lines">@@ -246,32 +260,35 @@
</span><span class="cx"> @inlineCallbacks
</span><span class="cx"> def _cleanUp2(txn2):
</span><span class="cx"> try:
</span><del>- job = yield cls.load(txn2, jobID)
</del><ins>+ job = yield cls.load(txn2, jobDescriptor.jobID)
</ins><span class="cx"> except NoSuchRecord:
</span><span class="cx"> log.debug(
</span><del>- "JobItem: {jobid} disappeared t={tm}",
- jobid=jobID,
</del><ins>+ "JobItem: {workType} {jobid} disappeared t={tm}",
+ workType=jobDescriptor.workType,
+ jobid=jobDescriptor.jobID,
</ins><span class="cx"> tm=_tm(),
</span><span class="cx"> )
</span><span class="cx"> else:
</span><span class="cx"> log.debug(
</span><del>- "JobItem: {jobid} marking as failed {count} t={tm}",
- jobid=jobID,
</del><ins>+ "JobItem: {workType} {jobid} marking as failed {count} t={tm}",
+ workType=jobDescriptor.workType,
+ jobid=jobDescriptor.jobID,
</ins><span class="cx"> count=job.failed + 1,
</span><span class="cx"> tm=_tm(),
</span><span class="cx"> )
</span><span class="cx"> yield job.failedToRun(locked=isinstance(e, JobRunningError), delay=delay)
</span><span class="cx"> return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._failureCleanUp")
</span><span class="cx">
</span><del>- log.debug("JobItem: {jobid} starting to run", jobid=jobID)
- txn = txnFactory(label="ultimatelyPerform: {}".format(jobID))
</del><ins>+ log.debug("JobItem: {workType} {jobid} starting to run", workType=jobDescriptor.workType, jobid=jobDescriptor.jobID)
+ txn = txnFactory(label="ultimatelyPerform: {workType} {jobid}".format(workType=jobDescriptor.workType, jobid=jobDescriptor.jobID))
</ins><span class="cx"> try:
</span><del>- job = yield cls.load(txn, jobID)
</del><ins>+ job = yield cls.load(txn, jobDescriptor.jobID)
</ins><span class="cx"> if hasattr(txn, "_label"):
</span><span class="cx"> txn._label = "{} <{}>".format(txn._label, job.workType)
</span><span class="cx"> log.debug(
</span><del>- "JobItem: {jobid} loaded {work} t={tm}",
- jobid=jobID,
</del><ins>+ "JobItem: {workType} {jobid} loaded {work} t={tm}",
+ workType=jobDescriptor.workType,
+ jobid=jobDescriptor.jobID,
</ins><span class="cx"> work=job.workType,
</span><span class="cx"> tm=_tm(),
</span><span class="cx"> )
</span><span class="lines">@@ -281,8 +298,9 @@
</span><span class="cx"> # The record has already been removed
</span><span class="cx"> yield txn.commit()
</span><span class="cx"> log.debug(
</span><del>- "JobItem: {jobid} already removed t={tm}",
- jobid=jobID,
</del><ins>+ "JobItem: {workType} {jobid} already removed t={tm}",
+ workType=jobDescriptor.workType,
+ jobid=jobDescriptor.jobID,
</ins><span class="cx"> tm=_tm(),
</span><span class="cx"> )
</span><span class="cx">
</span><span class="lines">@@ -292,10 +310,10 @@
</span><span class="cx"> def _temporaryFailure():
</span><span class="cx"> return _failureCleanUp(delay=e.delay * (job.failed + 1))
</span><span class="cx"> log.debug(
</span><del>- "JobItem: {jobid} {desc} {work} t={tm}",
- jobid=jobID,
</del><ins>+ "JobItem: {workType} {jobid} {desc} t={tm}",
+ workType=jobDescriptor.workType,
+ jobid=jobDescriptor.jobID,
</ins><span class="cx"> desc="temporary failure #{}".format(job.failed + 1),
</span><del>- work=job.workType,
</del><span class="cx"> tm=_tm(),
</span><span class="cx"> )
</span><span class="cx"> txn.postAbort(_temporaryFailure)
</span><span class="lines">@@ -305,10 +323,10 @@
</span><span class="cx">
</span><span class="cx"> # Permanent failure
</span><span class="cx"> log.debug(
</span><del>- "JobItem: {jobid} {desc} {work} t={tm}",
- jobid=jobID,
</del><ins>+ "JobItem: {workType} {jobid} {desc} t={tm}",
+ workType=jobDescriptor.workType,
+ jobid=jobDescriptor.jobID,
</ins><span class="cx"> desc="failed" if isinstance(e, JobFailedError) else "locked",
</span><del>- work=job.workType,
</del><span class="cx"> tm=_tm(),
</span><span class="cx"> )
</span><span class="cx"> txn.postAbort(_failureCleanUp)
</span><span class="lines">@@ -317,8 +335,9 @@
</span><span class="cx"> except:
</span><span class="cx"> f = Failure()
</span><span class="cx"> log.error(
</span><del>- "JobItem: {jobid} unknown exception t={tm} {exc}",
- jobid=jobID,
</del><ins>+ "JobItem: {workType} {jobid} exception t={tm} {exc}",
+ workType=jobDescriptor.workType,
+ jobid=jobDescriptor.jobID,
</ins><span class="cx"> tm=_tm(),
</span><span class="cx"> exc=f,
</span><span class="cx"> )
</span><span class="lines">@@ -328,9 +347,9 @@
</span><span class="cx"> else:
</span><span class="cx"> yield txn.commit()
</span><span class="cx"> log.debug(
</span><del>- "JobItem: {jobid} completed {work} t={tm} over={over}",
- jobid=jobID,
- work=job.workType,
</del><ins>+ "JobItem: {workType} {jobid} completed t={tm} over={over}",
+ workType=jobDescriptor.workType,
+ jobid=jobDescriptor.jobID,
</ins><span class="cx"> tm=_tm(),
</span><span class="cx"> over=_overtm(job.notBefore),
</span><span class="cx"> )
</span><span class="lines">@@ -344,7 +363,7 @@
</span><span class="cx"> """
</span><span class="cx"> Find the next available job based on priority, also return any that are overdue. This
</span><span class="cx"> method uses an SQL query to find the matching jobs, and sorts based on the NOT_BEFORE
</span><del>- value and priority..
</del><ins>+ value and priority.
</ins><span class="cx">
</span><span class="cx"> @param txn: the transaction to use
</span><span class="cx"> @type txn: L{IAsyncTransaction}
</span><span class="lines">@@ -362,7 +381,7 @@
</span><span class="cx">
</span><span class="cx"> # Must only be one or zero
</span><span class="cx"> if jobs and len(jobs) > 1:
</span><del>- raise AssertionError("next_job() returned more than one row")
</del><ins>+ raise AssertionError("nextjob() returned more than one row")
</ins><span class="cx">
</span><span class="cx"> returnValue(jobs[0] if jobs else None)
</span><span class="cx">
</span><span class="lines">@@ -386,10 +405,24 @@
</span><span class="cx"> @rtype: L{JobItem}
</span><span class="cx"> """
</span><span class="cx">
</span><del>- queryExpr = (cls.notBefore <= now).And(cls.priority >= minPriority).And(cls.pause == 0).And(
- (cls.assigned == None).Or(cls.overdue < now)
- )
</del><ins>+ # Only add the PRIORITY term if minimum is greater than zero
+ queryExpr = (cls.assigned == None).And(cls.pause == 0).And(cls.notBefore <= now)
</ins><span class="cx">
</span><ins>+ # PRIORITY can only be 0, 1, or 2. So we can convert an inequality into
+ # an equality test as follows:
+ #
+ # PRIORITY >= 0 - no test needed all values match all the time
+ # PRIORITY >= 1 === PRIORITY != 0
+ # PRIORITY >= 2 === PRIORITY == 2
+ #
+ # Doing this allows use of the PRIORITY column in an index since we already
+ # have one inequality in the index (NOT_BEFORE)
+
+ if minPriority == JOB_PRIORITY_MEDIUM:
+ queryExpr = (cls.priority != JOB_PRIORITY_LOW).And(queryExpr)
+ elif minPriority == JOB_PRIORITY_HIGH:
+ queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
+
</ins><span class="cx"> if txn.dialect == ORACLE_DIALECT:
</span><span class="cx"> # Oracle does not support a "for update" clause with "order by". So do the
</span><span class="cx"> # "for update" as a second query right after the first. Will need to check
</span><span class="lines">@@ -397,7 +430,7 @@
</span><span class="cx"> jobs = yield cls.query(
</span><span class="cx"> txn,
</span><span class="cx"> queryExpr,
</span><del>- order=(cls.assigned, cls.priority),
</del><ins>+ order=cls.priority,
</ins><span class="cx"> ascending=False,
</span><span class="cx"> limit=limit,
</span><span class="cx"> )
</span><span class="lines">@@ -412,7 +445,7 @@
</span><span class="cx"> jobs = yield cls.query(
</span><span class="cx"> txn,
</span><span class="cx"> queryExpr,
</span><del>- order=(cls.assigned, cls.priority),
</del><ins>+ order=cls.priority,
</ins><span class="cx"> ascending=False,
</span><span class="cx"> forUpdate=True,
</span><span class="cx"> noWait=False,
</span><span class="lines">@@ -422,7 +455,37 @@
</span><span class="cx"> returnValue(jobs)
</span><span class="cx">
</span><span class="cx">
</span><ins>+ @classmethod
</ins><span class="cx"> @inlineCallbacks
</span><ins>+ def overduejobs(cls, txn, now, limit=None):
+ """
+ Find the next overdue job based on priority.
+
+ @param txn: the transaction to use
+ @type txn: L{IAsyncTransaction}
+ @param now: current timestamp
+ @type now: L{datetime.datetime}
+ @param limit: limit on number of jobs to return
+ @type limit: L{int}
+
+ @return: the job record
+ @rtype: L{JobItem}
+ """
+
+ queryExpr = (cls.assigned != None).And(cls.overdue < now)
+
+ jobs = yield cls.query(
+ txn,
+ queryExpr,
+ forUpdate=True,
+ noWait=False,
+ limit=limit,
+ )
+
+ returnValue(jobs)
+
+
+ @inlineCallbacks
</ins><span class="cx"> def run(self):
</span><span class="cx"> """
</span><span class="cx"> Run this job item by finding the appropriate work item class and
</span><span class="lines">@@ -613,6 +676,10 @@
</span><span class="cx"> Generate a histogram of work items currently in the queue.
</span><span class="cx"> """
</span><span class="cx"> from twext.enterprise.jobs.queue import WorkerConnectionPool
</span><ins>+
+ # Fill out an empty set of results for all the known work types. The SQL
+ # query will only return work types that are currently queued, but we want
+ # results for all possible work.
</ins><span class="cx"> results = {}
</span><span class="cx"> now = datetime.utcnow()
</span><span class="cx"> for workItemType in cls.workTypes():
</span><span class="lines">@@ -626,22 +693,32 @@
</span><span class="cx"> "time": WorkerConnectionPool.timing.get(workType, 0.0)
</span><span class="cx"> })
</span><span class="cx">
</span><del>- jobs = yield cls.all(txn)
</del><ins>+ # Use an aggregate query to get the results for each currently queued
+ # work type.
+ jobs = yield cls.queryExpr(
+ expr=None,
+ attributes=(
+ cls.workType,
+ Count(cls.workType),
+ Count(cls.assigned),
+ Count(NullIf(cls.assigned is not None and cls.notBefore < now, Constant(False))),
+ Sum(cls.failed),
+ ),
+ group=cls.workType
+ ).on(txn)
</ins><span class="cx">
</span><del>- for job in jobs:
- r = results[job.workType]
- r["queued"] += 1
- if job.assigned is not None:
- r["assigned"] += 1
- if job.assigned is None and job.notBefore < now:
- r["late"] += 1
- if job.failed:
- r["failed"] += 1
</del><ins>+ for workType, queued, assigned, late, failed in jobs:
+ results[workType].update({
+ "queued": queued,
+ "assigned": assigned,
+ "late": late,
+ "failed": failed,
+ })
</ins><span class="cx">
</span><span class="cx"> returnValue(results)
</span><span class="cx">
</span><span class="cx">
</span><del>-JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight", "type"])
</del><ins>+JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight", "workType"])
</ins><span class="cx">
</span><span class="cx"> class JobDescriptorArg(Argument):
</span><span class="cx"> """
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobsqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/queue.py (15095 => 15096)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/queue.py        2015-09-03 20:40:30 UTC (rev 15095)
+++ twext/trunk/twext/enterprise/jobs/queue.py        2015-09-03 20:49:06 UTC (rev 15096)
</span><span class="lines">@@ -24,7 +24,7 @@
</span><span class="cx"> from twext.python.log import Logger
</span><span class="cx">
</span><span class="cx"> from twisted.application.service import MultiService
</span><del>-from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, passthru, succeed
</del><ins>+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, succeed
</ins><span class="cx"> from twisted.internet.error import AlreadyCalled, AlreadyCancelled
</span><span class="cx"> from twisted.internet.protocol import Factory
</span><span class="cx"> from twisted.protocols.amp import AMP, Command
</span><span class="lines">@@ -181,9 +181,12 @@
</span><span class="cx"> @return: a worker connection with the lowest current load.
</span><span class="cx"> @rtype: L{ConnectionFromWorker}
</span><span class="cx"> """
</span><del>- return sorted(self.workers[:], key=lambda w: w.currentLoad)[0]
</del><span class="cx">
</span><ins>+ # Stable sort based on worker load
+ self.workers.sort(key=lambda w: w.currentLoad)
+ return self.workers[0]
</ins><span class="cx">
</span><ins>+
</ins><span class="cx"> @inlineCallbacks
</span><span class="cx"> def performJob(self, job):
</span><span class="cx"> """
</span><span class="lines">@@ -203,8 +206,8 @@
</span><span class="cx"> try:
</span><span class="cx"> result = yield preferredWorker.performJob(job)
</span><span class="cx"> finally:
</span><del>- self.completed[job.type] += 1
- self.timing[job.type] += time.time() - t
</del><ins>+ self.completed[job.workType] += 1
+ self.timing[job.workType] += time.time() - t
</ins><span class="cx"> returnValue(result)
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -352,7 +355,7 @@
</span><span class="cx"> process has instructed this worker to do it; so, look up the data in
</span><span class="cx"> the row, and do it.
</span><span class="cx"> """
</span><del>- d = JobItem.ultimatelyPerform(self.transactionFactory, job.jobID)
</del><ins>+ d = JobItem.ultimatelyPerform(self.transactionFactory, job)
</ins><span class="cx"> d.addCallback(lambda ignored: {})
</span><span class="cx"> return d
</span><span class="cx">
</span><span class="lines">@@ -452,6 +455,7 @@
</span><span class="cx"> implements(IQueuer)
</span><span class="cx">
</span><span class="cx"> queuePollInterval = 0.1 # How often to poll for new work
</span><ins>+ queueOverduePollInterval = 60.0 # How often to poll for overdue work
</ins><span class="cx"> queueOverdueTimeout = 5.0 * 60.0 # How long before assigned work is possibly overdue
</span><span class="cx"> queuePollingBackoff = ((60.0, 60.0), (5.0, 1.0),) # Polling backoffs
</span><span class="cx">
</span><span class="lines">@@ -478,6 +482,7 @@
</span><span class="cx"> self._timeOfLastWork = time.time()
</span><span class="cx"> self._actualPollInterval = self.queuePollInterval
</span><span class="cx"> self._inWorkCheck = False
</span><ins>+ self._inOverdueCheck = False
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> def enable(self):
</span><span class="lines">@@ -576,30 +581,6 @@
</span><span class="cx"> if nextJob is None:
</span><span class="cx"> break
</span><span class="cx">
</span><del>- if nextJob.assigned is not None:
- if nextJob.overdue > nowTime:
- # If it is now assigned but not overdue, ignore as this may have
- # been returned after another txn just assigned it
- continue
- else:
- # It is overdue - check to see whether the work item is currently locked - if so no
- # need to re-assign
- running = yield nextJob.isRunning()
- if running:
- # Change the overdue to further in the future whilst we wait for
- # the running job to complete
- yield nextJob.bumpOverdue(self.queueOverdueTimeout)
- log.debug(
- "workCheck: bumped overdue timeout on jobid={jobid}",
- jobid=nextJob.jobID,
- )
- continue
- else:
- log.debug(
- "workCheck: overdue re-assignment for jobid={jobid}",
- jobid=nextJob.jobID,
- )
-
</del><span class="cx"> # Always assign as a new job even when it is an orphan
</span><span class="cx"> yield nextJob.assign(nowTime, self.queueOverdueTimeout)
</span><span class="cx"> self._timeOfLastWork = time.time()
</span><span class="lines">@@ -664,46 +645,169 @@
</span><span class="cx"> if loopCounter:
</span><span class="cx"> log.debug("workCheck: processed {ctr} jobs in one loop", ctr=loopCounter)
</span><span class="cx">
</span><del>- _currentWorkDeferred = None
</del><span class="cx"> _workCheckCall = None
</span><span class="cx">
</span><ins>+ @inlineCallbacks
</ins><span class="cx"> def _workCheckLoop(self):
</span><span class="cx"> """
</span><del>- While the service is running, keep checking for any overdue / lost work
- items and re-submit them to the cluster for processing.
</del><ins>+ While the service is running, keep check for work items and execute
+ them. Use a back-off strategy for polling to avoid using too much CPU
+ when there is not a lot to do.
</ins><span class="cx"> """
</span><span class="cx"> self._workCheckCall = None
</span><span class="cx">
</span><span class="cx"> if not self.running:
</span><del>- return
</del><ins>+ returnValue(None)
</ins><span class="cx">
</span><del>- @passthru(
- self._workCheck().addErrback(lambda result: log.error("_workCheckLoop: {exc}", exc=result)).addCallback
</del><ins>+ try:
+ yield self._workCheck()
+ except Exception as e:
+ log.error("_workCheckLoop: {exc}", exc=e)
+
+ if not self.running:
+ returnValue(None)
+
+ # Check for adjustment to poll interval - if the workCheck is idle for certain
+ # periods of time we will gradually increase the poll interval to avoid consuming
+ # excessive power when there is nothing to do
+ interval = self.queuePollInterval
+ idle = time.time() - self._timeOfLastWork
+ for threshold, poll in self.queuePollingBackoff:
+ if idle > threshold:
+ interval = poll
+ break
+ if self._actualPollInterval != interval:
+ log.debug("workCheckLoop: interval set to {interval}s", interval=interval)
+ self._actualPollInterval = interval
+ self._workCheckCall = self.reactor.callLater(
+ self._actualPollInterval, self._workCheckLoop
</ins><span class="cx"> )
</span><del>- def scheduleNext(result):
- self._currentWorkDeferred = None
- if not self.running:
- return
</del><span class="cx">
</span><del>- # Check for adjustment to poll interval - if the workCheck is idle for certain
- # periods of time we will gradually increase the poll interval to avoid consuming
- # excessive power when there is nothing to do
- interval = self.queuePollInterval
- idle = time.time() - self._timeOfLastWork
- for threshold, poll in self.queuePollingBackoff:
- if idle > threshold:
- interval = poll
</del><ins>+
+ @inlineCallbacks
+ def _overdueCheck(self):
+ """
+ Every controller will periodically check for any overdue work and unassign that
+ work so that it gets execute during the next regular work check.
+ """
+
+ loopCounter = 0
+ while True:
+ if not self.running or self.disableWorkProcessing:
+ returnValue(None)
+
+ # Determine what the timestamp cutoff
+ # TODO: here is where we should iterate over the unlocked items
+ # that are due, ordered by priority, notBefore etc
+ nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
+
+ self._inOverdueCheck = True
+ txn = overdueJob = None
+ try:
+ txn = self.transactionFactory(label="jobqueue.overdueCheck")
+ overdueJobs = yield JobItem.overduejobs(txn, nowTime, limit=1)
+ if overdueJobs:
+ overdueJob = overdueJobs[0]
+ else:
</ins><span class="cx"> break
</span><del>- if self._actualPollInterval != interval:
- log.debug("workCheckLoop: interval set to {interval}s", interval=interval)
- self._actualPollInterval = interval
- self._workCheckCall = self.reactor.callLater(
- self._actualPollInterval, self._workCheckLoop
- )
</del><span class="cx">
</span><del>- self._currentWorkDeferred = scheduleNext
</del><ins>+ # It is overdue - check to see whether the work item is currently locked - if so no
+ # need to re-assign
+ running = yield overdueJob.isRunning()
+ if running:
+ # Change the overdue to further in the future whilst we wait for
+ # the running job to complete
+ yield overdueJob.bumpOverdue(self.queueOverdueTimeout)
+ log.debug(
+ "overdueCheck: bumped overdue timeout on jobid={jobid}",
+ jobid=overdueJob.jobID,
+ )
+ else:
+ # Unassign the job so it is picked up by the next L{_workCheck}
+ yield overdueJob.unassign()
+ log.debug(
+ "overdueCheck: overdue unassigned for jobid={jobid}",
+ jobid=overdueJob.jobID,
+ )
+ loopCounter += 1
</ins><span class="cx">
</span><ins>+ except Exception as e:
+ log.error(
+ "Failed to process overdue job: {jobID}, {exc}",
+ jobID=overdueJob.jobID if overdueJob else "?",
+ exc=e,
+ )
+ if txn is not None:
+ yield txn.abort()
+ txn = None
</ins><span class="cx">
</span><ins>+ # If we can identify the problem job, try and set it to failed so that it
+ # won't block other jobs behind it (it will be picked again when the failure
+ # interval is exceeded - but that has a back off so a permanently stuck item
+ # should fade away. We probably want to have some additional logic to simply
+ # remove something that is permanently failing.
+ if overdueJob is not None:
+ txn = self.transactionFactory(label="jobqueue.overdueCheck.failed")
+ try:
+ failedJob = yield JobItem.load(txn, overdueJob.jobID)
+ yield failedJob.failedToRun()
+ except Exception as e:
+ # Could not mark as failed - break out of the overdue job loop
+ log.error(
+ "Failed to mark failed overdue job:{}, {exc}",
+ jobID=overdueJob.jobID,
+ exc=e,
+ )
+ yield txn.abort()
+ txn = None
+ overdueJob = None
+ break
+ else:
+ # Marked the problem one as failed, so keep going and get the next overdue job
+ log.error("Marked failed overdue job: {jobID}", jobID=overdueJob.jobID)
+ yield txn.commit()
+ txn = None
+ overdueJob = None
+ else:
+ # Cannot mark anything as failed - break out of overdue job loop
+ log.error("Cannot mark failed overdue job")
+ break
+ finally:
+ if txn is not None:
+ yield txn.commit()
+ txn = None
+ self._inOverdueCheck = False
+
+ if loopCounter:
+ # Make sure the regular work check loop runs immediately if we processed any overdue items
+ yield self.enqueuedJob()
+ log.debug("overdueCheck: processed {ctr} jobs in one loop", ctr=loopCounter)
+
+ _overdueCheckCall = None
+
+ @inlineCallbacks
+ def _overdueCheckLoop(self):
+ """
+ While the service is running, keep checking for any overdue items.
+ """
+ self._overdueCheckCall = None
+
+ if not self.running:
+ returnValue(None)
+
+ try:
+ yield self._overdueCheck()
+ except Exception as e:
+ log.error("_overdueCheckLoop: {exc}", exc=e)
+
+ if not self.running:
+ returnValue(None)
+
+ self._overdueCheckCall = self.reactor.callLater(
+ self.queueOverduePollInterval, self._overdueCheckLoop
+ )
+
+
</ins><span class="cx"> def enqueuedJob(self):
</span><span class="cx"> """
</span><span class="cx"> Reschedule the work check loop to run right now. This should be called in response to "external" activity that
</span><span class="lines">@@ -733,6 +837,7 @@
</span><span class="cx"> """
</span><span class="cx"> super(ControllerQueue, self).startService()
</span><span class="cx"> self._workCheckLoop()
</span><ins>+ self._overdueCheckLoop()
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><span class="lines">@@ -747,13 +852,13 @@
</span><span class="cx"> self._workCheckCall.cancel()
</span><span class="cx"> self._workCheckCall = None
</span><span class="cx">
</span><del>- if self._currentWorkDeferred is not None:
- self._currentWorkDeferred.cancel()
- self._currentWorkDeferred = None
</del><ins>+ if self._overdueCheckCall is not None:
+ self._overdueCheckCall.cancel()
+ self._overdueCheckCall = None
</ins><span class="cx">
</span><span class="cx"> # Wait for any active work check to finish (but no more than 1 minute)
</span><span class="cx"> start = time.time()
</span><del>- while self._inWorkCheck:
</del><ins>+ while self._inWorkCheck and self._inOverdueCheck:
</ins><span class="cx"> d = Deferred()
</span><span class="cx"> self.reactor.callLater(0.5, lambda : d.callback(None))
</span><span class="cx"> yield d
</span><span class="lines">@@ -780,7 +885,7 @@
</span><span class="cx"> """
</span><span class="cx"> Perform the given job right now.
</span><span class="cx"> """
</span><del>- return JobItem.ultimatelyPerform(self.txnFactory, job.jobID)
</del><ins>+ return JobItem.ultimatelyPerform(self.txnFactory, job)
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobstesttest_jobspy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/test/test_jobs.py (15095 => 15096)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/test/test_jobs.py        2015-09-03 20:40:30 UTC (rev 15095)
+++ twext/trunk/twext/enterprise/jobs/test/test_jobs.py        2015-09-03 20:49:06 UTC (rev 15096)
</span><span class="lines">@@ -573,11 +573,11 @@
</span><span class="cx"> self.assertTrue(job is None)
</span><span class="cx"> self.assertTrue(work is None)
</span><span class="cx">
</span><del>- # Assigned job with past notBefore, but overdue is returned
</del><ins>+ # Assigned job with past notBefore, but overdue is not returned
</ins><span class="cx"> yield inTransaction(dbpool.connection, assignJob, when=now + datetime.timedelta(days=-1))
</span><span class="cx"> job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
</span><del>- self.assertTrue(job is not None)
- self.assertTrue(work.a == 2)
</del><ins>+ self.assertTrue(job is None)
+ self.assertTrue(work is None)
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><span class="lines">@@ -1481,6 +1481,8 @@
</span><span class="cx"> return _oldBumped(self, 100)
</span><span class="cx"> self.patch(JobItem, "bumpOverdue", _newBump)
</span><span class="cx">
</span><ins>+ self.patch(ControllerQueue, "queueOverduePollInterval", 0.5)
+
</ins><span class="cx"> DummyWorkPauseItem.workStarted = Deferred()
</span><span class="cx"> DummyWorkPauseItem.unpauseWork = Deferred()
</span><span class="cx">
</span><span class="lines">@@ -1567,6 +1569,8 @@
</span><span class="cx"> DummyWorkPauseItem.workStarted = Deferred()
</span><span class="cx"> self.patch(DummyWorkPauseItem, "doWork", _newDoWorkRaise)
</span><span class="cx">
</span><ins>+ self.patch(ControllerQueue, "queueOverduePollInterval", 0.5)
+
</ins><span class="cx"> @transactionally(self.store.newTransaction)
</span><span class="cx"> def _enqueue(txn):
</span><span class="cx"> return txn.enqueue(
</span><span class="lines">@@ -1635,6 +1639,8 @@
</span><span class="cx"> DummyWorkPauseItem.workStarted = Deferred()
</span><span class="cx"> self.patch(DummyWorkPauseItem, "doWork", _newDoWorkRaise)
</span><span class="cx">
</span><ins>+ self.patch(ControllerQueue, "queueOverduePollInterval", 0.5)
+
</ins><span class="cx"> @transactionally(self.store.newTransaction)
</span><span class="cx"> def _enqueue(txn):
</span><span class="cx"> return txn.enqueue(
</span></span></pre>
</div>
</div>
</body>
</html>