[CalendarServer-changes] [10222] CalendarServer/branches/users/glyph/queue-locking-and-timing
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jan 4 16:38:33 PST 2013
Revision: 10222
http://trac.calendarserver.org//changeset/10222
Author: glyph at apple.com
Date: 2013-01-04 16:38:33 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Add (failing) tests for peer selection.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/queue-locking-and-timing/
Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py 2013-01-05 00:38:31 UTC (rev 10221)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py 2013-01-05 00:38:33 UTC (rev 10222)
@@ -104,7 +104,32 @@
from twisted.internet.endpoints import TCP4ServerEndpoint
from twext.enterprise.dal.syntax import Lock
from twext.enterprise.ienterprise import IQueuer
+from zope.interface.interface import Interface
+
+class _IWorkPerformer(Interface):
+ """
+ An object that can perform work.
+
+ Internal interface; implemented by several classes here since work has to
+ (in the worst case) pass from worker->controller->controller->worker.
+ """
+
+ def performWork(table, workID):
+ """
+ @param table: The table where work is waiting.
+ @type table: L{TableSyntax}
+
+ @param workID: The primary key identifier of the given work.
+ @type workID: L{int}
+
+ @return: a L{Deferred} firing with an empty dictionary when the work is
+ complete.
+ @rtype: L{Deferred} firing L{dict}
+ """
+
+
+
def makeNodeSchema(inSchema):
"""
Create a self-contained schema for L{NodeInfo} to use.
@@ -490,15 +515,7 @@
specific peer node-controller process to perform some work, having
already determined that it's appropriate.
- @param table: The table where work is waiting.
- @type table: L{TableSyntax}
-
- @param workID: The primary key identifier of the given work.
- @type workID: L{int}
-
- @return: a L{Deferred} firing with an empty dictionary when the work is
- complete.
- @rtype: L{Deferred} firing L{dict}
+ @see: L{_IWorkPerformer.performWork}
"""
d = self.callRemote(PerformWork, table=table, workID=workID)
self._bonusLoad += 1
@@ -569,7 +586,7 @@
hasAvailableCapacity to process another queue item?
"""
for worker in self.workers:
- if worker.currentLoad() < self.maximumLoadPerWorker:
+ if worker.currentLoad < self.maximumLoadPerWorker:
return True
return False
@@ -1062,16 +1079,12 @@
should be lower-latency. Also, if no peers are available, work will be
submitted locally even if the worker pool is already over-subscribed.
- @return: a L{Deferred <twisted.internet.defer.Deferred>} which fires
- with the chosen 'peer', i.e. object with a C{performWork} method,
- as soon as one is available. Normally this will be synchronous,
- but we need to account for the possibility that we may need to
- connect to other hosts.
- @rtype: L{Deferred <twisted.internet.defer.Deferred>} firing
- L{ConnectionFromPeerNode} or L{WorkerConnectionPool}
+ @return: the chosen peer.
+ @rtype: L{_IWorkPerformer} L{ConnectionFromPeerNode} or
+ L{WorkerConnectionPool}
"""
if self.workerPool.hasAvailableCapacity():
- return succeed(self.workerPool)
+ return self.workerPool
if self.peers:
return sorted(self.peers, lambda p: p.currentLoadEstimate())[0]
else:
Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py 2013-01-05 00:38:31 UTC (rev 10221)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py 2013-01-05 00:38:33 UTC (rev 10222)
@@ -35,7 +35,10 @@
from twisted.application.service import Service, MultiService
from twext.enterprise.dal.syntax import Insert
-from twext.enterprise.queue import ImmediatePerformer
+from twext.enterprise.queue import ImmediatePerformer, _IWorkPerformer
+from twext.enterprise.queue import WorkerConnectionPool
+from zope.interface.verify import verifyObject
+from twisted.test.proto_helpers import StringTransport
from twext.enterprise.dal.syntax import Select
class UtilityTests(TestCase):
@@ -132,7 +135,23 @@
"""
L{PeerConnectionPool} has many internal components.
"""
+ def setUp(self):
+ """
+ Create a L{PeerConnectionPool} that is just initialized enough.
+ """
+ self.pcp = PeerConnectionPool(None, None, 4321, schema)
+
+ def checkPerformer(self, cls):
+ """
+ Verify that the performer returned by
+ L{PeerConnectionPool.choosePerformer}.
+ """
+ performer = self.pcp.choosePerformer()
+ self.failUnlessIsInstance(performer, cls)
+ verifyObject(_IWorkPerformer, performer)
+
+
def test_choosingPerformerWhenNoPeersAndNoWorkers(self):
"""
If L{PeerConnectionPool.choosePerformer} is invoked when no workers
@@ -140,11 +159,34 @@
or outgoing), then it chooses an implementation of C{performWork} that
simply executes the work locally.
"""
- pcp = PeerConnectionPool(None, None, 4321, schema)
- self.assertIsInstance(pcp.choosePerformer(), ImmediatePerformer)
+ self.checkPerformer(ImmediatePerformer)
+ def test_choosingPerformerWithLocalCapacity(self):
+ """
+ If L{PeerConnectionPool.choosePerformer} is invoked when some workers
+ have spawned, then it should choose the worker pool as the local
+ performer.
+ """
+ # Give it some local capacity.
+ wlf = self.pcp.workerListenerFactory()
+ proto = wlf.buildProtocol(None)
+ proto.makeConnection(StringTransport())
+ # Now it has some capacity.
+ self.checkPerformer(WorkerConnectionPool)
+
+ def test_choosingPerformerFromNetwork(self):
+ """
+ If L{PeerConnectionPool.choosePerformer} is invoked when no workers
+ have spawned but some peers have connected, then it should choose a
+ connection from the network to perform it.
+ """
+
+ test_choosingPerformerFromNetwork.skip = "not implemented yet..."
+
+
+
class PeerConnectionPoolIntegrationTests(TestCase):
"""
L{PeerConnectionPool} is the service responsible for coordinating
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/643afc1d/attachment-0001.html>
More information about the calendarserver-changes
mailing list