[CalendarServer-changes] [15035] twext/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Fri Aug 7 13:06:34 PDT 2015

Revision: 15035
Author:   cdaboo at apple.com
Date:     2015-08-07 13:06:34 -0700 (Fri, 07 Aug 2015)
Log Message:
Refactor the jobqueue into a separate package and remove some redundant pieces.

Modified Paths:

Added Paths:

Removed Paths:

Modified: twext/trunk/twext/enterprise/ienterprise.py
--- twext/trunk/twext/enterprise/ienterprise.py	2015-08-06 21:04:52 UTC (rev 15034)
+++ twext/trunk/twext/enterprise/ienterprise.py	2015-08-07 20:06:34 UTC (rev 15035)
@@ -282,26 +282,12 @@
         @param workItemType: the type of work item to create.
         @type workItemType: L{type}, specifically, a subtype of L{WorkItem
-            <twext.enterprise.jobqueue.WorkItem>}
+            <twext.enterprise.jobs.workitem.WorkItem>}
         @param kw: The keyword parameters are relayed to C{workItemType.create}
             to create an appropriately initialized item.
         @return: a work proposal that allows tracking of the various phases of
             completion of the work item.
-        @rtype: L{twext.enterprise.jobqueue.WorkItem}
+        @rtype: L{twext.enterprise.jobs.workitem.WorkItem}
-    def callWithNewProposals(self, callback):
-        """
-        Tells the IQueuer to call a callback method whenever a new WorkProposal
-        is created.
-        @param callback: a callable which accepts a single parameter, a
-            L{WorkProposal}
-        """
-    def transferProposalCallbacks(self, newQueuer):
-        """
-        Transfer the registered callbacks to the new queuer.
-        """

