[CalendarServer-changes] [10221] CalendarServer/branches/users/glyph/queue-locking-and-timing

source_changes at macosforge.org source_changes at macosforge.org
Fri Jan 4 16:38:32 PST 2013


Revision: 10221
          http://trac.calendarserver.org/changeset/10221
Author:   glyph at apple.com
Date:     2013-01-04 16:38:31 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
The full integration test now passes, and there is an additional unit test for one of the steps; choosePerformer is now synchronous, since it apparently never needs to be asynchronous.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:38:30 UTC (rev 10220)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:38:31 UTC (rev 10221)
@@ -177,7 +177,9 @@
 
         @param proto: an L{SchemaAMP}
         """
-        return TableSyntax(proto.schema.tableNamed(inString.decode("UTF-8")))
+        return TableSyntax(
+            proto.schema.model.tableNamed(inString.decode("UTF-8"))
+        )
 
 
     def toString(self, inObject):
@@ -605,7 +607,9 @@
             complete.
         @rtype: L{Deferred} firing L{dict}
         """
-        return self._selectLowestLoadWorker().performWork(table, workID)
+        preferredWorker = self._selectLowestLoadWorker()
+        result = preferredWorker.performWork(table, workID)
+        return result
 
 
 
@@ -699,7 +703,7 @@
         C{self}, since C{self} is also an object that has a C{performWork}
         method.
         """
-        return succeed(self)
+        return self
 
 
     def performWork(self, table, workID):
@@ -738,21 +742,63 @@
         process has instructed this worker to do it; so, look up the data in
         the row, and do it.
         """
-        @inlineCallbacks
-        def work(txn):
-            workItemClass = WorkItem.forTable(table)
-            workItem = yield workItemClass.load(txn, workID)
-            # TODO: what if we fail?  error-handling should be recorded
-            # someplace, the row should probably be marked, re-tries should be
-            # triggerable administratively.
-            yield workItem.delete()
-            # TODO: verify that workID is the primary key someplace.
-            yield workItem.doWork()
-            returnValue({})
-        return inTransaction(self.transactionFactory, work)
+        return (ultimatelyPerform(self.transactionFactory, table, workID)
+                .addCallback(lambda ignored: {}))
 
 
 
+def ultimatelyPerform(txnFactory, table, workID):
+    """
+    Eventually, after routing the work to the appropriate place, somebody
+    actually has to I{do} it.
+
+    @param txnFactory: a 0- or 1-argument callable that creates an
+        L{IAsyncTransaction}
+    @type txnFactory: L{callable}
+
+    @param table: the table object that corresponds to the necessary work item
+    @type table: L{twext.enterprise.dal.syntax.TableSyntax}
+
+    @param workID: the ID of the work to be performed
+    @type workID: L{int}
+
+    @return: a L{Deferred} which fires with C{None} when the work has been
+        performed, or fails if the work can't be performed.
+    """
+    @inlineCallbacks
+    def work(txn):
+        workItemClass = WorkItem.forTable(table)
+        workItem = yield workItemClass.load(txn, workID)
+        # TODO: what if we fail?  error-handling should be recorded someplace,
+        # the row should probably be marked, re-tries should be triggerable
+        # administratively.
+        yield workItem.delete()
+        # TODO: verify that workID is the primary key someplace.
+        yield workItem.doWork()
+    return inTransaction(txnFactory, work)
+
+
+
+class ImmediatePerformer(object):
+    """
+    Implementor of C{performWork} that does its work immediately, regardless.
+    """
+
+    def __init__(self, txnFactory):
+        """
+        Create this L{ImmediatePerformer} with a transaction factory.
+        """
+        self.txnFactory = txnFactory
+
+
+    def performWork(self, table, workID):
+        """
+        Perform the given work right now.
+        """
+        return ultimatelyPerform(self.txnFactory, table, workID)
+
+
+
 class WorkerFactory(Factory, object):
     """
     Factory, to be used as the client to connect from the worker to the
@@ -840,17 +886,14 @@
             @self.txn.postCommit
             def whenDone():
                 self._whenCommitted.callback(None)
-                @passthru(self.pool.choosePerformer().addCallback)
-                def performerChosen(performer):
-                    @passthru(performer.performWork(item.table, item.workID))
-                    def performed(result):
-                        self._whenExecuted.callback(None)
-                    @performed.addErrback
-                    def notPerformed(why):
-                        self._whenExecuted.errback(why)
-                @performerChosen.addErrback
-                def notChosen(whyNot):
-                    self._whenExecuted.errback(whyNot)
+                performer = self.pool.choosePerformer()
+                @passthru(performer.performWork(item.table, item.workID)
+                          .addCallback)
+                def performed(result):
+                    self._whenExecuted.callback(None)
+                @performed.addErrback
+                def notPerformed(why):
+                    self._whenExecuted.errback(why)
             @self.txn.postAbort
             def whenFailed():
                 self._whenCommitted.errback(TransactionFailed)
@@ -1027,10 +1070,12 @@
         @rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
             L{ConnectionFromPeerNode} or L{WorkerConnectionPool}
         """
