[CalendarServer-changes] [15096] twext/trunk/twext/enterprise/jobs

source_changes at macosforge.org
Thu Sep 3 13:49:07 PDT 2015


Revision: 15096
          http://trac.calendarserver.org//changeset/15096
Author:   cdaboo at apple.com
Date:     2015-09-03 13:49:06 -0700 (Thu, 03 Sep 2015)
Log Message:
-----------
Split the orphan work check out into its own loop to help speed up the main work loop. Improve some logging.
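
In outline: the single work-check loop previously both dispatched new jobs and
scanned for orphaned (assigned-but-overdue) ones; it now only dispatches, and a
second, slower loop handles orphan recovery. A simplified sketch of the shape
(the bodies are stand-ins; the real scheduling and error handling are in
queue.py below):

    # Sketch only - not the actual implementation.
    def startService(self):
        self._workCheckLoop()      # fast poll (0.1s, with idle back-off)
        self._overdueCheckLoop()   # slow poll (60s), orphan recovery only

    def _workCheckLoop(self):
        # dispatch unassigned jobs; overdue scanning no longer happens here
        self._workCheckCall = self.reactor.callLater(
            self._actualPollInterval, self._workCheckLoop)

    def _overdueCheckLoop(self):
        # unassign orphaned jobs so the next work check re-dispatches them
        self._overdueCheckCall = self.reactor.callLater(
            self.queueOverduePollInterval, self._overdueCheckLoop)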

Modified Paths:
--------------
    twext/trunk/twext/enterprise/jobs/jobitem.py
    twext/trunk/twext/enterprise/jobs/queue.py
    twext/trunk/twext/enterprise/jobs/test/test_jobs.py

Modified: twext/trunk/twext/enterprise/jobs/jobitem.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/jobitem.py	2015-09-03 20:40:30 UTC (rev 15095)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py	2015-09-03 20:49:06 UTC (rev 15096)
@@ -18,7 +18,8 @@
 from twext.enterprise.dal.model import Sequence
 from twext.enterprise.dal.model import Table, Schema, SQLType
 from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
-from twext.enterprise.dal.syntax import SchemaSyntax
+from twext.enterprise.dal.syntax import SchemaSyntax, Count, NullIf, Constant, \
+    Sum
 from twext.enterprise.ienterprise import ORACLE_DIALECT
 from twext.enterprise.jobs.utils import inTransaction, astimestamp
 from twext.python.log import Logger
@@ -107,6 +108,12 @@
 
 
 
+# Priority for work - used to order work items in the job queue
+JOB_PRIORITY_LOW = 0
+JOB_PRIORITY_MEDIUM = 1
+JOB_PRIORITY_HIGH = 2
+
+
 class JobItem(Record, fromTable(JobInfoSchema.JOB)):
     """
     @DynamicAttrs
@@ -165,6 +172,13 @@
         return self.update(assigned=when, overdue=when + timedelta(seconds=overdue))
 
 
+    def unassign(self):
+        """
+        Mark this job as unassigned by setting the assigned and overdue columns to L{None}.
+        """
+        return self.update(assigned=None, overdue=None)
+
+
     def bumpOverdue(self, bump):
         """
         Increment the overdue value by the specified number of seconds. Used when an overdue job
@@ -219,7 +233,7 @@
 
     @classmethod
     @inlineCallbacks
-    def ultimatelyPerform(cls, txnFactory, jobID):
+    def ultimatelyPerform(cls, txnFactory, jobDescriptor):
         """
         Eventually, after routing the job to the appropriate place, somebody
         actually has to I{do} it. This method basically calls L{JobItem.run}
@@ -229,8 +243,8 @@
         @param txnFactory: a 0- or 1-argument callable that creates an
             L{IAsyncTransaction}
         @type txnFactory: L{callable}
-        @param jobID: the ID of the job to be performed
-        @type jobID: L{int}
+        @param jobDescriptor: the job descriptor
+        @type jobDescriptor: L{JobDescriptor}
         @return: a L{Deferred} which fires with C{None} when the job has been
             performed, or fails if the job can't be performed.
         """
@@ -246,32 +260,35 @@
             @inlineCallbacks
             def _cleanUp2(txn2):
                 try:
-                    job = yield cls.load(txn2, jobID)
+                    job = yield cls.load(txn2, jobDescriptor.jobID)
                 except NoSuchRecord:
                     log.debug(
-                        "JobItem: {jobid} disappeared t={tm}",
-                        jobid=jobID,
+                        "JobItem: {workType} {jobid} disappeared t={tm}",
+                        workType=jobDescriptor.workType,
+                        jobid=jobDescriptor.jobID,
                         tm=_tm(),
                     )
                 else:
                     log.debug(
-                        "JobItem: {jobid} marking as failed {count} t={tm}",
-                        jobid=jobID,
+                        "JobItem: {workType} {jobid} marking as failed {count} t={tm}",
+                        workType=jobDescriptor.workType,
+                        jobid=jobDescriptor.jobID,
                         count=job.failed + 1,
                         tm=_tm(),
                     )
                     yield job.failedToRun(locked=isinstance(e, JobRunningError), delay=delay)
             return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._failureCleanUp")
 
-        log.debug("JobItem: {jobid} starting to run", jobid=jobID)
-        txn = txnFactory(label="ultimatelyPerform: {}".format(jobID))
+        log.debug("JobItem: {workType} {jobid} starting to run", workType=jobDescriptor.workType, jobid=jobDescriptor.jobID)
+        txn = txnFactory(label="ultimatelyPerform: {workType} {jobid}".format(workType=jobDescriptor.workType, jobid=jobDescriptor.jobID))
         try:
-            job = yield cls.load(txn, jobID)
+            job = yield cls.load(txn, jobDescriptor.jobID)
             if hasattr(txn, "_label"):
                 txn._label = "{} <{}>".format(txn._label, job.workType)
             log.debug(
-                "JobItem: {jobid} loaded {work} t={tm}",
-                jobid=jobID,
+                "JobItem: {workType} {jobid} loaded {work} t={tm}",
+                workType=jobDescriptor.workType,
+                jobid=jobDescriptor.jobID,
                 work=job.workType,
                 tm=_tm(),
             )
@@ -281,8 +298,9 @@
             # The record has already been removed
             yield txn.commit()
             log.debug(
-                "JobItem: {jobid} already removed t={tm}",
-                jobid=jobID,
+                "JobItem: {workType} {jobid} already removed t={tm}",
+                workType=jobDescriptor.workType,
+                jobid=jobDescriptor.jobID,
                 tm=_tm(),
             )
 
@@ -292,10 +310,10 @@
             def _temporaryFailure():
                 return _failureCleanUp(delay=e.delay * (job.failed + 1))
             log.debug(
-                "JobItem: {jobid} {desc} {work} t={tm}",
-                jobid=jobID,
+                "JobItem: {workType} {jobid} {desc} t={tm}",
+                workType=jobDescriptor.workType,
+                jobid=jobDescriptor.jobID,
                 desc="temporary failure #{}".format(job.failed + 1),
-                work=job.workType,
                 tm=_tm(),
             )
             txn.postAbort(_temporaryFailure)
@@ -305,10 +323,10 @@
 
             # Permanent failure
             log.debug(
-                "JobItem: {jobid} {desc} {work} t={tm}",
-                jobid=jobID,
+                "JobItem: {workType} {jobid} {desc} t={tm}",
+                workType=jobDescriptor.workType,
+                jobid=jobDescriptor.jobID,
                 desc="failed" if isinstance(e, JobFailedError) else "locked",
-                work=job.workType,
                 tm=_tm(),
             )
             txn.postAbort(_failureCleanUp)
@@ -317,8 +335,9 @@
         except:
             f = Failure()
             log.error(
-                "JobItem: {jobid} unknown exception t={tm} {exc}",
-                jobid=jobID,
+                "JobItem: {workType} {jobid} exception t={tm} {exc}",
+                workType=jobDescriptor.workType,
+                jobid=jobDescriptor.jobID,
                 tm=_tm(),
                 exc=f,
             )
@@ -328,9 +347,9 @@
         else:
             yield txn.commit()
             log.debug(
-                "JobItem: {jobid} completed {work} t={tm} over={over}",
-                jobid=jobID,
-                work=job.workType,
+                "JobItem: {workType} {jobid} completed t={tm} over={over}",
+                workType=jobDescriptor.workType,
+                jobid=jobDescriptor.jobID,
                 tm=_tm(),
                 over=_overtm(job.notBefore),
             )
@@ -344,7 +363,7 @@
         """
         Find the next available job based on priority, also return any that are overdue. This
         method uses an SQL query to find the matching jobs, and sorts based on the NOT_BEFORE
-        value and priority..
+        value and priority.
 
         @param txn: the transaction to use
         @type txn: L{IAsyncTransaction}
@@ -362,7 +381,7 @@
 
         # Must only be one or zero
         if jobs and len(jobs) > 1:
-            raise AssertionError("next_job() returned more than one row")
+            raise AssertionError("nextjob() returned more than one row")
 
         returnValue(jobs[0] if jobs else None)
 
@@ -386,10 +405,24 @@
         @rtype: L{JobItem}
         """
 
-        queryExpr = (cls.notBefore <= now).And(cls.priority >= minPriority).And(cls.pause == 0).And(
-            (cls.assigned == None).Or(cls.overdue < now)
-        )
+        # Only add the PRIORITY term if minimum is greater than zero
+        queryExpr = (cls.assigned == None).And(cls.pause == 0).And(cls.notBefore <= now)
 
+        # PRIORITY can only be 0, 1, or 2. So we can convert an inequality into
+        # an equality test as follows:
+        #
+        # PRIORITY >= 0 - no test needed, all values match all the time
+        # PRIORITY >= 1 === PRIORITY != 0
+        # PRIORITY >= 2 === PRIORITY == 2
+        #
+        # Doing this allows use of the PRIORITY column in an index since we already
+        # have one inequality in the index (NOT_BEFORE)
+
+        if minPriority == JOB_PRIORITY_MEDIUM:
+            queryExpr = (cls.priority != JOB_PRIORITY_LOW).And(queryExpr)
+        elif minPriority == JOB_PRIORITY_HIGH:
+            queryExpr = (cls.priority == JOB_PRIORITY_HIGH).And(queryExpr)
+
         if txn.dialect == ORACLE_DIALECT:
             # Oracle does not support a "for update" clause with "order by". So do the
             # "for update" as a second query right after the first. Will need to check
@@ -397,7 +430,7 @@
             jobs = yield cls.query(
                 txn,
                 queryExpr,
-                order=(cls.assigned, cls.priority),
+                order=cls.priority,
                 ascending=False,
                 limit=limit,
             )
@@ -412,7 +445,7 @@
             jobs = yield cls.query(
                 txn,
                 queryExpr,
-                order=(cls.assigned, cls.priority),
+                order=cls.priority,
                 ascending=False,
                 forUpdate=True,
                 noWait=False,
@@ -422,7 +455,37 @@
         returnValue(jobs)
 
 
+    @classmethod
     @inlineCallbacks
+    def overduejobs(cls, txn, now, limit=None):
+        """
+        Find assigned jobs that are overdue.
+
+        @param txn: the transaction to use
+        @type txn: L{IAsyncTransaction}
+        @param now: current timestamp
+        @type now: L{datetime.datetime}
+        @param limit: limit on number of jobs to return
+        @type limit: L{int}
+
+        @return: the overdue job records
+        @rtype: L{list} of L{JobItem}
+        """
+
+        queryExpr = (cls.assigned != None).And(cls.overdue < now)
+
+        jobs = yield cls.query(
+            txn,
+            queryExpr,
+            forUpdate=True,
+            noWait=False,
+            limit=limit,
+        )
+
+        returnValue(jobs)
+
+
+    @inlineCallbacks
     def run(self):
         """
         Run this job item by finding the appropriate work item class and
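
A minimal usage sketch of how this query pairs with the new unassign() method
added earlier (transaction setup elided; txn is assumed to come from the usual
factory, inside an inlineCallbacks function):

    # Sketch: claim one overdue job and release it for re-dispatch
    overdue = yield JobItem.overduejobs(txn, datetime.utcnow(), limit=1)
    if overdue:
        yield overdue[0].unassign()  # next work check will pick it up again
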
@@ -613,6 +676,10 @@
         Generate a histogram of work items currently in the queue.
         """
         from twext.enterprise.jobs.queue import WorkerConnectionPool
+
+        # Fill out an empty set of results for all the known work types. The SQL
+        # query will only return work types that are currently queued, but we want
+        # results for all possible work.
         results = {}
         now = datetime.utcnow()
         for workItemType in cls.workTypes():
@@ -626,22 +693,32 @@
                 "time": WorkerConnectionPool.timing.get(workType, 0.0)
             })
 
-        jobs = yield cls.all(txn)
+        # Use an aggregate query to get the results for each currently queued
+        # work type.
+        jobs = yield cls.queryExpr(
+            expr=None,
+            attributes=(
+                cls.workType,
+                Count(cls.workType),
+                Count(cls.assigned),
+                Count(NullIf(cls.assigned is not None and cls.notBefore < now, Constant(False))),
+                Sum(cls.failed),
+            ),
+            group=cls.workType
+        ).on(txn)
 
-        for job in jobs:
-            r = results[job.workType]
-            r["queued"] += 1
-            if job.assigned is not None:
-                r["assigned"] += 1
-            if job.assigned is None and job.notBefore < now:
-                r["late"] += 1
-            if job.failed:
-                r["failed"] += 1
+        for workType, queued, assigned, late, failed in jobs:
+            results[workType].update({
+                "queued": queued,
+                "assigned": assigned,
+                "late": late,
+                "failed": failed,
+            })
 
         returnValue(results)
 
 
-JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight", "type"])
+JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight", "workType"])
 
 class JobDescriptorArg(Argument):
     """

Modified: twext/trunk/twext/enterprise/jobs/queue.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/queue.py	2015-09-03 20:40:30 UTC (rev 15095)
+++ twext/trunk/twext/enterprise/jobs/queue.py	2015-09-03 20:49:06 UTC (rev 15096)
@@ -24,7 +24,7 @@
 from twext.python.log import Logger
 
 from twisted.application.service import MultiService
-from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, passthru, succeed
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, succeed
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 from twisted.internet.protocol import Factory
 from twisted.protocols.amp import AMP, Command
@@ -181,9 +181,12 @@
         @return: a worker connection with the lowest current load.
         @rtype: L{ConnectionFromWorker}
         """
-        return sorted(self.workers[:], key=lambda w: w.currentLoad)[0]
 
+        # Stable sort based on worker load
+        self.workers.sort(key=lambda w: w.currentLoad)
+        return self.workers[0]
 
+
     @inlineCallbacks
     def performJob(self, job):
         """
@@ -203,8 +206,8 @@
         try:
             result = yield preferredWorker.performJob(job)
         finally:
-            self.completed[job.type] += 1
-            self.timing[job.type] += time.time() - t
+            self.completed[job.workType] += 1
+            self.timing[job.workType] += time.time() - t
         returnValue(result)
 
 
@@ -352,7 +355,7 @@
         process has instructed this worker to do it; so, look up the data in
         the row, and do it.
         """
-        d = JobItem.ultimatelyPerform(self.transactionFactory, job.jobID)
+        d = JobItem.ultimatelyPerform(self.transactionFactory, job)
         d.addCallback(lambda ignored: {})
         return d
 
@@ -452,6 +455,7 @@
     implements(IQueuer)
 
     queuePollInterval = 0.1             # How often to poll for new work
+    queueOverduePollInterval = 60.0     # How often to poll for overdue work
     queueOverdueTimeout = 5.0 * 60.0    # How long before assigned work is possibly overdue
     queuePollingBackoff = ((60.0, 60.0), (5.0, 1.0),)   # Polling backoffs
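
How the backoff table is read (the helper below is illustrative only; it
mirrors the interval-selection loop added to _workCheckLoop further down):

    # queuePollingBackoff entries are (idle-threshold-secs, poll-interval-secs),
    # checked in order; the first threshold exceeded wins.
    def _pickInterval(idle, default=0.1, backoff=((60.0, 60.0), (5.0, 1.0))):
        for threshold, poll in backoff:
            if idle > threshold:
                return poll
        return default

    assert _pickInterval(0.0) == 0.1      # actively working: poll fast
    assert _pickInterval(10.0) == 1.0     # idle > 5s: back off to 1s
    assert _pickInterval(120.0) == 60.0   # idle > 60s: back off to 60s
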
 
@@ -478,6 +482,7 @@
         self._timeOfLastWork = time.time()
         self._actualPollInterval = self.queuePollInterval
         self._inWorkCheck = False
+        self._inOverdueCheck = False
 
 
     def enable(self):
@@ -576,30 +581,6 @@
                 if nextJob is None:
                     break
 
-                if nextJob.assigned is not None:
-                    if nextJob.overdue > nowTime:
-                        # If it is now assigned but not overdue, ignore as this may have
-                        # been returned after another txn just assigned it
-                        continue
-                    else:
-                        # It is overdue - check to see whether the work item is currently locked - if so no
-                        # need to re-assign
-                        running = yield nextJob.isRunning()
-                        if running:
-                            # Change the overdue to further in the future whilst we wait for
-                            # the running job to complete
-                            yield nextJob.bumpOverdue(self.queueOverdueTimeout)
-                            log.debug(
-                                "workCheck: bumped overdue timeout on jobid={jobid}",
-                                jobid=nextJob.jobID,
-                            )
-                            continue
-                        else:
-                            log.debug(
-                                "workCheck: overdue re-assignment for jobid={jobid}",
-                                jobid=nextJob.jobID,
-                            )
-
                 # Always assign as a new job even when it is an orphan
                 yield nextJob.assign(nowTime, self.queueOverdueTimeout)
                 self._timeOfLastWork = time.time()
@@ -664,46 +645,169 @@
         if loopCounter:
             log.debug("workCheck: processed {ctr} jobs in one loop", ctr=loopCounter)
 
-    _currentWorkDeferred = None
     _workCheckCall = None
 
+    @inlineCallbacks
     def _workCheckLoop(self):
         """
-        While the service is running, keep checking for any overdue / lost work
-        items and re-submit them to the cluster for processing.
+        While the service is running, keep checking for work items and execute
+        them. Use a back-off strategy for polling to avoid using too much CPU
+        when there is not a lot to do.
         """
         self._workCheckCall = None
 
         if not self.running:
-            return
+            returnValue(None)
 
-        @passthru(
-            self._workCheck().addErrback(lambda result: log.error("_workCheckLoop: {exc}", exc=result)).addCallback
+        try:
+            yield self._workCheck()
+        except Exception as e:
+            log.error("_workCheckLoop: {exc}", exc=e)
+
+        if not self.running:
+            returnValue(None)
+
+        # Check for adjustment to poll interval - if the workCheck is idle for certain
+        # periods of time we will gradually increase the poll interval to avoid consuming
+        # excessive power when there is nothing to do
+        interval = self.queuePollInterval
+        idle = time.time() - self._timeOfLastWork
+        for threshold, poll in self.queuePollingBackoff:
+            if idle > threshold:
+                interval = poll
+                break
+        if self._actualPollInterval != interval:
+            log.debug("workCheckLoop: interval set to {interval}s", interval=interval)
+        self._actualPollInterval = interval
+        self._workCheckCall = self.reactor.callLater(
+            self._actualPollInterval, self._workCheckLoop
         )
-        def scheduleNext(result):
-            self._currentWorkDeferred = None
-            if not self.running:
-                return
 
-            # Check for adjustment to poll interval - if the workCheck is idle for certain
-            # periods of time we will gradually increase the poll interval to avoid consuming
-            # excessive power when there is nothing to do
-            interval = self.queuePollInterval
-            idle = time.time() - self._timeOfLastWork
-            for threshold, poll in self.queuePollingBackoff:
-                if idle > threshold:
-                    interval = poll
+
+    @inlineCallbacks
+    def _overdueCheck(self):
+        """
+        Every controller will periodically check for any overdue work and unassign that
+        work so that it gets executed during the next regular work check.
+        """
+
+        loopCounter = 0
+        while True:
+            if not self.running or self.disableWorkProcessing:
+                returnValue(None)
+
+            # Determine the timestamp cutoff
+            # TODO: here is where we should iterate over the unlocked items
+            # that are due, ordered by priority, notBefore, etc.
+            nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
+
+            self._inOverdueCheck = True
+            txn = overdueJob = None
+            try:
+                txn = self.transactionFactory(label="jobqueue.overdueCheck")
+                overdueJobs = yield JobItem.overduejobs(txn, nowTime, limit=1)
+                if overdueJobs:
+                    overdueJob = overdueJobs[0]
+                else:
                     break
-            if self._actualPollInterval != interval:
-                log.debug("workCheckLoop: interval set to {interval}s", interval=interval)
-            self._actualPollInterval = interval
-            self._workCheckCall = self.reactor.callLater(
-                self._actualPollInterval, self._workCheckLoop
-            )
 
-        self._currentWorkDeferred = scheduleNext
+                # It is overdue - check to see whether the work item is currently locked - if so no
+                # need to re-assign
+                running = yield overdueJob.isRunning()
+                if running:
+                    # Change the overdue to further in the future whilst we wait for
+                    # the running job to complete
+                    yield overdueJob.bumpOverdue(self.queueOverdueTimeout)
+                    log.debug(
+                        "overdueCheck: bumped overdue timeout on jobid={jobid}",
+                        jobid=overdueJob.jobID,
+                    )
+                else:
+                    # Unassign the job so it is picked up by the next L{_workCheck}
+                    yield overdueJob.unassign()
+                    log.debug(
+                        "overdueCheck: overdue unassigned for jobid={jobid}",
+                        jobid=overdueJob.jobID,
+                    )
+                loopCounter += 1
 
+            except Exception as e:
+                log.error(
+                    "Failed to process overdue job: {jobID}, {exc}",
+                    jobID=overdueJob.jobID if overdueJob else "?",
+                    exc=e,
+                )
+                if txn is not None:
+                    yield txn.abort()
+                    txn = None
 
+                # If we can identify the problem job, try to set it to failed so that it
+                # won't block other jobs behind it (it will be picked up again when the
+                # failure interval is exceeded - but that has a back-off, so a permanently
+                # stuck item should fade away). We probably want to have some additional
+                # logic to simply remove something that is permanently failing.
+                if overdueJob is not None:
+                    txn = self.transactionFactory(label="jobqueue.overdueCheck.failed")
+                    try:
+                        failedJob = yield JobItem.load(txn, overdueJob.jobID)
+                        yield failedJob.failedToRun()
+                    except Exception as e:
+                        # Could not mark as failed - break out of the overdue job loop
+                        log.error(
+                            "Failed to mark failed overdue job:{}, {exc}",
+                            jobID=overdueJob.jobID,
+                            exc=e,
+                        )
+                        yield txn.abort()
+                        txn = None
+                        overdueJob = None
+                        break
+                    else:
+                        # Marked the problem one as failed, so keep going and get the next overdue job
+                        log.error("Marked failed overdue job: {jobID}", jobID=overdueJob.jobID)
+                        yield txn.commit()
+                        txn = None
+                        overdueJob = None
+                else:
+                    # Cannot mark anything as failed - break out of overdue job loop
+                    log.error("Cannot mark failed overdue job")
+                    break
+            finally:
+                if txn is not None:
+                    yield txn.commit()
+                    txn = None
+                self._inOverdueCheck = False
+
+        if loopCounter:
+            # Make sure the regular work check loop runs immediately if we processed any overdue items
+            yield self.enqueuedJob()
+            log.debug("overdueCheck: processed {ctr} jobs in one loop", ctr=loopCounter)
+
+    _overdueCheckCall = None
+
+    @inlineCallbacks
+    def _overdueCheckLoop(self):
+        """
+        While the service is running, keep checking for any overdue items.
+        """
+        self._overdueCheckCall = None
+
+        if not self.running:
+            returnValue(None)
+
+        try:
+            yield self._overdueCheck()
+        except Exception as e:
+            log.error("_overdueCheckLoop: {exc}", exc=e)
+
+        if not self.running:
+            returnValue(None)
+
+        self._overdueCheckCall = self.reactor.callLater(
+            self.queueOverduePollInterval, self._overdueCheckLoop
+        )
+
+
     def enqueuedJob(self):
         """
         Reschedule the work check loop to run right now. This should be called in response to "external" activity that
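
Condensing the per-job branch in _overdueCheck above (a sketch; job is assumed
to be one result of JobItem.overduejobs() and timeout is queueOverdueTimeout):

    # Decision applied to each overdue job found:
    @inlineCallbacks
    def _recoverOverdue(job, timeout):
        if (yield job.isRunning()):
            yield job.bumpOverdue(timeout)  # still locked: extend the deadline
        else:
            yield job.unassign()            # orphaned: release for re-dispatch
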
@@ -733,6 +837,7 @@
         """
         super(ControllerQueue, self).startService()
         self._workCheckLoop()
+        self._overdueCheckLoop()
 
 
     @inlineCallbacks
@@ -747,13 +852,13 @@
             self._workCheckCall.cancel()
             self._workCheckCall = None
 
-        if self._currentWorkDeferred is not None:
-            self._currentWorkDeferred.cancel()
-            self._currentWorkDeferred = None
+        if self._overdueCheckCall is not None:
+            self._overdueCheckCall.cancel()
+            self._overdueCheckCall = None
 
         # Wait for any active work check to finish (but no more than 1 minute)
         start = time.time()
-        while self._inWorkCheck:
+        while self._inWorkCheck or self._inOverdueCheck:
             d = Deferred()
             self.reactor.callLater(0.5, lambda : d.callback(None))
             yield d
@@ -780,7 +885,7 @@
         """
         Perform the given job right now.
         """
-        return JobItem.ultimatelyPerform(self.txnFactory, job.jobID)
+        return JobItem.ultimatelyPerform(self.txnFactory, job)
 
 
 

Modified: twext/trunk/twext/enterprise/jobs/test/test_jobs.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/test/test_jobs.py	2015-09-03 20:40:30 UTC (rev 15095)
+++ twext/trunk/twext/enterprise/jobs/test/test_jobs.py	2015-09-03 20:49:06 UTC (rev 15096)
@@ -573,11 +573,11 @@
         self.assertTrue(job is None)
         self.assertTrue(work is None)
 
-        # Assigned job with past notBefore, but overdue is returned
+        # An assigned job with past notBefore that is overdue is no longer returned
         yield inTransaction(dbpool.connection, assignJob, when=now + datetime.timedelta(days=-1))
         job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
-        self.assertTrue(job is not None)
-        self.assertTrue(work.a == 2)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
 
 
     @inlineCallbacks
@@ -1481,6 +1481,8 @@
             return _oldBumped(self, 100)
         self.patch(JobItem, "bumpOverdue", _newBump)
 
+        self.patch(ControllerQueue, "queueOverduePollInterval", 0.5)
+
         DummyWorkPauseItem.workStarted = Deferred()
         DummyWorkPauseItem.unpauseWork = Deferred()
 
@@ -1567,6 +1569,8 @@
         DummyWorkPauseItem.workStarted = Deferred()
         self.patch(DummyWorkPauseItem, "doWork", _newDoWorkRaise)
 
+        self.patch(ControllerQueue, "queueOverduePollInterval", 0.5)
+
         @transactionally(self.store.newTransaction)
         def _enqueue(txn):
             return txn.enqueue(
@@ -1635,6 +1639,8 @@
         DummyWorkPauseItem.workStarted = Deferred()
         self.patch(DummyWorkPauseItem, "doWork", _newDoWorkRaise)
 
+        self.patch(ControllerQueue, "queueOverduePollInterval", 0.5)
+
         @transactionally(self.store.newTransaction)
         def _enqueue(txn):
             return txn.enqueue(