[CalendarServer-changes] [13486] twext/trunk/twext/enterprise/jobqueue.py
source_changes at macosforge.org
source_changes at macosforge.org
Fri May 16 11:05:32 PDT 2014
Revision: 13486
http://trac.calendarserver.org/changeset/13486
Author: cdaboo at apple.com
Date: 2014-05-16 11:05:32 -0700 (Fri, 16 May 2014)
Log Message:
-----------
Additional tracking of job stats.
Modified Paths:
--------------
twext/trunk/twext/enterprise/jobqueue.py
Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py 2014-05-16 14:30:46 UTC (rev 13485)
+++ twext/trunk/twext/enterprise/jobqueue.py 2014-05-16 18:05:32 UTC (rev 13486)
@@ -138,6 +138,7 @@
from zope.interface.interface import Interface
from twext.enterprise.locking import NamedLock
+import collections
import time
log = Logger()
@@ -319,7 +320,7 @@
_workTypeMap = None
def descriptor(self):
- return JobDescriptor(self.jobID, self.weight)
+ return JobDescriptor(self.jobID, self.weight, self.workType)
def assign(self, now):
@@ -621,25 +622,33 @@
"""
results = {}
now = datetime.utcnow()
- for workType in cls.workTypes():
- results.setdefault(workType.table.model.name, [0, 0, 0, 0])
+ for workItemType in cls.workTypes():
+ workType = workItemType.table.model.name
+ results.setdefault(workType, {
+ "queued": 0,
+ "assigned": 0,
+ "late": 0,
+ "failed": 0,
+ "completed": WorkerConnectionPool.completed.get(workType, 0),
+ "time": WorkerConnectionPool.timing.get(workType, 0.0)
+ })
jobs = yield cls.all(txn)
for job in jobs:
r = results[job.workType]
- r[0] += 1
+ r["queued"] += 1
if job.assigned is not None:
- r[1] += 1
+ r["assigned"] += 1
+ if job.assigned is None and job.notBefore < now:
+ r["late"] += 1
if job.failed:
- r[2] += 1
- if job.assigned is None and job.notBefore < now:
- r[3] += 1
+ r["failed"] += 1
returnValue(results)
-JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight"])
+JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight", "type"])
class JobDescriptorArg(Argument):
"""
@@ -650,7 +659,7 @@
def fromString(self, inString):
- return JobDescriptor(*map(int, inString.split(",")))
+ return JobDescriptor(*[f(s) for f, s in zip((int, int, str,), inString.split(","))])
# Priority for work - used to order work items in the job queue
@@ -1018,6 +1027,9 @@
"""
implements(_IJobPerformer)
+ completed = collections.defaultdict(int)
+ timing = collections.defaultdict(float)
+
def __init__(self, maximumLoadPerWorker=WORK_WEIGHT_CAPACITY):
self.workers = []
self.maximumLoadPerWorker = maximumLoadPerWorker
@@ -1088,6 +1100,7 @@
return sorted(self.workers[:], key=lambda w: w.currentLoad)[0]
+ @inlineCallbacks
def performJob(self, job):
"""
Select a local worker that is idle enough to perform the given job,
@@ -1100,9 +1113,15 @@
complete.
@rtype: L{Deferred} firing L{dict}
"""
+
+ t = time.time()
preferredWorker = self._selectLowestLoadWorker()
- result = preferredWorker.performJob(job)
- return result
+ try:
+ result = yield preferredWorker.performJob(job)
+ finally:
+ self.completed[job.type] += 1
+ self.timing[job.type] += time.time() - t
+ returnValue(result)
@@ -1536,6 +1555,7 @@
self._listeningPort = None
self._lastSeenTotalNodes = 1
self._lastSeenNodeIndex = 1
+ self._lastMinPriority = WORK_PRIORITY_LOW
def addPeerConnection(self, peer):
@@ -1653,16 +1673,19 @@
# Check overload level first
if level > self.overloadLevel:
- log.error("workCheck: jobqueue is overloaded")
+ if self._lastMinPriority != WORK_PRIORITY_HIGH + 1:
+ log.error("workCheck: jobqueue is overloaded")
+ self._lastMinPriority = WORK_PRIORITY_HIGH + 1
break
elif level > self.highPriorityLevel:
- log.debug("workCheck: jobqueue high priority only")
minPriority = WORK_PRIORITY_HIGH
elif level > self.mediumPriorityLevel:
- log.debug("workCheck: jobqueue high/medium priority only")
minPriority = WORK_PRIORITY_MEDIUM
else:
minPriority = WORK_PRIORITY_LOW
+ if self._lastMinPriority != minPriority:
+ log.debug("workCheck: jobqueue priority limit change: {}".format(minPriority))
+ self._lastMinPriority = minPriority
# Determine what the timestamp cutoff
# TODO: here is where we should iterate over the unlocked items
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140516/6b4e4fb0/attachment.html>
More information about the calendarserver-changes mailing list