-        if not self.workerPool.hasAvailableCapacity() and self.peers:
+        if self.workerPool.hasAvailableCapacity():
+            return succeed(self.workerPool)
+        if self.peers:
             return sorted(self.peers, lambda p: p.currentLoadEstimate())[0]
         else:
-            return succeed(self.workerPool)
+            return ImmediatePerformer(self.transactionFactory)
 
 
     def enqueueWork(self, txn, workItemType, **kw):
@@ -1121,7 +1166,7 @@
                                 self.queueProcessTimeout
                             ))
                     )):
-                    peer = yield self.choosePerformer()
+                    peer = self.choosePerformer()
                     yield peer.performWork(overdueItem.table,
                                            overdueItem.workID)
         return inTransaction(self.transactionFactory, workCheck)

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py	2013-01-05 00:38:30 UTC (rev 10220)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py	2013-01-05 00:38:31 UTC (rev 10221)
@@ -35,6 +35,7 @@
 from twisted.application.service import Service, MultiService
 
 from twext.enterprise.dal.syntax import Insert
+from twext.enterprise.queue import ImmediatePerformer
 
 from twext.enterprise.dal.syntax import Select
 class UtilityTests(TestCase):
@@ -89,7 +90,8 @@
 SQL = passthru
 
 schemaText = SQL("""
-    create table DUMMY_WORK_ITEM (WORK_ID integer, NOT_BEFORE timestamp,
+    create table DUMMY_WORK_ITEM (WORK_ID integer primary key,
+                                  NOT_BEFORE timestamp,
                                   A integer, B integer);
     create table DUMMY_WORK_DONE (WORK_ID integer, A_PLUS_B integer);
 """)
@@ -118,6 +120,31 @@
 
 
 
+class WorkerConnectionPoolTests(TestCase):
+    """
+    A L{WorkerConnectionPool} is responsible for managing, in a node's
+    controller (master) process, the collection of worker (slave) processes
+    that are capable of executing queue work.
+    """
+
+
+class PeerConnectionPoolUnitTests(TestCase):
+    """
+    L{PeerConnectionPool} has many internal components.
+    """
+
+    def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
+        """
+        If L{PeerConnectionPool.choosePerformer} is invoked when no workers
+        have spawned and no peers have established connections (either incoming
+        or outgoing), then it chooses an implementation of C{performWork} that
+        simply executes the work locally.
+        """
+        pcp = PeerConnectionPool(None, None, 4321, schema)
+        self.assertIsInstance(pcp.choosePerformer(), ImmediatePerformer)
+
+
+
 class PeerConnectionPoolIntegrationTests(TestCase):
     """
     L{PeerConnectionPool} is the service responsible for coordinating
@@ -202,6 +229,6 @@
                            schema.DUMMY_WORK_DONE.A_PLUS_B],
                            From=schema.DUMMY_WORK_DONE).on(txn)
         rows = yield inTransaction(self.store.newTransaction, op2)
-        self.assertEquals(rows, [(3421, 7)])
+        self.assertEquals(rows, [[4321, 7]])
 
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/98b9df25/attachment.html>


More information about the calendarserver-changes mailing list