[CalendarServer-changes] [15118] twext/trunk/twext/enterprise/jobs

source_changes at macosforge.org source_changes at macosforge.org
Thu Sep 10 12:48:57 PDT 2015


Revision: 15118
          http://trac.calendarserver.org//changeset/15118
Author:   cdaboo at apple.com
Date:     2015-09-10 12:48:57 -0700 (Thu, 10 Sep 2015)
Log Message:
-----------
Add ability to override default work item priority and weight values.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/jobs/jobitem.py
    twext/trunk/twext/enterprise/jobs/test/test_jobs.py
    twext/trunk/twext/enterprise/jobs/workitem.py

Modified: twext/trunk/twext/enterprise/jobs/jobitem.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/jobitem.py	2015-09-10 16:15:53 UTC (rev 15117)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py	2015-09-10 19:48:57 UTC (rev 15118)
@@ -594,6 +594,19 @@
 
 
     @classmethod
+    def allWorkTypes(cls):
+        """
+        Map L{WorkItem} table names to classes; calls L{workTypes} if unset.
+
+        @return: the class-level C{_workTypeMap}, keyed by table name.
+        @rtype: L{dict}
+        """
+        if cls._workTypeMap is None:
+            cls.workTypes()
+        return cls._workTypeMap
+
+
+    @classmethod
     def numberOfWorkTypes(cls):
         return len(cls.workTypes())
 

Modified: twext/trunk/twext/enterprise/jobs/test/test_jobs.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/test/test_jobs.py	2015-09-10 16:15:53 UTC (rev 15117)
+++ twext/trunk/twext/enterprise/jobs/test/test_jobs.py	2015-09-10 19:48:57 UTC (rev 15118)
@@ -41,7 +41,8 @@
 from twext.enterprise.jobs.utils import inTransaction, astimestamp
 from twext.enterprise.jobs.workitem import \
     WorkItem, SingletonWorkItem, \
-    WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM
+    WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM, WORK_WEIGHT_5, \
+    WORK_WEIGHT_1, WORK_WEIGHT_10, WORK_WEIGHT_0
 from twext.enterprise.jobs.jobitem import \
     JobItem, JobDescriptor, JobFailedError, JobTemporaryError
 from twext.enterprise.jobs.queue import \
@@ -187,18 +188,6 @@
 
 SQL = passthru
 
-nodeSchema = SQL(
-    """
-    create table NODE_INFO (
-      HOSTNAME varchar(255) not null,
-      PID integer not null,
-      PORT integer not null,
-      TIME timestamp default current_timestamp not null,
-      primary key (HOSTNAME, PORT)
-    );
-    """
-)
-
 jobSchema = SQL(
     """
     create table JOB (
@@ -241,6 +230,12 @@
       A integer, B integer,
       DELETE_ON_LOAD integer default 0
     );
+    create table UPDATE_WORK_ITEM (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A integer, B integer,
+      DELETE_ON_LOAD integer default 0
+    );
     """
 )
 
@@ -253,7 +248,8 @@
             "DUMMY_WORK_ITEM",
             "DUMMY_WORK_SINGLETON_ITEM",
             "DUMMY_WORK_PAUSE_ITEM",
-            "AGGREGATOR_WORK_ITEM"
+            "AGGREGATOR_WORK_ITEM",
+            "UPDATE_WORK_ITEM",
         )
     ] + ["delete from job"]
 except SkipTest as e:
@@ -261,12 +257,14 @@
     DummyWorkSingletonItemTable = object
     DummyWorkPauseItemTable = object
     AggregatorWorkItemTable = object
+    UpdateWorkItemTable = object
     skip = e
 else:
     DummyWorkItemTable = fromTable(schema.DUMMY_WORK_ITEM)
     DummyWorkSingletonItemTable = fromTable(schema.DUMMY_WORK_SINGLETON_ITEM)
     DummyWorkPauseItemTable = fromTable(schema.DUMMY_WORK_PAUSE_ITEM)
     AggregatorWorkItemTable = fromTable(schema.AGGREGATOR_WORK_ITEM)
+    UpdateWorkItemTable = fromTable(schema.UPDATE_WORK_ITEM)
     skip = False
 
 
@@ -358,6 +356,17 @@
 
 
 
+class UpdateWorkItem(WorkItem, UpdateWorkItemTable):
+    """
+    Sample L{WorkItem} subclass (UPDATE_WORK_ITEM table) whose default weight
+    and priority are changed by the updateWorkTypes/dumpWorkTypes tests.
+    """
+
+    default_priority = WORK_PRIORITY_MEDIUM
+    default_weight = WORK_WEIGHT_5
+
+
+
 class AMPTests(TestCase):
     """
     Tests for L{AMP} faithfully relaying ids across the wire.
@@ -442,7 +451,7 @@
         """
         L{ControllerQueue.enqueueWork} will insert a job and a work item.
         """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        dbpool = buildConnectionPool(self, jobSchema + schemaText)
         yield self._enqueue(dbpool, 1, 2)
 
         # Make sure we have one JOB and one DUMMY_WORK_ITEM
@@ -469,7 +478,7 @@
         """
         L{JobItem.assign} will mark a job as assigned.
         """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        dbpool = buildConnectionPool(self, jobSchema + schemaText)
         yield self._enqueue(dbpool, 1, 2)
 
         # Make sure we have one JOB and one DUMMY_WORK_ITEM
@@ -497,7 +506,7 @@
         L{JobItem.nextjob} returns the correct job based on priority.
         """
 
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        dbpool = buildConnectionPool(self, jobSchema + schemaText)
         now = datetime.datetime.utcnow()
 
         # Empty job queue
