[CalendarServer-changes] [10222] CalendarServer/branches/users/glyph/queue-locking-and-timing

source_changes at macosforge.org source_changes at macosforge.org
Fri Jan 4 16:38:33 PST 2013


Revision: 10222
          http://trac.calendarserver.org//changeset/10222
Author:   glyph at apple.com
Date:     2013-01-04 16:38:33 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Add (failing) tests for peer selection.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:38:31 UTC (rev 10221)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:38:33 UTC (rev 10222)
@@ -104,7 +104,32 @@
 from twisted.internet.endpoints import TCP4ServerEndpoint
 from twext.enterprise.dal.syntax import Lock
 from twext.enterprise.ienterprise import IQueuer
+from zope.interface.interface import Interface
 
+
+class _IWorkPerformer(Interface):
+    """
+    An object that can perform work.
+
+    Internal interface; implemented by several classes here since work has to
+    (in the worst case) pass from worker->controller->controller->worker.
+    """
+
+    def performWork(table, workID):
+        """
+        @param table: The table where work is waiting.
+        @type table: L{TableSyntax}
+
+        @param workID: The primary key identifier of the given work.
+        @type workID: L{int}
+
+        @return: a L{Deferred} firing with an empty dictionary when the work is
+            complete.
+        @rtype: L{Deferred} firing L{dict}
+        """
+
+
+
 def makeNodeSchema(inSchema):
     """
     Create a self-contained schema for L{NodeInfo} to use.
@@ -490,15 +515,7 @@
         specific peer node-controller process to perform some work, having
         already determined that it's appropriate.
 
-        @param table: The table where work is waiting.
-        @type table: L{TableSyntax}
-
-        @param workID: The primary key identifier of the given work.
-        @type workID: L{int}
-
-        @return: a L{Deferred} firing with an empty dictionary when the work is
-            complete.
-        @rtype: L{Deferred} firing L{dict}
+        @see: L{_IWorkPerformer.performWork}
         """
         d = self.callRemote(PerformWork, table=table, workID=workID)
         self._bonusLoad += 1
@@ -569,7 +586,7 @@
         hasAvailableCapacity to process another queue item?
         """
         for worker in self.workers:
-            if worker.currentLoad() < self.maximumLoadPerWorker:
+            if worker.currentLoad < self.maximumLoadPerWorker:
                 return True
         return False
 
@@ -1062,16 +1079,12 @@
         should be lower-latency.  Also, if no peers are available, work will be
         submitted locally even if the worker pool is already over-subscribed.
 
-        @return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
-            with the chosen 'peer', i.e. object with a C{performWork} method,
-            as soon as one is available.  Normally this will be synchronous,
-            but we need to account for the possibility that we may need to
-            connect to other hosts.
-        @rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
-            L{ConnectionFromPeerNode} or L{WorkerConnectionPool}
+        @return: the chosen peer.
+        @rtype: L{_IWorkPerformer} (L{ConnectionFromPeerNode} or
+            L{WorkerConnectionPool})
         """
         if self.workerPool.hasAvailableCapacity():
-            return succeed(self.workerPool)
+            return self.workerPool
         if self.peers:
             return sorted(self.peers, lambda p: p.currentLoadEstimate())[0]
         else:

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py	2013-01-05 00:38:31 UTC (rev 10221)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py	2013-01-05 00:38:33 UTC (rev 10222)
@@ -35,7 +35,10 @@
 from twisted.application.service import Service, MultiService
 
 from twext.enterprise.dal.syntax import Insert
-from twext.enterprise.queue import ImmediatePerformer
+from twext.enterprise.queue import ImmediatePerformer, _IWorkPerformer
+from twext.enterprise.queue import WorkerConnectionPool
+from zope.interface.verify import verifyObject
+from twisted.test.proto_helpers import StringTransport
 
 from twext.enterprise.dal.syntax import Select
 class UtilityTests(TestCase):
@@ -132,7 +135,23 @@
     """
     L{PeerConnectionPool} has many internal components.
     """
+    def setUp(self):
+        """
+        Create a L{PeerConnectionPool} that is just initialized enough.
+        """
+        self.pcp = PeerConnectionPool(None, None, 4321, schema)
 
+
+    def checkPerformer(self, cls):
+        """
+        Verify that L{PeerConnectionPool.choosePerformer} returns an instance
+        of C{cls} that provides L{_IWorkPerformer}.
+        """
+        performer = self.pcp.choosePerformer()
+        self.failUnlessIsInstance(performer, cls)
+        verifyObject(_IWorkPerformer, performer)
+
+
     def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
         """
         If L{PeerConnectionPool.choosePerformer} is invoked when no workers
@@ -140,11 +159,34 @@
         or outgoing), then it chooses an implementation of C{performWork} that
         simply executes the work locally.
         """
-        pcp = PeerConnectionPool(None, None, 4321, schema)
-        self.assertIsInstance(pcp.choosePerformer(), ImmediatePerformer)
+        self.checkPerformer(ImmediatePerformer)
 
 
+    def test_choosingPerformerWithLocalCapacity(self):
+        """
+        If L{PeerConnectionPool.choosePerformer} is invoked when some workers
+        have spawned, then it should choose the worker pool as the local
+        performer.
+        """
+        # Give it some local capacity.
+        wlf = self.pcp.workerListenerFactory()
+        proto = wlf.buildProtocol(None)
+        proto.makeConnection(StringTransport())
+        # Now it has some capacity.
+        self.checkPerformer(WorkerConnectionPool)
 
+
+    def test_choosingPerformerFromNetwork(self):
+        """
+        If L{PeerConnectionPool.choosePerformer} is invoked when no workers
+        have spawned but some peers have connected, then it should choose a
+        connection from the network to perform it.
+        """
+
+    test_choosingPerformerFromNetwork.skip = "not implemented yet..."
+
+
+
 class PeerConnectionPoolIntegrationTests(TestCase):
     """
     L{PeerConnectionPool} is the service responsible for coordinating
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/643afc1d/attachment-0001.html>


More information about the calendarserver-changes mailing list