[CalendarServer-changes] [13687] twext/trunk/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Wed Jun 25 11:38:55 PDT 2014
Revision: 13687
http://trac.calendarserver.org//changeset/13687
Author: cdaboo at apple.com
Date: 2014-06-25 11:38:55 -0700 (Wed, 25 Jun 2014)
Log Message:
-----------
Add staggering and separate reschedule time for locked jobs. Fix logging error.
Modified Paths:
--------------
twext/trunk/twext/enterprise/jobqueue.py
twext/trunk/twext/enterprise/test/test_jobqueue.py
Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py 2014-06-25 18:31:40 UTC (rev 13686)
+++ twext/trunk/twext/enterprise/jobqueue.py 2014-06-25 18:38:55 UTC (rev 13687)
@@ -354,8 +354,8 @@
_workTypes = None
_workTypeMap = None
+ lockRescheduleInterval = 60 # When a job can't run because of a lock, reschedule it this number of seconds in the future
failureRescheduleInterval = 60 # When a job fails, reschedule it this number of seconds in the future
- lockRescheduleInterval = 5 # When a job is locked, reschedule it this number of seconds in the future
def descriptor(self):
return JobDescriptor(self.jobID, self.weight, self.workType)
@@ -385,21 +385,29 @@
return self.update(overdue=self.overdue + timedelta(seconds=bump))
- def failedToRun(self, delay=None):
+ def failedToRun(self, locked=False, delay=None):
"""
The attempt to run the job failed. Leave it in the queue, but mark it
as unassigned, bump the failure count and set to run at some point in
the future.
- @param delay: the number of seconds in the future at which to reschedule the
- next execution of the job. If L{None} use the default class property value.
+ @param locked: indicates if the failure was due to a lock timeout.
+ @type locked: L{bool}
+ @param delay: how long before the job is run again, or C{None} for a default
+ staggered delay behavior.
@type delay: L{int}
"""
+
+ # notBefore is set to the chosen interval multiplied by the failure count, which
+ # results in an incremental backoff for failures
+ if delay is None:
+ delay = self.lockRescheduleInterval if locked else self.failureRescheduleInterval
+ delay *= (self.failed + 1)
return self.update(
assigned=None,
overdue=None,
- failed=self.failed + 1,
- notBefore=datetime.utcnow() + timedelta(seconds=self.failureRescheduleInterval if delay is None else delay)
+ failed=self.failed + (0 if locked else 1),
+ notBefore=datetime.utcnow() + timedelta(seconds=delay)
)
@@ -452,18 +460,25 @@
except (JobFailedError, JobRunningError) as e:
# Job failed: abort with cleanup, but pretend this method succeeded
- delay = job.lockRescheduleInterval if isinstance(e, JobRunningError) else job.failureRescheduleInterval
def _cleanUp():
@inlineCallbacks
def _cleanUp2(txn2):
- job = yield cls.load(txn2, jobID)
- log.debug(
- "JobItem: {jobid} marking as failed {count} t={tm}",
- jobid=jobID,
- count=job.failed + 1,
- tm=_tm(),
- )
- yield job.failedToRun(delay=delay)
+ try:
+ job = yield cls.load(txn2, jobID)
+ except NoSuchRecord:
+ log.debug(
+ "JobItem: {jobid} disappeared t={tm}",
+ jobid=jobID,
+ tm=_tm(),
+ )
+ else:
+ log.debug(
+ "JobItem: {jobid} marking as failed {count} t={tm}",
+ jobid=jobID,
+ count=job.failed + 1,
+ tm=_tm(),
+ )
+ yield job.failedToRun(locked=isinstance(e, JobRunningError))
return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._cleanUp")
log.debug(
"JobItem: {jobid} {desc} {work} t={tm}",
@@ -662,7 +677,7 @@
cls._workTypeMap = {}
for subcls in cls._workTypes:
- cls._workTypeMap[subcls.table.model.name] = subcls
+ cls._workTypeMap[subcls.workType()] = subcls
return cls._workTypes
@@ -702,7 +717,7 @@
results = {}
now = datetime.utcnow()
for workItemType in cls.workTypes():
- workType = workItemType.table.model.name
+ workType = workItemType.workType()
results.setdefault(workType, {
"queued": 0,
"assigned": 0,
@@ -860,6 +875,11 @@
_tableNameMap = {}
@classmethod
+ def workType(cls):
+ return cls.table.model.name
+
+
+ @classmethod
@inlineCallbacks
def makeJob(cls, transaction, **kwargs):
"""
@@ -871,7 +891,7 @@
"""
jobargs = {
- "workType": cls.table.model.name
+ "workType": cls.workType()
}
def _transferArg(name):
@@ -1965,7 +1985,7 @@
return
@passthru(
- self._workCheck().addErrback(log.error).addCallback
+ self._workCheck().addErrback(lambda result: log.error("_workCheckLoop: {exc}", exc=result)).addCallback
)
def scheduleNext(result):
# TODO: if multiple nodes are present, see if we can
Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py 2014-06-25 18:31:40 UTC (rev 13686)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py 2014-06-25 18:38:55 UTC (rev 13687)
@@ -1364,13 +1364,12 @@
L{JobItem.run} fails an aggregated work item and then ignores it.
"""
- # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
- # they are called.
+ # Patch JobItem.failedToRun to track how many times it is called.
failed = [0]
_oldFailed = JobItem.failedToRun
- def _newFailed(self, delay):
+ def _newFailed(self, locked=False, delay=None):
failed[0] += 1
- return _oldFailed(self, delay)
+ return _oldFailed(self, locked, 5)
self.patch(JobItem, "failedToRun", _newFailed)
@transactionally(self.store.newTransaction)
@@ -1378,13 +1377,14 @@
return txn.enqueue(
AggregatorWorkItem, a=1, b=1, workID=1
)
- yield _enqueue1
@transactionally(self.store.newTransaction)
def _enqueue2(txn):
return txn.enqueue(
AggregatorWorkItem, a=1, b=2, workID=2
)
+
+ yield _enqueue1
yield _enqueue2
# Make sure we have one JOB and one DUMMY_WORK_ITEM
@@ -1411,9 +1411,9 @@
# they are called.
failed = [0]
_oldFailed = JobItem.failedToRun
- def _newFailed(self, delay):
+ def _newFailed(self, locked=False, delay=None):
failed[0] += 1
- return _oldFailed(self, delay)
+ return _oldFailed(self, locked, 5)
self.patch(JobItem, "failedToRun", _newFailed)
@transactionally(self.store.newTransaction)
@@ -1421,13 +1421,14 @@
return txn.enqueue(
AggregatorWorkItem, a=1, b=1, workID=1
)
- yield _enqueue1
@transactionally(self.store.newTransaction)
def _enqueue2(txn):
return txn.enqueue(
AggregatorWorkItem, a=1, b=1, workID=2
)
+
+ yield _enqueue1
yield _enqueue2
# Make sure we have one JOB and one DUMMY_WORK_ITEM
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140625/53bf61a2/attachment.html>
More information about the calendarserver-changes mailing list