[CalendarServer-changes] [10221] CalendarServer/branches/users/glyph/queue-locking-and-timing
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jan 4 16:38:32 PST 2013
Revision: 10221
http://trac.calendarserver.org//changeset/10221
Author: glyph at apple.com
Date: 2013-01-04 16:38:31 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Passing full integration test, and additional unit test for one of the steps; choosePerformer is now synchronous since it apparently never needs asynchrony.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/queue-locking-and-timing/
Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py 2013-01-05 00:38:30 UTC (rev 10220)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py 2013-01-05 00:38:31 UTC (rev 10221)
@@ -177,7 +177,9 @@
@param proto: an L{SchemaAMP}
"""
- return TableSyntax(proto.schema.tableNamed(inString.decode("UTF-8")))
+ return TableSyntax(
+ proto.schema.model.tableNamed(inString.decode("UTF-8"))
+ )
def toString(self, inObject):
@@ -605,7 +607,9 @@
complete.
@rtype: L{Deferred} firing L{dict}
"""
- return self._selectLowestLoadWorker().performWork(table, workID)
+ preferredWorker = self._selectLowestLoadWorker()
+ result = preferredWorker.performWork(table, workID)
+ return result
@@ -699,7 +703,7 @@
C{self}, since C{self} is also an object that has a C{performWork}
method.
"""
- return succeed(self)
+ return self
def performWork(self, table, workID):
@@ -738,21 +742,63 @@
process has instructed this worker to do it; so, look up the data in
the row, and do it.
"""
- @inlineCallbacks
- def work(txn):
- workItemClass = WorkItem.forTable(table)
- workItem = yield workItemClass.load(txn, workID)
- # TODO: what if we fail? error-handling should be recorded
- # someplace, the row should probably be marked, re-tries should be
- # triggerable administratively.
- yield workItem.delete()
- # TODO: verify that workID is the primary key someplace.
- yield workItem.doWork()
- returnValue({})
- return inTransaction(self.transactionFactory, work)
+ return (ultimatelyPerform(self.transactionFactory, table, workID)
+ .addCallback(lambda ignored: {}))
+def ultimatelyPerform(txnFactory, table, workID):
+ """
+ Eventually, after routing the work to the appropriate place, somebody
+ actually has to I{do} it.
+
+ @param txnFactory: a 0- or 1-argument callable that creates an
+ L{IAsyncTransaction}
+ @type txnFactory: L{callable}
+
+ @param table: the table object that corresponds to the necessary work item
+ @type table: L{twext.enterprise.dal.syntax.TableSyntax}
+
+ @param workID: the ID of the work to be performed
+ @type workID: L{int}
+
+ @return: a L{Deferred} which fires with C{None} when the work has been
+ performed, or fails if the work can't be performed.
+ """
+ @inlineCallbacks
+ def work(txn):
+ workItemClass = WorkItem.forTable(table)
+ workItem = yield workItemClass.load(txn, workID)
+ # TODO: what if we fail? error-handling should be recorded someplace,
+ # the row should probably be marked, re-tries should be triggerable
+ # administratively.
+ yield workItem.delete()
+ # TODO: verify that workID is the primary key someplace.
+ yield workItem.doWork()
+ return inTransaction(txnFactory, work)
+
+
+
+class ImmediatePerformer(object):
+ """
+ Implementor of C{performWork} that does its work immediately, regardless.
+ """
+
+ def __init__(self, txnFactory):
+ """
+ Create this L{ImmediatePerformer} with a transaction factory.
+ """
+ self.txnFactory = txnFactory
+
+
+ def performWork(self, table, workID):
+ """
+ Perform the given work right now.
+ """
+ return ultimatelyPerform(self.txnFactory, table, workID)
+
+
+
class WorkerFactory(Factory, object):
"""
Factory, to be used as the client to connect from the worker to the
@@ -840,17 +886,14 @@
@self.txn.postCommit
def whenDone():
self._whenCommitted.callback(None)
- @passthru(self.pool.choosePerformer().addCallback)
- def performerChosen(performer):
- @passthru(performer.performWork(item.table, item.workID))
- def performed(result):
- self._whenExecuted.callback(None)
- @performed.addErrback
- def notPerformed(why):
- self._whenExecuted.errback(why)
- @performerChosen.addErrback
- def notChosen(whyNot):
- self._whenExecuted.errback(whyNot)
+ performer = self.pool.choosePerformer()
+ @passthru(performer.performWork(item.table, item.workID)
+ .addCallback)
+ def performed(result):
+ self._whenExecuted.callback(None)
+ @performed.addErrback
+ def notPerformed(why):
+ self._whenExecuted.errback(why)
@self.txn.postAbort
def whenFailed():
self._whenCommitted.errback(TransactionFailed)
@@ -1027,10 +1070,12 @@
@rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
L{ConnectionFromPeerNode} or L{WorkerConnectionPool}
"""
- if not self.workerPool.hasAvailableCapacity() and self.peers:
+ if self.workerPool.hasAvailableCapacity():
+ return succeed(self.workerPool)
+ if self.peers:
return sorted(self.peers, lambda p: p.currentLoadEstimate())[0]
else:
- return succeed(self.workerPool)
+ return ImmediatePerformer(self.transactionFactory)
def enqueueWork(self, txn, workItemType, **kw):
@@ -1121,7 +1166,7 @@
self.queueProcessTimeout
))
)):
- peer = yield self.choosePerformer()
+ peer = self.choosePerformer()
yield peer.performWork(overdueItem.table,
overdueItem.workID)
return inTransaction(self.transactionFactory, workCheck)
Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py 2013-01-05 00:38:30 UTC (rev 10220)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py 2013-01-05 00:38:31 UTC (rev 10221)
@@ -35,6 +35,7 @@
from twisted.application.service import Service, MultiService
from twext.enterprise.dal.syntax import Insert
+from twext.enterprise.queue import ImmediatePerformer
from twext.enterprise.dal.syntax import Select
class UtilityTests(TestCase):
@@ -89,7 +90,8 @@
SQL = passthru
schemaText = SQL("""
- create table DUMMY_WORK_ITEM (WORK_ID integer, NOT_BEFORE timestamp,
+ create table DUMMY_WORK_ITEM (WORK_ID integer primary key,
+ NOT_BEFORE timestamp,
A integer, B integer);
create table DUMMY_WORK_DONE (WORK_ID integer, A_PLUS_B integer);
""")
@@ -118,6 +120,31 @@
+class WorkerConnectionPoolTests(TestCase):
+ """
+ A L{WorkerConnectionPool} is responsible for managing, in a node's
+ controller (master) process, the collection of worker (slave) processes
+ that are capable of executing queue work.
+ """
+
+
+class PeerConnectionPoolUnitTests(TestCase):
+ """
+ L{PeerConnectionPool} has many internal components.
+ """
+
+ def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
+ """
+ If L{PeerConnectionPool.choosePerformer} is invoked when no workers
+ have spawned and no peers have established connections (either incoming
+ or outgoing), then it chooses an implementation of C{performWork} that
+ simply executes the work locally.
+ """
+ pcp = PeerConnectionPool(None, None, 4321, schema)
+ self.assertIsInstance(pcp.choosePerformer(), ImmediatePerformer)
+
+
+
class PeerConnectionPoolIntegrationTests(TestCase):
"""
L{PeerConnectionPool} is the service responsible for coordinating
@@ -202,6 +229,6 @@
schema.DUMMY_WORK_DONE.A_PLUS_B],
From=schema.DUMMY_WORK_DONE).on(txn)
rows = yield inTransaction(self.store.newTransaction, op2)
- self.assertEquals(rows, [(3421, 7)])
+ self.assertEquals(rows, [[4321, 7]])
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/98b9df25/attachment.html>
More information about the calendarserver-changes
mailing list