[CalendarServer-changes] [13483] twext/trunk/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Thu May 15 18:50:56 PDT 2014
Revision: 13483
http://trac.calendarserver.org//changeset/13483
Author: cdaboo at apple.com
Date: 2014-05-15 18:50:56 -0700 (Thu, 15 May 2014)
Log Message:
-----------
Optimize work item lookup. Tweak logging. Add additional tracking for dashboard.
Modified Paths:
--------------
twext/trunk/twext/enterprise/jobqueue.py
twext/trunk/twext/enterprise/test/test_jobqueue.py
Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py 2014-05-16 01:44:14 UTC (rev 13482)
+++ twext/trunk/twext/enterprise/jobqueue.py 2014-05-16 01:50:56 UTC (rev 13483)
@@ -78,6 +78,34 @@
# queuer is a provider of IQueuer, of which there are several
# implementations in this module.
queuer.enqueueWork(txn, CouponWork, customerID=customerID)
+
+More details:
+
+ A master process in each node (host in a multi-host setup) has a:
+
+ L{WorkerConnectionPool}: this maintains a list of child processes that
+ have connected to the master over AMP. It is responsible for
+ dispatching work that is to be performed locally on that node.
+ The child process is identified by an L{ConnectionFromWorker}
+ object which maintains the child AMP connection. The
+ L{ConnectionFromWorker} tracks the load on its child so that
+ work can be distributed evenly or halted if the node is too busy.
+
+ L{PeerConnectionPool}: this is an AMP based service that connects a node
+ to all the other nodes in the cluster. It also runs the main job
+ queue loop to dispatch enqueued work when it becomes due. The master
+ maintains a list of other nodes via L{ConnectionFromPeerNode} objects,
+ which maintain the AMP connections. L{ConnectionFromPeerNode} can
+ report its load to others, and can receive work which it must perform
+ locally (via a dispatch to a child).
+
+ A child process has:
+
+ L{ConnectionFromController}: an AMP connection to the master. The master
+ will use this to dispatch work to the child. The child can also
+ use this to tell the master that work needs to be performed - that
+ can be used when the work needs to be distributed evenly to any
+ child.
"""
from functools import wraps
@@ -287,19 +315,13 @@
associated with work items.
"""
+ _workTypes = None
+ _workTypeMap = None
+
def descriptor(self):
return JobDescriptor(self.jobID, self.weight)
- @inlineCallbacks
- def workItem(self):
- workItemClass = WorkItem.forTableName(self.workType)
- workItems = yield workItemClass.loadForJob(
- self.transaction, self.jobID
- )
- returnValue(workItems[0] if len(workItems) == 1 else None)
-
-
def assign(self, now):
"""
Mark this job as assigned to a worker by setting the assigned column to the current,
@@ -350,13 +372,13 @@
def _overtm(nb):
return "{:.0f}".format(1000 * (t - astimestamp(nb)))
- log.debug("JobItem: starting to run {jobid}".format(jobid=jobID))
+ log.debug("JobItem: {jobid} starting to run".format(jobid=jobID))
txn = txnFactory(label="ultimatelyPerform: {}".format(jobID))
try:
job = yield cls.load(txn, jobID)
if hasattr(txn, "_label"):
txn._label = "{} <{}>".format(txn._label, job.workType)
- log.debug("JobItem: loaded {jobid} {work} t={tm}".format(
+ log.debug("JobItem: {jobid} loaded {work} t={tm}".format(
jobid=jobID,
work=job.workType,
tm=_tm())
@@ -366,7 +388,7 @@
except NoSuchRecord:
# The record has already been removed
yield txn.commit()
- log.debug("JobItem: already removed {jobid} t={tm}".format(jobid=jobID, tm=_tm()))
+ log.debug("JobItem: {jobid} already removed t={tm}".format(jobid=jobID, tm=_tm()))
except JobFailedError:
# Job failed: abort with cleanup, but pretend this method succeeded
@@ -374,12 +396,12 @@
@inlineCallbacks
def _cleanUp2(txn2):
job = yield cls.load(txn2, jobID)
- log.debug("JobItem: marking as failed {jobid}, failure count: {count} t={tm}".format(jobid=jobID, count=job.failed + 1, tm=_tm()))
+ log.debug("JobItem: {jobid} marking as failed {count} t={tm}".format(jobid=jobID, count=job.failed + 1, tm=_tm()))
yield job.failedToRun()
return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._cleanUp")
txn.postAbort(_cleanUp)
yield txn.abort()
- log.debug("JobItem: failed {jobid} {work} t={tm}".format(
+ log.debug("JobItem: {jobid} failed {work} t={tm}".format(
jobid=jobID,
work=job.workType,
tm=_tm()
@@ -387,7 +409,7 @@
except:
f = Failure()
- log.error("JobItem: Unknown exception for {jobid} failed t={tm} {exc}".format(
+ log.error("JobItem: {jobid} unknown exception t={tm} {exc}".format(
jobid=jobID,
tm=_tm(),
exc=f,
@@ -397,7 +419,7 @@
else:
yield txn.commit()
- log.debug("JobItem: completed {jobid} {work} t={tm} over={over}".format(
+ log.debug("JobItem: {jobid} completed {work} t={tm} over={over}".format(
jobid=jobID,
work=job.workType,
tm=_tm(),
@@ -482,10 +504,24 @@
Run this job item by finding the appropriate work item class and
running that.
"""
+
+ # TODO: Move group into the JOB table.
+ # Do a select * where group = X for update nowait to lock
+ # all rows in the group - on exception raise JobFailed
+ # with a rollback to allow the job to be re-assigned to a later
+ # date.
workItem = yield self.workItem()
if workItem is not None:
if workItem.group is not None:
- yield NamedLock.acquire(self.transaction, workItem.group)
+ try:
+ yield NamedLock.acquire(self.transaction, workItem.group)
+ except Exception as e:
+ log.error("JobItem: {jobid}, WorkItem: {workid} lock failed: {exc}".format(
+ jobid=self.jobID,
+ workid=workItem.workID,
+ exc=e,
+ ))
+ raise JobFailedError(e)
try:
# Once the work is done we delete ourselves
@@ -512,16 +548,51 @@
pass
+ @inlineCallbacks
+ def workItem(self):
+ workItemClass = self.workItemForType(self.workType)
+ workItems = yield workItemClass.loadForJob(
+ self.transaction, self.jobID
+ )
+ returnValue(workItems[0] if len(workItems) == 1 else None)
+
+
@classmethod
+ def workItemForType(cls, workType):
+ if cls._workTypeMap is None:
+ cls.workTypes()
+ return cls._workTypeMap[workType]
+
+
+ @classmethod
def workTypes(cls):
"""
@return: All of the work item types.
@rtype: iterable of L{WorkItem} subclasses
"""
- return WorkItem.__subclasses__()
+ if cls._workTypes is None:
+ cls._workTypes = []
+ def getWorkType(subcls, appendTo):
+ if hasattr(subcls, "table"):
+ appendTo.append(subcls)
+ else:
+ for subsubcls in subcls.__subclasses__():
+ getWorkType(subsubcls, appendTo)
+ getWorkType(WorkItem, cls._workTypes)
+ cls._workTypeMap = {}
+ for subcls in cls._workTypes:
+ cls._workTypeMap[subcls.table.model.name] = subcls
+ return cls._workTypes
+
+
@classmethod
+ def numberOfWorkTypes(cls):
+ return len(cls.workTypes())
+
+
+ @classmethod
@inlineCallbacks
def waitEmpty(cls, txnCreator, reactor, timeout):
"""
@@ -568,11 +639,6 @@
returnValue(results)
- @classmethod
- def numberOfWorkTypes(cls):
- return len(cls.workTypes())
-
-
JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight"])
class JobDescriptorArg(Argument):
@@ -703,8 +769,8 @@
group = None
default_priority = WORK_PRIORITY_LOW # Default - subclasses should override
default_weight = WORK_WEIGHT_5 # Default - subclasses should override
+ _tableNameMap = {}
-
@classmethod
@inlineCallbacks
def makeJob(cls, transaction, **kwargs):
@@ -763,30 +829,7 @@
raise NotImplementedError
- @classmethod
- def forTableName(cls, tableName):
- """
- Look up a work-item class given a particular table name. Factoring
- this correctly may place it into L{twext.enterprise.record.Record}
- instead; it is probably generally useful to be able to look up a mapped
- class from a table.
- @param tableName: the name of the table to look up
- @type tableName: L{str}
-
- @return: the relevant subclass
- @rtype: L{type}
- """
- for subcls in cls.__subclasses__():
- clstable = getattr(subcls, "table", None)
- if tableName == clstable.model.name:
- return subcls
- raise KeyError("No mapped {0} class for {1}.".format(
- cls, tableName
- ))
-
-
-
class PerformJob(Command):
"""
Notify another process that it must do a job that has been persisted to
@@ -1024,7 +1067,7 @@
"""
The load of all currently connected workers.
"""
- return [(worker.currentLoad, worker.totalCompleted) for worker in self.workers]
+ return [(worker.currentAssigned, worker.currentLoad, worker.totalCompleted) for worker in self.workers]
def allWorkerLoad(self):
@@ -1072,11 +1115,20 @@
def __init__(self, peerPool, boxReceiver=None, locator=None):
super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
self.peerPool = peerPool
+ self._assigned = 0
self._load = 0
self._completed = 0
@property
+ def currentAssigned(self):
+ """
+ How many jobs currently assigned to this worker?
+ """
+ return self._assigned
+
+
+ @property
def currentLoad(self):
"""
What is the current load of this worker?
@@ -1120,10 +1172,12 @@
L{ConnectionFromController.actuallyReallyExecuteJobHere}.
"""
d = self.callRemote(PerformJob, job=job)
+ self._assigned += 1
self._load += job.weight
@d.addBoth
def f(result):
+ self._assigned -= 1
self._load -= job.weight
self._completed += 1
return result
Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py 2014-05-16 01:44:14 UTC (rev 13482)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py 2014-05-16 01:50:56 UTC (rev 13483)
@@ -313,7 +313,7 @@
table.
"""
self.assertIdentical(
- WorkItem.forTableName(schema.DUMMY_WORK_ITEM.model.name), DummyWorkItem
+ JobItem.workItemForType(schema.DUMMY_WORK_ITEM.model.name), DummyWorkItem
)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140515/cafe600e/attachment-0001.html>
More information about the calendarserver-changes
mailing list