[CalendarServer-changes] [13687] twext/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Wed Jun 25 11:38:55 PDT 2014


Revision: 13687
          http://trac.calendarserver.org//changeset/13687
Author:   cdaboo at apple.com
Date:     2014-06-25 11:38:55 -0700 (Wed, 25 Jun 2014)
Log Message:
-----------
Add staggering and separate reschedule time for locked jobs. Fix logging error.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/jobqueue.py
    twext/trunk/twext/enterprise/test/test_jobqueue.py

Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py	2014-06-25 18:31:40 UTC (rev 13686)
+++ twext/trunk/twext/enterprise/jobqueue.py	2014-06-25 18:38:55 UTC (rev 13687)
@@ -354,8 +354,8 @@
     _workTypes = None
     _workTypeMap = None
 
+    lockRescheduleInterval = 60     # When a job can't run because of a lock, reschedule it this number of seconds in the future
     failureRescheduleInterval = 60  # When a job fails, reschedule it this number of seconds in the future
-    lockRescheduleInterval = 5      # When a job is locked, reschedule it this number of seconds in the future
 
     def descriptor(self):
         return JobDescriptor(self.jobID, self.weight, self.workType)
@@ -385,21 +385,29 @@
         return self.update(overdue=self.overdue + timedelta(seconds=bump))
 
 
-    def failedToRun(self, delay=None):
+    def failedToRun(self, locked=False, delay=None):
         """
         The attempt to run the job failed. Leave it in the queue, but mark it
         as unassigned, bump the failure count and set to run at some point in
         the future.
 
-        @param delay: the number of seconds in the future at which to reschedule the
-            next execution of the job. If L{None} use the default class property value.
+        @param locked: indicates if the failure was due to a lock timeout.
+        @type locked: L{bool}
+        @param delay: how long before the job is run again, or C{None} for a default
+            staggered delay behavior.
         @type delay: L{int}
         """
+
+        # notBefore is set to the chosen interval multiplied by the failure count, which
+        # results in an incremental backoff for failures
+        if delay is None:
+            delay = self.lockRescheduleInterval if locked else self.failureRescheduleInterval
+            delay *= (self.failed + 1)
         return self.update(
             assigned=None,
             overdue=None,
-            failed=self.failed + 1,
-            notBefore=datetime.utcnow() + timedelta(seconds=self.failureRescheduleInterval if delay is None else delay)
+            failed=self.failed + (0 if locked else 1),
+            notBefore=datetime.utcnow() + timedelta(seconds=delay)
         )
 
 
@@ -452,18 +460,25 @@
 
         except (JobFailedError, JobRunningError) as e:
             # Job failed: abort with cleanup, but pretend this method succeeded
-            delay = job.lockRescheduleInterval if isinstance(e, JobRunningError) else job.failureRescheduleInterval
             def _cleanUp():
                 @inlineCallbacks
                 def _cleanUp2(txn2):
-                    job = yield cls.load(txn2, jobID)
-                    log.debug(
-                        "JobItem: {jobid} marking as failed {count} t={tm}",
-                        jobid=jobID,
-                        count=job.failed + 1,
-                        tm=_tm(),
-                    )
-                    yield job.failedToRun(delay=delay)
+                    try:
+                        job = yield cls.load(txn2, jobID)
+                    except NoSuchRecord:
+                        log.debug(
+                            "JobItem: {jobid} disappeared t={tm}",
+                            jobid=jobID,
+                            tm=_tm(),
+                        )
+                    else:
+                        log.debug(
+                            "JobItem: {jobid} marking as failed {count} t={tm}",
+                            jobid=jobID,
+                            count=job.failed + 1,
+                            tm=_tm(),
+                        )
+                        yield job.failedToRun(locked=isinstance(e, JobRunningError))
                 return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._cleanUp")
             log.debug(
                 "JobItem: {jobid} {desc} {work} t={tm}",
@@ -662,7 +677,7 @@
 
             cls._workTypeMap = {}
             for subcls in cls._workTypes:
-                cls._workTypeMap[subcls.table.model.name] = subcls
+                cls._workTypeMap[subcls.workType()] = subcls
 
         return cls._workTypes
 
@@ -702,7 +717,7 @@
         results = {}
         now = datetime.utcnow()
         for workItemType in cls.workTypes():
-            workType = workItemType.table.model.name
+            workType = workItemType.workType()
             results.setdefault(workType, {
                 "queued": 0,
                 "assigned": 0,
@@ -860,6 +875,11 @@
     _tableNameMap = {}
 
     @classmethod
+    def workType(cls):
+        return cls.table.model.name
+
+
+    @classmethod
     @inlineCallbacks
     def makeJob(cls, transaction, **kwargs):
         """
@@ -871,7 +891,7 @@
         """
 
         jobargs = {
-            "workType": cls.table.model.name
+            "workType": cls.workType()
         }
 
         def _transferArg(name):
@@ -1965,7 +1985,7 @@
             return
 
         @passthru(
-            self._workCheck().addErrback(log.error).addCallback
+            self._workCheck().addErrback(lambda result: log.error("_workCheckLoop: {exc}", exc=result)).addCallback
         )
         def scheduleNext(result):
             # TODO: if multiple nodes are present, see if we can

Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py	2014-06-25 18:31:40 UTC (rev 13686)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py	2014-06-25 18:38:55 UTC (rev 13687)
@@ -1364,13 +1364,12 @@
         L{JobItem.run} fails an aggregated work item and then ignores it.
         """
 
-        # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
-        # they are called.
+        # Patch JobItem.failedToRun to track how many times it is called.
         failed = [0]
         _oldFailed = JobItem.failedToRun
-        def _newFailed(self, delay):
+        def _newFailed(self, locked=False, delay=None):
             failed[0] += 1
-            return _oldFailed(self, delay)
+            return _oldFailed(self, locked, 5)
         self.patch(JobItem, "failedToRun", _newFailed)
 
         @transactionally(self.store.newTransaction)
@@ -1378,13 +1377,14 @@
             return txn.enqueue(
                 AggregatorWorkItem, a=1, b=1, workID=1
             )
-        yield _enqueue1
 
         @transactionally(self.store.newTransaction)
         def _enqueue2(txn):
             return txn.enqueue(
                 AggregatorWorkItem, a=1, b=2, workID=2
             )
+
+        yield _enqueue1
         yield _enqueue2
 
         # Make sure we have one JOB and one DUMMY_WORK_ITEM
@@ -1411,9 +1411,9 @@
         # they are called.
         failed = [0]
         _oldFailed = JobItem.failedToRun
-        def _newFailed(self, delay):
+        def _newFailed(self, locked=False, delay=None):
             failed[0] += 1
-            return _oldFailed(self, delay)
+            return _oldFailed(self, locked, 5)
         self.patch(JobItem, "failedToRun", _newFailed)
 
         @transactionally(self.store.newTransaction)
@@ -1421,13 +1421,14 @@
             return txn.enqueue(
                 AggregatorWorkItem, a=1, b=1, workID=1
             )
-        yield _enqueue1
 
         @transactionally(self.store.newTransaction)
         def _enqueue2(txn):
             return txn.enqueue(
                 AggregatorWorkItem, a=1, b=1, workID=2
             )
+
+        yield _enqueue1
         yield _enqueue2
 
         # Make sure we have one JOB and one DUMMY_WORK_ITEM
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140625/53bf61a2/attachment.html>


More information about the calendarserver-changes mailing list