[CalendarServer-changes] [5511] CalendarServer/branches/users/wsanchez/deployment/twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Thu Apr 22 17:06:26 PDT 2010


Revision: 5511
          http://trac.macosforge.org/projects/calendarserver/changeset/5511
Author:   glyph at apple.com
Date:     2010-04-22 17:06:25 -0700 (Thu, 22 Apr 2010)
Log Message:
-----------
Initialize a new subsocket for each new process so that the count of outstanding connections stays accurate (this avoids losing sockets to race conditions).

Modified Paths:
--------------
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/metafd.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfdport.py

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py	2010-04-22 23:13:43 UTC (rev 5510)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py	2010-04-23 00:06:25 UTC (rev 5511)
@@ -64,9 +64,11 @@
 class TwistdSlaveProcess(object):
     prefix = "caldav"
 
+    metaSocket = None
+
     def __init__(self, twistd, tapname, configFile, id,
                  interfaces, port, sslPort,
-                 inheritFDs=None, inheritSSLFDs=None, metaSocket=None):
+                 inheritFDs=None, inheritSSLFDs=None, dispatcher=None):
 
         self.twistd = twistd
 
@@ -86,9 +88,22 @@
                 return x
         self.inheritFDs = emptyIfNone(inheritFDs)
         self.inheritSSLFDs = emptyIfNone(inheritSSLFDs)
-        self.metaSocket = metaSocket
+        self.dispatcher = dispatcher
+        self.resocket()
         self.interfaces = interfaces
 
+
+    def resocket(self):
+        """
+        Swap the meta-socket for a fresh one, if a dispatcher is in use.
+        """
+        if self.metaSocket is not None:  # release the previous socket first
+            self.dispatcher.removeSocket(self.metaSocket)
+            self.metaSocket = None
+        if self.dispatcher is not None:  # no dispatcher: metaSocket stays None
+            self.metaSocket = self.dispatcher.addSocket()
+        
+
     def getName(self):
         if self.ports is not None:
             return '%s-%s' % (self.prefix, self.ports[0])
@@ -326,7 +341,28 @@
         )
 
 
+    def connectionLost(self, name):
+        """
+        A process with the given name has died; refresh its meta-socket, if any.
+        """
+        procmon.ProcessMonitor.connectionLost(self, name)
+        for (processObject, env) in self.processObjects:
+            if processObject.getName() == name:
+                break
+        else:
+            # No process object matches this name (possibly memcached or a
+            # similar helper), so there is nothing here for us to manage.
+            return
 
+        if processObject.metaSocket is not None:
+            processObject.resocket()
+            self._extraFDs[name] = processObject.getFileDescriptors()
+            args, uid, gid, env = self.processes[name]  # old 'args' is dropped
+            self.processes[name] = processObject.getCommandLine(), uid, gid, env
+            processObject.dispatcher.startDispatching()
+
+
+
 def makeService_Combined(self, options):
 
 
@@ -486,7 +522,7 @@
         if config.UseMetaFD:
             port = None
             sslPort = None
-            extraArgs = dict(metaSocket=cl.dispatcher.addSocket())
+            extraArgs = dict(dispatcher=cl.dispatcher)
         else:
             extraArgs = dict(inheritFDs=inheritFDs,
                              inheritSSLFDs=inheritSSLFDs)

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/metafd.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/metafd.py	2010-04-22 23:13:43 UTC (rev 5510)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/metafd.py	2010-04-23 00:06:25 UTC (rev 5511)
@@ -64,7 +64,6 @@
             self.fd, self.createTransport, self.reportingFactory
         )
         inheritedPort.startReading()
-        inheritedPort.reportStatus("0")
 
 
     def stopService(self):
@@ -183,22 +182,13 @@
         Determine a subprocess socket's status from its previous status and a
         status message.
         """
-        if message in ('-', '0'):
-            if message == '-':
-                # A connection has gone away in a subprocess; we should start
-                # accepting connections again if we paused (see
-                # newConnectionStatus)
-                result = self.intWithNoneAsZero(previousStatus) - 1
-            else:
-                # A new process just started accepting new connections; zero
-                # out its expected load.
-                result = 0
-            # If load has indeed decreased (i.e. in any case except 'a new,
-            # idle process replaced an old, idle process'), then start
-            # listening again.
-            if result < previousStatus:
-                for f in self.factories:
-                    f.serverService._port.startReading()
+        if message == '-':
+            # A connection has gone away in a subprocess; we should start
+            # accepting connections again if we paused (see
+            # newConnectionStatus)
+            result = self.intWithNoneAsZero(previousStatus) - 1
+            for f in self.factories:
+                f.serverService._port.startReading()
         else:
             # '+' is just an acknowledgement of newConnectionStatus, so we can
             # ignore it.

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfdport.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfdport.py	2010-04-22 23:13:43 UTC (rev 5510)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfdport.py	2010-04-23 00:06:25 UTC (rev 5511)
@@ -161,6 +161,7 @@
         Create a socket dispatcher.
         """
         self._subprocessSockets = []
+        self._subSocketMap = {} # map input-socket to _SubprocessSocket
         self.statusWatcher = statusWatcher
         from twisted.internet import reactor
         self.reactor = reactor
@@ -220,10 +221,25 @@
         i, o = socketpair(AF_UNIX, SOCK_DGRAM)
         a = _SubprocessSocket(self, o)
         self._subprocessSockets.append(a)
+        self._subSocketMap[i] = a
         return i
 
 
+    def removeSocket(self, skt):
+        """
+        Remove (and close) a socket previously added with C{addSocket}.
 
+        @param skt: the return value of L{InheritedSocketDispatcher.addSocket}.
+        @type skt: L{socket}
+        """
+        subsock = self._subSocketMap.pop(skt)
+        self._subprocessSockets.remove(subsock)
+        subsock.stopReading()  # deregister from the reactor while the
+        subsock.stopWriting()  # descriptor is still open; close only after
+        subsock.skt.close()
+        skt.close()
+
+
 class InheritedPort(FileDescriptor, object):
     """
     Create this in the 'slave' process to handle incoming connections
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100422/95fab0f5/attachment-0001.html>


More information about the calendarserver-changes mailing list