[CalendarServer-changes] [14665] twext/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Wed Apr 8 06:51:33 PDT 2015


Revision: 14665
          http://trac.calendarserver.org//changeset/14665
Author:   cdaboo at apple.com
Date:     2015-04-08 06:51:33 -0700 (Wed, 08 Apr 2015)
Log Message:
-----------
Support temporary failures for job queue items.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/jobqueue.py
    twext/trunk/twext/enterprise/test/test_jobqueue.py

Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py	2015-04-08 13:50:32 UTC (rev 14664)
+++ twext/trunk/twext/enterprise/jobqueue.py	2015-04-08 13:51:33 UTC (rev 14665)
@@ -326,6 +326,21 @@
 
 
 
+class JobTemporaryError(Exception):
+    """
+    A job failed to run due to a temporary failure. The job will be run again after the specified
+    interval (a built-in back-off based on the number of failures is also applied).
+    """
+
+    def __init__(self, delay):
+        """
+        @param delay: amount of time in seconds before it should run again
+        @type delay: L{int}
+        """
+        self.delay = delay
+
+
+
 class JobRunningError(Exception):
     """
     A job is already running.
@@ -452,6 +467,28 @@
         def _overtm(nb):
             return "{:.0f}".format(1000 * (t - astimestamp(nb)))
 
+        # Failed job clean-up
+        def _failureCleanUp(delay=None):
+            @inlineCallbacks
+            def _cleanUp2(txn2):
+                try:
+                    job = yield cls.load(txn2, jobID)
+                except NoSuchRecord:
+                    log.debug(
+                        "JobItem: {jobid} disappeared t={tm}",
+                        jobid=jobID,
+                        tm=_tm(),
+                    )
+                else:
+                    log.debug(
+                        "JobItem: {jobid} marking as failed {count} t={tm}",
+                        jobid=jobID,
+                        count=job.failed + 1,
+                        tm=_tm(),
+                    )
+                    yield job.failedToRun(locked=isinstance(e, JobRunningError), delay=delay)
+            return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._failureCleanUp")
+
         log.debug("JobItem: {jobid} starting to run", jobid=jobID)
         txn = txnFactory(label="ultimatelyPerform: {}".format(jobID))
         try:
@@ -475,28 +512,24 @@
                 tm=_tm(),
             )
 
+        except JobTemporaryError as e:
+
+            # Temporary failure delay with back-off
+            def _temporaryFailure():
+                return _failureCleanUp(delay=e.delay * (job.failed + 1))
+            log.debug(
+                "JobItem: {jobid} {desc} {work} t={tm}",
+                jobid=jobID,
+                desc="temporary failure #{}".format(job.failed + 1),
+                work=job.workType,
+                tm=_tm(),
+            )
+            txn.postAbort(_temporaryFailure)
+            yield txn.abort()
+
         except (JobFailedError, JobRunningError) as e:
-            # Job failed: abort with cleanup, but pretend this method succeeded
-            def _cleanUp():
-                @inlineCallbacks
-                def _cleanUp2(txn2):
-                    try:
-                        job = yield cls.load(txn2, jobID)
-                    except NoSuchRecord:
-                        log.debug(
-                            "JobItem: {jobid} disappeared t={tm}",
-                            jobid=jobID,
-                            tm=_tm(),
-                        )
-                    else:
-                        log.debug(
-                            "JobItem: {jobid} marking as failed {count} t={tm}",
-                            jobid=jobID,
-                            count=job.failed + 1,
-                            tm=_tm(),
-                        )
-                        yield job.failedToRun(locked=isinstance(e, JobRunningError))
-                return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._cleanUp")
+
+            # Permanent failure
             log.debug(
                 "JobItem: {jobid} {desc} {work} t={tm}",
                 jobid=jobID,
@@ -504,7 +537,7 @@
                 work=job.workType,
                 tm=_tm(),
             )
-            txn.postAbort(_cleanUp)
+            txn.postAbort(_failureCleanUp)
             yield txn.abort()
 
         except:
@@ -645,7 +678,10 @@
                     workid=workItem.workID,
                     exc=f,
                 )
-                raise JobFailedError(e)
+                if isinstance(e, JobTemporaryError):
+                    raise
+                else:
+                    raise JobFailedError(e)
 
         try:
             # Once the work is done we delete ourselves - NB this must be the last thing done

Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py	2015-04-08 13:50:32 UTC (rev 14664)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py	2015-04-08 13:51:33 UTC (rev 14665)
@@ -38,14 +38,13 @@
 from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
 from twext.enterprise.fixtures import buildConnectionPool
 from twext.enterprise.fixtures import SteppablePoolHelper
-from twext.enterprise.jobqueue import (
-    inTransaction, PeerConnectionPool, astimestamp,
-    LocalPerformer, _IJobPerformer, WorkItem, WorkerConnectionPool,
-    ConnectionFromPeerNode,
-    _BaseQueuer, NonPerformingQueuer, JobItem,
-    WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM,
-    JobDescriptor, SingletonWorkItem, JobFailedError
-)
+from twext.enterprise.jobqueue import \
+    inTransaction, PeerConnectionPool, astimestamp, \
+    LocalPerformer, _IJobPerformer, WorkItem, WorkerConnectionPool, \
+    ConnectionFromPeerNode, \
+    _BaseQueuer, NonPerformingQueuer, JobItem, \
+    WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM, \
+    JobDescriptor, SingletonWorkItem, JobFailedError, JobTemporaryError
 import twext.enterprise.jobqueue
 
 # TODO: There should be a store-building utility within twext.enterprise.
@@ -280,6 +279,8 @@
     def doWork(self):
         if self.a == -1:
             raise ValueError("Ooops")
+        elif self.a == -2:
+            raise JobTemporaryError(120)
         self.results[self.jobID] = self.a + self.b
         return succeed(None)
 
@@ -1000,30 +1001,23 @@
         @transactionally(dbpool.pool.connection)
         @inlineCallbacks
         def setup(txn):
-            # First, one that's right now.
+            # OK
             yield DummyWorkItem.makeJob(
                 txn, a=1, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60)
             )
 
-            # Next, create one that's actually far enough into the past to run.
+            # Error
             yield DummyWorkItem.makeJob(
                 txn, a=-1, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
             )
 
-            # Finally, one that's actually scheduled for the future.
+            # OK
             yield DummyWorkItem.makeJob(
                 txn, a=2, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60)
             )
         yield setup
         clock.advance(20 - 12)
 
-        # Wait for job
-#        while True:
-#            jobs = yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))
-#            if all([job.a == -1 for job in jobs]):
-#                break
-#            clock.advance(1)
-
         # Work item complete
         self.assertTrue(DummyWorkItem.results == {1: 1, 3: 2})
 
@@ -1062,7 +1056,41 @@
         self.assertTrue(jobs[0].notBefore > datetime.datetime.utcnow())
 
 
+    @inlineCallbacks
+    def test_temporaryFailure(self):
+        """
+        When a work item temporarily fails it should appear as unassigned in the JOB
+        table and have the failure count bumped, and a notBefore set to the temporary delay.
+        """
+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
 
+        # Let's create a single work item directly, not via the enqueue
+        # method, so that it exists but nobody will try to immediately execute
+        # it.
+
+        @transactionally(dbpool.pool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # Create temporarily-failing work (a=-2) that's far enough into the past to run.
+            yield DummyWorkItem.makeJob(
+                txn, a=-2, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
+            )
+        yield setup
+        clock.advance(20 - 12)
+
+        @transactionally(dbpool.pool.connection)
+        def check(txn):
+            return JobItem.all(txn)
+
+        jobs = yield check
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(jobs[0].assigned is None)
+        self.assertTrue(jobs[0].failed == 1)
+        self.assertTrue(jobs[0].notBefore > datetime.datetime.utcnow() + datetime.timedelta(seconds=90))
+
+
+
 class HalfConnection(object):
     def __init__(self, protocol):
         self.protocol = protocol
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150408/646c138a/attachment.html>


More information about the calendarserver-changes mailing list