@@ -585,7 +594,7 @@
         """
         L{ControllerQueue.enqueueWork} will insert a job and a work item.
         """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        dbpool = buildConnectionPool(self, jobSchema + schemaText)
 
         yield self._enqueue(dbpool, 1, 2, cl=DummyWorkItem)
 
@@ -606,7 +615,7 @@
         """
         L{ControllerQueue.enqueueWork} will insert a job and a work item.
         """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        dbpool = buildConnectionPool(self, jobSchema + schemaText)
 
         yield self._enqueue(dbpool, 1, 2, cl=DummyWorkSingletonItem)
 
@@ -627,7 +636,7 @@
         """
         L{ControllerQueue.enqueueWork} will insert a job and a work item.
         """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        dbpool = buildConnectionPool(self, jobSchema + schemaText)
 
         qpool = yield self._enqueue(dbpool, 1, 2, cl=DummyWorkSingletonItem, notBefore=datetime.datetime(2014, 5, 17, 12, 0, 0))
 
@@ -657,7 +666,138 @@
         self.assertTrue(work[0][1].notBefore != datetime.datetime(2014, 5, 17, 12, 0, 0))
 
 
+    def test_updateWorkTypes(self):
+        """
+        L{WorkItem.updateWorkTypes} updates weight and priority correctly.
+        """
+        _ignore_dbpool = buildConnectionPool(self, jobSchema + schemaText)
+        # The calls below mutate UpdateWorkItem's class-level defaults; put back
+        # the values seen at the start so they do not leak into later tests.
+        self.addCleanup(setattr, UpdateWorkItem, "default_priority", UpdateWorkItem.default_priority)
+        self.addCleanup(setattr, UpdateWorkItem, "default_weight", UpdateWorkItem.default_weight)
 
+        def _validate(priority, weight):
+            self.assertEqual(AggregatorWorkItem.default_priority, WORK_PRIORITY_LOW)
+            self.assertEqual(AggregatorWorkItem.default_weight, WORK_WEIGHT_5)
+            self.assertEqual(UpdateWorkItem.default_priority, priority)
+            self.assertEqual(UpdateWorkItem.default_weight, weight)
+
+        # Check current values
+        _validate(WORK_PRIORITY_MEDIUM, WORK_WEIGHT_5)
+
+        # Change priority and weight
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_HIGH,
+                "weight": WORK_WEIGHT_1,
+            },
+        })
+        _validate(WORK_PRIORITY_HIGH, WORK_WEIGHT_1)
+
+        # Change priority only
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_LOW,
+            },
+        })
+        _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_1)
+
+        # Change weight only
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "weight": WORK_WEIGHT_10,
+            },
+        })
+        _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_10)
+
+        # No change
+        WorkItem.updateWorkTypes({"UPDATE_WORK_ITEM": {}})
+        _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_10)
+        WorkItem.updateWorkTypes({})
+        _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_10)
+
+        # Change priority and weight, with invalid work
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_HIGH,
+                "weight": WORK_WEIGHT_1,
+            },
+            "FOO_WORK_ITEM": {
+                "priority": WORK_PRIORITY_HIGH,
+                "weight": WORK_WEIGHT_1,
+            },
+        })
+        _validate(WORK_PRIORITY_HIGH, WORK_WEIGHT_1)
+
+        # Invalid priority and weight
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_HIGH + 1,
+                "weight": WORK_WEIGHT_10 + 11,
+            },
+        })
+        _validate(WORK_PRIORITY_HIGH, WORK_WEIGHT_1)
+
+        # Invalid priority and valid weight
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_HIGH + 1,
+                "weight": WORK_WEIGHT_0,
+            },
+        })
+        _validate(WORK_PRIORITY_HIGH, WORK_WEIGHT_0)
+
+        # Valid priority and invalid weight
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_LOW,
+                "weight": WORK_WEIGHT_0 - 1,
+            },
+        })
+        _validate(WORK_PRIORITY_LOW, WORK_WEIGHT_0)
+
+
+    def test_dumpWorkTypes(self):
+        """
+        L{WorkItem.dumpWorkTypes} dumps weight and priority correctly.
+        """
+        _ignore_dbpool = buildConnectionPool(self, jobSchema + schemaText)
+
+        results = WorkItem.dumpWorkTypes()
+        self.assertTrue("DUMMY_WORK_SINGLETON_ITEM" in results)
+        self.assertEqual(
+            results["DUMMY_WORK_SINGLETON_ITEM"],
+            {"priority": WORK_PRIORITY_LOW, "weight": WORK_WEIGHT_5},
+        )
+
+        # Change priority and weight
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_HIGH,
+                "weight": WORK_WEIGHT_1,
+            },
+        })
+        results = WorkItem.dumpWorkTypes()
+        self.assertEqual(
+            results["UPDATE_WORK_ITEM"],
+            {"priority": WORK_PRIORITY_HIGH, "weight": WORK_WEIGHT_1},
+        )
+
+        # Change back to UpdateWorkItem's declared defaults so later tests see them
+        WorkItem.updateWorkTypes({
+            "UPDATE_WORK_ITEM": {
+                "priority": WORK_PRIORITY_MEDIUM,
+                "weight": WORK_WEIGHT_5,
+            },
+        })
+        results = WorkItem.dumpWorkTypes()
+        self.assertEqual(
+            results["UPDATE_WORK_ITEM"],
+            {"priority": WORK_PRIORITY_MEDIUM, "weight": WORK_WEIGHT_5},
+        )
+
+
+
 class WorkerConnectionPoolTests(TestCase):
     """
     A L{WorkerConnectionPool} is responsible for managing, in a node's
@@ -694,7 +834,7 @@
         Setup pool and reactor clock for time stepped tests.
         """
         reactor = MemoryReactorWithClock()