Deleted: twext/trunk/twext/enterprise/jobqueue.py
--- twext/trunk/twext/enterprise/jobqueue.py	2015-08-06 21:04:52 UTC (rev 15034)
+++ twext/trunk/twext/enterprise/jobqueue.py	2015-08-07 20:06:34 UTC (rev 15035)
@@ -1,2532 +0,0 @@
-# -*- test-case-name: twext.enterprise.test.test_queue -*-
-# Copyright (c) 2012-2015 Apple Inc. All rights reserved.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# See the License for the specific language governing permissions and
-# limitations under the License.
-L{twext.enterprise.jobqueue} is an U{eventually consistent
-<https://en.wikipedia.org/wiki/Eventual_consistency>} task-queuing system for
-use by applications with multiple front-end servers talking to a single
-database instance, that want to defer and parallelize work that involves
-storing the results of computation.
-By enqueuing with L{twisted.enterprise.jobqueue}, you may guarantee that the work
-will I{eventually} be done, and reliably commit to doing it in the future.
-To pick a hypothetical example, let's say that you have a store which wants to
-issue a promotional coupon based on a customer loyalty program, in response to
-an administrator clicking on a button.  Determining the list of customers to
-send the coupon to is quick: a simple query will get you all their names.
-However, analyzing each user's historical purchase data is (A) time consuming
-and (B) relatively isolated, so it would be good to do that in parallel, and it
-would also be acceptable to have that happen at a later time, outside the
-critical path.
-Such an application might be implemented with this queuing system like so::
-    from twext.enterprise.jobqueue import WorkItem, queueFromTransaction
-    from twext.enterprise.dal.parseschema import addSQLToSchema
-    from twext.enterprise.dal.syntax import SchemaSyntax
-    schemaModel = Schema()
-    addSQLToSchema('''
-        create table CUSTOMER (NAME varchar(255), ID integer primary key);
-        create table PRODUCT (NAME varchar(255), ID integer primary key);
-        create table PURCHASE (NAME varchar(255), WHEN timestamp,
-                               CUSTOMER_ID integer references CUSTOMER,
-                               PRODUCT_ID integer references PRODUCT;
-        create table COUPON_WORK (WORK_ID integer primary key,
-                                  CUSTOMER_ID integer references CUSTOMER);
-        create table COUPON (ID integer primary key,
-                            CUSTOMER_ID integer references customer,
-                            AMOUNT integer);
-    ''')
-    schema = SchemaSyntax(schemaModel)
-    class Coupon(Record, fromTable(schema.COUPON_WORK)):
-        pass
-    class CouponWork(WorkItem, fromTable(schema.COUPON_WORK)):
-        @inlineCallbacks
-        def doWork(self):
-            purchases = yield Select(schema.PURCHASE,
-                                     Where=schema.PURCHASE.CUSTOMER_ID
-                                     == self.customerID).on(self.transaction)
-            couponAmount = yield doSomeMathThatTakesAWhile(purchases)
-            yield Coupon.create(customerID=self.customerID,
-                                amount=couponAmount)
-    @inlineCallbacks
-    def makeSomeCoupons(txn):
-        # Note, txn was started before, will be committed later...
-        for customerID in (yield Select([schema.CUSTOMER.CUSTOMER_ID],
-                                        From=schema.CUSTOMER).on(txn)):
-            # queuer is a provider of IQueuer, of which there are several
-            # implementations in this module.
-            queuer.enqueueWork(txn, CouponWork, customerID=customerID)
-More details:
-    Terminology:
-        node: a host in a multi-host setup. Each node will contain a
-            "controller" process and a set of "worker" processes.
-            Nodes communicate with each other to allow load balancing
-            of jobs across the entire cluster.
-        controller: a process running in a node that is in charge of
-            managing "workers" as well as connections to other nodes. The
-            controller polls the job queue and dispatches outstanding jobs
-            to its "workers".
-        worker: a process running in a node that is responsible for
-            executing jobs sent to it by the "controller". It also
-            handles enqueuing of jobs as dictated by operations it
-            is doing.
-    A controller has a:
-    L{WorkerConnectionPool}: this maintains a list of worker processes that
-        have connected to the controller over AMP. It is responsible for
-        dispatching jobs that are to be performed locally on that node.
-        The worker process is identified by an L{ConnectionFromWorker}
-        object which maintains the AMP connection. The
-        L{ConnectionFromWorker} tracks the load on its workers so that
-        jobs can be distributed evenly or halted if the node is too busy.
-    L{PeerConnectionPool}: this is an AMP based service that connects a node
-        to all the other nodes in the cluster. It also runs the main job
-        queue loop to dispatch enqueued work when it becomes due. The controller
-        maintains a list of other nodes via L{ConnectionFromPeerNode} objects,
-        which maintain the AMP connections. L{ConnectionFromPeerNode} can
-        report its load to others, and can receive jobs which it must perform
-        locally (via a dispatch to a worker).
-    A worker process has:
-    L{ConnectionFromController}: an AMP connection to the controller which
-        is managed by an L{ConnectionFromWorker} object in the controller. The
-        controller will dispatch jobs to the worker using this connection. The
-        worker uses this object to enqueue jobs which the controller will pick up
-        at the appropriate time in its job queue polling.
-from functools import wraps
-from datetime import datetime, timedelta
-from collections import namedtuple
-from zope.interface import implements
-from twisted.application.service import MultiService
-from twisted.internet.protocol import Factory
-from twisted.internet.defer import (
-    inlineCallbacks, returnValue, Deferred, passthru, succeed
-from twisted.internet.endpoints import TCP4ClientEndpoint
-from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-from twisted.protocols.amp import AMP, Command, Integer, String, Argument
-from twisted.python.reflect import qual
-from twext.python.log import Logger
-from twext.enterprise.dal.syntax import (
-    SchemaSyntax, Lock, NamedValue
-from twext.enterprise.dal.model import ProcedureCall, Sequence
-from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord, \
-    SerializableRecord
-from twisted.python.failure import Failure
-from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
-from twisted.internet.endpoints import TCP4ServerEndpoint
-from twext.enterprise.ienterprise import IQueuer, ORACLE_DIALECT
-from zope.interface.interface import Interface
-import collections
-import time
-log = Logger()
-class _IJobPerformer(Interface):
-    """
-    An object that can perform work.
-    Internal interface; implemented by several classes here since work has to
-    (in the worst case) pass from worker->controller->controller->worker.
-    """
-    def performJob(job):  # @NoSelf
-        """
-        @param job: Details about the job to perform.
-        @type job: L{JobDescriptor}
-        @return: a L{Deferred} firing with an empty dictionary when the work is
-            complete.
-        @rtype: L{Deferred} firing L{dict}
-        """
-def makeNodeSchema(inSchema):
-    """
-    Create a self-contained schema for L{NodeInfo} to use, in C{inSchema}.
-    @param inSchema: a L{Schema} to add the node-info table to.
-    @type inSchema: L{Schema}
-    @return: a schema with just the one table.
-    """
-    # Initializing this duplicate schema avoids a circular dependency, but this
-    # should really be accomplished with independent schema objects that the
-    # transaction is made aware of somehow.
-    NodeTable = Table(inSchema, "NODE_INFO")
-    NodeTable.addColumn("HOSTNAME", SQLType("varchar", 255))
-    NodeTable.addColumn("PID", SQLType("integer", None))
-    NodeTable.addColumn("PORT", SQLType("integer", None))
-    NodeTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
-        # Note: in the real data structure, this is actually a not-cleaned-up
-        # sqlparse internal data structure, but it *should* look closer to
-        # this.
-        ProcedureCall("timezone", ["UTC", NamedValue("CURRENT_TIMESTAMP")])
-    )
-    for column in NodeTable.columns:
-        NodeTable.tableConstraint(Constraint.NOT_NULL, [column.name])
-    NodeTable.primaryKey = [
-        NodeTable.columnNamed("HOSTNAME"),
-        NodeTable.columnNamed("PORT"),
-    ]
-    return inSchema
-NodeInfoSchema = SchemaSyntax(makeNodeSchema(Schema(__file__)))
-def makeJobSchema(inSchema):
-    """
-    Create a self-contained schema for L{JobInfo} to use, in C{inSchema}.
-    @param inSchema: a L{Schema} to add the node-info table to.
-    @type inSchema: L{Schema}
-    @return: a schema with just the one table.
-    """
-    # Initializing this duplicate schema avoids a circular dependency, but this
-    # should really be accomplished with independent schema objects that the
-    # transaction is made aware of somehow.
-    JobTable = Table(inSchema, "JOB")
-    JobTable.addColumn("JOB_ID", SQLType("integer", None), default=Sequence(inSchema, "JOB_SEQ"), notNull=True, primaryKey=True)
-    JobTable.addColumn("WORK_TYPE", SQLType("varchar", 255), notNull=True)
-    JobTable.addColumn("PRIORITY", SQLType("integer", 0), default=0)
-    JobTable.addColumn("WEIGHT", SQLType("integer", 0), default=0)
-    JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None), notNull=True)
-    JobTable.addColumn("ASSIGNED", SQLType("timestamp", None), default=None)
-    JobTable.addColumn("OVERDUE", SQLType("timestamp", None), default=None)
-    JobTable.addColumn("FAILED", SQLType("integer", 0), default=0)
-    JobTable.addColumn("PAUSE", SQLType("integer", 0), default=0)
-    return inSchema
-JobInfoSchema = SchemaSyntax(makeJobSchema(Schema(__file__)))
- at inlineCallbacks
-def inTransaction(transactionCreator, operation, label="jobqueue.inTransaction", **kwargs):
-    """
-    Perform the given operation in a transaction, committing or aborting as
-    required.
-    @param transactionCreator: a 0-arg callable that returns an
-        L{IAsyncTransaction}
-    @param operation: a 1-arg callable that takes an L{IAsyncTransaction} and
-        returns a value.
-    @param label: label to be used with the transaction.
-    @return: a L{Deferred} that fires with C{operation}'s result or fails with
-        its error, unless there is an error creating, aborting or committing
-        the transaction.
-    """
-    txn = transactionCreator(label=label)
-    try:
-        result = yield operation(txn, **kwargs)
-    except:
-        f = Failure()
-        yield txn.abort()
-        returnValue(f)
-    else:
-        yield txn.commit()
-        returnValue(result)
-def astimestamp(v):
-    """
-    Convert the given datetime to a POSIX timestamp.
-    """
-    return (v - datetime.utcfromtimestamp(0)).total_seconds()
-class NodeInfo(Record, fromTable(NodeInfoSchema.NODE_INFO)):
-    """
-    A L{NodeInfo} is information about a currently-active Node process.
-    """
-    def endpoint(self, reactor):
-        """
-        Create an L{IStreamServerEndpoint} that will talk to the node process
-        that is described by this L{NodeInfo}.
-        @return: an endpoint that will connect to this host.
-        @rtype: L{IStreamServerEndpoint}
-        """
-        return TCP4ClientEndpoint(reactor, self.hostname, self.port)
-def abstract(thunk):
-    """
-    The decorated function is abstract.
-    @note: only methods are currently supported.
-    """
-    @classmethod
-    @wraps(thunk)
-    def inner(cls, *a, **k):
-        raise NotImplementedError(
-            qual(cls) + " does not implement " + thunk.func_name
-        )
-    return inner
-class JobFailedError(Exception):
-    """
-    A job failed to run - we need to be smart about clean up.
-    """
-    def __init__(self, ex):
-        self._ex = ex
-class JobTemporaryError(Exception):
-    """
-    A job failed to run due to a temporary failure. We will get the job to run again after the specified
-    interval (with a built-in back-off based on the number of failures also applied).
-    """
-    def __init__(self, delay):
-        """
-        @param delay: amount of time in seconds before it should run again
-        @type delay: L{int}
-        """
-        self.delay = delay
-class JobRunningError(Exception):
-    """
-    A job is already running.
-    """
-    pass
-class JobItem(Record, fromTable(JobInfoSchema.JOB)):
-    """
-    @DynamicAttrs
-    An item in the job table. This is typically not directly used by code
-    creating work items, but rather is used for internal book keeping of jobs
-    associated with work items.
-    The JOB table has some important columns that determine how a job is being scheduled:
-    NOT_BEFORE - this is a timestamp indicating when the job is expected to run. It will not
-    run before this time, but may run quite some time after (if the service is busy).
-    ASSIGNED - this is a timestamp that is initially NULL but set when the job processing loop
-    assigns the job to a child process to be executed. Thus, if the value is not NULL, then the
-    job is (probably) being executed. The child process is supposed to delete the L{JobItem}
-    when it is done, however if the child dies without executing the job, then the job
-    processing loop needs to detect it.
-    OVERDUE - this is a timestamp initially set when an L{JobItem} is assigned. It represents
-    a point in the future when the job is expected to be finished. The job processing loop skips
-    jobs that have a non-NULL ASSIGNED value and whose OVERDUE value has not been passed. If
-    OVERDUE is in the past, then the job processing loop checks to see if the job is still
-    running - which is determined by whether a row lock exists on the work item (see
-    L{isRunning}. If the job is still running then OVERDUE is bumped up to a new point in the
-    future, if it is not still running the job is marked as failed - which will reschedule it.
-    FAILED - a count of the number of times a job has failed or had its overdue count bumped.
-    The above behavior depends on some important locking behavior: when an L{JobItem} is run,
-    it locks the L{WorkItem} row corresponding to the job (it may lock other associated
-    rows - e.g., other L{WorkItem}'s in the same group). It does not lock the L{JobItem}
-    row corresponding to the job because the job processing loop may need to update the
-    OVERDUE value of that row if the work takes a long time to complete.
-    """
-    _workTypes = None
-    _workTypeMap = None
-    lockRescheduleInterval = 60     # When a job can't run because of a lock, reschedule it this number of seconds in the future
-    failureRescheduleInterval = 60  # When a job fails, reschedule it this number of seconds in the future
-    def descriptor(self):
-        return JobDescriptor(self.jobID, self.weight, self.workType)
-    def assign(self, when, overdue):
-        """
-        Mark this job as assigned to a worker by setting the assigned column to the current,
-        or provided, timestamp. Also set the overdue value to help determine if a job is orphaned.
-        @param when: current timestamp
-        @type when: L{datetime.datetime}
-        @param overdue: number of seconds after assignment that the job will be considered overdue
-        @type overdue: L{int}
-        """
-        return self.update(assigned=when, overdue=when + timedelta(seconds=overdue))
-    def bumpOverdue(self, bump):
-        """
-        Increment the overdue value by the specified number of seconds. Used when an overdue job
-        is still running in a child process but the job processing loop has detected it as overdue.
-        @param bump: number of seconds to increment overdue by
-        @type bump: L{int}
-        """
-        return self.update(overdue=self.overdue + timedelta(seconds=bump))
-    def failedToRun(self, locked=False, delay=None):
-        """
-        The attempt to run the job failed. Leave it in the queue, but mark it
-        as unassigned, bump the failure count and set to run at some point in
-        the future.
-        @param lock: indicates if the failure was due to a lock timeout.
-        @type lock: L{bool}
-        @param delay: how long before the job is run again, or C{None} for a default
-            staggered delay behavior.
-        @type delay: L{int}
-        """
-        # notBefore is set to the chosen interval multiplied by the failure count, which
-        # results in an incremental backoff for failures
-        if delay is None:
-            delay = self.lockRescheduleInterval if locked else self.failureRescheduleInterval
-            delay *= (self.failed + 1)
-        return self.update(
-            assigned=None,
-            overdue=None,
-            failed=self.failed + (0 if locked else 1),
-            notBefore=datetime.utcnow() + timedelta(seconds=delay)
-        )
-    def pauseIt(self, pause=False):
-        """
-        Pause the L{JobItem} leaving all other attributes the same. The job processing loop
-        will skip paused items.
-        @param pause: indicates whether the job should be paused.
-        @type pause: L{bool}
-        @param delay: how long before the job is run again, or C{None} for a default
-            staggered delay behavior.
-        @type delay: L{int}
-        """
-        return self.update(pause=pause)
-    @classmethod
-    @inlineCallbacks
-    def ultimatelyPerform(cls, txnFactory, jobID):
-        """
-        Eventually, after routing the job to the appropriate place, somebody
-        actually has to I{do} it. This method basically calls L{JobItem.run}
-        but it does a bunch of "booking" to track the transaction and log failures
-        and timing information.
-        @param txnFactory: a 0- or 1-argument callable that creates an
-            L{IAsyncTransaction}
-        @type txnFactory: L{callable}
-        @param jobID: the ID of the job to be performed
-        @type jobID: L{int}
-        @return: a L{Deferred} which fires with C{None} when the job has been
-            performed, or fails if the job can't be performed.
-        """
-        t = time.time()
-        def _tm():
-            return "{:.3f}".format(1000 * (time.time() - t))
-        def _overtm(nb):
-            return "{:.0f}".format(1000 * (t - astimestamp(nb)))
-        # Failed job clean-up
-        def _failureCleanUp(delay=None):
-            @inlineCallbacks
-            def _cleanUp2(txn2):
-                try:
-                    job = yield cls.load(txn2, jobID)
-                except NoSuchRecord:
-                    log.debug(
-                        "JobItem: {jobid} disappeared t={tm}",
-                        jobid=jobID,
-                        tm=_tm(),
-                    )
-                else:
-                    log.debug(
-                        "JobItem: {jobid} marking as failed {count} t={tm}",
-                        jobid=jobID,
-                        count=job.failed + 1,
-                        tm=_tm(),
-                    )
-                    yield job.failedToRun(locked=isinstance(e, JobRunningError), delay=delay)
-            return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._failureCleanUp")
-        log.debug("JobItem: {jobid} starting to run", jobid=jobID)
-        txn = txnFactory(label="ultimatelyPerform: {}".format(jobID))
-        try:
-            job = yield cls.load(txn, jobID)
-            if hasattr(txn, "_label"):
-                txn._label = "{} <{}>".format(txn._label, job.workType)
-            log.debug(
-                "JobItem: {jobid} loaded {work} t={tm}",
-                jobid=jobID,
-                work=job.workType,
-                tm=_tm(),
-            )
-            yield job.run()
-        except NoSuchRecord:
-            # The record has already been removed
-            yield txn.commit()
-            log.debug(
-                "JobItem: {jobid} already removed t={tm}",
-                jobid=jobID,
-                tm=_tm(),
-            )
-        except JobTemporaryError as e:
-            # Temporary failure delay with back-off
-            def _temporaryFailure():
-                return _failureCleanUp(delay=e.delay * (job.failed + 1))
-            log.debug(
-                "JobItem: {jobid} {desc} {work} t={tm}",
-                jobid=jobID,
-                desc="temporary failure #{}".format(job.failed + 1),
-                work=job.workType,
-                tm=_tm(),
-            )
-            txn.postAbort(_temporaryFailure)
-            yield txn.abort()
-        except (JobFailedError, JobRunningError) as e:
-            # Permanent failure
-            log.debug(
-                "JobItem: {jobid} {desc} {work} t={tm}",
-                jobid=jobID,
-                desc="failed" if isinstance(e, JobFailedError) else "locked",
-                work=job.workType,
-                tm=_tm(),
-            )
-            txn.postAbort(_failureCleanUp)
-            yield txn.abort()
-        except:
-            f = Failure()
-            log.error(
-                "JobItem: {jobid} unknown exception t={tm} {exc}",
-                jobid=jobID,
-                tm=_tm(),
-                exc=f,
-            )
-            yield txn.abort()
-            returnValue(f)
-        else:
-            yield txn.commit()
-            log.debug(
-                "JobItem: {jobid} completed {work} t={tm} over={over}",
-                jobid=jobID,
-                work=job.workType,
-                tm=_tm(),
-                over=_overtm(job.notBefore),
-            )
-        returnValue(None)
-    @classmethod
-    @inlineCallbacks
-    def nextjob(cls, txn, now, minPriority):
-        """
-        Find the next available job based on priority, also return any that are overdue. This
-        method uses an SQL query to find the matching jobs, and sorts based on the NOT_BEFORE
-        value and priority..
-        @param txn: the transaction to use
-        @type txn: L{IAsyncTransaction}
-        @param now: current timestamp - needed for unit tests that might use their
-            own clock.
-        @type now: L{datetime.datetime}
-        @param minPriority: lowest priority level to query for
-        @type minPriority: L{int}
-        @return: the job record
-        @rtype: L{JobItem}
-        """
-        jobs = yield cls.nextjobs(txn, now, minPriority, limit=1)
-        # Must only be one or zero
-        if jobs and len(jobs) > 1:
-            raise AssertionError("next_job() returned more than one row")
-        returnValue(jobs[0] if jobs else None)
-    @classmethod
-    @inlineCallbacks
-    def nextjobs(cls, txn, now, minPriority, limit=1):
-        """
-        Find the next available job based on priority, also return any that are overdue.
-        @param txn: the transaction to use
-        @type txn: L{IAsyncTransaction}
-        @param now: current timestamp
-        @type now: L{datetime.datetime}
-        @param minPriority: lowest priority level to query for
-        @type minPriority: L{int}
-        @param limit: limit on number of jobs to return
-        @type limit: L{int}
-        @return: the job record
-        @rtype: L{JobItem}
-        """
-        queryExpr = (cls.notBefore <= now).And(cls.priority >= minPriority).And(cls.pause == 0).And(
-            (cls.assigned == None).Or(cls.overdue < now)
-        )
-        if txn.dialect == ORACLE_DIALECT:
-            # Oracle does not support a "for update" clause with "order by". So do the
-            # "for update" as a second query right after the first. Will need to check
-            # how this might impact concurrency in a multi-host setup.
-            jobs = yield cls.query(
-                txn,
-                queryExpr,
-                order=(cls.assigned, cls.priority),
-                ascending=False,
-                limit=limit,
-            )
-            if jobs:
-                yield cls.query(
-                    txn,
-                    (cls.jobID.In([job.jobID for job in jobs])),
-                    forUpdate=True,
-                    noWait=False,
-                )
-        else:
-            jobs = yield cls.query(
-                txn,
-                queryExpr,
-                order=(cls.assigned, cls.priority),
-                ascending=False,
-                forUpdate=True,
-                noWait=False,
-                limit=limit,
-            )
-        returnValue(jobs)
-    @inlineCallbacks
-    def run(self):
-        """
-        Run this job item by finding the appropriate work item class and
-        running that, with appropriate locking.
-        """
-        workItem = yield self.workItem()
-        if workItem is not None:
-            # First we lock the L{WorkItem}
-            locked = yield workItem.runlock()
-            if not locked:
-                raise JobRunningError()
-            try:
-                # Run in three steps, allowing for before/after hooks that sub-classes
-                # may override
-                okToGo = yield workItem.beforeWork()
-                if okToGo:
-                    yield workItem.doWork()
-                    yield workItem.afterWork()
-            except Exception as e:
-                f = Failure()
-                log.error(
-                    "JobItem: {jobid}, WorkItem: {workid} failed: {exc}",
-                    jobid=self.jobID,
-                    workid=workItem.workID,
-                    exc=f,
-                )
-                if isinstance(e, JobTemporaryError):
-                    raise
-                else:
-                    raise JobFailedError(e)
-        try:
-            # Once the work is done we delete ourselves - NB this must be the last thing done
-            # to ensure the L{JobItem} row is not locked for very long.
-            yield self.delete()
-        except NoSuchRecord:
-            # The record has already been removed
-            pass
-    @inlineCallbacks
-    def isRunning(self):
-        """
-        Return L{True} if the job is currently running (its L{WorkItem} is locked).
-        """
-        workItem = yield self.workItem()
-        if workItem is not None:
-            locked = yield workItem.trylock()
-            returnValue(not locked)
-        else:
-            returnValue(False)
-    @inlineCallbacks
-    def workItem(self):
-        """
-        Return the L{WorkItem} corresponding to this L{JobItem}.
-        """
-        workItemClass = self.workItemForType(self.workType)
-        workItems = yield workItemClass.loadForJob(
-            self.transaction, self.jobID
-        )
-        returnValue(workItems[0] if len(workItems) == 1 else None)
-    @classmethod
-    def workItemForType(cls, workType):
-        """
-        Return the class of the L{WorkItem} associated with this L{JobItem}.
-        @param workType: the name of the L{WorkItem}'s table
-        @type workType: L{str}
-        """
-        if cls._workTypeMap is None:
-            cls.workTypes()
-        return cls._workTypeMap[workType]
-    @classmethod
-    def workTypes(cls):
-        """
-        Map all L{WorkItem} sub-classes table names to the class type.
-        @return: All of the work item types.
-        @rtype: iterable of L{WorkItem} subclasses
-        """
-        if cls._workTypes is None:
-            cls._workTypes = []
-            def getWorkType(subcls, appendTo):
-                if hasattr(subcls, "table"):
-                    appendTo.append(subcls)
-                else:
-                    for subsubcls in subcls.__subclasses__():
-                        getWorkType(subsubcls, appendTo)
-            getWorkType(WorkItem, cls._workTypes)
-            cls._workTypeMap = {}
-            for subcls in cls._workTypes:
-                cls._workTypeMap[subcls.workType()] = subcls
-        return cls._workTypes
-    @classmethod
-    def numberOfWorkTypes(cls):
-        return len(cls.workTypes())
-    @classmethod
-    @inlineCallbacks
-    def waitEmpty(cls, txnCreator, reactor, timeout):
-        """
-        Wait for the job queue to drain. Only use this in tests
-        that need to wait for results from jobs.
-        """
-        t = time.time()
-        while True:
-            work = yield inTransaction(txnCreator, cls.all)
-            if not work:
-                break
-            if time.time() - t > timeout:
-                returnValue(False)
-            d = Deferred()
-            reactor.callLater(0.1, lambda : d.callback(None))
-            yield d
-        returnValue(True)
-    @classmethod
-    @inlineCallbacks
-    def waitJobDone(cls, txnCreator, reactor, timeout, jobID):
-        """
-        Wait for the specified job to complete. Only use this in tests
-        that need to wait for results from jobs.
-        """
-        t = time.time()
-        while True:
-            work = yield inTransaction(txnCreator, cls.query, expr=(cls.jobID == jobID))
-            if not work:
-                break
-            if time.time() - t > timeout:
-                returnValue(False)
-            d = Deferred()
-            reactor.callLater(0.1, lambda : d.callback(None))
-            yield d
-        returnValue(True)
-    @classmethod
-    @inlineCallbacks
-    def waitWorkDone(cls, txnCreator, reactor, timeout, workTypes):
-        """
-        Wait for the specified job to complete. Only use this in tests
-        that need to wait for results from jobs.
-        """
-        t = time.time()
-        while True:
-            count = [0]
-            @inlineCallbacks
-            def _countTypes(txn):
-                for t in workTypes:
-                    work = yield t.all(txn)
-                    count[0] += len(work)
-            yield inTransaction(txnCreator, _countTypes)
-            if count[0] == 0:
-                break
-            if time.time() - t > timeout:
-                returnValue(False)
-            d = Deferred()
-            reactor.callLater(0.1, lambda : d.callback(None))
-            yield d
-        returnValue(True)
-    @classmethod
-    @inlineCallbacks
-    def histogram(cls, txn):
-        """
-        Generate a histogram of work items currently in the queue.
-        """
-        results = {}
-        now = datetime.utcnow()
-        for workItemType in cls.workTypes():
-            workType = workItemType.workType()
-            results.setdefault(workType, {
-                "queued": 0,
-                "assigned": 0,
-                "late": 0,
-                "failed": 0,
-                "completed": WorkerConnectionPool.completed.get(workType, 0),
-                "time": WorkerConnectionPool.timing.get(workType, 0.0)
-            })
-        jobs = yield cls.all(txn)
-        for job in jobs:
-            r = results[job.workType]
-            r["queued"] += 1
-            if job.assigned is not None:
-                r["assigned"] += 1
-            if job.assigned is None and job.notBefore < now:
-                r["late"] += 1
-            if job.failed:
-                r["failed"] += 1
-        returnValue(results)
-JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight", "type"])
-class JobDescriptorArg(Argument):
-    """
-    Comma-separated representation of an L{JobDescriptor} for AMP-serialization.
-    """
-    def toString(self, inObject):
-        return ",".join(map(str, inObject))
-    def fromString(self, inString):
-        return JobDescriptor(*[f(s) for f, s in zip((int, int, str,), inString.split(","))])
-# Priority for work - used to order work items in the job queue
-# Weight for work - used to schedule workers based on capacity
-WORK_WEIGHT_10 = 10
-WORK_WEIGHT_CAPACITY = 10   # Total amount of work any one worker can manage
-class WorkItem(SerializableRecord):
-    """
-    A L{WorkItem} is an item of work which may be stored in a database, then
-    executed later.
-    L{WorkItem} is an abstract class, since it is a L{Record} with no table
-    associated via L{fromTable}.  Concrete subclasses must associate a specific
-    table by inheriting like so::
-        class MyWorkItem(WorkItem, fromTable(schema.MY_TABLE)):
-    Concrete L{WorkItem}s should generally not be created directly; they are
-    both created and thereby implicitly scheduled to be executed by calling
-    L{enqueueWork <twext.enterprise.ienterprise.IQueuer.enqueueWork>} with the
-    appropriate L{WorkItem} concrete subclass.  There are different queue
-    implementations (L{PeerConnectionPool} and L{LocalQueuer}, for example), so
-    the exact timing and location of the work execution may differ.
-    L{WorkItem}s may be constrained in the ordering and timing of their
-    execution, to control concurrency and for performance reasons respectively.
-    Although all the usual database mutual-exclusion rules apply to work
-    executed in L{WorkItem.doWork}, implicit database row locking is not always
-    the best way to manage concurrency.  They have some problems, including:
-        - implicit locks are easy to accidentally acquire out of order, which
-          can lead to deadlocks
-        - implicit locks are easy to forget to acquire correctly - for example,
-          any read operation which subsequently turns into a write operation
-          must have been acquired with C{Select(..., ForUpdate=True)}, but it
-          is difficult to consistently indicate that methods which abstract out
-          read operations must pass this flag in certain cases and not others.
-        - implicit locks are held until the transaction ends, which means that
-          if expensive (long-running) queue operations share the same lock with
-          cheap (short-running) queue operations or user interactions, the
-          cheap operations all have to wait for the expensive ones to complete,
-          but continue to consume whatever database resources they were using.
-    In order to ameliorate these problems with potentially concurrent work
-    that uses the same resources, L{WorkItem} provides a database-wide mutex
-    that is automatically acquired at the beginning of the transaction and
-    released at the end.  To use it, simply L{align
-    <twext.enterprise.dal.record.Record.namingConvention>} the C{group}
-    attribute on your L{WorkItem} subclass with a column holding a string
-    (varchar).  L{WorkItem} subclasses with the same value for C{group} will
-    not execute their C{doWork} methods concurrently.  Furthermore, if the lock
-    cannot be quickly acquired, database resources associated with the
-    transaction attempting it will be released, and the transaction rolled back
-    until a future transaction I{can} can acquire it quickly.  If you do not
-    want any limits to concurrency, simply leave it set to C{None}.
-    In some applications it's possible to coalesce work together; to grab
-    multiple L{WorkItem}s in one C{doWork} transaction.  All you need to do is
-    to delete the rows which back other L{WorkItem}s from the database, and
-    they won't be processed.  Using the C{group} attribute, you can easily
-    prevent concurrency so that you can easily group these items together and
-    remove them as a set (otherwise, other workers might be attempting to
-    concurrently work on them and you'll get deletion errors).
-    However, if doing more work at once is less expensive, and you want to
-    avoid processing lots of individual rows in tiny transactions, you may also
-    delay the execution of a L{WorkItem} by setting its C{notBefore} attribute.
-    This must be backed by a database timestamp, so that processes which happen
-    to be restarting and examining the work to be done in the database don't
-    jump the gun and do it too early.
-    @cvar workID: the unique identifier (primary key) for items of this type.
-        On an instance of a concrete L{WorkItem} subclass, this attribute must
-        be an integer; on the concrete L{WorkItem} subclass itself, this
-        attribute must be a L{twext.enterprise.dal.syntax.ColumnSyntax}.  Note
-        that this is automatically taken care of if you simply have a
-        corresponding C{work_id} column in the associated L{fromTable} on your
-        L{WorkItem} subclass.  This column must be unique, and it must be an
-        integer.  In almost all cases, this column really ought to be filled
-        out by a database-defined sequence; if not, you need some other
-        mechanism for establishing a cluster-wide sequence.
-    @type workID: L{int} on instance,
-        L{twext.enterprise.dal.syntax.ColumnSyntax} on class.
-    @cvar notBefore: the timestamp before which this item should I{not} be
-        processed.  If unspecified, this should be the date and time of the
-        creation of the L{WorkItem}.
-    @type notBefore: L{datetime.datetime} on instance,
-        L{twext.enterprise.dal.syntax.ColumnSyntax} on class.
-    @ivar group: If not C{None}, a unique-to-the-database identifier for which
-        only one L{WorkItem} will execute at a time.
-    @type group: L{unicode} or L{NoneType}
-    """
-    group = None
-    default_priority = WORK_PRIORITY_LOW    # Default - subclasses should override
-    default_weight = WORK_WEIGHT_5          # Default - subclasses should override
-    _tableNameMap = {}
-    @classmethod
-    def workType(cls):
-        return cls.table.model.name
-    @classmethod
-    @inlineCallbacks
-    def makeJob(cls, transaction, **kwargs):
-        """
-        A new work item needs to be created. First we create a Job record, then
-        we create the actual work item related to the job.
-        @param transaction: the transaction to use
-        @type transaction: L{IAsyncTransaction}
-        """
-        jobargs = {
-            "workType": cls.workType()
-        }
-        def _transferArg(name):
-            arg = kwargs.pop(name, None)
-            if arg is not None:
-                jobargs[name] = arg
-            elif hasattr(cls, "default_{}".format(name)):
-                jobargs[name] = getattr(cls, "default_{}".format(name))
-        _transferArg("jobID")
-        _transferArg("priority")
-        _transferArg("weight")
-        _transferArg("notBefore")
-        _transferArg("pause")
-        # Always need a notBefore
-        if "notBefore" not in jobargs:
-            jobargs["notBefore"] = datetime.utcnow()
-        job = yield JobItem.create(transaction, **jobargs)
-        kwargs["jobID"] = job.jobID
-        work = yield cls.create(transaction, **kwargs)
-        work.__dict__["job"] = job
-        returnValue(work)
-    @classmethod
-    @inlineCallbacks
-    def loadForJob(cls, txn, jobID):
-        workItems = yield cls.query(txn, (cls.jobID == jobID))
-        returnValue(workItems)
-    @inlineCallbacks
-    def runlock(self):
-        """
-        Used to lock an L{WorkItem} before it is run. The L{WorkItem}'s row MUST be
-        locked via SELECT FOR UPDATE to ensure the job queue knows it is being worked
-        on so that it can detect when an overdue job needs to be restarted or not.
-        Note that the locking used here may cause deadlocks if not done in the correct
-        order. In particular anything that might cause locks across multiple LWorkItem}s,
-        such as group locks, multi-row locks, etc, MUST be done first.
-        @return: an L{Deferred} that fires with L{True} if the L{WorkItem} was locked,
-            L{False} if not.
-        @rtype: L{Deferred}
-        """
-        # Do the group lock first since this can impact multiple rows and thus could
-        # cause deadlocks if done in the wrong order
-        # Row level lock on this item
-        locked = yield self.trylock(self.group)
-        returnValue(locked)
-    @inlineCallbacks
-    def beforeWork(self):
-        """
-        A hook that gets called before the L{WorkItem} does its real work. This can be used
-        for common behaviors need by work items. The base implementation handles the group
-        locking behavior.
-        @return: an L{Deferred} that fires with L{True} if processing of the L{WorkItem}
-            should continue, L{False} if it should be skipped without error.
-        @rtype: L{Deferred}
-        """
-        try:
-            # Work item is deleted before doing work - but someone else may have
-            # done it whilst we waited on the lock so handle that by simply
-            # ignoring the work
-            yield self.delete()
-        except NoSuchRecord:
-            # The record has already been removed
-            returnValue(False)
-        else:
-            returnValue(True)
-    def doWork(self):
-        """
-        Subclasses must implement this to actually perform the queued work.
-        This method will be invoked in a worker process.
-        This method does I{not} need to delete the row referencing it; that
-        will be taken care of by the job queuing machinery.
-        """
-        raise NotImplementedError
-    def afterWork(self):
-        """
-        A hook that gets called after the L{WorkItem} does its real work. This can be used
-        for common clean-up behaviors. The base implementation does nothing.
-        """
-        return succeed(None)
-    @inlineCallbacks
-    def remove(self):
-        """
-        Remove this L{WorkItem} and the associated L{JobItem}. Typically work is not removed directly, but goes away
-        when processed, but in some cases (e.g., pod-2-pod migration) old work needs to be removed along with the
-        job (which is in a pause state and would otherwise never run).
-        """
-        # Delete the job, then self
-        yield JobItem.deletesome(self.transaction, JobItem.jobID == self.jobID)
-        yield self.delete()
-    @classmethod
-    @inlineCallbacks
-    def reschedule(cls, transaction, seconds, **kwargs):
-        """
-        Reschedule this work.
-        @param seconds: optional seconds delay - if not present use the class value.
-        @type seconds: L{int} or L{None}
-        """
-        if seconds is not None and seconds >= 0:
-            notBefore = (
-                datetime.utcnow() +
-                timedelta(seconds=seconds)
-            )
-            log.debug(
-                "Scheduling next {cls}: {when}",
-                cls=cls.__name__,
-                when=notBefore,
-            )
-            wp = yield transaction._queuer.enqueueWork(
-                transaction,
-                cls,
-                notBefore=notBefore,
-                **kwargs
-            )
-            returnValue(wp)
-        else:
-            returnValue(None)
-class SingletonWorkItem(WorkItem):
-    """
-    An L{WorkItem} that can only appear once no matter how many times an attempt is
-    made to create one. The L{allowOverride} class property determines whether the attempt
-    to create a new job is simply ignored, or whether the new job overrides any existing
-    one.
-    """
-    @classmethod
-    @inlineCallbacks
-    def makeJob(cls, transaction, **kwargs):
-        """
-        A new work item needs to be created. First we create a Job record, then
-        we create the actual work item related to the job.
-        @param transaction: the transaction to use
-        @type transaction: L{IAsyncTransaction}
-        """
-        all = yield cls.all(transaction)
-        if len(all):
-            # Silently ignore the creation of this work
-            returnValue(None)
-        result = yield super(SingletonWorkItem, cls).makeJob(transaction, **kwargs)
-        returnValue(result)
-    @inlineCallbacks
-    def beforeWork(self):
-        """
-        For safety just delete any others.
-        """
-        # Delete all other work items
-        yield self.deleteall(self.transaction)
-        returnValue(True)
-    @classmethod
-    @inlineCallbacks
-    def reschedule(cls, transaction, seconds, force=False, **kwargs):
-        """
-        Reschedule a singleton. If L{force} is set then delete any existing item before
-        creating the new one. This allows the caller to explicitly override an existing
-        singleton.
-        """
-        if force:
-            yield cls.deleteall(transaction)
-            yield cls.all(transaction)
-        result = yield super(SingletonWorkItem, cls).reschedule(transaction, seconds, **kwargs)
-        returnValue(result)
-class AggregatedWorkItem(WorkItem):
-    """
-    An L{WorkItem} that deletes all the others in the same group prior to running.
-    """
-    @inlineCallbacks
-    def beforeWork(self):
-        """
-        For safety just delete any others.
-        """
-        # Delete all other work items
-        yield self.deletesome(self.transaction, self.group)
-        returnValue(True)
-class RegeneratingWorkItem(SingletonWorkItem):
-    """
-    An L{SingletonWorkItem} that regenerates itself when work is done.
-    """
-    def regenerateInterval(self):
-        """
-        Return the interval in seconds between regenerating instances.
-        """
-        return None
-    @inlineCallbacks
-    def afterWork(self):
-        """
-        A hook that gets called after the L{WorkItem} does its real work. This can be used
-        for common clean-up behaviors. The base implementation does nothing.
-        """
-        yield super(RegeneratingWorkItem, self).afterWork()
-        yield self.reschedule(self.transaction, self.regenerateInterval())
-class PerformJob(Command):
-    """
-    Notify another process that it must do a job that has been persisted to
-    the database, by informing it of the job ID.
-    """
-    arguments = [
-        ("job", JobDescriptorArg()),
-    ]
-    response = []
-class EnqueuedJob(Command):
-    """
-    Notify the controller process that a worker enqueued some work. This is used to "wake up"
-    the controller if it has slowed its polling loop due to it being idle.
-    """
-    arguments = []
-    response = []
-class ReportLoad(Command):
-    """
-    Notify another node of the total, current load for this whole node (all of
-    its workers).
-    """
-    arguments = [
-        ("load", Integer())
-    ]
-    response = []
-class IdentifyNode(Command):
-    """
-    Identify this node to its peer.  The connector knows which hostname it's
-    looking for, and which hostname it considers itself to be, only the
-    initiator (not the listener) issues this command.  This command is
-    necessary because we don't want to rely on DNS; if reverse DNS weren't set
-    up perfectly, the listener would not be able to identify its peer, and it
-    is easier to modify local configuration so that L{socket.getfqdn} returns
-    the right value than to ensure that DNS does.
-    """
-    arguments = [
-        ("host", String()),
-        ("port", Integer()),
-    ]
-class ConnectionFromPeerNode(AMP):
-    """
-    A connection to a peer node.  Symmetric; since the "client" and the
-    "server" both serve the same role, the logic is the same in every node.
-    @ivar localWorkerPool: the pool of local worker processes that can process
-        queue work.
-    @type localWorkerPool: L{WorkerConnectionPool}
-    @ivar _reportedLoad: The number of outstanding requests being processed by
-        the peer of this connection, from all requestors (both the host of this
-        connection and others), as last reported by the most recent
-        L{ReportLoad} message received from the peer.
-    @type _reportedLoad: L{int}
-    @ivar _bonusLoad: The number of additional outstanding requests being
-        processed by the peer of this connection; the number of requests made
-        by the host of this connection since the last L{ReportLoad} message.
-    @type _bonusLoad: L{int}
-    """
-    implements(_IJobPerformer)
-    def __init__(self, peerPool, boxReceiver=None, locator=None):
-        """
-        Initialize this L{ConnectionFromPeerNode} with a reference to a
-        L{PeerConnectionPool}, as well as required initialization arguments for
-        L{AMP}.
-        @param peerPool: the connection pool within which this
-            L{ConnectionFromPeerNode} is a participant.
-        @type peerPool: L{PeerConnectionPool}
-        @see: L{AMP.__init__}
-        """
-        self.peerPool = peerPool
-        self._bonusLoad = 0
-        self._reportedLoad = 0
-        super(ConnectionFromPeerNode, self).__init__(
-            boxReceiver, locator
-        )
-    def reportCurrentLoad(self):
-        """
-        Report the current load for the local worker pool to this peer.
-        """
-        return self.callRemote(ReportLoad, load=self.totalLoad())
-    @ReportLoad.responder
-    def reportedLoad(self, load):
-        """
-        The peer reports its load.
-        """
-        self._reportedLoad = (load - self._bonusLoad)
-        return {}
-    def startReceivingBoxes(self, sender):
-        """
-        Connection is up and running; add this to the list of active peers.
-        """
-        r = super(ConnectionFromPeerNode, self).startReceivingBoxes(sender)
-        self.peerPool.addPeerConnection(self)
-        return r
-    def stopReceivingBoxes(self, reason):
-        """
-        The connection has shut down; remove this from the list of active
-        peers.
-        """
-        self.peerPool.removePeerConnection(self)
-        r = super(ConnectionFromPeerNode, self).stopReceivingBoxes(reason)
-        return r
-    def currentLoadEstimate(self):
-        """
-        What is the current load estimate for this peer?
-        @return: The number of full "slots", i.e. currently-being-processed
-            queue items (and other items which may contribute to this process's
-            load, such as currently-being-processed client requests).
-        @rtype: L{int}
-        """
-        return self._reportedLoad + self._bonusLoad
-    def performJob(self, job):
-        """
-        A L{local worker connection <ConnectionFromWorker>} is asking this
-        specific peer node-controller process to perform a job, having
-        already determined that it's appropriate.
-        @see: L{_IJobPerformer.performJob}
-        """
-        d = self.callRemote(PerformJob, job=job)
-        self._bonusLoad += job.weight
-        @d.addBoth
-        def performed(result):
-            self._bonusLoad -= job.weight
-            return result
-        @d.addCallback
-        def success(result):
-            return None
-        return d
-    @PerformJob.responder
-    def dispatchToWorker(self, job):
-        """
-        A remote peer node has asked this node to do a job; dispatch it to
-        a local worker on this node.
-        @param job: the details of the job.
-        @type job: L{JobDescriptor}
-        @return: a L{Deferred} that fires when the work has been completed.
-        """
-        d = self.peerPool.performJobForPeer(job)
-        d.addCallback(lambda ignored: {})
-        return d
-    @IdentifyNode.responder
-    def identifyPeer(self, host, port):
-        self.peerPool.mapPeer(host, port, self)
-        return {}
-class WorkerConnectionPool(object):
-    """
-    A pool of L{ConnectionFromWorker}s.
-    L{WorkerConnectionPool} also implements the same implicit protocol as a
-    L{ConnectionFromPeerNode}, but one that dispenses work to the local worker
-    processes rather than to a remote connection pool.
-    """
-    implements(_IJobPerformer)
-    completed = collections.defaultdict(int)
-    timing = collections.defaultdict(float)
-    def __init__(self, maximumLoadPerWorker=WORK_WEIGHT_CAPACITY):
-        self.workers = []
-        self.maximumLoadPerWorker = maximumLoadPerWorker
-    def addWorker(self, worker):
-        """
-        Add a L{ConnectionFromWorker} to this L{WorkerConnectionPool} so that
-        it can be selected.
-        """
-        self.workers.append(worker)
-    def removeWorker(self, worker):
-        """
-        Remove a L{ConnectionFromWorker} from this L{WorkerConnectionPool} that
-        was previously added.
-        """
-        self.workers.remove(worker)
-    def hasAvailableCapacity(self):
-        """
-        Does this worker connection pool have any local workers who have spare
-        hasAvailableCapacity to process another queue item?
-        """
-        for worker in self.workers:
-            if worker.currentLoad < self.maximumLoadPerWorker:
-                return True
-        return False
-    def loadLevel(self):
-        """
-        Return the overall load of this worker connection pool have as a percentage of
-        total capacity.
-        @return: current load percentage.
-        @rtype: L{int}
-        """
-        current = sum(worker.currentLoad for worker in self.workers)
-        total = len(self.workers) * self.maximumLoadPerWorker
-        return ((current * 100) / total) if total else 100
-    def eachWorkerLoad(self):
-        """
-        The load of all currently connected workers.
-        """
-        return [(worker.currentAssigned, worker.currentLoad, worker.totalCompleted) for worker in self.workers]
-    def allWorkerLoad(self):
-        """
-        The total load of all currently connected workers.
-        """
-        return sum(worker.currentLoad for worker in self.workers)
-    def _selectLowestLoadWorker(self):
-        """
-        Select the local connection with the lowest current load, or C{None} if
-        all workers are too busy.
-        @return: a worker connection with the lowest current load.
-        @rtype: L{ConnectionFromWorker}
-        """
-        return sorted(self.workers[:], key=lambda w: w.currentLoad)[0]
-    @inlineCallbacks
-    def performJob(self, job):
-        """
-        Select a local worker that is idle enough to perform the given job,
-        then ask them to perform it.
-        @param job: The details of the given job.
-        @type job: L{JobDescriptor}
-        @return: a L{Deferred} firing with an empty dictionary when the work is
-            complete.
-        @rtype: L{Deferred} firing L{dict}
-        """
-        t = time.time()
-        preferredWorker = self._selectLowestLoadWorker()
-        try:
-            result = yield preferredWorker.performJob(job)
-        finally:
-            self.completed[job.type] += 1
-            self.timing[job.type] += time.time() - t
-        returnValue(result)
-class ConnectionFromWorker(AMP):
-    """
-    An individual connection from a worker, as seen from the master's
-    perspective.  L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
-    """
-    def __init__(self, peerPool, boxReceiver=None, locator=None):
-        super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
-        self.peerPool = peerPool
-        self._assigned = 0
-        self._load = 0
-        self._completed = 0
-    @property
-    def currentAssigned(self):
-        """
-        How many jobs currently assigned to this worker?
-        """
-        return self._assigned
-    @property
-    def currentLoad(self):
-        """
-        What is the current load of this worker?
-        """
-        return self._load
-    @property
-    def totalCompleted(self):
-        """
-        What is the current load of this worker?
-        """
-        return self._completed
-    def startReceivingBoxes(self, sender):
-        """
-        Start receiving AMP boxes from the peer.  Initialize all necessary
-        state.
-        """
-        result = super(ConnectionFromWorker, self).startReceivingBoxes(sender)
-        self.peerPool.workerPool.addWorker(self)
-        return result
-    def stopReceivingBoxes(self, reason):
-        """
-        AMP boxes will no longer be received.
-        """
-        result = super(ConnectionFromWorker, self).stopReceivingBoxes(reason)
-        self.peerPool.workerPool.removeWorker(self)
-        return result
-    @PerformJob.responder
-    def performJob(self, job):
-        """
-        Dispatch a job to this worker.
-        @see: The responder for this should always be
-            L{ConnectionFromController.actuallyReallyExecuteJobHere}.
-        """
-        d = self.callRemote(PerformJob, job=job)
-        self._assigned += 1
-        self._load += job.weight
-        @d.addBoth
-        def f(result):
-            self._assigned -= 1
-            self._load -= job.weight
-            self._completed += 1
-            return result
-        return d
-    @EnqueuedJob.responder
-    def enqueuedJob(self):
-        """
-        A worker enqueued a job and is letting us know. We need to "ping" the
-        L{PeerConnectionPool} to ensure it is polling the job queue at its
-        normal "fast" rate, as opposed to slower idle rates.
-        """
-        self.peerPool.enqueuedJob()
-        return {}
-class ConnectionFromController(AMP):
-    """
-    A L{ConnectionFromController} is the connection to a node-controller
-    process, in a worker process.  It processes requests from its own
-    controller to do work.  It is the opposite end of the connection from
-    L{ConnectionFromWorker}.
-    """
-    implements(IQueuer)
-    def __init__(self, transactionFactory, whenConnected,
-                 boxReceiver=None, locator=None):
-        super(ConnectionFromController, self).__init__(boxReceiver, locator)
-        self._txnFactory = transactionFactory
-        self.whenConnected = whenConnected
-        # FIXME: Glyph it appears WorkProposal expects this to have reactor...
-        from twisted.internet import reactor
-        self.reactor = reactor
-    def transactionFactory(self, *args, **kwargs):
-        txn = self._txnFactory(*args, **kwargs)
-        txn._queuer = self
-        return txn
-    def startReceivingBoxes(self, sender):
-        super(ConnectionFromController, self).startReceivingBoxes(sender)
-        self.whenConnected(self)
-    def choosePerformer(self):
-        """
-        To conform with L{WorkProposal}'s expectations, which may run in either
-        a controller (against a L{PeerConnectionPool}) or in a worker (against
-        a L{ConnectionFromController}), this is implemented to always return
-        C{self}, since C{self} is also an object that has a C{performJob}
-        method.
-        """
-        return self
-    def performJob(self, job):
-        """
-        Ask the controller to perform a job on our behalf.
-        """
-        return self.callRemote(PerformJob, job=job)
-    @inlineCallbacks
-    def enqueueWork(self, txn, workItemType, **kw):
-        """
-        There is some work to do.  Do it, ideally someplace else, ideally in
-        parallel.  Later, let the caller know that the work has been completed
-        by firing a L{Deferred}.
-        @param workItemType: The type of work item to be enqueued.
-        @type workItemType: A subtype of L{WorkItem}
-        @param kw: The parameters to construct a work item.
-        @type kw: keyword parameters to C{workItemType.create}, i.e.
-            C{workItemType.__init__}
-        @return: an object that can track the enqueuing and remote execution of
-            this work.
-        @rtype: L{WorkProposal}
-        """
-        wp = WorkProposal(self, txn, workItemType, kw)
-        yield wp._start()
-        self.callRemote(EnqueuedJob)
-        returnValue(wp)
-    @PerformJob.responder
-    def actuallyReallyExecuteJobHere(self, job):
-        """
-        This is where it's time to actually do the job.  The controller
-        process has instructed this worker to do it; so, look up the data in
-        the row, and do it.
-        """
-        d = JobItem.ultimatelyPerform(self.transactionFactory, job.jobID)
-        d.addCallback(lambda ignored: {})
-        return d
-class LocalPerformer(object):
-    """
-    Implementor of C{performJob} that does its work in the local process,
-    regardless of other conditions.
-    """
-    implements(_IJobPerformer)
-    def __init__(self, txnFactory):
-        """
-        Create this L{LocalPerformer} with a transaction factory.
-        """
-        self.txnFactory = txnFactory
-    def performJob(self, job):
-        """
-        Perform the given job right now.
-        """
-        return JobItem.ultimatelyPerform(self.txnFactory, job.jobID)
-class WorkerFactory(Factory, object):
-    """
-    Factory, to be used as the client to connect from the worker to the
-    controller.
-    """
-    def __init__(self, transactionFactory, whenConnected):
-        """
-        Create a L{WorkerFactory} with a transaction factory and a schema.
-        """
-        self.transactionFactory = transactionFactory
-        self.whenConnected = whenConnected
-    def buildProtocol(self, addr):
-        """
-        Create a L{ConnectionFromController} connected to the
-        transactionFactory and store.
-        """
-        return ConnectionFromController(
-            self.transactionFactory, self.whenConnected
-        )
-def _cloneDeferred(d):
-    """
-    Make a new Deferred, adding callbacks to C{d}.
-    @return: another L{Deferred} that fires with C{d's} result when C{d} fires.
-    @rtype: L{Deferred}
-    """
-    d2 = Deferred()
-    d.chainDeferred(d2)
-    return d2
-class WorkProposal(object):
-    """
-    A L{WorkProposal} is a proposal for work that will be executed, perhaps on
-    another node, perhaps in the future.
-    @ivar _chooser: The object which will choose where the work in this
-        proposal gets performed.  This must have both a C{choosePerformer}
-        method and a C{reactor} attribute, providing an L{IReactorTime}.
-    @type _chooser: L{PeerConnectionPool} or L{LocalQueuer}
-    @ivar txn: The transaction where the work will be enqueued.
-    @type txn: L{IAsyncTransaction}
-    @ivar workItemType: The type of work to be enqueued by this L{WorkProposal}
-    @type workItemType: L{WorkItem} subclass
-    @ivar kw: The keyword arguments to pass to C{self.workItemType.create} to
-        construct it.
-    @type kw: L{dict}
-    """
-    def __init__(self, chooser, txn, workItemType, kw):
-        self._chooser = chooser
-        self.txn = txn
-        self.workItemType = workItemType
-        self.kw = kw
-        self.workItem = None
-    @inlineCallbacks
-    def _start(self):
-        """
-        Execute this L{WorkProposal} by creating the work item in the database,
-        waiting for the transaction where that addition was completed to
-        commit, and asking the local node controller process to do the work.
-        """
-        self.workItem = yield self.workItemType.makeJob(self.txn, **self.kw)
-class _BaseQueuer(object):
-    implements(IQueuer)
-    def __init__(self):
-        super(_BaseQueuer, self).__init__()
-        self.proposalCallbacks = set()
-    def callWithNewProposals(self, callback):
-        self.proposalCallbacks.add(callback)
-    def transferProposalCallbacks(self, newQueuer):
-        newQueuer.proposalCallbacks = self.proposalCallbacks
-        return newQueuer
-    @inlineCallbacks
-    def enqueueWork(self, txn, workItemType, **kw):
-        """
-        There is some work to do.  Do it, someplace else, ideally in parallel.
-        Later, let the caller know that the work has been completed by firing a
-        L{Deferred}.
-        @param workItemType: The type of work item to be enqueued.
-        @type workItemType: A subtype of L{WorkItem}
-        @param kw: The parameters to construct a work item.
-        @type kw: keyword parameters to C{workItemType.create}, i.e.
-            C{workItemType.__init__}
-        @return: an object that can track the enqueuing and remote execution of
-            this work.
-        @rtype: L{WorkProposal}
-        """
-        wp = WorkProposal(self, txn, workItemType, kw)
-        yield wp._start()
-        for callback in self.proposalCallbacks:
-            callback(wp)
-        self.enqueuedJob()
-        returnValue(wp)
-    def enqueuedJob(self):
-        """
-        Work has been enqueued
-        """
-        pass
-class PeerConnectionPool(_BaseQueuer, MultiService, object):
-    """
-    Each node has a L{PeerConnectionPool} connecting it to all the other nodes
-    currently active on the same database.
-    @ivar hostname: The hostname where this node process is running, as
-        reported by the local host's configuration.  Possibly this should be
-        obtained via C{config.ServerHostName} instead of C{socket.getfqdn()};
-        although hosts within a cluster may be configured with the same
-        C{ServerHostName}; TODO need to confirm.
-    @type hostname: L{bytes}
-    @ivar thisProcess: a L{NodeInfo} representing this process, which is
-        initialized when this L{PeerConnectionPool} service is started via
-        C{startService}.  May be C{None} if this service is not fully started
-        up or if it is shutting down.
-    @type thisProcess: L{NodeInfo}
-    @ivar queueProcessTimeout: The amount of time after a L{WorkItem} is
-        scheduled to be processed (its C{notBefore} attribute) that it is
-        considered to be "orphaned" and will be run by a lost-work check rather
-        than waiting for it to be requested.  By default, 10 minutes.
-    @type queueProcessTimeout: L{float} (in seconds)
-    @ivar queuePollInterval: The amount of time between database
-        pings, i.e. checks for over-due queue items that might have been
-        orphaned by a controller process that died mid-transaction.  This is
-        how often the shared database should be pinged by I{all} nodes (i.e.,
-        all controller processes, or each instance of L{PeerConnectionPool});
-        each individual node will ping commensurately less often as more nodes
-        join the database.
-    @type queuePollInterval: L{float} (in seconds)
-    @ivar reactor: The reactor used for scheduling timed events.
-    @type reactor: L{IReactorTime} provider.
-    @ivar peers: The list of currently connected peers.
-    @type peers: L{list} of L{PeerConnectionPool}
-    """
-    implements(IQueuer)
-    from socket import getfqdn
-    from os import getpid
-    getfqdn = staticmethod(getfqdn)
-    getpid = staticmethod(getpid)
-    queuePollInterval = 0.1             # How often to poll for new work
-    queueOverdueTimeout = 5.0 * 60.0    # How long before assigned work is possibly overdue
-    queuePollingBackoff = ((60.0, 60.0), (5.0, 1.0),)   # Polling backoffs
-    overloadLevel = 95          # Percentage load level above which job queue processing stops
-    highPriorityLevel = 80      # Percentage load level above which only high priority jobs are processed
-    mediumPriorityLevel = 50    # Percentage load level above which high and medium priority jobs are processed
-    def __init__(self, reactor, transactionFactory, ampPort, useWorkerPool=True, disableWorkProcessing=False):
-        """
-        Initialize a L{PeerConnectionPool}.
-        @param ampPort: The AMP TCP port number to listen on for inter-host
-            communication.  This must be an integer (and not, say, an endpoint,
-            or an endpoint description) because we need to communicate it to
-            the other peers in the cluster in a way that will be meaningful to
-            them as clients.
-        @type ampPort: L{int}
-        @param transactionFactory: a 0- or 1-argument callable that produces an
-            L{IAsyncTransaction}
-        @param useWorkerPool:  Whether to use a worker pool to manage load
-            or instead take on all work ourselves (e.g. in single process mode)
-        """
-        super(PeerConnectionPool, self).__init__()
-        self.reactor = reactor
-        self.transactionFactory = transactionFactory
-        self.hostname = self.getfqdn()
-        self.pid = self.getpid()
-        self.ampPort = ampPort
-        self.thisProcess = None
-        self.workerPool = WorkerConnectionPool() if useWorkerPool else None
-        self.disableWorkProcessing = disableWorkProcessing
-        self.peers = []
-        self.mappedPeers = {}
-        self._startingUp = None
-        self._listeningPort = None
-        self._lastSeenTotalNodes = 1
-        self._lastSeenNodeIndex = 1
-        self._lastMinPriority = WORK_PRIORITY_LOW
-        self._timeOfLastWork = time.time()
-        self._actualPollInterval = self.queuePollInterval
-        self._inWorkCheck = False
-    def addPeerConnection(self, peer):
-        """
-        Add a L{ConnectionFromPeerNode} to the active list of peers.
-        """
-        self.peers.append(peer)
-    def enable(self):
-        """
-        Turn on work queue processing.
-        """
-        self.disableWorkProcessing = False
-    def disable(self):
-        """
-        Turn off work queue processing.
-        """
-        self.disableWorkProcessing = True
-    def totalLoad(self):
-        return self.workerPool.allWorkerLoad() if self.workerPool else 0
-    def workerListenerFactory(self):
-        """
-        Factory that listens for connections from workers.
-        """
-        f = Factory()
-        f.buildProtocol = lambda addr: ConnectionFromWorker(self)
-        return f
-    def removePeerConnection(self, peer):
-        """
-        Remove a L{ConnectionFromPeerNode} to the active list of peers.
-        """
-        self.peers.remove(peer)
-    def choosePerformer(self, onlyLocally=False):
-        """
-        Choose a peer to distribute work to based on the current known slot
-        occupancy of the other nodes.  Note that this will prefer distributing
-        work to local workers until the current node is full, because that
-        should be lower-latency.  Also, if no peers are available, work will be
-        submitted locally even if the worker pool is already over-subscribed.
-        @return: the chosen peer.
-        @rtype: L{_IJobPerformer} L{ConnectionFromPeerNode} or
-            L{WorkerConnectionPool}
-        """
-        if self.workerPool:
-            if self.workerPool.hasAvailableCapacity():
-                return self.workerPool
-            if self.peers and not onlyLocally:
-                return sorted(self.peers, key=lambda p: p.currentLoadEstimate())[0]
-            else:
-                raise JobFailedError("No capacity for work")
-        return LocalPerformer(self.transactionFactory)
-    def performJobForPeer(self, job):
-        """
-        A peer has requested us to perform a job; choose a job performer
-        local to this node, and then execute it.
-        """
-        performer = self.choosePerformer(onlyLocally=True)
-        return performer.performJob(job)
-    def totalNumberOfNodes(self):
-        """
-        How many nodes are there, total?
-        @return: the maximum number of other L{PeerConnectionPool} instances
-            that may be connected to the database described by
-            C{self.transactionFactory}.  Note that this is not the current
-            count by connectivity, but the count according to the database.
-        @rtype: L{int}
-        """
-        # TODO
-        return self._lastSeenTotalNodes
-    def nodeIndex(self):
-        """
-        What ordinal does this node, i.e. this instance of
-        L{PeerConnectionPool}, occupy within the ordered set of all nodes
-        connected to the database described by C{self.transactionFactory}?
-        @return: the index of this node within the total collection.  For
-            example, if this L{PeerConnectionPool} is 6 out of 30, this method
-            will return C{6}.
-        @rtype: L{int}
-        """
-        # TODO
-        return self._lastSeenNodeIndex
-    @inlineCallbacks
-    def _workCheck(self):
-        """
-        Every node controller will periodically check for any new work to do, and dispatch
-        as much as possible given the current load.
-        """
-        # FIXME: not sure if we should do this node check on every work poll
-#        if self.thisProcess:
-#            nodes = [(node.hostname, node.port) for node in
-#                     (yield self.activeNodes(txn))]
-#            nodes.sort()
-#            self._lastSeenTotalNodes = len(nodes)
-#            self._lastSeenNodeIndex = nodes.index(
-#                (self.thisProcess.hostname, self.thisProcess.port)
-#            )
-        loopCounter = 0
-        while True:
-            if not self.running or self.disableWorkProcessing:
-                returnValue(None)
-            # Check the overall service load - if overloaded skip this poll cycle.
-            # FIXME: need to include capacity of other nodes. For now we only check
-            # our own capacity and stop processing if too busy. Other nodes that
-            # are not busy will pick up work.
-            # If no workerPool, set level to 0, taking on all work.
-            level = 0 if self.workerPool is None else self.workerPool.loadLevel()
-            # Check overload level first
-            if level > self.overloadLevel:
-                if self._lastMinPriority != WORK_PRIORITY_HIGH + 1:
-                    log.error("workCheck: jobqueue is overloaded")
-                self._lastMinPriority = WORK_PRIORITY_HIGH + 1
-                self._timeOfLastWork = time.time()
-                break
-            elif level > self.highPriorityLevel:
-                minPriority = WORK_PRIORITY_HIGH
-            elif level > self.mediumPriorityLevel:
-                minPriority = WORK_PRIORITY_MEDIUM
-            else:
-                minPriority = WORK_PRIORITY_LOW
-            if self._lastMinPriority != minPriority:
-                log.debug(
-                    "workCheck: jobqueue priority limit change: {limit}",
-                    limit=minPriority,
-                )
-                if self._lastMinPriority == WORK_PRIORITY_HIGH + 1:
-                    log.error("workCheck: jobqueue is no longer overloaded")
-            self._lastMinPriority = minPriority
-            # Determine what the timestamp cutoff
-            # TODO: here is where we should iterate over the unlocked items
-            # that are due, ordered by priority, notBefore etc
-            nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
-            self._inWorkCheck = True
-            txn = nextJob = None
-            try:
-                txn = self.transactionFactory(label="jobqueue.workCheck")
-                nextJob = yield JobItem.nextjob(txn, nowTime, minPriority)
-                if nextJob is None:
-                    break
-                if nextJob.assigned is not None:
-                    if nextJob.overdue > nowTime:
-                        # If it is now assigned but not overdue, ignore as this may have
-                        # been returned after another txn just assigned it
-                        continue
-                    else:
-                        # It is overdue - check to see whether the work item is currently locked - if so no
-                        # need to re-assign
-                        running = yield nextJob.isRunning()
-                        if running:
-                            # Change the overdue to further in the future whilst we wait for
-                            # the running job to complete
-                            yield nextJob.bumpOverdue(self.queueOverdueTimeout)
-                            log.debug(
-                                "workCheck: bumped overdue timeout on jobid={jobid}",
-                                jobid=nextJob.jobID,
-                            )
-                            continue
-                        else:
-                            log.debug(
-                                "workCheck: overdue re-assignment for jobid={jobid}",
-                                jobid=nextJob.jobID,
-                            )
-                # Always assign as a new job even when it is an orphan
-                yield nextJob.assign(nowTime, self.queueOverdueTimeout)
-                self._timeOfLastWork = time.time()
-                loopCounter += 1
-            except Exception as e:
-                log.error(
-                    "Failed to pick a new job: {jobID}, {exc}",
-                    jobID=nextJob.jobID if nextJob else "?",
-                    exc=e,
-                )
-                if txn is not None:
-                    yield txn.abort()
-                    txn = None
-                # If we can identify the problem job, try and set it to failed so that it
-                # won't block other jobs behind it (it will be picked again when the failure
-                # interval is exceeded - but that has a back off so a permanently stuck item
-                # should fade away. We probably want to have some additional logic to simply
-                # remove something that is permanently failing.
-                if nextJob is not None:
-                    txn = self.transactionFactory(label="jobqueue.workCheck.failed")
-                    try:
-                        failedJob = yield JobItem.load(txn, nextJob.jobID)
-                        yield failedJob.failedToRun()
-                    except Exception as e:
-                        # Could not mark as failed - break out of the next job loop
-                        log.error(
-                            "Failed to mark failed new job:{}, {exc}",
-                            jobID=nextJob.jobID,
-                            exc=e,
-                        )
-                        yield txn.abort()
-                        txn = None
-                        nextJob = None
-                        break
-                    else:
-                        # Marked the problem one as failed, so keep going and get the next job
-                        log.error("Marked failed new job: {jobID}", jobID=nextJob.jobID)
-                        yield txn.commit()
-                        txn = None
-                        nextJob = None
-                else:
-                    # Cannot mark anything as failed - break out of next job loop
-                    log.error("Cannot mark failed new job")
-                    break
-            finally:
-                if txn is not None:
-                    yield txn.commit()
-                    txn = None
-                self._inWorkCheck = False
-            if nextJob is not None:
-                try:
-                    peer = self.choosePerformer(onlyLocally=True)
-                    # Send the job over but DO NOT block on the response - that will ensure
-                    # we can do stuff in parallel
-                    peer.performJob(nextJob.descriptor())
-                except Exception as e:
-                    log.error("Failed to perform job for jobid={jobid}, {exc}", jobid=nextJob.jobID, exc=e)
-        if loopCounter:
-            log.debug("workCheck: processed {ctr} jobs in one loop", ctr=loopCounter)
-    _currentWorkDeferred = None
-    _workCheckCall = None
-    def _workCheckLoop(self):
-        """
-        While the service is running, keep checking for any overdue / lost work
-        items and re-submit them to the cluster for processing.  Space out
-        those checks in time based on the size of the cluster.
-        """
-        self._workCheckCall = None
-        if not self.running:
-            return
-        @passthru(
-            self._workCheck().addErrback(lambda result: log.error("_workCheckLoop: {exc}", exc=result)).addCallback
-        )
-        def scheduleNext(result):
-            # TODO: if multiple nodes are present, see if we can
-            # stagger the polling to avoid contention.
-            self._currentWorkDeferred = None
-            if not self.running:
-                return
-            # Check for adjustment to poll interval - if the workCheck is idle for certain
-            # periods of time we will gradually increase the poll interval to avoid consuming
-            # excessive power when there is nothing to do
-            interval = self.queuePollInterval
-            idle = time.time() - self._timeOfLastWork
-            for threshold, poll in self.queuePollingBackoff:
-                if idle > threshold:
-                    interval = poll
-                    break
-            if self._actualPollInterval != interval:
-                log.debug("workCheckLoop: interval set to {interval}s", interval=interval)
-            self._actualPollInterval = interval
-            self._workCheckCall = self.reactor.callLater(
-                self._actualPollInterval, self._workCheckLoop
-            )
-        self._currentWorkDeferred = scheduleNext
-    def enqueuedJob(self):
-        """
-        Reschedule the work check loop to run right now. This should be called in response to "external" activity that
-        might want to "speed up" the job queue polling because new work may have been added.
-        """
-        # Only need to do this if the actual poll interval is greater than the default rapid value
-        if self._actualPollInterval == self.queuePollInterval:
-            return
-        # Bump time of last work so that we go back to the rapid (default) polling interval
-        self._timeOfLastWork = time.time()
-        # Reschedule the outstanding delayed call (handle exceptions by ignoring if its already running or
-        # just finished)
-        try:
-            if self._workCheckCall is not None:
-                self._workCheckCall.reset(0)
-        except (AlreadyCalled, AlreadyCancelled):
-            pass
-    def startService(self):
-        """
-        Register ourselves with the database and establish all outgoing
-        connections to other servers in the cluster.
-        """
-        @inlineCallbacks
-        def startup(txn):
-            if self.ampPort is not None:
-                endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
-                # If this fails, the failure mode is going to be ugly, just like
-                # all conflicted-port failures.  But, at least it won't proceed.
-                self._listeningPort = yield endpoint.listen(self.peerFactory())
-                self.ampPort = self._listeningPort.getHost().port
-                yield Lock.exclusive(NodeInfo.table).on(txn)
-                nodes = yield self.activeNodes(txn)
-                selves = [node for node in nodes
-                          if ((node.hostname == self.hostname) and
-                              (node.port == self.ampPort))]
-                if selves:
-                    self.thisProcess = selves[0]
-                    nodes.remove(self.thisProcess)
-                    yield self.thisProcess.update(pid=self.pid,
-                                                  time=datetime.now())
-                else:
-                    self.thisProcess = yield NodeInfo.create(
-                        txn, hostname=self.hostname, port=self.ampPort,
-                        pid=self.pid, time=datetime.now()
-                    )
-                for node in nodes:
-                    self._startConnectingTo(node)
-        self._startingUp = inTransaction(self.transactionFactory, startup, label="PeerConnectionPool.startService")
-        @self._startingUp.addBoth
-        def done(result):
-            self._startingUp = None
-            super(PeerConnectionPool, self).startService()
-            self._workCheckLoop()
-            return result
-    @inlineCallbacks
-    def stopService(self):
-        """
-        Stop this service, terminating any incoming or outgoing connections.
-        """
-        # If in the process of starting up, always wait for startup to complete before
-        # stopping,.
-        if self._startingUp is not None:
-            d = Deferred()
-            self._startingUp.addBoth(lambda result: d.callback(None))
-            yield d
-        yield super(PeerConnectionPool, self).stopService()
-        if self._listeningPort is not None:
-            yield self._listeningPort.stopListening()
-        if self._workCheckCall is not None:
-            self._workCheckCall.cancel()
-            self._workCheckCall = None
-        if self._currentWorkDeferred is not None:
-            self._currentWorkDeferred.cancel()
-            self._currentWorkDeferred = None
-        for connector in self._connectingToPeer:
-            d = Deferred()
-            connector.addBoth(lambda result: d.callback(None))
-            yield d
-        for peer in self.peers:
-            peer.transport.abortConnection()
-        # Wait for any active work check to finish (but no more than 1 minute)
-        start = time.time()
-        while self._inWorkCheck:
-            d = Deferred()
-            self.reactor.callLater(0.5, lambda : d.callback(None))
-            yield d
-            if time.time() - start >= 60:
-                break
-    def activeNodes(self, txn):
-        """
-        Load information about all other nodes.
-        """
-        return NodeInfo.all(txn)
-    def mapPeer(self, host, port, peer):
-        """
-        A peer has been identified as belonging to the given host/port
-        combination.  Disconnect any other peer that claims to be connected for
-        the same peer.
-        """
-        # if (host, port) in self.mappedPeers:
-        #     TODO: think about this for race conditions
-        #     self.mappedPeers.pop((host, port)).transport.loseConnection()
-        self.mappedPeers[(host, port)] = peer
-    _connectingToPeer = []
-    def _startConnectingTo(self, node):
-        """
-        Start an outgoing connection to another master process.
-        @param node: a description of the master to connect to.
-        @type node: L{NodeInfo}
-        """
-        connected = node.endpoint(self.reactor).connect(self.peerFactory())
-        self._connectingToPeer.append(connected)
-        def whenConnected(proto):
-            self._connectingToPeer.remove(connected)
-            self.mapPeer(node.hostname, node.port, proto)
-            proto.callRemote(
-                IdentifyNode,
-                host=self.thisProcess.hostname,
-                port=self.thisProcess.port
-            ).addErrback(noted, "identify")
-        def noted(err, x="connect"):
-            if x == "connect":
-                self._connectingToPeer.remove(connected)
-            log.error(
-                "Could not {action} to cluster peer {node} because {reason}",
-                action=x, node=node, reason=str(err.value),
-            )
-        connected.addCallbacks(whenConnected, noted)
-    def peerFactory(self):
-        """
-        Factory for peer connections.
-        @return: a L{Factory} that will produce L{ConnectionFromPeerNode}
-            protocols attached to this L{PeerConnectionPool}.
-        """
-        return _PeerPoolFactory(self)
-class _PeerPoolFactory(Factory, object):
-    """
-    Protocol factory responsible for creating L{ConnectionFromPeerNode}
-    connections, both client and server.
-    """
-    def __init__(self, peerConnectionPool):
-        self.peerConnectionPool = peerConnectionPool
-    def buildProtocol(self, addr):
-        return ConnectionFromPeerNode(self.peerConnectionPool)
-class LocalQueuer(_BaseQueuer):
-    """
-    When work is enqueued with this queuer, it is just executed locally.
-    """
-    implements(IQueuer)
-    def __init__(self, txnFactory, reactor=None):
-        super(LocalQueuer, self).__init__()
-        self.txnFactory = txnFactory
-        if reactor is None:
-            from twisted.internet import reactor
-        self.reactor = reactor
-    def choosePerformer(self):
-        """
-        Choose to perform the work locally.
-        """
-        return LocalPerformer(self.txnFactory)
-class NonPerformer(object):
-    """
-    Implementor of C{performJob} that doesn't actual perform any work.  This
-    is used in the case where you want to be able to enqueue work for someone
-    else to do, but not take on any work yourself (such as a command line
-    tool).
-    """
-    implements(_IJobPerformer)
-    def performJob(self, job):
-        """
-        Don't perform job.
-        """
-        return succeed(None)
-class NonPerformingQueuer(_BaseQueuer):
-    """
-    When work is enqueued with this queuer, it is never executed locally.
-    It's expected that the polling machinery will find the work and perform it.
-    """
-    implements(IQueuer)
-    def __init__(self, reactor=None):
-        super(NonPerformingQueuer, self).__init__()
-        if reactor is None:
-            from twisted.internet import reactor
-        self.reactor = reactor
-    def choosePerformer(self):
-        """
-        Choose to perform the work locally.
-        """
-        return NonPerformer()

