[CalendarServer-changes] [13640] twext/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Mon Jun 16 13:21:42 PDT 2014


Revision: 13640
          http://trac.calendarserver.org//changeset/13640
Author:   cdaboo at apple.com
Date:     2014-06-16 13:21:42 -0700 (Mon, 16 Jun 2014)
Log Message:
-----------
Revised jobqueue locking to eliminate contention, deadlocks and NamedLock use.

Modified Paths:
--------------
    twext/trunk/twext/enterprise/dal/record.py
    twext/trunk/twext/enterprise/dal/test/test_record.py
    twext/trunk/twext/enterprise/fixtures.py
    twext/trunk/twext/enterprise/jobqueue.py
    twext/trunk/twext/enterprise/test/test_jobqueue.py

Modified: twext/trunk/twext/enterprise/dal/record.py
===================================================================
--- twext/trunk/twext/enterprise/dal/record.py	2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/dal/record.py	2014-06-16 20:21:42 UTC (rev 13640)
@@ -31,7 +31,7 @@
 
 from twisted.internet.defer import inlineCallbacks, returnValue
 from twext.enterprise.dal.syntax import (
-    Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete
+    Select, Tuple, Constant, ColumnSyntax, Insert, Update, Delete, SavepointAction
 )
 from twext.enterprise.util import parseSQLTimestamp
 # from twext.enterprise.dal.syntax import ExpressionSyntax
@@ -330,6 +330,65 @@
         self.__dict__.update(kw)
 
 
