[CalendarServer-changes] [12779] twext/branches/users/cdaboo/jobs/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Sat Mar 1 07:42:17 PST 2014
Revision: 12779
http://trac.calendarserver.org//changeset/12779
Author: cdaboo at apple.com
Date: 2014-03-01 07:42:17 -0800 (Sat, 01 Mar 2014)
Log Message:
-----------
New job-queue-based work queue implementation. Currently this does just what the old queue did, but with
a centralized "job" queue where all work items are listed. Eventually this will be made smarter to
automatically deal with coalescing work, repetitive work, pooled work, as well as eliminate head-of-line
blocking.
Modified Paths:
--------------
twext/branches/users/cdaboo/jobs/twext/enterprise/adbapi2.py
twext/branches/users/cdaboo/jobs/twext/enterprise/dal/model.py
twext/branches/users/cdaboo/jobs/twext/enterprise/dal/parseschema.py
twext/branches/users/cdaboo/jobs/twext/enterprise/dal/record.py
twext/branches/users/cdaboo/jobs/twext/enterprise/dal/test/test_parseschema.py
twext/branches/users/cdaboo/jobs/twext/enterprise/ienterprise.py
twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_adbapi2.py
Added Paths:
-----------
twext/branches/users/cdaboo/jobs/twext/enterprise/jobqueue.py
twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_jobqueue.py
Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/adbapi2.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/adbapi2.py 2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/adbapi2.py 2014-03-01 15:42:17 UTC (rev 12779)
@@ -155,12 +155,12 @@
noisy = False
def __init__(self, pool, threadHolder, connection, cursor):
- self._pool = pool
- self._completed = "idle"
- self._cursor = cursor
+ self._pool = pool
+ self._completed = "idle"
+ self._cursor = cursor
self._connection = connection
- self._holder = threadHolder
- self._first = True
+ self._holder = threadHolder
+ self._first = True
@_forward
@@ -169,14 +169,12 @@
The paramstyle attribute is mirrored from the connection pool.
"""
-
@_forward
def dialect(self):
"""
The dialect attribute is mirrored from the connection pool.
"""
-
def _reallyExecSQL(self, sql, args=None, raiseOnZeroRowCount=None):
"""
Execute the given SQL on a thread, using a DB-API 2.0 cursor.
@@ -280,7 +278,7 @@
# that we cannot workaround or address automatically, so no
# try:except: for them.
self._connection = self._pool.connectionFactory()
- self._cursor = self._connection.cursor()
+ self._cursor = self._connection.cursor()
# Note that although this method is being invoked recursively,
# the "_first" flag is re-set at the very top, so we will _not_
@@ -382,9 +380,9 @@
transaction.
"""
self._completed = "released"
- self._stopped = True
- holder = self._holder
- self._holder = None
+ self._stopped = True
+ holder = self._holder
+ self._holder = None
def _reallyClose():
if self._cursor is None:
@@ -417,8 +415,8 @@
return fail(ConnectionError(self.reason))
execSQL = _everything
- commit = _everything
- abort = _everything
+ commit = _everything
+ abort = _everything
@@ -593,12 +591,12 @@
def __init__(self, pool, baseTxn):
super(_SingleTxn, self).__init__()
- self._pool = pool
- self._baseTxn = baseTxn
- self._completed = False
- self._currentBlock = None
- self._blockedQueue = None
- self._pendingBlocks = []
+ self._pool = pool
+ self._baseTxn = baseTxn
+ self._completed = False
+ self._currentBlock = None
+ self._blockedQueue = None
+ self._pendingBlocks = []
self._stillExecuting = []
@@ -615,7 +613,7 @@
implementation of L{IAsyncTransaction} that will actually do the work;
either a L{_ConnectedTxn} or a L{_NoTxn}.
"""
- spooledBase = self._baseTxn
+ spooledBase = self._baseTxn
self._baseTxn = baseTxn
spooledBase._unspool(baseTxn)
@@ -892,8 +890,8 @@
and subsequent SQL executions for this connection.
@type holder: L{ThreadHolder}
"""
- self._pool = pool
- self._holder = holder
+ self._pool = pool
+ self._holder = holder
self._aborted = False
@@ -992,11 +990,11 @@
if name is not None:
self.name = name
- self._free = []
- self._busy = []
- self._waiting = []
- self._finishing = []
- self._stopping = False
+ self._free = []
+ self._busy = []
+ self._waiting = []
+ self._finishing = []
+ self._stopping = False
def startService(self):
@@ -1107,7 +1105,7 @@
# support threadlevel=1; we can't necessarily cursor() in a
# different thread than we do transactions in.
connection = self.connectionFactory()
- cursor = connection.cursor()
+ cursor = connection.cursor()
return (connection, cursor)
def finishInit((connection, cursor)):
@@ -1364,8 +1362,8 @@
Initialize a mapping of transaction IDs to transaction objects.
"""
super(ConnectionPoolConnection, self).__init__()
- self.pool = pool
- self._txns = {}
+ self.pool = pool
+ self._txns = {}
self._blocks = {}
@@ -1479,10 +1477,10 @@
):
# See DEFAULT_PARAM_STYLE FIXME above.
super(ConnectionPoolClient, self).__init__()
- self._nextID = count().next
- self._txns = weakref.WeakValueDictionary()
- self._queries = {}
- self.dialect = dialect
+ self._nextID = count().next
+ self._txns = weakref.WeakValueDictionary()
+ self._queries = {}
+ self.dialect = dialect
self.paramstyle = paramstyle
@@ -1507,8 +1505,8 @@
@rtype: L{IAsyncTransaction}
"""
- txnid = str(self._nextID())
- txn = _NetTransaction(client=self, transactionID=txnid)
+ txnid = str(self._nextID())
+ txn = _NetTransaction(client=self, transactionID=txnid)
self._txns[txnid] = txn
self.callRemote(StartTxn, transactionID=txnid)
return txn
@@ -1529,10 +1527,10 @@
class _Query(object):
def __init__(self, sql, raiseOnZeroRowCount, args):
- self.sql = sql
- self.args = args
- self.results = []
- self.deferred = Deferred()
+ self.sql = sql
+ self.args = args
+ self.results = []
+ self.deferred = Deferred()
self.raiseOnZeroRowCount = raiseOnZeroRowCount
@@ -1606,11 +1604,11 @@
transaction identifier.
"""
super(_NetTransaction, self).__init__()
- self._client = client
+ self._client = client
self._transactionID = transactionID
- self._completed = False
- self._committing = False
- self._committed = False
+ self._completed = False
+ self._committing = False
+ self._committed = False
@property
Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/dal/model.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/dal/model.py 2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/dal/model.py 2014-03-01 15:42:17 UTC (rev 12779)
@@ -30,6 +30,7 @@
"Index",
"PseudoIndex",
"Sequence",
+ "Function",
"Schema",
]
@@ -516,6 +517,36 @@
+class Function(FancyEqMixin, object):
+    """
+    A named database function, as declared by a C{CREATE FUNCTION} or
+    C{CREATE OR REPLACE FUNCTION} statement.
+
+    Only the function's name is recorded; its signature and body are not.
+    """
+
+    compareAttributes = ["name"]
+
+    def __init__(self, schema, name):
+        """
+        Create a function and register it with C{schema}.
+
+        @param schema: the schema the function belongs to; this object is
+            appended to its C{functions} list.
+        @type schema: L{Schema}
+
+        @param name: the function's name (validated by C{_checkstr}).
+        @type name: L{str}
+        """
+        _checkstr(name)
+        self.name = name
+        schema.functions.append(self)
+
+
+    def __repr__(self):
+        return "<Function %r>" % (self.name,)
+
+
+    def compare(self, other):
+        """
+        Return the differences between two functions.
+
+        @param other: the function to compare with
+        @type other: L{Function}
+
+        @return: a list of difference descriptions; always empty, because
+            nothing beyond the name is tracked.
+        @rtype: L{list}
+        """
+        # TODO: ought to compare function body but we don't track that
+        return []
+
+
def _namedFrom(name, sequence):
"""
Retrieve an item with a given name attribute from a given sequence, or
@@ -538,6 +569,7 @@
self.tables = []
self.indexes = []
self.sequences = []
+ self.functions = []
def __repr__(self):
@@ -576,6 +608,7 @@
_compareLists(self.tables, other.tables, "table")
_compareLists(self.pseudoIndexes(), other.pseudoIndexes(), "index")
_compareLists(self.sequences, other.sequences, "sequence")
+ _compareLists(self.functions, other.functions, "functions")
return results
@@ -617,3 +650,7 @@
def indexNamed(self, name):
return _namedFrom(name, self.indexes)
+
+
+    def functionNamed(self, name):
+        """
+        Look up a function in this schema by name.
+
+        @param name: the name of the function.
+        @type name: L{str}
+
+        @return: the matching L{Function} from C{self.functions}.
+        @raise KeyError: if no function has that name.
+        """
+        return _namedFrom(name, self.functions)
Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/dal/parseschema.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/dal/parseschema.py 2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/dal/parseschema.py 2014-03-01 15:42:17 UTC (rev 12779)
@@ -43,7 +43,7 @@
Function, Comparison)
from twext.enterprise.dal.model import (
- Schema, Table, SQLType, ProcedureCall, Constraint, Sequence, Index)
+ Schema, Table, SQLType, ProcedureCall, Constraint, Sequence, Index, Function as FunctionModel)
from twext.enterprise.dal.syntax import (
ColumnSyntax, CompoundComparison, Constant, Function as FunctionSyntax
@@ -208,6 +208,12 @@
columnName = nameOrIdentifier(token)
idx.addColumn(idx.table.columnNamed(columnName))
+ elif createType == u"FUNCTION":
+ FunctionModel(
+ schema,
+ stmt.token_next(2, True).value.encode("utf-8")
+ )
+
elif stmt.get_type() == "INSERT":
insertTokens = iterSignificant(stmt)
expect(insertTokens, ttype=Keyword.DML, value="INSERT")
@@ -234,6 +240,15 @@
schema.tableNamed(tableName).insertSchemaRow(rowData)
+ elif stmt.get_type() == "CREATE OR REPLACE":
+ createType = stmt.token_next(1, True).value.upper()
+
+ if createType == u"FUNCTION":
+ FunctionModel(
+ schema,
+ stmt.token_next(2, True).token_first(True).token_first(True).value.encode("utf-8")
+ )
+
else:
print("unknown type:", stmt.get_type())
Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/dal/record.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/dal/record.py 2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/dal/record.py 2014-03-01 15:42:17 UTC (rev 12779)
@@ -171,6 +171,23 @@
return r
+    @classmethod
+    def fromTable(cls, table):
+        """
+        Initialize this class's attribute/column mappings from C{table} at
+        run time.
+
+        Every column of C{table} is mapped in both directions:
+        C{__attrmap__} maps the attribute name produced by
+        L{namingConvention} to the column, and C{__colmap__} maps the column
+        back to that attribute name.  Any previous mappings on C{cls} are
+        replaced by fresh dictionaries.
+
+        @param table: table containing the record data; iterating it must
+            yield column objects exposing C{.model.name}.
+        @type table: L{Table}
+        """
+        attrmap = cls.__attrmap__ = {}
+        colmap = cls.__colmap__ = {}
+        for column in list(table):
+            name = cls.namingConvention(column.model.name)
+            attrmap[name] = column
+            colmap[column] = name
+
+
@staticmethod
def namingConvention(columnName):
"""
@@ -274,7 +291,7 @@
"""
for setAttribute, setValue in attributeList:
setColumn = self.__attrmap__[setAttribute]
- if setColumn.model.type.name == "timestamp":
+ if setColumn.model.type.name == "timestamp" and setValue is not None:
setValue = parseSQLTimestamp(setValue)
setattr(self, setAttribute, setValue)
@@ -335,7 +352,7 @@
@classmethod
- def query(cls, transaction, expr, order=None, ascending=True, group=None):
+ def query(cls, transaction, expr, order=None, ascending=True, group=None, forUpdate=False, noWait=False):
"""
Query the table that corresponds to C{cls}, and return instances of
C{cls} corresponding to the rows that are returned from that table.
@@ -353,15 +370,29 @@
@param group: a L{ColumnSyntax} to group the resulting record objects
by.
+
+ @param forUpdate: do a SELECT ... FOR UPDATE
+ @type forUpdate: L{bool}
+ @param noWait: include NOWAIT with the FOR UPDATE
+ @type noWait: L{bool}
"""
kw = {}
if order is not None:
kw.update(OrderBy=order, Ascending=ascending)
if group is not None:
kw.update(GroupBy=group)
+ if forUpdate:
+ kw.update(ForUpdate=True)
+ if noWait:
+ kw.update(NoWait=True)
return cls._rowsFromQuery(
transaction,
- Select(list(cls.table), From=cls.table, Where=expr, **kw),
+ Select(
+ list(cls.table),
+ From=cls.table,
+ Where=expr,
+ **kw
+ ),
None
)
Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/dal/test/test_parseschema.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/dal/test/test_parseschema.py 2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/dal/test/test_parseschema.py 2014-03-01 15:42:17 UTC (rev 12779)
@@ -403,3 +403,27 @@
"z-unique:(c)",
))
)
+
+
+    def test_functions(self):
+        """
+        A 'create (or replace) function' statement will add a L{Function}
+        object to a L{Schema}'s C{functions} list.
+        """
+        s = self.schemaFromString(
+            """
+CREATE FUNCTION increment(i integer) RETURNS integer AS $$
+BEGIN
+ RETURN i + 1;
+END;
+$$ LANGUAGE plpgsql;
+CREATE OR REPLACE FUNCTION decrement(i integer) RETURNS integer AS $$
+BEGIN
+ RETURN i - 1;
+END;
+$$ LANGUAGE plpgsql;
+            """
+        )
+        # Each statement form must register exactly one function, named
+        # without its argument list, in source order.
+        self.assertEqual(
+            [function.name for function in s.functions],
+            ["increment", "decrement"]
+        )
+        self.assertTrue(s.functionNamed("increment") is not None)
+        self.assertTrue(s.functionNamed("decrement") is not None)
+        self.assertRaises(KeyError, s.functionNamed, "merge")
Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/ienterprise.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/ienterprise.py 2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/ienterprise.py 2014-03-01 15:42:17 UTC (rev 12779)
@@ -115,7 +115,6 @@
transaction has already been committed or rolled back.
"""
-
def preCommit(operation):
"""
Perform the given operation when this L{IAsyncTransaction}'s C{commit}
@@ -128,7 +127,6 @@
will not fire until that L{Deferred} fires.
"""
-
def postCommit(operation):
"""
Perform the given operation only after this L{IAsyncTransaction}
@@ -140,7 +138,6 @@
will not fire until that L{Deferred} fires.
"""
-
def abort():
"""
Roll back changes caused by this transaction.
@@ -149,7 +146,6 @@
rollback of this transaction.
"""
-
def postAbort(operation):
"""
Invoke a callback after abort.
@@ -160,7 +156,6 @@
L{Deferred}.
"""
-
def commandBlock():
"""
Create an object which will cause the commands executed on it to be
@@ -257,7 +252,6 @@
@return: the concrete value which should be passed to the DB-API layer.
"""
-
def postQuery(cursor):
"""
After running a query, invoke this method in the DB-API thread.
@@ -288,17 +282,16 @@
@param workItemType: the type of work item to create.
@type workItemType: L{type}, specifically, a subtype of L{WorkItem
- <twext.enterprise.queue.WorkItem>}
+ <twext.enterprise.jobqueue.WorkItem>}
@param kw: The keyword parameters are relayed to C{workItemType.create}
to create an appropriately initialized item.
@return: a work proposal that allows tracking of the various phases of
completion of the work item.
- @rtype: L{twext.enterprise.queue.WorkItem}
+ @rtype: L{twext.enterprise.jobqueue.WorkItem}
"""
-
def callWithNewProposals(self, callback):
"""
Tells the IQueuer to call a callback method whenever a new WorkProposal
@@ -308,7 +301,6 @@
L{WorkProposal}
"""
-
def transferProposalCallbacks(self, newQueuer):
"""
Transfer the registered callbacks to the new queuer.
Added: twext/branches/users/cdaboo/jobs/twext/enterprise/jobqueue.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/jobqueue.py (rev 0)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/jobqueue.py 2014-03-01 15:42:17 UTC (rev 12779)
@@ -0,0 +1,1592 @@
+# -*- test-case-name: twext.enterprise.test.test_jobqueue -*-
+##
+# Copyright (c) 2012-2014 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+
+"""
+L{twext.enterprise.jobqueue} is an U{eventually consistent
+<https://en.wikipedia.org/wiki/Eventual_consistency>} task-queuing system for
+use by applications with multiple front-end servers talking to a single
+database instance, that want to defer and parallelize work that involves
+storing the results of computation.
+
+By enqueuing with L{twext.enterprise.jobqueue}, you may guarantee that the work
+will I{eventually} be done, and reliably commit to doing it in the future, but
+defer it if it does not need to be done I{now}.
+
+To pick a hypothetical example, let's say that you have a store which wants to
+issue a promotional coupon based on a customer loyalty program, in response to
+an administrator clicking on a button. Determining the list of customers to
+send the coupon to is quick: a simple query will get you all their names.
+However, analyzing each user's historical purchase data is (A) time consuming
+and (B) relatively isolated, so it would be good to do that in parallel, and it
+would also be acceptable to have that happen at a later time, outside the
+critical path.
+
+Such an application might be implemented with this queuing system like so::
+
+    from twisted.internet.defer import inlineCallbacks
+
+    from twext.enterprise.jobqueue import WorkItem, queueFromTransaction
+    from twext.enterprise.dal.model import Schema
+    from twext.enterprise.dal.parseschema import addSQLToSchema
+    from twext.enterprise.dal.record import Record, fromTable
+    from twext.enterprise.dal.syntax import SchemaSyntax, Select
+
+    schemaModel = Schema()
+    addSQLToSchema(schemaModel, '''
+        create table CUSTOMER (NAME varchar(255), ID integer primary key);
+        create table PRODUCT (NAME varchar(255), ID integer primary key);
+        create table PURCHASE (NAME varchar(255), WHEN timestamp,
+                               CUSTOMER_ID integer references CUSTOMER,
+                               PRODUCT_ID integer references PRODUCT);
+        create table COUPON_WORK (WORK_ID integer primary key,
+                                  JOB_ID integer,
+                                  CUSTOMER_ID integer references CUSTOMER);
+        create table COUPON (ID integer primary key,
+                             CUSTOMER_ID integer references customer,
+                             AMOUNT integer);
+    ''')
+    schema = SchemaSyntax(schemaModel)
+
+    class Coupon(Record, fromTable(schema.COUPON)):
+        pass
+
+    class CouponWork(WorkItem, fromTable(schema.COUPON_WORK)):
+        @inlineCallbacks
+        def doWork(self):
+            purchases = yield Select(From=schema.PURCHASE,
+                                     Where=schema.PURCHASE.CUSTOMER_ID
+                                     == self.customerID).on(self.transaction)
+            couponAmount = yield doSomeMathThatTakesAWhile(purchases)
+            yield Coupon.create(self.transaction,
+                                customerID=self.customerID,
+                                amount=couponAmount)
+
+    @inlineCallbacks
+    def makeSomeCoupons(txn):
+        # Note, txn was started before, will be committed later...
+        for (customerID,) in (yield Select([schema.CUSTOMER.ID],
+                                           From=schema.CUSTOMER).on(txn)):
+            # queuer is a provider of IQueuer, of which there are several
+            # implementations in this module.
+            queuer.enqueueWork(txn, CouponWork, customerID=customerID)
+"""
+
+from functools import wraps
+from datetime import datetime
+
+from zope.interface import implements
+
+from twisted.application.service import MultiService
+from twisted.internet.protocol import Factory
+from twisted.internet.defer import (
+ inlineCallbacks, returnValue, Deferred, passthru, succeed
+)
+from twisted.internet.endpoints import TCP4ClientEndpoint
+from twisted.protocols.amp import AMP, Command, Integer, String
+from twisted.python.reflect import qual
+from twisted.python import log
+
+from twext.enterprise.dal.syntax import SchemaSyntax, Lock, NamedValue, Select, \
+ Count
+
+from twext.enterprise.dal.model import ProcedureCall
+from twext.enterprise.dal.record import Record, fromTable, NoSuchRecord
+from twisted.python.failure import Failure
+
+from twext.enterprise.dal.model import Table, Schema, SQLType, Constraint
+from twisted.internet.endpoints import TCP4ServerEndpoint
+from twext.enterprise.ienterprise import IQueuer
+from zope.interface.interface import Interface
+from twext.enterprise.locking import NamedLock
+
+
+class _IJobPerformer(Interface):
+    """
+    An object that can perform work.
+
+    Internal interface; implemented by several classes here since work has to
+    (in the worst case) pass from worker->controller->controller->worker.
+    """
+
+    # zope.interface method declarations omit 'self'.
+    def performJob(jobID): #@NoSelf
+        """
+        Perform the job identified by C{jobID}.
+
+        @param jobID: The primary key identifier of the given job.
+        @type jobID: L{int}
+
+        @return: a L{Deferred} firing with an empty dictionary when the work is
+            complete.
+        @rtype: L{Deferred} firing L{dict}
+        """
+
+
+
+def makeNodeSchema(inSchema):
+    """
+    Add the C{NODE_INFO} table used by L{NodeInfo} to C{inSchema}.
+
+    The table has C{HOSTNAME}, C{PID}, C{PORT} and C{TIME} columns, every one
+    of them C{NOT NULL}, and (C{HOSTNAME}, C{PORT}) as its primary key.
+    C{TIME} defaults to C{timezone('UTC', CURRENT_TIMESTAMP)}.
+
+    @param inSchema: a L{Schema} to add the node-info table to.
+    @type inSchema: L{Schema}
+
+    @return: C{inSchema}, now containing the C{NODE_INFO} table.
+    """
+    # Initializing this duplicate schema avoids a circular dependency, but this
+    # should really be accomplished with independent schema objects that the
+    # transaction is made aware of somehow.
+    table = Table(inSchema, "NODE_INFO")
+
+    table.addColumn("HOSTNAME", SQLType("varchar", 255))
+    table.addColumn("PID", SQLType("integer", None))
+    table.addColumn("PORT", SQLType("integer", None))
+    timeColumn = table.addColumn("TIME", SQLType("timestamp", None))
+    # Note: in the real data structure, this is actually a not-cleaned-up
+    # sqlparse internal data structure, but it *should* look closer to this.
+    timeColumn.setDefaultValue(
+        ProcedureCall("timezone", ["UTC", NamedValue("CURRENT_TIMESTAMP")])
+    )
+
+    # Every column is mandatory.
+    for column in table.columns:
+        table.tableConstraint(Constraint.NOT_NULL, [column.name])
+
+    table.primaryKey = [
+        table.columnNamed(keyName) for keyName in ("HOSTNAME", "PORT")
+    ]
+
+    return inSchema
+
+NodeInfoSchema = SchemaSyntax(makeNodeSchema(Schema(__file__)))
+
+
+
+def makeJobSchema(inSchema):
+    """
+    Add the C{JOB} table used by L{JobItem} to C{inSchema}.
+
+    C{JOB_ID} is the primary key and defaults to C{nextval('JOB_SEQ')}.
+    C{JOB_ID} and C{WORK_TYPE} are C{NOT NULL}.  C{PRIORITY}, C{WEIGHT},
+    C{NOT_BEFORE} and C{NOT_AFTER} carry no C{NOT NULL} constraint.
+
+    @param inSchema: a L{Schema} to add the job table to.
+    @type inSchema: L{Schema}
+
+    @return: C{inSchema}, now containing the C{JOB} table.
+    """
+    # Initializing this duplicate schema avoids a circular dependency, but this
+    # should really be accomplished with independent schema objects that the
+    # transaction is made aware of somehow.
+    table = Table(inSchema, "JOB")
+
+    jobIDColumn = table.addColumn("JOB_ID", SQLType("integer", None))
+    jobIDColumn.setDefaultValue(ProcedureCall("nextval", ["JOB_SEQ"]))
+    table.addColumn("WORK_TYPE", SQLType("varchar", 255))
+    table.addColumn("PRIORITY", SQLType("integer", 0))
+    table.addColumn("WEIGHT", SQLType("integer", 0))
+    table.addColumn("NOT_BEFORE", SQLType("timestamp", None))
+    table.addColumn("NOT_AFTER", SQLType("timestamp", None))
+
+    for columnName in ("JOB_ID", "WORK_TYPE"):
+        table.tableConstraint(Constraint.NOT_NULL, [columnName])
+    table.primaryKey = [table.columnNamed("JOB_ID")]
+
+    return inSchema
+
+JobInfoSchema = SchemaSyntax(makeJobSchema(Schema(__file__)))
+
+
+
+@inlineCallbacks
+def inTransaction(transactionCreator, operation):
+    """
+    Perform the given operation in a transaction, committing or aborting as
+    required.
+
+    @param transactionCreator: a 0-arg callable that returns an
+        L{IAsyncTransaction}
+
+    @param operation: a 1-arg callable that takes an L{IAsyncTransaction} and
+        returns a value.
+
+    @return: a L{Deferred} that fires with C{operation}'s result or fails with
+        its error, unless there is an error creating, aborting or committing
+        the transaction.
+    """
+    txn = transactionCreator()
+    try:
+        result = yield operation(txn)
+    except:
+        # Deliberately broad: any error from the operation must abort the
+        # transaction.  Capture the exception as a Failure *before* yielding,
+        # since the active exception info may not survive the yield to
+        # abort().
+        f = Failure()
+        yield txn.abort()
+        returnValue(f)
+    else:
+        yield txn.commit()
+        returnValue(result)
+
+
+
+def astimestamp(v):
+    """
+    Convert a naive datetime, interpreted as UTC, to a POSIX timestamp.
+
+    @param v: the time to convert; it is subtracted from the naive UTC epoch,
+        so it must itself be naive.
+    @type v: L{datetime.datetime}
+
+    @return: seconds elapsed since 1970-01-01T00:00:00 (UTC).
+    @rtype: L{float}
+    """
+    epoch = datetime.utcfromtimestamp(0)
+    elapsed = v - epoch
+    return elapsed.total_seconds()
+
+
+
+class NodeInfo(Record, fromTable(NodeInfoSchema.NODE_INFO)):
+    """
+    A L{NodeInfo} is information about a currently-active Node process.
+    """
+
+    def endpoint(self, reactor):
+        """
+        Create an L{IStreamClientEndpoint} that will connect to the node
+        process described by this L{NodeInfo}.
+
+        @param reactor: the reactor the endpoint will use to connect.
+
+        @return: a TCP client endpoint for this node's C{hostname} and
+            C{port}.
+        @rtype: L{IStreamClientEndpoint}
+        """
+        return TCP4ClientEndpoint(reactor, self.hostname, self.port)
+
+
+
+def abstract(thunk):
+    """
+    Mark the decorated method as abstract.
+
+    The result is a classmethod that always raises L{NotImplementedError},
+    naming the class it was invoked on and the method.
+
+    @note: only methods are currently supported.
+    """
+    @wraps(thunk)
+    def notImplemented(cls, *args, **kwargs):
+        raise NotImplementedError(
+            qual(cls) + " does not implement " + thunk.func_name
+        )
+
+    return classmethod(notImplemented)
+
+
+
+class JobItem(Record, fromTable(JobInfoSchema.JOB)):
+    """
+    An item in the job table. This is typically not directly used by code
+    creating work items, but rather is used for internal bookkeeping of jobs
+    associated with work items.
+    """
+
+    @inlineCallbacks
+    def getWorkForJob(self):
+        """
+        Find the work item attached to this job.
+
+        The work item class is looked up from this job's C{workType} (a table
+        name) with L{WorkItem.forTableName}.
+
+        @return: a L{Deferred} firing with the single matching work item, or
+            C{None} if the number of matching rows is not exactly one.
+        """
+        workClass = WorkItem.forTableName(self.workType)
+        matches = yield workClass.loadForJob(self.transaction, self.jobID)
+        if len(matches) == 1:
+            returnValue(matches[0])
+        returnValue(None)
+
+
+    @inlineCallbacks
+    def run(self):
+        """
+        Run this job item by finding the appropriate work item class and
+        running that.
+
+        If the work item has a C{group}, the corresponding L{NamedLock} is
+        acquired first.  The work item's row is deleted I{before} its
+        C{doWork} is called, and C{doWork} is skipped if that row has already
+        been removed.  Finally this job's own row is deleted, ignoring
+        L{NoSuchRecord} if it is already gone.
+        """
+
+        @inlineCallbacks
+        def deleteIfPresent(record):
+            # Fires with True if the row was deleted, False if it was
+            # already gone.
+            try:
+                yield record.delete()
+            except NoSuchRecord:
+                returnValue(False)
+            else:
+                returnValue(True)
+
+        workItem = yield self.getWorkForJob()
+        if workItem is not None:
+            if workItem.group is not None:
+                yield NamedLock.acquire(self.transaction, workItem.group)
+            # Deleting up front doubles as a check: a work item whose row was
+            # already removed (for example, coalesced into other work) is not
+            # run again.
+            deleted = yield deleteIfPresent(workItem)
+            if deleted:
+                yield workItem.doWork()
+
+        yield deleteIfPresent(self)
+
+
+    @classmethod
+    @inlineCallbacks
+    def histogram(cls, txn):
+        """
+        Generate a histogram of work items currently in the queue.
+
+        @return: a L{Deferred} firing with a L{dict} mapping each work type
+            (table name) to its number of jobs.  Every direct L{WorkItem}
+            subclass appears, with C{0} if it has no jobs.
+        """
+        jobTable = JobInfoSchema.JOB
+        rows = yield Select(
+            [jobTable.WORK_TYPE, Count(jobTable.WORK_TYPE)],
+            From=jobTable,
+            GroupBy=jobTable.WORK_TYPE
+        ).on(txn)
+        counts = dict(rows)
+
+        # Add in empty data for other work
+        for workClass in WorkItem.__subclasses__():
+            counts.setdefault(workClass.table.model.name, 0)
+        returnValue(counts)
+
+
+    @classmethod
+    def numberOfWorkTypes(cls):
+        """
+        Return the number of direct L{WorkItem} subclasses.
+        """
+        return len(WorkItem.__subclasses__())
+
+
+# Priority for work - used to order work items in the job queue.
+# WorkItem.priority defaults to WORK_PRIORITY_LOW, and WorkItem.makeJob copies
+# it onto the job unless a "priority" keyword argument is given.
+# NOTE(review): presumably larger values are run first (HIGH > LOW); confirm
+# against the query that selects jobs.
+WORK_PRIORITY_LOW = 1
+WORK_PRIORITY_MEDIUM = 2
+WORK_PRIORITY_HIGH = 3
+
+
+class WorkItem(Record):
+ """
+ A L{WorkItem} is an item of work which may be stored in a database, then
+ executed later.
+
+ L{WorkItem} is an abstract class, since it is a L{Record} with no table
+ associated via L{fromTable}. Concrete subclasses must associate a specific
+ table by inheriting like so::
+
+ class MyWorkItem(WorkItem, fromTable(schema.MY_TABLE)):
+
+ Concrete L{WorkItem}s should generally not be created directly; they are
+ both created and thereby implicitly scheduled to be executed by calling
+ L{enqueueWork <twext.enterprise.ienterprise.IQueuer.enqueueWork>} with the
+ appropriate L{WorkItem} concrete subclass. There are different queue
+ implementations (L{PeerConnectionPool} and L{LocalQueuer}, for example), so
+ the exact timing and location of the work execution may differ.
+
+ L{WorkItem}s may be constrained in the ordering and timing of their
+ execution, to control concurrency and for performance reasons respectively.
+
+ Although all the usual database mutual-exclusion rules apply to work
+ executed in L{WorkItem.doWork}, implicit database row locking is not always
+ the best way to manage concurrency. They have some problems, including:
+
+ - implicit locks are easy to accidentally acquire out of order, which
+ can lead to deadlocks
+
+ - implicit locks are easy to forget to acquire correctly - for example,
+ any read operation which subsequently turns into a write operation
+ must have been acquired with C{Select(..., ForUpdate=True)}, but it
+ is difficult to consistently indicate that methods which abstract out
+ read operations must pass this flag in certain cases and not others.
+
+ - implicit locks are held until the transaction ends, which means that
+ if expensive (long-running) queue operations share the same lock with
+ cheap (short-running) queue operations or user interactions, the
+ cheap operations all have to wait for the expensive ones to complete,
+ but continue to consume whatever database resources they were using.
+
+ In order to ameliorate these problems with potentially concurrent work
+ that uses the same resources, L{WorkItem} provides a database-wide mutex
+ that is automatically acquired at the beginning of the transaction and
+ released at the end. To use it, simply L{align
+ <twext.enterprise.dal.record.Record.namingConvention>} the C{group}
+ attribute on your L{WorkItem} subclass with a column holding a string
+ (varchar). L{WorkItem} subclasses with the same value for C{group} will
+ not execute their C{doWork} methods concurrently. Furthermore, if the lock
+ cannot be quickly acquired, database resources associated with the
+ transaction attempting it will be released, and the transaction rolled back
+ until a future transaction I{can} can acquire it quickly. If you do not
+ want any limits to concurrency, simply leave it set to C{None}.
+
+ In some applications it's possible to coalesce work together; to grab
+ multiple L{WorkItem}s in one C{doWork} transaction. All you need to do is
+ to delete the rows which back other L{WorkItem}s from the database, and
+ they won't be processed. Using the C{group} attribute, you can easily
+ prevent concurrency so that you can easily group these items together and
+ remove them as a set (otherwise, other workers might be attempting to
+ concurrently work on them and you'll get deletion errors).
+
+ However, if doing more work at once is less expensive, and you want to
+ avoid processing lots of individual rows in tiny transactions, you may also
+ delay the execution of a L{WorkItem} by setting its C{notBefore} attribute.
+ This must be backed by a database timestamp, so that processes which happen
+ to be restarting and examining the work to be done in the database don't
+ jump the gun and do it too early.
+
+ @cvar workID: the unique identifier (primary key) for items of this type.
+ On an instance of a concrete L{WorkItem} subclass, this attribute must
+ be an integer; on the concrete L{WorkItem} subclass itself, this
+ attribute must be a L{twext.enterprise.dal.syntax.ColumnSyntax}. Note
+ that this is automatically taken care of if you simply have a
+ corresponding C{work_id} column in the associated L{fromTable} on your
+ L{WorkItem} subclass. This column must be unique, and it must be an
+ integer. In almost all cases, this column really ought to be filled
+ out by a database-defined sequence; if not, you need some other
+ mechanism for establishing a cluster-wide sequence.
+ @type workID: L{int} on instance,
+ L{twext.enterprise.dal.syntax.ColumnSyntax} on class.
+
+ @cvar notBefore: the timestamp before which this item should I{not} be
+ processed. If unspecified, this should be the date and time of the
+ creation of the L{WorkItem}.
+ @type notBefore: L{datetime.datetime} on instance,
+ L{twext.enterprise.dal.syntax.ColumnSyntax} on class.
+
+ @ivar group: If not C{None}, a unique-to-the-database identifier for which
+ only one L{WorkItem} will execute at a time.
+ @type group: L{unicode} or L{NoneType}
+ """
+
+ group = None
+ priority = WORK_PRIORITY_LOW # Default - subclasses should override
+
+
+ @classmethod
+ @inlineCallbacks
+ def makeJob(cls, transaction, **kwargs):
+ """
+ A new work item needs to be created. First we create a Job record, then we create
+ the actual work item related to the job.
+
+ @param transaction: the transaction to use
+ @type transaction: L{IAsyncTransaction}
+ """
+
+ jobargs = {
+ "workType": cls.table.model.name
+ }
+ def _transferArg(name):
+ if name in kwargs:
+ jobargs[name] = kwargs[name]
+ del kwargs[name]
+
+ _transferArg("jobID")
+ if "priority" in kwargs:
+ _transferArg("priority")
+ else:
+ jobargs["priority"] = cls.priority
+ _transferArg("weight")
+ _transferArg("notBefore")
+ _transferArg("notAfter")
+
+ job = yield JobItem.create(transaction, **jobargs)
+
+ kwargs["jobID"] = job.jobID
+ work = yield cls.create(transaction, **kwargs)
+ work.__dict__["job"] = job
+ returnValue(work)
+
+
+ @classmethod
+ @inlineCallbacks
+ def loadForJob(cls, txn, jobID):
+ workItems = yield cls.query(txn, (cls.jobID == jobID))
+ returnValue(workItems)
+
+
def doWork(self):
    """
    Perform the queued work; every subclass has to override this.

    The method is invoked in a worker process.  It does I{not} have to
    delete the row that refers to this work item; the job queuing machinery
    takes care of that.

    @raise NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
+
+
@classmethod
def forTableName(cls, tableName):
    """
    Look up a work-item class given a particular table name.  Factoring
    this correctly may place it into L{twext.enterprise.record.Record}
    instead; it is probably generally useful to be able to look up a mapped
    class from a table.

    Only direct subclasses of C{cls} are searched.  Subclasses that have no
    C{table} attribute (for example, intermediate abstract classes) are
    skipped rather than causing an L{AttributeError}.

    @param tableName: the name of the table to look up
    @type tableName: L{str}

    @return: the relevant subclass
    @rtype: L{type}

    @raise KeyError: if no direct subclass is mapped to C{tableName}.
    """
    for subcls in cls.__subclasses__():
        clstable = getattr(subcls, "table", None)
        if clstable is None:
            # Not a mapped class; it cannot match any table name.
            continue
        if tableName == clstable.model.name:
            return subcls
    raise KeyError("No mapped {0} class for {1}.".format(
        cls, tableName
    ))
+
+
+
class PerformJob(Command):
    """
    Command telling the receiving process to carry out a job that has
    already been persisted to the database.  The job is identified solely
    by its job ID; the command has no response values.
    """

    response = []
    arguments = [("jobID", Integer())]
+
+
+
class ReportLoad(Command):
    """
    Command telling another node the total current load of this node,
    summed across all of its workers.
    """

    response = []
    arguments = [
        ("load", Integer()),
    ]
+
+
+
class IdentifyNode(Command):
    """
    Tell the peer which node is on this end of the connection.

    Only the initiator of a connection (never the listener) issues this
    command: the initiator knows both which host it is connecting to and
    which host it considers itself to be.  The command exists so that we do
    not have to rely on DNS.  If reverse DNS were not set up perfectly, the
    listener could not identify its peer, and it is easier to change local
    configuration so that L{socket.getfqdn} returns the right value than to
    make sure DNS does.
    """

    arguments = [
        ("host", String()),
        ("port", Integer()),
    ]
+
+
+
class ConnectionFromPeerNode(AMP):
    """
    A connection to a peer node.  Symmetric; since the "client" and the
    "server" both serve the same role, the logic is the same in every node.

    @ivar peerPool: the connection pool within which this connection is a
        participant; it also provides the local load via C{totalLoad}.
    @type peerPool: L{PeerConnectionPool}

    @ivar _reportedLoad: The number of outstanding requests being processed by
        the peer of this connection, from all requestors (both the host of this
        connection and others), as last reported by the most recent
        L{ReportLoad} message received from the peer.
    @type _reportedLoad: L{int}

    @ivar _bonusLoad: The number of additional outstanding requests being
        processed by the peer of this connection; the number of requests made
        by the host of this connection since the last L{ReportLoad} message.
    @type _bonusLoad: L{int}
    """
    implements(_IJobPerformer)

    def __init__(self, peerPool, boxReceiver=None, locator=None):
        """
        Initialize this L{ConnectionFromPeerNode} with a reference to a
        L{PeerConnectionPool}, as well as required initialization arguments for
        L{AMP}.

        @param peerPool: the connection pool within which this
            L{ConnectionFromPeerNode} is a participant.
        @type peerPool: L{PeerConnectionPool}

        @see: L{AMP.__init__}
        """
        self.peerPool = peerPool
        self._bonusLoad = 0
        self._reportedLoad = 0
        super(ConnectionFromPeerNode, self).__init__(
            boxReceiver, locator
        )

    def reportCurrentLoad(self):
        """
        Report the current load for the local worker pool to this peer.

        @return: the L{Deferred} returned by C{callRemote} for the
            L{ReportLoad} command.
        """
        # The local load is known to the pool, not to this connection (this
        # class has no totalLoad method of its own).
        return self.callRemote(ReportLoad, load=self.peerPool.totalLoad())

    @ReportLoad.responder
    def reportedLoad(self, load):
        """
        The peer reports its load.

        The requests this node has sent since the peer's last report are
        counted in C{_bonusLoad}, and L{currentLoadEstimate} adds
        C{_bonusLoad} back on; presumably the reported figure already
        includes them, so they are subtracted here.
        """
        self._reportedLoad = (load - self._bonusLoad)
        return {}

    def startReceivingBoxes(self, sender):
        """
        Connection is up and running; add this to the list of active peers.
        """
        r = super(ConnectionFromPeerNode, self).startReceivingBoxes(sender)
        self.peerPool.addPeerConnection(self)
        return r

    def stopReceivingBoxes(self, reason):
        """
        The connection has shut down; remove this from the list of active
        peers.
        """
        self.peerPool.removePeerConnection(self)
        r = super(ConnectionFromPeerNode, self).stopReceivingBoxes(reason)
        return r

    def currentLoadEstimate(self):
        """
        What is the current load estimate for this peer?

        @return: The number of full "slots", i.e. currently-being-processed
            queue items (and other items which may contribute to this process's
            load, such as currently-being-processed client requests).
        @rtype: L{int}
        """
        return self._reportedLoad + self._bonusLoad

    def performJob(self, jobID):
        """
        A L{local worker connection <ConnectionFromWorker>} is asking this
        specific peer node-controller process to perform a job, having
        already determined that it's appropriate.

        @return: a L{Deferred} firing with C{None} once the peer has
            performed the job, or failing if it could not.

        @see: L{_IJobPerformer.performJob}
        """
        d = self.callRemote(PerformJob, jobID=jobID)
        # Count the request as in-flight until the peer answers.
        self._bonusLoad += 1

        @d.addBoth
        def performed(result):
            self._bonusLoad -= 1
            return result

        @d.addCallback
        def success(result):
            return None

        return d

    @PerformJob.responder
    def dispatchToWorker(self, jobID):
        """
        A remote peer node has asked this node to do a job; dispatch it to
        a local worker on this node.

        @param jobID: the identifier of the job.
        @type jobID: L{int}

        @return: a L{Deferred} that fires when the work has been completed.
        """
        d = self.peerPool.performJobForPeer(jobID)
        d.addCallback(lambda ignored: {})
        return d

    @IdentifyNode.responder
    def identifyPeer(self, host, port):
        """
        The peer has told us its host and port; record that mapping in the
        pool.
        """
        self.peerPool.mapPeer(host, port, self)
        return {}
+
+
+
class WorkerConnectionPool(object):
    """
    A pool of L{ConnectionFromWorker}s.

    L{WorkerConnectionPool} also implements the same implicit protocol as a
    L{ConnectionFromPeerNode}, but one that dispenses work to the local worker
    processes rather than to a remote connection pool.

    @ivar workers: the connected workers, in the order they were added.
    @type workers: L{list} of L{ConnectionFromWorker}

    @ivar maximumLoadPerWorker: the load at or above which a worker is not
        considered to have spare capacity.
    @type maximumLoadPerWorker: L{int}
    """
    implements(_IJobPerformer)

    def __init__(self, maximumLoadPerWorker=5):
        self.workers = []
        self.maximumLoadPerWorker = maximumLoadPerWorker

    def addWorker(self, worker):
        """
        Add a L{ConnectionFromWorker} to this L{WorkerConnectionPool} so that
        it can be selected.
        """
        self.workers.append(worker)

    def removeWorker(self, worker):
        """
        Remove a L{ConnectionFromWorker} from this L{WorkerConnectionPool} that
        was previously added.

        @raise ValueError: if C{worker} is not in the pool.
        """
        self.workers.remove(worker)

    def hasAvailableCapacity(self):
        """
        Does this worker connection pool have any local workers who have spare
        capacity to process another queue item?

        @return: C{True} if at least one worker's C{currentLoad} is below
            C{maximumLoadPerWorker}; C{False} otherwise, including when no
            workers are connected.
        @rtype: L{bool}
        """
        return any(
            worker.currentLoad < self.maximumLoadPerWorker
            for worker in self.workers
        )

    def allWorkerLoad(self):
        """
        The total load of all currently connected workers.
        """
        return sum(worker.currentLoad for worker in self.workers)

    def _selectLowestLoadWorker(self):
        """
        Select the local connection with the lowest current load.

        This does not consult C{maximumLoadPerWorker}; callers that need a
        worker with spare capacity should check L{hasAvailableCapacity}
        first.  Ties go to the worker that appears first in C{workers}.

        @return: a worker connection with the lowest current load.
        @rtype: L{ConnectionFromWorker}

        @raise IndexError: if no workers are connected.
        """
        if not self.workers:
            raise IndexError("No workers in the worker connection pool.")
        return min(self.workers, key=lambda w: w.currentLoad)

    def performJob(self, jobID):
        """
        Select the least-loaded local worker and ask it to perform the given
        job.

        @param jobID: The primary key identifier of the given job.
        @type jobID: L{int}

        @return: the L{Deferred} returned by the chosen worker's
            C{performJob}, which fires when the work is complete.
        @rtype: L{Deferred}
        """
        preferredWorker = self._selectLowestLoadWorker()
        return preferredWorker.performJob(jobID)
+
+
+
class ConnectionFromWorker(AMP):
    """
    The master's side of a connection to a single worker process.

    While boxes are being received, the connection is registered with
    C{peerPool.workerPool} (a L{WorkerConnectionPool}); it unregisters itself
    when boxes stop being received.
    """

    def __init__(self, peerPool, boxReceiver=None, locator=None):
        super(ConnectionFromWorker, self).__init__(boxReceiver, locator)
        self.peerPool = peerPool
        # Number of jobs sent to this worker that have not yet completed.
        self._load = 0

    @property
    def currentLoad(self):
        """
        The number of jobs currently outstanding on this worker.
        """
        return self._load

    def startReceivingBoxes(self, sender):
        """
        Begin receiving AMP boxes and make this worker selectable by the
        worker pool.
        """
        started = super(ConnectionFromWorker, self).startReceivingBoxes(sender)
        self.peerPool.workerPool.addWorker(self)
        return started

    def stopReceivingBoxes(self, reason):
        """
        AMP boxes will no longer arrive; drop this worker from the pool.
        """
        stopped = super(ConnectionFromWorker, self).stopReceivingBoxes(reason)
        self.peerPool.workerPool.removeWorker(self)
        return stopped

    @PerformJob.responder
    def performJob(self, jobID):
        """
        Send a job to this worker, tracking it in C{currentLoad} until the
        worker answers.

        @see: The responder for this should always be
            L{ConnectionFromController.actuallyReallyExecuteJobHere}.
        """
        dispatched = self.callRemote(PerformJob, jobID=jobID)
        self._load += 1

        def _finished(outcome):
            self._load -= 1
            return outcome

        dispatched.addBoth(_finished)
        return dispatched
+
+
+
class ConnectionFromController(AMP):
    """
    A L{ConnectionFromController} is the connection to a node-controller
    process, in a worker process.  It processes requests from its own
    controller to do work.  It is the opposite end of the connection from
    L{ConnectionFromWorker}.
    """
    implements(IQueuer)

    def __init__(self, transactionFactory, whenConnected,
                 boxReceiver=None, locator=None):
        super(ConnectionFromController, self).__init__(boxReceiver, locator)
        self.transactionFactory = transactionFactory
        self.whenConnected = whenConnected
        # FIXME: WorkProposal reads "reactor" from its chooser, and this
        # object is the chooser for work enqueued from a worker, so use the
        # global reactor until one can be passed in.
        from twisted.internet import reactor
        self.reactor = reactor

    def startReceivingBoxes(self, sender):
        """
        Start receiving AMP boxes, then notify C{whenConnected} with this
        connection.

        @return: the result of the superclass's C{startReceivingBoxes},
            passed through as the other AMP subclasses in this module do.
        """
        result = super(ConnectionFromController, self).startReceivingBoxes(
            sender
        )
        self.whenConnected(self)
        return result

    def choosePerformer(self):
        """
        To conform with L{WorkProposal}'s expectations, which may run in either
        a controller (against a L{PeerConnectionPool}) or in a worker (against
        a L{ConnectionFromController}), this is implemented to always return
        C{self}, since C{self} is also an object that has a C{performJob}
        method.
        """
        return self

    def performJob(self, jobID):
        """
        Ask the controller to perform a job on our behalf.

        @return: the L{Deferred} returned by C{callRemote}.
        """
        return self.callRemote(PerformJob, jobID=jobID)

    @inlineCallbacks
    def enqueueWork(self, txn, workItemType, **kw):
        """
        There is some work to do.  Do it, ideally someplace else, ideally in
        parallel.  Later, let the caller know that the work has been completed
        by firing a L{Deferred}.

        @param workItemType: The type of work item to be enqueued.
        @type workItemType: A subtype of L{WorkItem}

        @param kw: The parameters to construct a work item.
        @type kw: keyword parameters to C{workItemType.create}, i.e.
            C{workItemType.__init__}

        @return: an object that can track the enqueuing and remote execution of
            this work.
        @rtype: L{WorkProposal}
        """
        wp = WorkProposal(self, txn, workItemType, kw)
        yield wp._start()
        returnValue(wp)

    @PerformJob.responder
    def actuallyReallyExecuteJobHere(self, jobID):
        """
        This is where it's time to actually do the job.  The controller
        process has instructed this worker to do it; so, look up the data in
        the row, and do it.

        @return: a L{Deferred} firing with an empty dict (the AMP response)
            once the job has been performed.
        """
        d = ultimatelyPerform(self.transactionFactory, jobID)
        d.addCallback(lambda ignored: {})
        return d
+
+
+
def ultimatelyPerform(txnFactory, jobID):
    """
    Actually run a job, once it has been routed to the process that should
    do it.

    The job row is loaded and run inside a single transaction.  If the row
    no longer exists, this is treated as success.

    @param txnFactory: a 0- or 1-argument callable that creates an
        L{IAsyncTransaction}
    @type txnFactory: L{callable}

    @param jobID: the ID of the job to be performed
    @type jobID: L{int}

    @return: a L{Deferred} which fires with C{None} when the job has been
        performed, or fails if the job can't be performed.
    """
    @inlineCallbacks
    def _loadAndRun(txn):
        try:
            job = yield JobItem.load(txn, jobID)
            yield job.run()
        except NoSuchRecord:
            # The job's record has already been removed; nothing to do.
            pass

    return inTransaction(txnFactory, _loadAndRun)
+
+
+
class LocalPerformer(object):
    """
    A C{performJob} implementor that always does the work in the current
    process, whatever else is going on.
    """
    implements(_IJobPerformer)

    def __init__(self, txnFactory):
        """
        @param txnFactory: the transaction factory used to run each job.
        """
        self.txnFactory = txnFactory

    def performJob(self, jobID):
        """
        Run the given job immediately, in this process.

        @return: the L{Deferred} from L{ultimatelyPerform}.
        """
        factory = self.txnFactory
        return ultimatelyPerform(factory, jobID)
+
+
+
class WorkerFactory(Factory, object):
    """
    Factory, to be used as the client to connect from the worker to the
    controller.
    """

    def __init__(self, transactionFactory, whenConnected):
        """
        Create a L{WorkerFactory}.

        @param transactionFactory: a 0- or 1-argument callable that creates an
            L{IAsyncTransaction}; handed to every protocol this factory
            builds.

        @param whenConnected: a callable that each built
            L{ConnectionFromController} calls with itself once it starts
            receiving boxes.
        """
        self.transactionFactory = transactionFactory
        self.whenConnected = whenConnected

    def buildProtocol(self, addr):
        """
        Create a L{ConnectionFromController} that uses this factory's
        C{transactionFactory} and C{whenConnected} callback.

        @param addr: the address of the connection (unused).

        @return: a new L{ConnectionFromController}.
        """
        return ConnectionFromController(self.transactionFactory, self.whenConnected)
+
+
+
class TransactionFailed(Exception):
    """
    Signals that a transaction did not succeed.
    """
+
+
+
def _cloneDeferred(d):
    """
    Make a new Deferred that receives C{d}'s result, without consuming it.

    L{Deferred.chainDeferred} is not suitable here.  It adds
    C{d2.callback}/C{d2.errback} as callbacks on C{d}, and those return
    C{None`}, so C{d}'s result becomes C{None} for everything added
    afterwards.  A second clone would then fire successfully with C{None},
    even if C{d} had failed.  Here the result is handed to the clone and
    also returned, so it stays on C{d} for any later clones.

    Note: because a failure is kept on C{d}, Twisted will report it as an
    unhandled error if nothing else handles it on C{d} itself.

    @param d: the L{Deferred} to observe.
    @type d: L{Deferred}

    @return: another L{Deferred} that fires with C{d}'s result (success or
        failure) when C{d} fires.
    @rtype: L{Deferred}
    """
    d2 = Deferred()

    def relaySuccess(result):
        d2.callback(result)
        return result

    def relayFailure(reason):
        d2.errback(reason)
        return reason

    d.addCallbacks(relaySuccess, relayFailure)
    return d2
+
+
+
class WorkProposal(object):
    """
    A L{WorkProposal} is a proposal for work that will be executed, perhaps on
    another node, perhaps in the future.

    @ivar _chooser: The object which will choose where the work in this
        proposal gets performed.  This must have both a C{choosePerformer}
        method and a C{reactor} attribute, providing an L{IReactorTime}.
    @type _chooser: L{PeerConnectionPool} or L{LocalQueuer}

    @ivar txn: The transaction where the work will be enqueued.
    @type txn: L{IAsyncTransaction}

    @ivar workItemType: The type of work to be enqueued by this L{WorkProposal}
    @type workItemType: L{WorkItem} subclass

    @ivar kw: The keyword arguments to pass to C{self.workItemType.makeJob}
        to construct it.
    @type kw: L{dict}

    @ivar workItem: the created work item, or C{None} until L{_start} has
        created it.
    """

    def __init__(self, chooser, txn, workItemType, kw):
        self._chooser = chooser
        self.txn = txn
        self.workItemType = workItemType
        self.kw = kw
        self._whenExecuted = Deferred()
        self._whenCommitted = Deferred()
        self.workItem = None

    @inlineCallbacks
    def _start(self):
        """
        Execute this L{WorkProposal} by creating the work item in the database,
        waiting for the transaction where that addition was completed to
        commit, and asking the local node controller process to do the work.

        If creating the work item fails, the commit L{Deferred} fails with
        L{TransactionFailed} and the original error is re-raised to the
        caller.
        """
        try:
            created = yield self.workItemType.makeJob(self.txn, **self.kw)
        except Exception:
            # Errback with an instance (not the class) so that the resulting
            # Failure's type is TransactionFailed and Failure.check() works.
            self._whenCommitted.errback(TransactionFailed())
            raise
        else:
            self.workItem = created

        @self.txn.postCommit
        def whenDone():
            self._whenCommitted.callback(self)

            def maybeLater():
                performer = self._chooser.choosePerformer()

                @passthru(
                    performer.performJob(created.jobID).addCallback
                )
                def performed(result):
                    self._whenExecuted.callback(self)

                @performed.addErrback
                def notPerformed(why):
                    self._whenExecuted.errback(why)

            reactor = self._chooser.reactor
            notBefore = created.job.notBefore
            if notBefore is None:
                when = 0
            else:
                # Never schedule in the past.
                when = max(0, astimestamp(notBefore) - reactor.seconds())
            # TODO: Track the returned DelayedCall so it can be stopped
            # when the service stops.
            reactor.callLater(when, maybeLater)

        @self.txn.postAbort
        def whenFailed():
            self._whenCommitted.errback(TransactionFailed())

    def whenExecuted(self):
        """
        Let the caller know when the proposed work has been fully executed.

        @note: The L{Deferred} returned by C{whenExecuted} should be used with
            extreme caution.  If an application decides to do any
            database-persistent work as a result of this L{Deferred} firing,
            that work I{may be lost} as a result of a service being normally
            shut down between the time that the work is scheduled and the time
            that it is executed.  So, the only things that should be added as
            callbacks to this L{Deferred} are those which are ephemeral, in
            memory, and reflect only presentation state associated with the
            user's perception of the completion of work, not logical chains of
            work which need to be completed in sequence; those should all be
            completed within the transaction of the L{WorkItem.doWork} that
            gets executed.

        @return: a L{Deferred} that fires with this L{WorkProposal} when the
            work has been completed remotely.
        """
        return _cloneDeferred(self._whenExecuted)

    def whenProposed(self):
        """
        Let the caller know when the work has been proposed; i.e. when the work
        is first transmitted to the database.

        @return: a L{Deferred} that fires with this L{WorkProposal} when the
            relevant commands have been sent to the database to create the
            L{WorkItem}, and fails if those commands do not succeed for some
            reason.
        """
        return succeed(self)

    def whenCommitted(self):
        """
        Let the caller know when the work has been committed to; i.e. when the
        transaction where the work was proposed has been committed to the
        database.

        @return: a L{Deferred} that fires with this L{WorkProposal} when the
            relevant transaction has been committed, or fails if the
            transaction is not committed for any reason.
        """
        return _cloneDeferred(self._whenCommitted)
+
+
+
class _BaseQueuer(object):
    """
    Shared implementation for queuers.

    It keeps a set of callbacks that are invoked with every L{WorkProposal}
    produced by L{enqueueWork}.  The queuer itself is passed to each
    L{WorkProposal} as its chooser, so subclasses need to provide
    C{choosePerformer} and C{reactor}.
    """
    implements(IQueuer)

    def __init__(self):
        super(_BaseQueuer, self).__init__()
        self.proposalCallbacks = set()

    def callWithNewProposals(self, callback):
        """
        Register C{callback} to be called with each new L{WorkProposal}.
        """
        self.proposalCallbacks.add(callback)

    def transferProposalCallbacks(self, newQueuer):
        """
        Make C{newQueuer} use this queuer's proposal callbacks.  The same set
        object is shared, not copied.

        @return: C{newQueuer}
        """
        newQueuer.proposalCallbacks = self.proposalCallbacks
        return newQueuer

    @inlineCallbacks
    def enqueueWork(self, txn, workItemType, **kw):
        """
        Propose some work, to be done someplace else, ideally in parallel.
        The returned proposal can later tell the caller when the work has
        completed.

        @param workItemType: The type of work item to be enqueued.
        @type workItemType: A subtype of L{WorkItem}

        @param kw: The parameters to construct a work item.
        @type kw: keyword parameters to C{workItemType.create}, i.e.
            C{workItemType.__init__}

        @return: an object that can track the enqueuing and remote execution of
            this work.
        @rtype: L{WorkProposal}
        """
        proposal = WorkProposal(self, txn, workItemType, kw)
        yield proposal._start()
        for notify in self.proposalCallbacks:
            notify(proposal)
        returnValue(proposal)
+
+
+
class PeerConnectionPool(_BaseQueuer, MultiService, object):
    """
    Each node has a L{PeerConnectionPool} connecting it to all the other nodes
    currently active on the same database.

    @ivar hostname: The hostname where this node process is running, as
        reported by the local host's configuration.  Possibly this should be
        obtained via C{config.ServerHostName} instead of C{socket.getfqdn()};
        although hosts within a cluster may be configured with the same
        C{ServerHostName}; TODO need to confirm.
    @type hostname: L{bytes}

    @ivar thisProcess: a L{NodeInfo} representing this process, which is
        initialized when this L{PeerConnectionPool} service is started via
        C{startService}.  May be C{None} if this service is not fully started
        up or if it is shutting down.
    @type thisProcess: L{NodeInfo}

    @ivar queueProcessTimeout: The amount of time after a L{WorkItem} is
        scheduled to be processed (its C{notBefore} attribute) that it is
        considered to be "orphaned" and will be run by a lost-work check rather
        than waiting for it to be requested.  By default, 10 minutes.
    @type queueProcessTimeout: L{float} (in seconds)

    @ivar queueDelayedProcessInterval: The amount of time between database
        pings, i.e. checks for over-due queue items that might have been
        orphaned by a controller process that died mid-transaction.  This is
        how often the shared database should be pinged by I{all} nodes (i.e.,
        all controller processes, or each instance of L{PeerConnectionPool});
        each individual node will ping commensurately less often as more nodes
        join the database.
    @type queueDelayedProcessInterval: L{float} (in seconds)

    @ivar reactor: The reactor used for scheduling timed events.
    @type reactor: L{IReactorTime} provider.

    @ivar peers: The list of currently connected peers.
    @type peers: L{list} of L{ConnectionFromPeerNode}
    """
    implements(IQueuer)

    from socket import getfqdn
    from os import getpid
    getfqdn = staticmethod(getfqdn)
    getpid = staticmethod(getpid)

    queueProcessTimeout = (10.0 * 60.0)
    queueDelayedProcessInterval = (60.0)

    def __init__(self, reactor, transactionFactory, ampPort):
        """
        Initialize a L{PeerConnectionPool}.

        @param ampPort: The AMP TCP port number to listen on for inter-host
            communication.  This must be an integer (and not, say, an endpoint,
            or an endpoint description) because we need to communicate it to
            the other peers in the cluster in a way that will be meaningful to
            them as clients.
        @type ampPort: L{int}

        @param transactionFactory: a 0- or 1-argument callable that produces an
            L{IAsyncTransaction}
        """
        super(PeerConnectionPool, self).__init__()
        self.reactor = reactor
        self.transactionFactory = transactionFactory
        self.hostname = self.getfqdn()
        self.pid = self.getpid()
        self.ampPort = ampPort
        self.thisProcess = None
        self.workerPool = WorkerConnectionPool()
        self.peers = []
        self.mappedPeers = {}
        self._startingUp = None
        self._listeningPort = None
        self._lastSeenTotalNodes = 1
        self._lastSeenNodeIndex = 1

    def addPeerConnection(self, peer):
        """
        Add a L{ConnectionFromPeerNode} to the active list of peers.
        """
        self.peers.append(peer)

    def totalLoad(self):
        """
        The total load of all local workers.

        @rtype: L{int}
        """
        return self.workerPool.allWorkerLoad()

    def workerListenerFactory(self):
        """
        Factory that listens for connections from workers.

        @return: a L{Factory} producing L{ConnectionFromWorker} protocols
            attached to this pool.
        """
        f = Factory()
        f.buildProtocol = lambda addr: ConnectionFromWorker(self)
        return f

    def removePeerConnection(self, peer):
        """
        Remove a L{ConnectionFromPeerNode} from the active list of peers.
        """
        self.peers.remove(peer)

    def choosePerformer(self, onlyLocally=False):
        """
        Choose a peer to distribute work to based on the current known slot
        occupancy of the other nodes.  Note that this will prefer distributing
        work to local workers until the current node is full, because that
        should be lower-latency.  Also, if no peers are available, work will be
        submitted locally even if the worker pool is already over-subscribed.

        @param onlyLocally: if C{True}, never choose a remote peer.
        @type onlyLocally: L{bool}

        @return: the chosen peer.
        @rtype: L{_IJobPerformer} L{ConnectionFromPeerNode} or
            L{WorkerConnectionPool}
        """
        if self.workerPool.hasAvailableCapacity():
            return self.workerPool

        if self.peers and not onlyLocally:
            # Least-loaded peer; ties go to the earliest-connected one.
            return min(self.peers, key=lambda p: p.currentLoadEstimate())
        else:
            return LocalPerformer(self.transactionFactory)

    def performJobForPeer(self, jobID):
        """
        A peer has requested us to perform a job; choose a job performer
        local to this node, and then execute it.
        """
        performer = self.choosePerformer(onlyLocally=True)
        return performer.performJob(jobID)

    def totalNumberOfNodes(self):
        """
        How many nodes are there, total?

        @return: the maximum number of other L{PeerConnectionPool} instances
            that may be connected to the database described by
            C{self.transactionFactory}.  Note that this is not the current
            count by connectivity, but the count according to the database.
        @rtype: L{int}
        """
        # TODO
        return self._lastSeenTotalNodes

    def nodeIndex(self):
        """
        What ordinal does this node, i.e. this instance of
        L{PeerConnectionPool}, occupy within the ordered set of all nodes
        connected to the database described by C{self.transactionFactory}?

        @return: the index of this node within the total collection, as
            last computed by the periodic lost-work check.
        @rtype: L{int}
        """
        # TODO
        return self._lastSeenNodeIndex

    def _periodicLostWorkCheck(self):
        """
        Periodically, every node controller has to check to make sure that work
        hasn't been dropped on the floor by someone.  In order to do that it
        queries the job table for jobs whose C{notBefore} is more than
        C{queueProcessTimeout} in the past, and dispatches each of them.
        """
        @inlineCallbacks
        def workCheck(txn):
            if self.thisProcess:
                activeNodes = yield self.activeNodes(txn)
                nodes = sorted(
                    (node.hostname, node.port) for node in activeNodes
                )
                self._lastSeenTotalNodes = len(nodes)
                self._lastSeenNodeIndex = nodes.index(
                    (self.thisProcess.hostname, self.thisProcess.port)
                )

            # TODO: here is where we should iterate over the unlocked items that
            # are due, ordered by priority, notBefore etc
            tooLate = datetime.utcfromtimestamp(
                self.reactor.seconds() - self.queueProcessTimeout
            )
            overdueItems = (yield JobItem.query(
                txn, (JobItem.notBefore < tooLate))
            )
            for overdueItem in overdueItems:
                peer = self.choosePerformer()
                yield peer.performJob(overdueItem.jobID)

        return inTransaction(self.transactionFactory, workCheck)

    _currentWorkDeferred = None
    _lostWorkCheckCall = None

    def _lostWorkCheckLoop(self):
        """
        While the service is running, keep checking for any overdue / lost work
        items and re-submit them to the cluster for processing.  Space out
        those checks in time based on the size of the cluster.
        """
        self._lostWorkCheckCall = None

        @passthru(
            self._periodicLostWorkCheck().addErrback(log.err).addCallback
        )
        def scheduleNext(result):
            self._currentWorkDeferred = None
            if not self.running:
                return
            index = self.nodeIndex()
            now = self.reactor.seconds()

            interval = self.queueDelayedProcessInterval
            count = self.totalNumberOfNodes()
            when = (now - (now % interval)) + (interval * (count + index))
            delay = when - now
            self._lostWorkCheckCall = self.reactor.callLater(
                delay, self._lostWorkCheckLoop
            )

        self._currentWorkDeferred = scheduleNext

    def startService(self):
        """
        Register ourselves with the database and establish all outgoing
        connections to other servers in the cluster.
        """
        @inlineCallbacks
        def startup(txn):
            endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
            # If this fails, the failure mode is going to be ugly, just like
            # all conflicted-port failures.  But, at least it won't proceed.
            self._listeningPort = yield endpoint.listen(self.peerFactory())
            self.ampPort = self._listeningPort.getHost().port
            yield Lock.exclusive(NodeInfo.table).on(txn)
            nodes = yield self.activeNodes(txn)
            selves = [node for node in nodes
                      if ((node.hostname == self.hostname) and
                          (node.port == self.ampPort))]
            if selves:
                self.thisProcess = selves[0]
                nodes.remove(self.thisProcess)
                yield self.thisProcess.update(pid=self.pid,
                                              time=datetime.now())
            else:
                self.thisProcess = yield NodeInfo.create(
                    txn, hostname=self.hostname, port=self.ampPort,
                    pid=self.pid, time=datetime.now()
                )

            for node in nodes:
                self._startConnectingTo(node)

        self._startingUp = inTransaction(self.transactionFactory, startup)

        @self._startingUp.addBoth
        def done(result):
            self._startingUp = None
            super(PeerConnectionPool, self).startService()
            self._lostWorkCheckLoop()
            return result

    @inlineCallbacks
    def stopService(self):
        """
        Stop this service, terminating any incoming or outgoing connections.
        """
        yield super(PeerConnectionPool, self).stopService()

        if self._startingUp is not None:
            yield self._startingUp

        if self._listeningPort is not None:
            yield self._listeningPort.stopListening()

        if self._lostWorkCheckCall is not None:
            self._lostWorkCheckCall.cancel()

        if self._currentWorkDeferred is not None:
            yield self._currentWorkDeferred

        # Iterate over a copy: losing a connection ends up calling
        # removePeerConnection, which mutates self.peers, and some transports
        # may do that synchronously.
        for peer in list(self.peers):
            peer.transport.abortConnection()

    def activeNodes(self, txn):
        """
        Load information about all nodes recorded in the database.

        @return: the result of C{NodeInfo.all(txn)}.
        """
        return NodeInfo.all(txn)

    def mapPeer(self, host, port, peer):
        """
        A peer has been identified as belonging to the given host/port
        combination.  Record it, replacing any previous mapping for the same
        host/port (the intended disconnection of that older peer is not yet
        implemented).
        """
        # if (host, port) in self.mappedPeers:
        # TODO: think about this for race conditions
        # self.mappedPeers.pop((host, port)).transport.loseConnection()
        self.mappedPeers[(host, port)] = peer

    def _startConnectingTo(self, node):
        """
        Start an outgoing connection to another master process.

        @param node: a description of the master to connect to.
        @type node: L{NodeInfo}
        """
        connected = node.endpoint(self.reactor).connect(self.peerFactory())

        def whenConnected(proto):
            self.mapPeer(node.hostname, node.port, proto)
            proto.callRemote(
                IdentifyNode,
                host=self.thisProcess.hostname,
                port=self.thisProcess.port
            ).addErrback(noted, "identify")

        def noted(err, x="connect"):
            log.msg(
                "Could not {0} to cluster peer {1} because {2}"
                .format(x, node, str(err.value))
            )

        connected.addCallbacks(whenConnected, noted)

    def peerFactory(self):
        """
        Factory for peer connections.

        @return: a L{Factory} that will produce L{ConnectionFromPeerNode}
            protocols attached to this L{PeerConnectionPool}.
        """
        return _PeerPoolFactory(self)
+
+
+
class _PeerPoolFactory(Factory, object):
    """
    Builds L{ConnectionFromPeerNode} protocols for a L{PeerConnectionPool};
    used for both outgoing (client) and incoming (server) connections.
    """

    def __init__(self, peerConnectionPool):
        """
        @param peerConnectionPool: the pool every built connection joins.
        """
        self.peerConnectionPool = peerConnectionPool

    def buildProtocol(self, addr):
        """
        @return: a new L{ConnectionFromPeerNode} bound to the pool.
        """
        pool = self.peerConnectionPool
        return ConnectionFromPeerNode(pool)
+
+
+
class LocalQueuer(_BaseQueuer):
    """
    A queuer whose enqueued work is simply executed in the local process.
    """
    implements(IQueuer)

    def __init__(self, txnFactory, reactor=None):
        """
        @param txnFactory: the transaction factory used to perform work.

        @param reactor: the reactor for scheduling; the global Twisted
            reactor is used when this is C{None}.
        """
        super(LocalQueuer, self).__init__()
        self.txnFactory = txnFactory
        if reactor is None:
            from twisted.internet import reactor as globalReactor
            reactor = globalReactor
        self.reactor = reactor

    def choosePerformer(self):
        """
        Always choose to do the work here.

        @return: a new L{LocalPerformer} using C{self.txnFactory}.
        """
        performer = LocalPerformer(self.txnFactory)
        return performer
+
+
+
class NonPerformer(object):
    """
    A C{performJob} implementor that never actually performs any work.

    This is for cases where something should be able to enqueue work for
    someone else to do without taking on any work itself (such as a command
    line tool).
    """
    implements(_IJobPerformer)

    def performJob(self, jobID):
        """
        Do not perform the job.

        @return: a L{Deferred} that has already fired with C{None}.
        """
        return succeed(None)
+
+
+
class NonPerformingQueuer(_BaseQueuer):
    """
    When work is enqueued with this queuer, it is never executed locally.
    It's expected that the polling machinery will find the work and perform it.
    """
    implements(IQueuer)

    def __init__(self, reactor=None):
        """
        @param reactor: the reactor for scheduling; the global Twisted
            reactor is used when this is C{None}.
        """
        super(NonPerformingQueuer, self).__init__()
        if reactor is None:
            from twisted.internet import reactor
        self.reactor = reactor

    def choosePerformer(self):
        """
        Choose a performer that never performs the work, leaving it for
        another process to pick up.

        @return: a new L{NonPerformer}.
        """
        return NonPerformer()
Modified: twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_adbapi2.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_adbapi2.py 2014-03-01 15:17:14 UTC (rev 12778)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_adbapi2.py 2014-03-01 15:42:17 UTC (rev 12779)
@@ -192,13 +192,13 @@
a = self.createTransaction()
alphaResult = self.resultOf(a.execSQL("alpha"))
- [[counter, echo]] = alphaResult[0]
+ [[_ignore_counter, _ignore_echo]] = alphaResult[0]
b = self.createTransaction()
# "b" should have opened a connection.
self.assertEquals(len(self.factory.connections), 2)
betaResult = self.resultOf(b.execSQL("beta"))
- [[bcounter, becho]] = betaResult[0]
+ [[bcounter, _ignore_becho]] = betaResult[0]
# both "a" and "b" are holding open a connection now; let's try to open
# a third one. (The ordering will be deterministic even if this fails,
@@ -214,13 +214,13 @@
commitResult = self.resultOf(b.commit())
# Now that "b" has committed, "c" should be able to complete.
- [[ccounter, cecho]] = gammaResult[0]
+ [[ccounter, _ignore_cecho]] = gammaResult[0]
# The connection for "a" ought to still be busy, so let's make sure
# we're using the one for "c".
self.assertEquals(ccounter, bcounter)
- # Sanity check: the commit should have succeded!
+ # Sanity check: the commit should have succeeded!
self.assertEquals(commitResult, [None])
@@ -231,7 +231,7 @@
"""
a = self.createTransaction()
alphaResult = self.resultOf(a.execSQL("alpha"))
- [[[counter, echo]]] = alphaResult
+ [[[_ignore_counter, _ignore_echo]]] = alphaResult
self.assertEquals(len(self.factory.connections), 1)
self.assertEquals(len(self.holders), 1)
[holder] = self.holders
@@ -452,7 +452,7 @@
for txn in txns:
# Make sure rollback will actually be executed.
results = self.resultOf(txn.execSQL("maybe change something!"))
- [[[counter, echo]]] = results
+ [[[_ignore_counter, echo]]] = results
self.assertEquals("maybe change something!", echo)
# Fail one (and only one) call to rollback().
self.factory.rollbackFail = True
@@ -483,7 +483,7 @@
"""
active = []
# Use up the available connections ...
- for i in xrange(self.pool.maxConnections):
+ for _ignore in xrange(self.pool.maxConnections):
active.append(self.createTransaction())
# ... so that this one has to be spooled.
@@ -506,7 +506,7 @@
abortResult = self.resultOf(it.abort())
# steal it from the queue so we can do it out of order
- d, work = self.holders[0]._q.get()
+ d, _ignore_work = self.holders[0]._q.get()
# that should be the only work unit so don't continue if something else
# got in there
@@ -716,7 +716,7 @@
self.factory.connections[0].executeWillFail(CustomExecuteFailed)
results = self.resultOf(txn.execSQL("hello, world!"))
- [[[counter, echo]]] = results
+ [[[_ignore_counter, echo]]] = results
self.assertEquals("hello, world!", echo)
# Two execution attempts should have been made, one on each connection.
@@ -752,7 +752,7 @@
)
results = self.resultOf(txn.execSQL("hello, world!"))
txn.commit()
- [[[counter, echo]]] = results
+ [[[_ignore_counter, echo]]] = results
self.assertEquals("hello, world!", echo)
txn2 = self.createTransaction()
self.assertEquals(
@@ -767,7 +767,7 @@
self.factory.connections[0].executeWillFail(CustomExecFail)
results = self.resultOf(txn2.execSQL("second try!"))
txn2.commit()
- [[[counter, echo]]] = results
+ [[[_ignore_counter, echo]]] = results
self.assertEquals("second try!", echo)
self.assertEquals(len(self.flushLoggedErrors(CustomExecFail)), 1)
@@ -936,7 +936,7 @@
re-connection on the next try.
"""
txn = self.createTransaction()
- [[[counter, echo]]] = self.resultOf(txn.execSQL("hello, world!", []))
+ [[[_ignore_counter, _ignore_echo]]] = self.resultOf(txn.execSQL("hello, world!", []))
self.factory.connections[0].executeWillFail(ZeroDivisionError)
[f] = self.resultOf(txn.execSQL("divide by zero", []))
f.trap(self.translateError(ZeroDivisionError))
@@ -966,7 +966,7 @@
"""
txn = self.createTransaction()
results = self.resultOf(txn.execSQL("maybe change something!"))
- [[[counter, echo]]] = results
+ [[[_ignore_counter, echo]]] = results
self.assertEquals("maybe change something!", echo)
self.factory.rollbackFail = True
[x] = self.resultOf(txn.abort())
@@ -992,7 +992,7 @@
txn = self.createTransaction()
self.factory.commitFail = True
results = self.resultOf(txn.execSQL("maybe change something!"))
- [[[counter, echo]]] = results
+ [[[_ignore_counter, echo]]] = results
self.assertEquals("maybe change something!", echo)
[x] = self.resultOf(txn.commit())
x.trap(self.translateError(CommitFail))
@@ -1210,7 +1210,7 @@
r = self.resultOf(
txn.execSQL("some-rows", raiseOnZeroRowCount=RuntimeError)
)
- [[[counter, echo]]] = r
+ [[[_ignore_counter, echo]]] = r
self.assertEquals(echo, "some-rows")
@@ -1338,7 +1338,6 @@
interacting with each other.
"""
-
def setParamstyle(self, paramstyle):
"""
Change the paramstyle on both the pool and the client.
@@ -1366,6 +1365,7 @@
self.assertEquals(len(self.factory.connections), 1)
+
class HookableOperationTests(TestCase):
"""
Tests for L{_HookableOperation}.
Added: twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_jobqueue.py (rev 0)
+++ twext/branches/users/cdaboo/jobs/twext/enterprise/test/test_jobqueue.py 2014-03-01 15:42:17 UTC (rev 12779)
@@ -0,0 +1,1015 @@
+##
+# Copyright (c) 2012-2014 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.enterprise.jobqueue}.
+"""
+
+import datetime
+
+from zope.interface.verify import verifyObject
+
+from twisted.trial.unittest import TestCase, SkipTest
+from twisted.test.proto_helpers import StringTransport, MemoryReactor
+from twisted.internet.defer import (
+ Deferred, inlineCallbacks, gatherResults, passthru, returnValue
+)
+from twisted.internet.task import Clock as _Clock
+from twisted.protocols.amp import Command, AMP, Integer
+from twisted.application.service import Service, MultiService
+
+from twext.enterprise.dal.syntax import SchemaSyntax, Select
+from twext.enterprise.dal.record import fromTable
+from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
+from twext.enterprise.fixtures import buildConnectionPool
+from twext.enterprise.fixtures import SteppablePoolHelper
+from twext.enterprise.jobqueue import (
+ inTransaction, PeerConnectionPool, astimestamp,
+ LocalPerformer, _IJobPerformer, WorkItem, WorkerConnectionPool,
+ ConnectionFromPeerNode, LocalQueuer,
+ _BaseQueuer, NonPerformingQueuer
+)
+import twext.enterprise.jobqueue
+
# TODO: There should be a store-building utility within twext.enterprise.
try:
    from txdav.common.datastore.test.util import buildStore
except ImportError:
    def buildStore(*ignoredArgs, **ignoredKwargs):
        """
        Stand-in used when txdav cannot be imported: skip the calling test.

        @raise SkipTest: always.
        """
        reason = "buildStore is not available, because it's in txdav; duh."
        raise SkipTest(reason)
+
+
+
class Clock(_Clock):
    """
    A stricter L{IReactorTime} fake, intended to mimic a real reactor by
    refusing to schedule calls with a negative delay.
    """

    def callLater(self, _seconds, _f, *args, **kw):
        """
        Schedule C{_f(*args, **kw)} to run C{_seconds} from now.

        @raise ValueError: if C{_seconds} is negative.
        """
        if _seconds < 0:
            message = "%s<0: " % (_seconds,)
            raise ValueError(message)
        delayedCall = super(Clock, self).callLater(_seconds, _f, *args, **kw)
        return delayedCall
+
+
+
class MemoryReactorWithClock(MemoryReactor, Clock):
    """
    Combine L{MemoryReactor} with the stricter L{Clock}, so that a single
    object can stand in for a real reactor.
    """

    def __init__(self):
        # Each base needs its own initialization; run them in MRO order.
        for base in (MemoryReactor, Clock):
            base.__init__(self)
+
+
+
def transactionally(transactionCreator):
    """
    Build a decorator that runs the decorated function at once, inside a
    transaction, and rebinds the decorated name to the resulting
    L{Deferred}.

    Use like so::

        @transactionally(connectionPool.connection)
        @inlineCallbacks
        def it(txn):
            yield txn.doSomething()
        it.addCallback(firedWhenDone)

    @param transactionCreator: A 0-arg callable that returns an
        L{IAsyncTransaction}.

    @return: a one-argument decorator.
    """
    def runInTransaction(operation):
        # Runs immediately: the "decorated function" becomes a Deferred.
        return inTransaction(transactionCreator, operation)
    return runInTransaction
+
+
+
class UtilityTests(TestCase):
    """
    Tests for supporting utilities.
    """

    def test_inTransactionSuccess(self):
        """
        L{inTransaction} invokes its C{transactionCreator} argument, and then
        returns a L{Deferred} which fires with the result of its C{operation}
        argument when it succeeds.
        """
        class FakeTransaction(object):
            """
            Minimal transaction whose C{commit} and C{abort} each hand back a
            fresh, unfired L{Deferred} that the test fires by hand.
            """
            def __init__(self):
                self.commits = []
                self.aborts = []

            def _pending(self, bucket):
                waiting = Deferred()
                bucket.append(waiting)
                return waiting

            def commit(self):
                return self._pending(self.commits)

            def abort(self):
                return self._pending(self.aborts)

        createdTxns = []

        def createTxn():
            txn = FakeTransaction()
            createdTxns.append(txn)
            return txn

        operationResults = []

        def operation(txn):
            self.assertIdentical(txn, createdTxns[-1])
            pending = Deferred()
            operationResults.append(pending)
            return pending

        d = inTransaction(createTxn, operation)
        fired = []
        d.addCallback(fired.append)
        self.assertEquals(fired, [])
        self.assertEquals(len(operationResults), 1)
        operationResults[0].callback(35)

        # The operation has finished, but its commit is still pending...
        self.assertEquals(fired, [])
        createdTxns[0].commits[0].callback(42)

        # ...and once the commit fires, the operation's result comes out.
        self.assertEquals(fired, [35])
+
+
+
class SimpleSchemaHelper(SchemaTestHelper):
    """
    L{SchemaTestHelper} with a fixed identifier, used here to parse the
    test schema strings.
    """

    def id(self):
        """
        @return: the constant identifier C{"worker"}.
        """
        return "worker"
+
+
+
# SQL() is only a marker: passthru hands its argument back unchanged, so
# each schema below is a plain string.
SQL = passthru

# Table listing the nodes of the cluster.
nodeSchema = SQL("""
 create table NODE_INFO (
 HOSTNAME varchar(255) not null,
 PID integer not null,
 PORT integer not null,
 TIME timestamp default current_timestamp not null,
 primary key (HOSTNAME, PORT)
 );
 """)

# The central job table; the dummy work tables refer to it via JOB_ID.
jobSchema = SQL("""
 create table JOB (
 JOB_ID integer primary key default 1,
 WORK_TYPE varchar(255) not null,
 PRIORITY integer default 0,
 WEIGHT integer default 0,
 NOT_BEFORE timestamp default null,
 NOT_AFTER timestamp default null
 );
 """)

# Tables backing the DummyWorkItem / DummyWorkDone test records.
schemaText = SQL("""
 create table DUMMY_WORK_ITEM (
 WORK_ID integer primary key,
 JOB_ID integer references JOB,
 A integer, B integer,
 DELETE_ON_LOAD integer default 0
 );
 create table DUMMY_WORK_DONE (
 WORK_ID integer primary key,
 JOB_ID integer references JOB,
 A_PLUS_B integer
 );
 """)
+
try:
    schema = SchemaSyntax(
        SimpleSchemaHelper().schemaFromString(jobSchema + schemaText)
    )
    dropSQL = [
        "drop table {name} cascade".format(name=table)
        for table in ("DUMMY_WORK_ITEM", "DUMMY_WORK_DONE")
    ]
    # After dropping the work tables, clear out any remaining JOB rows.
    dropSQL.append("delete from job")
except SkipTest as e:
    # The schema could not be parsed: fall back to plain object as the
    # record bases, and expose the reason through trial's module-level skip.
    DummyWorkItem = object
    DummyWorkDone = object
    skip = e
else:
    DummyWorkDone = fromTable(schema.DUMMY_WORK_DONE)
    DummyWorkItem = fromTable(schema.DUMMY_WORK_ITEM)
    skip = False
+
+
+
class DummyWorkDone(WorkItem, DummyWorkDone):
    """
    Result record written by L{DummyWorkItem.doWork}: stores the computed
    sum in C{aPlusB}.
    """
+
+
+
class DummyWorkItem(WorkItem, DummyWorkItem):
    """
    Sample L{WorkItem} subclass that adds two integers together and stores them
    in another table.
    """

    def doWork(self):
        """
        Record C{a + b} as a L{DummyWorkDone} job whose job and work IDs are
        this item's IDs offset by 100.
        """
        return DummyWorkDone.makeJob(
            self.transaction,
            jobID=self.jobID + 100,
            workID=self.workID + 100,
            aPlusB=self.a + self.b,
        )


    @classmethod
    @inlineCallbacks
    def loadForJob(cls, txn, *a):
        """
        Load L{DummyWorkItem} as normal... unless the loaded item has
        C{DELETE_ON_LOAD} set, in which case, do a deletion of this same row in
        a concurrent transaction, then commit it.

        @param txn: the transaction to load the work items in.
        @return: a L{Deferred} firing with the work items loaded via C{txn}.
        """
        workItems = yield super(DummyWorkItem, cls).loadForJob(txn, *a)
        # There may be no row for this job at all; don't index an empty list.
        if workItems and workItems[0].deleteOnLoad:
            # The row must be loaded and deleted through the *other*
            # transaction; committing that transaction is what makes the
            # deletion visible concurrently to C{txn}.
            otherTransaction = txn.concurrently()
            otherSelf = yield super(DummyWorkItem, cls).loadForJob(
                otherTransaction, *a
            )
            yield otherSelf[0].delete()
            yield otherTransaction.commit()
        returnValue(workItems)
+
+
+
class AMPTests(TestCase):
    """
    Tests for L{AMP} faithfully relaying ids across the wire.
    """

    def test_sendTableWithName(self):
        """
        An integer C{id} argument sent with L{AMP.callRemote} is delivered,
        unchanged, to the receiving side's responder.
        """
        client = AMP()

        class SampleCommand(Command):
            arguments = [("id", Integer())]

        class Receiver(AMP):
            # AMP passes arguments by name, so the responder's parameter must
            # be called "id" to match the command's argument.
            @SampleCommand.responder
            def gotIt(self, id):
                self.it = id
                return {}

        server = Receiver()
        clientT = StringTransport()
        serverT = StringTransport()
        client.makeConnection(clientT)
        server.makeConnection(serverT)
        client.callRemote(SampleCommand, id=123)
        # Hand-deliver the client's serialized request to the server.
        server.dataReceived(clientT.io.getvalue())
        self.assertEqual(server.it, 123)
+
+
+
class WorkItemTests(TestCase):
    """
    A L{WorkItem} is an item of work that can be executed.
    """

    def test_forTableName(self):
        """
        L{WorkItem.forTableName} returns the L{WorkItem} subclass mapped to
        the named table.
        """
        self.assertIdentical(
            WorkItem.forTableName(schema.DUMMY_WORK_ITEM.model.name),
            DummyWorkItem
        )


    @inlineCallbacks
    def test_enqueue(self):
        """
        L{PeerConnectionPool.enqueueWork} will insert a job and a work item.
        """
        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
        sinceEpoch = astimestamp(fakeNow)
        clock = Clock()
        clock.advance(sinceEpoch)
        qpool = PeerConnectionPool(clock, dbpool.connection, 0)

        # The work is scheduled a day after fakeNow and the clock is not
        # advanced any further, so only the inserted rows are checked.
        @transactionally(dbpool.connection)
        def check(txn):
            return qpool.enqueueWork(
                txn, DummyWorkItem, a=3, b=9,
                notBefore=datetime.datetime(2012, 12, 13, 12, 12, 0)
            )

        proposal = yield check
        yield proposal.whenProposed()

        # Make sure we have one JOB and one DUMMY_WORK_ITEM
        @transactionally(dbpool.connection)
        def checkJob(txn):
            return Select(
                From=schema.JOB
            ).on(txn)

        jobs = yield checkJob
        self.assertEqual(len(jobs), 1)
        # Column 1 of JOB is WORK_TYPE.
        self.assertEqual(jobs[0][1], "DUMMY_WORK_ITEM")

        @transactionally(dbpool.connection)
        def checkWork(txn):
            return Select(
                From=schema.DUMMY_WORK_ITEM
            ).on(txn)

        work = yield checkWork
        self.assertEqual(len(work), 1)
        # The work item's JOB_ID (column 1) must point at the JOB row's JOB_ID.
        self.assertEqual(work[0][1], jobs[0][0])
+
+
+
class WorkerConnectionPoolTests(TestCase):
    """
    Tests for L{WorkerConnectionPool}, which manages, within a node's
    controller (master) process, the worker (slave) processes able to
    execute queue work.  No tests are defined here yet.
    """
+
+
+
class WorkProposalTests(TestCase):
    """
    Tests for L{WorkProposal}.
    """

    @inlineCallbacks
    def test_whenProposedSuccess(self):
        """
        Once the SQL sent to the database has completed, the L{Deferred}
        returned by L{WorkProposal.whenProposed} fires with the proposal.
        """
        helper = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
        helper.setUp(test=self)
        queuer = LocalQueuer(helper.createTransaction)
        enqueueTxn = helper.createTransaction()
        proposal = yield queuer.enqueueWork(
            enqueueTxn, DummyWorkItem, a=3, b=4
        )
        proposed = yield proposal.whenProposed()
        self.assertEquals(proposed, proposal)


    def test_whenProposedFailure(self):
        """
        When the SQL executed to create the WorkItem row fails, the
        L{Deferred} returned by L{WorkProposal.whenProposed} errbacks.
        """
        helper = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
        helper.setUp(self)
        enqueueTxn = helper.createTransaction()
        queuer = LocalQueuer(helper.createTransaction)
        # NOTE(review): the Deferred from failUnlessFailure is neither
        # returned nor yielded, so trial never waits on it, and the bare
        # flushLoggedErrors() below may also hide a wrong failure type.
        self.failUnlessFailure(
            queuer.enqueueWork(enqueueTxn, DummyWorkItem, a=3, b=4, bogus=3),
            TypeError
        )
        enqueueTxn.abort()
        self.flushLoggedErrors()
+
+
+
class PeerConnectionPoolUnitTests(TestCase):
    """
    L{PeerConnectionPool} has many internal components.
    """
    def setUp(self):
        """
        Create a L{PeerConnectionPool} that is just initialized enough.
        """
        self.pcp = PeerConnectionPool(None, None, 4321)


    def checkPerformer(self, cls):
        """
        Verify that the performer returned by
        L{PeerConnectionPool.choosePerformer} is an instance of C{cls} and
        provides L{_IJobPerformer}.

        @param cls: the expected class of the chosen performer.
        """
        performer = self.pcp.choosePerformer()
        self.failUnlessIsInstance(performer, cls)
        verifyObject(_IJobPerformer, performer)


    def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
        """
        If L{PeerConnectionPool.choosePerformer} is invoked when no workers
        have spawned and no peers have established connections (either incoming
        or outgoing), then it chooses an implementation of C{performJob} that
        simply executes the work locally.
        """
        self.checkPerformer(LocalPerformer)


    def test_choosingPerformerWithLocalCapacity(self):
        """
        If L{PeerConnectionPool.choosePerformer} is invoked when some workers
        have spawned, then it should choose the worker pool as the local
        performer.
        """
        # Give it some local capacity.
        wlf = self.pcp.workerListenerFactory()
        proto = wlf.buildProtocol(None)
        proto.makeConnection(StringTransport())
        # Sanity check.
        self.assertEqual(len(self.pcp.workerPool.workers), 1)
        self.assertEqual(self.pcp.workerPool.hasAvailableCapacity(), True)
        # Now it has some capacity.
        self.checkPerformer(WorkerConnectionPool)


    def test_choosingPerformerFromNetwork(self):
        """
        If L{PeerConnectionPool.choosePerformer} is invoked when no workers
        have spawned but some peers have connected, then it should choose a
        connection from the network to perform it.
        """
        peer = PeerConnectionPool(None, None, 4322)
        local = self.pcp.peerFactory().buildProtocol(None)
        remote = peer.peerFactory().buildProtocol(None)
        connection = Connection(local, remote)
        connection.start()
        self.checkPerformer(ConnectionFromPeerNode)


    def test_performingWorkOnNetwork(self):
        """
        The L{performJob} command will get relayed to the remote peer
        controller.
        """
        peer = PeerConnectionPool(None, None, 4322)
        local = self.pcp.peerFactory().buildProtocol(None)
        remote = peer.peerFactory().buildProtocol(None)
        connection = Connection(local, remote)
        connection.start()
        d = Deferred()

        class DummyPerformer(object):
            def performJob(self, jobID):
                self.jobID = jobID
                return d

        # Doing real database I/O in this test would be tedious so fake the
        # first method in the call stack which actually talks to the DB.
        dummy = DummyPerformer()

        def chooseDummy(onlyLocally=False):
            return dummy

        peer.choosePerformer = chooseDummy
        performed = local.performJob(7384)
        performResult = []
        performed.addCallback(performResult.append)

        # Sanity check.
        self.assertEquals(performResult, [])
        connection.flush()
        self.assertEquals(dummy.jobID, 7384)
        self.assertEquals(performResult, [])
        d.callback(128374)
        connection.flush()
        self.assertEquals(performResult, [None])


    def test_choosePerformerSorted(self):
        """
        If L{PeerConnectionPool.choosePerformer} is invoked make it
        return the peer with the least load.
        """
        peer = PeerConnectionPool(None, None, 4322)

        class DummyPeer(object):
            def __init__(self, name, load):
                self.name = name
                self.load = load

            def currentLoadEstimate(self):
                return self.load

        apeer = DummyPeer("A", 1)
        bpeer = DummyPeer("B", 0)
        cpeer = DummyPeer("C", 2)
        peer.addPeerConnection(apeer)
        peer.addPeerConnection(bpeer)
        peer.addPeerConnection(cpeer)

        performer = peer.choosePerformer(onlyLocally=False)
        self.assertEqual(performer, bpeer)

        bpeer.load = 2
        performer = peer.choosePerformer(onlyLocally=False)
        self.assertEqual(performer, apeer)


    @inlineCallbacks
    def test_notBeforeWhenCheckingForLostWork(self):
        """
        L{PeerConnectionPool._periodicLostWorkCheck} should execute any
        outstanding work items, but only those that are expired.
        """
        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
        # An arbitrary point in time.
        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
        # *why* does datetime still not have .astimestamp()
        sinceEpoch = astimestamp(fakeNow)
        clock = Clock()
        clock.advance(sinceEpoch)
        qpool = PeerConnectionPool(clock, dbpool.connection, 0)

        # Let's create a couple of work items directly, not via the enqueue
        # method, so that they exist but nobody will try to immediately execute
        # them.

        @transactionally(dbpool.connection)
        @inlineCallbacks
        def setup(txn):
            # First, one that's right now.
            yield DummyWorkItem.makeJob(txn, a=1, b=2, notBefore=fakeNow)

            # Next, create one that's actually far enough into the past to run.
            yield DummyWorkItem.makeJob(
                txn, a=3, b=4, notBefore=(
                    # Schedule it in the past so that it should have already
                    # run.
                    fakeNow - datetime.timedelta(
                        seconds=qpool.queueProcessTimeout + 20
                    )
                )
            )

            # Finally, one that's actually scheduled for the future.
            yield DummyWorkItem.makeJob(
                txn, a=10, b=20, notBefore=fakeNow + datetime.timedelta(1000)
            )
        yield setup
        yield qpool._periodicLostWorkCheck()

        @transactionally(dbpool.connection)
        def check(txn):
            return DummyWorkDone.all(txn)

        every = yield check
        self.assertEquals([x.aPlusB for x in every], [7])


    @inlineCallbacks
    def test_notBeforeWhenEnqueueing(self):
        """
        L{PeerConnectionPool.enqueueWork} enqueues some work immediately, but
        only executes it when enough time has elapsed to allow the C{notBefore}
        attribute of the given work item to have passed.
        """
        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
        sinceEpoch = astimestamp(fakeNow)
        clock = Clock()
        clock.advance(sinceEpoch)
        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
        realChoosePerformer = qpool.choosePerformer
        performerChosen = []

        def catchPerformerChoice(*args, **kwargs):
            # Pass any arguments (e.g. onlyLocally) straight through, so the
            # spy accepts every call the real choosePerformer does.
            result = realChoosePerformer(*args, **kwargs)
            performerChosen.append(True)
            return result

        qpool.choosePerformer = catchPerformerChoice

        @transactionally(dbpool.connection)
        def check(txn):
            return qpool.enqueueWork(
                txn, DummyWorkItem, a=3, b=9,
                notBefore=datetime.datetime(2012, 12, 12, 12, 12, 20)
            )

        proposal = yield check
        yield proposal.whenProposed()

        # This is going to schedule the work to happen with some asynchronous
        # I/O in the middle; this is a problem because how do we know when it's
        # time to check to see if the work has started?  We need to intercept
        # the thing that kicks off the work; we can then wait for the work
        # itself.

        self.assertEquals(performerChosen, [])

        # Advance to exactly the appointed second.
        clock.advance(20 - 12)
        self.assertEquals(performerChosen, [True])

        # FIXME: if this fails, it will hang, but that's better than no
        # notification that it is broken at all.

        result = yield proposal.whenExecuted()
        self.assertIdentical(result, proposal)


    @inlineCallbacks
    def test_notBeforeBefore(self):
        """
        L{PeerConnectionPool.enqueueWork} will execute its work immediately if
        the C{notBefore} attribute of the work item in question is in the past.
        """
        dbpool = buildConnectionPool(self, nodeSchema + jobSchema + schemaText)
        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
        sinceEpoch = astimestamp(fakeNow)
        clock = Clock()
        clock.advance(sinceEpoch)
        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
        realChoosePerformer = qpool.choosePerformer
        performerChosen = []

        def catchPerformerChoice(*args, **kwargs):
            # Pass any arguments (e.g. onlyLocally) straight through, so the
            # spy accepts every call the real choosePerformer does.
            result = realChoosePerformer(*args, **kwargs)
            performerChosen.append(True)
            return result

        qpool.choosePerformer = catchPerformerChoice

        @transactionally(dbpool.connection)
        def check(txn):
            return qpool.enqueueWork(
                txn, DummyWorkItem, a=3, b=9,
                notBefore=datetime.datetime(2012, 12, 12, 12, 12, 0)
            )

        proposal = yield check
        yield proposal.whenProposed()

        # Advance far beyond the given timestamp.
        clock.advance(1000)
        self.assertEquals(performerChosen, [True])

        result = yield proposal.whenExecuted()
        self.assertIdentical(result, proposal)


    def test_workerConnectionPoolPerformJob(self):
        """
        L{WorkerConnectionPool.performJob} performs work by selecting a
        L{ConnectionFromWorker} and sending it a L{PerformJOB} command.
        """
        clock = Clock()
        peerPool = PeerConnectionPool(clock, None, 4322)
        factory = peerPool.workerListenerFactory()

        def peer():
            p = factory.buildProtocol(None)
            t = StringTransport()
            p.makeConnection(t)
            return p, t

        worker1, _ignore_trans1 = peer()
        worker2, _ignore_trans2 = peer()

        # Ask the worker to do something.
        worker1.performJob(1)
        self.assertEquals(worker1.currentLoad, 1)
        self.assertEquals(worker2.currentLoad, 0)

        # Now ask the pool to do something
        peerPool.workerPool.performJob(2)
        self.assertEquals(worker1.currentLoad, 1)
        self.assertEquals(worker2.currentLoad, 1)


    def test_poolStartServiceChecksForWork(self):
        """
        L{PeerConnectionPool.startService} kicks off the idle work-check loop.
        """
        reactor = MemoryReactorWithClock()
        cph = SteppablePoolHelper(nodeSchema + jobSchema + schemaText)
        then = datetime.datetime(2012, 12, 12, 12, 12, 0)
        reactor.advance(astimestamp(then))
        cph.setUp(self)
        pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321)
        now = then + datetime.timedelta(seconds=pcp.queueProcessTimeout * 2)

        # The decorator starts this transaction immediately.
        @transactionally(cph.pool.connection)
        def createOldWork(txn):
            one = DummyWorkItem.makeJob(txn, jobID=100, workID=1, a=3, b=4, notBefore=then)
            two = DummyWorkItem.makeJob(txn, jobID=101, workID=2, a=7, b=9, notBefore=now)
            return gatherResults([one, two])

        pcp.startService()
        cph.flushHolders()
        reactor.advance(pcp.queueProcessTimeout * 2)
        self.assertEquals(
            cph.rows("select * from DUMMY_WORK_DONE"),
            [(101, 200, 7)]
        )
        cph.rows("delete from DUMMY_WORK_DONE")
        reactor.advance(pcp.queueProcessTimeout * 2)
        self.assertEquals(
            cph.rows("select * from DUMMY_WORK_DONE"),
            [(102, 201, 16)]
        )