Added: twext/trunk/twext/enterprise/jobs/__init__.py
--- twext/trunk/twext/enterprise/jobs/__init__.py	                        (rev 0)
+++ twext/trunk/twext/enterprise/jobs/__init__.py	2015-08-07 20:06:34 UTC (rev 15035)
@@ -0,0 +1,15 @@
+# Copyright (c) 2015 Apple Inc. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.

Added: twext/trunk/twext/enterprise/jobs/jobitem.py
--- twext/trunk/twext/enterprise/jobs/jobitem.py	                        (rev 0)
+++ twext/trunk/twext/enterprise/jobs/jobitem.py	2015-08-07 20:06:34 UTC (rev 15035)
@@ -0,0 +1,655 @@
+# -*- test-case-name: twext.enterprise.test.test_queue -*-
+# Copyright (c) 2012-2015 Apple Inc. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twext.enterprise.dal.model import Sequence
+from twext.enterprise.dal.model import Table, Schema, SQLType
+from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
+from twext.enterprise.dal.syntax import SchemaSyntax
+from twext.enterprise.ienterprise import ORACLE_DIALECT
+from twext.enterprise.jobs.utils import inTransaction, astimestamp
+from twext.python.log import Logger
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred
+from twisted.protocols.amp import Argument
+from twisted.python.failure import Failure
+from datetime import datetime, timedelta
+from collections import namedtuple
+import time
+log = Logger()
+A job is split into two pieces: an L{JobItem} (defined in this module) and an
+L{WorkItem} (defined in twext.enterprise.jobs.workitem). Each type of work has
+its own L{WorkItem} subclass. The overall work queue is a single table of
+L{JobItem}s which reference all the various L{WorkItem} tables. The
+L{ControllerQueue} then processes the items in the L{JobItem} table, which
+result in the appropriate L{WotkItem} being run. This split allows a single
+processing queue to handle many different types of work, each of which may have
+its own set of parameters.
+def makeJobSchema(inSchema):
+    """
+    Create a self-contained schema for L{JobInfo} to use, in C{inSchema}.
+    @param inSchema: a L{Schema} to add the job table to.
+    @type inSchema: L{Schema}
+    @return: a schema with just the one table.
+    """
+    # Initializing this duplicate schema avoids a circular dependency, but this
+    # should really be accomplished with independent schema objects that the
+    # transaction is made aware of somehow.
+    JobTable = Table(inSchema, "JOB")
+    JobTable.addColumn("JOB_ID", SQLType("integer", None), default=Sequence(inSchema, "JOB_SEQ"), notNull=True, primaryKey=True)
+    JobTable.addColumn("WORK_TYPE", SQLType("varchar", 255), notNull=True)
+    JobTable.addColumn("PRIORITY", SQLType("integer", 0), default=0)
+    JobTable.addColumn("WEIGHT", SQLType("integer", 0), default=0)
+    JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None), notNull=True)
+    JobTable.addColumn("ASSIGNED", SQLType("timestamp", None), default=None)
+    JobTable.addColumn("OVERDUE", SQLType("timestamp", None), default=None)
+    JobTable.addColumn("FAILED", SQLType("integer", 0), default=0)
+    JobTable.addColumn("PAUSE", SQLType("integer", 0), default=0)
+    return inSchema
+JobInfoSchema = SchemaSyntax(makeJobSchema(Schema(__file__)))
+class JobFailedError(Exception):
+    """
+    A job failed to run - we need to be smart about clean up.
+    """
+    def __init__(self, ex):
+        self._ex = ex
+class JobTemporaryError(Exception):
+    """
+    A job failed to run due to a temporary failure. We will get the job to run again after the specified
+    interval (with a built-in back-off based on the number of failures also applied).
+    """
+    def __init__(self, delay):
+        """
+        @param delay: amount of time in seconds before it should run again
+        @type delay: L{int}
+        """
+        self.delay = delay
+class JobRunningError(Exception):
+    """
+    A job is already running.
+    """
+    pass
+class JobItem(Record, fromTable(JobInfoSchema.JOB)):
+    """
+    @DynamicAttrs
+    An item in the job table. This is typically not directly used by code
+    creating work items, but rather is used for internal book keeping of jobs
+    associated with work items.
+    The JOB table has some important columns that determine how a job is being scheduled:
+    NOT_BEFORE - this is a timestamp indicating when the job is expected to run. It will not
+    run before this time, but may run quite some time after (if the service is busy).
+    ASSIGNED - this is a timestamp that is initially NULL but set when the job processing loop
+    assigns the job to a child process to be executed. Thus, if the value is not NULL, then the
+    job is (probably) being executed. The child process is supposed to delete the L{JobItem}
+    when it is done, however if the child dies without executing the job, then the job
+    processing loop needs to detect it.
+    OVERDUE - this is a timestamp initially set when an L{JobItem} is assigned. It represents
+    a point in the future when the job is expected to be finished. The job processing loop skips
+    jobs that have a non-NULL ASSIGNED value and whose OVERDUE value has not been passed. If
+    OVERDUE is in the past, then the job processing loop checks to see if the job is still
+    running - which is determined by whether a row lock exists on the work item (see
+    L{isRunning}. If the job is still running then OVERDUE is bumped up to a new point in the
+    future, if it is not still running the job is marked as failed - which will reschedule it.
+    FAILED - a count of the number of times a job has failed or had its overdue count bumped.
+    The above behavior depends on some important locking behavior: when an L{JobItem} is run,
+    it locks the L{WorkItem} row corresponding to the job (it may lock other associated
+    rows - e.g., other L{WorkItem}'s in the same group). It does not lock the L{JobItem}
+    row corresponding to the job because the job processing loop may need to update the
+    OVERDUE value of that row if the work takes a long time to complete.
+    """
+    _workTypes = None
+    _workTypeMap = None
+    lockRescheduleInterval = 60     # When a job can't run because of a lock, reschedule it this number of seconds in the future
+    failureRescheduleInterval = 60  # When a job fails, reschedule it this number of seconds in the future
+    def descriptor(self):
+        return JobDescriptor(self.jobID, self.weight, self.workType)
+    def assign(self, when, overdue):
+        """
+        Mark this job as assigned to a worker by setting the assigned column to the current,
+        or provided, timestamp. Also set the overdue value to help determine if a job is orphaned.
+        @param when: current timestamp
+        @type when: L{datetime.datetime}
+        @param overdue: number of seconds after assignment that the job will be considered overdue
+        @type overdue: L{int}
+        """
+        return self.update(assigned=when, overdue=when + timedelta(seconds=overdue))
+    def bumpOverdue(self, bump):
+        """
+        Increment the overdue value by the specified number of seconds. Used when an overdue job
+        is still running in a child process but the job processing loop has detected it as overdue.
+        @param bump: number of seconds to increment overdue by
+        @type bump: L{int}
+        """
+        return self.update(overdue=self.overdue + timedelta(seconds=bump))
+    def failedToRun(self, locked=False, delay=None):
+        """
+        The attempt to run the job failed. Leave it in the queue, but mark it
+        as unassigned, bump the failure count and set to run at some point in
+        the future.
+        @param lock: indicates if the failure was due to a lock timeout.
+        @type lock: L{bool}
+        @param delay: how long before the job is run again, or C{None} for a default
+            staggered delay behavior.
+        @type delay: L{int}
+        """
+        # notBefore is set to the chosen interval multiplied by the failure count, which
+        # results in an incremental backoff for failures
+        if delay is None:
+            delay = self.lockRescheduleInterval if locked else self.failureRescheduleInterval
+            delay *= (self.failed + 1)
+        return self.update(
+            assigned=None,
+            overdue=None,
+            failed=self.failed + (0 if locked else 1),
+            notBefore=datetime.utcnow() + timedelta(seconds=delay)
+        )
+    def pauseIt(self, pause=False):
+        """
+        Pause the L{JobItem} leaving all other attributes the same. The job processing loop
+        will skip paused items.
+        @param pause: indicates whether the job should be paused.
+        @type pause: L{bool}
+        @param delay: how long before the job is run again, or C{None} for a default
+            staggered delay behavior.
+        @type delay: L{int}
+        """
+        return self.update(pause=pause)
+    @classmethod
+    @inlineCallbacks
+    def ultimatelyPerform(cls, txnFactory, jobID):
+        """
+        Eventually, after routing the job to the appropriate place, somebody
+        actually has to I{do} it. This method basically calls L{JobItem.run}
+        but it does a bunch of "booking" to track the transaction and log failures
+        and timing information.
+        @param txnFactory: a 0- or 1-argument callable that creates an
+            L{IAsyncTransaction}
+        @type txnFactory: L{callable}
+        @param jobID: the ID of the job to be performed
+        @type jobID: L{int}
+        @return: a L{Deferred} which fires with C{None} when the job has been
+            performed, or fails if the job can't be performed.
+        """
+        t = time.time()
+        def _tm():
+            return "{:.3f}".format(1000 * (time.time() - t))
+        def _overtm(nb):
+            return "{:.0f}".format(1000 * (t - astimestamp(nb)))
+        # Failed job clean-up
+        def _failureCleanUp(delay=None):
+            @inlineCallbacks
+            def _cleanUp2(txn2):
+                try:
+                    job = yield cls.load(txn2, jobID)
+                except NoSuchRecord:
+                    log.debug(
+                        "JobItem: {jobid} disappeared t={tm}",
+                        jobid=jobID,
+                        tm=_tm(),
+                    )
+                else:
+                    log.debug(
+                        "JobItem: {jobid} marking as failed {count} t={tm}",
+                        jobid=jobID,
+                        count=job.failed + 1,
+                        tm=_tm(),
+                    )
+                    yield job.failedToRun(locked=isinstance(e, JobRunningError), delay=delay)
+            return inTransaction(txnFactory, _cleanUp2, "ultimatelyPerform._failureCleanUp")
+        log.debug("JobItem: {jobid} starting to run", jobid=jobID)
+        txn = txnFactory(label="ultimatelyPerform: {}".format(jobID))
+        try:
+            job = yield cls.load(txn, jobID)
+            if hasattr(txn, "_label"):
+                txn._label = "{} <{}>".format(txn._label, job.workType)
+            log.debug(
+                "JobItem: {jobid} loaded {work} t={tm}",
+                jobid=jobID,
+                work=job.workType,
+                tm=_tm(),
+            )
+            yield job.run()
+        except NoSuchRecord:
+            # The record has already been removed
+            yield txn.commit()
+            log.debug(
+                "JobItem: {jobid} already removed t={tm}",
+                jobid=jobID,
+                tm=_tm(),
+            )
+        except JobTemporaryError as e:
+            # Temporary failure delay with back-off
+            def _temporaryFailure():
+                return _failureCleanUp(delay=e.delay * (job.failed + 1))
+            log.debug(
+                "JobItem: {jobid} {desc} {work} t={tm}",
+                jobid=jobID,
+                desc="temporary failure #{}".format(job.failed + 1),
+                work=job.workType,
+                tm=_tm(),
+            )
+            txn.postAbort(_temporaryFailure)
+            yield txn.abort()
+        except (JobFailedError, JobRunningError) as e:
+            # Permanent failure
+            log.debug(
+                "JobItem: {jobid} {desc} {work} t={tm}",
+                jobid=jobID,
+                desc="failed" if isinstance(e, JobFailedError) else "locked",
+                work=job.workType,
+                tm=_tm(),
+            )
+            txn.postAbort(_failureCleanUp)
+            yield txn.abort()
+        except:
+            f = Failure()
+            log.error(
+                "JobItem: {jobid} unknown exception t={tm} {exc}",
+                jobid=jobID,
+                tm=_tm(),
+                exc=f,
+            )
+            yield txn.abort()
+            returnValue(f)
+        else:
+            yield txn.commit()
+            log.debug(
+                "JobItem: {jobid} completed {work} t={tm} over={over}",
+                jobid=jobID,
+                work=job.workType,
+                tm=_tm(),
+                over=_overtm(job.notBefore),
+            )
+        returnValue(None)
+    @classmethod
+    @inlineCallbacks
+    def nextjob(cls, txn, now, minPriority):
+        """
+        Find the next available job based on priority, also return any that are overdue. This
+        method uses an SQL query to find the matching jobs, and sorts based on the NOT_BEFORE
+        value and priority..
+        @param txn: the transaction to use
+        @type txn: L{IAsyncTransaction}
+        @param now: current timestamp - needed for unit tests that might use their
+            own clock.
+        @type now: L{datetime.datetime}
+        @param minPriority: lowest priority level to query for
+        @type minPriority: L{int}
+        @return: the job record
+        @rtype: L{JobItem}
+        """
+        jobs = yield cls.nextjobs(txn, now, minPriority, limit=1)
+        # Must only be one or zero
+        if jobs and len(jobs) > 1:
+            raise AssertionError("next_job() returned more than one row")
+        returnValue(jobs[0] if jobs else None)
+    @classmethod
+    @inlineCallbacks
+    def nextjobs(cls, txn, now, minPriority, limit=1):
+        """
+        Find the next available job based on priority, also return any that are overdue.
+        @param txn: the transaction to use
+        @type txn: L{IAsyncTransaction}
+        @param now: current timestamp
+        @type now: L{datetime.datetime}
+        @param minPriority: lowest priority level to query for
+        @type minPriority: L{int}
+        @param limit: limit on number of jobs to return
+        @type limit: L{int}
+        @return: the job record
+        @rtype: L{JobItem}
+        """
+        queryExpr = (cls.notBefore <= now).And(cls.priority >= minPriority).And(cls.pause == 0).And(
+            (cls.assigned == None).Or(cls.overdue < now)
+        )
+        if txn.dialect == ORACLE_DIALECT:
+            # Oracle does not support a "for update" clause with "order by". So do the
+            # "for update" as a second query right after the first. Will need to check
+            # how this might impact concurrency in a multi-host setup.
+            jobs = yield cls.query(
+                txn,
+                queryExpr,
+                order=(cls.assigned, cls.priority),
+                ascending=False,
+                limit=limit,
+            )
+            if jobs:
+                yield cls.query(
+                    txn,
+                    (cls.jobID.In([job.jobID for job in jobs])),
+                    forUpdate=True,
+                    noWait=False,
+                )
+        else:
+            jobs = yield cls.query(
+                txn,
+                queryExpr,
+                order=(cls.assigned, cls.priority),
+                ascending=False,
+                forUpdate=True,
+                noWait=False,
+                limit=limit,
+            )
+        returnValue(jobs)
+    @inlineCallbacks
+    def run(self):
+        """
+        Run this job item by finding the appropriate work item class and
+        running that, with appropriate locking.
+        """
+        workItem = yield self.workItem()
+        if workItem is not None:
+            # First we lock the L{WorkItem}
+            locked = yield workItem.runlock()
+            if not locked:
+                raise JobRunningError()
+            try:
+                # Run in three steps, allowing for before/after hooks that sub-classes
+                # may override
+                okToGo = yield workItem.beforeWork()
+                if okToGo:
+                    yield workItem.doWork()
+                    yield workItem.afterWork()
+            except Exception as e:
+                f = Failure()
+                log.error(
+                    "JobItem: {jobid}, WorkItem: {workid} failed: {exc}",
+                    jobid=self.jobID,
+                    workid=workItem.workID,
+                    exc=f,
+                )
+                if isinstance(e, JobTemporaryError):
+                    raise
+                else:
+                    raise JobFailedError(e)
+        try:
+            # Once the work is done we delete ourselves - NB this must be the last thing done
+            # to ensure the L{JobItem} row is not locked for very long.
+            yield self.delete()
+        except NoSuchRecord:
+            # The record has already been removed
+            pass
+    @inlineCallbacks
+    def isRunning(self):
+        """
+        Return L{True} if the job is currently running (its L{WorkItem} is locked).
+        """
+        workItem = yield self.workItem()
+        if workItem is not None:
+            locked = yield workItem.trylock()
+            returnValue(not locked)
+        else:
+            returnValue(False)
+    @inlineCallbacks
+    def workItem(self):
+        """
+        Return the L{WorkItem} corresponding to this L{JobItem}.
+        """
+        workItemClass = self.workItemForType(self.workType)
+        workItems = yield workItemClass.loadForJob(
+            self.transaction, self.jobID
+        )
+        returnValue(workItems[0] if len(workItems) == 1 else None)
+    @classmethod
+    def workItemForType(cls, workType):
+        """
+        Return the class of the L{WorkItem} associated with this L{JobItem}.
+        @param workType: the name of the L{WorkItem}'s table
+        @type workType: L{str}
+        """
+        if cls._workTypeMap is None:
+            cls.workTypes()
+        return cls._workTypeMap[workType]
+    @classmethod
+    def workTypes(cls):
+        """
+        Map all L{WorkItem} sub-classes table names to the class type.
+        @return: All of the work item types.
+        @rtype: iterable of L{WorkItem} subclasses
+        """
+        if cls._workTypes is None:
+            cls._workTypes = []
+            def getWorkType(subcls, appendTo):
+                if hasattr(subcls, "table"):
+                    appendTo.append(subcls)
+                else:
+                    for subsubcls in subcls.__subclasses__():
+                        getWorkType(subsubcls, appendTo)
+            from twext.enterprise.jobs.workitem import WorkItem
+            getWorkType(WorkItem, cls._workTypes)
+            cls._workTypeMap = {}
+            for subcls in cls._workTypes:
+                cls._workTypeMap[subcls.workType()] = subcls
+        return cls._workTypes
+    @classmethod
+    def numberOfWorkTypes(cls):
+        return len(cls.workTypes())
+    @classmethod
+    @inlineCallbacks
+    def waitEmpty(cls, txnCreator, reactor, timeout):
+        """
+        Wait for the job queue to drain. Only use this in tests
+        that need to wait for results from jobs.
+        """
+        t = time.time()
+        while True:
+            work = yield inTransaction(txnCreator, cls.all)
+            if not work:
+                break
+            if time.time() - t > timeout:
+                returnValue(False)
+            d = Deferred()
+            reactor.callLater(0.1, lambda : d.callback(None))
+            yield d
+        returnValue(True)
+    @classmethod
+    @inlineCallbacks
+    def waitJobDone(cls, txnCreator, reactor, timeout, jobID):
+        """
+        Wait for the specified job to complete. Only use this in tests
+        that need to wait for results from jobs.
+        """
+        t = time.time()
+        while True:
+            work = yield inTransaction(txnCreator, cls.query, expr=(cls.jobID == jobID))
+            if not work:
+                break
+            if time.time() - t > timeout:
+                returnValue(False)
+            d = Deferred()
+            reactor.callLater(0.1, lambda : d.callback(None))
+            yield d
+        returnValue(True)
+    @classmethod
+    @inlineCallbacks
+    def waitWorkDone(cls, txnCreator, reactor, timeout, workTypes):
+        """
+        Wait for the specified job to complete. Only use this in tests
+        that need to wait for results from jobs.
+        """
+        t = time.time()
+        while True:
+            count = [0]
+            @inlineCallbacks
+            def _countTypes(txn):
+                for t in workTypes:
+                    work = yield t.all(txn)
+                    count[0] += len(work)
+            yield inTransaction(txnCreator, _countTypes)
+            if count[0] == 0:
+                break
+            if time.time() - t > timeout:
+                returnValue(False)
+            d = Deferred()
+            reactor.callLater(0.1, lambda : d.callback(None))
+            yield d
+        returnValue(True)
+    @classmethod
+    @inlineCallbacks
+    def histogram(cls, txn):
+        """
+        Generate a histogram of work items currently in the queue.
+        """
+        from twext.enterprise.jobs.queue import WorkerConnectionPool
+        results = {}
+        now = datetime.utcnow()
+        for workItemType in cls.workTypes():
+            workType = workItemType.workType()
+            results.setdefault(workType, {
+                "queued": 0,
+                "assigned": 0,
+                "late": 0,
+                "failed": 0,
+                "completed": WorkerConnectionPool.completed.get(workType, 0),
+                "time": WorkerConnectionPool.timing.get(workType, 0.0)
+            })
+        jobs = yield cls.all(txn)
+        for job in jobs:
+            r = results[job.workType]
+            r["queued"] += 1
+            if job.assigned is not None:
+                r["assigned"] += 1
+            if job.assigned is None and job.notBefore < now:
+                r["late"] += 1
+            if job.failed:
+                r["failed"] += 1
+        returnValue(results)
+JobDescriptor = namedtuple("JobDescriptor", ["jobID", "weight", "type"])
+class JobDescriptorArg(Argument):
+    """
+    Comma-separated representation of an L{JobDescriptor} for AMP-serialization.
+    """
+    def toString(self, inObject):
+        return ",".join(map(str, inObject))
+    def fromString(self, inString):
+        return JobDescriptor(*[f(s) for f, s in zip((int, int, str,), inString.split(","))])

