[CalendarServer-changes] [10214] CalendarServer/branches/users/glyph/queue-locking-and-timing

source_changes at macosforge.org
Fri Jan 4 16:38:23 PST 2013


Revision: 10214
          http://trac.calendarserver.org/changeset/10214
Author:   glyph at apple.com
Date:     2013-01-04 16:38:23 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
fix some minor set-up bugs that integration testing revealed

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:38:22 UTC (rev 10213)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:38:23 UTC (rev 10214)
@@ -83,7 +83,7 @@
 
 from zope.interface import implements
 
-from twisted.application.service import Service
+from twisted.application.service import MultiService
 from twisted.internet.protocol import Factory
 from twisted.internet.defer import (
     inlineCallbacks, returnValue, Deferred, succeed
@@ -466,6 +466,7 @@
     @IdentifyNode.responder
     def identifyPeer(self, host, port):
         self.peerPool.mapPeer(host, port, self)
+        return {}
 
 
 
@@ -843,7 +844,7 @@
 
 
 
-class PeerConnectionPool(Service, object):
+class PeerConnectionPool(MultiService, object):
     """
     Each node has a L{PeerConnectionPool} connecting it to all the other nodes
     currently active on the same database.
@@ -908,6 +909,7 @@
             the L{WorkItem}s that this L{PeerConnectionPool} will process.
         @type schema: L{Schema}
         """
+        super(PeerConnectionPool, self).__init__()
         self.reactor = reactor
         self.transactionFactory = transactionFactory
         self.hostname = self.getfqdn()
@@ -1100,7 +1102,8 @@
             f.buildProtocol = self.createPeerConnection
             # If this fails, the failure mode is going to be ugly, just like all
             # conflicted-port failures.  But, at least it won't proceed.
-            yield endpoint.listen(f)
+            self._listeningPortObject = yield endpoint.listen(f)
+            self.ampPort = self._listeningPortObject.getHost().port
             yield Lock.exclusive(NodeInfo.table).on(txn)
             nodes = yield self.activeNodes(txn)
             selves = [node for node in nodes
@@ -1123,6 +1126,7 @@
         @self._startingUp.addBoth
         def done(result):
             self._startingUp = None
+            super(PeerConnectionPool, self).startService()
             return result
 
 
@@ -1141,7 +1145,7 @@
         if self._currentWorkDeferred is not None:
             yield self._currentWorkDeferred
         for peer in self.peers:
-            peer.transport.loseConnection()
+            peer.transport.abortConnection()
 
 
     def activeNodes(self, txn):
@@ -1172,10 +1176,18 @@
         """
         f = Factory()
         f.buildProtocol = self.createPeerConnection
-        @passthru(node.endpoint(self.reactor).connect(f).addCallback)
-        def connected(proto):
-            self.mapPeer(node, proto)
-            proto.callRemote(IdentifyNode, self.thisProcess)
+        connected = node.endpoint(self.reactor).connect(f)
+        def whenConnected(proto):
+            self.mapPeer(node.hostname, node.port, proto)
+            proto.callRemote(IdentifyNode,
+                             host=self.thisProcess.hostname,
+                             port=self.thisProcess.port).addErrback(
+                                 noted, "identify"
+                             )
+        def noted(err, x="connect"):
+            log.msg("Could not {0} to cluster peer {1} because {2}"
+                    .format(x, node, str(err.value)))
+        connected.addCallbacks(whenConnected, noted)
 
 
     def createPeerConnection(self, addr):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/081a949f/attachment.html>


More information about the calendarserver-changes mailing list