+
+
+
class HalfConnection(object):
    """
    One side of an in-memory connection: a protocol paired with the
    L{StringTransport} that captures everything it writes.
    """

    def __init__(self, protocol):
        self.protocol = protocol
        self.transport = StringTransport()


    def start(self):
        """
        Connect C{self.protocol} to C{self.transport}.
        """
        self.protocol.makeConnection(self.transport)


    def extract(self):
        """
        Drain the protocol's output buffer.

        @return: everything written to the transport since the last call.
        """
        buf = self.transport.io
        pending = buf.getvalue()
        # Rewind and truncate so the next call only sees new output.
        buf.seek(0)
        buf.truncate()
        return pending


    def deliver(self, data):
        """
        Hand C{data} to this side's protocol via C{dataReceived}, if there is
        any.

        @return: C{True} if C{data} was non-empty and was delivered,
            otherwise C{False}.
        @rtype: L{bool}
        """
        if not data:
            return False
        self.protocol.dataReceived(data)
        return True
+
+
+
class Connection(object):
    """
    Two protocols wired together in memory, with data relayed between them
    only when the test asks for it.
    """

    def __init__(self, local, remote):
        """
        Connect two protocol instances to each other via string transports.
        """
        self.receiver = HalfConnection(local)
        self.sender = HalfConnection(remote)


    def start(self):
        """
        Start up the connection: the remote side first, then the local side.
        """
        for half in (self.sender, self.receiver):
            half.start()


    def pump(self):
        """
        Move pending data from the sender to the receiver, then swap roles so
        the next call relays the other direction.

        @return: whether any data was delivered.
        """
        outgoing = self.sender.extract()
        delivered = self.receiver.deliver(outgoing)
        self.sender, self.receiver = self.receiver, self.sender
        return delivered


    def flush(self, turns=10):
        """
        Relay data back and forth until a round trip moves nothing, or until
        C{turns} rounds have elapsed.
        """
        for _ignore_turn in range(turns):
            # Only try the second direction if the first moved nothing.
            movedData = self.pump() or self.pump()
            if not movedData:
                break
