[CalendarServer-changes] [9671] CalendarServer/branches/users/glyph/q

source_changes at macosforge.org source_changes at macosforge.org
Sat Aug 11 01:55:41 PDT 2012


Revision: 9671
          http://trac.macosforge.org/projects/calendarserver/changeset/9671
Author:   glyph at apple.com
Date:     2012-08-11 01:55:41 -0700 (Sat, 11 Aug 2012)
Log Message:
-----------
null queuer, sketch of peer mapping

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/q/

Modified: CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:40 UTC (rev 9670)
+++ CalendarServer/branches/users/glyph/q/twext/enterprise/queue.py	2012-08-11 08:55:41 UTC (rev 9671)
@@ -88,7 +88,7 @@
     inlineCallbacks, returnValue, Deferred, succeed
 )
 from twisted.internet.endpoints import TCP4ClientEndpoint
-from twisted.protocols.amp import AMP, Command, Integer, Argument
+from twisted.protocols.amp import AMP, Command, Integer, Argument, String
 from twisted.python.reflect import qual
 from twisted.python import log
 
@@ -266,7 +266,22 @@
     response = []
 
 
+class IdentifyNode(Command):
+    """
+    Identify this node to its peer.  The connector knows which hostname it's
+    looking for, and which hostname it considers itself to be, so only the
+    initiator (not the listener) issues this command.  This command is
+    necessary because if reverse DNS isn't set up perfectly, the listener may
+    not be able to identify its peer.
+    """
 
+    arguments = [
+        ("host", String()),   # hostname the initiator considers itself to be
+        ("port", Integer()),  # presumably the initiator's own port -- confirm
+    ]
+
+
+
 class SchemaAMP(AMP):
     """
     An AMP instance which also has a L{Schema} attached to it.
@@ -396,7 +411,19 @@
         return self.localWorkerPool.performWork(table, workID)
 
 
+    @IdentifyNode.responder
+    def identifyPeer(self, host, port):
+        """
+        Responder for L{IdentifyNode}: record this connection in the peer pool
+        under the host and port that the remote node reports for itself.
+        """
+        self.peerPool.mapPeer(host, port, self)
+        # AMP responders must return a dictionary matching the command's
+        # response schema; L{IdentifyNode} declares no response fields.
+        return {}
 
+
+
 class WorkerConnectionPool(object):
     """
     A pool of L{ConnectionFromWorker}s.
@@ -775,6 +795,7 @@
         self.thisProcess = None
         self.workerPool = WorkerConnectionPool()
         self.peers = []
+        self.mappedPeers = {}
         self.schema = schema
         self._startingUp = None
         self._listeningPortObject = None
@@ -990,6 +1011,18 @@
             self._startConnectingTo(node)
 
 
+    def mapPeer(self, host, port, peer):
+        """
+        A peer has been identified as belonging to the given host/port
+        combination; record it, replacing any peer previously mapped there.
+        The older peer is not yet disconnected (see the TODO below).
+        """
+        # TODO: think about this for race conditions before enabling:
+        # if (host, port) in self.mappedPeers:
+        #     self.mappedPeers.pop((host, port)).transport.loseConnection()
+        self.mappedPeers[(host, port)] = peer
+
+
     def _startConnectingTo(self, node):
         """
         Start an outgoing connection to another master process.
@@ -999,7 +1032,10 @@
         """
         f = Factory()
         f.buildProtocol = self.createPeerConnection
-        node.endpoint().connect(f)
+        @passthru(node.endpoint().connect(f).addCallback)
+        def connected(proto):
+            self.mapPeer(node, proto)
+            proto.callRemote(IdentifyNode, self.thisProcess)
 
 
     def createPeerConnection(self, addr):
@@ -1007,3 +1043,23 @@
 
 
 
+class NullQueuer(object):
+    """
+    A queuer that executes enqueued work immediately, within the enqueuing
+    transaction.  This is technically correct, but not very efficient.
+    """
+
+    @inlineCallbacks
+    def enqueueWork(self, txn, workItemType, **kw):
+        """
+        Do this work immediately: create the C{workItemType} item in C{txn}
+        from the keyword arguments, delete it, and then run its C{doWork}.
+
+        @see: L{PeerConnectionPool.enqueueWork}
+        """
+        item = yield workItemType.create(txn, **kw)
+        yield item.delete()
+        yield item.doWork()
+
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120811/acb6b591/attachment-0001.html>


More information about the calendarserver-changes mailing list