[CalendarServer-changes] [13486] twext/trunk/twext/enterprise/jobqueue.py

source_changes at macosforge.org source_changes at macosforge.org
Fri May 16 11:05:32 PDT 2014


Revision: 13486
          http://trac.calendarserver.org/changeset/13486
Author:   cdaboo at apple.com
Date:     2014-05-16 11:05:32 -0700 (Fri, 16 May 2014)
Log Message:
-----------
Additional tracking of job stats.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/jobqueue.py

Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py	2014-05-16 14:30:46 UTC (rev 13485)
+++ twext/trunk/twext/enterprise/jobqueue.py	2014-05-16 18:05:32 UTC (rev 13486)
@@ -138,6 +138,7 @@
 from zope.interface.interface import Interface
 from twext.enterprise.locking import NamedLock
 
+import collections
 import time
 
 log = Logger()
@@ -319,7 +320,7 @@
     _workTypeMap = None
 
     def descriptor(self):
-        return JobDescriptor(self.jobID, self.weight)
+        return JobDescriptor(self.jobID, self.weight, self.workType)
 
 
     def assign(self, now):
@@ -621,25 +622,33 @@
         """
         results = {}
         now = datetime.utcnow()
-        for workType in cls.workTypes():
-            results.setdefault(workType.table.model.name, [0, 0, 0, 0])
+        for workItemType in cls.workTypes():
+            workType = workItemType.table.model.name
+            results.setdefault(workType, {
+                "queued": 0,
+                "assigned": 0,
+                "late": 0,
+                "failed": 0,
+                "completed": WorkerConnectionPool.completed.get(workType, 0),
+                "time": WorkerConnectionPool.timing.get(workType, 0.0)
+            })
 
         jobs = yield cls.all(txn)
 
         for job in jobs:
             r = results[job.workType]
-            r[0] += 1
+            r["queued"] += 1
             if job.assigned is not None:
-                r[1] += 1
+                r["assigned"] += 1
+            if job.assigned is None and job.notBefore < now:
+                r["late"] += 1
             if job.failed:
-                r[2] += 1
-            if job.assigned is None and job.notBefore < now:
-                r[3] += 1
+                r["failed"] += 1
 
         returnValue(results)
 
 
-JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight"])
+JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight", "type"])
 
 class JobDescriptorArg(Argument):
     """
@@ -650,7 +659,7 @@
 
 
     def fromString(self, inString):
-        return JobDescriptor(*map(int, inString.split(",")))
+        return JobDescriptor(*[f(s) for f, s in zip((int, int, str,), inString.split(","))])
 
 
 # Priority for work - used to order work items in the job queue
@@ -1018,6 +1027,9 @@
     """
     implements(_IJobPerformer)
 
+    completed = collections.defaultdict(int)
+    timing = collections.defaultdict(float)
+
     def __init__(self, maximumLoadPerWorker=WORK_WEIGHT_CAPACITY):
         self.workers = []
         self.maximumLoadPerWorker = maximumLoadPerWorker
@@ -1088,6 +1100,7 @@
         return sorted(self.workers[:], key=lambda w: w.currentLoad)[0]
 
 
+    @inlineCallbacks
     def performJob(self, job):
         """
         Select a local worker that is idle enough to perform the given job,
@@ -1100,9 +1113,15 @@
             complete.
         @rtype: L{Deferred} firing L{dict}
         """
+
+        t = time.time()
         preferredWorker = self._selectLowestLoadWorker()
-        result = preferredWorker.performJob(job)
-        return result
+        try:
+            result = yield preferredWorker.performJob(job)
+        finally:
+            self.completed[job.type] += 1
+            self.timing[job.type] += time.time() - t
+        returnValue(result)
 
 
 
@@ -1536,6 +1555,7 @@
         self._listeningPort = None
         self._lastSeenTotalNodes = 1
         self._lastSeenNodeIndex = 1
+        self._lastMinPriority = WORK_PRIORITY_LOW
 
 
     def addPeerConnection(self, peer):
@@ -1653,16 +1673,19 @@
 
             # Check overload level first
             if level > self.overloadLevel:
-                log.error("workCheck: jobqueue is overloaded")
+                if self._lastMinPriority != WORK_PRIORITY_HIGH + 1:
+                    log.error("workCheck: jobqueue is overloaded")
+                self._lastMinPriority = WORK_PRIORITY_HIGH + 1
                 break
             elif level > self.highPriorityLevel:
-                log.debug("workCheck: jobqueue high priority only")
                 minPriority = WORK_PRIORITY_HIGH
             elif level > self.mediumPriorityLevel:
-                log.debug("workCheck: jobqueue high/medium priority only")
                 minPriority = WORK_PRIORITY_MEDIUM
             else:
                 minPriority = WORK_PRIORITY_LOW
+            if self._lastMinPriority != minPriority:
+                log.debug("workCheck: jobqueue priority limit change: {}".format(minPriority))
+            self._lastMinPriority = minPriority
 
             # Determine what the timestamp cutoff
             # TODO: here is where we should iterate over the unlocked items
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140516/6b4e4fb0/attachment.html>


More information about the calendarserver-changes mailing list