+
+
+
class PeerConnectionPoolIntegrationTests(TestCase):
    """
    L{PeerConnectionPool} is the service responsible for coordinating
    eventually-consistent task queuing within a cluster.
    """

    @inlineCallbacks
    def setUp(self):
        """
        L{PeerConnectionPool} requires access to a database and the reactor.
        """
        self.store = yield buildStore(self, None)

        def doit(txn):
            return txn.execSQL(schemaText)

        yield inTransaction(
            lambda: self.store.newTransaction("bonus schema"), doit
        )

        def indirectedTransactionFactory(*a):
            """
            Allow tests to replace "self.store.newTransaction" to provide
            fixtures with extra methods on a test-by-test basis.
            """
            return self.store.newTransaction(*a)

        def deschema():
            # Run every statement in dropSQL to remove this test's tables/rows.
            @inlineCallbacks
            def deletestuff(txn):
                for stmt in dropSQL:
                    yield txn.execSQL(stmt)
            return inTransaction(
                lambda *a: self.store.newTransaction(*a), deletestuff
            )
        self.addCleanup(deschema)

        from twisted.internet import reactor
        self.node1 = PeerConnectionPool(
            reactor, indirectedTransactionFactory, 0)
        self.node2 = PeerConnectionPool(
            reactor, indirectedTransactionFactory, 0)

        # Child service that fires a Deferred when it is started, so setUp
        # can wait until both nodes have started their children.
        class FireMeService(Service, object):
            def __init__(self, d):
                super(FireMeService, self).__init__()
                self.d = d

            def startService(self):
                self.d.callback(None)

        d1 = Deferred()
        d2 = Deferred()
        FireMeService(d1).setServiceParent(self.node1)
        FireMeService(d2).setServiceParent(self.node2)
        ms = MultiService()
        self.node1.setServiceParent(ms)
        self.node2.setServiceParent(ms)
        ms.startService()
        self.addCleanup(ms.stopService)
        yield gatherResults([d1, d2])
        self.store.queuer = self.node1


    def test_currentNodeInfo(self):
        """
        There will be two C{NODE_INFO} rows in the database, retrievable as two
        L{NodeInfo} objects, once both nodes have started up.
        """
        @inlineCallbacks
        def check(txn):
            self.assertEquals(len((yield self.node1.activeNodes(txn))), 2)
            self.assertEquals(len((yield self.node2.activeNodes(txn))), 2)
        return inTransaction(self.store.newTransaction, check)


    @inlineCallbacks
    def test_enqueueHappyPath(self):
        """
        When a L{WorkItem} is scheduled for execution via
        L{PeerConnectionPool.enqueueWork} its C{doWork} method will be invoked
        by the time the L{Deferred} returned from the resulting
        L{WorkProposal}'s C{whenExecuted} method has fired.
        """
        # TODO: this exact test should run against LocalQueuer as well.
        def operation(txn):
            # TODO: how does "enqueue" get associated with the transaction?
            # This is not the case with a raw t.w.enterprise transaction.
            # Should probably do something with components.
            return txn.enqueue(DummyWorkItem, a=3, b=4, jobID=100, workID=1,
                               notBefore=datetime.datetime.utcnow())
        result = yield inTransaction(self.store.newTransaction, operation)
        # Wait for it to be executed.  Hopefully this does not time out :-\.
        yield result.whenExecuted()

        def op2(txn):
            return Select(
                [
                    schema.DUMMY_WORK_DONE.WORK_ID,
                    schema.DUMMY_WORK_DONE.JOB_ID,
                    schema.DUMMY_WORK_DONE.A_PLUS_B,
                ],
                From=schema.DUMMY_WORK_DONE
            ).on(txn)

        rows = yield inTransaction(self.store.newTransaction, op2)
        # doWork offsets both the work ID and the job ID by 100.
        self.assertEquals(rows, [[101, 200, 7]])


    @inlineCallbacks
    def test_noWorkDoneWhenConcurrentlyDeleted(self):
        """
        When a L{WorkItem} is concurrently deleted by another transaction, it
        should I{not} perform its work.
        """
        # Give every transaction created via newTransaction a "concurrently"
        # method; DummyWorkItem.loadForJob calls txn.concurrently() to delete
        # the row from a second transaction.
        original = self.store.newTransaction

        def decorate(*a, **k):
            result = original(*a, **k)
            result.concurrently = self.store.newTransaction
            return result

        self.store.newTransaction = decorate

        def operation(txn):
            return txn.enqueue(
                DummyWorkItem, a=30, b=40, workID=5678,
                deleteOnLoad=1,
                notBefore=datetime.datetime.utcnow()
            )

        proposal = yield inTransaction(self.store.newTransaction, operation)
        yield proposal.whenExecuted()

        # Sanity check on the concurrent deletion.
        def op2(txn):
            return Select(
                [schema.DUMMY_WORK_ITEM.WORK_ID],
                From=schema.DUMMY_WORK_ITEM
            ).on(txn)

        rows = yield inTransaction(self.store.newTransaction, op2)
        self.assertEquals(rows, [])

        def op3(txn):
            return Select(
                [
                    schema.DUMMY_WORK_DONE.WORK_ID,
                    schema.DUMMY_WORK_DONE.A_PLUS_B,
                ],
                From=schema.DUMMY_WORK_DONE
            ).on(txn)

        rows = yield inTransaction(self.store.newTransaction, op3)
        self.assertEquals(rows, [])
