[CalendarServer-changes] [12779] twext/branches/users/cdaboo/jobs/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Sat Mar 1 07:42:17 PST 2014


Revision: 12779
          http://trac.calendarserver.org//changeset/12779
Author:   cdaboo at apple.com
Date:     2014-03-01 07:42:17 -0800 (Sat, 01 Mar 2014)
Log Message:
-----------
New job-queue-based work queue implementation. Currently this does just what the old queue did, but with
a centralized "job" queue in which all work items are listed. Eventually this will be made smarter, to
automatically deal with coalescing work, repetitive work, and pooled work, as well as to eliminate
head-of-line blocking.
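
For illustration, a minimal sketch of declaring and enqueuing work against the
new API (the schema, queuer, and txn objects here are hypothetical; the module
docstring in jobqueue.py below has a fuller example):

    from twext.enterprise.jobqueue import WorkItem, WORK_PRIORITY_MEDIUM
    from twext.enterprise.dal.record import fromTable

    class CouponWork(WorkItem, fromTable(schema.COUPON_WORK)):
        priority = WORK_PRIORITY_MEDIUM  # overrides the LOW default

        def doWork(self):
            ...  # runs later, in a worker process

    # enqueueWork creates a JOB row plus a COUPON_WORK row inside txn; the
    # job is dispatched to a performer once the transaction commits.
    wp = yield queuer.enqueueWork(txn, CouponWork, customerID=customerID)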

Modified Paths:
--------------
    twext/branches/users/cdaboo/jobs/twext/enterprise/adbapi2.py
    twext/branches/users/cdaboo/jobs/twext/enterprise/dal/model.py
    twext/branches/users/cdaboo/jobs/twext/enterprise/dal/parseschema.py
    twext/branches/users/cdaboo/jobs/twext/enterprise/dal/record.py
    twext/branches/users/cdaboo/jobs/twext/enterprise/dal/test/test_parseschema.py
    twext/branches/users/cdaboo/jobs/twext/enterprise/ienterprise.py
    twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_adbapi2.py

Added Paths:
-----------
    twext/branches/users/cdaboo/jobs/twext/enterprise/jobqueue.py
    twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_jobqueue.py

Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/adbapi2.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/adbapi2.py	2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/adbapi2.py	2014-03-01 15:42:17 UTC (rev 12779)
@@ -155,12 +155,12 @@
     noisy = False
 
     def __init__(self, pool, threadHolder, connection, cursor):
-        self._pool       = pool
-        self._completed  = "idle"
-        self._cursor     = cursor
+        self._pool = pool
+        self._completed = "idle"
+        self._cursor = cursor
         self._connection = connection
-        self._holder     = threadHolder
-        self._first      = True
+        self._holder = threadHolder
+        self._first = True
 
 
     @_forward
@@ -169,14 +169,12 @@
         The paramstyle attribute is mirrored from the connection pool.
         """
 
-
     @_forward
     def dialect(self):
         """
         The dialect attribute is mirrored from the connection pool.
         """
 
-
     def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
         """
         Execute the given SQL on a thread, using a DB-API 2.0 cursor.
@@ -280,7 +278,7 @@
                 # that we cannot workaround or address automatically, so no
                 # try:except: for them.
                 self._connection = self._pool.connectionFactory()
-                self._cursor     = self._connection.cursor()
+                self._cursor = self._connection.cursor()
 
                 # Note that although this method is being invoked recursively,
                 # the "_first" flag is re-set at the very top, so we will _not_
@@ -382,9 +380,9 @@
         transaction.
         """
         self._completed = "released"
-        self._stopped   = True
-        holder          = self._holder
-        self._holder    = None
+        self._stopped = True
+        holder = self._holder
+        self._holder = None
 
         def _reallyClose():
             if self._cursor is None:
@@ -417,8 +415,8 @@
         return fail(ConnectionError(self.reason))
 
     execSQL = _everything
-    commit  = _everything
-    abort   = _everything
+    commit = _everything
+    abort = _everything
 
 
 
@@ -593,12 +591,12 @@
 
     def __init__(self, pool, baseTxn):
         super(_SingleTxn, self).__init__()
-        self._pool           = pool
-        self._baseTxn        = baseTxn
-        self._completed      = False
-        self._currentBlock   = None
-        self._blockedQueue   = None
-        self._pendingBlocks  = []
+        self._pool = pool
+        self._baseTxn = baseTxn
+        self._completed = False
+        self._currentBlock = None
+        self._blockedQueue = None
+        self._pendingBlocks = []
         self._stillExecuting = []
 
 
@@ -615,7 +613,7 @@
         implementation of L{IAsyncTransaction} that will actually do the work;
         either a L{_ConnectedTxn} or a L{_NoTxn}.
         """
-        spooledBase   = self._baseTxn
+        spooledBase = self._baseTxn
         self._baseTxn = baseTxn
         spooledBase._unspool(baseTxn)
 
@@ -892,8 +890,8 @@
             and subsequent SQL executions for this connection.
         @type holder: L{ThreadHolder}
         """
-        self._pool    = pool
-        self._holder  = holder
+        self._pool = pool
+        self._holder = holder
         self._aborted = False
 
 
@@ -992,11 +990,11 @@
         if name is not None:
             self.name = name
 
-        self._free       = []
-        self._busy       = []
-        self._waiting    = []
-        self._finishing  = []
-        self._stopping   = False
+        self._free = []
+        self._busy = []
+        self._waiting = []
+        self._finishing = []
+        self._stopping = False
 
 
     def startService(self):
@@ -1107,7 +1105,7 @@
             # support threadlevel=1; we can't necessarily cursor() in a
             # different thread than we do transactions in.
             connection = self.connectionFactory()
-            cursor     = connection.cursor()
+            cursor = connection.cursor()
             return (connection, cursor)
 
         def finishInit((connection, cursor)):
@@ -1364,8 +1362,8 @@
         Initialize a mapping of transaction IDs to transaction objects.
         """
         super(ConnectionPoolConnection, self).__init__()
-        self.pool    = pool
-        self._txns   = {}
+        self.pool = pool
+        self._txns = {}
         self._blocks = {}
 
 
@@ -1479,10 +1477,10 @@
     ):
         # See DEFAULT_PARAM_STYLE FIXME above.
         super(ConnectionPoolClient, self).__init__()
-        self._nextID    = count().next
-        self._txns      = weakref.WeakValueDictionary()
-        self._queries   = {}
-        self.dialect    = dialect
+        self._nextID = count().next
+        self._txns = weakref.WeakValueDictionary()
+        self._queries = {}
+        self.dialect = dialect
         self.paramstyle = paramstyle
 
 
@@ -1507,8 +1505,8 @@
 
         @rtype: L{IAsyncTransaction}
         """
-        txnid             = str(self._nextID())
-        txn               = _NetTransaction(client=self, transactionID=txnid)
+        txnid = str(self._nextID())
+        txn = _NetTransaction(client=self, transactionID=txnid)
         self._txns[txnid] = txn
         self.callRemote(StartTxn, transactionID=txnid)
         return txn
@@ -1529,10 +1527,10 @@
 
 class _Query(object):
     def __init__(self, sql, raiseOnZeroRowCount, args):
-        self.sql                 = sql
-        self.args                = args
-        self.results             = []
-        self.deferred            = Deferred()
+        self.sql = sql
+        self.args = args
+        self.results = []
+        self.deferred = Deferred()
         self.raiseOnZeroRowCount = raiseOnZeroRowCount
 
 
@@ -1606,11 +1604,11 @@
         transaction identifier.
         """
         super(_NetTransaction, self).__init__()
-        self._client        = client
+        self._client = client
         self._transactionID = transactionID
-        self._completed     = False
-        self._committing    = False
-        self._committed     = False
+        self._completed = False
+        self._committing = False
+        self._committed = False
 
 
     @property

Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/dal/model.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/dal/model.py	2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/dal/model.py	2014-03-01 15:42:17 UTC (rev 12779)
@@ -30,6 +30,7 @@
     "Index",
     "PseudoIndex",
     "Sequence",
+    "Function",
     "Schema",
 ]
 
@@ -516,6 +517,36 @@
 
 
 
+class Function(FancyEqMixin, object):
+    """
+    A function object.
+    """
+
+    compareAttributes = "name".split()
+
+    def __init__(self, schema, name):
+        _checkstr(name)
+        self.name = name
+        schema.functions.append(self)
+
+
+    def __repr__(self):
+        return "<Function %r>" % (self.name,)
+
+
+    def compare(self, other):
+        """
+        Return the differences between two functions.
+
+        @param other: the function to compare with
+        @type other: L{Function}
+        """
+
+        # TODO: ought to compare function body but we don't track that
+        return []
+
+
+
 def _namedFrom(name, sequence):
     """
     Retrieve an item with a given name attribute from a given sequence, or
@@ -538,6 +569,7 @@
         self.tables = []
         self.indexes = []
         self.sequences = []
+        self.functions = []
 
 
     def __repr__(self):
@@ -576,6 +608,7 @@
         _compareLists(self.tables, other.tables, "table")
         _compareLists(self.pseudoIndexes(), other.pseudoIndexes(), "index")
         _compareLists(self.sequences, other.sequences, "sequence")
+        _compareLists(self.functions, other.functions, "function")
 
         return results
 
@@ -617,3 +650,7 @@
 
     def indexNamed(self, name):
         return _namedFrom(name, self.indexes)
+
+
+    def functionNamed(self, name):
+        return _namedFrom(name, self.functions)

Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/dal/parseschema.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/dal/parseschema.py	2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/dal/parseschema.py	2014-03-01 15:42:17 UTC (rev 12779)
@@ -43,7 +43,7 @@
                           Function, Comparison)
 
 from twext.enterprise.dal.model import (
-    Schema, Table, SQLType, ProcedureCall, Constraint, Sequence, Index)
+    Schema, Table, SQLType, ProcedureCall, Constraint, Sequence, Index,
+    Function as FunctionModel)
 
 from twext.enterprise.dal.syntax import (
     ColumnSyntax, CompoundComparison, Constant, Function as FunctionSyntax
@@ -208,6 +208,12 @@
                     columnName = nameOrIdentifier(token)
                     idx.addColumn(idx.table.columnNamed(columnName))
 
+            elif createType == u"FUNCTION":
+                FunctionModel(
+                    schema,
+                    stmt.token_next(2, True).value.encode("utf-8")
+                )
+
         elif stmt.get_type() == "INSERT":
             insertTokens = iterSignificant(stmt)
             expect(insertTokens, ttype=Keyword.DML, value="INSERT")
@@ -234,6 +240,15 @@
 
             schema.tableNamed(tableName).insertSchemaRow(rowData)
 
+        elif stmt.get_type() == "CREATE OR REPLACE":
+            createType = stmt.token_next(1, True).value.upper()
+
+            if createType == u"FUNCTION":
+                FunctionModel(
+                    schema,
+                    stmt.token_next(2, True).token_first(True).token_first(True).value.encode("utf-8")
+                )
+
         else:
             print("unknown type:", stmt.get_type())
 

Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/dal/record.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/dal/record.py	2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/dal/record.py	2014-03-01 15:42:17 UTC (rev 12779)
@@ -171,6 +171,23 @@
         return r
 
 
+    @classmethod
+    def fromTable(cls, table):
+        """
+        Initialize from a L{Table} at run time.
+
+        @param table: table containing the record data
+        @type table: L{Table}
+        """
+        cls.__attrmap__ = {}
+        cls.__colmap__ = {}
+        allColumns = list(table)
+        for column in allColumns:
+            attrname = cls.namingConvention(column.model.name)
+            cls.__attrmap__[attrname] = column
+            cls.__colmap__[column] = attrname
+
+
     @staticmethod
     def namingConvention(columnName):
         """
@@ -274,7 +291,7 @@
         """
         for setAttribute, setValue in attributeList:
             setColumn = self.__attrmap__[setAttribute]
-            if setColumn.model.type.name == "timestamp":
+            if setColumn.model.type.name == "timestamp" and setValue is not None:
                 setValue = parseSQLTimestamp(setValue)
             setattr(self, setAttribute, setValue)
 
@@ -335,7 +352,7 @@
 
 
     @classmethod
-    def query(cls, transaction, expr, order=None, ascending=True, group=None):
+    def query(cls, transaction, expr, order=None, ascending=True, group=None,
+              forUpdate=False, noWait=False):
         """
         Query the table that corresponds to C{cls}, and return instances of
         C{cls} corresponding to the rows that are returned from that table.
@@ -353,15 +370,29 @@
 
         @param group: a L{ColumnSyntax} to group the resulting record objects
             by.
+
+        @param forUpdate: do a SELECT ... FOR UPDATE
+        @type forUpdate: L{bool}
+        @param noWait: include NOWAIT with the FOR UPDATE
+        @type noWait: L{bool}
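+
+        A sketch of usage (C{MyRecord} and C{txn} are hypothetical)::
+
+            records = yield MyRecord.query(
+                txn, MyRecord.jobID == jobID, forUpdate=True, noWait=True
+            )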
         """
         kw = {}
         if order is not None:
             kw.update(OrderBy=order, Ascending=ascending)
         if group is not None:
             kw.update(GroupBy=group)
+        if forUpdate:
+            kw.update(ForUpdate=True)
+            if noWait:
+                kw.update(NoWait=True)
         return cls._rowsFromQuery(
             transaction,
-            Select(list(cls.table), From=cls.table, Where=expr, **kw),
+            Select(
+                list(cls.table),
+                From=cls.table,
+                Where=expr,
+                **kw
+            ),
             None
         )
 

Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/dal/test/test_parseschema.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/dal/test/test_parseschema.py	2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/dal/test/test_parseschema.py	2014-03-01 15:42:17 UTC (rev 12779)
@@ -403,3 +403,27 @@
                 "z-unique:(c)",
             ))
         )
+
+
+    def test_functions(self):
+        """
+        A 'create (or replace) function' statement will add a L{Function}
+        object to a L{Schema}'s C{functions} list.
+        """
+        s = self.schemaFromString(
+            """
+CREATE FUNCTION increment(i integer) RETURNS integer AS $$
+BEGIN
+    RETURN i + 1;
+END;
+$$ LANGUAGE plpgsql;
+CREATE OR REPLACE FUNCTION decrement(i integer) RETURNS integer AS $$
+BEGIN
+    RETURN i - 1;
+END;
+$$ LANGUAGE plpgsql;
+            """
+        )
+        self.assertTrue(s.functionNamed("increment") is not None)
+        self.assertTrue(s.functionNamed("decrement") is not None)
+        self.assertRaises(KeyError, s.functionNamed, "merge")

Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/ienterprise.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/ienterprise.py	2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/ienterprise.py	2014-03-01 15:42:17 UTC (rev 12779)
@@ -115,7 +115,6 @@
             transaction has already been committed or rolled back.
         """
 
-
     def preCommit(operation):
         """
         Perform the given operation when this L{IAsyncTransaction}'s C{commit}
@@ -128,7 +127,6 @@
             will not fire until that L{Deferred} fires.
         """
 
-
     def postCommit(operation):
         """
         Perform the given operation only after this L{IAsyncTransaction}
@@ -140,7 +138,6 @@
             will not fire until that L{Deferred} fires.
         """
 
-
     def abort():
         """
         Roll back changes caused by this transaction.
@@ -149,7 +146,6 @@
             rollback of this transaction.
         """
 
-
     def postAbort(operation):
         """
         Invoke a callback after abort.
@@ -160,7 +156,6 @@
             L{Deferred}.
         """
 
-
     def commandBlock():
         """
         Create an object which will cause the commands executed on it to be
@@ -257,7 +252,6 @@
         @return: the concrete value which should be passed to the DB-API layer.
         """
 
-
     def postQuery(cursor):
         """
         After running a query, invoke this method in the DB-API thread.
@@ -288,17 +282,16 @@
 
         @param workItemType: the type of work item to create.
         @type workItemType: L{type}, specifically, a subtype of L{WorkItem
-            <twext.enterprise.queue.WorkItem>}
+            <twext.enterprise.jobqueue.WorkItem>}
 
         @param kw: The keyword parameters are relayed to C{workItemType.create}
             to create an appropriately initialized item.
 
         @return: a work proposal that allows tracking of the various phases of
             completion of the work item.
-        @rtype: L{twext.enterprise.queue.WorkItem}
+        @rtype: L{twext.enterprise.jobqueue.WorkItem}
         """
 
-
     def callWithNewProposals(self, callback):
         """
         Tells the IQueuer to call a callback method whenever a new WorkProposal
@@ -308,7 +301,6 @@
             L{WorkProposal}
         """
 
-
     def transferProposalCallbacks(self, newQueuer):
         """
         Transfer the registered callbacks to the new queuer.

Added: twext/branches/users/cdaboo/jobs/twext/enterprise/jobqueue.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/jobqueue.py	                        (rev 0)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/jobqueue.py	2014-03-01 15:42:17 UTC (rev 12779)
@@ -0,0 +1,1592 @@
+# -*- test-case-name: twext.enterprise.test.test_jobqueue -*-
+##
+# Copyright (c) 2012-2014 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+
+"""
+L{twext.enterprise.jobqueue} is an U{eventually consistent
+<https://en.wikipedia.org/wiki/Eventual_consistency>} task-queuing system for
+use by applications with multiple front-end servers talking to a single
+database instance, that want to defer and parallelize work that involves
+storing the results of computation.
+
+By enqueuing with L{twext.enterprise.jobqueue}, you may guarantee that the work
+will I{eventually} be done, and reliably commit to doing it in the future, but
+defer it if it does not need to be done I{now}.
+
+To pick a hypothetical example, let's say that you have a store which wants to
+issue a promotional coupon based on a customer loyalty program, in response to
+an administrator clicking on a button.  Determining the list of customers to
+send the coupon to is quick: a simple query will get you all their names.
+However, analyzing each user's historical purchase data is (A) time consuming
+and (B) relatively isolated, so it would be good to do that in parallel, and it
+would also be acceptable to have that happen at a later time, outside the
+critical path.
+
+Such an application might be implemented with this queuing system like so::
+
+    from twext.enterprise.jobqueue import WorkItem, queueFromTransaction
+    from twext.enterprise.dal.parseschema import addSQLToSchema
+    from twext.enterprise.dal.syntax import SchemaSyntax
+
+    schemaModel = Schema()
+    addSQLToSchema('''
+        create table CUSTOMER (NAME varchar(255), ID integer primary key);
+        create table PRODUCT (NAME varchar(255), ID integer primary key);
+        create table PURCHASE (NAME varchar(255), WHEN timestamp,
+                               CUSTOMER_ID integer references CUSTOMER,
+                               PRODUCT_ID integer references PRODUCT);
+        create table COUPON_WORK (WORK_ID integer primary key,
+                                  CUSTOMER_ID integer references CUSTOMER);
+        create table COUPON (ID integer primary key,
+                             CUSTOMER_ID integer references CUSTOMER,
+                             AMOUNT integer);
+    ''')
+    schema = SchemaSyntax(schemaModel)
+
+    class Coupon(Record, fromTable(schema.COUPON)):
+        pass
+
+    class CouponWork(WorkItem, fromTable(schema.COUPON_WORK)):
+        @inlineCallbacks
+        def doWork(self):
+            purchases = yield Select(schema.PURCHASE,
+                                     Where=schema.PURCHASE.CUSTOMER_ID
+                                     == self.customerID).on(self.transaction)
+            couponAmount = yield doSomeMathThatTakesAWhile(purchases)
+            yield Coupon.create(customerID=self.customerID,
+                                amount=couponAmount)
+
+    @inlineCallbacks
+    def makeSomeCoupons(txn):
+        # Note, txn was started before, will be committed later...
+        for customerID in (yield Select([schema.CUSTOMER.ID],
+                                        From=schema.CUSTOMER).on(txn)):
+            # queuer is a provider of IQueuer, of which there are several
+            # implementations in this module.
+            queuer.enqueueWork(txn, CouponWork, customerID=customerID)
+"""
+
+from functools import wraps
+from datetime import datetime
+
+from zope.interface import implements
+
+from twisted.application.service import MultiService
+from twisted.internet.protocol import Factory
+from twisted.internet.defer import (
+    inlineCallbacks, returnValue, Deferred, passthru, succeed
+)
+from twisted.internet.endpoints import TCP4ClientEndpoint
+from twisted.protocols.amp import AMP, Command, Integer, String
+from twisted.python.reflect import qual
+from twisted.python import log
+
+from twext.enterprise.dal.syntax import SchemaSyntax, Lock, NamedValue, Select, \
+    Count
+
+from twext.enterprise.dal.model import ProcedureCall
+from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
+from twisted.python.failure import Failure
+
+from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
+from twisted.internet.endpoints import TCP4ServerEndpoint
+from twext.enterprise.ienterprise import IQueuer
+from zope.interface.interface import Interface
+from twext.enterprise.locking import NamedLock
+
+
+class _IJobPerformer(Interface):
+    """
+    An object that can perform work.
+
+    Internal interface; implemented by several classes here since work has to
+    (in the worst case) pass from worker->controller->controller->worker.
+    """
+
+    def performJob(jobID): #@NoSelf
+        """
+        @param jobID: The primary key identifier of the given job.
+        @type jobID: L{int}
+
+        @return: a L{Deferred} firing with an empty dictionary when the work is
+            complete.
+        @rtype: L{Deferred} firing L{dict}
+        """
+
+
+
+def makeNodeSchema(inSchema):
+    """
+    Create a self-contained schema for L{NodeInfo} to use, in C{inSchema}.
+
+    @param inSchema: a L{Schema} to add the node-info table to.
+    @type inSchema: L{Schema}
+
+    @return: a schema with just the one table.
+    """
+    # Initializing this duplicate schema avoids a circular dependency, but this
+    # should really be accomplished with independent schema objects that the
+    # transaction is made aware of somehow.
+    NodeTable = Table(inSchema, "NODE_INFO")
+
+    NodeTable.addColumn("HOSTNAME", SQLType("varchar", 255))
+    NodeTable.addColumn("PID", SQLType("integer", None))
+    NodeTable.addColumn("PORT", SQLType("integer", None))
+    NodeTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
+        # Note: in the real data structure, this is actually a not-cleaned-up
+        # sqlparse internal data structure, but it *should* look closer to
+        # this.
+        ProcedureCall("timezone", ["UTC", NamedValue("CURRENT_TIMESTAMP")])
+    )
+    for column in NodeTable.columns:
+        NodeTable.tableConstraint(Constraint.NOT_NULL, [column.name])
+    NodeTable.primaryKey = [
+        NodeTable.columnNamed("HOSTNAME"),
+        NodeTable.columnNamed("PORT"),
+    ]
+
+    return inSchema
+
+NodeInfoSchema = SchemaSyntax(makeNodeSchema(Schema(__file__)))
+
+
+
+def makeJobSchema(inSchema):
+    """
+    Create a self-contained schema for L{JobItem} to use, in C{inSchema}.
+
+    @param inSchema: a L{Schema} to add the job table to.
+    @type inSchema: L{Schema}
+
+    @return: a schema with just the one table.
+    """
+    # Initializing this duplicate schema avoids a circular dependency, but this
+    # should really be accomplished with independent schema objects that the
+    # transaction is made aware of somehow.
+    JobTable = Table(inSchema, "JOB")
+
+    JobTable.addColumn("JOB_ID", SQLType("integer", None)).setDefaultValue(
+        ProcedureCall("nextval", ["JOB_SEQ"])
+    )
+    JobTable.addColumn("WORK_TYPE", SQLType("varchar", 255))
+    JobTable.addColumn("PRIORITY", SQLType("integer", 0))
+    JobTable.addColumn("WEIGHT", SQLType("integer", 0))
+    JobTable.addColumn("NOT_BEFORE", SQLType("timestamp", None))
+    JobTable.addColumn("NOT_AFTER", SQLType("timestamp", None))
+    for column in ("JOB_ID", "WORK_TYPE"):
+        JobTable.tableConstraint(Constraint.NOT_NULL, [column])
+    JobTable.primaryKey = [JobTable.columnNamed("JOB_ID"), ]
+
+    return inSchema
+
+JobInfoSchema = SchemaSyntax(makeJobSchema(Schema(__file__)))
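+
+# For reference, a rough SQL equivalent of the JOB table defined above (a
+# sketch only; the authoritative DDL lives in the application's schema files):
+#
+#     create sequence JOB_SEQ;
+#     create table JOB (
+#         JOB_ID     integer primary key default nextval('JOB_SEQ'),
+#         WORK_TYPE  varchar(255) not null,
+#         PRIORITY   integer,
+#         WEIGHT     integer,
+#         NOT_BEFORE timestamp,
+#         NOT_AFTER  timestamp
+#     );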
+
+
+
+@inlineCallbacks
+def inTransaction(transactionCreator, operation):
+    """
+    Perform the given operation in a transaction, committing or aborting as
+    required.
+
+    @param transactionCreator: a 0-arg callable that returns an
+        L{IAsyncTransaction}
+
+    @param operation: a 1-arg callable that takes an L{IAsyncTransaction} and
+        returns a value.
+
+    @return: a L{Deferred} that fires with C{operation}'s result or fails with
+        its error, unless there is an error creating, aborting or committing
+        the transaction.
+    """
+    txn = transactionCreator()
+    try:
+        result = yield operation(txn)
+    except:
+        f = Failure()
+        yield txn.abort()
+        returnValue(f)
+    else:
+        yield txn.commit()
+        returnValue(result)
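+
+# A sketch of how inTransaction might be used (the transaction factory shown
+# here, ConnectionPool.connection, is assumed from twext.enterprise.adbapi2):
+#
+#     result = yield inTransaction(
+#         connectionPool.connection,
+#         lambda txn: JobItem.histogram(txn)
+#     )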
+
+
+
+def astimestamp(v):
+    """
+    Convert the given datetime to a POSIX timestamp.
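+
+    For example (a sketch)::
+
+        astimestamp(datetime(1970, 1, 1, 0, 0, 1)) == 1.0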
+    """
+    return (v - datetime.utcfromtimestamp(0)).total_seconds()
+
+
+
+class NodeInfo(Record, fromTable(NodeInfoSchema.NODE_INFO)):
+    """
+    A L{NodeInfo} is information about a currently-active Node process.
+    """
+
+    def endpoint(self, reactor):
+        """
+        Create an L{IStreamServerEndpoint} that will talk to the node process
+        that is described by this L{NodeInfo}.
+
+        @return: an endpoint that will connect to this host.
+        @rtype: L{IStreamServerEndpoint}
+        """
+        return TCP4ClientEndpoint(reactor, self.hostname, self.port)
+
+
+
+def abstract(thunk):
+    """
+    The decorated function is abstract.
+
+    @note: only methods are currently supported.
+    """
+    @classmethod
+    @wraps(thunk)
+    def inner(cls, *a, **k):
+        raise NotImplementedError(
+            qual(cls) + " does not implement " + thunk.func_name
+        )
+    return inner
+
+
+
+class JobItem(Record, fromTable(JobInfoSchema.JOB)):
+    """
+    An item in the job table. This is typically not directly used by code
+    creating work items; it is used for internal bookkeeping of the jobs
+    associated with work items.
+    """
+
+    @inlineCallbacks
+    def getWorkForJob(self):
+        workItemClass = WorkItem.forTableName(self.workType)
+        workItems = yield workItemClass.loadForJob(self.transaction, self.jobID)
+        returnValue(workItems[0] if len(workItems) == 1 else None)
+
+
+    @inlineCallbacks
+    def run(self):
+        """
+        Run this job item by finding the appropriate work item class and running that.
+        """
+        workItem = yield self.getWorkForJob()
+        if workItem is not None:
+            if workItem.group is not None:
+                yield NamedLock.acquire(self.transaction, workItem.group)
+
+            try:
+                # Delete the work item first to claim it; if it is already
+                # gone, another worker has done (or is doing) this work.
+                yield workItem.delete()
+            except NoSuchRecord:
+                # The record has already been removed
+                pass
+            else:
+                yield workItem.doWork()
+
+        try:
+            # Once the work is done we delete ourselves
+            yield self.delete()
+        except NoSuchRecord:
+            # The record has already been removed
+            pass
+
+
+    @classmethod
+    @inlineCallbacks
+    def histogram(cls, txn):
+        """
+        Generate a histogram of work items currently in the queue.
+        """
+        jb = JobInfoSchema.JOB
+        rows = yield Select(
+            [jb.WORK_TYPE, Count(jb.WORK_TYPE)],
+            From=jb,
+            GroupBy=jb.WORK_TYPE
+        ).on(txn)
+        results = dict(rows)
+
+        # Add in empty data for other work
+        allwork = WorkItem.__subclasses__()
+        for workitem in allwork:
+            if workitem.table.model.name not in results:
+                results[workitem.table.model.name] = 0
+        returnValue(results)
+
+
+    @classmethod
+    def numberOfWorkTypes(cls):
+        return len(WorkItem.__subclasses__())
+
+
+# Priority for work - used to order work items in the job queue
+WORK_PRIORITY_LOW = 1
+WORK_PRIORITY_MEDIUM = 2
+WORK_PRIORITY_HIGH = 3
+
+
+class WorkItem(Record):
+    """
+    A L{WorkItem} is an item of work which may be stored in a database, then
+    executed later.
+
+    L{WorkItem} is an abstract class, since it is a L{Record} with no table
+    associated via L{fromTable}.  Concrete subclasses must associate a specific
+    table by inheriting like so::
+
+        class MyWorkItem(WorkItem, fromTable(schema.MY_TABLE)):
+
+    Concrete L{WorkItem}s should generally not be created directly; they are
+    both created and thereby implicitly scheduled to be executed by calling
+    L{enqueueWork <twext.enterprise.ienterprise.IQueuer.enqueueWork>} with the
+    appropriate L{WorkItem} concrete subclass.  There are different queue
+    implementations (L{PeerConnectionPool} and L{LocalQueuer}, for example), so
+    the exact timing and location of the work execution may differ.
+
+    L{WorkItem}s may be constrained in the ordering and timing of their
+    execution, to control concurrency and for performance reasons respectively.
+
+    Although all the usual database mutual-exclusion rules apply to work
+    executed in L{WorkItem.doWork}, implicit database row locking is not always
+    the best way to manage concurrency.  Implicit locks have some problems,
+    including:
+
+        - implicit locks are easy to accidentally acquire out of order, which
+          can lead to deadlocks
+
+        - implicit locks are easy to forget to acquire correctly - for example,
+          any row that is read and later written back must have been read with
+          C{Select(..., ForUpdate=True)}, but it is difficult to consistently
+          indicate that methods which abstract out read operations must pass
+          this flag in certain cases and not others.
+
+        - implicit locks are held until the transaction ends, which means that
+          if expensive (long-running) queue operations share the same lock with
+          cheap (short-running) queue operations or user interactions, the
+          cheap operations all have to wait for the expensive ones to complete,
+          but continue to consume whatever database resources they were using.
+
+    In order to ameliorate these problems with potentially concurrent work
+    that uses the same resources, L{WorkItem} provides a database-wide mutex
+    that is automatically acquired at the beginning of the transaction and
+    released at the end.  To use it, simply L{align
+    <twext.enterprise.dal.record.Record.namingConvention>} the C{group}
+    attribute on your L{WorkItem} subclass with a column holding a string
+    (varchar).  L{WorkItem} subclasses with the same value for C{group} will
+    not execute their C{doWork} methods concurrently.  Furthermore, if the lock
+    cannot be quickly acquired, database resources associated with the
+    transaction attempting it will be released, and the transaction rolled back
+    until a future transaction I{can} acquire it quickly.  If you do not
+    want any limits to concurrency, simply leave it set to C{None}.
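+
+    For example, a sketch in which the work table is assumed to have a GROUP
+    varchar column that maps to the C{group} attribute::
+
+        class MailRetrievalWork(WorkItem, fromTable(schema.MAIL_RETRIEVAL_WORK)):
+            # Rows sharing a GROUP value never run doWork() concurrently.
+            def doWork(self):
+                ...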
+
+    In some applications it's possible to coalesce work together; to grab
+    multiple L{WorkItem}s in one C{doWork} transaction.  All you need to do is
+    to delete the rows which back other L{WorkItem}s from the database, and
+    they won't be processed.  Using the C{group} attribute, you can easily
+    prevent concurrency so that you can easily group these items together and
+    remove them as a set (otherwise, other workers might be attempting to
+    concurrently work on them and you'll get deletion errors).
+
+    However, if doing more work at once is less expensive, and you want to
+    avoid processing lots of individual rows in tiny transactions, you may also
+    delay the execution of a L{WorkItem} by setting its C{notBefore} attribute.
+    This must be backed by a database timestamp, so that processes which happen
+    to be restarting and examining the work to be done in the database don't
+    jump the gun and do it too early.
+
+    @cvar workID: the unique identifier (primary key) for items of this type.
+        On an instance of a concrete L{WorkItem} subclass, this attribute must
+        be an integer; on the concrete L{WorkItem} subclass itself, this
+        attribute must be a L{twext.enterprise.dal.syntax.ColumnSyntax}.  Note
+        that this is automatically taken care of if you simply have a
+        corresponding C{work_id} column in the associated L{fromTable} on your
+        L{WorkItem} subclass.  This column must be unique, and it must be an
+        integer.  In almost all cases, this column really ought to be filled
+        out by a database-defined sequence; if not, you need some other
+        mechanism for establishing a cluster-wide sequence.
+    @type workID: L{int} on instance,
+        L{twext.enterprise.dal.syntax.ColumnSyntax} on class.
+
+    @cvar notBefore: the timestamp before which this item should I{not} be
+        processed.  If unspecified, this should be the date and time of the
+        creation of the L{WorkItem}.
+    @type notBefore: L{datetime.datetime} on instance,
+        L{twext.enterprise.dal.syntax.ColumnSyntax} on class.
+
+    @ivar group: If not C{None}, a unique-to-the-database identifier for which
+        only one L{WorkItem} will execute at a time.
+    @type group: L{unicode} or L{NoneType}
+    """
+
+    group = None
+    priority = WORK_PRIORITY_LOW    # Default - subclasses should override
+
+
+    @classmethod
+    @inlineCallbacks
+    def makeJob(cls, transaction, **kwargs):
+        """
+        Create a new work item. First create the L{JobItem} record, then create
+        the actual work item related to that job.
+
+        @param transaction: the transaction to use
+        @type transaction: L{IAsyncTransaction}
+        """
+
+        jobargs = {
+            "workType": cls.table.model.name
+        }
+        def _transferArg(name):
+            if name in kwargs:
+                jobargs[name] = kwargs[name]
+                del kwargs[name]
+
+        _transferArg("jobID")
+        if "priority" in kwargs:
+            _transferArg("priority")
+        else:
+            jobargs["priority"] = cls.priority
+        _transferArg("weight")
+        _transferArg("notBefore")
+        _transferArg("notAfter")
+
+        job = yield JobItem.create(transaction, **jobargs)
+
+        kwargs["jobID"] = job.jobID
+        work = yield cls.create(transaction, **kwargs)
+        work.__dict__["job"] = job
+        returnValue(work)
+
+
+    @classmethod
+    @inlineCallbacks
+    def loadForJob(cls, txn, jobID):
+        workItems = yield cls.query(txn, (cls.jobID == jobID))
+        returnValue(workItems)
+
+
+    def doWork(self):
+        """
+        Subclasses must implement this to actually perform the queued work.
+
+        This method will be invoked in a worker process.
+
+        This method does I{not} need to delete the row referencing it; that
+        will be taken care of by the job queuing machinery.
+        """
+        raise NotImplementedError
+
+
+    @classmethod
+    def forTableName(cls, tableName):
+        """
+        Look up a work-item class given a particular table name.  Factoring
+        this correctly may place it into L{twext.enterprise.dal.record.Record}
+        instead; it is probably generally useful to be able to look up a mapped
+        class from a table.
+
+        @param tableName: the name of the table to look up
+        @type tableName: L{str}
+
+        @return: the relevant subclass
+        @rtype: L{type}
+        """
+        for subcls in cls.__subclasses__():
+            clstable = getattr(subcls, "table", None)
+            if clstable is not None and tableName == clstable.model.name:
+                return subcls
+        raise KeyError("No mapped {0} class for {1}.".format(
+            cls, tableName
+        ))
+
+
+
+class PerformJob(Command):
+    """
+    Notify another process that it must do a job that has been persisted to
+    the database, by informing it of the job ID.
+    """
+
+    arguments = [
+        ("jobID", Integer()),
+    ]
+    response = []
+
+
+
+class ReportLoad(Command):
+    """
+    Notify another node of the total, current load for this whole node (all of
+    its workers).
+    """
+    arguments = [
+        ("load", Integer())
+    ]
+    response = []
+
+
+
+class IdentifyNode(Command):
+    """
+    Identify this node to its peer.  The connector knows which hostname it's
+    looking for, and which hostname it considers itself to be; only the
+    initiator (not the listener) issues this command.  This command is
+    necessary because we don't want to rely on DNS; if reverse DNS weren't set
+    up perfectly, the listener would not be able to identify its peer, and it
+    is easier to modify local configuration so that L{socket.getfqdn} returns
+    the right value than to ensure that DNS does.
+    """
+
+    arguments = [
+        ("host", String()),
+        ("port", Integer()),
+    ]
+
+
+
+class ConnectionFromPeerNode(AMP):
+    """
+    A connection to a peer node.  Symmetric; since the "client" and the
+    "server" both serve the same role, the logic is the same in every node.
+
+    @ivar localWorkerPool: the pool of local worker processes that can process
+        queue work.
+    @type localWorkerPool: L{WorkerConnectionPool}
+
+    @ivar _reportedLoad: The number of outstanding requests being processed by
+        the peer of this connection, from all requestors (both the host of this
+        connection and others), as last reported by the most recent
+        L{ReportLoad} message received from the peer.
+    @type _reportedLoad: L{int}
+
+    @ivar _bonusLoad: The number of additional outstanding requests being
+        processed by the peer of this connection; the number of requests made
+        by the host of this connection since the last L{ReportLoad} message.
+    @type _bonusLoad: L{int}
+    """
+    implements(_IJobPerformer)
+
+    def __init__(self, peerPool, boxReceiver=None, locator=None):
+        """
+        Initialize this L{ConnectionFromPeerNode} with a reference to a
+        L{PeerConnectionPool}, as well as required initialization arguments for
+        L{AMP}.
+
+        @param peerPool: the connection pool within which this
+            L{ConnectionFromPeerNode} is a participant.
+        @type peerPool: L{PeerConnectionPool}
+
+        @see: L{AMP.__init__}
+        """
+        self.peerPool = peerPool
+        self._bonusLoad = 0
+        self._reportedLoad = 0
+        super(ConnectionFromPeerNode, self).__init__(
+            boxReceiver, locator
+        )
+
+
+    def reportCurrentLoad(self):
+        """
+        Report the current load for the local worker pool to this peer.
+        """
+        return self.callRemote(ReportLoad, load=self.totalLoad())
+
+
+    @ReportLoad.responder
+    def reportedLoad(self, load):
+        """
+        The peer reports its load.
+        """
+        self._reportedLoad = (load - self._bonusLoad)
+        return {}
+
+
+    def startReceivingBoxes(self, sender):
+        """
+        Connection is up and running; add this to the list of active peers.
+        """
+        r = super(ConnectionFromPeerNode, self).startReceivingBoxes(sender)
+        self.peerPool.addPeerConnection(self)
+        return r
+
+
+    def stopReceivingBoxes(self, reason):
+        """
+        The connection has shut down; remove this from the list of active
+        peers.
+        """
+        self.peerPool.removePeerConnection(self)
+        r = super(ConnectionFromPeerNode, self).stopReceivingBoxes(reason)
+        return r
+
+
+    def currentLoadEstimate(self):
+        """
+        What is the current load estimate for this peer?
+
+        @return: The number of full "slots", i.e. currently-being-processed
+            queue items (and other items which may contribute to this process's
+            load, such as currently-being-processed client requests).
+        @rtype: L{int}
+        """
+        return self._reportedLoad + self._bonusLoad
+
+
+    def performJob(self, jobID):
+        """
+        A L{local worker connection <ConnectionFromWorker>} is asking this
+        specific peer node-controller process to perform a job, having
+        already determined that it's appropriate.
+
+        @see: L{_IJobPerformer.performJob}
+        """
+        d = self.callRemote(PerformJob, jobID=jobID)
+        self._bonusLoad += 1
+
+        @d.addBoth
+        def performed(result):
+            self._bonusLoad -= 1
+            return result
+
+        @d.addCallback
+        def success(result):
+            return None
+
+        return d
+
+
+    @PerformJob.responder
+    def dispatchToWorker(self, jobID):
+        """
+        A remote peer node has asked this node to do a job; dispatch it to
+        a local worker on this node.
+
+        @param jobID: the identifier of the job.
+        @type jobID: L{int}
+
+        @return: a L{Deferred} that fires when the work has been completed.
+        """
+        d = self.peerPool.performJobForPeer(jobID)
+        d.addCallback(lambda ignored: {})
+        return d
+
+
+    @IdentifyNode.responder
+    def identifyPeer(self, host, port):
+        self.peerPool.mapPeer(host, port, self)
+        return {}
+
+
+
+class WorkerConnectionPool(object):
+    """
+    A pool of L{ConnectionFromWorker}s.
+
+    L{WorkerConnectionPool} also implements the same implicit protocol as a
+    L{ConnectionFromPeerNode}, but one that dispenses work to the local worker
+    processes rather than to a remote connection pool.
+    """
+    implements(_IJobPerformer)
+
+    def __init__(self, maximumLoadPerWorker=5):
+        self.workers = []
+        self.maximumLoadPerWorker = maximumLoadPerWorker
+
+
+    def addWorker(self, worker):
+        """
+        Add a L{ConnectionFromWorker} to this L{WorkerConnectionPool} so that
+        it can be selected.
+        """
+        self.workers.append(worker)
+
+
+    def removeWorker(self, worker):
+        """
+        Remove a L{ConnectionFromWorker} from this L{WorkerConnectionPool} that
+        was previously added.
+        """
+        self.workers.remove(worker)
+
+
+    def hasAvailableCapacity(self):
+        """
+        Does this worker connection pool have any local workers with spare
+        capacity to process another queue item?
+        """
+        for worker in self.workers:
+            if worker.currentLoad < self.maximumLoadPerWorker:
+                return True
+        return False
+
+
+    def allWorkerLoad(self):
+        """
+        The total load of all currently connected workers.
+        """
+        return sum(worker.currentLoad for worker in self.workers)
+
+
+    def _selectLowestLoadWorker(self):
+        """
+        Select the local worker connection with the lowest current load.
+
+        @return: a worker connection with the lowest current load.
+        @rtype: L{ConnectionFromWorker}
+        """
+        return min(self.workers, key=lambda w: w.currentLoad)
+
+
+    def performJob(self, jobID):
+        """
+        Select a local worker that is idle enough to perform the given job,
+        then ask it to perform the job.
+
+        @param jobID: The primary key identifier of the given job.
+        @type jobID: L{int}
+
+        @return: a L{Deferred} firing with an empty dictionary when the work is
+            complete.
+        @rtype: L{Deferred} firing L{dict}
+        """
+        preferredWorker = self._selectLowestLoadWorker()
+        result = preferredWorker.performJob(jobID)
+        return result
+
+
+
+class ConnectionFromWorker(AMP):
+    """
+    An individual connection from a worker, as seen from the master's
+    perspective.  L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
+    """
+
+    def __init__(self, peerPool, boxReceiver=None, locator=None):
+        super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
+        self.peerPool = peerPool
+        self._load = 0
+
+
+    @property
+    def currentLoad(self):
+        """
+        What is the current load of this worker?
+        """
+        return self._load
+
+
+    def startReceivingBoxes(self, sender):
+        """
+        Start receiving AMP boxes from the peer.  Initialize all necessary
+        state.
+        """
+        result = super(ConnectionFromWorker, self).startReceivingBoxes(sender)
+        self.peerPool.workerPool.addWorker(self)
+        return result
+
+
+    def stopReceivingBoxes(self, reason):
+        """
+        AMP boxes will no longer be received.
+        """
+        result = super(ConnectionFromWorker, self).stopReceivingBoxes(reason)
+        self.peerPool.workerPool.removeWorker(self)
+        return result
+
+
+    @PerformJob.responder
+    def performJob(self, jobID):
+        """
+        Dispatch a job to this worker.
+
+        @see: The responder for this should always be
+            L{ConnectionFromController.actuallyReallyExecuteJobHere}.
+        """
+        d = self.callRemote(PerformJob, jobID=jobID)
+        self._load += 1
+
+        @d.addBoth
+        def f(result):
+            self._load -= 1
+            return result
+
+        return d
+
+
+
+class ConnectionFromController(AMP):
+    """
+    A L{ConnectionFromController} is the connection to a node-controller
+    process, in a worker process.  It processes requests from its own
+    controller to do work.  It is the opposite end of the connection from
+    L{ConnectionFromWorker}.
+    """
+    implements(IQueuer)
+
+    def __init__(self, transactionFactory, whenConnected,
+                 boxReceiver=None, locator=None):
+        super(ConnectionFromController, self).__init__(boxReceiver, locator)
+        self.transactionFactory = transactionFactory
+        self.whenConnected = whenConnected
+        # FIXME: Glyph, it appears WorkProposal expects this to have a reactor...
+        from twisted.internet import reactor
+        self.reactor = reactor
+
+
+    def startReceivingBoxes(self, sender):
+        super(ConnectionFromController, self).startReceivingBoxes(sender)
+        self.whenConnected(self)
+
+
+    def choosePerformer(self):
+        """
+        To conform with L{WorkProposal}'s expectations, which may run in either
+        a controller (against a L{PeerConnectionPool}) or in a worker (against
+        a L{ConnectionFromController}), this is implemented to always return
+        C{self}, since C{self} is also an object that has a C{performJob}
+        method.
+        """
+        return self
+
+
+    def performJob(self, jobID):
+        """
+        Ask the controller to perform a job on our behalf.
+        """
+        return self.callRemote(PerformJob, jobID=jobID)
+
+
+    @inlineCallbacks
+    def enqueueWork(self, txn, workItemType, **kw):
+        """
+        There is some work to do.  Do it, ideally someplace else, ideally in
+        parallel.  Later, let the caller know that the work has been completed
+        by firing a L{Deferred}.
+
+        @param workItemType: The type of work item to be enqueued.
+        @type workItemType: A subtype of L{WorkItem}
+
+        @param kw: The parameters to construct a work item.
+        @type kw: keyword parameters to C{workItemType.create}, i.e.
+            C{workItemType.__init__}
+
+        @return: an object that can track the enqueuing and remote execution of
+            this work.
+        @rtype: L{WorkProposal}
+        """
+        wp = WorkProposal(self, txn, workItemType, kw)
+        yield wp._start()
+        returnValue(wp)
+
+
+    @PerformJob.responder
+    def actuallyReallyExecuteJobHere(self, jobID):
+        """
+        This is where it's time to actually do the job.  The controller
+        process has instructed this worker to do it; so, look up the data in
+        the row, and do it.
+        """
+        d = ultimatelyPerform(self.transactionFactory, jobID)
+        d.addCallback(lambda ignored: {})
+        return d
+
+
+
+def ultimatelyPerform(txnFactory, jobID):
+    """
+    Eventually, after routing the job to the appropriate place, somebody
+    actually has to I{do} it.
+
+    @param txnFactory: a 0- or 1-argument callable that creates an
+        L{IAsyncTransaction}
+    @type txnFactory: L{callable}
+
+    @param jobID: the ID of the job to be performed
+    @type jobID: L{int}
+
+    @return: a L{Deferred} which fires with C{None} when the job has been
+        performed, or fails if the job can't be performed.
+    """
+    @inlineCallbacks
+    def runJob(txn):
+        try:
+            job = yield JobItem.load(txn, jobID)
+            yield job.run()
+        except NoSuchRecord:
+            # The record has already been removed
+            pass
+
+    return inTransaction(txnFactory, runJob)
+
+
+
+class LocalPerformer(object):
+    """
+    Implementor of C{performJob} that does its work in the local process,
+    regardless of other conditions.
+    """
+    implements(_IJobPerformer)
+
+    def __init__(self, txnFactory):
+        """
+        Create this L{LocalPerformer} with a transaction factory.
+        """
+        self.txnFactory = txnFactory
+
+
+    def performJob(self, jobID):
+        """
+        Perform the given job right now.
+        """
+        return ultimatelyPerform(self.txnFactory, jobID)
+
+
+
+class WorkerFactory(Factory, object):
+    """
+    Factory, to be used as the client to connect from the worker to the
+    controller.
+    """
+
+    def __init__(self, transactionFactory, whenConnected):
+        """
+        Create a L{WorkerFactory} with a transaction factory and a schema.
+        """
+        self.transactionFactory = transactionFactory
+        self.whenConnected = whenConnected
+
+
+    def buildProtocol(self, addr):
+        """
+        Create a L{ConnectionFromController} connected to the
+        transactionFactory and store.
+        """
+        return ConnectionFromController(self.transactionFactory, self.whenConnected)
+
+
+
+class TransactionFailed(Exception):
+    """
+    A transaction failed.
+    """
+
+
+
+def _cloneDeferred(d):
+    """
+    Make a new Deferred, adding callbacks to C{d}.
+
+    @return: another L{Deferred} that fires with C{d's} result when C{d} fires.
+    @rtype: L{Deferred}
+    """
+    d2 = Deferred()
+    d.chainDeferred(d2)
+    return d2
+
+
+
+class WorkProposal(object):
+    """
+    A L{WorkProposal} is a proposal for work that will be executed, perhaps on
+    another node, perhaps in the future.
+
+    @ivar _chooser: The object which will choose where the work in this
+        proposal gets performed.  This must have both a C{choosePerformer}
+        method and a C{reactor} attribute, providing an L{IReactorTime}.
+    @type _chooser: L{PeerConnectionPool} or L{LocalQueuer}
+
+    @ivar txn: The transaction where the work will be enqueued.
+    @type txn: L{IAsyncTransaction}
+
+    @ivar workItemType: The type of work to be enqueued by this L{WorkProposal}
+    @type workItemType: L{WorkItem} subclass
+
+    @ivar kw: The keyword arguments to pass to C{self.workItemType.create} to
+        construct it.
+    @type kw: L{dict}
+    """
+
+    def __init__(self, chooser, txn, workItemType, kw):
+        self._chooser = chooser
+        self.txn = txn
+        self.workItemType = workItemType
+        self.kw = kw
+        self._whenExecuted = Deferred()
+        self._whenCommitted = Deferred()
+        self.workItem = None
+
+
+    @inlineCallbacks
+    def _start(self):
+        """
+        Execute this L{WorkProposal} by creating the work item in the database,
+        waiting for the transaction where that addition was completed to
+        commit, and asking the local node controller process to do the work.
+        """
+        try:
+            created = yield self.workItemType.makeJob(self.txn, **self.kw)
+        except Exception:
+            self._whenCommitted.errback(TransactionFailed)
+            raise
+        else:
+            self.workItem = created
+
+            @self.txn.postCommit
+            def whenDone():
+                self._whenCommitted.callback(self)
+
+                def maybeLater():
+                    performer = self._chooser.choosePerformer()
+
+                    @passthru(
+                        performer.performJob(created.jobID).addCallback
+                    )
+                    def performed(result):
+                        self._whenExecuted.callback(self)
+
+                    @performed.addErrback
+                    def notPerformed(why):
+                        self._whenExecuted.errback(why)
+
+                reactor = self._chooser.reactor
+                if created.job.notBefore is not None:
+                    when = max(
+                        0,
+                        astimestamp(created.job.notBefore) - reactor.seconds()
+                    )
+                else:
+                    when = 0
+                # TODO: Track the returned DelayedCall so it can be stopped
+                # when the service stops.
+                self._chooser.reactor.callLater(when, maybeLater)
+
+            @self.txn.postAbort
+            def whenFailed():
+                self._whenCommitted.errback(TransactionFailed)
+
+
+    def whenExecuted(self):
+        """
+        Let the caller know when the proposed work has been fully executed.
+
+        @note: The L{Deferred} returned by C{whenExecuted} should be used with
+            extreme caution.  If an application decides to do any
+            database-persistent work as a result of this L{Deferred} firing,
+            that work I{may be lost} as a result of a service being normally
+            shut down between the time that the work is scheduled and the time
+            that it is executed.  So, the only things that should be added as
+            callbacks to this L{Deferred} are those which are ephemeral, in
+            memory, and reflect only presentation state associated with the
+            user's perception of the completion of work, not logical chains of
+            work which need to be completed in sequence; those should all be
+            completed within the transaction of the L{WorkItem.doWork} that
+            gets executed.
+
+        @return: a L{Deferred} that fires with this L{WorkProposal} when the
+            work has been completed remotely.
+        """
+        return _cloneDeferred(self._whenExecuted)
+
+
+    def whenProposed(self):
+        """
+        Let the caller know when the work has been proposed; i.e. when the work
+        is first transmitted to the database.
+
+        @return: a L{Deferred} that fires with this L{WorkProposal} when the
+            relevant commands have been sent to the database to create the
+            L{WorkItem}.  Since C{enqueueWork} now waits for the work item to
+            be created before returning this L{WorkProposal}, the returned
+            L{Deferred} has already fired.
+        """
+        return succeed(self)
+
+
+    def whenCommitted(self):
+        """
+        Let the caller know when the work has been committed to; i.e. when the
+        transaction where the work was proposed has been committed to the
+        database.
+
+        @return: a L{Deferred} that fires with this L{WorkProposal} when the
+            relevant transaction has been committed, or fails if the
+            transaction is not committed for any reason.
+        """
+        return _cloneDeferred(self._whenCommitted)
+
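+# A hedged usage sketch, not part of this module ("queuer", "txn", and
+# "MyWorkItem" are assumed names): the three Deferred accessors let a caller
+# observe each stage of a proposal's lifecycle:
+#
+#     wp = yield queuer.enqueueWork(txn, MyWorkItem, a=1, b=2)
+#     yield wp.whenProposed()    # SQL to create the work item has been sent
+#     yield wp.whenCommitted()   # the enqueueing transaction has committed
+#     yield wp.whenExecuted()    # the work has run (ephemeral callbacks only)
+#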
+
+
+class _BaseQueuer(object):
+    implements(IQueuer)
+
+    def __init__(self):
+        super(_BaseQueuer, self).__init__()
+        self.proposalCallbacks = set()
+
+
+    def callWithNewProposals(self, callback):
+        """
+        Register a callback to be invoked with each new L{WorkProposal}
+        created by this queuer's C{enqueueWork}.
+        """
+        self.proposalCallbacks.add(callback)
+
+
+    def transferProposalCallbacks(self, newQueuer):
+        """
+        Hand this queuer's registered proposal callbacks over to
+        C{newQueuer}, for use when one queuer replaces another.
+        """
+        newQueuer.proposalCallbacks = self.proposalCallbacks
+        return newQueuer
+
+
+    @inlineCallbacks
+    def enqueueWork(self, txn, workItemType, **kw):
+        """
+        There is some work to do.  Do it, someplace else, ideally in parallel.
+        Later, let the caller know that the work has been completed by firing a
+        L{Deferred}.
+
+        @param txn: The transaction to use for creating the work item.
+        @type txn: L{IAsyncTransaction}
+
+        @param workItemType: The type of work item to be enqueued.
+        @type workItemType: A subtype of L{WorkItem}
+
+        @param kw: The parameters to construct a work item.
+        @type kw: keyword parameters to C{workItemType.create}, i.e.
+            C{workItemType.__init__}
+
+        @return: an object that can track the enqueuing and remote execution of
+            this work.
+        @rtype: L{WorkProposal}
+        """
+        wp = WorkProposal(self, txn, workItemType, kw)
+        yield wp._start()
+        for callback in self.proposalCallbacks:
+            callback(wp)
+        returnValue(wp)
+
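+# A hedged sketch (assumed names): observers registered via
+# callWithNewProposals are invoked with every WorkProposal that enqueueWork
+# creates, which is useful for tests or metrics:
+#
+#     def observed(proposal):
+#         seen.append(proposal.workItemType)
+#     queuer.callWithNewProposals(observed)
+#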
+
+
+class PeerConnectionPool(_BaseQueuer, MultiService, object):
+    """
+    Each node has a L{PeerConnectionPool} connecting it to all the other nodes
+    currently active on the same database.
+
+    @ivar hostname: The hostname where this node process is running, as
+        reported by the local host's configuration.  Possibly this should be
+        obtained via C{config.ServerHostName} instead of C{socket.getfqdn()};
+        although hosts within a cluster may be configured with the same
+        C{ServerHostName}; TODO need to confirm.
+    @type hostname: L{bytes}
+
+    @ivar thisProcess: a L{NodeInfo} representing this process, which is
+        initialized when this L{PeerConnectionPool} service is started via
+        C{startService}.  May be C{None} if this service is not fully started
+        up or if it is shutting down.
+    @type thisProcess: L{NodeInfo}
+
+    @ivar queueProcessTimeout: The amount of time after a L{WorkItem} is
+        scheduled to be processed (its C{notBefore} attribute) that it is
+        considered to be "orphaned" and will be run by a lost-work check rather
+        than waiting for it to be requested.  By default, 10 minutes.
+    @type queueProcessTimeout: L{float} (in seconds)
+
+    @ivar queueDelayedProcessInterval: The amount of time between database
+        pings, i.e. checks for overdue queue items that might have been
+        orphaned by a controller process that died mid-transaction.  This is
+        how often the shared database should be pinged by I{all} nodes (i.e.,
+        all controller processes, or each instance of L{PeerConnectionPool});
+        each individual node will ping commensurately less often as more nodes
+        join the database.
+    @type queueDelayedProcessInterval: L{float} (in seconds)
+
+    @ivar reactor: The reactor used for scheduling timed events.
+    @type reactor: L{IReactorTime} provider.
+
+    @ivar peers: The list of currently connected peers.
+    @type peers: L{list} of L{PeerConnectionPool}
+    """
+    implements(IQueuer)
+
+    from socket import getfqdn
+    from os import getpid
+    getfqdn = staticmethod(getfqdn)
+    getpid = staticmethod(getpid)
+
+    queueProcessTimeout = (10.0 * 60.0)  # 10 minutes
+    queueDelayedProcessInterval = (60.0)  # 1 minute
+
+    def __init__(self, reactor, transactionFactory, ampPort):
+        """
+        Initialize a L{PeerConnectionPool}.
+
+        @param ampPort: The AMP TCP port number to listen on for inter-host
+            communication.  This must be an integer (and not, say, an endpoint,
+            or an endpoint description) because we need to communicate it to
+            the other peers in the cluster in a way that will be meaningful to
+            them as clients.
+        @type ampPort: L{int}
+
+        @param transactionFactory: a 0- or 1-argument callable that produces an
+            L{IAsyncTransaction}
+        """
+        super(PeerConnectionPool, self).__init__()
+        self.reactor = reactor
+        self.transactionFactory = transactionFactory
+        self.hostname = self.getfqdn()
+        self.pid = self.getpid()
+        self.ampPort = ampPort
+        self.thisProcess = None
+        self.workerPool = WorkerConnectionPool()
+        self.peers = []
+        self.mappedPeers = {}
+        self._startingUp = None
+        self._listeningPort = None
+        self._lastSeenTotalNodes = 1
+        self._lastSeenNodeIndex = 1
+
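+    # A hedged construction sketch (names are assumptions, not part of this
+    # module): each node process typically creates a single pool and runs it
+    # as a service alongside its other services:
+    #
+    #     pool = PeerConnectionPool(reactor, store.newTransaction, 7654)
+    #     pool.setServiceParent(rootService)
+    #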
+
+    def addPeerConnection(self, peer):
+        """
+        Add a L{ConnectionFromPeerNode} to the active list of peers.
+        """
+        self.peers.append(peer)
+
+
+    def totalLoad(self):
+        return self.workerPool.allWorkerLoad()
+
+
+    def workerListenerFactory(self):
+        """
+        Factory that listens for connections from workers.
+        """
+        f = Factory()
+        f.buildProtocol = lambda addr: ConnectionFromWorker(self)
+        return f
+
+
+    def removePeerConnection(self, peer):
+        """
+        Remove a L{ConnectionFromPeerNode} from the active list of peers.
+        """
+        self.peers.remove(peer)
+
+
+    def choosePerformer(self, onlyLocally=False):
+        """
+        Choose a peer to distribute work to based on the current known slot
+        occupancy of the other nodes.  Note that this will prefer distributing
+        work to local workers until the current node is full, because that
+        should be lower-latency.  Also, if no peers are available, work will be
+        submitted locally even if the worker pool is already over-subscribed.
+
+        @return: the chosen performer.
+        @rtype: a provider of L{_IJobPerformer}: a L{ConnectionFromPeerNode},
+            a L{WorkerConnectionPool}, or a L{LocalPerformer}
+        """
+        if self.workerPool.hasAvailableCapacity():
+            return self.workerPool
+
+        if self.peers and not onlyLocally:
+            return sorted(self.peers, key=lambda p: p.currentLoadEstimate())[0]
+        else:
+            return LocalPerformer(self.transactionFactory)
+
+
+    def performJobForPeer(self, jobID):
+        """
+        A peer has requested us to perform a job; choose a job performer
+        local to this node, and then execute it.
+        """
+        performer = self.choosePerformer(onlyLocally=True)
+        return performer.performJob(jobID)
+
+
+    def totalNumberOfNodes(self):
+        """
+        How many nodes are there, total?
+
+        @return: the maximum number of other L{PeerConnectionPool} instances
+            that may be connected to the database described by
+            C{self.transactionFactory}.  Note that this is not the current
+            count by connectivity, but the count according to the database.
+        @rtype: L{int}
+        """
+        # TODO
+        return self._lastSeenTotalNodes
+
+
+    def nodeIndex(self):
+        """
+        What ordinal does this node, i.e. this instance of
+        L{PeerConnectionPool}, occupy within the ordered set of all nodes
+        connected to the database described by C{self.transactionFactory}?
+
+        @return: the index of this node within the total collection.  For
+            example, if this L{PeerConnectionPool} is 6 out of 30, this method
+            will return C{6}.
+        @rtype: L{int}
+        """
+        # TODO
+        return self._lastSeenNodeIndex
+
+
+    def _periodicLostWorkCheck(self):
+        """
+        Periodically, every node controller has to check to make sure that work
+        hasn't been dropped on the floor by someone.  In order to do that it
+        queries each work-item table.
+        """
+        @inlineCallbacks
+        def workCheck(txn):
+            if self.thisProcess:
+                nodes = [(node.hostname, node.port) for node in
+                         (yield self.activeNodes(txn))]
+                nodes.sort()
+                self._lastSeenTotalNodes = len(nodes)
+                self._lastSeenNodeIndex = nodes.index(
+                    (self.thisProcess.hostname, self.thisProcess.port)
+                )
+
+            # TODO: here is where we should iterate over the unlocked items that
+            # are due, ordered by priority, notBefore etc
+            tooLate = datetime.utcfromtimestamp(
+                self.reactor.seconds() - self.queueProcessTimeout
+            )
+            overdueItems = (yield JobItem.query(
+                txn, (JobItem.notBefore < tooLate))
+            )
+            for overdueItem in overdueItems:
+                peer = self.choosePerformer()
+                yield peer.performJob(overdueItem.jobID)
+
+        return inTransaction(self.transactionFactory, workCheck)
+
+    _currentWorkDeferred = None
+    _lostWorkCheckCall = None
+
+    def _lostWorkCheckLoop(self):
+        """
+        While the service is running, keep checking for any overdue / lost work
+        items and re-submit them to the cluster for processing.  Space out
+        those checks in time based on the size of the cluster.
+        """
+        self._lostWorkCheckCall = None
+
+        @passthru(
+            self._periodicLostWorkCheck().addErrback(log.err).addCallback
+        )
+        def scheduleNext(result):
+            self._currentWorkDeferred = None
+            if not self.running:
+                return
+            index = self.nodeIndex()
+            now = self.reactor.seconds()
+
+            interval = self.queueDelayedProcessInterval
+            count = self.totalNumberOfNodes()
+            # Round down to the most recent interval boundary, then offset by
+            # this node's slot so that nodes take turns: e.g. with
+            # interval=60, count=3, index=1 and now=1000, the next check
+            # fires at (1000 - 40) + 60 * 4 = 1200, i.e. 200 seconds away.
+            when = (now - (now % interval)) + (interval * (count + index))
+            delay = when - now
+            self._lostWorkCheckCall = self.reactor.callLater(
+                delay, self._lostWorkCheckLoop
+            )
+
+        self._currentWorkDeferred = scheduleNext
+
+
+    def startService(self):
+        """
+        Register ourselves with the database and establish all outgoing
+        connections to other servers in the cluster.
+        """
+        @inlineCallbacks
+        def startup(txn):
+            endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
+            # If this fails, the failure mode is going to be ugly, just like
+            # all conflicted-port failures.  But, at least it won't proceed.
+            self._listeningPort = yield endpoint.listen(self.peerFactory())
+            self.ampPort = self._listeningPort.getHost().port
+            yield Lock.exclusive(NodeInfo.table).on(txn)
+            nodes = yield self.activeNodes(txn)
+            selves = [node for node in nodes
+                      if ((node.hostname == self.hostname) and
+                          (node.port == self.ampPort))]
+            if selves:
+                self.thisProcess = selves[0]
+                nodes.remove(self.thisProcess)
+                yield self.thisProcess.update(pid=self.pid,
+                                              time=datetime.now())
+            else:
+                self.thisProcess = yield NodeInfo.create(
+                    txn, hostname=self.hostname, port=self.ampPort,
+                    pid=self.pid, time=datetime.now()
+                )
+
+            for node in nodes:
+                self._startConnectingTo(node)
+
+        self._startingUp = inTransaction(self.transactionFactory, startup)
+
+        @self._startingUp.addBoth
+        def done(result):
+            self._startingUp = None
+            super(PeerConnectionPool, self).startService()
+            self._lostWorkCheckLoop()
+            return result
+
+
+    @inlineCallbacks
+    def stopService(self):
+        """
+        Stop this service, terminating any incoming or outgoing connections.
+        """
+        yield super(PeerConnectionPool, self).stopService()
+
+        if self._startingUp is not None:
+            yield self._startingUp
+
+        if self._listeningPort is not None:
+            yield self._listeningPort.stopListening()
+
+        if self._lostWorkCheckCall is not None:
+            self._lostWorkCheckCall.cancel()
+
+        if self._currentWorkDeferred is not None:
+            yield self._currentWorkDeferred
+
+        for peer in self.peers:
+            peer.transport.abortConnection()
+
+
+    def activeNodes(self, txn):
+        """
+        Load information about all other nodes.
+        """
+        return NodeInfo.all(txn)
+
+
+    def mapPeer(self, host, port, peer):
+        """
+        A peer has been identified as belonging to the given host/port
+        combination.  Disconnect any other peer that claims to be connected for
+        the same peer.
+        """
+        # if (host, port) in self.mappedPeers:
+            # TODO: think about this for race conditions
+            # self.mappedPeers.pop((host, port)).transport.loseConnection()
+        self.mappedPeers[(host, port)] = peer
+
+
+    def _startConnectingTo(self, node):
+        """
+        Start an outgoing connection to another master process.
+
+        @param node: a description of the master to connect to.
+        @type node: L{NodeInfo}
+        """
+        connected = node.endpoint(self.reactor).connect(self.peerFactory())
+
+        def whenConnected(proto):
+            self.mapPeer(node.hostname, node.port, proto)
+            proto.callRemote(
+                IdentifyNode,
+                host=self.thisProcess.hostname,
+                port=self.thisProcess.port
+            ).addErrback(noted, "identify")
+
+        def noted(err, x="connect"):
+            log.msg(
+                "Could not {0} to cluster peer {1} because {2}"
+                .format(x, node, str(err.value))
+            )
+
+        connected.addCallbacks(whenConnected, noted)
+
+
+    def peerFactory(self):
+        """
+        Factory for peer connections.
+
+        @return: a L{Factory} that will produce L{ConnectionFromPeerNode}
+            protocols attached to this L{PeerConnectionPool}.
+        """
+        return _PeerPoolFactory(self)
+
+
+
+class _PeerPoolFactory(Factory, object):
+    """
+    Protocol factory responsible for creating L{ConnectionFromPeerNode}
+    connections, both client and server.
+    """
+
+    def __init__(self, peerConnectionPool):
+        self.peerConnectionPool = peerConnectionPool
+
+
+    def buildProtocol(self, addr):
+        return ConnectionFromPeerNode(self.peerConnectionPool)
+
+
+
+class LocalQueuer(_BaseQueuer):
+    """
+    When work is enqueued with this queuer, it is just executed locally.
+    """
+    implements(IQueuer)
+
+    def __init__(self, txnFactory, reactor=None):
+        super(LocalQueuer, self).__init__()
+        self.txnFactory = txnFactory
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+
+
+    def choosePerformer(self):
+        """
+        Choose to perform the work locally.
+        """
+        return LocalPerformer(self.txnFactory)
+
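+# A hedged sketch (assumed names): a single-process deployment can use
+# LocalQueuer to execute work items in-process, with no peers or workers:
+#
+#     queuer = LocalQueuer(store.newTransaction)
+#     wp = yield queuer.enqueueWork(txn, MyWorkItem, a=1, b=2)
+#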
+
+
+class NonPerformer(object):
+    """
+    Implementor of C{performJob} that doesn't actually perform any work.  This
+    is used in the case where you want to be able to enqueue work for someone
+    else to do, but not take on any work yourself (such as a command line
+    tool).
+    """
+    implements(_IJobPerformer)
+
+    def performJob(self, jobID):
+        """
+        Don't perform job.
+        """
+        return succeed(None)
+
+
+
+class NonPerformingQueuer(_BaseQueuer):
+    """
+    When work is enqueued with this queuer, it is never executed locally.
+    It's expected that the polling machinery will find the work and perform it.
+    """
+    implements(IQueuer)
+
+    def __init__(self, reactor=None):
+        super(NonPerformingQueuer, self).__init__()
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+
+
+    def choosePerformer(self):
+        """
+        Choose a performer that does not actually perform any work.
+        """
+        return NonPerformer()
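+
+
+# A hedged sketch (assumed names): an enqueue-only tool, such as a command
+# line utility, can use NonPerformingQueuer so that enqueued work is left for
+# the polling machinery on the server nodes to pick up:
+#
+#     queuer = NonPerformingQueuer()
+#     yield queuer.enqueueWork(txn, MyWorkItem, a=1, b=2)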

Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_adbapi2.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_adbapi2.py	2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_adbapi2.py	2014-03-01 15:42:17 UTC (rev 12779)
@@ -192,13 +192,13 @@
         a = self.createTransaction()
 
         alphaResult = self.resultOf(a.execSQL("alpha"))
-        [[counter, echo]] = alphaResult[0]
+        [[_ignore_counter, _ignore_echo]] = alphaResult[0]
 
         b = self.createTransaction()
         # "b" should have opened a connection.
         self.assertEquals(len(self.factory.connections), 2)
         betaResult = self.resultOf(b.execSQL("beta"))
-        [[bcounter, becho]] = betaResult[0]
+        [[bcounter, _ignore_becho]] = betaResult[0]
 
         # both "a" and "b" are holding open a connection now; let's try to open
         # a third one.  (The ordering will be deterministic even if this fails,
@@ -214,13 +214,13 @@
         commitResult = self.resultOf(b.commit())
 
         # Now that "b" has committed, "c" should be able to complete.
-        [[ccounter, cecho]] = gammaResult[0]
+        [[ccounter, _ignore_cecho]] = gammaResult[0]
 
         # The connection for "a" ought to still be busy, so let's make sure
         # we're using the one for "c".
         self.assertEquals(ccounter, bcounter)
 
-        # Sanity check: the commit should have succeded!
+        # Sanity check: the commit should have succeeded!
         self.assertEquals(commitResult, [None])
 
 
@@ -231,7 +231,7 @@
         """
         a = self.createTransaction()
         alphaResult = self.resultOf(a.execSQL("alpha"))
-        [[[counter, echo]]] = alphaResult
+        [[[_ignore_counter, _ignore_echo]]] = alphaResult
         self.assertEquals(len(self.factory.connections), 1)
         self.assertEquals(len(self.holders), 1)
         [holder] = self.holders
@@ -452,7 +452,7 @@
         for txn in txns:
             # Make sure rollback will actually be executed.
             results = self.resultOf(txn.execSQL("maybe change something!"))
-            [[[counter, echo]]] = results
+            [[[_ignore_counter, echo]]] = results
             self.assertEquals("maybe change something!", echo)
         # Fail one (and only one) call to rollback().
         self.factory.rollbackFail = True
@@ -483,7 +483,7 @@
         """
         active = []
         # Use up the available connections ...
-        for i in xrange(self.pool.maxConnections):
+        for _ignore in xrange(self.pool.maxConnections):
             active.append(self.createTransaction())
 
         # ... so that this one has to be spooled.
@@ -506,7 +506,7 @@
         abortResult = self.resultOf(it.abort())
 
         # steal it from the queue so we can do it out of order
-        d, work = self.holders[0]._q.get()
+        d, _ignore_work = self.holders[0]._q.get()
 
         # that should be the only work unit so don't continue if something else
         # got in there
@@ -716,7 +716,7 @@
 
         self.factory.connections[0].executeWillFail(CustomExecuteFailed)
         results = self.resultOf(txn.execSQL("hello, world!"))
-        [[[counter, echo]]] = results
+        [[[_ignore_counter, echo]]] = results
         self.assertEquals("hello, world!", echo)
 
         # Two execution attempts should have been made, one on each connection.
@@ -752,7 +752,7 @@
         )
         results = self.resultOf(txn.execSQL("hello, world!"))
         txn.commit()
-        [[[counter, echo]]] = results
+        [[[_ignore_counter, echo]]] = results
         self.assertEquals("hello, world!", echo)
         txn2 = self.createTransaction()
         self.assertEquals(
@@ -767,7 +767,7 @@
         self.factory.connections[0].executeWillFail(CustomExecFail)
         results = self.resultOf(txn2.execSQL("second try!"))
         txn2.commit()
-        [[[counter, echo]]] = results
+        [[[_ignore_counter, echo]]] = results
         self.assertEquals("second try!", echo)
         self.assertEquals(len(self.flushLoggedErrors(CustomExecFail)), 1)
 
@@ -936,7 +936,7 @@
         re-connection on the next try.
         """
         txn = self.createTransaction()
-        [[[counter, echo]]] = self.resultOf(txn.execSQL("hello, world!", []))
+        [[[_ignore_counter, _ignore_echo]]] = self.resultOf(txn.execSQL("hello, world!", []))
         self.factory.connections[0].executeWillFail(ZeroDivisionError)
         [f] = self.resultOf(txn.execSQL("divide by zero", []))
         f.trap(self.translateError(ZeroDivisionError))
@@ -966,7 +966,7 @@
         """
         txn = self.createTransaction()
         results = self.resultOf(txn.execSQL("maybe change something!"))
-        [[[counter, echo]]] = results
+        [[[_ignore_counter, echo]]] = results
         self.assertEquals("maybe change something!", echo)
         self.factory.rollbackFail = True
         [x] = self.resultOf(txn.abort())
@@ -992,7 +992,7 @@
         txn = self.createTransaction()
         self.factory.commitFail = True
         results = self.resultOf(txn.execSQL("maybe change something!"))
-        [[[counter, echo]]] = results
+        [[[_ignore_counter, echo]]] = results
         self.assertEquals("maybe change something!", echo)
         [x] = self.resultOf(txn.commit())
         x.trap(self.translateError(CommitFail))
@@ -1210,7 +1210,7 @@
         r = self.resultOf(
             txn.execSQL("some-rows", raiseOnZeroRowCount=RuntimeError)
         )
-        [[[counter, echo]]] = r
+        [[[_ignore_counter, echo]]] = r
         self.assertEquals(echo, "some-rows")
 
 
@@ -1338,7 +1338,6 @@
     interacting with each other.
     """
 
-
     def setParamstyle(self, paramstyle):
         """
         Change the paramstyle on both the pool and the client.
@@ -1366,6 +1365,7 @@
         self.assertEquals(len(self.factory.connections), 1)
 
 
+
 class HookableOperationTests(TestCase):
     """
     Tests for L{_HookableOperation}.

Added: twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_jobqueue.py	                        (rev 0)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_jobqueue.py	2014-03-01 15:42:17 UTC (rev 12779)
@@ -0,0 +1,1015 @@
+##
+# Copyright (c) 2012-2014 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.enterprise.jobqueue}.
+"""
+
+import datetime
+
+from zope.interface.verify import verifyObject
+
+from twisted.trial.unittest import TestCase, SkipTest
+from twisted.test.proto_helpers import StringTransport, MemoryReactor
+from twisted.internet.defer import (
+    Deferred, inlineCallbacks, gatherResults, passthru, returnValue
+)
+from twisted.internet.task import Clock as _Clock
+from twisted.protocols.amp import Command, AMP, Integer
+from twisted.application.service import Service, MultiService
+
+from twext.enterprise.dal.syntax import SchemaSyntax, Select
+from twext.enterprise.dal.record import fromTable
+from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
+from twext.enterprise.fixtures import buildConnectionPool
+from twext.enterprise.fixtures import SteppablePoolHelper
+from twext.enterprise.jobqueue import (
+    inTransaction, PeerConnectionPool, astimestamp,
+    LocalPerformer, _IJobPerformer, WorkItem, WorkerConnectionPool,
+    ConnectionFromPeerNode, LocalQueuer,
+    _BaseQueuer, NonPerformingQueuer
+)
+import twext.enterprise.jobqueue
+
+# TODO: There should be a store-building utility within twext.enterprise.
+try:
+    from txdav.common.datastore.test.util import buildStore
+except ImportError:
+    def buildStore(*args, **kwargs):
+        raise SkipTest(
+            "buildStore is not available, because it's in txdav; duh."
+        )
+
+
+
+class Clock(_Clock):
+    """
+    More careful L{IReactorTime} fake which mimics the exception behavior of
+    the real reactor.
+    """
+
+    def callLater(self, _seconds, _f, *args, **kw):
+        if _seconds < 0:
+            raise ValueError("%s < 0" % (_seconds,))
+        return super(Clock, self).callLater(_seconds, _f, *args, **kw)
+
+
+
+class MemoryReactorWithClock(MemoryReactor, Clock):
+    """
+    Simulate a real reactor.
+    """
+    def __init__(self):
+        MemoryReactor.__init__(self)
+        Clock.__init__(self)
+
+
+
+def transactionally(transactionCreator):
+    """
+    Perform the decorated function immediately in a transaction, replacing its
+    name with a L{Deferred}.
+
+    Use like so::
+
+        @transactionally(connectionPool.connection)
+        @inlineCallbacks
+        def it(txn):
+            yield txn.doSomething()
+        it.addCallback(firedWhenDone)
+
+    @param transactionCreator: A 0-arg callable that returns an
+        L{IAsyncTransaction}.
+    """
+    def thunk(operation):
+        return inTransaction(transactionCreator, operation)
+    return thunk
+
+
+
+class UtilityTests(TestCase):
+    """
+    Tests for supporting utilities.
+    """
+
+    def test_inTransactionSuccess(self):
+        """
+        L{inTransaction} invokes its C{transactionCreator} argument, and then
+        returns a L{Deferred} which fires with the result of its C{operation}
+        argument when it succeeds.
+        """
+        class faketxn(object):
+            def __init__(self):
+                self.commits = []
+                self.aborts = []
+
+            def commit(self):
+                self.commits.append(Deferred())
+                return self.commits[-1]
+
+            def abort(self):
+                self.aborts.append(Deferred())
+                return self.aborts[-1]
+
+        createdTxns = []
+
+        def createTxn():
+            createdTxns.append(faketxn())
+            return createdTxns[-1]
+
+        dfrs = []
+
+        def operation(t):
+            self.assertIdentical(t, createdTxns[-1])
+            dfrs.append(Deferred())
+            return dfrs[-1]
+
+        d = inTransaction(createTxn, operation)
+        x = []
+        d.addCallback(x.append)
+        self.assertEquals(x, [])
+        self.assertEquals(len(dfrs), 1)
+        dfrs[0].callback(35)
+
+        # Commit in progress, so still no result...
+        self.assertEquals(x, [])
+        createdTxns[0].commits[0].callback(42)
+
+        # Committed, everything's done.
+        self.assertEquals(x, [35])
+
+
+
+class SimpleSchemaHelper(SchemaTestHelper):
+    def id(self):
+        return "worker"
+
+
+
+SQL = passthru
+
+nodeSchema = SQL(
+    """
+    create table NODE_INFO (
+      HOSTNAME varchar(255) not null,
+      PID integer not null,
+      PORT integer not null,
+      TIME timestamp default current_timestamp not null,
+      primary key (HOSTNAME, PORT)
+    );
+    """
+)
+
+jobSchema = SQL(
+    """
+    create table JOB (
+      JOB_ID      integer primary key default 1,
+      WORK_TYPE   varchar(255) not null,
+      PRIORITY    integer default 0,
+      WEIGHT      integer default 0,
+      NOT_BEFORE  timestamp default null,
+      NOT_AFTER   timestamp default null
+    );
+    """
+)
+
+schemaText = SQL(
+    """
+    create table DUMMY_WORK_ITEM (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A integer, B integer,
+      DELETE_ON_LOAD integer default 0
+    );
+    create table DUMMY_WORK_DONE (
+      WORK_ID integer primary key,
+      JOB_ID integer references JOB,
+      A_PLUS_B integer
+    );
+    """
+)
+
+try:
+    schema = SchemaSyntax(
+        SimpleSchemaHelper().schemaFromString(jobSchema + schemaText)
+    )
+
+    dropSQL = [
+        "drop table {name} cascade".format(name=table)
+        for table in ("DUMMY_WORK_ITEM", "DUMMY_WORK_DONE")
+    ] + ["delete from job"]
+except SkipTest as e:
+    DummyWorkDone = DummyWorkItem = object
+    skip = e
+else:
+    DummyWorkDone = fromTable(schema.DUMMY_WORK_DONE)
+    DummyWorkItem = fromTable(schema.DUMMY_WORK_ITEM)
+    skip = False
+
+
+
+class DummyWorkDone(WorkItem, DummyWorkDone):
+    """
+    Work result.
+    """
+
+
+
+class DummyWorkItem(WorkItem, DummyWorkItem):
+    """
+    Sample L{WorkItem} subclass that adds two integers together and stores them
+    in another table.
+    """
+
+    def doWork(self):
+        return DummyWorkDone.makeJob(
+            self.transaction, jobID=self.jobID + 100,
+            workID=self.workID + 100, aPlusB=self.a + self.b
+        )
+
+
+    @classmethod
+    @inlineCallbacks
+    def loadForJob(cls, txn, *a):
+        """
+        Load L{DummyWorkItem} as normal...  unless the loaded item has
+        C{DELETE_ON_LOAD} set, in which case, do a deletion of this same row in
+        a concurrent transaction, then commit it.
+        """
+        workItems = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
+        if workItems[0].deleteOnLoad:
+            otherTransaction = txn.concurrently()
+            otherSelf = yield super(DummyWorkItem, cls).loadForJob(
+                otherTransaction, *a
+            )
+            yield otherSelf[0].delete()
+            yield otherTransaction.commit()
+        returnValue(workItems)
+
+
+
+class AMPTests(TestCase):
+    """
+    Tests for L{AMP} faithfully relaying ids across the wire.
+    """
+
+    def test_sendTableWithName(self):
+        """
+        You can send a reference to a table through a L{SchemaAMP} via
+        L{TableSyntaxByName}.
+        """
+        client = AMP()
+
+        class SampleCommand(Command):
+            arguments = [("id", Integer())]
+
+        class Receiver(AMP):
+            @SampleCommand.responder
+            def gotIt(self, id):
+                self.it = id
+                return {}
+
+        server = Receiver()
+        clientT = StringTransport()
+        serverT = StringTransport()
+        client.makeConnection(clientT)
+        server.makeConnection(serverT)
+        client.callRemote(SampleCommand, id=123)
+        server.dataReceived(clientT.io.getvalue())
+        self.assertEqual(server.it, 123)
+
+
+
+class WorkItemTests(TestCase):
+    """
+    A L{WorkItem} is an item of work that can be executed.
+    """
+
+    def test_forTableName(self):
+        """
+        L{WorkItem.forTable} returns L{WorkItem} subclasses mapped to the given
+        table.
+        """
+        self.assertIdentical(
+            WorkItem.forTableName(schema.DUMMY_WORK_ITEM.model.name), DummyWorkItem
+        )
+
+
+    @inlineCallbacks
+    def test_enqueue(self):
+        """
+        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
+        """
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        sinceEpoch = astimestamp(fakeNow)
+        clock = Clock()
+        clock.advance(sinceEpoch)
+        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
+        realChoosePerformer = qpool.choosePerformer
+        performerChosen = []
+
+        def catchPerformerChoice():
+            result = realChoosePerformer()
+            performerChosen.append(True)
+            return result
+
+        qpool.choosePerformer = catchPerformerChoice
+
+        @transactionally(dbpool.connection)
+        def check(txn):
+            return qpool.enqueueWork(
+                txn, DummyWorkItem, a=3, b=9,
+                notBefore=datetime.datetime(2012, 12, 13, 12, 12, 0)
+            )
+
+        proposal = yield check
+        yield proposal.whenProposed()
+
+        # Make sure we have one JOB and one DUMMY_WORK_ITEM
+        @transactionally(dbpool.connection)
+        def checkJob(txn):
+            return Select(
+                From=schema.JOB
+            ).on(txn)
+
+        jobs = yield checkJob
+        self.assertTrue(len(jobs) == 1)
+        self.assertTrue(jobs[0][1] == "DUMMY_WORK_ITEM")
+
+        @transactionally(dbpool.connection)
+        def checkWork(txn):
+            return Select(
+                From=schema.DUMMY_WORK_ITEM
+            ).on(txn)
+
+        work = yield checkWork
+        self.assertTrue(len(work) == 1)
+        self.assertTrue(work[0][1] == jobs[0][0])
+
+
+
+class WorkerConnectionPoolTests(TestCase):
+    """
+    A L{WorkerConnectionPool} is responsible for managing, in a node's
+    controller (master) process, the collection of worker (slave) processes
+    that are capable of executing queue work.
+    """
+
+
+
+class WorkProposalTests(TestCase):
+    """
+    Tests for L{WorkProposal}.
+    """
+
+    @inlineCallbacks
+    def test_whenProposedSuccess(self):
+        """
+        The L{Deferred} returned by L{WorkProposal.whenProposed} fires when the
+        SQL sent to the database has completed.
+        """
+        cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+        cph.setUp(test=self)
+        lq = LocalQueuer(cph.createTransaction)
+        enqTxn = cph.createTransaction()
+        wp = yield lq.enqueueWork(enqTxn, DummyWorkItem, a=3, b=4)
+        r = yield wp.whenProposed()
+        self.assertEquals(r, wp)
+
+
+    def test_whenProposedFailure(self):
+        """
+        The L{Deferred} returned by L{WorkProposal.whenProposed} fails with an
+        errback when the SQL executed to create the WorkItem row fails.
+        """
+        cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+        cph.setUp(self)
+        enqTxn = cph.createTransaction()
+        lq = LocalQueuer(cph.createTransaction)
+        self.failUnlessFailure(
+            lq.enqueueWork(enqTxn, DummyWorkItem, a=3, b=4, bogus=3),
+            TypeError
+        )
+        enqTxn.abort()
+        self.flushLoggedErrors()
+
+
+
+class PeerConnectionPoolUnitTests(TestCase):
+    """
+    L{PeerConnectionPool} has many internal components.
+    """
+    def setUp(self):
+        """
+        Create a L{PeerConnectionPool} that is just initialized enough.
+        """
+        self.pcp = PeerConnectionPool(None, None, 4321)
+
+
+    def checkPerformer(self, cls):
+        """
+        Verify that the performer returned by
+        L{PeerConnectionPool.choosePerformer} is an instance of C{cls} and
+        provides L{_IJobPerformer}.
+        """
+        performer = self.pcp.choosePerformer()
+        self.failUnlessIsInstance(performer, cls)
+        verifyObject(_IJobPerformer, performer)
+
+
+    def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
+        """
+        If L{PeerConnectionPool.choosePerformer} is invoked when no workers
+        have spawned and no peers have established connections (either incoming
+        or outgoing), then it chooses an implementation of C{performJob} that
+        simply executes the work locally.
+        """
+        self.checkPerformer(LocalPerformer)
+
+
+    def test_choosingPerformerWithLocalCapacity(self):
+        """
+        If L{PeerConnectionPool.choosePerformer} is invoked when some workers
+        have spawned, then it should choose the worker pool as the local
+        performer.
+        """
+        # Give it some local capacity.
+        wlf = self.pcp.workerListenerFactory()
+        proto = wlf.buildProtocol(None)
+        proto.makeConnection(StringTransport())
+        # Sanity check.
+        self.assertEqual(len(self.pcp.workerPool.workers), 1)
+        self.assertEqual(self.pcp.workerPool.hasAvailableCapacity(), True)
+        # Now it has some capacity.
+        self.checkPerformer(WorkerConnectionPool)
+
+
+    def test_choosingPerformerFromNetwork(self):
+        """
+        If L{PeerConnectionPool.choosePerformer} is invoked when no workers
+        have spawned but some peers have connected, then it should choose a
+        connection from the network to perform it.
+        """
+        peer = PeerConnectionPool(None, None, 4322)
+        local = self.pcp.peerFactory().buildProtocol(None)
+        remote = peer.peerFactory().buildProtocol(None)
+        connection = Connection(local, remote)
+        connection.start()
+        self.checkPerformer(ConnectionFromPeerNode)
+
+
+    def test_performingWorkOnNetwork(self):
+        """
+        The L{performJob} command will get relayed to the remote peer
+        controller.
+        """
+        peer = PeerConnectionPool(None, None, 4322)
+        local = self.pcp.peerFactory().buildProtocol(None)
+        remote = peer.peerFactory().buildProtocol(None)
+        connection = Connection(local, remote)
+        connection.start()
+        d = Deferred()
+
+        class DummyPerformer(object):
+            def performJob(self, jobID):
+                self.jobID = jobID
+                return d
+
+        # Doing real database I/O in this test would be tedious so fake the
+        # first method in the call stack which actually talks to the DB.
+        dummy = DummyPerformer()
+
+        def chooseDummy(onlyLocally=False):
+            return dummy
+
+        peer.choosePerformer = chooseDummy
+        performed = local.performJob(7384)
+        performResult = []
+        performed.addCallback(performResult.append)
+
+        # Sanity check.
+        self.assertEquals(performResult, [])
+        connection.flush()
+        self.assertEquals(dummy.jobID, 7384)
+        self.assertEquals(performResult, [])
+        d.callback(128374)
+        connection.flush()
+        self.assertEquals(performResult, [None])
+
+
+    def test_choosePerformerSorted(self):
+        """
+        If L{PeerConnectionPool.choosePerformer} is invoked make it
+        return the peer with the least load.
+        """
+        peer = PeerConnectionPool(None, None, 4322)
+
+        class DummyPeer(object):
+            def __init__(self, name, load):
+                self.name = name
+                self.load = load
+
+            def currentLoadEstimate(self):
+                return self.load
+
+        apeer = DummyPeer("A", 1)
+        bpeer = DummyPeer("B", 0)
+        cpeer = DummyPeer("C", 2)
+        peer.addPeerConnection(apeer)
+        peer.addPeerConnection(bpeer)
+        peer.addPeerConnection(cpeer)
+
+        performer = peer.choosePerformer(onlyLocally=False)
+        self.assertEqual(performer, bpeer)
+
+        bpeer.load = 2
+        performer = peer.choosePerformer(onlyLocally=False)
+        self.assertEqual(performer, apeer)
+
+
+    @inlineCallbacks
+    def test_notBeforeWhenCheckingForLostWork(self):
+        """
+        L{PeerConnectionPool._periodicLostWorkCheck} should execute any
+        outstanding work items, but only those that are expired.
+        """
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        # An arbitrary point in time.
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        # *why* does datetime still not have .astimestamp()
+        sinceEpoch = astimestamp(fakeNow)
+        clock = Clock()
+        clock.advance(sinceEpoch)
+        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
+
+        # Let's create a couple of work items directly, not via the enqueue
+        # method, so that they exist but nobody will try to immediately execute
+        # them.
+
+        @transactionally(dbpool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # First, one that's right now.
+            yield DummyWorkItem.makeJob(txn, a=1, b=2, notBefore=fakeNow)
+
+            # Next, create one that's actually far enough into the past to run.
+            yield DummyWorkItem.makeJob(
+                txn, a=3, b=4, notBefore=(
+                    # Schedule it in the past so that it should have already
+                    # run.
+                    fakeNow - datetime.timedelta(
+                        seconds=qpool.queueProcessTimeout + 20
+                    )
+                )
+            )
+
+            # Finally, one that's actually scheduled for the future.
+            yield DummyWorkItem.makeJob(
+                txn, a=10, b=20, notBefore=fakeNow + datetime.timedelta(1000)
+            )
+        yield setup
+        yield qpool._periodicLostWorkCheck()
+
+        @transactionally(dbpool.connection)
+        def check(txn):
+            return DummyWorkDone.all(txn)
+
+        every = yield check
+        self.assertEquals([x.aPlusB for x in every], [7])
+
+
+    @inlineCallbacks
+    def test_notBeforeWhenEnqueueing(self):
+        """
+        L{PeerConnectionPool.enqueueWork} enqueues some work immediately, but
+        only executes it when enough time has elapsed to allow the C{notBefore}
+        attribute of the given work item to have passed.
+        """
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        sinceEpoch = astimestamp(fakeNow)
+        clock = Clock()
+        clock.advance(sinceEpoch)
+        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
+        realChoosePerformer = qpool.choosePerformer
+        performerChosen = []
+
+        def catchPerformerChoice():
+            result = realChoosePerformer()
+            performerChosen.append(True)
+            return result
+
+        qpool.choosePerformer = catchPerformerChoice
+
+        @transactionally(dbpool.connection)
+        def check(txn):
+            return qpool.enqueueWork(
+                txn, DummyWorkItem, a=3, b=9,
+                notBefore=datetime.datetime(2012, 12, 12, 12, 12, 20)
+            )
+
+        proposal = yield check
+        yield proposal.whenProposed()
+
+        # This is going to schedule the work to happen with some asynchronous
+        # I/O in the middle; this is a problem because how do we know when it's
+        # time to check to see if the work has started?  We need to intercept
+        # the thing that kicks off the work; we can then wait for the work
+        # itself.
+
+        self.assertEquals(performerChosen, [])
+
+        # Advance to exactly the appointed second.
+        clock.advance(20 - 12)
+        self.assertEquals(performerChosen, [True])
+
+        # FIXME: if this fails, it will hang, but that's better than no
+        # notification that it is broken at all.
+
+        result = yield proposal.whenExecuted()
+        self.assertIdentical(result, proposal)
+
+
+    @inlineCallbacks
+    def test_notBeforeBefore(self):
+        """
+        L{PeerConnectionPool.enqueueWork} will execute its work immediately if
+        the C{notBefore} attribute of the work item in question is in the past.
+        """
+        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        sinceEpoch = astimestamp(fakeNow)
+        clock = Clock()
+        clock.advance(sinceEpoch)
+        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
+        realChoosePerformer = qpool.choosePerformer
+        performerChosen = []
+
+        def catchPerformerChoice():
+            result = realChoosePerformer()
+            performerChosen.append(True)
+            return result
+
+        qpool.choosePerformer = catchPerformerChoice
+
+        @transactionally(dbpool.connection)
+        def check(txn):
+            return qpool.enqueueWork(
+                txn, DummyWorkItem, a=3, b=9,
+                notBefore=datetime.datetime(2012, 12, 12, 12, 12, 0)
+            )
+
+        proposal = yield check
+        yield proposal.whenProposed()
+
+        clock.advance(1000)
+        # Advance far beyond the given timestamp.
+        self.assertEquals(performerChosen, [True])
+
+        result = yield proposal.whenExecuted()
+        self.assertIdentical(result, proposal)
+
+
+    def test_workerConnectionPoolPerformJob(self):
+        """
+        L{WorkerConnectionPool.performJob} performs work by selecting a
+        L{ConnectionFromWorker} and sending it a L{PerformJOB} command.
+        """
+        clock = Clock()
+        peerPool = PeerConnectionPool(clock, None, 4322)
+        factory = peerPool.workerListenerFactory()
+
+        def peer():
+            p = factory.buildProtocol(None)
+            t = StringTransport()
+            p.makeConnection(t)
+            return p, t
+
+        worker1, _ignore_trans1 = peer()
+        worker2, _ignore_trans2 = peer()
+
+        # Ask the worker to do something.
+        worker1.performJob(1)
+        self.assertEquals(worker1.currentLoad, 1)
+        self.assertEquals(worker2.currentLoad, 0)
+
+        # Now ask the pool to do something
+        peerPool.workerPool.performJob(2)
+        self.assertEquals(worker1.currentLoad, 1)
+        self.assertEquals(worker2.currentLoad, 1)
+
+
+    def test_poolStartServiceChecksForWork(self):
+        """
+        L{PeerConnectionPool.startService} kicks off the idle work-check loop.
+        """
+        reactor = MemoryReactorWithClock()
+        cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
+        then = datetime.datetime(2012, 12, 12, 12, 12, 0)
+        reactor.advance(astimestamp(then))
+        cph.setUp(self)
+        pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321)
+        now = then + datetime.timedelta(seconds=pcp.queueProcessTimeout * 2)
+
+        @transactionally(cph.pool.connection)
+        def createOldWork(txn):
+            one = DummyWorkItem.makeJob(
+                txn, jobID=100, workID=1, a=3, b=4, notBefore=then
+            )
+            two = DummyWorkItem.makeJob(
+                txn, jobID=101, workID=2, a=7, b=9, notBefore=now
+            )
+            return gatherResults([one, two])
+
+        pcp.startService()
+        cph.flushHolders()
+        reactor.advance(pcp.queueProcessTimeout * 2)
+        self.assertEquals(
+            cph.rows("select * from DUMMY_WORK_DONE"),
+            [(101, 200, 7)]
+        )
+        cph.rows("delete from DUMMY_WORK_DONE")
+        reactor.advance(pcp.queueProcessTimeout * 2)
+        self.assertEquals(
+            cph.rows("select * from DUMMY_WORK_DONE"),
+            [(102, 201, 16)]
+        )
+
+
+
+class HalfConnection(object):
+    def __init__(self, protocol):
+        self.protocol = protocol
+        self.transport = StringTransport()
+
+
+    def start(self):
+        """
+        Hook up the protocol and the transport.
+        """
+        self.protocol.makeConnection(self.transport)
+
+
+    def extract(self):
+        """
+        Extract the data currently present in this protocol's output buffer.
+        """
+        io = self.transport.io
+        value = io.getvalue()
+        io.seek(0)
+        io.truncate()
+        return value
+
+
+    def deliver(self, data):
+        """
+        Deliver the given data to this L{HalfConnection}'s protocol's
+        C{dataReceived} method.
+
+        @return: a boolean indicating whether any data was delivered.
+        @rtype: L{bool}
+        """
+        if data:
+            self.protocol.dataReceived(data)
+            return True
+        return False
+
+
+
+class Connection(object):
+
+    def __init__(self, local, remote):
+        """
+        Connect two protocol instances to each other via string transports.
+        """
+        self.receiver = HalfConnection(local)
+        self.sender = HalfConnection(remote)
+
+
+    def start(self):
+        """
+        Start up the connection.
+        """
+        self.sender.start()
+        self.receiver.start()
+
+
+    def pump(self):
+        """
+        Relay data in one direction between the two connections.
+        """
+        result = self.receiver.deliver(self.sender.extract())
+        self.receiver, self.sender = self.sender, self.receiver
+        return result
+
+
+    def flush(self, turns=10):
+        """
+        Keep relaying data until there's no more.
+        """
+        for _ignore_x in range(turns):
+            if not (self.pump() or self.pump()):
+                return
+
+
+
+class PeerConnectionPoolIntegrationTests(TestCase):
+    """
+    L{PeerConnectionPool} is the service responsible for coordinating
+    eventually-consistent task queuing within a cluster.
+    """
+
+    @inlineCallbacks
+    def setUp(self):
+        """
+        L{PeerConnectionPool} requires access to a database and the reactor.
+        """
+        self.store = yield buildStore(self, None)
+
+        def doit(txn):
+            return txn.execSQL(schemaText)
+
+        yield inTransaction(
+            lambda: self.store.newTransaction("bonus schema"), doit
+        )
+
+        def indirectedTransactionFactory(*a):
+            """
+            Allow tests to replace "self.store.newTransaction" to provide
+            fixtures with extra methods on a test-by-test basis.
+            """
+            return self.store.newTransaction(*a)
+
+        def deschema():
+            @inlineCallbacks
+            def deletestuff(txn):
+                for stmt in dropSQL:
+                    yield txn.execSQL(stmt)
+            return inTransaction(
+                lambda *a: self.store.newTransaction(*a), deletestuff
+            )
+        self.addCleanup(deschema)
+
+        from twisted.internet import reactor
+        self.node1 = PeerConnectionPool(
+            reactor, indirectedTransactionFactory, 0)
+        self.node2 = PeerConnectionPool(
+            reactor, indirectedTransactionFactory, 0)
+
+        class FireMeService(Service, object):
+            def __init__(self, d):
+                super(FireMeService, self).__init__()
+                self.d = d
+
+            def startService(self):
+                self.d.callback(None)
+
+        d1 = Deferred()
+        d2 = Deferred()
+        FireMeService(d1).setServiceParent(self.node1)
+        FireMeService(d2).setServiceParent(self.node2)
+        ms = MultiService()
+        self.node1.setServiceParent(ms)
+        self.node2.setServiceParent(ms)
+        ms.startService()
+        self.addCleanup(ms.stopService)
+        yield gatherResults([d1, d2])
+        self.store.queuer = self.node1
+
+
+    def test_currentNodeInfo(self):
+        """
+        There will be two C{NODE_INFO} rows in the database, retrievable as two
+        L{NodeInfo} objects, once both nodes have started up.
+        """
+        @inlineCallbacks
+        def check(txn):
+            self.assertEquals(len((yield self.node1.activeNodes(txn))), 2)
+            self.assertEquals(len((yield self.node2.activeNodes(txn))), 2)
+        return inTransaction(self.store.newTransaction, check)
+
+
+    @inlineCallbacks
+    def test_enqueueHappyPath(self):
+        """
+        When a L{WorkItem} is scheduled for execution via
+        L{PeerConnectionPool.enqueueWork} its C{doWork} method will be invoked
+        by the time the L{Deferred} returned from the resulting
+        L{WorkProposal}'s C{whenExecuted} method has fired.
+        """
+        # TODO: this exact test should run against LocalQueuer as well.
+        def operation(txn):
+            # TODO: how does "enqueue" get associated with the transaction?
+            # This is not the fact with a raw t.w.enterprise transaction.
+            # Should probably do something with components.
+            return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
+                               notBefore=datetime.datetime.utcnow())
+        result = yield inTransaction(self.store.newTransaction, operation)
+        # Wait for it to be executed.  Hopefully this does not time out :-\.
+        yield result.whenExecuted()
+
+        def op2(txn):
+            return Select(
+                [
+                    schema.DUMMY_WORK_DONE.WORK_ID,
+                    schema.DUMMY_WORK_DONE.JOB_ID,
+                    schema.DUMMY_WORK_DONE.A_PLUS_B,
+                ],
+                From=schema.DUMMY_WORK_DONE
+            ).on(txn)
+
+        rows = yield inTransaction(self.store.newTransaction, op2)
+        self.assertEquals(rows, [[101, 200, 7]])
+
+
+    @inlineCallbacks
+    def test_noWorkDoneWhenConcurrentlyDeleted(self):
+        """
+        When a L{WorkItem} is concurrently deleted by another transaction, it
+        should I{not} perform its work.
+        """
+        # Provide access to a method called "concurrently" on every
+        # transaction, so that DummyWorkItem.loadForJob can delete the same
+        # row in a second, concurrent transaction.
+        original = self.store.newTransaction
+
+        def decorate(*a, **k):
+            result = original(*a, **k)
+            result.concurrently = self.store.newTransaction
+            return result
+
+        self.store.newTransaction = decorate
+
+        def operation(txn):
+            return txn.enqueue(
+                DummyWorkItem, a=30, b=40, workID=5678,
+                deleteOnLoad=1,
+                notBefore=datetime.datetime.utcnow()
+            )
+
+        proposal = yield inTransaction(self.store.newTransaction, operation)
+        yield proposal.whenExecuted()
+
+        # Sanity check on the concurrent deletion.
+        def op2(txn):
+            return Select(
+                [schema.DUMMY_WORK_ITEM.WORK_ID],
+                From=schema.DUMMY_WORK_ITEM
+            ).on(txn)
+
+        rows = yield inTransaction(self.store.newTransaction, op2)
+        self.assertEquals(rows, [])
+
+        def op3(txn):
+            return Select(
+                [
+                    schema.DUMMY_WORK_DONE.WORK_ID,
+                    schema.DUMMY_WORK_DONE.A_PLUS_B,
+                ],
+                From=schema.DUMMY_WORK_DONE
+            ).on(txn)
+
+        rows = yield inTransaction(self.store.newTransaction, op3)
+        self.assertEquals(rows, [])
+
+
+
+class DummyProposal(object):
+
+    def __init__(self, *ignored):
+        pass
+
+
+    def _start(self):
+        pass
+
+
+
+class BaseQueuerTests(TestCase):
+
+    def setUp(self):
+        self.proposal = None
+        self.patch(twext.enterprise.jobqueue, "WorkProposal", DummyProposal)
+
+
+    def _proposalCallback(self, proposal):
+        self.proposal = proposal
+
+
+    @inlineCallbacks
+    def test_proposalCallbacks(self):
+        queuer = _BaseQueuer()
+        queuer.callWithNewProposals(self._proposalCallback)
+        self.assertEqual(self.proposal, None)
+        yield queuer.enqueueWork(None, None)
+        self.assertNotEqual(self.proposal, None)
+
+
+
+class NonPerformingQueuerTests(TestCase):
+
+    @inlineCallbacks
+    def test_choosePerformer(self):
+        queuer = NonPerformingQueuer()
+        performer = queuer.choosePerformer()
+        result = (yield performer.performJob(None))
+        self.assertEquals(result, None)