[CalendarServer-changes] [15018] twext/trunk/twext/enterprise/jobqueue.py
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jul 31 12:10:22 PDT 2015
Revision: 15018
http://trac.calendarserver.org/changeset/15018
Author: cdaboo at apple.com
Date: 2015-07-31 12:10:22 -0700 (Fri, 31 Jul 2015)
Log Message:
-----------
Fix a hang on service shutdown caused by a race condition while waiting for the work check loop to finish.
Modified Paths:
--------------
twext/trunk/twext/enterprise/jobqueue.py
Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py 2015-07-31 16:46:12 UTC (rev 15017)
+++ twext/trunk/twext/enterprise/jobqueue.py 2015-07-31 19:10:22 UTC (rev 15018)
@@ -2115,8 +2115,6 @@
if not self.running or self.disableWorkProcessing:
returnValue(None)
- self._inWorkCheck = True
-
# Check the overall service load - if overloaded skip this poll cycle.
# FIXME: need to include capacity of other nodes. For now we only check
# our own capacity and stop processing if too busy. Other nodes that
@@ -2151,9 +2149,10 @@
# that are due, ordered by priority, notBefore etc
nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
- txn = self.transactionFactory(label="jobqueue.workCheck")
- nextJob = None
+ self._inWorkCheck = True
+ txn = nextJob = None
try:
+ txn = self.transactionFactory(label="jobqueue.workCheck")
nextJob = yield JobItem.nextjob(txn, nowTime, minPriority)
if nextJob is None:
break
@@ -2193,8 +2192,9 @@
jobID=nextJob.jobID if nextJob else "?",
exc=e,
)
- yield txn.abort()
- txn = None
+ if txn is not None:
+ yield txn.abort()
+ txn = None
# If we can identify the problem job, try and set it to failed so that it
# won't block other jobs behind it (it will be picked again when the failure
@@ -2228,7 +2228,7 @@
log.error("Cannot mark failed new job")
break
finally:
- if txn:
+ if txn is not None:
yield txn.commit()
txn = None
self._inWorkCheck = False
@@ -2386,11 +2386,14 @@
for peer in self.peers:
peer.transport.abortConnection()
- # Wait for any active work check to finish
+ # Wait for any active work check to finish (but no more than 1 minute)
+ start = time.time()
while self._inWorkCheck:
d = Deferred()
self.reactor.callLater(0.5, lambda : d.callback(None))
yield d
+ if time.time() - start >= 60:
+ break
def activeNodes(self, txn):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150731/7d8dc64d/attachment.html>
More information about the calendarserver-changes mailing list