Added: twext/trunk/twext/enterprise/jobs/queue.py
--- twext/trunk/twext/enterprise/jobs/queue.py	                        (rev 0)
+++ twext/trunk/twext/enterprise/jobs/queue.py	2015-08-07 20:06:34 UTC (rev 15035)
@@ -0,0 +1,844 @@
+# -*- test-case-name: twext.enterprise.test.test_queue -*-
+# Copyright (c) 2012-2015 Apple Inc. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twext.enterprise.ienterprise import IQueuer
+from twext.enterprise.jobs.jobitem import JobDescriptorArg, JobItem, \
+    JobFailedError
+from twext.enterprise.jobs.workitem import WORK_WEIGHT_CAPACITY, \
+from twext.python.log import Logger
+from twisted.application.service import MultiService
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, passthru, succeed
+from twisted.internet.error import AlreadyCalled, AlreadyCancelled
+from twisted.internet.protocol import Factory
+from twisted.protocols.amp import AMP, Command
+from zope.interface import implements
+from zope.interface.interface import Interface
+from datetime import datetime
+import collections
+import time
+log = Logger()
+This module imports the core of a distributed work queue. A "controller" process
+will poll the job queue for work and dispatch it to a suitable "worker" process
+based on the load of each worker. Controller and worker communicate with each
+other via AMP. Work is submitted to the job queue by the workers or the
+controller by storing an L{JobItem}/L{WorkItem} in the database.
+The key classes in this module are:
+In the master process:
+    L{ControllerQueue} - the L{Service} that the controller process has to run to process jobs.
+    L{ConnectionFromWorker} - an AMP protocol handler for connections from a worker process.
+    L{WorkerConnectionPool} - the list of all worker connections the controller knows about.
+In the worker process:
+    L{ConnectionFromController} - an AMP protocol handler for connections from a controller process.
+    L{WorkerFactory} - factory for the worker process to use to initiate connections to the controller.
+class _IJobPerformer(Interface):
+    """
+    An object that can perform work.
+    Internal interface; implemented by several classes here since work has to
+    (in the worst case) pass from worker->controller->controller->worker.
+    """
+    def performJob(job):  # @NoSelf
+        """
+        @param job: Details about the job to perform.
+        @type job: L{JobDescriptor}
+        @return: a L{Deferred} firing with an empty dictionary when the work is
+            complete.
+        @rtype: L{Deferred} firing L{dict}
+        """
+class PerformJob(Command):
+    """
+    Notify a worker that it must do a job that has been persisted to
+    the database, by informing it of the job ID.
+    """
+    arguments = [
+        ("job", JobDescriptorArg()),
+    ]
+    response = []
+class EnqueuedJob(Command):
+    """
+    Notify the controller process that a worker enqueued some work. This is used to "wake up"
+    the controller if it has slowed its polling loop due to it being idle.
+    """
+    arguments = []
+    response = []
+class WorkerConnectionPool(object):
+    """
+    A pool of L{ConnectionFromWorker}s. This represents the set of worker processes
+    that the controller process can dispatch work to. It tracks each L{ConnectionFromWorker},
+    reporting on the overall load, and allows for dispatching of work to the lowest load
+    worker.
+    """
+    implements(_IJobPerformer)
+    completed = collections.defaultdict(int)
+    timing = collections.defaultdict(float)
+    def __init__(self, maximumLoadPerWorker=WORK_WEIGHT_CAPACITY):
+        self.workers = []
+        self.maximumLoadPerWorker = maximumLoadPerWorker
+    def addWorker(self, worker):
+        """
+        Add a L{ConnectionFromWorker} to this L{WorkerConnectionPool} so that
+        it can be selected.
+        """
+        self.workers.append(worker)
+    def removeWorker(self, worker):
+        """
+        Remove a L{ConnectionFromWorker} from this L{WorkerConnectionPool} that
+        was previously added.
+        """
+        self.workers.remove(worker)
+    def hasAvailableCapacity(self):
+        """
+        Does this worker connection pool have any local workers who have spare
+        hasAvailableCapacity to process another queue item?
+        """
+        for worker in self.workers:
+            if worker.currentLoad < self.maximumLoadPerWorker:
+                return True
+        return False
+    def loadLevel(self):
+        """
+        Return the overall load of this worker connection pool have as a percentage of
+        total capacity.
+        @return: current load percentage.
+        @rtype: L{int}
+        """
+        current = sum(worker.currentLoad for worker in self.workers)
+        total = len(self.workers) * self.maximumLoadPerWorker
+        return ((current * 100) / total) if total else 100
+    def eachWorkerLoad(self):
+        """
+        The load of all currently connected workers.
+        """
+        return [(worker.currentAssigned, worker.currentLoad, worker.totalCompleted) for worker in self.workers]
+    def allWorkerLoad(self):
+        """
+        The total load of all currently connected workers.
+        """
+        return sum(worker.currentLoad for worker in self.workers)
+    def _selectLowestLoadWorker(self):
+        """
+        Select the local connection with the lowest current load, or C{None} if
+        all workers are too busy.
+        @return: a worker connection with the lowest current load.
+        @rtype: L{ConnectionFromWorker}
+        """
+        return sorted(self.workers[:], key=lambda w: w.currentLoad)[0]
+    @inlineCallbacks
+    def performJob(self, job):
+        """
+        Select a local worker that is idle enough to perform the given job,
+        then ask them to perform it.
+        @param job: The details of the given job.
+        @type job: L{JobDescriptor}
+        @return: a L{Deferred} firing with an empty dictionary when the work is
+            complete.
+        @rtype: L{Deferred} firing L{dict}
+        """
+        t = time.time()
+        preferredWorker = self._selectLowestLoadWorker()
+        try:
+            result = yield preferredWorker.performJob(job)
+        finally:
+            self.completed[job.type] += 1
+            self.timing[job.type] += time.time() - t
+        returnValue(result)
+class ConnectionFromWorker(AMP):
+    """
+    An individual connection from a worker, as seen from the controller's
+    perspective.  L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
+    """
+    def __init__(self, controllerQueue, boxReceiver=None, locator=None):
+        super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
+        self.controllerQueue = controllerQueue
+        self._assigned = 0
+        self._load = 0
+        self._completed = 0
+    @property
+    def currentAssigned(self):
+        """
+        How many jobs currently assigned to this worker?
+        """
+        return self._assigned
+    @property
+    def currentLoad(self):
+        """
+        What is the current load of this worker?
+        """
+        return self._load
+    @property
+    def totalCompleted(self):
+        """
+        What is the current load of this worker?
+        """
+        return self._completed
+    def startReceivingBoxes(self, sender):
+        """
+        Start receiving AMP boxes.  Initialize all necessary state.
+        """
+        result = super(ConnectionFromWorker, self).startReceivingBoxes(sender)
+        self.controllerQueue.workerPool.addWorker(self)
+        return result
+    def stopReceivingBoxes(self, reason):
+        """
+        AMP boxes will no longer be received.
+        """
+        result = super(ConnectionFromWorker, self).stopReceivingBoxes(reason)
+        self.controllerQueue.workerPool.removeWorker(self)
+        return result
+    def performJob(self, job):
+        """
+        Dispatch a job to this worker.
+        @see: The responder for this should always be
+            L{ConnectionFromController.executeJobHere}.
+        """
+        d = self.callRemote(PerformJob, job=job)
+        self._assigned += 1
+        self._load += job.weight
+        @d.addBoth
+        def f(result):
+            self._assigned -= 1
+            self._load -= job.weight
+            self._completed += 1
+            return result
+        return d
+    @EnqueuedJob.responder
+    def enqueuedJob(self):
+        """
+        A worker enqueued a job and is letting us know. We need to "ping" the
+        L{ControllerQueue} to ensure it is polling the job queue at its
+        normal "fast" rate, as opposed to slower idle rates.
+        """
+        self.controllerQueue.enqueuedJob()
+        return {}
+class ConnectionFromController(AMP):
+    """
+    A L{ConnectionFromController} is the connection to a controller
+    process, in a worker process.  It processes requests from its own
+    controller to do work.  It is the opposite end of the connection from
+    L{ConnectionFromWorker}.
+    """
+    implements(IQueuer)
+    def __init__(self, transactionFactory, whenConnected,
+                 boxReceiver=None, locator=None):
+        super(ConnectionFromController, self).__init__(boxReceiver, locator)
+        self._txnFactory = transactionFactory
+        self.whenConnected = whenConnected
+        from twisted.internet import reactor
+        self.reactor = reactor
+    def transactionFactory(self, *args, **kwargs):
+        txn = self._txnFactory(*args, **kwargs)
+        txn._queuer = self
+        return txn
+    def startReceivingBoxes(self, sender):
+        super(ConnectionFromController, self).startReceivingBoxes(sender)
+        self.whenConnected(self)
+    @inlineCallbacks
+    def enqueueWork(self, txn, workItemType, **kw):
+        """
+        There is some work to do.  Do it, ideally someplace else, ideally in
+        parallel.
+        @param workItemType: The type of work item to be enqueued.
+        @type workItemType: A subtype of L{WorkItem}
+        @param kw: The parameters to construct a work item.
+        @type kw: keyword parameters to C{workItemType.makeJob}
+        """
+        work = yield workItemType.makeJob(txn, **kw)
+        self.callRemote(EnqueuedJob)
+        returnValue(work)
+    @PerformJob.responder
+    def executeJobHere(self, job):
+        """
+        This is where it's time to actually do the job.  The controller
+        process has instructed this worker to do it; so, look up the data in
+        the row, and do it.
+        """
+        d = JobItem.ultimatelyPerform(self.transactionFactory, job.jobID)
+        d.addCallback(lambda ignored: {})
+        return d
+class WorkerFactory(Factory, object):
+    """
+    Factory, to be used as the client to connect from the worker to the
+    controller.
+    """
+    def __init__(self, transactionFactory, whenConnected):
+        """
+        Create a L{WorkerFactory} with a transaction factory and a schema.
+        """
+        self.transactionFactory = transactionFactory
+        self.whenConnected = whenConnected
+    def buildProtocol(self, addr):
+        """
+        Create a L{ConnectionFromController} connected to the
+        transactionFactory and store.
+        """
+        return ConnectionFromController(
+            self.transactionFactory, self.whenConnected
+        )
+class _BaseQueuer(object):
+    implements(IQueuer)
+    def __init__(self):
+        super(_BaseQueuer, self).__init__()
+        self.proposalCallbacks = set()
+    @inlineCallbacks
+    def enqueueWork(self, txn, workItemType, **kw):
+        """
+        There is some work to do.  Do it, someplace else, ideally in parallel.
+        @param workItemType: The type of work item to be enqueued.
+        @type workItemType: A subtype of L{WorkItem}
+        @param kw: The parameters to construct a work item.
+        @type kw: keyword parameters to C{workItemType.makeJob}
+        """
+        work = yield workItemType.makeJob(txn, **kw)
+        self.enqueuedJob()
+        returnValue(work)
+    def enqueuedJob(self):
+        """
+        Work has been enqueued
+        """
+        pass
+class ControllerQueue(_BaseQueuer, MultiService, object):
+    """
+    Each controller has a L{ControllerQueue} that polls the database
+    for work and dispatches to a worker.
+    @ivar queuePollInterval: The amount of time between database
+        pings, i.e. checks for over-due queue items that might have been
+        orphaned by a controller process that died mid-transaction.
+    @type queuePollInterval: L{float} (in seconds)
+    @ivar queueOverdueTimeout: The amount of time after a L{WorkItem} is
+        scheduled to be processed (its C{notBefore} attribute) that it is
+        considered to be "orphaned" and will be run by a lost-work check rather
+        than waiting for it to be requested.  By default, 10 minutes.
+    @type queueOverdueTimeout: L{float} (in seconds)
+    @ivar queuePollingBackoff: Defines the thresholds for queue polling
+        back-off.
+    @type queuePollingBackoff: L{tuple}
+    @ivar overloadLevel: The load level above which job dispatch will stop.
+    @type overloadLevel: L{int}
+    @ivar highPriorityLevel: The load level above which only high priority
+        jobs will be dispatched.
+    @type highPriorityLevel: L{int}
+    @ivar mediumPriorityLevel: The load level above which low priority
+        jobs will not be dispatched.
+    @type mediumPriorityLevel: L{int}
+    @ivar reactor: The reactor used for scheduling timed events.
+    @type reactor: L{IReactorTime} provider.
+    """
+    implements(IQueuer)
+    queuePollInterval = 0.1             # How often to poll for new work
+    queueOverdueTimeout = 5.0 * 60.0    # How long before assigned work is possibly overdue
+    queuePollingBackoff = ((60.0, 60.0), (5.0, 1.0),)   # Polling backoffs
+    overloadLevel = 95          # Percentage load level above which job queue processing stops
+    highPriorityLevel = 80      # Percentage load level above which only high priority jobs are processed
+    mediumPriorityLevel = 50    # Percentage load level above which high and medium priority jobs are processed
+    def __init__(self, reactor, transactionFactory, useWorkerPool=True, disableWorkProcessing=False):
+        """
+        Initialize a L{ControllerQueue}.
+        @param transactionFactory: a 0- or 1-argument callable that produces an
+            L{IAsyncTransaction}
+        @param useWorkerPool:  Whether to use a worker pool to manage load
+            or instead take on all work ourselves (e.g. in single process mode)
+        """
+        super(ControllerQueue, self).__init__()
+        self.reactor = reactor
+        self.transactionFactory = transactionFactory
+        self.workerPool = WorkerConnectionPool() if useWorkerPool else None
+        self.disableWorkProcessing = disableWorkProcessing
+        self._lastMinPriority = WORK_PRIORITY_LOW
+        self._timeOfLastWork = time.time()
+        self._actualPollInterval = self.queuePollInterval
+        self._inWorkCheck = False
+    def enable(self):
+        """
+        Turn on work queue processing.
+        """
+        self.disableWorkProcessing = False
+    def disable(self):
+        """
+        Turn off work queue processing.
+        """
+        self.disableWorkProcessing = True
+    def totalLoad(self):
+        return self.workerPool.allWorkerLoad() if self.workerPool else 0
+    def workerListenerFactory(self):
+        """
+        Factory that listens for connections from workers.
+        """
+        f = Factory()
+        f.buildProtocol = lambda addr: ConnectionFromWorker(self)
+        return f
+    def choosePerformer(self, onlyLocally=False):
+        """
+        Choose a worker to distribute work to based on the current known load
+        of each worker.  Also, if no workers are available, work will be
+        submitted locally.
+        @return: the chosen performer.
+        @rtype: L{_IJobPerformer} or L{WorkerConnectionPool}
+        """
+        if self.workerPool:
+            if self.workerPool.hasAvailableCapacity():
+                return self.workerPool
+            raise JobFailedError("No capacity for work")
+        return LocalPerformer(self.transactionFactory)
+    @inlineCallbacks
+    def _workCheck(self):
+        """
+        Every controller will periodically check for any new work to do, and dispatch
+        as much as possible given the current load.
+        """
+        loopCounter = 0
+        while True:
+            if not self.running or self.disableWorkProcessing:
+                returnValue(None)
+            # Check the overall service load - if overloaded skip this poll cycle.
+            # If no workerPool, set level to 0, taking on all work.
+            level = 0 if self.workerPool is None else self.workerPool.loadLevel()
+            # Check overload level first
+            if level > self.overloadLevel:
+                if self._lastMinPriority != WORK_PRIORITY_HIGH + 1:
+                    log.error("workCheck: jobqueue is overloaded")
+                self._lastMinPriority = WORK_PRIORITY_HIGH + 1
+                self._timeOfLastWork = time.time()
+                break
+            elif level > self.highPriorityLevel:
+                minPriority = WORK_PRIORITY_HIGH
+            elif level > self.mediumPriorityLevel:
+                minPriority = WORK_PRIORITY_MEDIUM
+            else:
+                minPriority = WORK_PRIORITY_LOW
+            if self._lastMinPriority != minPriority:
+                log.debug(
+                    "workCheck: jobqueue priority limit change: {limit}",
+                    limit=minPriority,
+                )
+                if self._lastMinPriority == WORK_PRIORITY_HIGH + 1:
+                    log.error("workCheck: jobqueue is no longer overloaded")
+            self._lastMinPriority = minPriority
+            # Determine what the timestamp cutoff
+            # TODO: here is where we should iterate over the unlocked items
+            # that are due, ordered by priority, notBefore etc
+            nowTime = datetime.utcfromtimestamp(self.reactor.seconds())
+            self._inWorkCheck = True
+            txn = nextJob = None
+            try:
+                txn = self.transactionFactory(label="jobqueue.workCheck")
+                nextJob = yield JobItem.nextjob(txn, nowTime, minPriority)
+                if nextJob is None:
+                    break
+                if nextJob.assigned is not None:
+                    if nextJob.overdue > nowTime:
+                        # If it is now assigned but not overdue, ignore as this may have
+                        # been returned after another txn just assigned it
+                        continue
+                    else:
+                        # It is overdue - check to see whether the work item is currently locked - if so no
+                        # need to re-assign
+                        running = yield nextJob.isRunning()
+                        if running:
+                            # Change the overdue to further in the future whilst we wait for
+                            # the running job to complete
+                            yield nextJob.bumpOverdue(self.queueOverdueTimeout)
+                            log.debug(
+                                "workCheck: bumped overdue timeout on jobid={jobid}",
+                                jobid=nextJob.jobID,
+                            )
+                            continue
+                        else:
+                            log.debug(
+                                "workCheck: overdue re-assignment for jobid={jobid}",
+                                jobid=nextJob.jobID,
+                            )
+                # Always assign as a new job even when it is an orphan
+                yield nextJob.assign(nowTime, self.queueOverdueTimeout)
+                self._timeOfLastWork = time.time()
+                loopCounter += 1
+            except Exception as e:
+                log.error(
+                    "Failed to pick a new job: {jobID}, {exc}",
+                    jobID=nextJob.jobID if nextJob else "?",
+                    exc=e,
+                )
+                if txn is not None:
+                    yield txn.abort()
+                    txn = None
+                # If we can identify the problem job, try and set it to failed so that it
+                # won't block other jobs behind it (it will be picked again when the failure
+                # interval is exceeded - but that has a back off so a permanently stuck item
+                # should fade away. We probably want to have some additional logic to simply
+                # remove something that is permanently failing.
+                if nextJob is not None:
+                    txn = self.transactionFactory(label="jobqueue.workCheck.failed")
+                    try:
+                        failedJob = yield JobItem.load(txn, nextJob.jobID)
+                        yield failedJob.failedToRun()
+                    except Exception as e:
+                        # Could not mark as failed - break out of the next job loop
+                        log.error(
+                            "Failed to mark failed new job:{}, {exc}",
+                            jobID=nextJob.jobID,
+                            exc=e,
+                        )
+                        yield txn.abort()
+                        txn = None
+                        nextJob = None
+                        break
+                    else:
+                        # Marked the problem one as failed, so keep going and get the next job
+                        log.error("Marked failed new job: {jobID}", jobID=nextJob.jobID)
+                        yield txn.commit()
+                        txn = None
+                        nextJob = None
+                else:
+                    # Cannot mark anything as failed - break out of next job loop
+                    log.error("Cannot mark failed new job")
+                    break
+            finally:
+                if txn is not None:
+                    yield txn.commit()
+                    txn = None
+                self._inWorkCheck = False
+            if nextJob is not None:
+                try:
+                    worker = self.choosePerformer(onlyLocally=True)
+                    # Send the job over but DO NOT block on the response - that will ensure
+                    # we can do stuff in parallel
+                    worker.performJob(nextJob.descriptor())
+                except Exception as e:
+                    log.error("Failed to perform job for jobid={jobid}, {exc}", jobid=nextJob.jobID, exc=e)
+        if loopCounter:
+            log.debug("workCheck: processed {ctr} jobs in one loop", ctr=loopCounter)
+    _currentWorkDeferred = None
+    _workCheckCall = None
+    def _workCheckLoop(self):
+        """
+        While the service is running, keep checking for any overdue / lost work
+        items and re-submit them to the cluster for processing.
+        """
+        self._workCheckCall = None
+        if not self.running:
+            return
+        @passthru(
+            self._workCheck().addErrback(lambda result: log.error("_workCheckLoop: {exc}", exc=result)).addCallback
+        )
+        def scheduleNext(result):
+            self._currentWorkDeferred = None
+            if not self.running:
+                return
+            # Check for adjustment to poll interval - if the workCheck is idle for certain
+            # periods of time we will gradually increase the poll interval to avoid consuming
+            # excessive power when there is nothing to do
+            interval = self.queuePollInterval
+            idle = time.time() - self._timeOfLastWork
+            for threshold, poll in self.queuePollingBackoff:
+                if idle > threshold:
+                    interval = poll
+                    break
+            if self._actualPollInterval != interval:
+                log.debug("workCheckLoop: interval set to {interval}s", interval=interval)
+            self._actualPollInterval = interval
+            self._workCheckCall = self.reactor.callLater(
+                self._actualPollInterval, self._workCheckLoop
+            )
+        self._currentWorkDeferred = scheduleNext
+    def enqueuedJob(self):
+        """
+        Reschedule the work check loop to run right now. This should be called in response to "external" activity that
+        might want to "speed up" the job queue polling because new work may have been added.
+        """
+        # Only need to do this if the actual poll interval is greater than the default rapid value
+        if self._actualPollInterval == self.queuePollInterval:
+            return
+        # Bump time of last work so that we go back to the rapid (default) polling interval
+        self._timeOfLastWork = time.time()
+        # Reschedule the outstanding delayed call (handle exceptions by ignoring if its already running or
+        # just finished)
+        try:
+            if self._workCheckCall is not None:
+                self._workCheckCall.reset(0)
+        except (AlreadyCalled, AlreadyCancelled):
+            pass
+    def startService(self):
+        """
+        Register ourselves with the database and establish all outgoing
+        connections to other servers in the cluster.
+        """
+        super(ControllerQueue, self).startService()
+        self._workCheckLoop()
+    @inlineCallbacks
+    def stopService(self):
+        """
+        Stop this service, terminating any incoming or outgoing connections.
+        """
+        yield super(ControllerQueue, self).stopService()
+        if self._workCheckCall is not None:
+            self._workCheckCall.cancel()
+            self._workCheckCall = None
+        if self._currentWorkDeferred is not None:
+            self._currentWorkDeferred.cancel()
+            self._currentWorkDeferred = None
+        # Wait for any active work check to finish (but no more than 1 minute)
+        start = time.time()
+        while self._inWorkCheck:
+            d = Deferred()
+            self.reactor.callLater(0.5, lambda : d.callback(None))
+            yield d
+            if time.time() - start >= 60:
+                break
+class LocalPerformer(object):
+    """
+    Implementor of C{performJob} that does its work in the local process,
+    regardless of other conditions.
+    """
+    implements(_IJobPerformer)
+    def __init__(self, txnFactory):
+        """
+        Create this L{LocalPerformer} with a transaction factory.
+        """
+        self.txnFactory = txnFactory
+    def performJob(self, job):
+        """
+        Perform the given job right now.
+        """
+        return JobItem.ultimatelyPerform(self.txnFactory, job.jobID)
+class LocalQueuer(_BaseQueuer):
+    """
+    When work is enqueued with this queuer, it is just executed locally.
+    """
+    implements(IQueuer)
+    def __init__(self, txnFactory, reactor=None):
+        super(LocalQueuer, self).__init__()
+        self.txnFactory = txnFactory
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+    def choosePerformer(self):
+        """
+        Choose to perform the work locally.
+        """
+        return LocalPerformer(self.txnFactory)
+class NonPerformer(object):
+    """
+    Implementor of C{performJob} that doesn't actual perform any work.  This
+    is used in the case where you want to be able to enqueue work for someone
+    else to do, but not take on any work yourself (such as a command line
+    tool).
+    """
+    implements(_IJobPerformer)
+    def performJob(self, job):
+        """
+        Don't perform job.
+        """
+        return succeed(None)
+class NonPerformingQueuer(_BaseQueuer):
+    """
+    When work is enqueued with this queuer, it is never executed locally.
+    It's expected that the polling machinery will find the work and perform it.
+    """
+    implements(IQueuer)
+    def __init__(self, reactor=None):
+        super(NonPerformingQueuer, self).__init__()
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+    def choosePerformer(self):
+        """
+        Choose to perform the work locally.
+        """
+        return NonPerformer()

Added: twext/trunk/twext/enterprise/jobs/test/__init__.py
--- twext/trunk/twext/enterprise/jobs/test/__init__.py	                        (rev 0)
+++ twext/trunk/twext/enterprise/jobs/test/__init__.py	2015-08-07 20:06:34 UTC (rev 15035)
@@ -0,0 +1,15 @@
+# Copyright (c) 2015 Apple Inc. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.