+    @inlineCallbacks
+    def lock(self, where=None):
+        """
+        Lock with a select for update.
+
+        @param where: SQL expression used to match the rows to lock, by default this is just an expression
+            that matches the primary key of this L{Record}, but it can be used to lock multiple L{Records}
+            matching the expression in one go. If it is an L{str}, then all rows will be matched.
+        @type where: L{SQLExpression} or L{None}
+        @return: a L{Deferred} that fires when the lock has been acquired.
+        """
+        if where is None:
+            where = self._primaryKeyComparison(self._primaryKeyValue())
+        elif isinstance(where, str):
+            where = None
+        yield Select(
+            list(self.table),
+            From=self.table,
+            Where=where,
+            ForUpdate=True,
+        ).on(self.transaction)
+
+
+    @inlineCallbacks
+    def trylock(self, where=None):
+        """
+        Try to lock with a select for update no wait. If it fails, rollback to
+        a savepoint and return L{False}, else return L{True}.
+
+        @param where: SQL expression used to match the rows to lock, by default this is just an expression
+            that matches the primary key of this L{Record}, but it can be used to lock multiple L{Records}
+            matching the expression in one go. If it is an L{str}, then all rows will be matched.
+        @type where: L{SQLExpression} or L{None}
+        @return: a L{Deferred} that fires with L{True} if the lock was
+            acquired, or L{False} if it could not be acquired.
+        """
+
+        if where is None:
+            where = self._primaryKeyComparison(self._primaryKeyValue())
+        elif isinstance(where, str):
+            where = None
+        savepoint = SavepointAction("Record_trylock_{}".format(self.__class__.__name__))
+        yield savepoint.acquire(self.transaction)
+        try:
+            yield Select(
+                list(self.table),
+                From=self.table,
+                Where=where,
+                ForUpdate=True,
+                NoWait=True,
+            ).on(self.transaction)
+        except:
+            yield savepoint.rollback(self.transaction)
+            returnValue(False)
+        else:
+            yield savepoint.release(self.transaction)
+            returnValue(True)
+
+
     @classmethod
     def pop(cls, transaction, *primaryKey):
         """

Modified: twext/trunk/twext/enterprise/dal/test/test_record.py
===================================================================
--- twext/trunk/twext/enterprise/dal/test/test_record.py	2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/dal/test/test_record.py	2014-06-16 20:21:42 UTC (rev 13640)
@@ -415,3 +415,47 @@
             Record.namingConvention(u"LIKE_THIS_ID"),
             "likeThisID"
         )
+
+
+    @inlineCallbacks
+    def test_lock(self):
+        """
+        A L{Record} may be locked, with L{Record.lock}.
+        """
+        txn = self.pool.connection()
+        for beta, gamma in [
+            (123, u"one"),
+            (234, u"two"),
+            (345, u"three"),
+            (356, u"three"),
+            (456, u"four"),
+        ]:
+            yield txn.execSQL(
+                "insert into ALPHA values (:1, :2)", [beta, gamma]
+            )
+
+        rec = yield TestRecord.load(txn, 234)
+        yield rec.lock()
+        self.assertEqual(rec.gamma, u'two')
+
+
+    @inlineCallbacks
+    def test_trylock(self):
+        """
+        A L{Record} may be locked, with L{Record.trylock}.
+        """
+        txn = self.pool.connection()
+        for beta, gamma in [
+            (123, u"one"),
+            (234, u"two"),
+            (345, u"three"),
+            (356, u"three"),
+            (456, u"four"),
+        ]:
+            yield txn.execSQL(
+                "insert into ALPHA values (:1, :2)", [beta, gamma]
+            )
+
+        rec = yield TestRecord.load(txn, 234)
+        result = yield rec.trylock()
+        self.assertTrue(result)

Modified: twext/trunk/twext/enterprise/fixtures.py
===================================================================
--- twext/trunk/twext/enterprise/fixtures.py	2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/fixtures.py	2014-06-16 20:21:42 UTC (rev 13640)
@@ -59,7 +59,7 @@
     seqs = {}
 
     def connectionFactory(label=testCase.id()):
-        conn = sqlite3.connect(sqlitename)
+        conn = sqlite3.connect(sqlitename, isolation_level=None)
 
         def nextval(seq):
             result = seqs[seq] = seqs.get(seq, 0) + 1
@@ -326,7 +326,7 @@
     tmpdb = test.mktemp()
 
     def connect():
-        return sqlite3.connect(tmpdb)
+        return sqlite3.connect(tmpdb, isolation_level=None)
 
     return connect
 

Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py	2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/jobqueue.py	2014-06-16 20:21:42 UTC (rev 13640)
@@ -136,7 +136,6 @@
 from twisted.internet.endpoints import TCP4ServerEndpoint
 from twext.enterprise.ienterprise import IQueuer
 from zope.interface.interface import Interface
-from twext.enterprise.locking import NamedLock
 
 import collections
 import time
@@ -219,6 +218,7 @@
     JobTable.addColumn("WEIGHT", SQLType("integer", 0), default=0)
     JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None), notNull=True)
     JobTable.addColumn("ASSIGNED", SQLType("timestamp", None), default=None)
+    JobTable.addColumn("OVERDUE", SQLType("timestamp", None), default=None)
     JobTable.addColumn("FAILED", SQLType("integer", 0), default=0)
 
     return inSchema
@@ -309,43 +309,97 @@
 
 
 
+class JobRunningError(Exception):
+    """
+    A job is already running.
+    """
+    pass
+
+
+
 class JobItem(Record, fromTable(JobInfoSchema.JOB)):
     """
     An item in the job table. This is typically not directly used by code
     creating work items, but rather is used for internal book keeping of jobs
     associated with work items.
