[CalendarServer-changes] [14775] twext/trunk/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Mon May 11 13:11:43 PDT 2015
Revision: 14775
http://trac.calendarserver.org//changeset/14775
Author: cdaboo at apple.com
Date: 2015-05-11 13:11:43 -0700 (Mon, 11 May 2015)
Log Message:
-----------
Support the ability to pause/unpause a job.
Modified Paths:
--------------
twext/trunk/twext/enterprise/dal/record.py
twext/trunk/twext/enterprise/dal/test/test_record.py
twext/trunk/twext/enterprise/jobqueue.py
twext/trunk/twext/enterprise/test/test_jobqueue.py
Modified: twext/trunk/twext/enterprise/dal/record.py
===================================================================
--- twext/trunk/twext/enterprise/dal/record.py 2015-05-10 15:40:44 UTC (rev 14774)
+++ twext/trunk/twext/enterprise/dal/record.py 2015-05-11 20:11:43 UTC (rev 14775)
@@ -594,6 +594,21 @@
@classmethod
+ def updatesome(cls, transaction, where, **kw):
+ """
+ Update rows matching the where expression from the table that corresponds to C{cls}.
+ """
+ colmap = {}
+ for k, v in kw.iteritems():
+ colmap[cls.__attrmap__[k]] = v
+
+ return Update(
+ colmap,
+ Where=where
+ ).on(transaction)
+
+
+ @classmethod
def deleteall(cls, transaction):
"""
Delete all rows from the table that corresponds to C{cls}.
Modified: twext/trunk/twext/enterprise/dal/test/test_record.py
===================================================================
--- twext/trunk/twext/enterprise/dal/test/test_record.py 2015-05-10 15:40:44 UTC (rev 14774)
+++ twext/trunk/twext/enterprise/dal/test/test_record.py 2015-05-11 20:11:43 UTC (rev 14775)
@@ -363,6 +363,45 @@
@inlineCallbacks
+ def test_updatesome(self):
+ """
+ L{Record.updatesome} will update all instances of the matching records.
+ """
+ txn = self.pool.connection()
+ data = [(123, u"one"), (456, u"four"), (345, u"three"),
+ (234, u"two"), (356, u"three")]
+ for beta, gamma in data:
+ yield txn.execSQL("insert into ALPHA values (:1, :2)",
+ [beta, gamma])
+
+ yield TestRecord.updatesome(txn, where=(TestRecord.beta == 123), gamma=u"changed")
+ yield txn.commit()
+
+ txn = self.pool.connection()
+ records = yield TestRecord.all(txn)
+ self.assertEqual(
+ set([(record.beta, record.gamma,) for record in records]),
+ set([
+ (123, u"changed"), (456, u"four"), (345, u"three"),
+ (234, u"two"), (356, u"three")
+ ])
+ )
+
+ yield TestRecord.updatesome(txn, where=(TestRecord.beta.In((234, 345,))), gamma=u"changed-2")
+ yield txn.commit()
+
+ txn = self.pool.connection()
+ records = yield TestRecord.all(txn)
+ self.assertEqual(
+ set([(record.beta, record.gamma,) for record in records]),
+ set([
+ (123, u"changed"), (456, u"four"), (345, u"changed-2"),
+ (234, u"changed-2"), (356, u"three")
+ ])
+ )
+
+
+ @inlineCallbacks
def test_deleteall(self):
"""
L{Record.deleteall} will delete all instances of the record.
Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py 2015-05-10 15:40:44 UTC (rev 14774)
+++ twext/trunk/twext/enterprise/jobqueue.py 2015-05-11 20:11:43 UTC (rev 14775)
@@ -146,7 +146,8 @@
)
from twext.enterprise.dal.model import ProcedureCall, Sequence
-from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
+from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord, \
+ SerializableRecord
from twisted.python.failure import Failure
from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
@@ -237,6 +238,7 @@
JobTable.addColumn("ASSIGNED", SQLType("timestamp", None), default=None)
JobTable.addColumn("OVERDUE", SQLType("timestamp", None), default=None)
JobTable.addColumn("FAILED", SQLType("integer", 0), default=0)
+ JobTable.addColumn("PAUSE", SQLType("integer", 0), default=0)
return inSchema
@@ -351,6 +353,7 @@
class JobItem(Record, fromTable(JobInfoSchema.JOB)):
"""
+ @DynamicAttrs
An item in the job table. This is typically not directly used by code
creating work items, but rather is used for internal book keeping of jobs
associated with work items.
@@ -443,6 +446,21 @@
)
+ def pauseIt(self, pause=False):
+ """
+ Pause the L{JobItem} leaving all other attributes the same. The job processing loop
+ will skip paused items.
+
+ @param pause: indicates whether the job should be paused.
+ @type pause: L{bool}
+ @param delay: how long before the job is run again, or C{None} for a default
+ staggered delay behavior.
+ @type delay: L{int}
+ """
+
+ return self.update(pause=pause)
+
+
@classmethod
@inlineCallbacks
def ultimatelyPerform(cls, txnFactory, jobID):
@@ -612,15 +630,17 @@
@rtype: L{JobItem}
"""
+ queryExpr = (cls.notBefore <= now).And(cls.priority >= minPriority).And(cls.pause == 0).And(
+ (cls.assigned == None).Or(cls.overdue < now)
+ )
+
if txn.dialect == ORACLE_DIALECT:
# Oracle does not support a "for update" clause with "order by". So do the
# "for update" as a second query right after the first. Will need to check
# how this might impact concurrency in a multi-host setup.
jobs = yield cls.query(
txn,
- (cls.notBefore <= now).And(cls.priority >= minPriority).And(
- (cls.assigned == None).Or(cls.overdue < now)
- ),
+ queryExpr,
order=(cls.assigned, cls.priority),
ascending=False,
limit=limit,
@@ -635,9 +655,7 @@
else:
jobs = yield cls.query(
txn,
- (cls.notBefore <= now).And(cls.priority >= minPriority).And(
- (cls.assigned == None).Or(cls.overdue < now)
- ),
+ queryExpr,
order=(cls.assigned, cls.priority),
ascending=False,
forUpdate=True,
@@ -900,7 +918,7 @@
-class WorkItem(Record):
+class WorkItem(SerializableRecord):
"""
A L{WorkItem} is an item of work which may be stored in a database, then
executed later.
@@ -1028,6 +1046,7 @@
_transferArg("priority")
_transferArg("weight")
_transferArg("notBefore")
+ _transferArg("pause")
# Always need a notBefore
if "notBefore" not in jobargs:
Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py 2015-05-10 15:40:44 UTC (rev 14774)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py 2015-05-11 20:11:43 UTC (rev 14775)
@@ -207,7 +207,8 @@
NOT_BEFORE timestamp not null,
ASSIGNED timestamp default null,
OVERDUE timestamp default null,
- FAILED integer default 0
+ FAILED integer default 0,
+ PAUSE integer default 0
);
"""
)
@@ -533,6 +534,31 @@
self.assertTrue(job is None)
self.assertTrue(work is None)
+ # Unassigned, paused job with past notBefore not returned
+ yield self._enqueue(dbpool, 3, 1, now + datetime.timedelta(days=-1), priority=WORK_PRIORITY_HIGH)
+ @inlineCallbacks
+ def pauseJob(txn, pause=True):
+ works = yield DummyWorkItem.all(txn)
+ for work in works:
+ if work.a == 3:
+ job = yield JobItem.load(txn, work.jobID)
+ yield job.pauseIt(pause)
+ yield inTransaction(dbpool.connection, pauseJob)
+ job, work = yield inTransaction(dbpool.connection, _next)
+ self.assertTrue(job is None)
+ self.assertTrue(work is None)
+
+ # Unassigned, paused then unpaused job with past notBefore is returned
+ yield inTransaction(dbpool.connection, pauseJob, pause=False)
+ job, work = yield inTransaction(dbpool.connection, _next)
+ self.assertTrue(job is not None)
+ self.assertTrue(work.a == 3)
+ @inlineCallbacks
+ def deleteJob(txn, jobID):
+ job = yield JobItem.load(txn, jobID)
+ yield job.delete()
+ yield inTransaction(dbpool.connection, deleteJob, jobID=job.jobID)
+
# Unassigned low priority job with past notBefore not returned if high priority required
yield self._enqueue(dbpool, 4, 1, now + datetime.timedelta(days=-1))
job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150511/81ac263a/attachment.html>
More information about the calendarserver-changes
mailing list