[CalendarServer-changes] [14775] twext/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Mon May 11 13:11:43 PDT 2015


Revision: 14775
          http://trac.calendarserver.org//changeset/14775
Author:   cdaboo at apple.com
Date:     2015-05-11 13:11:43 -0700 (Mon, 11 May 2015)
Log Message:
-----------
Support the ability to pause/unpause a job.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/dal/record.py
    twext/trunk/twext/enterprise/dal/test/test_record.py
    twext/trunk/twext/enterprise/jobqueue.py
    twext/trunk/twext/enterprise/test/test_jobqueue.py

Modified: twext/trunk/twext/enterprise/dal/record.py
===================================================================
--- twext/trunk/twext/enterprise/dal/record.py	2015-05-10 15:40:44 UTC (rev 14774)
+++ twext/trunk/twext/enterprise/dal/record.py	2015-05-11 20:11:43 UTC (rev 14775)
@@ -594,6 +594,21 @@
 
 
     @classmethod
+    def updatesome(cls, transaction, where, **kw):
+        """
+        Update rows matching the where expression from the table that corresponds to C{cls}.
+        """
+        colmap = {}
+        for k, v in kw.iteritems():
+            colmap[cls.__attrmap__[k]] = v
+
+        return Update(
+            colmap,
+            Where=where
+        ).on(transaction)
+
+
+    @classmethod
     def deleteall(cls, transaction):
         """
         Delete all rows from the table that corresponds to C{cls}.

Modified: twext/trunk/twext/enterprise/dal/test/test_record.py
===================================================================
--- twext/trunk/twext/enterprise/dal/test/test_record.py	2015-05-10 15:40:44 UTC (rev 14774)
+++ twext/trunk/twext/enterprise/dal/test/test_record.py	2015-05-11 20:11:43 UTC (rev 14775)
@@ -363,6 +363,45 @@
 
 
     @inlineCallbacks
+    def test_updatesome(self):
+        """
+        L{Record.updatesome} will update all instances of the matching records.
+        """
+        txn = self.pool.connection()
+        data = [(123, u"one"), (456, u"four"), (345, u"three"),
+                (234, u"two"), (356, u"three")]
+        for beta, gamma in data:
+            yield txn.execSQL("insert into ALPHA values (:1, :2)",
+                              [beta, gamma])
+
+        yield TestRecord.updatesome(txn, where=(TestRecord.beta == 123), gamma=u"changed")
+        yield txn.commit()
+
+        txn = self.pool.connection()
+        records = yield TestRecord.all(txn)
+        self.assertEqual(
+            set([(record.beta, record.gamma,) for record in records]),
+            set([
+                (123, u"changed"), (456, u"four"), (345, u"three"),
+                (234, u"two"), (356, u"three")
+            ])
+        )
+
+        yield TestRecord.updatesome(txn, where=(TestRecord.beta.In((234, 345,))), gamma=u"changed-2")
+        yield txn.commit()
+
+        txn = self.pool.connection()
+        records = yield TestRecord.all(txn)
+        self.assertEqual(
+            set([(record.beta, record.gamma,) for record in records]),
+            set([
+                (123, u"changed"), (456, u"four"), (345, u"changed-2"),
+                (234, u"changed-2"), (356, u"three")
+            ])
+        )
+
+
+    @inlineCallbacks
     def test_deleteall(self):
         """
         L{Record.deleteall} will delete all instances of the record.

Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py	2015-05-10 15:40:44 UTC (rev 14774)
+++ twext/trunk/twext/enterprise/jobqueue.py	2015-05-11 20:11:43 UTC (rev 14775)
@@ -146,7 +146,8 @@
 )
 
 from twext.enterprise.dal.model import ProcedureCall, Sequence
-from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
+from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord, \
+    SerializableRecord
 from twisted.python.failure import Failure
 
 from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
@@ -237,6 +238,7 @@
     JobTable.addColumn("ASSIGNED", SQLType("timestamp", None), default=None)
     JobTable.addColumn("OVERDUE", SQLType("timestamp", None), default=None)
     JobTable.addColumn("FAILED", SQLType("integer", 0), default=0)
+    JobTable.addColumn("PAUSE", SQLType("integer", 0), default=0)
 
     return inSchema
 
