[CalendarServer-changes] [15018] twext/trunk/twext/enterprise/jobqueue.py

source_changes at macosforge.org source_changes at macosforge.org
Fri Jul 31 12:10:22 PDT 2015


Revision: 15018
          http://trac.calendarserver.org//changeset/15018
Author:   cdaboo at apple.com
Date:     2015-07-31 12:10:22 -0700 (Fri, 31 Jul 2015)
Log Message:
-----------
Fix hang on service shut down due to race condition with waiting for work check loop to finish.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/jobqueue.py

Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py	2015-07-31 16:46:12 UTC (rev 15017)
+++ twext/trunk/twext/enterprise/jobqueue.py	2015-07-31 19:10:22 UTC (rev 15018)
@@ -2115,8 +2115,6 @@
             if not self.running or self.disableWorkProcessing:
                 returnValue(None)
 
-            self._inWorkCheck = True
-
             # Check the overall service load - if overloaded skip this poll cycle.
             # FIXME: need to include capacity of other nodes. For now we only check
             # our own capacity and stop processing if too busy. Other nodes that
@@ -2151,9 +2149,10 @@
             # that are due, ordered by priority, notBefore etc
             nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
 
-            txn = self.transactionFactory(label="jobqueue.workCheck")
-            nextJob = None
+            self._inWorkCheck = True
+            txn = nextJob = None
             try:
+                txn = self.transactionFactory(label="jobqueue.workCheck")
                 nextJob = yield JobItem.nextjob(txn, nowTime, minPriority)
                 if nextJob is None:
                     break
@@ -2193,8 +2192,9 @@
                     jobID=nextJob.jobID if nextJob else "?",
                     exc=e,
                 )
-                yield txn.abort()
-                txn = None
+                if txn is not None:
+                    yield txn.abort()
+                    txn = None
 
                 # If we can identify the problem job, try and set it to failed so that it
                 # won't block other jobs behind it (it will be picked again when the failure
@@ -2228,7 +2228,7 @@
                     log.error("Cannot mark failed new job")
                     break
             finally:
-                if txn:
+                if txn is not None:
                     yield txn.commit()
                     txn = None
                 self._inWorkCheck = False
@@ -2386,11 +2386,14 @@
         for peer in self.peers:
             peer.transport.abortConnection()
 
-        # Wait for any active work check to finish
+        # Wait for any active work check to finish (but no more than 1 minute)
+        start = time.time()
         while self._inWorkCheck:
             d = Deferred()
             self.reactor.callLater(0.5, lambda : d.callback(None))
             yield d
+            if time.time() - start >= 60:
+                break
 
 
     def activeNodes(self, txn):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150731/7d8dc64d/attachment.html>


More information about the calendarserver-changes mailing list