+
+    The JOB table has some important columns that determine how a job is being scheduled:
+
+    NOT_BEFORE - this is a timestamp indicating when the job is expected to run. It will not
+    run before this time, but may run quite some time after (if the service is busy).
+
+    ASSIGNED - this is a timestamp that is initially NULL but set when the job processing loop
+    assigns the job to a child process to be executed. Thus, if the value is not NULL, then the
+    job is (probably) being executed. The child process is supposed to delete the L{JobItem}
+    when it is done, however if the child dies without executing the job, then the job
+    processing loop needs to detect it.
+
+    OVERDUE - this is a timestamp initially set when an L{JobItem} is assigned. It represents
+    a point in the future when the job is expected to be finished. The job processing loop skips
+    jobs that have a non-NULL ASSIGNED value and whose OVERDUE value has not been passed. If
+    OVERDUE is in the past, then the job processing loop checks to see if the job is still
+    running - which is determined by whether a row lock exists on the work item (see
+    L{isRunning}). If the job is still running then OVERDUE is bumped up to a new point in the
+    future, if it is not still running the job is marked as failed - which will reschedule it.
+
+    FAILED - a count of the number of times a job has failed or had its overdue count bumped.
+
+    The above behavior depends on some important locking behavior: when an L{JobItem} is run,
+    it locks the L{WorkItem} row corresponding to the job (it may lock other associated
+    rows - e.g., other L{WorkItem}'s in the same group). It does not lock the L{JobItem}
+    row corresponding to the job because the job processing loop may need to update the
+    OVERDUE value of that row if the work takes a long time to complete.
     """
 
     _workTypes = None
     _workTypeMap = None
 
+    failureRescheduleInterval = 60  # When a job fails, reschedule it this number of seconds in the future
+    lockRescheduleInterval = 5      # When a job is locked, reschedule it this number of seconds in the future
+
     def descriptor(self):
         return JobDescriptor(self.jobID, self.weight, self.workType)
 
 
-    def assign(self, now):
+    def assign(self, when, overdue):
         """
         Mark this job as assigned to a worker by setting the assigned column to the current,
-        or provided, timestamp.
+        or provided, timestamp. Also set the overdue value to help determine if a job is orphaned.
 
-        @param now: current timestamp
-        @type now: L{datetime.datetime}
-        @param when: explicitly set the assigned time - typically only used in tests
-        @type when: L{datetime.datetime} or L{None}
+        @param when: current timestamp
+        @type when: L{datetime.datetime}
+        @param overdue: number of seconds after assignment that the job will be considered overdue
+        @type overdue: L{int}
         """
-        return self.update(assigned=now)
+        return self.update(assigned=when, overdue=when + timedelta(seconds=overdue))
 
 
-    def failedToRun(self):
+    def bumpOverdue(self, bump):
         """
+        Increment the overdue value by the specified number of seconds. Used when an overdue job
+        is still running in a child process but the job processing loop has detected it as overdue.
+
+        @param bump: number of seconds to increment overdue by
+        @type bump: L{int}
+        """
+        return self.update(overdue=self.overdue + timedelta(seconds=bump))
+
+
+    def failedToRun(self, delay=None):
+        """
         The attempt to run the job failed. Leave it in the queue, but mark it
         as unassigned, bump the failure count and set to run at some point in
         the future.
+
+        @param delay: the number of seconds in the future at which to reschedule the
+            next execution of the job. If L{None} use the default class property value.
+        @type delay: L{int}
         """
         return self.update(
             assigned=None,
+            overdue=None,
             failed=self.failed + 1,
-            notBefore=datetime.utcnow() + timedelta(seconds=60)
+            notBefore=datetime.utcnow() + timedelta(seconds=self.failureRescheduleInterval if delay is None else delay)
         )
 
 
@@ -354,15 +408,15 @@
     def ultimatelyPerform(cls, txnFactory, jobID):
         """
         Eventually, after routing the job to the appropriate place, somebody
-        actually has to I{do} it.
+        actually has to I{do} it. This method basically calls L{JobItem.run}
+        but it does a bunch of "bookkeeping" to track the transaction and log failures
+        and timing information.
 
         @param txnFactory: a 0- or 1-argument callable that creates an
             L{IAsyncTransaction}
         @type txnFactory: L{callable}
-
         @param jobID: the ID of the job to be performed
         @type jobID: L{int}
-
         @return: a L{Deferred} which fires with C{None} when the job has been
             performed, or fails if the job can't be performed.
         """
@@ -396,8 +450,9 @@
                 tm=_tm(),
             )
 
-        except JobFailedError:
+        except (JobFailedError, JobRunningError) as e:
             # Job failed: abort with cleanup, but pretend this method succeeded
+            delay = job.lockRescheduleInterval if isinstance(e, JobRunningError) else job.failureRescheduleInterval
             def _cleanUp():
                 @inlineCallbacks
                 def _cleanUp2(txn2):
@@ -408,16 +463,17 @@
                         count=job.failed + 1,
                         tm=_tm(),
                     )
-                    yield job.failedToRun()
+                    yield job.failedToRun(delay=delay)
                 return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._cleanUp")
-            txn.postAbort(_cleanUp)
-            yield txn.abort()
             log.debug(
-                "JobItem: {jobid} failed {work} t={tm}",
+                "JobItem: {jobid} {desc} {work} t={tm}",
                 jobid=jobID,
+                desc="failed" if isinstance(e, JobFailedError) else "locked",
                 work=job.workType,
                 tm=_tm(),
             )
+            txn.postAbort(_cleanUp)
+            yield txn.abort()
 
         except:
             f = Failure()
@@ -445,27 +501,25 @@
 
     @classmethod
     @inlineCallbacks
-    def nextjob(cls, txn, now, minPriority, overdue):
+    def nextjob(cls, txn, now, minPriority):
         """
         Find the next available job based on priority, also return any that are overdue. This
-        method relies on there being a nextjob() SQL stored procedure to enable skipping over
-        items which are row locked to help avoid contention when multiple nodes are operating
-        on the job queue simultaneously.
+        method uses an SQL query to find the matching jobs, and sorts based on the NOT_BEFORE
+        value and priority.
 
         @param txn: the transaction to use
         @type txn: L{IAsyncTransaction}
-        @param now: current timestamp
+        @param now: current timestamp - needed for unit tests that might use their
+            own clock.
         @type now: L{datetime.datetime}
         @param minPriority: lowest priority level to query for
         @type minPriority: L{int}
-        @param overdue: how long before an assigned item is considered overdue
-        @type overdue: L{datetime.datetime}
 
         @return: the job record
         @rtype: L{JobItem}
         """
 
-        jobs = yield cls.nextjobs(txn, now, minPriority, overdue, limit=1)
+        jobs = yield cls.nextjobs(txn, now, minPriority, limit=1)
 
         # Must only be one or zero
         if jobs and len(jobs) > 1:
@@ -476,7 +530,7 @@
 
     @classmethod
     @inlineCallbacks
-    def nextjobs(cls, txn, now, minPriority, overdue, limit=1):
+    def nextjobs(cls, txn, now, minPriority, limit=1):
         """
         Find the next available job based on priority, also return any that are overdue. This
         method relies on there being a nextjob() SQL stored procedure to enable skipping over
@@ -489,8 +543,6 @@
         @type now: L{datetime.datetime}
         @param minPriority: lowest priority level to query for
         @type minPriority: L{int}
-        @param overdue: how long before an assigned item is considered overdue
-        @type overdue: L{datetime.datetime}
         @param limit: limit on number of jobs to return
         @type limit: L{int}
 
@@ -501,7 +553,7 @@
         jobs = yield cls.query(
             txn,
             (cls.notBefore <= now).And
-            (((cls.priority >= minPriority).And(cls.assigned == None)).Or(cls.assigned < overdue)),
+            (((cls.priority >= minPriority).And(cls.assigned == None)).Or(cls.overdue < now)),
             order=(cls.assigned, cls.priority),
             ascending=False,
             forUpdate=True,
@@ -516,17 +568,20 @@
     def run(self):
         """
         Run this job item by finding the appropriate work item class and
-        running that.
+        running that, with appropriate locking.
         """
 
-        # TODO: Move group into the JOB table.
-        # Do a select * where group = X for update nowait to lock
-        # all rows in the group - on exception raise JobFailed
-        # with a rollback to allow the job to be re-assigned to a later
-        # date.
         workItem = yield self.workItem()
         if workItem is not None:
+
+            # First we lock the L{WorkItem}
+            locked = yield workItem.runlock()
+            if not locked:
+                raise JobRunningError()
+
             try:
+                # Run in three steps, allowing for before/after hooks that sub-classes
+                # may override
                 okToGo = yield workItem.beforeWork()
                 if okToGo:
                     yield workItem.doWork()
@@ -541,7 +596,8 @@
                 raise JobFailedError(e)
 
         try:
-            # Once the work is done we delete ourselves
+            # Once the work is done we delete ourselves - NB this must be the last thing done
+            # to ensure the L{JobItem} row is not locked for very long.
             yield self.delete()
         except NoSuchRecord:
             # The record has already been removed
@@ -549,7 +605,23 @@
 
 
     @inlineCallbacks
+    def isRunning(self):
+        """
+        Return L{True} if the job is currently running (its L{WorkItem} is locked).
+        """
+        workItem = yield self.workItem()
+        if workItem is not None:
+            locked = yield workItem.trylock()
+            returnValue(not locked)
+        else:
+            returnValue(False)
+
+
+    @inlineCallbacks
     def workItem(self):
+        """
+        Return the L{WorkItem} corresponding to this L{JobItem}.
+        """
         workItemClass = self.workItemForType(self.workType)
         workItems = yield workItemClass.loadForJob(
             self.transaction, self.jobID
@@ -559,6 +631,12 @@
 
     @classmethod
     def workItemForType(cls, workType):
+        """
+        Return the class of the L{WorkItem} associated with this L{JobItem}.
+
+        @param workType: the name of the L{WorkItem}'s table
+        @type workType: L{str}
+        """
         if cls._workTypeMap is None:
             cls.workTypes()
         return cls._workTypeMap[workType]
@@ -567,6 +645,8 @@
     @classmethod
     def workTypes(cls):
         """
+        Map all L{WorkItem} sub-classes table names to the class type.
+
         @return: All of the work item types.
         @rtype: iterable of L{WorkItem} subclasses
         """
@@ -651,7 +731,7 @@
 
 class JobDescriptorArg(Argument):
     """
-    Comma-separated.
+    Comma-separated representation of an L{JobDescriptor} for AMP-serialization.
     """
     def toString(self, inObject):
         return ",".join(map(str, inObject))
@@ -826,6 +906,30 @@
 
 
     @inlineCallbacks
+    def runlock(self):
+        """
+        Used to lock an L{WorkItem} before it is run. The L{WorkItem}'s row MUST be
+        locked via SELECT FOR UPDATE to ensure the job queue knows it is being worked
+        on so that it can detect when an overdue job needs to be restarted or not.
+
+        Note that the locking used here may cause deadlocks if not done in the correct
+        order. In particular anything that might cause locks across multiple L{WorkItem}s,
+        such as group locks, multi-row locks, etc, MUST be done first.
+
+        @return: an L{Deferred} that fires with L{True} if the L{WorkItem} was locked,
+            L{False} if not.
+        @rtype: L{Deferred}
+        """
+
+        # Do the group lock first since this can impact multiple rows and thus could
+        # cause deadlocks if done in the wrong order
+
+        # Row level lock on this item
+        locked = yield self.trylock(self.group)
+        returnValue(locked)
+
+
+    @inlineCallbacks
     def beforeWork(self):
         """
         A hook that gets called before the L{WorkItem} does its real work. This can be used
@@ -836,18 +940,6 @@
             should continue, L{False} if it should be skipped without error.
         @rtype: L{Deferred}
         """
-        if self.group is not None:
-            try:
-                yield NamedLock.acquire(self.transaction, self.group)
-            except Exception as e:
-                log.error(
-                    "JobItem: {jobid}, WorkItem: {workid} lock failed: {exc}",
-                    jobid=self.jobID,
-                    workid=self.workID,
-                    exc=e,
-                )
-                raise JobFailedError(e)
-
         try:
             # Work item is deleted before doing work - but someone else may have
             # done it whilst we waited on the lock so handle that by simply
@@ -942,7 +1034,7 @@
     @inlineCallbacks
     def beforeWork(self):
         """
-        No need to lock - for safety just delete any others.
+        For safety just delete any others.
         """
 
         # Delete all other work items
@@ -1629,7 +1721,7 @@
     getpid = staticmethod(getpid)
 
     queuePollInterval = 0.1             # How often to poll for new work
-    queueOrphanTimeout = 5.0 * 60.0     # How long before assigned work is possibly orphaned
+    queueOverdueTimeout = 5.0 * 60.0    # How long before assigned work is possibly overdue
 
     overloadLevel = 95          # Percentage load level above which job queue processing stops
     highPriorityLevel = 80      # Percentage load level above which only high priority jobs are processed
@@ -1802,21 +1894,39 @@
             # TODO: here is where we should iterate over the unlocked items
             # that are due, ordered by priority, notBefore etc
             nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
-            orphanTime = nowTime - timedelta(seconds=self.queueOrphanTimeout)
 
             txn = self.transactionFactory(label="jobqueue.workCheck")
             try:
-                nextJob = yield JobItem.nextjob(txn, nowTime, minPriority, orphanTime)
+                nextJob = yield JobItem.nextjob(txn, nowTime, minPriority)
                 if nextJob is None:
                     break
 
-                # If it is now assigned but not earlier than the orphan time, ignore as this may have
-                # been returned after another txn just assigned it
-                if nextJob.assigned is not None and nextJob.assigned > orphanTime:
-                    continue
+                if nextJob.assigned is not None:
+                    if nextJob.overdue > nowTime:
+                        # If it is now assigned but not overdue, ignore as this may have
+                        # been returned after another txn just assigned it
+                        continue
+                    else:
+                        # It is overdue - check to see whether the work item is currently locked - if so no
+                        # need to re-assign
+                        running = yield nextJob.isRunning()
+                        if running:
+                            # Change the overdue to further in the future whilst we wait for
+                            # the running job to complete
+                            yield nextJob.bumpOverdue(self.queueOverdueTimeout)
+                            log.debug(
+                                "workCheck: bumped overdue timeout on jobid={jobid}",
+                                jobid=nextJob.jobID,
+                            )
+                            continue
+                        else:
+                            log.debug(
+                                "workCheck: overdue re-assignment for jobid={jobid}",
+                                jobid=nextJob.jobID,
+                            )
 
                 # Always assign as a new job even when it is an orphan
-                yield nextJob.assign(nowTime)
+                yield nextJob.assign(nowTime, self.queueOverdueTimeout)
                 loopCounter += 1
 
             except Exception as e:

Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py	2014-06-16 20:09:15 UTC (rev 13639)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py	2014-06-16 20:21:42 UTC (rev 13640)
@@ -32,7 +32,7 @@
 from twisted.protocols.amp import Command, AMP, Integer
 from twisted.application.service import Service, MultiService
 
-from twext.enterprise.dal.syntax import SchemaSyntax
+from twext.enterprise.dal.syntax import SchemaSyntax, Delete
 from twext.enterprise.dal.record import fromTable
 from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
 from twext.enterprise.fixtures import buildConnectionPool
@@ -206,6 +206,7 @@
       WEIGHT      integer default 0,
       NOT_BEFORE  timestamp not null,
       ASSIGNED    timestamp default null,
+      OVERDUE     timestamp default null,
       FAILED      integer default 0
     );
     """
@@ -219,6 +220,24 @@
       A integer, B integer,
       DELETE_ON_LOAD integer default 0
     );
+    create table DUMMY_WORK_SINGLETON_ITEM (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A integer, B integer,
+      DELETE_ON_LOAD integer default 0
+    );
+    create table DUMMY_WORK_PAUSE_ITEM (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A integer, B integer,
+      DELETE_ON_LOAD integer default 0
+    );
+    create table AGGREGATOR_WORK_ITEM (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A integer, B integer,
+      DELETE_ON_LOAD integer default 0
+    );
     """
 )
 
@@ -227,13 +246,24 @@
 
     dropSQL = [
         "drop table {name} cascade".format(name=table)
-        for table in ("DUMMY_WORK_ITEM",)
+        for table in (
+            "DUMMY_WORK_ITEM",
+            "DUMMY_WORK_SINGLETON_ITEM",
+            "DUMMY_WORK_PAUSE_ITEM",
+            "AGGREGATOR_WORK_ITEM"
+        )
     ] + ["delete from job"]
 except SkipTest as e:
     DummyWorkItemTable = object
+    DummyWorkSingletonItemTable = object
+    DummyWorkPauseItemTable = object
+    AggregatorWorkItemTable = object
     skip = e
 else:
     DummyWorkItemTable = fromTable(schema.DUMMY_WORK_ITEM)
+    DummyWorkSingletonItemTable = fromTable(schema.DUMMY_WORK_SINGLETON_ITEM)
+    DummyWorkPauseItemTable = fromTable(schema.DUMMY_WORK_PAUSE_ITEM)
+    AggregatorWorkItemTable = fromTable(schema.AGGREGATOR_WORK_ITEM)
     skip = False
 
 
@@ -271,7 +301,7 @@
 
 
 
-class DummyWorkSingletonItem(SingletonWorkItem, DummyWorkItemTable):
+class DummyWorkSingletonItem(SingletonWorkItem, DummyWorkSingletonItemTable):
     """
     Sample L{SingletonWorkItem} subclass that adds two integers together and stores them
     in another table.
@@ -287,6 +317,42 @@
 
 
 
+class DummyWorkPauseItem(WorkItem, DummyWorkPauseItemTable):
+    """
+    Sample L{WorkItem} subclass that pauses until a Deferred is fired.
+    """
+
+    workStarted = None
+    unpauseWork = None
+
+    def doWork(self):
+        self.workStarted.callback(None)
+        return self.unpauseWork
+
+
+
+class AggregatorWorkItem(WorkItem, AggregatorWorkItemTable):
+    """
+    Sample L{WorkItem} subclass that deletes others with the same
+    value and then pauses for a bit.
+    """
+
+    group = property(lambda self: (self.table.B == self.b))
+
+    @inlineCallbacks
+    def doWork(self):
+        # Delete the work items we match
+        yield Delete(
+            From=self.table,
+            Where=(self.table.A == self.a)
+        ).on(self.transaction)
+
+        d = Deferred()
+        reactor.callLater(2.0, lambda: d.callback(None))
+        yield d
+
+
+
 class AMPTests(TestCase):
     """
     Tests for L{AMP} faithfully relaying ids across the wire.
@@ -396,7 +462,7 @@
     @inlineCallbacks
     def test_assign(self):
         """
-        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
+        L{JobItem.assign} will mark a job as assigned.
         """
         dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
         yield self._enqueue(dbpool, 1, 2)
@@ -412,7 +478,7 @@
         @inlineCallbacks
         def assignJob(txn):
             job = yield JobItem.load(txn, jobs[0].jobID)
-            yield job.assign(datetime.datetime.utcnow())
+            yield job.assign(datetime.datetime.utcnow(), PeerConnectionPool.queueOverdueTimeout)
         yield inTransaction(dbpool.connection, assignJob)
 
         jobs = yield inTransaction(dbpool.connection, checkJob)
@@ -423,7 +489,7 @@
     @inlineCallbacks
     def test_nextjob(self):
         """
-        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
+        L{JobItem.nextjob} returns the correct job based on priority.
         """
 
         dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
@@ -432,7 +498,7 @@
         # Empty job queue
         @inlineCallbacks
         def _next(txn, priority=WORK_PRIORITY_LOW):
-            job = yield JobItem.nextjob(txn, now, priority, now - datetime.timedelta(seconds=PeerConnectionPool.queueOrphanTimeout))
+            job = yield JobItem.nextjob(txn, now, priority)
             if job is not None:
                 work = yield job.workItem()
             else:
@@ -459,7 +525,7 @@
         @inlineCallbacks
         def assignJob(txn, when=None):
             assignee = yield JobItem.load(txn, assignID)
-            yield assignee.assign(now if when is None else when)
+            yield assignee.assign(now if when is None else when, PeerConnectionPool.queueOverdueTimeout)
         yield inTransaction(dbpool.connection, assignJob)
         job, work = yield inTransaction(dbpool.connection, _next)
         self.assertTrue(job is None)
@@ -1184,7 +1250,201 @@
         self.assertEquals(DummyWorkItem.results, {})
 
 
+    @inlineCallbacks
+    def test_locked(self):
+        """
+        L{JobItem.run} locks the work item.
+        """
 
+        DummyWorkPauseItem.workStarted = Deferred()
+        DummyWorkPauseItem.unpauseWork = Deferred()
+
+        @transactionally(self.store.newTransaction)
+        def _enqueue(txn):
+            return txn.enqueue(
+                DummyWorkPauseItem, a=30, b=40, workID=1
+            )
+        yield _enqueue
+
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+
+        yield DummyWorkPauseItem.workStarted
+
+        @transactionally(self.store.newTransaction)
+        def _trylock(txn):
+            job = yield JobItem.load(txn, jobs[0].jobID)
+            work = yield job.workItem()
+            locked = yield work.trylock()
+            self.assertFalse(locked)
+        yield _trylock
+
+        DummyWorkPauseItem.unpauseWork.callback(None)
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 0)
+
+
+    @inlineCallbacks
+    def test_overdue(self):
+        """
+        A job still running past its overdue time is bumped once and not re-assigned.
+        """
+
+        # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
+        # they are called.
+        assigned = [0]
+        _oldAssign = JobItem.assign
+        def _newAssign(self, when, overdue):
+            assigned[0] += 1
+            return _oldAssign(self, when, 1)
+        self.patch(JobItem, "assign", _newAssign)
+
+        bumped = [0]
+        _oldBumped = JobItem.bumpOverdue
+        def _newBump(self, bump):
+            bumped[0] += 1
+            return _oldBumped(self, 100)
+        self.patch(JobItem, "bumpOverdue", _newBump)
+
+        DummyWorkPauseItem.workStarted = Deferred()
+        DummyWorkPauseItem.unpauseWork = Deferred()
+
+        @transactionally(self.store.newTransaction)
+        def _enqueue(txn):
+            return txn.enqueue(
+                DummyWorkPauseItem, a=30, b=40, workID=1
+            )
+        yield _enqueue
+
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(assigned[0] == 0)
+        self.assertTrue(bumped[0] == 0)
+
+        yield DummyWorkPauseItem.workStarted
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(assigned[0] == 1)
+        self.assertTrue(bumped[0] == 0)
+
+        d = Deferred()
+        reactor.callLater(2, lambda: d.callback(None))
+        yield d
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(assigned[0] == 1)
+        if bumped[0] != 1:
+            pass
+        self.assertTrue(bumped[0] == 1)
+
+        DummyWorkPauseItem.unpauseWork.callback(None)
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 0)
+        self.assertTrue(assigned[0] == 1)
+        self.assertTrue(bumped[0] == 1)
+
+
+    @inlineCallbacks
+    def test_aggregator_lock(self):
+        """
+        L{JobItem.run} fails an aggregated work item and then ignores it.
+        """
+
+        # Patch JobItem.failedToRun to track how many times
+        # it is called.
+        failed = [0]
+        _oldFailed = JobItem.failedToRun
+        def _newFailed(self, delay):
+            failed[0] += 1
+            return _oldFailed(self, delay)
+        self.patch(JobItem, "failedToRun", _newFailed)
+
+        @transactionally(self.store.newTransaction)
+        def _enqueue1(txn):
+            return txn.enqueue(
+                AggregatorWorkItem, a=1, b=1, workID=1
+            )
+        yield _enqueue1
+
+        @transactionally(self.store.newTransaction)
+        def _enqueue2(txn):
+            return txn.enqueue(
+                AggregatorWorkItem, a=1, b=2, workID=2
+            )
+        yield _enqueue2
+
+        # Make sure we have two JOBs (one per enqueued AggregatorWorkItem)
+        def checkJob(txn):
+            return JobItem.all(txn)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 2)
+
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertEqual(len(jobs), 0)
+        self.assertEqual(failed[0], 1)
+
+
+    @inlineCallbacks
+    def test_aggregator_no_deadlock(self):
+        """
+        L{JobItem.run} fails an aggregated work item and then ignores it.
+        """
+
+        # Patch JobItem.failedToRun to track how many times
+        # it is called.
+        failed = [0]
+        _oldFailed = JobItem.failedToRun
+        def _newFailed(self, delay):
+            failed[0] += 1
+            return _oldFailed(self, delay)
+        self.patch(JobItem, "failedToRun", _newFailed)
+
+        @transactionally(self.store.newTransaction)
+        def _enqueue1(txn):
+            return txn.enqueue(
+                AggregatorWorkItem, a=1, b=1, workID=1
+            )
+        yield _enqueue1
+
+        @transactionally(self.store.newTransaction)
+        def _enqueue2(txn):
+            return txn.enqueue(
+                AggregatorWorkItem, a=1, b=1, workID=2
+            )
+        yield _enqueue2
+
+        # Make sure we have two JOBs (one per enqueued AggregatorWorkItem)
+        def checkJob(txn):
+            return JobItem.all(txn)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 2)
+
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 0)
+        self.assertEqual(failed[0], 1)
+
+
+
 class DummyProposal(object):
 
     def __init__(self, *ignored):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140616/0761133b/attachment-0001.html>


More information about the calendarserver-changes mailing list