@@ -351,6 +353,7 @@
 
 class JobItem(Record, fromTable(JobInfoSchema.JOB)):
     """
+    @DynamicAttrs
     An item in the job table. This is typically not directly used by code
     creating work items, but rather is used for internal book keeping of jobs
     associated with work items.
@@ -443,6 +446,21 @@
         )
 
 
+    def pauseIt(self, pause=False):
+        """
+        Pause the L{JobItem} leaving all other attributes the same. The job processing loop
+        will skip paused items.
+
+        @param pause: indicates whether the job should be paused.
+        @type pause: L{bool}
+        @param delay: how long before the job is run again, or C{None} for a default
+            staggered delay behavior.
+        @type delay: L{int}
+        """
+
+        return self.update(pause=pause)
+
+
     @classmethod
     @inlineCallbacks
     def ultimatelyPerform(cls, txnFactory, jobID):
@@ -612,15 +630,17 @@
         @rtype: L{JobItem}
         """
 
+        queryExpr = (cls.notBefore <= now).And(cls.priority >= minPriority).And(cls.pause == 0).And(
+            (cls.assigned == None).Or(cls.overdue < now)
+        )
+
         if txn.dialect == ORACLE_DIALECT:
             # Oracle does not support a "for update" clause with "order by". So do the
             # "for update" as a second query right after the first. Will need to check
             # how this might impact concurrency in a multi-host setup.
             jobs = yield cls.query(
                 txn,
-                (cls.notBefore <= now).And(cls.priority >= minPriority).And(
-                    (cls.assigned == None).Or(cls.overdue < now)
-                ),
+                queryExpr,
                 order=(cls.assigned, cls.priority),
                 ascending=False,
                 limit=limit,
@@ -635,9 +655,7 @@
         else:
             jobs = yield cls.query(
                 txn,
-                (cls.notBefore <= now).And(cls.priority >= minPriority).And(
-                    (cls.assigned == None).Or(cls.overdue < now)
-                ),
+                queryExpr,
                 order=(cls.assigned, cls.priority),
                 ascending=False,
                 forUpdate=True,
@@ -900,7 +918,7 @@
 
 
 
-class WorkItem(Record):
+class WorkItem(SerializableRecord):
     """
     A L{WorkItem} is an item of work which may be stored in a database, then
     executed later.
@@ -1028,6 +1046,7 @@
         _transferArg("priority")
         _transferArg("weight")
         _transferArg("notBefore")
+        _transferArg("pause")
 
         # Always need a notBefore
         if "notBefore" not in jobargs:

Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py	2015-05-10 15:40:44 UTC (rev 14774)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py	2015-05-11 20:11:43 UTC (rev 14775)
@@ -207,7 +207,8 @@
       NOT_BEFORE  timestamp not null,
       ASSIGNED    timestamp default null,
       OVERDUE     timestamp default null,
-      FAILED      integer default 0
+      FAILED      integer default 0,
+      PAUSE       integer default 0
     );
     """
 )
@@ -533,6 +534,31 @@
         self.assertTrue(job is None)
         self.assertTrue(work is None)
 
+        # Unassigned, paused job with past notBefore not returned
+        yield self._enqueue(dbpool, 3, 1, now + datetime.timedelta(days=-1), priority=WORK_PRIORITY_HIGH)
+        @inlineCallbacks
+        def pauseJob(txn, pause=True):
+            works = yield DummyWorkItem.all(txn)
+            for work in works:
+                if work.a == 3:
+                    job = yield JobItem.load(txn, work.jobID)
+                    yield job.pauseIt(pause)
+        yield inTransaction(dbpool.connection, pauseJob)
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+
+        # Unassigned, paused then unpaused job with past notBefore is returned
+        yield inTransaction(dbpool.connection, pauseJob, pause=False)
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is not None)
+        self.assertTrue(work.a == 3)
+        @inlineCallbacks
+        def deleteJob(txn, jobID):
+            job = yield JobItem.load(txn, jobID)
+            yield job.delete()
+        yield inTransaction(dbpool.connection, deleteJob, jobID=job.jobID)
+
         # Unassigned low priority job with past notBefore not returned if high priority required
         yield self._enqueue(dbpool, 4, 1, now + datetime.timedelta(days=-1))
         job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150511/81ac263a/attachment.html>


More information about the calendarserver-changes mailing list