[CalendarServer-changes] [10234] CalendarServer/branches/users/glyph/queue-locking-and-timing
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jan 4 16:38:48 PST 2013
Revision: 10234
http://trac.calendarserver.org/changeset/10234
Author: glyph at apple.com
Date: 2013-01-04 16:38:48 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Choose performer from peer pool, utility for testing connections.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/queue-locking-and-timing/
Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py 2013-01-05 00:38:47 UTC (rev 10233)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py 2013-01-05 00:38:48 UTC (rev 10234)
@@ -459,6 +459,7 @@
by the host of this connection since the last L{ReportLoad} message.
@type _bonusLoad: L{int}
"""
+ implements(_IWorkPerformer)
def __init__(self, peerPool, boxReceiver=None, locator=None):
"""
@@ -1249,11 +1250,9 @@
@inlineCallbacks
def startup(txn):
endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
- f = Factory()
- f.buildProtocol = self.createPeerConnection
# If this fails, the failure mode is going to be ugly, just like all
# conflicted-port failures. But, at least it won't proceed.
- self._listeningPortObject = yield endpoint.listen(f)
+ self._listeningPortObject = yield endpoint.listen(self.peerFactory())
self.ampPort = self._listeningPortObject.getHost().port
yield Lock.exclusive(NodeInfo.table).on(txn)
nodes = yield self.activeNodes(txn)
@@ -1325,9 +1324,7 @@
@param node: a description of the master to connect to.
@type node: L{NodeInfo}
"""
- f = Factory()
- f.buildProtocol = self.createPeerConnection
- connected = node.endpoint(self.reactor).connect(f)
+ connected = node.endpoint(self.reactor).connect(self.peerFactory())
def whenConnected(proto):
self.mapPeer(node.hostname, node.port, proto)
proto.callRemote(IdentifyNode,
@@ -1341,11 +1338,32 @@
connected.addCallbacks(whenConnected, noted)
- def createPeerConnection(self, addr):
- return ConnectionFromPeerNode(self)
+ def peerFactory(self):
+ """
+ Factory for peer connections.
+ @return: a L{Factory} that will produce L{ConnectionFromPeerNode}
+ protocols attached to this L{PeerConnectionPool}.
+ """
+ return _PeerPoolFactory(self)
+
+class _PeerPoolFactory(Factory, object):
+ """
+ Protocol factory responsible for creating L{ConnectionFromPeerNode}
+ connections, both client and server.
+ """
+
+ def __init__(self, peerConnectionPool):
+ self.peerConnectionPool = peerConnectionPool
+
+
+ def buildProtocol(self, addr):
+ return ConnectionFromPeerNode(self.peerConnectionPool)
+
+
+
class ImmediateWorkProposal(object):
"""
Like L{WorkProposal}, but for items that must be executed immediately
Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py 2013-01-05 00:38:47 UTC (rev 10233)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py 2013-01-05 00:38:48 UTC (rev 10234)
@@ -45,6 +45,7 @@
from twext.enterprise.dal.record import Record
+from twext.enterprise.queue import ConnectionFromPeerNode
from zope.interface.verify import verifyObject
from twisted.test.proto_helpers import StringTransport
@@ -239,11 +240,81 @@
have spawned but some peers have connected, then it should choose a
connection from the network to perform it.
"""
+ peer = PeerConnectionPool(None, None, 4321, schema)
+ local = self.pcp.peerFactory().buildProtocol(None)
+ remote = peer.peerFactory().buildProtocol(None)
+ connection = Connection(local, remote)
+ connection.start()
+ self.checkPerformer(ConnectionFromPeerNode)
- test_choosingPerformerFromNetwork.skip = "not implemented yet..."
+class HalfConnection(object):
+ def __init__(self, protocol):
+ self.protocol = protocol
+ self.transport = StringTransport()
+
+ def start(self):
+ """
+ Hook up the protocol and the transport.
+ """
+ self.protocol.makeConnection(self.transport)
+
+
+ def extract(self):
+ """
+ Extract the data currently present in this protocol's output buffer.
+ """
+ io = self.transport.io
+ value = io.getvalue()
+ io.seek(0)
+ io.truncate()
+ return value
+
+
+ def deliver(self, data):
+ """
+ Deliver the given data to this L{HalfConnection}'s protocol's
+ C{dataReceived} method.
+
+ @return: a boolean indicating whether any data was delivered.
+ @rtype: L{bool}
+ """
+ if data:
+ self.protocol.dataReceived(data)
+ return True
+ return False
+
+
+
+class Connection(object):
+
+ def __init__(self, local, remote):
+ """
+ Connect two protocol instances to each other via string transports.
+ """
+ self.receiver = HalfConnection(local)
+ self.sender = HalfConnection(remote)
+
+
+ def start(self):
+ """
+ Start up the connection.
+ """
+ self.sender.start()
+ self.receiver.start()
+
+
+ def pump(self):
+ """
+ Relay data in one direction between the two connections.
+ """
+ self.receiver.deliver(self.sender.extract())
+ self.receiver, self.sender = self.sender, self.receiver
+
+
+
class PeerConnectionPoolIntegrationTests(TestCase):
"""
L{PeerConnectionPool} is the service responsible for coordinating
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/9791250e/attachment-0001.html>
More information about the calendarserver-changes mailing list