[CalendarServer-changes] [13483] twext/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Thu May 15 18:50:56 PDT 2014


Revision: 13483
          http://trac.calendarserver.org//changeset/13483
Author:   cdaboo at apple.com
Date:     2014-05-15 18:50:56 -0700 (Thu, 15 May 2014)
Log Message:
-----------
Optimize work item lookup. Tweak logging. Add additional tracking for dashboard.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/jobqueue.py
    twext/trunk/twext/enterprise/test/test_jobqueue.py

Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py	2014-05-16 01:44:14 UTC (rev 13482)
+++ twext/trunk/twext/enterprise/jobqueue.py	2014-05-16 01:50:56 UTC (rev 13483)
@@ -78,6 +78,34 @@
             # queuer is a provider of IQueuer, of which there are several
             # implementations in this module.
             queuer.enqueueWork(txn, CouponWork, customerID=customerID)
+
+More details:
+
+    A master process in each node (host in a multi-host setup) has a:
+
+    L{WorkerConnectionPool}: this maintains a list of child processes that
+        have connected to the master over AMP. It is responsible for
+        dispatching work that is to be performed locally on that node.
+        The child process is identified by an L{ConnectionFromWorker}
+        object which maintains the child AMP connection. The
+        L{ConnectionFromWorker} tracks the load on its child so that
+        work can be distributed evenly or halted if the node is too busy.
+
+    L{PeerConnectionPool}: this is an AMP based service that connects a node
+        to all the other nodes in the cluster. It also runs the main job
+        queue loop to dispatch enqueued work when it becomes due. The master
+        maintains a list of other nodes via L{ConnectionFromPeerNode} objects,
+        which maintain the AMP connections. L{ConnectionFromPeerNode} can
+        report its load to others, and can receive work which it must perform
+        locally (via a dispatch to a child).
+
+    A child process has:
+
+    L{ConnectionFromController}: an AMP connection to the master. The master
+        will use this to dispatch work to the child. The child can also
+        use this to tell the master that work needs to be performed - that
+        can be used when the work needs to be distributed evenly to any
+        child.
 """
 
 from functools import wraps
@@ -287,19 +315,13 @@
     associated with work items.
     """
 
+    _workTypes = None
+    _workTypeMap = None
+
     def descriptor(self):
         return JobDescriptor(self.jobID, self.weight)
 
 
-    @inlineCallbacks
-    def workItem(self):
-        workItemClass = WorkItem.forTableName(self.workType)
-        workItems = yield workItemClass.loadForJob(
-            self.transaction, self.jobID
-        )
-        returnValue(workItems[0] if len(workItems) == 1 else None)
-
-
     def assign(self, now):
         """
         Mark this job as assigned to a worker by setting the assigned column to the current,
@@ -350,13 +372,13 @@
         def _overtm(nb):
             return "{:.0f}".format(1000 * (t - astimestamp(nb)))
 
-        log.debug("JobItem: starting to run {jobid}".format(jobid=jobID))
+        log.debug("JobItem: {jobid} starting to run".format(jobid=jobID))
         txn = txnFactory(label="ultimatelyPerform: {}".format(jobID))
         try:
             job = yield cls.load(txn, jobID)
             if hasattr(txn, "_label"):
                 txn._label = "{} <{}>".format(txn._label, job.workType)
-            log.debug("JobItem: loaded {jobid} {work} t={tm}".format(
+            log.debug("JobItem: {jobid} loaded {work} t={tm}".format(
                 jobid=jobID,
                 work=job.workType,
                 tm=_tm())
@@ -366,7 +388,7 @@
         except NoSuchRecord:
             # The record has already been removed
             yield txn.commit()
-            log.debug("JobItem: already removed {jobid} t={tm}".format(jobid=jobID, tm=_tm()))
+            log.debug("JobItem: {jobid} already removed t={tm}".format(jobid=jobID, tm=_tm()))
 
         except JobFailedError:
             # Job failed: abort with cleanup, but pretend this method succeeded
@@ -374,12 +396,12 @@
                 @inlineCallbacks
                 def _cleanUp2(txn2):
                     job = yield cls.load(txn2, jobID)
-                    log.debug("JobItem: marking as failed {jobid}, failure count: {count} t={tm}".format(jobid=jobID, count=job.failed + 1, tm=_tm()))
+                    log.debug("JobItem: {jobid} marking as failed {count} t={tm}".format(jobid=jobID, count=job.failed + 1, tm=_tm()))
                     yield job.failedToRun()
                 return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._cleanUp")
             txn.postAbort(_cleanUp)
             yield txn.abort()
-            log.debug("JobItem: failed {jobid} {work} t={tm}".format(
+            log.debug("JobItem: {jobid} failed {work} t={tm}".format(
                 jobid=jobID,
                 work=job.workType,
                 tm=_tm()
@@ -387,7 +409,7 @@
 
         except:
             f = Failure()
-            log.error("JobItem: Unknown exception for {jobid} failed t={tm} {exc}".format(
+            log.error("JobItem: {jobid} unknown exception t={tm} {exc}".format(
                 jobid=jobID,
                 tm=_tm(),
                 exc=f,
@@ -397,7 +419,7 @@
 
         else:
             yield txn.commit()
-            log.debug("JobItem: completed {jobid} {work} t={tm} over={over}".format(
+            log.debug("JobItem: {jobid} completed {work} t={tm} over={over}".format(
                 jobid=jobID,
                 work=job.workType,
                 tm=_tm(),
@@ -482,10 +504,24 @@
         Run this job item by finding the appropriate work item class and
         running that.
         """
