[CalendarServer-changes] [15118] twext/trunk/twext/enterprise/jobs
source_changes at macosforge.org
source_changes at macosforge.org
Thu Sep 10 12:48:57 PDT 2015
Revision: 15118
http://trac.calendarserver.org//changeset/15118
Author: cdaboo at apple.com
Date: 2015-09-10 12:48:57 -0700 (Thu, 10 Sep 2015)
Log Message:
-----------
Add ability to override default work item priority and weight values.
Modified Paths:
--------------
twext/trunk/twext/enterprise/jobs/jobitem.py
twext/trunk/twext/enterprise/jobs/test/test_jobs.py
twext/trunk/twext/enterprise/jobs/workitem.py
Modified: twext/trunk/twext/enterprise/jobs/jobitem.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/jobitem.py 2015-09-10 16:15:53 UTC (rev 15117)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py 2015-09-10 19:48:57 UTC (rev 15118)
@@ -594,6 +594,19 @@
@classmethod
+    def allWorkTypes(cls):
+        """
+        Return the mapping of work table names to L{WorkItem} sub-classes.
+
+        The mapping is read from C{cls._workTypeMap}. When that attribute is
+        still C{None}, L{workTypes} is called first and the attribute is read
+        again (presumably L{workTypes} fills the map in as a side effect; its
+        body is not shown here).
+
+        @return: the current value of C{cls._workTypeMap}.
+        @rtype: L{dict}
+        """
+        typeMap = cls._workTypeMap
+        if typeMap is None:
+            cls.workTypes()
+            typeMap = cls._workTypeMap
+        return typeMap
+
+
+ @classmethod
def numberOfWorkTypes(cls):
return len(cls.workTypes())
Modified: twext/trunk/twext/enterprise/jobs/test/test_jobs.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/test/test_jobs.py 2015-09-10 16:15:53 UTC (rev 15117)
+++ twext/trunk/twext/enterprise/jobs/test/test_jobs.py 2015-09-10 19:48:57 UTC (rev 15118)
@@ -41,7 +41,8 @@
from twext.enterprise.jobs.utils import inTransaction, astimestamp
from twext.enterprise.jobs.workitem import \
WorkItem, SingletonWorkItem, \
- WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM
+ WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM, WORK_WEIGHT_5, \
+ WORK_WEIGHT_1, WORK_WEIGHT_10, WORK_WEIGHT_0
from twext.enterprise.jobs.jobitem import \
JobItem, JobDescriptor, JobFailedError, JobTemporaryError
from twext.enterprise.jobs.queue import \
@@ -187,18 +188,6 @@
SQL = passthru
-nodeSchema = SQL(
- """
- create table NODE_INFO (
- HOSTNAME varchar(255) not null,
- PID integer not null,
- PORT integer not null,
- TIME timestamp default current_timestamp not null,
- primary key (HOSTNAME, PORT)
- );
- """
-)
-
jobSchema = SQL(
"""
create table JOB (
@@ -241,6 +230,12 @@
A integer, B integer,
DELETE_ON_LOAD integer default 0
);
+ create table UPDATE_WORK_ITEM (
+ WORK_ID integer primary key,
+ JOB_ID integer references JOB,
+ A integer, B integer,
+ DELETE_ON_LOAD integer default 0
+ );
"""
)
@@ -253,7 +248,8 @@
"DUMMY_WORK_ITEM",
"DUMMY_WORK_SINGLETON_ITEM",
"DUMMY_WORK_PAUSE_ITEM",
- "AGGREGATOR_WORK_ITEM"
+ "AGGREGATOR_WORK_ITEM",
+ "UPDATE_WORK_ITEM",
)
] + ["delete from job"]
except SkipTest as e:
@@ -261,12 +257,14 @@
DummyWorkSingletonItemTable = object
DummyWorkPauseItemTable = object
AggregatorWorkItemTable = object
+ UpdateWorkItemTable = object
skip = e
else:
DummyWorkItemTable = fromTable(schema.DUMMY_WORK_ITEM)
DummyWorkSingletonItemTable = fromTable(schema.DUMMY_WORK_SINGLETON_ITEM)
DummyWorkPauseItemTable = fromTable(schema.DUMMY_WORK_PAUSE_ITEM)
AggregatorWorkItemTable = fromTable(schema.AGGREGATOR_WORK_ITEM)
+ UpdateWorkItemTable = fromTable(schema.UPDATE_WORK_ITEM)
skip = False
@@ -358,6 +356,17 @@
+class UpdateWorkItem(WorkItem, UpdateWorkItemTable):
+    """
+    Sample L{WorkItem} subclass, backed by the UPDATE_WORK_ITEM table, whose
+    default weight and priority are changed by the tests.
+    """
+
+    # Starting values; the tests compare against these before any update.
+    default_weight = WORK_WEIGHT_5
+    default_priority = WORK_PRIORITY_MEDIUM
+
+
+
class AMPTests(TestCase):
"""
Tests for L{AMP} faithfully relaying ids across the wire.
@@ -442,7 +451,7 @@
"""
L{ControllerQueue.enqueueWork} will insert a job and a work item.
"""
- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ dbpool = buildConnectionPool(self, jobSchema + schemaText)
yield self._enqueue(dbpool, 1, 2)
# Make sure we have one JOB and one DUMMY_WORK_ITEM
@@ -469,7 +478,7 @@
"""
L{JobItem.assign} will mark a job as assigned.
"""
- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ dbpool = buildConnectionPool(self, jobSchema + schemaText)
yield self._enqueue(dbpool, 1, 2)
# Make sure we have one JOB and one DUMMY_WORK_ITEM
@@ -497,7 +506,7 @@
L{JobItem.nextjob} returns the correct job based on priority.
"""
- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ dbpool = buildConnectionPool(self, jobSchema + schemaText)
now = datetime.datetime.utcnow()
# Empty job queue
@@ -585,7 +594,7 @@
"""
L{ControllerQueue.enqueueWork} will insert a job and a work item.
"""
- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ dbpool = buildConnectionPool(self, jobSchema + schemaText)
yield self._enqueue(dbpool, 1, 2, cl=DummyWorkItem)
@@ -606,7 +615,7 @@
"""
L{ControllerQueue.enqueueWork} will insert a job and a work item.
"""
- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ dbpool = buildConnectionPool(self, jobSchema + schemaText)
yield self._enqueue(dbpool, 1, 2, cl=DummyWorkSingletonItem)
@@ -627,7 +636,7 @@
"""
L{ControllerQueue.enqueueWork} will insert a job and a work item.
"""
- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+ dbpool = buildConnectionPool(self, jobSchema + schemaText)
qpool = yield self._enqueue(dbpool, 1, 2, cl=DummyWorkSingletonItem, notBefore=datetime.datetime(2014, 5, 17, 12, 0, 0))
@@ -657,7 +666,138 @@
self.assertTrue(work[0][1].notBefore != datetime.datetime(2014, 5, 17, 12, 0, 0))
+    def test_updateWorkTypes(self):
+        """
+        L{WorkItem.updateWorkTypes} updates weight and priority correctly.
+
+        Valid values are applied, a missing key leaves that value alone, and
+        unknown work types or out-of-range values are ignored.
+        """
+        _ignore_dbpool = buildConnectionPool(self, jobSchema + schemaText)
+
+        # updateWorkTypes rewrites class attributes, which outlive this test.
+        # Put the entry values back afterwards so that other tests looking at
+        # UpdateWorkItem's defaults do not depend on execution order.
+        self.addCleanup(
+            setattr, UpdateWorkItem, "default_priority",
+            UpdateWorkItem.default_priority,
+        )
+        self.addCleanup(
+            setattr, UpdateWorkItem, "default_weight",
+            UpdateWorkItem.default_weight,
+        )
+
+        def _validate(priority, weight):
+            # AggregatorWorkItem is never named in an update below, so it
+            # must keep the same values throughout.
+            self.assertEqual(AggregatorWorkItem.default_priority, WORK_PRIORITY_LOW)
+            self.assertEqual(AggregatorWorkItem.default_weight, WORK_WEIGHT_5)
+            self.assertEqual(UpdateWorkItem.default_priority, priority)
+            self.assertEqual(UpdateWorkItem.default_weight, weight)
+
+        # Check current values
+        _validate(WORK_PRIORITY_MEDIUM, WORK_WEIGHT_5)
+
+        # Change priority and weight
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_HIGH,
+                "weight": WORK_WEIGHT_1,
+            },
+        })
+        _validate(WORK_PRIORITY_HIGH, WORK_WEIGHT_1)
+
+        # Change priority only
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_LOW,
+            },
+        })
+        _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_1)
+
+        # Change weight only
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "weight": WORK_WEIGHT_10,
+            },
+        })
+        _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_10)
+
+        # No change
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+            },
+        })
+        _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_10)
+        WorkItem.updateWorkTypes({
+        })
+        _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_10)
+
+        # Change priority and weight, with invalid work
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_HIGH,
+                "weight": WORK_WEIGHT_1,
+            },
+            "FOO_WORK_ITEM": {
+                "priority": WORK_PRIORITY_HIGH,
+                "weight": WORK_WEIGHT_1,
+            },
+        })
+        _validate(WORK_PRIORITY_HIGH, WORK_WEIGHT_1)
+
+        # Invalid priority and weight
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_HIGH + 1,
+                "weight": WORK_WEIGHT_10 + 11,
+            },
+        })
+        _validate(WORK_PRIORITY_HIGH, WORK_WEIGHT_1)
+
+        # Invalid priority and valid weight
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_HIGH + 1,
+                "weight": WORK_WEIGHT_0,
+            },
+        })
+        _validate(WORK_PRIORITY_HIGH, WORK_WEIGHT_0)
+
+        # Valid priority and invalid weight
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_LOW,
+                "weight": WORK_WEIGHT_0 - 1,
+            },
+        })
+        _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_0)
+
+
+    def test_dumpWorkTypes(self):
+        """
+        L{WorkItem.dumpWorkTypes} dumps weight and priority correctly.
+
+        The dump is checked for a work type that is never updated, and again
+        for UPDATE_WORK_ITEM after each of two updates.
+        """
+        _ignore_dbpool = buildConnectionPool(self, jobSchema + schemaText)
+
+        # The updates below rewrite UpdateWorkItem's class attributes. Restore
+        # the entry values afterwards; otherwise a test that runs later and
+        # asserts the declared defaults would see the values left here.
+        self.addCleanup(
+            setattr, UpdateWorkItem, "default_priority",
+            UpdateWorkItem.default_priority,
+        )
+        self.addCleanup(
+            setattr, UpdateWorkItem, "default_weight",
+            UpdateWorkItem.default_weight,
+        )
+
+        results = WorkItem.dumpWorkTypes()
+        self.assertIn("DUMMY_WORK_SINGLETON_ITEM", results)
+        self.assertEqual(
+            results["DUMMY_WORK_SINGLETON_ITEM"],
+            {"priority": WORK_PRIORITY_LOW, "weight": WORK_WEIGHT_5},
+        )
+
+        # Change priority and weight
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_HIGH,
+                "weight": WORK_WEIGHT_1,
+            },
+        })
+        results = WorkItem.dumpWorkTypes()
+        self.assertEqual(
+            results["UPDATE_WORK_ITEM"],
+            {"priority": WORK_PRIORITY_HIGH, "weight": WORK_WEIGHT_1},
+        )
+
+        # Change priority and weight again
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_MEDIUM,
+                "weight": WORK_WEIGHT_10,
+            },
+        })
+        results = WorkItem.dumpWorkTypes()
+        self.assertEqual(
+            results["UPDATE_WORK_ITEM"],
+            {"priority": WORK_PRIORITY_MEDIUM, "weight": WORK_WEIGHT_10},
+        )
+
+
+
class WorkerConnectionPoolTests(TestCase):
"""
A L{WorkerConnectionPool} is responsible for managing, in a node's
@@ -694,7 +834,7 @@
Setup pool and reactor clock for time stepped tests.
"""
reactor = MemoryReactorWithClock()
- cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+ cph = SteppablePoolHelper(jobSchema + schemaText)
then = datetime.datetime(2012, 12, 12, 12, 12, 12)
reactor.advance(astimestamp(then))
cph.setUp(self)
@@ -903,7 +1043,7 @@
L{ControllerQueue.startService} kicks off the idle work-check loop.
"""
reactor = MemoryReactorWithClock()
- cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+ cph = SteppablePoolHelper(jobSchema + schemaText)
then = datetime.datetime(2012, 12, 12, 12, 12, 0)
reactor.advance(astimestamp(then))
cph.setUp(self)
Modified: twext/trunk/twext/enterprise/jobs/workitem.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/workitem.py 2015-09-10 16:15:53 UTC (rev 15117)
+++ twext/trunk/twext/enterprise/jobs/workitem.py 2015-09-10 19:48:57 UTC (rev 15118)
@@ -194,6 +194,83 @@
returnValue(workItems)
+    @classmethod
+    def updateWorkTypes(cls, updates):
+        """
+        Update the default priority and weight values of each specified work
+        type.
+
+        A work type for which L{JobItem.workItemForType} raises L{KeyError} is
+        logged as an error and skipped. A setting that cannot be converted to
+        an integer, or that lies outside the allowed range, is logged as an
+        error and left unchanged; the other setting for that work type is
+        still processed.
+
+        @param updates: a dict whose keys are work type names as accepted by
+            L{JobItem.workItemForType}, and whose values are dicts containing
+            one or both of the "weight" and "priority" keys, each with the
+            numeric value to change to. Priority must lie between
+            C{WORK_PRIORITY_LOW} and C{WORK_PRIORITY_HIGH}, weight between
+            C{WORK_WEIGHT_0} and C{WORK_WEIGHT_10} (both inclusive).
+        @type updates: L{dict}
+        """
+
+        for workType, settings in updates.items():
+            try:
+                workItem = JobItem.workItemForType(workType)
+            except KeyError:
+                log.error(
+                    "updateWorkTypes: '{workType}' is not a valid work type",
+                    workType=workType,
+                )
+                continue
+
+            if "priority" in settings:
+                priority = settings["priority"]
+                try:
+                    priority = int(priority)
+                    if not (WORK_PRIORITY_LOW <= priority <= WORK_PRIORITY_HIGH):
+                        raise ValueError
+                except (TypeError, ValueError):
+                    # TypeError covers values int() cannot take at all, such
+                    # as None or a container.
+                    log.error(
+                        "updateWorkTypes: '{workType}' priority '{priority}' is not valid",
+                        workType=workType, priority=priority,
+                    )
+                    # Nothing was applied, so the summary below must not
+                    # report the rejected value as the new priority.
+                    priority = "unchanged"
+                else:
+                    workItem.default_priority = priority
+            else:
+                priority = "unchanged"
+
+            if "weight" in settings:
+                weight = settings["weight"]
+                try:
+                    weight = int(weight)
+                    if not (WORK_WEIGHT_0 <= weight <= WORK_WEIGHT_10):
+                        raise ValueError
+                except (TypeError, ValueError):
+                    log.error(
+                        "updateWorkTypes: '{workType}' weight '{weight}' is not valid",
+                        workType=workType, weight=weight,
+                    )
+                    weight = "unchanged"
+                else:
+                    workItem.default_weight = weight
+            else:
+                weight = "unchanged"
+
+            # Every field named in the format string needs a matching
+            # keyword, including weight.
+            log.info(
+                "updateWorkTypes: '{workType}' priority: '{priority}' weight: '{weight}'",
+                workType=workType, priority=priority, weight=weight,
+            )
+
+
+    @classmethod
+    def dumpWorkTypes(cls):
+        """
+        Report the current default priority and weight of every work type
+        returned by L{JobItem.allWorkTypes}.
+
+        @return: a dict keyed by the same keys as L{JobItem.allWorkTypes},
+            where each value is a dict holding that work class's
+            C{default_priority} under "priority" and its C{default_weight}
+            under "weight".
+        @rtype: L{dict}
+        """
+
+        knownTypes = JobItem.allWorkTypes()
+        return {
+            name: {
+                "priority": klass.default_priority,
+                "weight": klass.default_weight,
+            }
+            for name, klass in knownTypes.items()
+        }
+
+
@inlineCallbacks
def runlock(self):
"""
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150910/a457d7f1/attachment-0001.html>
More information about the calendarserver-changes
mailing list