Added: twext/trunk/twext/enterprise/jobs/test/test_jobs.py
--- twext/trunk/twext/enterprise/jobs/test/test_jobs.py	                        (rev 0)
+++ twext/trunk/twext/enterprise/jobs/test/test_jobs.py	2015-08-07 20:06:34 UTC (rev 15035)
@@ -0,0 +1,1821 @@
+# Copyright (c) 2012-2015 Apple Inc. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+Tests for L{twext.enterprise.job.queue}.
+import datetime
+from zope.interface.verify import verifyObject
+from twisted.internet import reactor
+from twisted.trial.unittest import TestCase, SkipTest
+from twisted.test.proto_helpers import StringTransport, MemoryReactor
+from twisted.internet.defer import \
+    Deferred, inlineCallbacks, gatherResults, passthru, returnValue, succeed, \
+    CancelledError
+from twisted.internet.task import Clock as _Clock
+from twisted.protocols.amp import Command, AMP, Integer
+from twisted.application.service import Service, MultiService
+from twext.enterprise.dal.syntax import SchemaSyntax, Delete
+from twext.enterprise.dal.parseschema import splitSQLString
+from twext.enterprise.dal.record import fromTable
+from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
+from twext.enterprise.fixtures import buildConnectionPool
+from twext.enterprise.fixtures import SteppablePoolHelper
+from twext.enterprise.jobs.utils import inTransaction, astimestamp
+from twext.enterprise.jobs.workitem import \
+    WorkItem, SingletonWorkItem, \
+from twext.enterprise.jobs.jobitem import \
+    JobItem, JobDescriptor, JobFailedError, JobTemporaryError
+from twext.enterprise.jobs.queue import \
+    WorkerConnectionPool, ControllerQueue, \
+    LocalPerformer, _IJobPerformer, \
+    NonPerformingQueuer
+# TODO: There should be a store-building utility within twext.enterprise.
+    from txdav.common.datastore.test.util import buildStore
+except ImportError:
+    def buildStore(*args, **kwargs):
+        raise SkipTest(
+            "buildStore is not available, because it's in txdav; duh."
+        )
+class Clock(_Clock):
+    """
+    More careful L{IReactorTime} fake which mimics the exception behavior of
+    the real reactor.
+    """
+    def callLater(self, _seconds, _f, *args, **kw):
+        if _seconds < 0:
+            raise ValueError("%s<0: " % (_seconds,))
+        return super(Clock, self).callLater(_seconds, _f, *args, **kw)
+    @inlineCallbacks
+    def advanceCompletely(self, amount):
+        """
+        Move time on this clock forward by the given amount and run whatever
+        pending calls should be run. Always complete the deferred calls before
+        returning.
+        @type amount: C{float}
+        @param amount: The number of seconds which to advance this clock's
+        time.
+        """
+        self.rightNow += amount
+        self._sortCalls()
+        while self.calls and self.calls[0].getTime() <= self.seconds():
+            call = self.calls.pop(0)
+            call.called = 1
+            yield call.func(*call.args, **call.kw)
+            self._sortCalls()
+class MemoryReactorWithClock(MemoryReactor, Clock):
+    """
+    Simulate a real reactor.
+    """
+    def __init__(self):
+        MemoryReactor.__init__(self)
+        Clock.__init__(self)
+        self._sortCalls()
+def transactionally(transactionCreator):
+    """
+    Perform the decorated function immediately in a transaction, replacing its
+    name with a L{Deferred}.
+    Use like so::
+        @transactionally(connectionPool.connection)
+        @inlineCallbacks
+        def it(txn):
+            yield txn.doSomething()
+        it.addCallback(firedWhenDone)
+    @param transactionCreator: A 0-arg callable that returns an
+        L{IAsyncTransaction}.
+    """
+    def thunk(operation):
+        return inTransaction(transactionCreator, operation)
+    return thunk
+class UtilityTests(TestCase):
+    """
+    Tests for supporting utilities.
+    """
+    def test_inTransactionSuccess(self):
+        """
+        L{inTransaction} invokes its C{transactionCreator} argument, and then
+        returns a L{Deferred} which fires with the result of its C{operation}
+        argument when it succeeds.
+        """
+        class faketxn(object):
+            def __init__(self):
+                self.commits = []
+                self.aborts = []
+            def commit(self):
+                self.commits.append(Deferred())
+                return self.commits[-1]
+            def abort(self):
+                self.aborts.append(Deferred())
+                return self.aborts[-1]
+        createdTxns = []
+        def createTxn(label):
+            createdTxns.append(faketxn())
+            return createdTxns[-1]
+        dfrs = []
+        def operation(t):
+            self.assertIdentical(t, createdTxns[-1])
+            dfrs.append(Deferred())
+            return dfrs[-1]
+        d = inTransaction(createTxn, operation)
+        x = []
+        d.addCallback(x.append)
+        self.assertEquals(x, [])
+        self.assertEquals(len(dfrs), 1)
+        dfrs[0].callback(35)
+        # Commit in progress, so still no result...
+        self.assertEquals(x, [])
+        createdTxns[0].commits[0].callback(42)
+        # Committed, everything's done.
+        self.assertEquals(x, [35])
+class SimpleSchemaHelper(SchemaTestHelper):
+    def id(self):
+        return "worker"
+SQL = passthru
+nodeSchema = SQL(
+    """
+    create table NODE_INFO (
+      HOSTNAME varchar(255) not null,
+      PID integer not null,
+      PORT integer not null,
+      TIME timestamp default current_timestamp not null,
+      primary key (HOSTNAME, PORT)
+    );
+    """
+jobSchema = SQL(
+    """
+    create table JOB (
+      JOB_ID      integer primary key default 1,
+      WORK_TYPE   varchar(255) not null,
+      PRIORITY    integer default 0,
+      WEIGHT      integer default 0,
+      NOT_BEFORE  timestamp not null,
+      ASSIGNED    timestamp default null,
+      OVERDUE     timestamp default null,
+      FAILED      integer default 0,
+      PAUSE       integer default 0
+    );
+    """
+schemaText = SQL(
+    """
+    create table DUMMY_WORK_ITEM (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A integer, B integer,
+      DELETE_ON_LOAD integer default 0
+    );
+    create table DUMMY_WORK_SINGLETON_ITEM (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A integer, B integer,
+      DELETE_ON_LOAD integer default 0
+    );
+    create table DUMMY_WORK_PAUSE_ITEM (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A integer, B integer,
+      DELETE_ON_LOAD integer default 0
+    );
+    create table AGGREGATOR_WORK_ITEM (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A integer, B integer,
+      DELETE_ON_LOAD integer default 0
+    );
+    """
+    schema = SchemaSyntax(SimpleSchemaHelper().schemaFromString(jobSchema + schemaText))
+    dropSQL = [
+        "drop table {name} cascade".format(name=table)
+        for table in (
+            "DUMMY_WORK_ITEM",
+            "DUMMY_WORK_PAUSE_ITEM",
+        )
+    ] + ["delete from job"]
+except SkipTest as e:
+    DummyWorkItemTable = object
+    DummyWorkSingletonItemTable = object
+    DummyWorkPauseItemTable = object
+    AggregatorWorkItemTable = object
+    skip = e
+    DummyWorkItemTable = fromTable(schema.DUMMY_WORK_ITEM)
+    DummyWorkSingletonItemTable = fromTable(schema.DUMMY_WORK_SINGLETON_ITEM)
+    DummyWorkPauseItemTable = fromTable(schema.DUMMY_WORK_PAUSE_ITEM)
+    AggregatorWorkItemTable = fromTable(schema.AGGREGATOR_WORK_ITEM)
+    skip = False
+class DummyWorkItem(WorkItem, DummyWorkItemTable):
+    """
+    Sample L{WorkItem} subclass that adds two integers together and stores them
+    in another table.
+    """
+    results = {}
+    def doWork(self):
+        if self.a == -1:
+            raise ValueError("Ooops")
+        elif self.a == -2:
+            raise JobTemporaryError(120)
+        self.results[self.jobID] = self.a + self.b
+        return succeed(None)
+    @classmethod
+    @inlineCallbacks
+    def loadForJob(cls, txn, *a):
+        """
+        Load L{DummyWorkItem} as normal...  unless the loaded item has
+        C{DELETE_ON_LOAD} set, in which case, do a deletion of this same row in
+        a concurrent transaction, then commit it.
+        """
+        workItems = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
+        if len(workItems) and workItems[0].deleteOnLoad:
+            otherTransaction = txn.store().newTransaction()
+            otherSelf = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
+            yield otherSelf[0].delete()
+            yield otherTransaction.commit()
+        returnValue(workItems)
+class DummyWorkSingletonItem(SingletonWorkItem, DummyWorkSingletonItemTable):
+    """
+    Sample L{SingletonWorkItem} subclass that adds two integers together and stores them
+    in another table.
+    """
+    results = {}
+    def doWork(self):
+        if self.a == -1:
+            raise ValueError("Ooops")
+        self.results[self.jobID] = self.a + self.b
+        return succeed(None)
+class DummyWorkPauseItem(WorkItem, DummyWorkPauseItemTable):
+    """
+    Sample L{WorkItem} subclass that pauses until a Deferred is fired.
+    """
+    workStarted = None
+    unpauseWork = None
+    def doWork(self):
+        self.workStarted.callback(None)
+        return self.unpauseWork
+class AggregatorWorkItem(WorkItem, AggregatorWorkItemTable):
+    """
+    Sample L{WorkItem} subclass that deletes others with the same
+    value and than pauses for a bit.
+    """
+    group = property(lambda self: (self.table.B == self.b))
+    @inlineCallbacks
+    def doWork(self):
+        # Delete the work items we match
+        yield Delete(
+            From=self.table,
+            Where=(self.table.A == self.a)
+        ).on(self.transaction)
+        d = Deferred()
+        reactor.callLater(2.0, lambda: d.callback(None))
+        yield d
+class AMPTests(TestCase):
+    """
+    Tests for L{AMP} faithfully relaying ids across the wire.
+    """
+    def test_sendTableWithName(self):
+        """
+        You can send a reference to a table through a L{SchemaAMP} via
+        L{TableSyntaxByName}.
+        """
+        client = AMP()
+        class SampleCommand(Command):
+            arguments = [("id", Integer())]
+        class Receiver(AMP):
+            @SampleCommand.responder
+            def gotIt(self, id):
+                self.it = id
+                return {}
+        server = Receiver()
+        clientT = StringTransport()
+        serverT = StringTransport()
+        client.makeConnection(clientT)
+        server.makeConnection(serverT)
+        client.callRemote(SampleCommand, id=123)
+        server.dataReceived(clientT.io.getvalue())
+        self.assertEqual(server.it, 123)
+class WorkItemTests(TestCase):
+    """
+    A L{WorkItem} is an item of work that can be executed.
+    """
+    def test_forTableName(self):
+        """
+        L{WorkItem.forTable} returns L{WorkItem} subclasses mapped to the given
+        table.
+        """
+        self.assertIdentical(
+            JobItem.workItemForType(schema.DUMMY_WORK_ITEM.model.name), DummyWorkItem
+        )
+    @inlineCallbacks
+    def _enqueue(self, dbpool, a, b, notBefore=None, priority=None, weight=None, cl=DummyWorkItem):
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        if notBefore is None:
+            notBefore = datetime.datetime(2012, 12, 13, 12, 12, 0)
+        sinceEpoch = astimestamp(fakeNow)
+        clock = Clock()
+        clock.advance(sinceEpoch)
+        qpool = ControllerQueue(clock, dbpool.connection, useWorkerPool=False)
+        realChoosePerformer = qpool.choosePerformer
+        performerChosen = []
+        def catchPerformerChoice():
+            result = realChoosePerformer()
+            performerChosen.append(True)
+            return result
+        qpool.choosePerformer = catchPerformerChoice
+        @transactionally(dbpool.connection)
+        def check(txn):
+            return qpool.enqueueWork(
+                txn, cl,
+                a=a, b=b, priority=priority, weight=weight,
+                notBefore=notBefore
+            )
+        yield check
+        returnValue(qpool)
+    @inlineCallbacks
+    def test_enqueue(self):
+        """
+        L{ControllerQueue.enqueueWork} will insert a job and a work item.
+        """
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        yield self._enqueue(dbpool, 1, 2)
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        @transactionally(dbpool.connection)
+        def checkJob(txn):
+            return JobItem.all(txn)
+        jobs = yield checkJob
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(jobs[0].workType == "DUMMY_WORK_ITEM")
+        self.assertTrue(jobs[0].assigned is None)
+        @transactionally(dbpool.connection)
+        def checkWork(txn):
+            return DummyWorkItem.all(txn)
+        work = yield checkWork
+        self.assertTrue(len(work) == 1)
+        self.assertTrue(work[0].jobID == jobs[0].jobID)
+    @inlineCallbacks
+    def test_assign(self):
+        """
+        L{JobItem.assign} will mark a job as assigned.
+        """
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        yield self._enqueue(dbpool, 1, 2)
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+        jobs = yield inTransaction(dbpool.connection, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(jobs[0].assigned is None)
+        @inlineCallbacks
+        def assignJob(txn):
+            job = yield JobItem.load(txn, jobs[0].jobID)
+            yield job.assign(datetime.datetime.utcnow(), ControllerQueue.queueOverdueTimeout)
+        yield inTransaction(dbpool.connection, assignJob)
+        jobs = yield inTransaction(dbpool.connection, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(jobs[0].assigned is not None)
+    @inlineCallbacks
+    def test_nextjob(self):
+        """
+        L{JobItem.nextjob} returns the correct job based on priority.
+        """
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        now = datetime.datetime.utcnow()
+        # Empty job queue
+        @inlineCallbacks
+        def _next(txn, priority=WORK_PRIORITY_LOW):
+            job = yield JobItem.nextjob(txn, now, priority)
+            if job is not None:
+                work = yield job.workItem()
+            else:
+                work = None
+            returnValue((job, work))
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+        # Unassigned job with future notBefore not returned
+        yield self._enqueue(dbpool, 1, 1, now + datetime.timedelta(days=1))
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+        # Unassigned job with past notBefore returned
+        yield self._enqueue(dbpool, 2, 1, now + datetime.timedelta(days=-1), priority=WORK_PRIORITY_HIGH)
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is not None)
+        self.assertTrue(work.a == 2)
+        assignID = job.jobID
+        # Assigned job with past notBefore not returned
+        @inlineCallbacks
+        def assignJob(txn, when=None):
+            assignee = yield JobItem.load(txn, assignID)
+            yield assignee.assign(now if when is None else when, ControllerQueue.queueOverdueTimeout)
+        yield inTransaction(dbpool.connection, assignJob)
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+        # Unassigned, paused job with past notBefore not returned
+        yield self._enqueue(dbpool, 3, 1, now + datetime.timedelta(days=-1), priority=WORK_PRIORITY_HIGH)
+        @inlineCallbacks
+        def pauseJob(txn, pause=True):
+            works = yield DummyWorkItem.all(txn)
+            for work in works:
+                if work.a == 3:
+                    job = yield JobItem.load(txn, work.jobID)
+                    yield job.pauseIt(pause)
+        yield inTransaction(dbpool.connection, pauseJob)
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+        # Unassigned, paused then unpaused job with past notBefore is returned
+        yield inTransaction(dbpool.connection, pauseJob, pause=False)
+        job, work = yield inTransaction(dbpool.connection, _next)
+        self.assertTrue(job is not None)
+        self.assertTrue(work.a == 3)
+        @inlineCallbacks
+        def deleteJob(txn, jobID):
+            job = yield JobItem.load(txn, jobID)
+            yield job.delete()
+        yield inTransaction(dbpool.connection, deleteJob, jobID=job.jobID)
+        # Unassigned low priority job with past notBefore not returned if high priority required
+        yield self._enqueue(dbpool, 4, 1, now + datetime.timedelta(days=-1))
+        job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+        # Unassigned low priority job with past notBefore not returned if medium priority required
+        yield self._enqueue(dbpool, 5, 1, now + datetime.timedelta(days=-1))
+        job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_MEDIUM)
+        self.assertTrue(job is None)
+        self.assertTrue(work is None)
+        # Assigned job with past notBefore, but overdue is returned
+        yield inTransaction(dbpool.connection, assignJob, when=now + datetime.timedelta(days=-1))
+        job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
+        self.assertTrue(job is not None)
+        self.assertTrue(work.a == 2)
+    @inlineCallbacks
+    def test_notsingleton(self):
+        """
+        L{ControllerQueue.enqueueWork} will insert a job and a work item.
+        """
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        yield self._enqueue(dbpool, 1, 2, cl=DummyWorkItem)
+        def allJobs(txn):
+            return DummyWorkItem.all(txn)
+        jobs = yield inTransaction(dbpool.connection, allJobs)
+        self.assertTrue(len(jobs) == 1)
+        yield self._enqueue(dbpool, 2, 3)
+        jobs = yield inTransaction(dbpool.connection, allJobs)
+        self.assertTrue(len(jobs) == 2)
+    @inlineCallbacks
+    def test_singleton(self):
+        """
+        L{ControllerQueue.enqueueWork} will insert a job and a work item.
+        """
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        yield self._enqueue(dbpool, 1, 2, cl=DummyWorkSingletonItem)
+        def allJobs(txn):
+            return DummyWorkSingletonItem.all(txn)
+        jobs = yield inTransaction(dbpool.connection, allJobs)
+        self.assertTrue(len(jobs) == 1)
+        yield self._enqueue(dbpool, 2, 3, cl=DummyWorkSingletonItem)
+        jobs = yield inTransaction(dbpool.connection, allJobs)
+        self.assertTrue(len(jobs) == 1)
+    @inlineCallbacks
+    def test_singleton_reschedule(self):
+        """
+        L{ControllerQueue.enqueueWork} will insert a job and a work item.
+        """
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        qpool = yield self._enqueue(dbpool, 1, 2, cl=DummyWorkSingletonItem, notBefore=datetime.datetime(2014, 5, 17, 12, 0, 0))
+        @inlineCallbacks
+        def allWork(txn):
+            jobs = yield JobItem.all(txn)
+            work = [((yield job.workItem()), job) for job in jobs]
+            returnValue(filter(lambda x: x[0], work))
+        work = yield inTransaction(dbpool.connection, allWork)
+        self.assertTrue(len(work) == 1)
+        self.assertTrue(work[0][1].notBefore == datetime.datetime(2014, 5, 17, 12, 0, 0))
+        def _reschedule_force(txn, force):
+            txn._queuer = qpool
+            return DummyWorkSingletonItem.reschedule(txn, 60, force=force)
+        yield inTransaction(dbpool.connection, _reschedule_force, force=False)
+        work = yield inTransaction(dbpool.connection, allWork)
+        self.assertTrue(len(work) == 1)
+        self.assertTrue(work[0][1].notBefore == datetime.datetime(2014, 5, 17, 12, 0, 0))
+        yield inTransaction(dbpool.connection, _reschedule_force, force=True)
+        work = yield inTransaction(dbpool.connection, allWork)
+        self.assertTrue(len(work) == 1)
+        self.assertTrue(work[0][1].notBefore != datetime.datetime(2014, 5, 17, 12, 0, 0))
+class WorkerConnectionPoolTests(TestCase):
+    """
+    A L{WorkerConnectionPool} is responsible for managing, in a node's
+    controller (master) process, the collection of worker (slave) processes
+    that are capable of executing queue work.
+    """
+class ControllerQueueUnitTests(TestCase):
+    """
+    L{ControllerQueue} has many internal components.
+    """
+    def setUp(self):
+        """
+        Create a L{ControllerQueue} that is just initialized enough.
+        """
+        self.pcp = ControllerQueue(None, None)
+        DummyWorkItem.results = {}
+    def checkPerformer(self, cls):
+        """
+        Verify that the performer returned by
+        L{ControllerQueue.choosePerformer}.
+        """
+        performer = self.pcp.choosePerformer()
+        self.failUnlessIsInstance(performer, cls)
+        verifyObject(_IJobPerformer, performer)
+    def _setupPools(self):
+        """
+        Setup pool and reactor clock for time stepped tests.
+        """
+        reactor = MemoryReactorWithClock()
+        cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+        then = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        reactor.advance(astimestamp(then))
+        cph.setUp(self)
+        qpool = ControllerQueue(reactor, cph.pool.connection, useWorkerPool=False)
+        realChoosePerformer = qpool.choosePerformer
+        performerChosen = []
+        def catchPerformerChoice(onlyLocally=False):
+            result = realChoosePerformer(onlyLocally=onlyLocally)
+            performerChosen.append(True)
+            return result
+        qpool.choosePerformer = catchPerformerChoice
+        reactor.callLater(0, qpool._workCheck)
+        qpool.startService()
+        cph.flushHolders()
+        return cph, qpool, reactor, performerChosen
+    def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
+        """
+        If L{ControllerQueue.choosePerformer} is invoked when no workers
+        have spawned and no peers have established connections (either incoming
+        or outgoing), then it chooses an implementation of C{performJob} that
+        simply executes the work locally.
+        """
+        # If we're using worker pool, this should raise
+        try:
+            self.pcp.choosePerformer()
+        except JobFailedError:
+            pass
+        else:
+            self.fail("Didn't raise JobFailedError")
+        # If we're not using worker pool, we should get back LocalPerformer
+        self.pcp = ControllerQueue(None, None, useWorkerPool=False)
+        self.checkPerformer(LocalPerformer)
+    def test_choosingPerformerWithLocalCapacity(self):
+        """
+        If L{ControllerQueue.choosePerformer} is invoked when some workers
+        have spawned, then it should choose the worker pool as the local
+        performer.
+        """
+        # Give it some local capacity.
+        # In this case we want pcp to have a workerPool, so create a new pcp
+        # for this test
+        self.pcp = ControllerQueue(None, None)
+        wlf = self.pcp.workerListenerFactory()
+        proto = wlf.buildProtocol(None)
+        proto.makeConnection(StringTransport())
+        # Sanity check.
+        self.assertEqual(len(self.pcp.workerPool.workers), 1)
+        self.assertEqual(self.pcp.workerPool.hasAvailableCapacity(), True)
+        # Now it has some capacity.
+        self.checkPerformer(WorkerConnectionPool)
+    @inlineCallbacks
+    def test_notBeforeWhenCheckingForWork(self):
+        """
+        L{ControllerQueue._workCheck} should execute any
+        outstanding work items, but only those that are expired.
+        """
+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        # Let's create a couple of work items directly, not via the enqueue
+        # method, so that they exist but nobody will try to immediately execute
+        # them.
+        @transactionally(dbpool.pool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # First, one that's right now.
+            yield DummyWorkItem.makeJob(txn, a=1, b=2, notBefore=fakeNow)
+            # Next, create one that's actually far enough into the past to run.
+            yield DummyWorkItem.makeJob(
+                txn, a=3, b=4, notBefore=(
+                    # Schedule it in the past so that it should have already
+                    # run.
+                    fakeNow - datetime.timedelta(seconds=20)
+                )
+            )
+            # Finally, one that's actually scheduled for the future.
+            yield DummyWorkItem.makeJob(
+                txn, a=10, b=20, notBefore=fakeNow + datetime.timedelta(1000)
+            )
+        yield setup
+        # Wait for job
+        while len(DummyWorkItem.results) != 2:
+            clock.advance(1)
+        # Work item complete
+        self.assertTrue(DummyWorkItem.results == {1: 3, 2: 7})
+    @inlineCallbacks
+    def test_notBeforeWhenEnqueueing(self):
+        """
+        L{ControllerQueue.enqueueWork} enqueues some work immediately, but
+        only executes it when enough time has elapsed to allow the C{notBefore}
+        attribute of the given work item to have passed.
+        """
+        dbpool, qpool, clock, performerChosen = self._setupPools()
+        @transactionally(dbpool.pool.connection)
+        def check(txn):
+            return qpool.enqueueWork(
+                txn, DummyWorkItem, a=3, b=9,
+                notBefore=datetime.datetime(2012, 12, 12, 12, 12, 20)
+            )
+        yield check
+        # This is going to schedule the work to happen with some asynchronous
+        # I/O in the middle; this is a problem because how do we know when it's
+        # time to check to see if the work has started?  We need to intercept
+        # the thing that kicks off the work; we can then wait for the work
+        # itself.
+        self.assertEquals(performerChosen, [])
+        # Advance to exactly the appointed second.
+        clock.advance(20 - 12)
+        self.assertEquals(performerChosen, [True])
+        # Wait for job
+        while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
+            clock.advance(1)
+        # Work item complete
+        self.assertTrue(DummyWorkItem.results == {1: 12})
+    @inlineCallbacks
+    def test_notBeforeBefore(self):
+        """
+        L{ControllerQueue.enqueueWork} will execute its work immediately if
+        the C{notBefore} attribute of the work item in question is in the past.
+        """
+        dbpool, qpool, clock, performerChosen = self._setupPools()
+        @transactionally(dbpool.pool.connection)
+        def check(txn):
+            return qpool.enqueueWork(
+                txn, DummyWorkItem, a=3, b=9,
+                notBefore=datetime.datetime(2012, 12, 12, 12, 12, 0)
+            )
+        yield check
+        clock.advance(1000)
+        # Advance far beyond the given timestamp.
+        self.assertEquals(performerChosen, [True])
+        # Wait for job
+        while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
+            clock.advance(1)
+        # Work item complete
+        self.assertTrue(DummyWorkItem.results == {1: 12})
+    def test_workerConnectionPoolPerformJob(self):
+        """
+        L{WorkerConnectionPool.performJob} performs work by selecting a
+        L{ConnectionFromWorker} and sending it a L{PerformJOB} command.
+        """
+        clock = Clock()
+        peerPool = ControllerQueue(clock, None)
+        factory = peerPool.workerListenerFactory()
+        def peer():
+            p = factory.buildProtocol(None)
+            t = StringTransport()
+            p.makeConnection(t)
+            return p, t
+        worker1, _ignore_trans1 = peer()
+        worker2, _ignore_trans2 = peer()
+        # Ask the worker to do something.
+        worker1.performJob(JobDescriptor(1, 1, "ABC"))
+        self.assertEquals(worker1.currentLoad, 1)
+        self.assertEquals(worker2.currentLoad, 0)
+        # Now ask the pool to do something
+        peerPool.workerPool.performJob(JobDescriptor(2, 1, "ABC"))
+        self.assertEquals(worker1.currentLoad, 1)
+        self.assertEquals(worker2.currentLoad, 1)
+    def test_poolStartServiceChecksForWork(self):
+        """
+        L{ControllerQueue.startService} kicks off the idle work-check loop.
+        """
+        reactor = MemoryReactorWithClock()
+        cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+        then = datetime.datetime(2012, 12, 12, 12, 12, 0)
+        reactor.advance(astimestamp(then))
+        cph.setUp(self)
+        pcp = ControllerQueue(reactor, cph.pool.connection, useWorkerPool=False)
+        now = then + datetime.timedelta(seconds=20)
+        @transactionally(cph.pool.connection)
+        def createOldWork(txn):
+            one = DummyWorkItem.makeJob(txn, jobID=1, workID=1, a=3, b=4, notBefore=then)
+            two = DummyWorkItem.makeJob(txn, jobID=2, workID=2, a=7, b=9, notBefore=now)
+            return gatherResults([one, two])
+        pcp.startService()
+        cph.flushHolders()
+        reactor.advance(19)
+        self.assertEquals(
+            DummyWorkItem.results,
+            {1: 7}
+        )
+        reactor.advance(20)
+        self.assertEquals(
+            DummyWorkItem.results,
+            {1: 7, 2: 16}
+        )
+    @inlineCallbacks
+    def test_exceptionWhenWorking(self):
+        """
+        L{ControllerQueue._workCheck} should execute any
+        outstanding work items, and keep going if some raise an exception.
+        """
+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        # Let's create a couple of work items directly, not via the enqueue
+        # method, so that they exist but nobody will try to immediately execute
+        # them.
+        @transactionally(dbpool.pool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # OK
+            yield DummyWorkItem.makeJob(
+                txn, a=1, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60)
+            )
+            # Error
+            yield DummyWorkItem.makeJob(
+                txn, a=-1, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
+            )
+            # OK
+            yield DummyWorkItem.makeJob(
+                txn, a=2, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60)
+            )
+        yield setup
+        clock.advance(20 - 12)
+        # Work item complete
+        self.assertTrue(DummyWorkItem.results == {1: 1, 3: 2})
+    @inlineCallbacks
+    def test_exceptionUnassign(self):
+        """
+        When a work item fails it should appear as unassigned in the JOB
+        table and have the failure count bumped, and a notBefore one minute ahead.
+        """
+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        # Let's create a couple of work items directly, not via the enqueue
+        # method, so that they exist but nobody will try to immediately execute
+        # them.
+        @transactionally(dbpool.pool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # Next, create failing work that's actually far enough into the past to run.
+            yield DummyWorkItem.makeJob(
+                txn, a=-1, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
+            )
+        yield setup
+        clock.advance(20 - 12)
+        @transactionally(dbpool.pool.connection)
+        def check(txn):
+            return JobItem.all(txn)
+        jobs = yield check
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(jobs[0].assigned is None)
+        self.assertTrue(jobs[0].failed == 1)
+        self.assertTrue(jobs[0].notBefore > datetime.datetime.utcnow())
+    @inlineCallbacks
+    def test_temporaryFailure(self):
+        """
+        When a work item temporarily fails it should appear as unassigned in the JOB
+        table and have the failure count bumped, and a notBefore set to the temporary delay.
+        """
+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        # Let's create a couple of work items directly, not via the enqueue
+        # method, so that they exist but nobody will try to immediately execute
+        # them.
+        @transactionally(dbpool.pool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # Next, create failing work that's actually far enough into the past to run.
+            yield DummyWorkItem.makeJob(
+                txn, a=-2, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
+            )
+        yield setup
+        clock.advance(20 - 12)
+        @transactionally(dbpool.pool.connection)
+        def check(txn):
+            return JobItem.all(txn)
+        jobs = yield check
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(jobs[0].assigned is None)
+        self.assertTrue(jobs[0].failed == 1)
+        self.assertTrue(jobs[0].notBefore > datetime.datetime.utcnow() + datetime.timedelta(seconds=90))
+    @inlineCallbacks
+    def test_loopFailure_noRecovery(self):
+        """
+        When L{_workCheck} fails in its loop we need the problem job marked as failed.
+        """
+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        oldNextJob = JobItem.nextjob
+        @inlineCallbacks
+        def _nextJob(cls, txn, now, minPriority):
+            job = yield oldNextJob(txn, now, minPriority)
+            work = yield job.workItem()
+            if work.a == -2:
+                raise ValueError("oops")
+        self.patch(JobItem, "nextjob", classmethod(_nextJob))
+        # Let's create a couple of work items directly, not via the enqueue
+        # method, so that they exist but nobody will try to immediately execute
+        # them.
+        @transactionally(dbpool.pool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # Failing
+            yield DummyWorkItem.makeJob(
+                txn, a=-2, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
+            )
+            # OK
+            yield DummyWorkItem.makeJob(
+                txn, a=1, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60, 5)
+            )
+        yield setup
+        clock.advance(20 - 12)
+        @transactionally(dbpool.pool.connection)
+        def check(txn):
+            return JobItem.all(txn)
+        jobs = yield check
+        self.assertEqual(len(jobs), 2)
+        self.assertEqual(jobs[0].assigned, None)
+        self.assertEqual(jobs[0].failed, 0)
+        self.assertEqual(jobs[0].notBefore, fakeNow - datetime.timedelta(20 * 60))
+        self.assertEqual(jobs[1].assigned, None)
+        self.assertEqual(jobs[1].failed, 0)
+        self.assertEqual(jobs[1].notBefore, fakeNow - datetime.timedelta(20 * 60, 5))
+    @inlineCallbacks
+    def test_loopFailure_recovery(self):
+        """
+        When L{_workCheck} fails in its loop we need the problem job marked as failed.
+        """
+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        oldAssign = JobItem.assign
+        @inlineCallbacks
+        def _assign(self, when, overdue):
+            work = yield self.workItem()
+            if work.a == -2:
+                raise ValueError("oops")
+            yield oldAssign(self, when, overdue)
+        self.patch(JobItem, "assign", _assign)
+        # Let's create a couple of work items directly, not via the enqueue
+        # method, so that they exist but nobody will try to immediately execute
+        # them.
+        @transactionally(dbpool.pool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # Failing
+            yield DummyWorkItem.makeJob(
+                txn, a=-2, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
+            )
+            # OK
+            yield DummyWorkItem.makeJob(
+                txn, a=1, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60, 5)
+            )
+        yield setup
+        clock.advance(20 - 12)
+        @transactionally(dbpool.pool.connection)
+        def check(txn):
+            return JobItem.all(txn)
+        jobs = yield check
+        self.assertEqual(len(jobs), 1)
+        self.assertEqual(jobs[0].assigned, None)
+        self.assertEqual(jobs[0].failed, 1)
+        self.assertGreater(jobs[0].notBefore, datetime.datetime.utcnow() + datetime.timedelta(seconds=30))
+    @inlineCallbacks
+    def test_loopFailure_failedRecovery(self):
+        """
+        When L{_workCheck} fails in its loop we need the problem job marked as failed.
+        """
+        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        oldAssign = JobItem.assign
+        @inlineCallbacks
+        def _assign(self, when, overdue):
+            work = yield self.workItem()
+            if work.a == -2:
+                raise ValueError("oops")
+            yield oldAssign(self, when, overdue)
+        self.patch(JobItem, "assign", _assign)
+        @inlineCallbacks
+        def _failedToRun(self, locked=False, delay=None):
+            raise ValueError("oops")
+        self.patch(JobItem, "failedToRun", _failedToRun)
+        # Let's create a couple of work items directly, not via the enqueue
+        # method, so that they exist but nobody will try to immediately execute
+        # them.
+        @transactionally(dbpool.pool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # Failing
+            yield DummyWorkItem.makeJob(
+                txn, a=-2, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
+            )
+            # OK
+            yield DummyWorkItem.makeJob(
+                txn, a=1, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60, 5)
+            )
+        yield setup
+        clock.advance(20 - 12)
+        @transactionally(dbpool.pool.connection)
+        def check(txn):
+            return JobItem.all(txn)
+        jobs = yield check
+        self.assertEqual(len(jobs), 2)
+        self.assertEqual(jobs[0].assigned, None)
+        self.assertEqual(jobs[0].failed, 0)
+        self.assertEqual(jobs[0].notBefore, fakeNow - datetime.timedelta(20 * 60))
+        self.assertEqual(jobs[1].assigned, None)
+        self.assertEqual(jobs[1].failed, 0)
+        self.assertEqual(jobs[1].notBefore, fakeNow - datetime.timedelta(20 * 60, 5))
+    @inlineCallbacks
+    def test_enableDisable(self):
+        """
+        L{ControllerQueue.enable} and L{ControllerQueue.disable} control queue processing.
+        """
+        dbpool, qpool, clock, performerChosen = self._setupPools()
+        # Disable processing
+        qpool.disable()
+        @transactionally(dbpool.pool.connection)
+        def check(txn):
+            return qpool.enqueueWork(
+                txn, DummyWorkItem, a=3, b=9,
+                notBefore=datetime.datetime(2012, 12, 12, 12, 12, 0)
+            )
+        yield check
+        # Advance far beyond the given timestamp.
+        clock.advance(1000)
+        self.assertEquals(performerChosen, [])
+        # Enable processing
+        qpool.enable()
+        clock.advance(1000)
+        self.assertEquals(performerChosen, [True])
+        # Wait for job
+        while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
+            clock.advance(1)
+        # Work item complete
+        self.assertTrue(DummyWorkItem.results == {1: 12})
+class HalfConnection(object):
+    def __init__(self, protocol):
+        self.protocol = protocol
+        self.transport = StringTransport()
+    def start(self):
+        """
+        Hook up the protocol and the transport.
+        """
+        self.protocol.makeConnection(self.transport)
+    def extract(self):
+        """
+        Extract the data currently present in this protocol's output buffer.
+        """
+        io = self.transport.io
+        value = io.getvalue()
+        io.seek(0)
+        io.truncate()
+        return value
+    def deliver(self, data):
+        """
+        Deliver the given data to this L{HalfConnection}'s protocol's
+        C{dataReceived} method.
+        @return: a boolean indicating whether any data was delivered.
+        @rtype: L{bool}
+        """
+        if data:
+            self.protocol.dataReceived(data)
+            return True
+        return False
+class Connection(object):
+    def __init__(self, local, remote):
+        """
+        Connect two protocol instances to each other via string transports.
+        """
+        self.receiver = HalfConnection(local)
+        self.sender = HalfConnection(remote)
+    def start(self):
+        """
+        Start up the connection.
+        """
+        self.sender.start()
+        self.receiver.start()
+    def pump(self):
+        """
+        Relay data in one direction between the two connections.
+        """
+        result = self.receiver.deliver(self.sender.extract())
+        self.receiver, self.sender = self.sender, self.receiver
+        return result
+    def flush(self, turns=10):
+        """
+        Keep relaying data until there's no more.
+        """
+        for _ignore_x in range(turns):
+            if not (self.pump() or self.pump()):
+                return
+class ControllerQueueIntegrationTests(TestCase):
+    """
+    L{ControllerQueue} is the service responsible for coordinating
+    eventually-consistent task queuing within a cluster.
+    """
+    @inlineCallbacks
+    def setUp(self):
+        """
+        L{ControllerQueue} requires access to a database and the reactor.
+        """
+        self.store = yield buildStore(self, None)
+        @inlineCallbacks
+        def doit(txn):
+            for statement in splitSQLString(schemaText):
+                yield txn.execSQL(statement)
+        yield inTransaction(
+            self.store.newTransaction,
+            doit,
+            label="bonus schema"
+        )
+        def indirectedTransactionFactory(*a, **b):
+            """
+            Allow tests to replace "self.store.newTransaction" to provide
+            fixtures with extra methods on a test-by-test basis.
+            """
+            return self.store.newTransaction(*a, **b)
+        def deschema():
+            @inlineCallbacks
+            def deletestuff(txn):
+                for stmt in dropSQL:
+                    yield txn.execSQL(stmt)
+            return inTransaction(
+                lambda *a, **b: self.store.newTransaction(*a, **b), deletestuff
+            )
+        self.addCleanup(deschema)
+        self.node1 = ControllerQueue(
+            reactor, indirectedTransactionFactory, useWorkerPool=False)
+        self.node2 = ControllerQueue(
+            reactor, indirectedTransactionFactory, useWorkerPool=False)
+        class FireMeService(Service, object):
+            def __init__(self, d):
+                super(FireMeService, self).__init__()
+                self.d = d
+            def startService(self):
+                self.d.callback(None)
+        d1 = Deferred()
+        d2 = Deferred()
+        FireMeService(d1).setServiceParent(self.node1)
+        FireMeService(d2).setServiceParent(self.node2)
+        ms = MultiService()
+        self.node1.setServiceParent(ms)
+        self.node2.setServiceParent(ms)
+        ms.startService()
+        @inlineCallbacks
+        def _clean():
+            yield ms.stopService()
+            self.flushLoggedErrors(CancelledError)
+        self.addCleanup(_clean)
+        yield gatherResults([d1, d2])
+        self.store.queuer = self.node1
+        DummyWorkItem.results = {}
+    @inlineCallbacks
+    def test_enqueueWorkDone(self):
+        """
+        When a L{WorkItem} is scheduled for execution via
+        L{ControllerQueue.enqueueWork} its C{doWork} method will be
+        run.
+        """
+        # TODO: this exact test should run against LocalQueuer as well.
+        def operation(txn):
+            # TODO: how does "enqueue" get associated with the transaction?
+            # This is not the fact with a raw t.w.enterprise transaction.
+            # Should probably do something with components.
+            return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
+                               notBefore=datetime.datetime.utcnow())
+        yield inTransaction(self.store.newTransaction, operation)
+        # Wait for it to be executed.  Hopefully this does not time out :-\.
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+        self.assertEquals(DummyWorkItem.results, {100: 7})
+    @inlineCallbacks
+    def test_noWorkDoneWhenConcurrentlyDeleted(self):
+        """
+        When a L{WorkItem} is concurrently deleted by another transaction, it
+        should I{not} perform its work.
+        """
+        def operation(txn):
+            return txn.enqueue(
+                DummyWorkItem, a=30, b=40, workID=5678,
+                deleteOnLoad=1,
+                notBefore=datetime.datetime.utcnow()
+            )
+        yield inTransaction(self.store.newTransaction, operation)
+        # Wait for it to be executed.  Hopefully this does not time out :-\.
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+        self.assertEquals(DummyWorkItem.results, {})
+    @inlineCallbacks
+    def test_locked(self):
+        """
+        L{JobItem.run} locks the work item.
+        """
+        DummyWorkPauseItem.workStarted = Deferred()
+        DummyWorkPauseItem.unpauseWork = Deferred()
+        @transactionally(self.store.newTransaction)
+        def _enqueue(txn):
+            return txn.enqueue(
+                DummyWorkPauseItem, a=30, b=40, workID=1
+            )
+        yield _enqueue
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        yield DummyWorkPauseItem.workStarted
+        @transactionally(self.store.newTransaction)
+        def _trylock(txn):
+            job = yield JobItem.load(txn, jobs[0].jobID)
+            work = yield job.workItem()
+            locked = yield work.trylock()
+            self.assertFalse(locked)
+        yield _trylock
+        DummyWorkPauseItem.unpauseWork.callback(None)
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 0)
+    @inlineCallbacks
+    def test_overdueStillRunning(self):
+        """
+        Make sure an overdue work item that is still running gets its overdue value bumped.
+        """
+        # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
+        # they are called. Also, change the overdue to be one second ahead of assigned.
+        assigned = [0]
+        _oldAssign = JobItem.assign
+        def _newAssign(self, when, overdue):
+            assigned[0] += 1
+            return _oldAssign(self, when, 1)
+        self.patch(JobItem, "assign", _newAssign)
+        bumped = [0]
+        _oldBumped = JobItem.bumpOverdue
+        def _newBump(self, bump):
+            bumped[0] += 1
+            return _oldBumped(self, 100)
+        self.patch(JobItem, "bumpOverdue", _newBump)
+        DummyWorkPauseItem.workStarted = Deferred()
+        DummyWorkPauseItem.unpauseWork = Deferred()
+        @transactionally(self.store.newTransaction)
+        def _enqueue(txn):
+            return txn.enqueue(
+                DummyWorkPauseItem, a=30, b=40, workID=1
+            )
+        yield _enqueue
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(assigned[0] == 0)
+        self.assertTrue(bumped[0] == 0)
+        yield DummyWorkPauseItem.workStarted
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(assigned[0] == 1)
+        self.assertTrue(bumped[0] == 0)
+        # Pause long enough that the overdue time is passed, which should result
+        # in the overdue value being bumped
+        d = Deferred()
+        reactor.callLater(2, lambda: d.callback(None))
+        yield d
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(assigned[0] == 1)
+        self.assertTrue(bumped[0] == 1)
+        DummyWorkPauseItem.unpauseWork.callback(None)
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 0)
+        self.assertTrue(assigned[0] == 1)
+        self.assertTrue(bumped[0] == 1)
+    @inlineCallbacks
+    def test_overdueWorkGotLost(self):
+        """
+        Make sure an overdue work item that is not still running gets its overdue value bumped, and
+        eventually executed.
+        """
+        # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
+        # they are called. Also, change the overdue to be one second ahead of assigned.
+        assigned = [0]
+        _oldAssign = JobItem.assign
+        def _newAssign(self, when, overdue):
+            assigned[0] += 1
+            return _oldAssign(self, when, 1)
+        self.patch(JobItem, "assign", _newAssign)
+        bumped = [0]
+        _oldBumped = JobItem.bumpOverdue
+        def _newBump(self, bump):
+            bumped[0] += 1
+            return _oldBumped(self, 5)
+        self.patch(JobItem, "bumpOverdue", _newBump)
+        failed = [0]
+        waitFail = Deferred()
+        def _newFailedToRun(self, locked=False, delay=None):
+            failed[0] += 1
+            waitFail.callback(None)
+            return succeed(None)
+        self.patch(JobItem, "failedToRun", _newFailedToRun)
+        def _newDoWorkRaise(self):
+            self.workStarted.callback(None)
+            raise ValueError()
+        def _newDoWorkSuccess(self):
+            return succeed(None)
+        DummyWorkPauseItem.workStarted = Deferred()
+        self.patch(DummyWorkPauseItem, "doWork", _newDoWorkRaise)
+        @transactionally(self.store.newTransaction)
+        def _enqueue(txn):
+            return txn.enqueue(
+                DummyWorkPauseItem, a=30, b=40, workID=1
+            )
+        yield _enqueue
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(assigned[0] == 0)
+        self.assertTrue(bumped[0] == 0)
+        self.assertTrue(failed[0] == 0)
+        # Wait for work to fail once and reset it to succeed next time
+        yield DummyWorkPauseItem.workStarted
+        self.patch(DummyWorkPauseItem, "doWork", _newDoWorkSuccess)
+        yield waitFail
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(assigned[0] == 1)
+        self.assertTrue(bumped[0] == 0)
+        self.assertTrue(failed[0] == 1)
+        # Wait for the overdue to be detected and the work restarted
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 0)
+        self.assertTrue(assigned[0] == 2)
+        self.assertTrue(bumped[0] == 0)
+        self.assertTrue(failed[0] == 1)
+    @inlineCallbacks
+    def test_lowPriorityOverdueWorkNotAssigned(self):
+        """
+        Make sure an overdue work item that is not still running gets its overdue value bumped, and
+        eventually executed.
+        """
+        # Patch the work item to fail once and appear as overdue
+        _oldAssign = JobItem.assign
+        def _newAssign(self, when, overdue):
+            return _oldAssign(self, when, 1)
+        self.patch(JobItem, "assign", _newAssign)
+        failed = [0]
+        waitFail = Deferred()
+        def _newFailedToRun(self, locked=False, delay=None):
+            failed[0] += 1
+            waitFail.callback(None)
+            return succeed(None)
+        self.patch(JobItem, "failedToRun", _newFailedToRun)
+        def _newDoWorkRaise(self):
+            self.workStarted.callback(None)
+            raise ValueError()
+        def _newDoWorkSuccess(self):
+            return succeed(None)
+        DummyWorkPauseItem.workStarted = Deferred()
+        self.patch(DummyWorkPauseItem, "doWork", _newDoWorkRaise)
+        @transactionally(self.store.newTransaction)
+        def _enqueue(txn):
+            return txn.enqueue(
+                DummyWorkPauseItem, a=30, b=40, workID=1
+            )
+        yield _enqueue
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(failed[0] == 0)
+        # Wait for work to fail once and reset it to succeed next time
+        yield DummyWorkPauseItem.workStarted
+        self.patch(DummyWorkPauseItem, "doWork", _newDoWorkSuccess)
+        yield waitFail
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(failed[0] == 1)
+        # Try to get the next high priority only job
+        @transactionally(self.store.newTransaction)
+        @inlineCallbacks
+        def _testNone(txn):
+            nowTime = datetime.datetime.utcfromtimestamp(reactor.seconds() + 10)
+            job = yield JobItem.nextjob(txn, nowTime, WORK_PRIORITY_HIGH)
+            self.assertTrue(job is None)
+        yield _testNone
+        # Wait for the overdue to be detected and the work restarted
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 0)
+        self.assertTrue(failed[0] == 1)
+    @inlineCallbacks
+    def test_aggregator_lock(self):
+        """
+        L{JobItem.run} fails an aggregated work item and then ignores it.
+        """
+        # Patch JobItem.failedToRun to track how many times it is called.
+        failed = [0]
+        _oldFailed = JobItem.failedToRun
+        def _newFailed(self, locked=False, delay=None):
+            failed[0] += 1
+            return _oldFailed(self, locked, 5)
+        self.patch(JobItem, "failedToRun", _newFailed)
+        @transactionally(self.store.newTransaction)
+        def _enqueue1(txn):
+            return txn.enqueue(
+                AggregatorWorkItem, a=1, b=1, workID=1
+            )
+        @transactionally(self.store.newTransaction)
+        def _enqueue2(txn):
+            return txn.enqueue(
+                AggregatorWorkItem, a=1, b=2, workID=2
+            )
+        yield _enqueue1
+        yield _enqueue2
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 2)
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertEqual(len(jobs), 0)
+        self.assertEqual(failed[0], 1)
+    @inlineCallbacks
+    def test_aggregator_no_deadlock(self):
+        """
+        L{JobItem.run} fails an aggregated work item and then ignores it.
+        """
+        # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
+        # they are called.
+        failed = [0]
+        _oldFailed = JobItem.failedToRun
+        def _newFailed(self, locked=False, delay=None):
+            failed[0] += 1
+            return _oldFailed(self, locked, 5)
+        self.patch(JobItem, "failedToRun", _newFailed)
+        @transactionally(self.store.newTransaction)
+        def _enqueue1(txn):
+            return txn.enqueue(
+                AggregatorWorkItem, a=1, b=1, workID=1
+            )
+        @transactionally(self.store.newTransaction)
+        def _enqueue2(txn):
+            return txn.enqueue(
+                AggregatorWorkItem, a=1, b=1, workID=2
+            )
+        yield _enqueue1
+        yield _enqueue2
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        def checkJob(txn):
+            return JobItem.all(txn)
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 2)
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+        jobs = yield inTransaction(self.store.newTransaction, checkJob)
+        self.assertTrue(len(jobs) == 0)
+        self.assertEqual(failed[0], 1)
+    @inlineCallbacks
+    def test_pollingBackoff(self):
+        """
+        Check that an idle queue backs off its polling and goes back to rapid polling
+        when a worker enqueues a job.
+        """
+        # Speed up the backoff process
+        self.patch(ControllerQueue, "queuePollingBackoff", ((1.0, 60.0),))
+        # Wait for backoff
+        while self.node1._actualPollInterval == self.node1.queuePollInterval:
+            d = Deferred()
+            reactor.callLater(1.0, lambda : d.callback(None))
+            yield d
+        self.assertEqual(self.node1._actualPollInterval, 60.0)
+        # TODO: this exact test should run against LocalQueuer as well.
+        def operation(txn):
+            # TODO: how does "enqueue" get associated with the transaction?
+            # This is not the fact with a raw t.w.enterprise transaction.
+            # Should probably do something with components.
+            return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
+                               notBefore=datetime.datetime.utcnow())
+        yield inTransaction(self.store.newTransaction, operation)
+        # Backoff terminated
+        while self.node1._actualPollInterval != self.node1.queuePollInterval:
+            d = Deferred()
+            reactor.callLater(0.1, lambda : d.callback(None))
+            yield d
+        self.assertEqual(self.node1._actualPollInterval, self.node1.queuePollInterval)
+        # Wait for it to be executed.  Hopefully this does not time out :-\.
+        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
+        # Wait for backoff
+        while self.node1._actualPollInterval == self.node1.queuePollInterval:
+            d = Deferred()
+            reactor.callLater(1.0, lambda : d.callback(None))
+            yield d
+        self.assertEqual(self.node1._actualPollInterval, 60.0)
+class NonPerformingQueuerTests(TestCase):
+    @inlineCallbacks
+    def test_choosePerformer(self):
+        queuer = NonPerformingQueuer()
+        performer = queuer.choosePerformer()
+        result = (yield performer.performJob(None))
+        self.assertEquals(result, None)

