[CalendarServer-changes] [10231] CalendarServer/branches/users/glyph/queue-locking-and-timing

source_changes at macosforge.org
Fri Jan 4 16:38:44 PST 2013


Revision: 10231
          http://trac.calendarserver.org//changeset/10231
Author:   glyph at apple.com
Date:     2013-01-04 16:38:44 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
direct test for WorkItem.forTable, direct test for SchemaAMP, use Record for other test fixture for brevity
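
As background for the first log item: WorkItem.forTable maps a table
back to the WorkItem subclass declared against it. A minimal sketch of
the contract now under direct test (MyWorkItem and schema.MY_WORK are
hypothetical stand-ins; the real fixture is DummyWorkItem in
test_queue.py below):

    from twext.enterprise.queue import WorkItem
    from twext.enterprise.dal.record import fromTable

    # `schema` is assumed to be a SchemaSyntax wrapping a schema that
    # defines a MY_WORK table.
    class MyWorkItem(WorkItem, fromTable(schema.MY_WORK)):
        def doWork(self):
            pass

    # forTable() walks cls.__subclasses__() and returns the subclass
    # whose `table` attribute matches, raising KeyError (naming the
    # table) when nothing matches.
    assert WorkItem.forTable(schema.MY_WORK) is MyWorkItem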

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:38:43 UTC (rev 10230)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:38:44 UTC (rev 10231)
@@ -87,7 +87,7 @@
 from twisted.application.service import MultiService
 from twisted.internet.protocol import Factory
 from twisted.internet.defer import (
-    inlineCallbacks, returnValue, Deferred, succeed
+    inlineCallbacks, returnValue, Deferred
 )
 from twisted.internet.endpoints import TCP4ClientEndpoint
 from twisted.protocols.amp import AMP, Command, Integer, Argument, String
@@ -371,11 +371,13 @@
         @return: the relevant subclass
         @rtype: L{type}
         """
+        tableName = table.model.name
         for subcls in cls.__subclasses__():
-            if table == getattr(subcls, "table", None):
+            clstable = getattr(subcls, "table", None)
+            if table == clstable:
                 return subcls
         raise KeyError("No mapped {0} class for {1}.".format(
-            cls, table
+            cls, tableName
         ))
 
 
@@ -473,7 +475,6 @@
         @see: L{AMP.__init__}
         """
         self.peerPool = peerPool
-        self.localWorkerPool = peerPool.workerPool
         self._bonusLoad = 0
         self._reportedLoad = 0
         super(ConnectionFromPeerNode, self).__init__(peerPool.schema,
@@ -484,8 +485,7 @@
         """
         Report the current load for the local worker pool to this peer.
         """
-        return self.callRemote(ReportLoad,
-                               load=self.localWorkerPool.totalLoad())
+        return self.callRemote(ReportLoad, load=self.peerPool.totalLoad())
 
 
     @ReportLoad.responder
@@ -559,7 +559,7 @@
 
         @return: a L{Deferred} that fires when the work has been completed.
         """
-        return self.localWorkerPool.performWork(table, workID)
+        return self.peerPool.performWorkForPeer(table, workID)
 
 
     @IdentifyNode.responder
@@ -611,7 +611,7 @@
         return False
 
 
-    def totalLoad(self):
+    def allWorkerLoad(self):
         """
         The total load of all currently connected workers.
         """
@@ -654,15 +654,12 @@
     """
 An individual connection from a worker, as seen from the master's
     perspective.  L{ConnectionFromWorker}s go into a L{WorkerConnectionPool}.
-
-    @ivar workerPool: The connection pool that this individual connection is
-        participating in.
-    @type workerPool: L{WorkerConnectionPool}
     """
 
-    def __init__(self, schema, workerPool, boxReceiver=None, locator=None):
-        self.workerPool = workerPool
-        super(ConnectionFromWorker, self).__init__(schema, boxReceiver, locator)
+    def __init__(self, peerPool, boxReceiver=None, locator=None):
+        super(ConnectionFromWorker, self).__init__(peerPool.schema, boxReceiver,
+                                                   locator)
+        self.peerPool = peerPool
         self._load = 0
 
 
@@ -680,7 +677,7 @@
         state.
         """
         result = super(ConnectionFromWorker, self).startReceivingBoxes(sender)
-        self.workerPool.addWorker(self)
+        self.peerPool.workerPool.addWorker(self)
         return result
 
 
@@ -689,7 +686,7 @@
         AMP boxes will no longer be received.
         """
         result = super(ConnectionFromWorker, self).stopReceivingBoxes(reason)
-        self.workerPool.removeWorker(self)
+        self.peerPool.workerPool.removeWorker(self)
         return result
 
 
@@ -1075,13 +1072,16 @@
         self.peers.append(peer)
 
 
+    def totalLoad(self):
+        return self.workerPool.allWorkerLoad()
+
+
     def workerListenerFactory(self):
         """
         Factory that listens for connections from workers.
         """
         f = Factory()
-        f.buildProtocol = lambda addr: ConnectionFromWorker(self.schema,
-                                                            self.workerPool)
+        f.buildProtocol = lambda addr: ConnectionFromWorker(self)
         return f
 
 
@@ -1092,7 +1092,7 @@
         self.peers.remove(peer)
 
 
-    def choosePerformer(self):
+    def choosePerformer(self, onlyLocally=False):
         """
         Choose a peer to distribute work to based on the current known slot
         occupancy of the other nodes.  Note that this will prefer distributing
@@ -1106,12 +1106,20 @@
         """
         if self.workerPool.hasAvailableCapacity():
             return self.workerPool
-        if self.peers:
+        if self.peers and not onlyLocally:
             return sorted(self.peers, key=lambda p: p.currentLoadEstimate())[0]
         else:
             return ImmediatePerformer(self.transactionFactory)
 
 
+    def performWorkForPeer(self, table, workID):
+        """
+        A peer has requested us to perform some work; choose a work performer
+        local to this node, and then execute it.
+        """
+        return self.choosePerformer(onlyLocally=True).performWork(table, workID)
+
+
     def enqueueWork(self, txn, workItemType, **kw):
         """
         There is some work to do.  Do it, someplace else, ideally in parallel.

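Taken together, the queue.py hunks reroute peer-delegated work: a
PerformWork request from a peer now goes through the pool rather than
a direct reference to the local worker pool, and the new onlyLocally
flag keeps that work on this node. A condensed restatement of the two
methods (as in the diff above; not runnable on its own, since it
assumes a live PeerConnectionPool with workerPool, peers, and
transactionFactory attributes):

    def performWorkForPeer(self, table, workID):
        # Work a peer has delegated to us must be performed here;
        # restricting the performer choice to local options prevents
        # two busy nodes from bouncing the same work back and forth.
        return self.choosePerformer(onlyLocally=True).performWork(
            table, workID)

    def choosePerformer(self, onlyLocally=False):
        if self.workerPool.hasAvailableCapacity():
            return self.workerPool  # local worker processes
        if self.peers and not onlyLocally:
            # Delegate to the least-loaded peer.
            return sorted(self.peers,
                          key=lambda p: p.currentLoadEstimate())[0]
        return ImmediatePerformer(self.transactionFactory)
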
Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py	2013-01-05 00:38:43 UTC (rev 10230)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py	2013-01-05 00:38:44 UTC (rev 10231)
@@ -1,4 +1,5 @@
 ##
+from twext.enterprise.dal.record import Record
 # Copyright (c) 2012 Apple Inc. All rights reserved.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
@@ -21,6 +22,10 @@
 import datetime
 
 # TODO: There should be a store-building utility within twext.enterprise.
+from twext.enterprise.queue import SchemaAMP
+from twisted.test.proto_helpers import StringTransport
+from twext.enterprise.queue import TableSyntaxByName
+from twisted.protocols.amp import Command
 from txdav.common.datastore.test.util import buildStore
 
 from twext.enterprise.dal.syntax import SchemaSyntax
@@ -34,7 +39,6 @@
 
 from twisted.application.service import Service, MultiService
 
-from twext.enterprise.dal.syntax import Insert
 from twext.enterprise.queue import ImmediatePerformer, _IWorkPerformer
 from twext.enterprise.queue import WorkerConnectionPool
 from zope.interface.verify import verifyObject
@@ -105,24 +109,70 @@
            for table in schema]
 
 
+class DummyWorkDone(Record, fromTable(schema.DUMMY_WORK_DONE)):
+    """
+    Work result.
+    """
 
+
+
 class DummyWorkItem(WorkItem, fromTable(schema.DUMMY_WORK_ITEM)):
     """
     Sample L{WorkItem} subclass that adds two integers together and stores them
     in another table.
     """
-    group = None
 
     def doWork(self):
-        # Perform the work.
-        result = self.a + self.b
-        # Store the result.
-        return (Insert({schema.DUMMY_WORK_DONE.WORK_ID: self.workID,
-                        schema.DUMMY_WORK_DONE.A_PLUS_B: result})
-                .on(self.transaction))
+        return DummyWorkDone.create(self.transaction, workID=self.workID,
+                                    aPlusB=self.a + self.b)
 
 
 
+class SchemaAMPTests(TestCase):
+    """
+    Tests for L{SchemaAMP} faithfully relaying tables across the wire.
+    """
+
+    def test_sendTableWithName(self):
+        """
+        You can send a reference to a table through a L{SchemaAMP} via
+        L{TableSyntaxByName}.
+        """
+        client = SchemaAMP(schema)
+        class SampleCommand(Command):
+            arguments = [('table', TableSyntaxByName())]
+        class Receiver(SchemaAMP):
+            @SampleCommand.responder
+            def gotIt(self, table):
+                self.it = table
+                return {}
+        server = Receiver(schema)
+        clientT = StringTransport()
+        serverT = StringTransport()
+        client.makeConnection(clientT)
+        server.makeConnection(serverT)
+        client.callRemote(SampleCommand, table=schema.DUMMY_WORK_ITEM)
+        server.dataReceived(clientT.value())
+        self.assertEqual(server.it, schema.DUMMY_WORK_ITEM)
+
+
+
+
+class WorkItemTests(TestCase):
+    """
+    A L{WorkItem} is an item of work that can be executed.
+    """
+
+    def test_forTable(self):
+        """
+        L{WorkItem.forTable} returns L{WorkItem} subclasses mapped to the given
+        table.
+        """
+        self.assertIdentical(WorkItem.forTable(schema.DUMMY_WORK_ITEM),
+                             DummyWorkItem)
+
+
+
 class WorkerConnectionPoolTests(TestCase):
     """
     A L{WorkerConnectionPool} is responsible for managing, in a node's
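
The DummyWorkItem.doWork change above is the "use Record for other
test fixture for brevity" item from the log message. Side by side,
both forms create the same DUMMY_WORK_DONE row:

    # Before: a hand-built INSERT against the schema syntax.
    return (Insert({schema.DUMMY_WORK_DONE.WORK_ID: self.workID,
                    schema.DUMMY_WORK_DONE.A_PLUS_B: self.a + self.b})
            .on(self.transaction))

    # After: DummyWorkDone is a Record mapped to the same table, so
    # column values are plain keyword arguments and the INSERT is
    # implied by Record.create().
    return DummyWorkDone.create(self.transaction, workID=self.workID,
                                aPlusB=self.a + self.b)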