<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[15118] twext/trunk/twext/enterprise/jobs</title>
</head>
<body>
<style type="text/css">
/*
 * Stylesheet for the commit notification page.
 *   #msg              - commit metadata, log message and list of modified paths
 *   #logmsg           - formatted log message container (styled here although
 *                       this page contains no such element)
 *   #header, #footer  - banner blocks (likewise styled but absent from this page)
 *   #patch            - the rendered diff
 *
 * The rules are written directly inside the style element rather than inside
 * an HTML comment: under XML parsing a comment would hide every rule, and the
 * stylesheet contains no "<" or "&" that would need protecting.
 */

/* ---- Commit metadata (#msg) ---- */
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; }
#msg dl a { font-weight: bold}
/* Link states share one specificity, so the last matching rule wins.
   :active is declared after :visited so a visited link still shows its
   active colour while it is being pressed. */
#msg dl a:link { color:#fc3; }
#msg dl a:visited { color:#cc6; }
#msg dl a:active { color:#ff0; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }

/* ---- Formatted log message (#logmsg) ---- */
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
/* Negative text-indent plus matching left padding gives a hanging indent
   for list markers that are positioned inside the item box. */
#logmsg ul { text-indent: -1em; padding-left: 1em; }
#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
/* \00bb is the right-pointing double angle quotation mark. */
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }

/* ---- Banners ---- */
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }

/* ---- Rendered diff (#patch) ---- */
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff {padding:10px 0;}
/* Every diff segment (span/ins/del) is a full-width block so its
   background colour spans the whole line. */
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
/* Both selectors are scoped to #patch so the hunk-header colours cannot
   leak onto unrelated .info elements elsewhere on the page. */
#patch .lines, #patch .info {color:#888;background:#fff;}
</style>
<div id="msg">
  <!-- Commit metadata: revision (linked to the changeset page), author, date -->
  <dl class="meta">
    <dt>Revision</dt>
    <dd><a href="http://trac.calendarserver.org//changeset/15118">15118</a></dd>
    <dt>Author</dt>
    <dd>cdaboo@apple.com</dd>
    <dt>Date</dt>
    <dd>2015-09-10 12:48:57 -0700 (Thu, 10 Sep 2015)</dd>
  </dl>

  <!-- Commit log message, shown verbatim -->
  <h3>Log Message</h3>
  <pre>Add ability to override default work item priority and weight values.</pre>

  <!-- Each entry jumps to the anchor placed before that file's diff in #patch -->
  <h3>Modified Paths</h3>
  <ul>
    <li><a href="#twexttrunktwextenterprisejobsjobitempy">twext/trunk/twext/enterprise/jobs/jobitem.py</a></li>
    <li><a href="#twexttrunktwextenterprisejobstesttest_jobspy">twext/trunk/twext/enterprise/jobs/test/test_jobs.py</a></li>
    <li><a href="#twexttrunktwextenterprisejobsworkitempy">twext/trunk/twext/enterprise/jobs/workitem.py</a></li>
  </ul>