-        cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+        cph = SteppablePoolHelper(jobSchema + schemaText)
         then = datetime.datetime(2012, 12, 12, 12, 12, 12)
         reactor.advance(astimestamp(then))
         cph.setUp(self)
@@ -903,7 +1043,7 @@
         L{ControllerQueue.startService} kicks off the idle work-check loop.
         """
         reactor = MemoryReactorWithClock()
-        cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+        cph = SteppablePoolHelper(jobSchema + schemaText)
         then = datetime.datetime(2012, 12, 12, 12, 12, 0)
         reactor.advance(astimestamp(then))
         cph.setUp(self)

Modified: twext/trunk/twext/enterprise/jobs/workitem.py
===================================================================
--- twext/trunk/twext/enterprise/jobs/workitem.py	2015-09-10 16:15:53 UTC (rev 15117)
+++ twext/trunk/twext/enterprise/jobs/workitem.py	2015-09-10 19:48:57 UTC (rev 15118)
@@ -194,6 +194,83 @@
         returnValue(workItems)
 
 
+    @classmethod
+    def updateWorkTypes(cls, updates):
+        """
+        Update the priority and weight values of each specified work type.
+
+        @param updates: maps a work type name (as accepted by
+            L{JobItem.workItemForType}) to a dict with optional "priority" and
+            "weight" values; unknown types and invalid values are only logged.
+        @type updates: L{dict}
+        """
+
+        for workType, settings in updates.items():
+            try:
+                workItem = JobItem.workItemForType(workType)
+            except KeyError:
+                log.error(
+                    "updateWorkTypes: '{workType}' is not a valid work type",
+                    workType=workType,
+                )
+                continue
+            if "priority" in settings:
+                priority = settings["priority"]
+                try:
+                    priority = int(priority)
+                    if not (WORK_PRIORITY_LOW <= priority <= WORK_PRIORITY_HIGH):
+                        raise ValueError  # out of range: reuse the error path
+                except (TypeError, ValueError):
+                    log.error(
+                        "updateWorkTypes: '{workType}' priority '{priority}' is not valid",
+                        workType=workType, priority=priority)
+                    priority = "unchanged"  # rejected, so nothing was applied
+                else:
+                    workItem.default_priority = priority
+            else:
+                priority = "unchanged"
+            if "weight" in settings:
+                weight = settings["weight"]
+                try:
+                    weight = int(weight)
+                    if not (WORK_WEIGHT_0 <= weight <= WORK_WEIGHT_10):
+                        raise ValueError  # out of range: reuse the error path
+                except (TypeError, ValueError):
+                    log.error(
+                        "updateWorkTypes: '{workType}' weight '{weight}' is not valid",
+                        workType=workType, weight=weight)
+                    weight = "unchanged"  # rejected, so nothing was applied
+                else:
+                    workItem.default_weight = weight
+            else:
+                weight = "unchanged"
+            log.info(
+                "updateWorkTypes: '{workType}' priority: '{priority}' weight: '{weight}' ",
+                workType=workType, priority=priority, weight=weight,
+            )
+
+
+    @classmethod
+    def dumpWorkTypes(cls):
+        """
+        Dump the priority and weight values of each known work type.
+
+        @return: a dict keyed by each work type in L{JobItem.allWorkTypes},
+            whose values are dicts holding that type's current "priority"
+            and "weight" class defaults (both keys are always present).
+        @rtype: L{dict}
+        """
+
+        results = {}
+        for workType, workClass in JobItem.allWorkTypes().items():
+            results[workType] = {
+                "priority": workClass.default_priority,
+                "weight": workClass.default_weight,
+            }
+
+        return results
+
+
     @inlineCallbacks
     def runlock(self):
         """
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150910/a457d7f1/attachment-0001.html>


More information about the calendarserver-changes mailing list