[CalendarServer-changes] [10234] CalendarServer/branches/users/glyph/queue-locking-and-timing

source_changes at macosforge.org source_changes at macosforge.org
Fri Jan 4 16:38:48 PST 2013


Revision: 10234
          http://trac.calendarserver.org/changeset/10234
Author:   glyph at apple.com
Date:     2013-01-04 16:38:48 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Choose performer from peer pool, utility for testing connections.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:38:47 UTC (rev 10233)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:38:48 UTC (rev 10234)
@@ -459,6 +459,7 @@
         by the host of this connection since the last L{ReportLoad} message.
     @type _bonusLoad: L{int}
     """
+    implements(_IWorkPerformer)
 
     def __init__(self, peerPool, boxReceiver=None, locator=None):
         """
@@ -1249,11 +1250,9 @@
         @inlineCallbacks
         def startup(txn):
             endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
-            f = Factory()
-            f.buildProtocol = self.createPeerConnection
             # If this fails, the failure mode is going to be ugly, just like all
             # conflicted-port failures.  But, at least it won't proceed.
-            self._listeningPortObject = yield endpoint.listen(f)
+            self._listeningPortObject = yield endpoint.listen(self.peerFactory())
             self.ampPort = self._listeningPortObject.getHost().port
             yield Lock.exclusive(NodeInfo.table).on(txn)
             nodes = yield self.activeNodes(txn)
@@ -1325,9 +1324,7 @@
         @param node: a description of the master to connect to.
         @type node: L{NodeInfo}
         """
-        f = Factory()
-        f.buildProtocol = self.createPeerConnection
-        connected = node.endpoint(self.reactor).connect(f)
+        connected = node.endpoint(self.reactor).connect(self.peerFactory())
         def whenConnected(proto):
             self.mapPeer(node.hostname, node.port, proto)
             proto.callRemote(IdentifyNode,
@@ -1341,11 +1338,32 @@
         connected.addCallbacks(whenConnected, noted)
 
 
-    def createPeerConnection(self, addr):
-        return ConnectionFromPeerNode(self)
+    def peerFactory(self):
+        """
+        Factory for peer connections.
 
+        @return: a L{Factory} that will produce L{ConnectionFromPeerNode}
+            protocols attached to this L{PeerConnectionPool}.
+        """
+        return _PeerPoolFactory(self)
 
 
+
+class _PeerPoolFactory(Factory, object):
+    """
+    Protocol factory responsible for creating L{ConnectionFromPeerNode}
+    connections, both client and server.
+    """
+
+    def __init__(self, peerConnectionPool):
+        self.peerConnectionPool = peerConnectionPool
+
+
+    def buildProtocol(self, addr):
+        return ConnectionFromPeerNode(self.peerConnectionPool)
+
+
+
 class ImmediateWorkProposal(object):
     """
     Like L{WorkProposal}, but for items that must be executed immediately

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py	2013-01-05 00:38:47 UTC (rev 10233)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py	2013-01-05 00:38:48 UTC (rev 10234)
@@ -45,6 +45,7 @@
 
 from twext.enterprise.dal.record import Record
 
+from twext.enterprise.queue import ConnectionFromPeerNode
 from zope.interface.verify import verifyObject
 from twisted.test.proto_helpers import StringTransport
 
@@ -239,11 +240,81 @@
         have spawned but some peers have connected, then it should choose a
         connection from the network to perform it.
         """
+        peer = PeerConnectionPool(None, None, 4321, schema)
+        local = self.pcp.peerFactory().buildProtocol(None)
+        remote = peer.peerFactory().buildProtocol(None)
+        connection = Connection(local, remote)
+        connection.start()
+        self.checkPerformer(ConnectionFromPeerNode)
 
-    test_choosingPerformerFromNetwork.skip = "not implemented yet..."
 
 
+class HalfConnection(object):
+    def __init__(self, protocol):
+        self.protocol = protocol
+        self.transport = StringTransport()
 
+
+    def start(self):
+        """
+        Hook up the protocol and the transport.
+        """
+        self.protocol.makeConnection(self.transport)
+
+
+    def extract(self):
+        """
+        Extract the data currently present in this protocol's output buffer.
+        """
+        io = self.transport.io
+        value = io.getvalue()
+        io.seek(0)
+        io.truncate()
+        return value
+
+
+    def deliver(self, data):
+        """
+        Deliver the given data to this L{HalfConnection}'s protocol's
+        C{dataReceived} method.
+
+        @return: a boolean indicating whether any data was delivered.
+        @rtype: L{bool}
+        """
+        if data:
+            self.protocol.dataReceived(data)
+            return True
+        return False
+
+
+
+class Connection(object):
+
+    def __init__(self, local, remote):
+        """
+        Connect two protocol instances to each other via string transports.
+        """
+        self.receiver = HalfConnection(local)
+        self.sender = HalfConnection(remote)
+
+
+    def start(self):
+        """
+        Start up the connection.
+        """
+        self.sender.start()
+        self.receiver.start()
+
+
+    def pump(self):
+        """
+        Relay data in one direction between the two connections.
+        """
+        self.receiver.deliver(self.sender.extract())
+        self.receiver, self.sender = self.sender, self.receiver
+
+
+
 class PeerConnectionPoolIntegrationTests(TestCase):
     """
     L{PeerConnectionPool} is the service responsible for coordinating
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/9791250e/attachment-0001.html>


More information about the calendarserver-changes mailing list