[CalendarServer-changes] [12808] twext/trunk/twext/enterprise/jobqueue.py
source_changes at macosforge.org
source_changes at macosforge.org
Tue Mar 4 20:39:53 PST 2014
Revision: 12808
http://trac.calendarserver.org//changeset/12808
Author: wsanchez at apple.com
Date: 2014-03-04 20:39:53 -0800 (Tue, 04 Mar 2014)
Log Message:
-----------
Add JobItem.workTypes()
Modified Paths:
--------------
twext/trunk/twext/enterprise/jobqueue.py
Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py 2014-03-05 04:38:34 UTC (rev 12807)
+++ twext/trunk/twext/enterprise/jobqueue.py 2014-03-05 04:39:53 UTC (rev 12808)
@@ -95,8 +95,9 @@
from twisted.python.reflect import qual
from twisted.python import log
-from twext.enterprise.dal.syntax import SchemaSyntax, Lock, NamedValue, Select, \
- Count
+from twext.enterprise.dal.syntax import (
+ SchemaSyntax, Lock, NamedValue, Select, Count
+)
from twext.enterprise.dal.model import ProcedureCall
from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
@@ -117,7 +118,7 @@
(in the worst case) pass from worker->controller->controller->worker.
"""
- def performJob(jobID): #@NoSelf
+ def performJob(jobID): # @NoSelf
"""
@param jobID: The primary key identifier of the given job.
@type jobID: L{int}
@@ -269,22 +270,25 @@
class JobItem(Record, fromTable(JobInfoSchema.JOB)):
"""
- An item in the job table. This is typically not directly used by code creating
- work items, but rather is used for internal book keeping of jobs associated with
- work items.
+ An item in the job table. This is typically not directly used by code
+ creating work items, but rather is used for internal book keeping of jobs
+ associated with work items.
"""
@inlineCallbacks
def getWorkForJob(self):
workItemClass = WorkItem.forTableName(self.workType)
- workItems = yield workItemClass.loadForJob(self.transaction, self.jobID)
+ workItems = yield workItemClass.loadForJob(
+ self.transaction, self.jobID
+ )
returnValue(workItems[0] if len(workItems) == 1 else None)
@inlineCallbacks
def run(self):
"""
- Run this job item by finding the appropriate work item class and running that.
+ Run this job item by finding the appropriate work item class and
+ running that.
"""
workItem = yield self.getWorkForJob()
if workItem is not None:
@@ -309,6 +313,15 @@
@classmethod
+ def workTypes(cls):
+ """
+ @return: All of the work item types.
+ @rtype: iterable of L{WorkItem} subclasses
+ """
+ return WorkItem.__subclasses__()
+
+
+ @classmethod
@inlineCallbacks
def histogram(cls, txn):
"""
@@ -323,24 +336,25 @@
results = dict(rows)
# Add in empty data for other work
- allwork = WorkItem.__subclasses__()
- for workitem in allwork:
- if workitem.table.model.name not in results:
- results[workitem.table.model.name] = 0
+ for workType in cls.workTypes():
+ results.setdefault(workType.table.model.name, 0)
+
returnValue(results)
@classmethod
def numberOfWorkTypes(cls):
- return len(WorkItem.__subclasses__())
+ return len(cls.workTypes())
+
# Priority for work - used to order work items in the job queue
WORK_PRIORITY_LOW = 1
WORK_PRIORITY_MEDIUM = 2
WORK_PRIORITY_HIGH = 3
+
class WorkItem(Record):
"""
A L{WorkItem} is an item of work which may be stored in a database, then
@@ -441,8 +455,8 @@
@inlineCallbacks
def makeJob(cls, transaction, **kwargs):
"""
- A new work item needs to be created. First we create a Job record, then we create
- the actual work item related to the job.
+ A new work item needs to be created. First we create a Job record, then
+ we create the actual work item related to the job.
@param transaction: the transaction to use
@type transaction: L{IAsyncTransaction}
@@ -451,6 +465,7 @@
jobargs = {
"workType": cls.table.model.name
}
+
def _transferArg(name):
if name in kwargs:
jobargs[name] = kwargs[name]
@@ -978,7 +993,9 @@
Create a L{ConnectionFromController} connected to the
transactionFactory and store.
"""
- return ConnectionFromController(self.transactionFactory, self.whenConnected)
+ return ConnectionFromController(
+ self.transactionFactory, self.whenConnected
+ )
@@ -1066,7 +1083,14 @@
self._whenExecuted.errback(why)
reactor = self._chooser.reactor
- when = max(0, astimestamp(created.job.notBefore) - reactor.seconds()) if created.job.notBefore is not None else 0
+
+ if created.job.notBefore is not None:
+ when = max(
+ 0,
+ astimestamp(created.job.notBefore) - reactor.seconds()
+ )
+ else:
+ when = 0
# TODO: Track the returned DelayedCall so it can be stopped
# when the service stops.
self._chooser.reactor.callLater(when, maybeLater)
@@ -1351,8 +1375,8 @@
(self.thisProcess.hostname, self.thisProcess.port)
)
- # TODO: here is where we should iterate over the unlocked items that
- # are due, ordered by priority, notBefore etc
+ # TODO: here is where we should iterate over the unlocked items
+ # that are due, ordered by priority, notBefore etc
tooLate = datetime.utcfromtimestamp(
self.reactor.seconds() - self.queueProcessTimeout
)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140304/da3a7589/attachment-0001.html>
More information about the calendarserver-changes
mailing list