Added: twext/trunk/twext/enterprise/jobs/utils.py
--- twext/trunk/twext/enterprise/jobs/utils.py	                        (rev 0)
+++ twext/trunk/twext/enterprise/jobs/utils.py	2015-08-07 20:06:34 UTC (rev 15035)
@@ -0,0 +1,57 @@
+# Copyright (c) 2015 Apple Inc. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python.failure import Failure
+from datetime import datetime
+ at inlineCallbacks
+def inTransaction(transactionCreator, operation, label="jobqueue.inTransaction", **kwargs):
+    """
+    Perform the given operation in a transaction, committing or aborting as
+    required.
+    @param transactionCreator: a 0-arg callable that returns an
+        L{IAsyncTransaction}
+    @param operation: a 1-arg callable that takes an L{IAsyncTransaction} and
+        returns a value.
+    @param label: label to be used with the transaction.
+    @return: a L{Deferred} that fires with C{operation}'s result or fails with
+        its error, unless there is an error creating, aborting or committing
+        the transaction.
+    """
+    txn = transactionCreator(label=label)
+    try:
+        result = yield operation(txn, **kwargs)
+    except:
+        f = Failure()
+        yield txn.abort()
+        returnValue(f)
+    else:
+        yield txn.commit()
+        returnValue(result)
+def astimestamp(v):
+    """
+    Convert the given datetime to a POSIX timestamp.
+    """
+    return (v - datetime.utcfromtimestamp(0)).total_seconds()

Added: twext/trunk/twext/enterprise/jobs/workitem.py
--- twext/trunk/twext/enterprise/jobs/workitem.py	                        (rev 0)
+++ twext/trunk/twext/enterprise/jobs/workitem.py	2015-08-07 20:06:34 UTC (rev 15035)
@@ -0,0 +1,399 @@
+# Copyright (c) 2012-2015 Apple Inc. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from datetime import datetime, timedelta
+from twext.enterprise.dal.record import SerializableRecord, NoSuchRecord
+from twext.enterprise.jobs.jobitem import JobItem
+from twext.python.log import Logger
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
+log = Logger()
+# Priority for work - used to order work items in the job queue
+# Weight for work - used to schedule workers based on capacity
+WORK_WEIGHT_10 = 10
+WORK_WEIGHT_CAPACITY = 10   # Total amount of work any one worker can manage
+class WorkItem(SerializableRecord):
+    """
+    A L{WorkItem} is an item of work which may be stored in a database, then
+    executed later.
+    L{WorkItem} is an abstract class, since it is a L{Record} with no table
+    associated via L{fromTable}.  Concrete subclasses must associate a specific
+    table by inheriting like so::
+        class MyWorkItem(WorkItem, fromTable(schema.MY_TABLE)):
+    Concrete L{WorkItem}s should generally not be created directly; they are
+    both created and thereby implicitly scheduled to be executed by calling
+    L{enqueueWork <twext.enterprise.ienterprise.IQueuer.enqueueWork>} with the
+    appropriate L{WorkItem} concrete subclass.  There are different queue
+    implementations (L{ControllerQueue} and L{LocalQueuer}, for example), so
+    the exact timing and location of the work execution may differ.
+    L{WorkItem}s may be constrained in the ordering and timing of their
+    execution, to control concurrency and for performance reasons respectively.
+    Although all the usual database mutual-exclusion rules apply to work
+    executed in L{WorkItem.doWork}, implicit database row locking is not always
+    the best way to manage concurrency.  They have some problems, including:
+        - implicit locks are easy to accidentally acquire out of order, which
+          can lead to deadlocks
+        - implicit locks are easy to forget to acquire correctly - for example,
+          any read operation which subsequently turns into a write operation
+          must have been acquired with C{Select(..., ForUpdate=True)}, but it
+          is difficult to consistently indicate that methods which abstract out
+          read operations must pass this flag in certain cases and not others.
+        - implicit locks are held until the transaction ends, which means that
+          if expensive (long-running) queue operations share the same lock with
+          cheap (short-running) queue operations or user interactions, the
+          cheap operations all have to wait for the expensive ones to complete,
+          but continue to consume whatever database resources they were using.
+    In order to ameliorate these problems with potentially concurrent work
+    that uses the same resources, L{WorkItem} provides a database-wide mutex
+    that is automatically acquired at the beginning of the transaction and
+    released at the end.  To use it, simply L{align
+    <twext.enterprise.dal.record.Record.namingConvention>} the C{group}
+    attribute on your L{WorkItem} subclass with a column holding a string
+    (varchar).  L{WorkItem} subclasses with the same value for C{group} will
+    not execute their C{doWork} methods concurrently.  Furthermore, if the lock
+    cannot be quickly acquired, database resources associated with the
+    transaction attempting it will be released, and the transaction rolled back
+    until a future transaction I{can} can acquire it quickly.  If you do not
+    want any limits to concurrency, simply leave it set to C{None}.
+    In some applications it's possible to coalesce work together; to grab
+    multiple L{WorkItem}s in one C{doWork} transaction.  All you need to do is
+    to delete the rows which back other L{WorkItem}s from the database, and
+    they won't be processed.  Using the C{group} attribute, you can easily
+    prevent concurrency so that you can easily group these items together and
+    remove them as a set (otherwise, other workers might be attempting to
+    concurrently work on them and you'll get deletion errors).
+    However, if doing more work at once is less expensive, and you want to
+    avoid processing lots of individual rows in tiny transactions, you may also
+    delay the execution of a L{WorkItem} by setting its C{notBefore} attribute.
+    This must be backed by a database timestamp, so that processes which happen
+    to be restarting and examining the work to be done in the database don't
+    jump the gun and do it too early.
+    @cvar workID: the unique identifier (primary key) for items of this type.
+        On an instance of a concrete L{WorkItem} subclass, this attribute must
+        be an integer; on the concrete L{WorkItem} subclass itself, this
+        attribute must be a L{twext.enterprise.dal.syntax.ColumnSyntax}.  Note
+        that this is automatically taken care of if you simply have a
+        corresponding C{work_id} column in the associated L{fromTable} on your
+        L{WorkItem} subclass.  This column must be unique, and it must be an
+        integer.  In almost all cases, this column really ought to be filled
+        out by a database-defined sequence; if not, you need some other
+        mechanism for establishing a cluster-wide sequence.
+    @type workID: L{int} on instance,
+        L{twext.enterprise.dal.syntax.ColumnSyntax} on class.
+    @cvar notBefore: the timestamp before which this item should I{not} be
+        processed.  If unspecified, this should be the date and time of the
+        creation of the L{WorkItem}.
+    @type notBefore: L{datetime.datetime} on instance,
+        L{twext.enterprise.dal.syntax.ColumnSyntax} on class.
+    @ivar group: If not C{None}, a unique-to-the-database identifier for which
+        only one L{WorkItem} will execute at a time.
+    @type group: L{unicode} or L{NoneType}
+    """
+    group = None
+    default_priority = WORK_PRIORITY_LOW    # Default - subclasses should override
+    default_weight = WORK_WEIGHT_5          # Default - subclasses should override
+    _tableNameMap = {}
+    @classmethod
+    def workType(cls):
+        return cls.table.model.name
+    @classmethod
+    @inlineCallbacks
+    def makeJob(cls, transaction, **kwargs):
+        """
+        A new work item needs to be created. First we create a Job record, then
+        we create the actual work item related to the job.
+        @param transaction: the transaction to use
+        @type transaction: L{IAsyncTransaction}
+        """
+        jobargs = {
+            "workType": cls.workType()
+        }
+        def _transferArg(name):
+            arg = kwargs.pop(name, None)
+            if arg is not None:
+                jobargs[name] = arg
+            elif hasattr(cls, "default_{}".format(name)):
+                jobargs[name] = getattr(cls, "default_{}".format(name))
+        _transferArg("jobID")
+        _transferArg("priority")
+        _transferArg("weight")
+        _transferArg("notBefore")
+        _transferArg("pause")
+        # Always need a notBefore
+        if "notBefore" not in jobargs:
+            jobargs["notBefore"] = datetime.utcnow()
+        job = yield JobItem.create(transaction, **jobargs)
+        kwargs["jobID"] = job.jobID
+        work = yield cls.create(transaction, **kwargs)
+        work.__dict__["job"] = job
+        returnValue(work)
+    @classmethod
+    @inlineCallbacks
+    def loadForJob(cls, txn, jobID):
+        workItems = yield cls.query(txn, (cls.jobID == jobID))
+        returnValue(workItems)
+    @inlineCallbacks
+    def runlock(self):
+        """
+        Used to lock an L{WorkItem} before it is run. The L{WorkItem}'s row MUST be
+        locked via SELECT FOR UPDATE to ensure the job queue knows it is being worked
+        on so that it can detect when an overdue job needs to be restarted or not.
+        Note that the locking used here may cause deadlocks if not done in the correct
+        order. In particular anything that might cause locks across multiple LWorkItem}s,
+        such as group locks, multi-row locks, etc, MUST be done first.
+        @return: an L{Deferred} that fires with L{True} if the L{WorkItem} was locked,
+            L{False} if not.
+        @rtype: L{Deferred}
+        """
+        # Do the group lock first since this can impact multiple rows and thus could
+        # cause deadlocks if done in the wrong order
+        # Row level lock on this item
+        locked = yield self.trylock(self.group)
+        returnValue(locked)
+    @inlineCallbacks
+    def beforeWork(self):
+        """
+        A hook that gets called before the L{WorkItem} does its real work. This can be used
+        for common behaviors need by work items. The base implementation handles the group
+        locking behavior.
+        @return: an L{Deferred} that fires with L{True} if processing of the L{WorkItem}
+            should continue, L{False} if it should be skipped without error.
+        @rtype: L{Deferred}
+        """
+        try:
+            # Work item is deleted before doing work - but someone else may have
+            # done it whilst we waited on the lock so handle that by simply
+            # ignoring the work
+            yield self.delete()
+        except NoSuchRecord:
+            # The record has already been removed
+            returnValue(False)
+        else:
+            returnValue(True)
+    def doWork(self):
+        """
+        Subclasses must implement this to actually perform the queued work.
+        This method will be invoked in a worker process.
+        This method does I{not} need to delete the row referencing it; that
+        will be taken care of by the job queuing machinery.
+        """
+        raise NotImplementedError
+    def afterWork(self):
+        """
+        A hook that gets called after the L{WorkItem} does its real work. This can be used
+        for common clean-up behaviors. The base implementation does nothing.
+        """
+        return succeed(None)
+    @inlineCallbacks
+    def remove(self):
+        """
+        Remove this L{WorkItem} and the associated L{JobItem}. Typically work is not removed directly, but goes away
+        when processed, but in some cases (e.g., pod-2-pod migration) old work needs to be removed along with the
+        job (which is in a pause state and would otherwise never run).
+        """
+        # Delete the job, then self
+        yield JobItem.deletesome(self.transaction, JobItem.jobID == self.jobID)
+        yield self.delete()
+    @classmethod
+    @inlineCallbacks
+    def reschedule(cls, transaction, seconds, **kwargs):
+        """
+        Reschedule this work.
+        @param seconds: optional seconds delay - if not present use the class value.
+        @type seconds: L{int} or L{None}
+        """
+        if seconds is not None and seconds >= 0:
+            notBefore = (
+                datetime.utcnow() +
+                timedelta(seconds=seconds)
+            )
+            log.debug(
+                "Scheduling next {cls}: {when}",
+                cls=cls.__name__,
+                when=notBefore,
+            )
+            work = yield transaction._queuer.enqueueWork(
+                transaction,
+                cls,
+                notBefore=notBefore,
+                **kwargs
+            )
+            returnValue(work)
+        else:
+            returnValue(None)
+class SingletonWorkItem(WorkItem):
+    """
+    An L{WorkItem} that can only appear once no matter how many times an attempt is
+    made to create one. The L{allowOverride} class property determines whether the attempt
+    to create a new job is simply ignored, or whether the new job overrides any existing
+    one.
+    """
+    @classmethod
+    @inlineCallbacks
+    def makeJob(cls, transaction, **kwargs):
+        """
+        A new work item needs to be created. First we create a Job record, then
+        we create the actual work item related to the job.
+        @param transaction: the transaction to use
+        @type transaction: L{IAsyncTransaction}
+        """
+        all = yield cls.all(transaction)
+        if len(all):
+            # Silently ignore the creation of this work
+            returnValue(None)
+        result = yield super(SingletonWorkItem, cls).makeJob(transaction, **kwargs)
+        returnValue(result)
+    @inlineCallbacks
+    def beforeWork(self):
+        """
+        For safety just delete any others.
+        """
+        # Delete all other work items
+        yield self.deleteall(self.transaction)
+        returnValue(True)
+    @classmethod
+    @inlineCallbacks
+    def reschedule(cls, transaction, seconds, force=False, **kwargs):
+        """
+        Reschedule a singleton. If L{force} is set then delete any existing item before
+        creating the new one. This allows the caller to explicitly override an existing
+        singleton.
+        """
+        if force:
+            yield cls.deleteall(transaction)
+            yield cls.all(transaction)
+        result = yield super(SingletonWorkItem, cls).reschedule(transaction, seconds, **kwargs)
+        returnValue(result)
+class AggregatedWorkItem(WorkItem):
+    """
+    An L{WorkItem} that deletes all the others in the same group prior to running.
+    """
+    @inlineCallbacks
+    def beforeWork(self):
+        """
+        For safety just delete any others.
+        """
+        # Delete all other work items
+        yield self.deletesome(self.transaction, self.group)
+        returnValue(True)
+class RegeneratingWorkItem(SingletonWorkItem):
+    """
+    An L{SingletonWorkItem} that regenerates itself when work is done.
+    """
+    def regenerateInterval(self):
+        """
+        Return the interval in seconds between regenerating instances.
+        """
+        return None
+    @inlineCallbacks
+    def afterWork(self):
+        """
+        A hook that gets called after the L{WorkItem} does its real work. This can be used
+        for common clean-up behaviors. The base implementation does nothing.
+        """
+        yield super(RegeneratingWorkItem, self).afterWork()
+        yield self.reschedule(self.transaction, self.regenerateInterval())