+
+        # TODO: Move group into the JOB table.
+        # Do a select * where group = X for update nowait to lock
+        # all rows in the group - on exception raise JobFailed
+        # with a rollback to allow the job to be re-assigned to a later
+        # date.
         workItem = yield self.workItem()
         if workItem is not None:
             if workItem.group is not None:
-                yield NamedLock.acquire(self.transaction, workItem.group)
+                try:
+                    yield NamedLock.acquire(self.transaction, workItem.group)
+                except Exception as e:
+                    log.error("JobItem: {jobid}, WorkItem: {workid} lock failed: {exc}".format(
+                        jobid=self.jobID,
+                        workid=workItem.workID,
+                        exc=e,
+                    ))
+                    raise JobFailedError(e)
 
             try:
                 # Once the work is done we delete ourselves
@@ -512,16 +548,51 @@
             pass
 
 
+    @inlineCallbacks
+    def workItem(self):
+        workItemClass = self.workItemForType(self.workType)
+        workItems = yield workItemClass.loadForJob(
+            self.transaction, self.jobID
+        )
+        returnValue(workItems[0] if len(workItems) == 1 else None)
+
+
     @classmethod
+    def workItemForType(cls, workType):
+        if cls._workTypeMap is None:
+            cls.workTypes()
+        return cls._workTypeMap[workType]
+
+
+    @classmethod
     def workTypes(cls):
         """
         @return: All of the work item types.
         @rtype: iterable of L{WorkItem} subclasses
         """
-        return WorkItem.__subclasses__()
+        if cls._workTypes is None:
+            cls._workTypes = []
+            def getWorkType(subcls, appendTo):
+                if hasattr(subcls, "table"):
+                    appendTo.append(subcls)
+                else:
+                    for subsubcls in subcls.__subclasses__():
+                        getWorkType(subsubcls, appendTo)
+            getWorkType(WorkItem, cls._workTypes)
 
+            cls._workTypeMap = {}
+            for subcls in cls._workTypes:
+                cls._workTypeMap[subcls.table.model.name] = subcls
 
+        return cls._workTypes
+
+
     @classmethod
+    def numberOfWorkTypes(cls):
+        return len(cls.workTypes())
+
+
+    @classmethod
     @inlineCallbacks
     def waitEmpty(cls, txnCreator, reactor, timeout):
         """
@@ -568,11 +639,6 @@
         returnValue(results)
 
 
-    @classmethod
-    def numberOfWorkTypes(cls):
-        return len(cls.workTypes())
-
-
 JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight"])
 
 class JobDescriptorArg(Argument):
@@ -703,8 +769,8 @@
     group = None
     default_priority = WORK_PRIORITY_LOW    # Default - subclasses should override
     default_weight = WORK_WEIGHT_5          # Default - subclasses should override
+    _tableNameMap = {}
 
-
     @classmethod
     @inlineCallbacks
     def makeJob(cls, transaction, **kwargs):
@@ -763,30 +829,7 @@
         raise NotImplementedError
 
 
-    @classmethod
-    def forTableName(cls, tableName):
-        """
-        Look up a work-item class given a particular table name.  Factoring
-        this correctly may place it into L{twext.enterprise.record.Record}
-        instead; it is probably generally useful to be able to look up a mapped
-        class from a table.
 
-        @param tableName: the name of the table to look up
-        @type tableName: L{str}
-
-        @return: the relevant subclass
-        @rtype: L{type}
-        """
-        for subcls in cls.__subclasses__():
-            clstable = getattr(subcls, "table", None)
-            if tableName == clstable.model.name:
-                return subcls
-        raise KeyError("No mapped {0} class for {1}.".format(
-            cls, tableName
-        ))
-
-
-
 class PerformJob(Command):
     """
     Notify another process that it must do a job that has been persisted to
@@ -1024,7 +1067,7 @@
         """
         The load of all currently connected workers.
         """
-        return [(worker.currentLoad, worker.totalCompleted) for worker in self.workers]
+        return [(worker.currentAssigned, worker.currentLoad, worker.totalCompleted) for worker in self.workers]
 
 
     def allWorkerLoad(self):
@@ -1072,11 +1115,20 @@
     def __init__(self, peerPool, boxReceiver=None, locator=None):
         super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
         self.peerPool = peerPool
+        self._assigned = 0
         self._load = 0
         self._completed = 0
 
 
     @property
+    def currentAssigned(self):
+        """
+        How many jobs are currently assigned to this worker?
+        """
+        return self._assigned
+
+
+    @property
     def currentLoad(self):
         """
         What is the current load of this worker?
@@ -1120,10 +1172,12 @@
             L{ConnectionFromController.actuallyReallyExecuteJobHere}.
         """
         d = self.callRemote(PerformJob, job=job)
+        self._assigned += 1
         self._load += job.weight
 
         @d.addBoth
         def f(result):
+            self._assigned -= 1
             self._load -= job.weight
             self._completed += 1
             return result

Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py	2014-05-16 01:44:14 UTC (rev 13482)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py	2014-05-16 01:50:56 UTC (rev 13483)
@@ -313,7 +313,7 @@
         table.
         """
         self.assertIdentical(
-            WorkItem.forTableName(schema.DUMMY_WORK_ITEM.model.name), DummyWorkItem
+            JobItem.workItemForType(schema.DUMMY_WORK_ITEM.model.name), DummyWorkItem
         )
 
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140515/cafe600e/attachment-0001.html>


More information about the calendarserver-changes mailing list