+
+
+
class DummyProposal(object):
    """
    Inert stand-in for L{WorkProposal}: accepts any positional constructor
    arguments and does nothing when started.
    """

    def __init__(self, *ignored):
        """
        Discard all constructor arguments.
        """


    def _start(self):
        """
        Do nothing.
        """
+
+
+
class BaseQueuerTests(TestCase):
    """
    Tests for L{_BaseQueuer}.
    """

    def setUp(self):
        self.proposal = None
        # Replace jobqueue's WorkProposal with the inert DummyProposal for the
        # duration of each test (presumably what enqueueWork instantiates).
        self.patch(twext.enterprise.jobqueue, "WorkProposal", DummyProposal)


    def _proposalCallback(self, proposal):
        """
        Remember the most recent proposal this callback was given.
        """
        self.proposal = proposal


    @inlineCallbacks
    def test_proposalCallbacks(self):
        """
        A callback registered with L{_BaseQueuer.callWithNewProposals} has
        been called with a proposal once L{_BaseQueuer.enqueueWork} is done.
        """
        queuer = _BaseQueuer()
        queuer.callWithNewProposals(self._proposalCallback)
        self.assertEqual(self.proposal, None)
        yield queuer.enqueueWork(None, None)
        self.assertNotEqual(self.proposal, None)
+
+
+
class NonPerformingQueuerTests(TestCase):
    """
    Tests for L{NonPerformingQueuer}.
    """

    @inlineCallbacks
    def test_choosePerformer(self):
        """
        The performer chosen by a L{NonPerformingQueuer} completes
        C{performJob} with a C{None} result.
        """
        performer = NonPerformingQueuer().choosePerformer()
        outcome = yield performer.performJob(None)
        self.assertEquals(outcome, None)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140301/baad6f2a/attachment-0001.html>
More information about the calendarserver-changes
mailing list