Deleted: twext/trunk/twext/enterprise/test/test_jobqueue.py
--- twext/trunk/twext/enterprise/test/test_jobqueue.py	2015-08-06 21:04:52 UTC (rev 15034)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py	2015-08-07 20:06:34 UTC (rev 15035)
@@ -1,1946 +0,0 @@
-# Copyright (c) 2012-2015 Apple Inc. All rights reserved.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# See the License for the specific language governing permissions and
-# limitations under the License.
-Tests for L{twext.enterprise.job.queue}.
-import datetime
-from zope.interface.verify import verifyObject
-from twisted.internet import reactor
-from twisted.trial.unittest import TestCase, SkipTest
-from twisted.test.proto_helpers import StringTransport, MemoryReactor
-from twisted.internet.defer import \
-    Deferred, inlineCallbacks, gatherResults, passthru, returnValue, succeed, \
-    CancelledError
-from twisted.internet.task import Clock as _Clock
-from twisted.protocols.amp import Command, AMP, Integer
-from twisted.application.service import Service, MultiService
-from twext.enterprise.dal.syntax import SchemaSyntax, Delete
-from twext.enterprise.dal.parseschema import splitSQLString
-from twext.enterprise.dal.record import fromTable
-from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
-from twext.enterprise.fixtures import buildConnectionPool
-from twext.enterprise.fixtures import SteppablePoolHelper
-from twext.enterprise.jobqueue import \
-    inTransaction, PeerConnectionPool, astimestamp, \
-    LocalPerformer, _IJobPerformer, WorkItem, WorkerConnectionPool, \
-    ConnectionFromPeerNode, \
-    _BaseQueuer, NonPerformingQueuer, JobItem, \
-    JobDescriptor, SingletonWorkItem, JobFailedError, JobTemporaryError
-import twext.enterprise.jobqueue
-# TODO: There should be a store-building utility within twext.enterprise.
-    from txdav.common.datastore.test.util import buildStore
-except ImportError:
-    def buildStore(*args, **kwargs):
-        raise SkipTest(
-            "buildStore is not available, because it's in txdav; duh."
-        )
-class Clock(_Clock):
-    """
-    More careful L{IReactorTime} fake which mimics the exception behavior of
-    the real reactor.
-    """
-    def callLater(self, _seconds, _f, *args, **kw):
-        if _seconds < 0:
-            raise ValueError("%s<0: " % (_seconds,))
-        return super(Clock, self).callLater(_seconds, _f, *args, **kw)
-    @inlineCallbacks
-    def advanceCompletely(self, amount):
-        """
-        Move time on this clock forward by the given amount and run whatever
-        pending calls should be run. Always complete the deferred calls before
-        returning.
-        @type amount: C{float}
-        @param amount: The number of seconds which to advance this clock's
-        time.
-        """
-        self.rightNow += amount
-        self._sortCalls()
-        while self.calls and self.calls[0].getTime() <= self.seconds():
-            call = self.calls.pop(0)
-            call.called = 1
-            yield call.func(*call.args, **call.kw)
-            self._sortCalls()
-class MemoryReactorWithClock(MemoryReactor, Clock):
-    """
-    Simulate a real reactor.
-    """
-    def __init__(self):
-        MemoryReactor.__init__(self)
-        Clock.__init__(self)
-        self._sortCalls()
-def transactionally(transactionCreator):
-    """
-    Perform the decorated function immediately in a transaction, replacing its
-    name with a L{Deferred}.
-    Use like so::
-        @transactionally(connectionPool.connection)
-        @inlineCallbacks
-        def it(txn):
-            yield txn.doSomething()
-        it.addCallback(firedWhenDone)
-    @param transactionCreator: A 0-arg callable that returns an
-        L{IAsyncTransaction}.
-    """
-    def thunk(operation):
-        return inTransaction(transactionCreator, operation)
-    return thunk
-class UtilityTests(TestCase):
-    """
-    Tests for supporting utilities.
-    """
-    def test_inTransactionSuccess(self):
-        """
-        L{inTransaction} invokes its C{transactionCreator} argument, and then
-        returns a L{Deferred} which fires with the result of its C{operation}
-        argument when it succeeds.
-        """
-        class faketxn(object):
-            def __init__(self):
-                self.commits = []
-                self.aborts = []
-            def commit(self):
-                self.commits.append(Deferred())
-                return self.commits[-1]
-            def abort(self):
-                self.aborts.append(Deferred())
-                return self.aborts[-1]
-        createdTxns = []
-        def createTxn(label):
-            createdTxns.append(faketxn())
-            return createdTxns[-1]
-        dfrs = []
-        def operation(t):
-            self.assertIdentical(t, createdTxns[-1])
-            dfrs.append(Deferred())
-            return dfrs[-1]
-        d = inTransaction(createTxn, operation)
-        x = []
-        d.addCallback(x.append)
-        self.assertEquals(x, [])
-        self.assertEquals(len(dfrs), 1)
-        dfrs[0].callback(35)
-        # Commit in progress, so still no result...
-        self.assertEquals(x, [])
-        createdTxns[0].commits[0].callback(42)
-        # Committed, everything's done.
-        self.assertEquals(x, [35])
-class SimpleSchemaHelper(SchemaTestHelper):
-    def id(self):
-        return "worker"
-SQL = passthru
-nodeSchema = SQL(
-    """
-    create table NODE_INFO (
-      HOSTNAME varchar(255) not null,
-      PID integer not null,
-      PORT integer not null,
-      TIME timestamp default current_timestamp not null,
-      primary key (HOSTNAME, PORT)
-    );
-    """
-jobSchema = SQL(
-    """
-    create table JOB (
-      JOB_ID      integer primary key default 1,
-      WORK_TYPE   varchar(255) not null,
-      PRIORITY    integer default 0,
-      WEIGHT      integer default 0,
-      NOT_BEFORE  timestamp not null,
-      ASSIGNED    timestamp default null,
-      OVERDUE     timestamp default null,
-      FAILED      integer default 0,
-      PAUSE       integer default 0
-    );
-    """
-schemaText = SQL(
-    """
-    create table DUMMY_WORK_ITEM (
-      WORK_ID integer primary key,
-      JOB_ID integer references JOB,
-      A integer, B integer,
-      DELETE_ON_LOAD integer default 0
-    );
-    create table DUMMY_WORK_SINGLETON_ITEM (
-      WORK_ID integer primary key,
-      JOB_ID integer references JOB,
-      A integer, B integer,
-      DELETE_ON_LOAD integer default 0
-    );
-    create table DUMMY_WORK_PAUSE_ITEM (
-      WORK_ID integer primary key,
-      JOB_ID integer references JOB,
-      A integer, B integer,
-      DELETE_ON_LOAD integer default 0
-    );
-    create table AGGREGATOR_WORK_ITEM (
-      WORK_ID integer primary key,
-      JOB_ID integer references JOB,
-      A integer, B integer,
-      DELETE_ON_LOAD integer default 0
-    );
-    """
-    schema = SchemaSyntax(SimpleSchemaHelper().schemaFromString(jobSchema + schemaText))
-    dropSQL = [
-        "drop table {name} cascade".format(name=table)
-        for table in (
-            "DUMMY_WORK_ITEM",
-            "DUMMY_WORK_PAUSE_ITEM",
-        )
-    ] + ["delete from job"]
-except SkipTest as e:
-    DummyWorkItemTable = object
-    DummyWorkSingletonItemTable = object
-    DummyWorkPauseItemTable = object
-    AggregatorWorkItemTable = object
-    skip = e
-    DummyWorkItemTable = fromTable(schema.DUMMY_WORK_ITEM)
-    DummyWorkSingletonItemTable = fromTable(schema.DUMMY_WORK_SINGLETON_ITEM)
-    DummyWorkPauseItemTable = fromTable(schema.DUMMY_WORK_PAUSE_ITEM)
-    AggregatorWorkItemTable = fromTable(schema.AGGREGATOR_WORK_ITEM)
-    skip = False
-class DummyWorkItem(WorkItem, DummyWorkItemTable):
-    """
-    Sample L{WorkItem} subclass that adds two integers together and stores them
-    in another table.
-    """
-    results = {}
-    def doWork(self):
-        if self.a == -1:
-            raise ValueError("Ooops")
-        elif self.a == -2:
-            raise JobTemporaryError(120)
-        self.results[self.jobID] = self.a + self.b
-        return succeed(None)
-    @classmethod
-    @inlineCallbacks
-    def loadForJob(cls, txn, *a):
-        """
-        Load L{DummyWorkItem} as normal...  unless the loaded item has
-        C{DELETE_ON_LOAD} set, in which case, do a deletion of this same row in
-        a concurrent transaction, then commit it.
-        """
-        workItems = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
-        if len(workItems) and workItems[0].deleteOnLoad:
-            otherTransaction = txn.store().newTransaction()
-            otherSelf = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
-            yield otherSelf[0].delete()
-            yield otherTransaction.commit()
-        returnValue(workItems)
-class DummyWorkSingletonItem(SingletonWorkItem, DummyWorkSingletonItemTable):
-    """
-    Sample L{SingletonWorkItem} subclass that adds two integers together and stores them
-    in another table.
-    """
-    results = {}
-    def doWork(self):
-        if self.a == -1:
-            raise ValueError("Ooops")
-        self.results[self.jobID] = self.a + self.b
-        return succeed(None)
-class DummyWorkPauseItem(WorkItem, DummyWorkPauseItemTable):
-    """
-    Sample L{WorkItem} subclass that pauses until a Deferred is fired.
-    """
-    workStarted = None
-    unpauseWork = None
-    def doWork(self):
-        self.workStarted.callback(None)
-        return self.unpauseWork
-class AggregatorWorkItem(WorkItem, AggregatorWorkItemTable):
-    """
-    Sample L{WorkItem} subclass that deletes others with the same
-    value and than pauses for a bit.
-    """
-    group = property(lambda self: (self.table.B == self.b))
-    @inlineCallbacks
-    def doWork(self):
-        # Delete the work items we match
-        yield Delete(
-            From=self.table,
-            Where=(self.table.A == self.a)
-        ).on(self.transaction)
-        d = Deferred()
-        reactor.callLater(2.0, lambda: d.callback(None))
-        yield d
-class AMPTests(TestCase):
-    """
-    Tests for L{AMP} faithfully relaying ids across the wire.
-    """
-    def test_sendTableWithName(self):
-        """
-        You can send a reference to a table through a L{SchemaAMP} via
-        L{TableSyntaxByName}.
-        """
-        client = AMP()
-        class SampleCommand(Command):
-            arguments = [("id", Integer())]
-        class Receiver(AMP):
-            @SampleCommand.responder
-            def gotIt(self, id):
-                self.it = id
-                return {}
-        server = Receiver()
-        clientT = StringTransport()
-        serverT = StringTransport()
-        client.makeConnection(clientT)
-        server.makeConnection(serverT)
-        client.callRemote(SampleCommand, id=123)
-        server.dataReceived(clientT.io.getvalue())
-        self.assertEqual(server.it, 123)
-class WorkItemTests(TestCase):
-    """
-    A L{WorkItem} is an item of work that can be executed.
-    """
-    def test_forTableName(self):
-        """
-        L{WorkItem.forTable} returns L{WorkItem} subclasses mapped to the given
-        table.
-        """
-        self.assertIdentical(
-            JobItem.workItemForType(schema.DUMMY_WORK_ITEM.model.name), DummyWorkItem
-        )
-    @inlineCallbacks
-    def _enqueue(self, dbpool, a, b, notBefore=None, priority=None, weight=None, cl=DummyWorkItem):
-        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        if notBefore is None:
-            notBefore = datetime.datetime(2012, 12, 13, 12, 12, 0)
-        sinceEpoch = astimestamp(fakeNow)
-        clock = Clock()
-        clock.advance(sinceEpoch)
-        qpool = PeerConnectionPool(clock, dbpool.connection, 0, useWorkerPool=False)
-        realChoosePerformer = qpool.choosePerformer
-        performerChosen = []
-        def catchPerformerChoice():
-            result = realChoosePerformer()
-            performerChosen.append(True)
-            return result
-        qpool.choosePerformer = catchPerformerChoice
-        @transactionally(dbpool.connection)
-        def check(txn):
-            return qpool.enqueueWork(
-                txn, cl,
-                a=a, b=b, priority=priority, weight=weight,
-                notBefore=notBefore
-            )
-        yield check
-        returnValue(qpool)
-    @inlineCallbacks
-    def test_enqueue(self):
-        """
-        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
-        """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        yield self._enqueue(dbpool, 1, 2)
-        # Make sure we have one JOB and one DUMMY_WORK_ITEM
-        @transactionally(dbpool.connection)
-        def checkJob(txn):
-            return JobItem.all(txn)
-        jobs = yield checkJob
-        self.assertTrue(len(jobs) == 1)
-        self.assertTrue(jobs[0].workType == "DUMMY_WORK_ITEM")
-        self.assertTrue(jobs[0].assigned is None)
-        @transactionally(dbpool.connection)
-        def checkWork(txn):
-            return DummyWorkItem.all(txn)
-        work = yield checkWork
-        self.assertTrue(len(work) == 1)
-        self.assertTrue(work[0].jobID == jobs[0].jobID)
-    @inlineCallbacks
-    def test_assign(self):
-        """
-        L{JobItem.assign} will mark a job as assigned.
-        """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        yield self._enqueue(dbpool, 1, 2)
-        # Make sure we have one JOB and one DUMMY_WORK_ITEM
-        def checkJob(txn):
-            return JobItem.all(txn)
-        jobs = yield inTransaction(dbpool.connection, checkJob)
-        self.assertTrue(len(jobs) == 1)
-        self.assertTrue(jobs[0].assigned is None)
-        @inlineCallbacks
-        def assignJob(txn):
-            job = yield JobItem.load(txn, jobs[0].jobID)
-            yield job.assign(datetime.datetime.utcnow(), PeerConnectionPool.queueOverdueTimeout)
-        yield inTransaction(dbpool.connection, assignJob)
-        jobs = yield inTransaction(dbpool.connection, checkJob)
-        self.assertTrue(len(jobs) == 1)
-        self.assertTrue(jobs[0].assigned is not None)
-    @inlineCallbacks
-    def test_nextjob(self):
-        """
-        L{JobItem.nextjob} returns the correct job based on priority.
-        """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        now = datetime.datetime.utcnow()
-        # Empty job queue
-        @inlineCallbacks
-        def _next(txn, priority=WORK_PRIORITY_LOW):
-            job = yield JobItem.nextjob(txn, now, priority)
-            if job is not None:
-                work = yield job.workItem()
-            else:
-                work = None
-            returnValue((job, work))
-        job, work = yield inTransaction(dbpool.connection, _next)
-        self.assertTrue(job is None)
-        self.assertTrue(work is None)
-        # Unassigned job with future notBefore not returned
-        yield self._enqueue(dbpool, 1, 1, now + datetime.timedelta(days=1))
-        job, work = yield inTransaction(dbpool.connection, _next)
-        self.assertTrue(job is None)
-        self.assertTrue(work is None)
-        # Unassigned job with past notBefore returned
-        yield self._enqueue(dbpool, 2, 1, now + datetime.timedelta(days=-1), priority=WORK_PRIORITY_HIGH)
-        job, work = yield inTransaction(dbpool.connection, _next)
-        self.assertTrue(job is not None)
-        self.assertTrue(work.a == 2)
-        assignID = job.jobID
-        # Assigned job with past notBefore not returned
-        @inlineCallbacks
-        def assignJob(txn, when=None):
-            assignee = yield JobItem.load(txn, assignID)
-            yield assignee.assign(now if when is None else when, PeerConnectionPool.queueOverdueTimeout)
-        yield inTransaction(dbpool.connection, assignJob)
-        job, work = yield inTransaction(dbpool.connection, _next)
-        self.assertTrue(job is None)
-        self.assertTrue(work is None)
-        # Unassigned, paused job with past notBefore not returned
-        yield self._enqueue(dbpool, 3, 1, now + datetime.timedelta(days=-1), priority=WORK_PRIORITY_HIGH)
-        @inlineCallbacks
-        def pauseJob(txn, pause=True):
-            works = yield DummyWorkItem.all(txn)
-            for work in works:
-                if work.a == 3:
-                    job = yield JobItem.load(txn, work.jobID)
-                    yield job.pauseIt(pause)
-        yield inTransaction(dbpool.connection, pauseJob)
-        job, work = yield inTransaction(dbpool.connection, _next)
-        self.assertTrue(job is None)
-        self.assertTrue(work is None)
-        # Unassigned, paused then unpaused job with past notBefore is returned
-        yield inTransaction(dbpool.connection, pauseJob, pause=False)
-        job, work = yield inTransaction(dbpool.connection, _next)
-        self.assertTrue(job is not None)
-        self.assertTrue(work.a == 3)
-        @inlineCallbacks
-        def deleteJob(txn, jobID):
-            job = yield JobItem.load(txn, jobID)
-            yield job.delete()
-        yield inTransaction(dbpool.connection, deleteJob, jobID=job.jobID)
-        # Unassigned low priority job with past notBefore not returned if high priority required
-        yield self._enqueue(dbpool, 4, 1, now + datetime.timedelta(days=-1))
-        job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
-        self.assertTrue(job is None)
-        self.assertTrue(work is None)
-        # Unassigned low priority job with past notBefore not returned if medium priority required
-        yield self._enqueue(dbpool, 5, 1, now + datetime.timedelta(days=-1))
-        job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_MEDIUM)
-        self.assertTrue(job is None)
-        self.assertTrue(work is None)
-        # Assigned job with past notBefore, but overdue is returned
-        yield inTransaction(dbpool.connection, assignJob, when=now + datetime.timedelta(days=-1))
-        job, work = yield inTransaction(dbpool.connection, _next, priority=WORK_PRIORITY_HIGH)
-        self.assertTrue(job is not None)
-        self.assertTrue(work.a == 2)
-    @inlineCallbacks
-    def test_notsingleton(self):
-        """
-        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
-        """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        yield self._enqueue(dbpool, 1, 2, cl=DummyWorkItem)
-        def allJobs(txn):
-            return DummyWorkItem.all(txn)
-        jobs = yield inTransaction(dbpool.connection, allJobs)
-        self.assertTrue(len(jobs) == 1)
-        yield self._enqueue(dbpool, 2, 3)
-        jobs = yield inTransaction(dbpool.connection, allJobs)
-        self.assertTrue(len(jobs) == 2)
-    @inlineCallbacks
-    def test_singleton(self):
-        """
-        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
-        """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        yield self._enqueue(dbpool, 1, 2, cl=DummyWorkSingletonItem)
-        def allJobs(txn):
-            return DummyWorkSingletonItem.all(txn)
-        jobs = yield inTransaction(dbpool.connection, allJobs)
-        self.assertTrue(len(jobs) == 1)
-        yield self._enqueue(dbpool, 2, 3, cl=DummyWorkSingletonItem)
-        jobs = yield inTransaction(dbpool.connection, allJobs)
-        self.assertTrue(len(jobs) == 1)
-    @inlineCallbacks
-    def test_singleton_reschedule(self):
-        """
-        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
-        """
-        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
-        qpool = yield self._enqueue(dbpool, 1, 2, cl=DummyWorkSingletonItem, notBefore=datetime.datetime(2014, 5, 17, 12, 0, 0))
-        @inlineCallbacks
-        def allWork(txn):
-            jobs = yield JobItem.all(txn)
-            work = [((yield job.workItem()), job) for job in jobs]
-            returnValue(filter(lambda x: x[0], work))
-        work = yield inTransaction(dbpool.connection, allWork)
-        self.assertTrue(len(work) == 1)
-        self.assertTrue(work[0][1].notBefore == datetime.datetime(2014, 5, 17, 12, 0, 0))
-        def _reschedule_force(txn, force):
-            txn._queuer = qpool
-            return DummyWorkSingletonItem.reschedule(txn, 60, force=force)
-        yield inTransaction(dbpool.connection, _reschedule_force, force=False)
-        work = yield inTransaction(dbpool.connection, allWork)
-        self.assertTrue(len(work) == 1)
-        self.assertTrue(work[0][1].notBefore == datetime.datetime(2014, 5, 17, 12, 0, 0))
-        yield inTransaction(dbpool.connection, _reschedule_force, force=True)
-        work = yield inTransaction(dbpool.connection, allWork)
-        self.assertTrue(len(work) == 1)
-        self.assertTrue(work[0][1].notBefore != datetime.datetime(2014, 5, 17, 12, 0, 0))
-class WorkerConnectionPoolTests(TestCase):
-    """
-    A L{WorkerConnectionPool} is responsible for managing, in a node's
-    controller (master) process, the collection of worker (slave) processes
-    that are capable of executing queue work.
-    """
-class PeerConnectionPoolUnitTests(TestCase):
-    """
-    L{PeerConnectionPool} has many internal components.
-    """
-    def setUp(self):
-        """
-        Create a L{PeerConnectionPool} that is just initialized enough.
-        """
-        self.pcp = PeerConnectionPool(None, None, 4321)
-        DummyWorkItem.results = {}
-    def checkPerformer(self, cls):
-        """
-        Verify that the performer returned by
-        L{PeerConnectionPool.choosePerformer}.
-        """
-        performer = self.pcp.choosePerformer()
-        self.failUnlessIsInstance(performer, cls)
-        verifyObject(_IJobPerformer, performer)
-    def _setupPools(self):
-        """
-        Setup pool and reactor clock for time stepped tests.
-        """
-        reactor = MemoryReactorWithClock()
-        cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
-        then = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        reactor.advance(astimestamp(then))
-        cph.setUp(self)
-        qpool = PeerConnectionPool(reactor, cph.pool.connection, 4321, useWorkerPool=False)
-        realChoosePerformer = qpool.choosePerformer
-        performerChosen = []
-        def catchPerformerChoice(onlyLocally=False):
-            result = realChoosePerformer(onlyLocally=onlyLocally)
-            performerChosen.append(True)
-            return result
-        qpool.choosePerformer = catchPerformerChoice
-        reactor.callLater(0, qpool._workCheck)
-        qpool.startService()
-        cph.flushHolders()
-        return cph, qpool, reactor, performerChosen
-    def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
-        """
-        If L{PeerConnectionPool.choosePerformer} is invoked when no workers
-        have spawned and no peers have established connections (either incoming
-        or outgoing), then it chooses an implementation of C{performJob} that
-        simply executes the work locally.
-        """
-        # If we're using worker pool, this should raise
-        try:
-            self.pcp.choosePerformer()
-        except JobFailedError:
-            pass
-        else:
-            self.fail("Didn't raise JobFailedError")
-        # If we're not using worker pool, we should get back LocalPerformer
-        self.pcp = PeerConnectionPool(None, None, 4321, useWorkerPool=False)
-        self.checkPerformer(LocalPerformer)
-    def test_choosingPerformerWithLocalCapacity(self):
-        """
-        If L{PeerConnectionPool.choosePerformer} is invoked when some workers
-        have spawned, then it should choose the worker pool as the local
-        performer.
-        """
-        # Give it some local capacity.
-        # In this case we want pcp to have a workerPool, so create a new pcp
-        # for this test
-        self.pcp = PeerConnectionPool(None, None, 4321)
-        wlf = self.pcp.workerListenerFactory()
-        proto = wlf.buildProtocol(None)
-        proto.makeConnection(StringTransport())
-        # Sanity check.
-        self.assertEqual(len(self.pcp.workerPool.workers), 1)
-        self.assertEqual(self.pcp.workerPool.hasAvailableCapacity(), True)
-        # Now it has some capacity.
-        self.checkPerformer(WorkerConnectionPool)
-    def test_choosingPerformerFromNetwork(self):
-        """
-        If L{PeerConnectionPool.choosePerformer} is invoked when no workers
-        have spawned but some peers have connected, then it should choose a
-        connection from the network to perform it.
-        """
-        peer = PeerConnectionPool(None, None, 4322)
-        local = self.pcp.peerFactory().buildProtocol(None)
-        remote = peer.peerFactory().buildProtocol(None)
-        connection = Connection(local, remote)
-        connection.start()
-        self.checkPerformer(ConnectionFromPeerNode)
-    def test_performingWorkOnNetwork(self):
-        """
-        The L{performJob} command will get relayed to the remote peer
-        controller.
-        """
-        peer = PeerConnectionPool(None, None, 4322)
-        local = self.pcp.peerFactory().buildProtocol(None)
-        remote = peer.peerFactory().buildProtocol(None)
-        connection = Connection(local, remote)
-        connection.start()
-        d = Deferred()
-        class DummyPerformer(object):
-            def performJob(self, job):
-                self.jobID = job.jobID
-                return d
-        # Doing real database I/O in this test would be tedious so fake the
-        # first method in the call stack which actually talks to the DB.
-        dummy = DummyPerformer()
-        def chooseDummy(onlyLocally=False):
-            return dummy
-        peer.choosePerformer = chooseDummy
-        performed = local.performJob(JobDescriptor(7384, 1, "ABC"))
-        performResult = []
-        performed.addCallback(performResult.append)
-        # Sanity check.
-        self.assertEquals(performResult, [])
-        connection.flush()
-        self.assertEquals(dummy.jobID, 7384)
-        self.assertEquals(performResult, [])
-        d.callback(128374)
-        connection.flush()
-        self.assertEquals(performResult, [None])
-    def test_choosePerformerSorted(self):
-        """
-        If L{PeerConnectionPool.choosePerformer} is invoked make it
-        return the peer with the least load.
-        """
-        peer = PeerConnectionPool(None, None, 4322)
-        class DummyPeer(object):
-            def __init__(self, name, load):
-                self.name = name
-                self.load = load
-            def currentLoadEstimate(self):
-                return self.load
-        apeer = DummyPeer("A", 1)
-        bpeer = DummyPeer("B", 0)
-        cpeer = DummyPeer("C", 2)
-        peer.addPeerConnection(apeer)
-        peer.addPeerConnection(bpeer)
-        peer.addPeerConnection(cpeer)
-        performer = peer.choosePerformer(onlyLocally=False)
-        self.assertEqual(performer, bpeer)
-        bpeer.load = 2
-        performer = peer.choosePerformer(onlyLocally=False)
-        self.assertEqual(performer, apeer)
-    @inlineCallbacks
-    def test_notBeforeWhenCheckingForWork(self):
-        """
-        L{PeerConnectionPool._workCheck} should execute any
-        outstanding work items, but only those that are expired.
-        """
-        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
-        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        # Let's create a couple of work items directly, not via the enqueue
-        # method, so that they exist but nobody will try to immediately execute
-        # them.
-        @transactionally(dbpool.pool.connection)
-        @inlineCallbacks
-        def setup(txn):
-            # First, one that's right now.
-            yield DummyWorkItem.makeJob(txn, a=1, b=2, notBefore=fakeNow)
-            # Next, create one that's actually far enough into the past to run.
-            yield DummyWorkItem.makeJob(
-                txn, a=3, b=4, notBefore=(
-                    # Schedule it in the past so that it should have already
-                    # run.
-                    fakeNow - datetime.timedelta(seconds=20)
-                )
-            )
-            # Finally, one that's actually scheduled for the future.
-            yield DummyWorkItem.makeJob(
-                txn, a=10, b=20, notBefore=fakeNow + datetime.timedelta(1000)
-            )
-        yield setup
-        # Wait for job
-        while len(DummyWorkItem.results) != 2:
-            clock.advance(1)
-        # Work item complete
-        self.assertTrue(DummyWorkItem.results == {1: 3, 2: 7})
-    @inlineCallbacks
-    def test_notBeforeWhenEnqueueing(self):
-        """
-        L{PeerConnectionPool.enqueueWork} enqueues some work immediately, but
-        only executes it when enough time has elapsed to allow the C{notBefore}
-        attribute of the given work item to have passed.
-        """
-        dbpool, qpool, clock, performerChosen = self._setupPools()
-        @transactionally(dbpool.pool.connection)
-        def check(txn):
-            return qpool.enqueueWork(
-                txn, DummyWorkItem, a=3, b=9,
-                notBefore=datetime.datetime(2012, 12, 12, 12, 12, 20)
-            )
-        yield check
-        # This is going to schedule the work to happen with some asynchronous
-        # I/O in the middle; this is a problem because how do we know when it's
-        # time to check to see if the work has started?  We need to intercept
-        # the thing that kicks off the work; we can then wait for the work
-        # itself.
-        self.assertEquals(performerChosen, [])
-        # Advance to exactly the appointed second.
-        clock.advance(20 - 12)
-        self.assertEquals(performerChosen, [True])
-        # Wait for job
-        while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
-            clock.advance(1)
-        # Work item complete
-        self.assertTrue(DummyWorkItem.results == {1: 12})
-    @inlineCallbacks
-    def test_notBeforeBefore(self):
-        """
-        L{PeerConnectionPool.enqueueWork} will execute its work immediately if
-        the C{notBefore} attribute of the work item in question is in the past.
-        """
-        dbpool, qpool, clock, performerChosen = self._setupPools()
-        @transactionally(dbpool.pool.connection)
-        def check(txn):
-            return qpool.enqueueWork(
-                txn, DummyWorkItem, a=3, b=9,
-                notBefore=datetime.datetime(2012, 12, 12, 12, 12, 0)
-            )
-        yield check
-        clock.advance(1000)
-        # Advance far beyond the given timestamp.
-        self.assertEquals(performerChosen, [True])
-        # Wait for job
-        while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
-            clock.advance(1)
-        # Work item complete
-        self.assertTrue(DummyWorkItem.results == {1: 12})
-    def test_workerConnectionPoolPerformJob(self):
-        """
-        L{WorkerConnectionPool.performJob} performs work by selecting a
-        L{ConnectionFromWorker} and sending it a L{PerformJOB} command.
-        """
-        clock = Clock()
-        peerPool = PeerConnectionPool(clock, None, 4322)
-        factory = peerPool.workerListenerFactory()
-        def peer():
-            p = factory.buildProtocol(None)
-            t = StringTransport()
-            p.makeConnection(t)
-            return p, t
-        worker1, _ignore_trans1 = peer()
-        worker2, _ignore_trans2 = peer()
-        # Ask the worker to do something.
-        worker1.performJob(JobDescriptor(1, 1, "ABC"))
-        self.assertEquals(worker1.currentLoad, 1)
-        self.assertEquals(worker2.currentLoad, 0)
-        # Now ask the pool to do something
-        peerPool.workerPool.performJob(JobDescriptor(2, 1, "ABC"))
-        self.assertEquals(worker1.currentLoad, 1)
-        self.assertEquals(worker2.currentLoad, 1)
-    def test_poolStartServiceChecksForWork(self):
-        """
-        L{PeerConnectionPool.startService} kicks off the idle work-check loop.
-        """
-        reactor = MemoryReactorWithClock()
-        cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
-        then = datetime.datetime(2012, 12, 12, 12, 12, 0)
-        reactor.advance(astimestamp(then))
-        cph.setUp(self)
-        pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321, useWorkerPool=False)
-        now = then + datetime.timedelta(seconds=20)
-        @transactionally(cph.pool.connection)
-        def createOldWork(txn):
-            one = DummyWorkItem.makeJob(txn, jobID=1, workID=1, a=3, b=4, notBefore=then)
-            two = DummyWorkItem.makeJob(txn, jobID=2, workID=2, a=7, b=9, notBefore=now)
-            return gatherResults([one, two])
-        pcp.startService()
-        cph.flushHolders()
-        reactor.advance(19)
-        self.assertEquals(
-            DummyWorkItem.results,
-            {1: 7}
-        )
-        reactor.advance(20)
-        self.assertEquals(
-            DummyWorkItem.results,
-            {1: 7, 2: 16}
-        )
-    @inlineCallbacks
-    def test_exceptionWhenWorking(self):
-        """
-        L{PeerConnectionPool._workCheck} should execute any
-        outstanding work items, and keep going if some raise an exception.
-        """
-        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
-        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        # Let's create a couple of work items directly, not via the enqueue
-        # method, so that they exist but nobody will try to immediately execute
-        # them.
-        @transactionally(dbpool.pool.connection)
-        @inlineCallbacks
-        def setup(txn):
-            # OK
-            yield DummyWorkItem.makeJob(
-                txn, a=1, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60)
-            )
-            # Error
-            yield DummyWorkItem.makeJob(
-                txn, a=-1, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
-            )
-            # OK
-            yield DummyWorkItem.makeJob(
-                txn, a=2, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60)
-            )
-        yield setup
-        clock.advance(20 - 12)
-        # Work item complete
-        self.assertTrue(DummyWorkItem.results == {1: 1, 3: 2})
-    @inlineCallbacks
-    def test_exceptionUnassign(self):
-        """
-        When a work item fails it should appear as unassigned in the JOB
-        table and have the failure count bumped, and a notBefore one minute ahead.
-        """
-        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
-        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        # Let's create a couple of work items directly, not via the enqueue
-        # method, so that they exist but nobody will try to immediately execute
-        # them.
-        @transactionally(dbpool.pool.connection)
-        @inlineCallbacks
-        def setup(txn):
-            # Next, create failing work that's actually far enough into the past to run.
-            yield DummyWorkItem.makeJob(
-                txn, a=-1, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
-            )
-        yield setup
-        clock.advance(20 - 12)
-        @transactionally(dbpool.pool.connection)
-        def check(txn):
-            return JobItem.all(txn)
-        jobs = yield check
-        self.assertTrue(len(jobs) == 1)
-        self.assertTrue(jobs[0].assigned is None)
-        self.assertTrue(jobs[0].failed == 1)
-        self.assertTrue(jobs[0].notBefore > datetime.datetime.utcnow())
-    @inlineCallbacks
-    def test_temporaryFailure(self):
-        """
-        When a work item temporarily fails it should appear as unassigned in the JOB
-        table and have the failure count bumped, and a notBefore set to the temporary delay.
-        """
-        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
-        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        # Let's create a couple of work items directly, not via the enqueue
-        # method, so that they exist but nobody will try to immediately execute
-        # them.
-        @transactionally(dbpool.pool.connection)
-        @inlineCallbacks
-        def setup(txn):
-            # Next, create failing work that's actually far enough into the past to run.
-            yield DummyWorkItem.makeJob(
-                txn, a=-2, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
-            )
-        yield setup
-        clock.advance(20 - 12)
-        @transactionally(dbpool.pool.connection)
-        def check(txn):
-            return JobItem.all(txn)
-        jobs = yield check
-        self.assertTrue(len(jobs) == 1)
-        self.assertTrue(jobs[0].assigned is None)
-        self.assertTrue(jobs[0].failed == 1)
-        self.assertTrue(jobs[0].notBefore > datetime.datetime.utcnow() + datetime.timedelta(seconds=90))
-    @inlineCallbacks
-    def test_loopFailure_noRecovery(self):
-        """
-        When L{_workCheck} fails in its loop we need the problem job marked as failed.
-        """
-        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
-        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        oldNextJob = JobItem.nextjob
-        @inlineCallbacks
-        def _nextJob(cls, txn, now, minPriority):
-            job = yield oldNextJob(txn, now, minPriority)
-            work = yield job.workItem()
-            if work.a == -2:
-                raise ValueError("oops")
-        self.patch(JobItem, "nextjob", classmethod(_nextJob))
-        # Let's create a couple of work items directly, not via the enqueue
-        # method, so that they exist but nobody will try to immediately execute
-        # them.
-        @transactionally(dbpool.pool.connection)
-        @inlineCallbacks
-        def setup(txn):
-            # Failing
-            yield DummyWorkItem.makeJob(
-                txn, a=-2, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
-            )
-            # OK
-            yield DummyWorkItem.makeJob(
-                txn, a=1, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60, 5)
-            )
-        yield setup
-        clock.advance(20 - 12)
-        @transactionally(dbpool.pool.connection)
-        def check(txn):
-            return JobItem.all(txn)
-        jobs = yield check
-        self.assertEqual(len(jobs), 2)
-        self.assertEqual(jobs[0].assigned, None)
-        self.assertEqual(jobs[0].failed, 0)
-        self.assertEqual(jobs[0].notBefore, fakeNow - datetime.timedelta(20 * 60))
-        self.assertEqual(jobs[1].assigned, None)
-        self.assertEqual(jobs[1].failed, 0)
-        self.assertEqual(jobs[1].notBefore, fakeNow - datetime.timedelta(20 * 60, 5))
-    @inlineCallbacks
-    def test_loopFailure_recovery(self):
-        """
-        When L{_workCheck} fails in its loop we need the problem job marked as failed.
-        """
-        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
-        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        oldAssign = JobItem.assign
-        @inlineCallbacks
-        def _assign(self, when, overdue):
-            work = yield self.workItem()
-            if work.a == -2:
-                raise ValueError("oops")
-            yield oldAssign(self, when, overdue)
-        self.patch(JobItem, "assign", _assign)
-        # Let's create a couple of work items directly, not via the enqueue
-        # method, so that they exist but nobody will try to immediately execute
-        # them.
-        @transactionally(dbpool.pool.connection)
-        @inlineCallbacks
-        def setup(txn):
-            # Failing
-            yield DummyWorkItem.makeJob(
-                txn, a=-2, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
-            )
-            # OK
-            yield DummyWorkItem.makeJob(
-                txn, a=1, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60, 5)
-            )
-        yield setup
-        clock.advance(20 - 12)
-        @transactionally(dbpool.pool.connection)
-        def check(txn):
-            return JobItem.all(txn)
-        jobs = yield check
-        self.assertEqual(len(jobs), 1)
-        self.assertEqual(jobs[0].assigned, None)
-        self.assertEqual(jobs[0].failed, 1)
-        self.assertGreater(jobs[0].notBefore, datetime.datetime.utcnow() + datetime.timedelta(seconds=30))
-    @inlineCallbacks
-    def test_loopFailure_failedRecovery(self):
-        """
-        When L{_workCheck} fails in its loop we need the problem job marked as failed.
-        """
-        dbpool, _ignore_qpool, clock, _ignore_performerChosen = self._setupPools()
-        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
-        oldAssign = JobItem.assign
-        @inlineCallbacks
-        def _assign(self, when, overdue):
-            work = yield self.workItem()
-            if work.a == -2:
-                raise ValueError("oops")
-            yield oldAssign(self, when, overdue)
-        self.patch(JobItem, "assign", _assign)
-        @inlineCallbacks
-        def _failedToRun(self, locked=False, delay=None):
-            raise ValueError("oops")
-        self.patch(JobItem, "failedToRun", _failedToRun)
-        # Let's create a couple of work items directly, not via the enqueue
-        # method, so that they exist but nobody will try to immediately execute
-        # them.
-        @transactionally(dbpool.pool.connection)
-        @inlineCallbacks
-        def setup(txn):
-            # Failing
-            yield DummyWorkItem.makeJob(
-                txn, a=-2, b=1, notBefore=fakeNow - datetime.timedelta(20 * 60)
-            )
-            # OK
-            yield DummyWorkItem.makeJob(
-                txn, a=1, b=0, notBefore=fakeNow - datetime.timedelta(20 * 60, 5)
-            )
-        yield setup
-        clock.advance(20 - 12)
-        @transactionally(dbpool.pool.connection)
-        def check(txn):
-            return JobItem.all(txn)
-        jobs = yield check
-        self.assertEqual(len(jobs), 2)
-        self.assertEqual(jobs[0].assigned, None)
-        self.assertEqual(jobs[0].failed, 0)
-        self.assertEqual(jobs[0].notBefore, fakeNow - datetime.timedelta(20 * 60))
-        self.assertEqual(jobs[1].assigned, None)
-        self.assertEqual(jobs[1].failed, 0)
-        self.assertEqual(jobs[1].notBefore, fakeNow - datetime.timedelta(20 * 60, 5))
-    @inlineCallbacks
-    def test_enableDisable(self):
-        """
-        L{PeerConnectionPool.enable} and L{PeerConnectionPool.disable} control queue processing.
-        """
-        dbpool, qpool, clock, performerChosen = self._setupPools()
-        # Disable processing
-        qpool.disable()
-        @transactionally(dbpool.pool.connection)
-        def check(txn):
-            return qpool.enqueueWork(
-                txn, DummyWorkItem, a=3, b=9,
-                notBefore=datetime.datetime(2012, 12, 12, 12, 12, 0)
-            )
-        yield check
-        # Advance far beyond the given timestamp.
-        clock.advance(1000)
-        self.assertEquals(performerChosen, [])
-        # Enable processing
-        qpool.enable()
-        clock.advance(1000)
-        self.assertEquals(performerChosen, [True])
-        # Wait for job
-        while (yield inTransaction(dbpool.pool.connection, lambda txn: JobItem.all(txn))):
-            clock.advance(1)
-        # Work item complete
-        self.assertTrue(DummyWorkItem.results == {1: 12})
-class HalfConnection(object):
-    def __init__(self, protocol):
-        self.protocol = protocol
-        self.transport = StringTransport()
-    def start(self):
-        """
-        Hook up the protocol and the transport.
-        """
-        self.protocol.makeConnection(self.transport)
-    def extract(self):
-        """
-        Extract the data currently present in this protocol's output buffer.
-        """
-        io = self.transport.io
-        value = io.getvalue()
-        io.seek(0)
-        io.truncate()
-        return value
-    def deliver(self, data):
-        """
-        Deliver the given data to this L{HalfConnection}'s protocol's
-        C{dataReceived} method.
-        @return: a boolean indicating whether any data was delivered.
-        @rtype: L{bool}
-        """
-        if data:
-            self.protocol.dataReceived(data)
-            return True
-        return False
-class Connection(object):
-    def __init__(self, local, remote):
-        """
-        Connect two protocol instances to each other via string transports.
-        """
-        self.receiver = HalfConnection(local)
-        self.sender = HalfConnection(remote)
-    def start(self):
-        """
-        Start up the connection.
-        """
-        self.sender.start()
-        self.receiver.start()
-    def pump(self):
-        """
-        Relay data in one direction between the two connections.
-        """
-        result = self.receiver.deliver(self.sender.extract())
-        self.receiver, self.sender = self.sender, self.receiver
-        return result
-    def flush(self, turns=10):
-        """
-        Keep relaying data until there's no more.
-        """
-        for _ignore_x in range(turns):
-            if not (self.pump() or self.pump()):
-                return
-class PeerConnectionPoolIntegrationTests(TestCase):
-    """
-    L{PeerConnectionPool} is the service responsible for coordinating
-    eventually-consistent task queuing within a cluster.
-    """
-    @inlineCallbacks
-    def setUp(self):
-        """
-        L{PeerConnectionPool} requires access to a database and the reactor.
-        """
-        self.store = yield buildStore(self, None)
-        @inlineCallbacks
-        def doit(txn):
-            for statement in splitSQLString(schemaText):
-                yield txn.execSQL(statement)
-        yield inTransaction(
-            self.store.newTransaction,
-            doit,
-            label="bonus schema"
-        )
-        def indirectedTransactionFactory(*a, **b):
-            """
-            Allow tests to replace "self.store.newTransaction" to provide
-            fixtures with extra methods on a test-by-test basis.
-            """
-            return self.store.newTransaction(*a, **b)
-        def deschema():
-            @inlineCallbacks
-            def deletestuff(txn):
-                for stmt in dropSQL:
-                    yield txn.execSQL(stmt)
-            return inTransaction(
-                lambda *a, **b: self.store.newTransaction(*a, **b), deletestuff
-            )
-        self.addCleanup(deschema)
-        self.node1 = PeerConnectionPool(
-            reactor, indirectedTransactionFactory, 0, useWorkerPool=False)
-        self.node2 = PeerConnectionPool(
-            reactor, indirectedTransactionFactory, 0, useWorkerPool=False)
-        class FireMeService(Service, object):
-            def __init__(self, d):
-                super(FireMeService, self).__init__()
-                self.d = d
-            def startService(self):
-                self.d.callback(None)
-        d1 = Deferred()
-        d2 = Deferred()
-        FireMeService(d1).setServiceParent(self.node1)
-        FireMeService(d2).setServiceParent(self.node2)
-        ms = MultiService()
-        self.node1.setServiceParent(ms)
-        self.node2.setServiceParent(ms)
-        ms.startService()
-        @inlineCallbacks
-        def _clean():
-            yield ms.stopService()
-            self.flushLoggedErrors(CancelledError)
-        self.addCleanup(_clean)
-        yield gatherResults([d1, d2])
-        self.store.queuer = self.node1
-        DummyWorkItem.results = {}
-    def test_currentNodeInfo(self):
-        """
-        There will be two C{NODE_INFO} rows in the database, retrievable as two
-        L{NodeInfo} objects, once both nodes have started up.
-        """
-        @inlineCallbacks
-        def check(txn):
-            self.assertEquals(len((yield self.node1.activeNodes(txn))), 2)
-            self.assertEquals(len((yield self.node2.activeNodes(txn))), 2)
-        return inTransaction(self.store.newTransaction, check)
-    @inlineCallbacks
-    def test_enqueueWorkDone(self):
-        """
-        When a L{WorkItem} is scheduled for execution via
-        L{PeerConnectionPool.enqueueWork} its C{doWork} method will be
-        run.
-        """
-        # TODO: this exact test should run against LocalQueuer as well.
-        def operation(txn):
-            # TODO: how does "enqueue" get associated with the transaction?
-            # This is not the fact with a raw t.w.enterprise transaction.
-            # Should probably do something with components.
-            return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
-                               notBefore=datetime.datetime.utcnow())
-        yield inTransaction(self.store.newTransaction, operation)
-        # Wait for it to be executed.  Hopefully this does not time out :-\.
-        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
-        self.assertEquals(DummyWorkItem.results, {100: 7})
-    @inlineCallbacks
-    def test_noWorkDoneWhenConcurrentlyDeleted(self):
-        """
-        When a L{WorkItem} is concurrently deleted by another transaction, it
-        should I{not} perform its work.
-        """
-        def operation(txn):
-            return txn.enqueue(
-                DummyWorkItem, a=30, b=40, workID=5678,
-                deleteOnLoad=1,
-                notBefore=datetime.datetime.utcnow()
-            )
-        yield inTransaction(self.store.newTransaction, operation)
-        # Wait for it to be executed.  Hopefully this does not time out :-\.
-        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
-        self.assertEquals(DummyWorkItem.results, {})
-    @inlineCallbacks
-    def test_locked(self):
-        """
-        L{JobItem.run} locks the work item.
-        """
-        DummyWorkPauseItem.workStarted = Deferred()
-        DummyWorkPauseItem.unpauseWork = Deferred()
-        @transactionally(self.store.newTransaction)
-        def _enqueue(txn):
-            return txn.enqueue(
-                DummyWorkPauseItem, a=30, b=40, workID=1
-            )
-        yield _enqueue
-        # Make sure we have one JOB and one DUMMY_WORK_ITEM
-        def checkJob(txn):
-            return JobItem.all(txn)
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 1)
-        yield DummyWorkPauseItem.workStarted
-        @transactionally(self.store.newTransaction)
-        def _trylock(txn):
-            job = yield JobItem.load(txn, jobs[0].jobID)
-            work = yield job.workItem()
-            locked = yield work.trylock()
-            self.assertFalse(locked)
-        yield _trylock
-        DummyWorkPauseItem.unpauseWork.callback(None)
-        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 0)
-    @inlineCallbacks
-    def test_overdueStillRunning(self):
-        """
-        Make sure an overdue work item that is still running gets its overdue value bumped.
-        """
-        # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
-        # they are called. Also, change the overdue to be one second ahead of assigned.
-        assigned = [0]
-        _oldAssign = JobItem.assign
-        def _newAssign(self, when, overdue):
-            assigned[0] += 1
-            return _oldAssign(self, when, 1)
-        self.patch(JobItem, "assign", _newAssign)
-        bumped = [0]
-        _oldBumped = JobItem.bumpOverdue
-        def _newBump(self, bump):
-            bumped[0] += 1
-            return _oldBumped(self, 100)
-        self.patch(JobItem, "bumpOverdue", _newBump)
-        DummyWorkPauseItem.workStarted = Deferred()
-        DummyWorkPauseItem.unpauseWork = Deferred()
-        @transactionally(self.store.newTransaction)
-        def _enqueue(txn):
-            return txn.enqueue(
-                DummyWorkPauseItem, a=30, b=40, workID=1
-            )
-        yield _enqueue
-        # Make sure we have one JOB and one DUMMY_WORK_ITEM
-        def checkJob(txn):
-            return JobItem.all(txn)
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 1)
-        self.assertTrue(assigned[0] == 0)
-        self.assertTrue(bumped[0] == 0)
-        yield DummyWorkPauseItem.workStarted
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 1)
-        self.assertTrue(assigned[0] == 1)
-        self.assertTrue(bumped[0] == 0)
-        # Pause long enough that the overdue time is passed, which should result
-        # in the overdue value being bumped
-        d = Deferred()
-        reactor.callLater(2, lambda: d.callback(None))
-        yield d
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 1)
-        self.assertTrue(assigned[0] == 1)
-        self.assertTrue(bumped[0] == 1)
-        DummyWorkPauseItem.unpauseWork.callback(None)
-        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 0)
-        self.assertTrue(assigned[0] == 1)
-        self.assertTrue(bumped[0] == 1)
-    @inlineCallbacks
-    def test_overdueWorkGotLost(self):
-        """
-        Make sure an overdue work item that is not still running gets its overdue value bumped, and
-        eventually executed.
-        """
-        # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
-        # they are called. Also, change the overdue to be one second ahead of assigned.
-        assigned = [0]
-        _oldAssign = JobItem.assign
-        def _newAssign(self, when, overdue):
-            assigned[0] += 1
-            return _oldAssign(self, when, 1)
-        self.patch(JobItem, "assign", _newAssign)
-        bumped = [0]
-        _oldBumped = JobItem.bumpOverdue
-        def _newBump(self, bump):
-            bumped[0] += 1
-            return _oldBumped(self, 5)
-        self.patch(JobItem, "bumpOverdue", _newBump)
-        failed = [0]
-        waitFail = Deferred()
-        def _newFailedToRun(self, locked=False, delay=None):
-            failed[0] += 1
-            waitFail.callback(None)
-            return succeed(None)
-        self.patch(JobItem, "failedToRun", _newFailedToRun)
-        def _newDoWorkRaise(self):
-            self.workStarted.callback(None)
-            raise ValueError()
-        def _newDoWorkSuccess(self):
-            return succeed(None)
-        DummyWorkPauseItem.workStarted = Deferred()
-        self.patch(DummyWorkPauseItem, "doWork", _newDoWorkRaise)
-        @transactionally(self.store.newTransaction)
-        def _enqueue(txn):
-            return txn.enqueue(
-                DummyWorkPauseItem, a=30, b=40, workID=1
-            )
-        yield _enqueue
-        # Make sure we have one JOB and one DUMMY_WORK_ITEM
-        def checkJob(txn):
-            return JobItem.all(txn)
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 1)
-        self.assertTrue(assigned[0] == 0)
-        self.assertTrue(bumped[0] == 0)
-        self.assertTrue(failed[0] == 0)
-        # Wait for work to fail once and reset it to succeed next time
-        yield DummyWorkPauseItem.workStarted
-        self.patch(DummyWorkPauseItem, "doWork", _newDoWorkSuccess)
-        yield waitFail
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 1)
-        self.assertTrue(assigned[0] == 1)
-        self.assertTrue(bumped[0] == 0)
-        self.assertTrue(failed[0] == 1)
-        # Wait for the overdue to be detected and the work restarted
-        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 0)
-        self.assertTrue(assigned[0] == 2)
-        self.assertTrue(bumped[0] == 0)
-        self.assertTrue(failed[0] == 1)
-    @inlineCallbacks
-    def test_lowPriorityOverdueWorkNotAssigned(self):
-        """
-        Make sure an overdue work item that is not still running gets its overdue value bumped, and
-        eventually executed.
-        """
-        # Patch the work item to fail once and appear as overdue
-        _oldAssign = JobItem.assign
-        def _newAssign(self, when, overdue):
-            return _oldAssign(self, when, 1)
-        self.patch(JobItem, "assign", _newAssign)
-        failed = [0]
-        waitFail = Deferred()
-        def _newFailedToRun(self, locked=False, delay=None):
-            failed[0] += 1
-            waitFail.callback(None)
-            return succeed(None)
-        self.patch(JobItem, "failedToRun", _newFailedToRun)
-        def _newDoWorkRaise(self):
-            self.workStarted.callback(None)
-            raise ValueError()
-        def _newDoWorkSuccess(self):
-            return succeed(None)
-        DummyWorkPauseItem.workStarted = Deferred()
-        self.patch(DummyWorkPauseItem, "doWork", _newDoWorkRaise)
-        @transactionally(self.store.newTransaction)
-        def _enqueue(txn):
-            return txn.enqueue(
-                DummyWorkPauseItem, a=30, b=40, workID=1
-            )
-        yield _enqueue
-        # Make sure we have one JOB and one DUMMY_WORK_ITEM
-        def checkJob(txn):
-            return JobItem.all(txn)
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 1)
-        self.assertTrue(failed[0] == 0)
-        # Wait for work to fail once and reset it to succeed next time
-        yield DummyWorkPauseItem.workStarted
-        self.patch(DummyWorkPauseItem, "doWork", _newDoWorkSuccess)
-        yield waitFail
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 1)
-        self.assertTrue(failed[0] == 1)
-        # Try to get the next high priority only job
-        @transactionally(self.store.newTransaction)
-        @inlineCallbacks
-        def _testNone(txn):
-            nowTime = datetime.datetime.utcfromtimestamp(reactor.seconds() + 10)
-            job = yield JobItem.nextjob(txn, nowTime, WORK_PRIORITY_HIGH)
-            self.assertTrue(job is None)
-        yield _testNone
-        # Wait for the overdue to be detected and the work restarted
-        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 0)
-        self.assertTrue(failed[0] == 1)
-    @inlineCallbacks
-    def test_aggregator_lock(self):
-        """
-        L{JobItem.run} fails an aggregated work item and then ignores it.
-        """
-        # Patch JobItem.failedToRun to track how many times it is called.
-        failed = [0]
-        _oldFailed = JobItem.failedToRun
-        def _newFailed(self, locked=False, delay=None):
-            failed[0] += 1
-            return _oldFailed(self, locked, 5)
-        self.patch(JobItem, "failedToRun", _newFailed)
-        @transactionally(self.store.newTransaction)
-        def _enqueue1(txn):
-            return txn.enqueue(
-                AggregatorWorkItem, a=1, b=1, workID=1
-            )
-        @transactionally(self.store.newTransaction)
-        def _enqueue2(txn):
-            return txn.enqueue(
-                AggregatorWorkItem, a=1, b=2, workID=2
-            )
-        yield _enqueue1
-        yield _enqueue2
-        # Make sure we have one JOB and one DUMMY_WORK_ITEM
-        def checkJob(txn):
-            return JobItem.all(txn)
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 2)
-        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertEqual(len(jobs), 0)
-        self.assertEqual(failed[0], 1)
-    @inlineCallbacks
-    def test_aggregator_no_deadlock(self):
-        """
-        L{JobItem.run} fails an aggregated work item and then ignores it.
-        """
-        # Patch JobItem.assign and JobItem.bumpOverdue to track how many times
-        # they are called.
-        failed = [0]
-        _oldFailed = JobItem.failedToRun
-        def _newFailed(self, locked=False, delay=None):
-            failed[0] += 1
-            return _oldFailed(self, locked, 5)
-        self.patch(JobItem, "failedToRun", _newFailed)
-        @transactionally(self.store.newTransaction)
-        def _enqueue1(txn):
-            return txn.enqueue(
-                AggregatorWorkItem, a=1, b=1, workID=1
-            )
-        @transactionally(self.store.newTransaction)
-        def _enqueue2(txn):
-            return txn.enqueue(
-                AggregatorWorkItem, a=1, b=1, workID=2
-            )
-        yield _enqueue1
-        yield _enqueue2
-        # Make sure we have one JOB and one DUMMY_WORK_ITEM
-        def checkJob(txn):
-            return JobItem.all(txn)
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 2)
-        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
-        jobs = yield inTransaction(self.store.newTransaction, checkJob)
-        self.assertTrue(len(jobs) == 0)
-        self.assertEqual(failed[0], 1)
-    @inlineCallbacks
-    def test_pollingBackoff(self):
-        """
-        Check that an idle queue backs off its polling and goes back to rapid polling
-        when a worker enqueues a job.
-        """
-        # Speed up the backoff process
-        self.patch(PeerConnectionPool, "queuePollingBackoff", ((1.0, 60.0),))
-        # Wait for backoff
-        while self.node1._actualPollInterval == self.node1.queuePollInterval:
-            d = Deferred()
-            reactor.callLater(1.0, lambda : d.callback(None))
-            yield d
-        self.assertEqual(self.node1._actualPollInterval, 60.0)
-        # TODO: this exact test should run against LocalQueuer as well.
-        def operation(txn):
-            # TODO: how does "enqueue" get associated with the transaction?
-            # This is not the fact with a raw t.w.enterprise transaction.
-            # Should probably do something with components.
-            return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
-                               notBefore=datetime.datetime.utcnow())
-        yield inTransaction(self.store.newTransaction, operation)
-        # Backoff terminated
-        while self.node1._actualPollInterval != self.node1.queuePollInterval:
-            d = Deferred()
-            reactor.callLater(0.1, lambda : d.callback(None))
-            yield d
-        self.assertEqual(self.node1._actualPollInterval, self.node1.queuePollInterval)
-        # Wait for it to be executed.  Hopefully this does not time out :-\.
-        yield JobItem.waitEmpty(self.store.newTransaction, reactor, 60)
-        # Wait for backoff
-        while self.node1._actualPollInterval == self.node1.queuePollInterval:
-            d = Deferred()
-            reactor.callLater(1.0, lambda : d.callback(None))
-            yield d
-        self.assertEqual(self.node1._actualPollInterval, 60.0)
-class DummyProposal(object):
-    def __init__(self, *ignored):
-        pass
-    def _start(self):
-        pass
-class BaseQueuerTests(TestCase):
-    def setUp(self):
-        self.proposal = None
-        self.patch(twext.enterprise.jobqueue, "WorkProposal", DummyProposal)
-    def _proposalCallback(self, proposal):
-        self.proposal = proposal
-    @inlineCallbacks
-    def test_proposalCallbacks(self):
-        queuer = _BaseQueuer()
-        queuer.callWithNewProposals(self._proposalCallback)
-        self.assertEqual(self.proposal, None)
-        yield queuer.enqueueWork(None, None)
-        self.assertNotEqual(self.proposal, None)
-class NonPerformingQueuerTests(TestCase):
-    @inlineCallbacks
-    def test_choosePerformer(self):
-        queuer = NonPerformingQueuer()
-        performer = queuer.choosePerformer()
-        result = (yield performer.performJob(None))
-        self.assertEquals(result, None)

Modified: twext/trunk/twext/enterprise/test/test_queue.py
--- twext/trunk/twext/enterprise/test/test_queue.py	2015-08-06 21:04:52 UTC (rev 15034)
+++ twext/trunk/twext/enterprise/test/test_queue.py	2015-08-07 20:06:34 UTC (rev 15035)
@@ -775,7 +775,7 @@
         def doit(txn):
-            for statement in splitSQLString(schemaText):
+            for statement in splitSQLString(nodeSchema + schemaText):
                 yield txn.execSQL(statement)
         yield inTransaction(
@@ -794,6 +794,7 @@
             def deletestuff(txn):
                 for stmt in dropSQL:
                     yield txn.execSQL(stmt)
+                txn.execSQL("drop table node_info")
             return inTransaction(
                 lambda *a: self.store.newTransaction(*a), deletestuff
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150807/520194f8/attachment-0001.html>

More information about the calendarserver-changes mailing list