</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextenterprisejobsjobitempy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/jobitem.py (15117 =&gt; 15118)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/jobitem.py        2015-09-10 16:15:53 UTC (rev 15117)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py        2015-09-10 19:48:57 UTC (rev 15118)
</span><span class="lines">@@ -594,6 +594,19 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> @classmethod
</span><ins>+ def allWorkTypes(cls):
+ """
+ Map all L{WorkItem} sub-classes table names to the class type.
+
+ @return: All of the work item types.
+ @rtype: L{dict}
+ """
+ if cls._workTypeMap is None:
+ cls.workTypes()
+ return cls._workTypeMap
+
+
+ @classmethod
</ins><span class="cx"> def numberOfWorkTypes(cls):
</span><span class="cx"> return len(cls.workTypes())
</span><span class="cx">
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobstesttest_jobspy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/test/test_jobs.py (15117 =&gt; 15118)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/test/test_jobs.py        2015-09-10 16:15:53 UTC (rev 15117)
+++ twext/trunk/twext/enterprise/jobs/test/test_jobs.py        2015-09-10 19:48:57 UTC (rev 15118)
</span><span class="lines">@@ -41,7 +41,8 @@
</span><span class="cx"> from twext.enterprise.jobs.utils import inTransaction, astimestamp
</span><span class="cx"> from twext.enterprise.jobs.workitem import \
</span><span class="cx"> WorkItem, SingletonWorkItem, \
</span><del>- WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM
</del><ins>+ WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM, WORK_WEIGHT_5, \
+ WORK_WEIGHT_1, WORK_WEIGHT_10, WORK_WEIGHT_0
</ins><span class="cx"> from twext.enterprise.jobs.jobitem import \
</span><span class="cx"> JobItem, JobDescriptor, JobFailedError, JobTemporaryError
</span><span class="cx"> from twext.enterprise.jobs.queue import \
</span><span class="lines">@@ -187,18 +188,6 @@
</span><span class="cx">
</span><span class="cx"> SQL = passthru
</span><span class="cx">
</span><del>-nodeSchema = SQL(
- """
- create table NODE_INFO (
- HOSTNAME varchar(255) not null,
- PID integer not null,
- PORT integer not null,
- TIME timestamp default current_timestamp not null,
- primary key (HOSTNAME, PORT)
- );
- """
-)
-
</del><span class="cx"> jobSchema = SQL(
</span><span class="cx"> """
</span><span class="cx"> create table JOB (
</span><span class="lines">@@ -241,6 +230,12 @@
</span><span class="cx"> A integer, B integer,
</span><span class="cx"> DELETE_ON_LOAD integer default 0
</span><span class="cx"> );
</span><ins>+ create table UPDATE_WORK_ITEM (
+ WORK_ID integer primary key,
+ JOB_ID integer references JOB,
+ A integer, B integer,
+ DELETE_ON_LOAD integer default 0
+ );
</ins><span class="cx"> """
</span><span class="cx"> )
</span><span class="cx">
</span><span class="lines">@@ -253,7 +248,8 @@
</span><span class="cx"> "DUMMY_WORK_ITEM",
</span><span class="cx"> "DUMMY_WORK_SINGLETON_ITEM",
</span><span class="cx"> "DUMMY_WORK_PAUSE_ITEM",
</span><del>- "AGGREGATOR_WORK_ITEM"
</del><ins>+ "AGGREGATOR_WORK_ITEM",
+ "UPDATE_WORK_ITEM",
</ins><span class="cx"> )
</span><span class="cx"> ] + ["delete from job"]
</span><span class="cx"> except SkipTest as e:
</span><span class="lines">@@ -261,12 +257,14 @@
</span><span class="cx"> DummyWorkSingletonItemTable = object
</span><span class="cx"> DummyWorkPauseItemTable = object
</span><span class="cx"> AggregatorWorkItemTable = object
</span><ins>+ UpdateWorkItemTable = object
</ins><span class="cx"> skip = e
</span><span class="cx"> else:
</span><span class="cx"> DummyWorkItemTable = fromTable(schema.DUMMY_WORK_ITEM)
</span><span class="cx"> DummyWorkSingletonItemTable = fromTable(schema.DUMMY_WORK_SINGLETON_ITEM)
</span><span class="cx"> DummyWorkPauseItemTable = fromTable(schema.DUMMY_WORK_PAUSE_ITEM)
</span><span class="cx"> AggregatorWorkItemTable = fromTable(schema.AGGREGATOR_WORK_ITEM)
</span><ins>+ UpdateWorkItemTable = fromTable(schema.UPDATE_WORK_ITEM)
</ins><span class="cx"> skip = False
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -358,6 +356,17 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><ins>+class UpdateWorkItem(WorkItem, UpdateWorkItemTable):
+ """
+ Sample L{WorkItem} subclass that will have its weight and
+ priority changed.
+ """
+
+ default_priority = WORK_PRIORITY_MEDIUM
+ default_weight = WORK_WEIGHT_5
+
+
+
</ins><span class="cx"> class AMPTests(TestCase):
</span><span class="cx"> """
</span><span class="cx"> Tests for L{AMP} faithfully relaying ids across the wire.
</span><span class="lines">@@ -442,7 +451,7 @@
</span><span class="cx"> """
</span><span class="cx"> L{ControllerQueue.enqueueWork} will insert a job and a work item.
</span><span class="cx"> """
</span><del>- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
</del><ins>+ dbpool = buildConnectionPool(self, jobSchema + schemaText)
</ins><span class="cx"> yield self._enqueue(dbpool, 1, 2)
</span><span class="cx">
</span><span class="cx"> # Make sure we have one JOB and one DUMMY_WORK_ITEM
</span><span class="lines">@@ -469,7 +478,7 @@
</span><span class="cx"> """
</span><span class="cx"> L{JobItem.assign} will mark a job as assigned.
</span><span class="cx"> """
</span><del>- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
</del><ins>+ dbpool = buildConnectionPool(self, jobSchema + schemaText)
</ins><span class="cx"> yield self._enqueue(dbpool, 1, 2)
</span><span class="cx">
</span><span class="cx"> # Make sure we have one JOB and one DUMMY_WORK_ITEM
</span><span class="lines">@@ -497,7 +506,7 @@
</span><span class="cx"> L{JobItem.nextjob} returns the correct job based on priority.
</span><span class="cx"> """
</span><span class="cx">
</span><del>- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
</del><ins>+ dbpool = buildConnectionPool(self, jobSchema + schemaText)
</ins><span class="cx"> now = datetime.datetime.utcnow()
</span><span class="cx">
</span><span class="cx"> # Empty job queue
</span><span class="lines">@@ -585,7 +594,7 @@
</span><span class="cx"> """
</span><span class="cx"> L{ControllerQueue.enqueueWork} will insert a job and a work item.
</span><span class="cx"> """
</span><del>- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
</del><ins>+ dbpool = buildConnectionPool(self, jobSchema + schemaText)
</ins><span class="cx">
</span><span class="cx"> yield self._enqueue(dbpool, 1, 2, cl=DummyWorkItem)
</span><span class="cx">
</span><span class="lines">@@ -606,7 +615,7 @@
</span><span class="cx"> """
</span><span class="cx"> L{ControllerQueue.enqueueWork} will insert a job and a work item.
</span><span class="cx"> """
</span><del>- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
</del><ins>+ dbpool = buildConnectionPool(self, jobSchema + schemaText)
</ins><span class="cx">
</span><span class="cx"> yield self._enqueue(dbpool, 1, 2, cl=DummyWorkSingletonItem)
</span><span class="cx">
</span><span class="lines">@@ -627,7 +636,7 @@
</span><span class="cx"> """
</span><span class="cx"> L{ControllerQueue.enqueueWork} will insert a job and a work item.
</span><span class="cx"> """
</span><del>- dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
</del><ins>+ dbpool = buildConnectionPool(self, jobSchema + schemaText)
</ins><span class="cx">
</span><span class="cx"> qpool = yield self._enqueue(dbpool, 1, 2, cl=DummyWorkSingletonItem, notBefore=datetime.datetime(2014, 5, 17, 12, 0, 0))
</span><span class="cx">
</span><span class="lines">@@ -657,7 +666,138 @@
</span><span class="cx"> self.assertTrue(work[0][1].notBefore != datetime.datetime(2014, 5, 17, 12, 0, 0))
</span><span class="cx">
</span><span class="cx">
</span><ins>+ def test_updateWorkTypes(self):
+ """
+ L{workItem.updateWorkTypes} updates weight and priority correctly.
+ """
+ _ignore_dbpool = buildConnectionPool(self, jobSchema + schemaText)
</ins><span class="cx">
</span><ins>+ def _validate(priority, weight):
+ self.assertEqual(AggregatorWorkItem.default_priority, WORK_PRIORITY_LOW)
+ self.assertEqual(AggregatorWorkItem.default_weight, WORK_WEIGHT_5)
+ self.assertEqual(UpdateWorkItem.default_priority, priority)
+ self.assertEqual(UpdateWorkItem.default_weight, weight)
+
+ # Check current values
+ _validate(WORK_PRIORITY_MEDIUM, WORK_WEIGHT_5)
+
+ # Change priority and weight
+ WorkItem.updateWorkTypes({
+ "UPDATE_WORK_ITEM": {
+ "priority": WORK_PRIORITY_HIGH,
+ "weight": WORK_WEIGHT_1,
+ },
+ })
+ _validate(WORK_PRIORITY_HIGH, WORK_WEIGHT_1)
+
+ # Change priority only
+ WorkItem.updateWorkTypes({
+ "UPDATE_WORK_ITEM": {
+ "priority": WORK_PRIORITY_LOW,
+ },
+ })
+ _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_1)
+
+ # Change weight only
+ WorkItem.updateWorkTypes({
+ "UPDATE_WORK_ITEM": {
+ "weight": WORK_WEIGHT_10,
+ },
+ })
+ _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_10)
+
+ # No change
+ WorkItem.updateWorkTypes({
+ "UPDATE_WORK_ITEM": {
+ },
+ })
+ _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_10)
+ WorkItem.updateWorkTypes({
+ })
+ _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_10)
+
+ # Change priority and weight, with invalid work
+ WorkItem.updateWorkTypes({
+ "UPDATE_WORK_ITEM": {
+ "priority": WORK_PRIORITY_HIGH,
+ "weight": WORK_WEIGHT_1,
+ },
+ "FOO_WORK_ITEM": {
+ "priority": WORK_PRIORITY_HIGH,
+ "weight": WORK_WEIGHT_1,
+ },
+ })
+ _validate(WORK_PRIORITY_HIGH, WORK_WEIGHT_1)
+
+ # Invalid priority and weight
+ WorkItem.updateWorkTypes({
+ "UPDATE_WORK_ITEM": {
+ "priority": WORK_PRIORITY_HIGH + 1,
+ "weight": WORK_WEIGHT_10 + 11,
+ },
+ })
+ _validate(WORK_PRIORITY_HIGH, WORK_WEIGHT_1)
+
+ # Invalid priority and valid weight
+ WorkItem.updateWorkTypes({
+ "UPDATE_WORK_ITEM": {
+ "priority": WORK_PRIORITY_HIGH + 1,
+ "weight": WORK_WEIGHT_0,
+ },
+ })
+ _validate(WORK_PRIORITY_HIGH, WORK_WEIGHT_0)
+
+ # Valid priority and invalid weight
+ WorkItem.updateWorkTypes({
+ "UPDATE_WORK_ITEM": {
+ "priority": WORK_PRIORITY_LOW,
+ "weight": WORK_WEIGHT_0 - 1,
+ },
+ })
+ _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_0)
+
+
+ def test_dumpWorkTypes(self):
+ """
+ L{workItem.dumpWorkTypes} dumps weight and priority correctly.
+ """
+ _ignore_dbpool = buildConnectionPool(self, jobSchema + schemaText)
+
+ results = WorkItem.dumpWorkTypes()
+ self.assertTrue("DUMMY_WORK_SINGLETON_ITEM" in results)
+ self.assertEqual(
+ results["DUMMY_WORK_SINGLETON_ITEM"],
+ {"priority": WORK_PRIORITY_LOW, "weight": WORK_WEIGHT_5},
+ )
+
+ # Change priority and weight
+ WorkItem.updateWorkTypes({
+ "UPDATE_WORK_ITEM": {
+ "priority": WORK_PRIORITY_HIGH,
+ "weight": WORK_WEIGHT_1,
+ },
+ })
+ results = WorkItem.dumpWorkTypes()
+ self.assertEqual(
+ results["UPDATE_WORK_ITEM"],
+ {"priority": WORK_PRIORITY_HIGH, "weight": WORK_WEIGHT_1},
+ )
+
+ # Change priority and weight again
+ WorkItem.updateWorkTypes({
+ "UPDATE_WORK_ITEM": {
+ "priority": WORK_PRIORITY_MEDIUM,
+ "weight": WORK_WEIGHT_10,
+ },
+ })
+ results = WorkItem.dumpWorkTypes()
+ self.assertEqual(
+ results["UPDATE_WORK_ITEM"],
+ {"priority": WORK_PRIORITY_MEDIUM, "weight": WORK_WEIGHT_10},
+ )
+
+
+
</ins><span class="cx"> class WorkerConnectionPoolTests(TestCase):
</span><span class="cx"> """
</span><span class="cx"> A L{WorkerConnectionPool} is responsible for managing, in a node's
</span><span class="lines">@@ -694,7 +834,7 @@
</span><span class="cx"> Setup pool and reactor clock for time stepped tests.
</span><span class="cx"> """
</span><span class="cx"> reactor = MemoryReactorWithClock()
</span><del>- cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
</del><ins>+ cph = SteppablePoolHelper(jobSchema + schemaText)
</ins><span class="cx"> then = datetime.datetime(2012, 12, 12, 12, 12, 12)
</span><span class="cx"> reactor.advance(astimestamp(then))
</span><span class="cx"> cph.setUp(self)
</span><span class="lines">@@ -903,7 +1043,7 @@
</span><span class="cx"> L{ControllerQueue.startService} kicks off the idle work-check loop.
</span><span class="cx"> """
</span><span class="cx"> reactor = MemoryReactorWithClock()
</span><del>- cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
</del><ins>+ cph = SteppablePoolHelper(jobSchema + schemaText)
</ins><span class="cx"> then = datetime.datetime(2012, 12, 12, 12, 12, 0)
</span><span class="cx"> reactor.advance(astimestamp(then))
</span><span class="cx"> cph.setUp(self)
</span></span></pre></div>
<a id="twexttrunktwextenterprisejobsworkitempy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobs/workitem.py (15117 =&gt; 15118)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobs/workitem.py        2015-09-10 16:15:53 UTC (rev 15117)
+++ twext/trunk/twext/enterprise/jobs/workitem.py        2015-09-10 19:48:57 UTC (rev 15118)
</span><span class="lines">@@ -194,6 +194,83 @@
</span><span class="cx"> returnValue(workItems)
</span><span class="cx">
</span><span class="cx">
</span><ins>+ @classmethod
+ def updateWorkTypes(cls, updates):
+ """
+ Update the priority and weight values of each specified work type.
+
+ @param updates: a dict whose workType is the work class name, and whose
+ settings is a dict containing one or both of "weight" and "priority"
+ keys and numeric values to change to.
+ @type updates: L{dict}
+ """
+
+ for workType, settings in updates.items():
+ try:
+ workItem = JobItem.workItemForType(workType)
+ except KeyError:
+ log.error(
+ "updateWorkTypes: '{workType}' is not a valid work type",
+ workType=workType,
+ )
+ continue
+ if "priority" in settings:
+ priority = settings["priority"]
+ try:
+ priority = int(priority)
+ if not (WORK_PRIORITY_LOW &lt;= priority &lt;= WORK_PRIORITY_HIGH):
+ raise ValueError
+ except ValueError:
+ log.error(
+ "updateWorkTypes: '{workType}' priority '{priority}' is not value",
+ workType=workType, priority=priority,
+ )
+ else:
+ workItem.default_priority = priority
+ else:
+ priority = "unchanged"
+ if "weight" in settings:
+ weight = settings["weight"]
+ try:
+ weight = int(weight)
+ if not (WORK_WEIGHT_0 &lt;= weight &lt;= WORK_WEIGHT_10):
+ raise ValueError
+ except ValueError:
+ log.error(
+ "updateWorkTypes: '{workType}' weight '{weight}' is not value",
+ workType=workType, weight=weight,
+ )
+ else:
+ workItem.default_weight = weight
+ else:
+ weight = "unchanged"
+ log.info(
+ "updateWorkTypes: '{workType}' priority: '{priority}' weight: '{weight}' ",
+ workType=workType, priority=priority,
+ )
+
+
+ @classmethod
+ def dumpWorkTypes(cls):
+ """
+ Dump the priority and weight values of each known work type.
+
+ @return: a dict whose workType is the work class name, and whose
+ settings is a dict containing one or both of "weight" and "priority"
+ keys and numeric values to change to.
+ @rtype: L{dict}
+ """
+
+ results = {}
+ for workType, workClass in JobItem.allWorkTypes().items():
+ results[workType] = {
+ "priority": workClass.default_priority,
+ "weight": workClass.default_weight,
+ }
+
+ return results
+
+
</ins><span class="cx"> @inlineCallbacks
</span><span class="cx"> def runlock(self):
</span><span class="cx"> """
</span></span></pre>
</div>
</div>
</body>
</html>