[CalendarServer-changes] [9671] CalendarServer/branches/users/glyph/q
source_changes at macosforge.org
source_changes at macosforge.org
Sat Aug 11 01:55:41 PDT 2012
Revision: 9671
http://trac.macosforge.org/projects/calendarserver/changeset/9671
Author: glyph at apple.com
Date: 2012-08-11 01:55:41 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
null queuer, sketch of peer mapping
Modified Paths:
--------------
CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/q/
Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:40 UTC (rev 9670)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py 2012-08-11 08:55:41 UTC (rev 9671)
@@ -88,7 +88,7 @@
inlineCallbacks, returnValue, Deferred, succeed
)
from twisted.internet.endpoints import TCP4ClientEndpoint
-from twisted.protocols.amp import AMP, Command, Integer, Argument
+from twisted.protocols.amp import AMP, Command, Integer, Argument, String
from twisted.python.reflect import qual
from twisted.python import log
@@ -266,7 +266,22 @@
response = []
+class IdentifyNode(Command):
+ """
+ Identify this node to its peer. The connector knows which hostname it's
+ looking for, and which hostname it considers itself to be, only the
+ initiator (not the listener) issues this command. This command is
+ necessary because if reverse DNS isn't set up perfectly, the listener may
+ not be able to identify its peer.
+ """
+ arguments = [
+ ("host", String()),
+ ("port", Integer()),
+ ]
+
+
+
class SchemaAMP(AMP):
"""
An AMP instance which also has a L{Schema} attached to it.
@@ -396,7 +411,12 @@
return self.localWorkerPool.performWork(table, workID)
+ @IdentifyNode.responder
+ def identifyPeer(self, host, port):
+ self.peerPool.mapPeer(host, port, self)
+
+
class WorkerConnectionPool(object):
"""
A pool of L{ConnectionFromWorker}s.
@@ -775,6 +795,7 @@
self.thisProcess = None
self.workerPool = WorkerConnectionPool()
self.peers = []
+ self.mappedPeers = {}
self.schema = schema
self._startingUp = None
self._listeningPortObject = None
@@ -990,6 +1011,18 @@
self._startConnectingTo(node)
+ def mapPeer(self, host, port, peer):
+ """
+ A peer has been identified as belonging to the given host/port
+ combination. Disconnect any other peer that claims to be connected for
+ the same peer.
+ """
+ # if (host, port) in self.mappedPeers:
+ # TODO: think about this for race conditions
+ # self.mappedPeers.pop((host, port)).transport.loseConnection()
+ self.mappedPeers[(host, port)] = peer
+
+
def _startConnectingTo(self, node):
"""
Start an outgoing connection to another master process.
@@ -999,7 +1032,10 @@
"""
f = Factory()
f.buildProtocol = self.createPeerConnection
- node.endpoint().connect(f)
+ @passthru(node.endpoint().connect(f).addCallback)
+ def connected(proto):
+ self.mapPeer(node, proto)
+ proto.callRemote(IdentifyNode, self.thisProcess)
def createPeerConnection(self, addr):
@@ -1007,3 +1043,23 @@
+class NullQueuer(object):
+ """
+ When work is enqueued with this queuer, it is just executed immediately,
+ within the same transaction. While this is technically correct, it is not
+ very efficient.
+ """
+
+ @inlineCallbacks
+ def enqueueWork(self, txn, workItemType, **kw):
+ """
+ Do this work immediately.
+
+ @see: L{PeerConnectionPool.enqueueWork}
+ """
+ item = yield self.workItemType.create(self.txn, **self.kw)
+ yield item.delete()
+ yield item.doWork()
+
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/acb6b591/attachment-0001.html>
More information about the calendarserver-changes
mailing list