[CalendarServer-changes] [10231] CalendarServer/branches/users/glyph/queue-locking-and-timing
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jan 4 16:38:44 PST 2013
Revision: 10231
http://trac.calendarserver.org//changeset/10231
Author: glyph at apple.com
Date: 2013-01-04 16:38:44 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
direct test for WorkItem.forTable, direct test for SchemaAMP, use Record for other test fixture for brevity
Modified Paths:
--------------
CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/queue-locking-and-timing/
Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py 2013-01-05 00:38:43 UTC (rev 10230)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py 2013-01-05 00:38:44 UTC (rev 10231)
@@ -87,7 +87,7 @@
from twisted.application.service import MultiService
from twisted.internet.protocol import Factory
from twisted.internet.defer import (
- inlineCallbacks, returnValue, Deferred, succeed
+ inlineCallbacks, returnValue, Deferred
)
from twisted.internet.endpoints import TCP4ClientEndpoint
from twisted.protocols.amp import AMP, Command, Integer, Argument, String
@@ -371,11 +371,13 @@
@return: the relevant subclass
@rtype: L{type}
"""
+ tableName = table.model.name
for subcls in cls.__subclasses__():
- if table == getattr(subcls, "table", None):
+ clstable = getattr(subcls, "table", None)
+ if table == clstable:
return subcls
raise KeyError("No mapped {0} class for {1}.".format(
- cls, table
+ cls, tableName
))
@@ -473,7 +475,6 @@
@see: L{AMP.__init__}
"""
self.peerPool = peerPool
- self.localWorkerPool = peerPool.workerPool
self._bonusLoad = 0
self._reportedLoad = 0
super(ConnectionFromPeerNode, self).__init__(peerPool.schema,
@@ -484,8 +485,7 @@
"""
Report the current load for the local worker pool to this peer.
"""
- return self.callRemote(ReportLoad,
- load=self.localWorkerPool.totalLoad())
+ return self.callRemote(ReportLoad, load=self.totalLoad())
@ReportLoad.responder
@@ -559,7 +559,7 @@
@return: a L{Deferred} that fires when the work has been completed.
"""
- return self.localWorkerPool.performWork(table, workID)
+ return self.peerPool.performWorkForPeer(table, workID)
@IdentifyNode.responder
@@ -611,7 +611,7 @@
return False
- def totalLoad(self):
+ def allWorkerLoad(self):
"""
The total load of all currently connected workers.
"""
@@ -654,15 +654,12 @@
"""
An individual connection from a worker, as seem from the master's
perspective. L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
-
- @ivar workerPool: The connection pool that this individual connection is
- participating in.
- @type workerPool: L{WorkerConnectionPool}
"""
- def __init__(self, schema, workerPool, boxReceiver=None, locator=None):
- self.workerPool = workerPool
- super(ConnectionFromWorker, self).__init__(schema, boxReceiver, locator)
+ def __init__(self, peerPool, boxReceiver=None, locator=None):
+ super(ConnectionFromWorker, self).__init__(peerPool.schema, boxReceiver,
+ locator)
+ self.peerPool = peerPool
self._load = 0
@@ -680,7 +677,7 @@
state.
"""
result = super(ConnectionFromWorker, self).startReceivingBoxes(sender)
- self.workerPool.addWorker(self)
+ self.peerPool.workerPool.addWorker(self)
return result
@@ -689,7 +686,7 @@
AMP boxes will no longer be received.
"""
result = super(ConnectionFromWorker, self).stopReceivingBoxes(reason)
- self.workerPool.removeWorker(self)
+ self.peerPool.workerPool.removeWorker(self)
return result
@@ -1075,13 +1072,16 @@
self.peers.append(peer)
+ def totalLoad(self):
+ return self.workerPool.allWorkerLoad()
+
+
def workerListenerFactory(self):
"""
Factory that listens for connections from workers.
"""
f = Factory()
- f.buildProtocol = lambda addr: ConnectionFromWorker(self.schema,
- self.workerPool)
+ f.buildProtocol = lambda addr: ConnectionFromWorker(self)
return f
@@ -1092,7 +1092,7 @@
self.peers.remove(peer)
- def choosePerformer(self):
+ def choosePerformer(self, onlyLocally=False):
"""
Choose a peer to distribute work to based on the current known slot
occupancy of the other nodes. Note that this will prefer distributing
@@ -1106,12 +1106,20 @@
"""
if self.workerPool.hasAvailableCapacity():
return self.workerPool
- if self.peers:
+ if self.peers and not onlyLocally:
return sorted(self.peers, lambda p: p.currentLoadEstimate())[0]
else:
return ImmediatePerformer(self.transactionFactory)
+ def performWorkForPeer(self, table, workID):
+ """
+ A peer has requested us to perform some work; choose a work performer
+ local to this node, and then execute it.
+ """
+ return self.choosePerformer(onlyLocally=True).performWork(table, workID)
+
+
def enqueueWork(self, txn, workItemType, **kw):
"""
There is some work to do. Do it, someplace else, ideally in parallel.
Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py 2013-01-05 00:38:43 UTC (rev 10230)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py 2013-01-05 00:38:44 UTC (rev 10231)
@@ -1,4 +1,5 @@
##
+from twext.enterprise.dal.record import Record
# Copyright (c) 2012 Apple Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,6 +22,10 @@
import datetime
# TODO: There should be a store-building utility within twext.enterprise.
+from twext.enterprise.queue import SchemaAMP
+from twisted.test.proto_helpers import StringTransportWithDisconnection
+from twext.enterprise.queue import TableSyntaxByName
+from twisted.protocols.amp import Command
from txdav.common.datastore.test.util import buildStore
from twext.enterprise.dal.syntax import SchemaSyntax
@@ -34,7 +39,6 @@
from twisted.application.service import Service, MultiService
-from twext.enterprise.dal.syntax import Insert
from twext.enterprise.queue import ImmediatePerformer, _IWorkPerformer
from twext.enterprise.queue import WorkerConnectionPool
from zope.interface.verify import verifyObject
@@ -105,24 +109,70 @@
for table in schema]
+class DummyWorkDone(Record, fromTable(schema.DUMMY_WORK_DONE)):
+ """
+ Work result.
+ """
+
+
class DummyWorkItem(WorkItem, fromTable(schema.DUMMY_WORK_ITEM)):
"""
Sample L{WorkItem} subclass that adds two integers together and stores them
in another table.
"""
- group = None
def doWork(self):
- # Perform the work.
- result = self.a + self.b
- # Store the result.
- return (Insert({schema.DUMMY_WORK_DONE.WORK_ID: self.workID,
- schema.DUMMY_WORK_DONE.A_PLUS_B: result})
- .on(self.transaction))
+ return DummyWorkDone.create(self.transaction, workID=self.workID,
+ aPlusB=self.a + self.b)
+class SchemaAMPTests(TestCase):
+ """
+ Tests for L{SchemaAMP} faithfully relaying tables across the wire.
+ """
+
+ def test_sendTableWithName(self):
+ """
+ You can send a reference to a table through a L{SchemaAMP} via
+ L{TableSyntaxByName}.
+ """
+ client = SchemaAMP(schema)
+ class SampleCommand(Command):
+ arguments = [('table', TableSyntaxByName())]
+ class Receiver(SchemaAMP):
+ @SampleCommand.responder
+ def gotIt(self, table):
+ self.it = table
+ return {}
+ server = Receiver(schema)
+ clientT = StringTransport()
+ serverT = StringTransport()
+ client.makeConnection(clientT)
+ server.makeConnection(serverT)
+ client.callRemote(SampleCommand, table=schema.DUMMY_WORK_ITEM)
+ server.dataReceived(clientT.io.getvalue())
+ self.assertEqual(server.it, schema.DUMMY_WORK_ITEM)
+
+
+
+
+class WorkItemTests(TestCase):
+ """
+ A L{WorkItem} is an item of work that can be executed.
+ """
+
+ def test_forTable(self):
+ """
+ L{WorkItem.forTable} returns L{WorkItem} subclasses mapped to the given
+ table.
+ """
+ self.assertIdentical(WorkItem.forTable(schema.DUMMY_WORK_ITEM),
+ DummyWorkItem)
+
+
+
class WorkerConnectionPoolTests(TestCase):
"""
A L{WorkerConnectionPool} is responsible for managing, in a node's
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/108a3516/attachment-0001.html>
More information about the calendarserver-changes
mailing list