[CalendarServer-changes] [4554] CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/ twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Thu Sep 24 13:32:08 PDT 2009


Revision: 4554
          http://trac.macosforge.org/projects/calendarserver/changeset/4554
Author:   sagen at apple.com
Date:     2009-09-24 13:32:07 -0700 (Thu, 24 Sep 2009)
Log Message:
-----------
If the software load balancer is disabled and there are multiple slave processes, the master process opens all the necessary listening sockets once, and the slaves inherit them.

Modified Paths:
--------------
    CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/twistedcaldav/cluster.py
    CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/twistedcaldav/config.py
    CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/twistedcaldav/tap.py

Modified: CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/twistedcaldav/cluster.py	2009-09-24 20:26:03 UTC (rev 4553)
+++ CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/twistedcaldav/cluster.py	2009-09-24 20:32:07 UTC (rev 4554)
@@ -17,6 +17,8 @@
 import os
 import sys
 import tempfile
+import socket
+import time
 
 from twisted.runner import procmon
 from twisted.application import internet, service
@@ -54,17 +56,24 @@
 class TwistdSlaveProcess(object):
     prefix = "caldav"
 
-    def __init__(self, twistd, tapname, configFile,
-                 interfaces, port, sslPort):
+    def __init__(self, twistd, tapname, configFile, id,
+                 interfaces, port, sslPort,
+                 inheritFDs=None, inheritSSLFDs=None):
+
         self.twistd = twistd
 
         self.tapname = tapname
 
         self.configFile = configFile
 
+        self.id = id
+
         self.ports = port
         self.sslPorts = sslPort
 
+        self.inheritFDs = inheritFDs
+        self.inheritSSLFDs = inheritSSLFDs
+
         self.interfaces = interfaces
 
     def getName(self):
@@ -72,6 +81,8 @@
             return '%s-%s' % (self.prefix, self.ports[0])
         elif self.sslPorts is not None:
             return '%s-%s' % (self.prefix, self.sslPorts[0])
+        elif self.inheritFDs or self.inheritSSLFDs:
+            return '%s-%s' % (self.prefix, self.id)
 
         raise ConfigurationError(
             "Can't create TwistdSlaveProcess without a TCP Port")
@@ -118,9 +129,16 @@
                     '-o',
                     'BindSSLPorts=%s' % (','.join(map(str, self.sslPorts)),)])
 
+        if self.inheritFDs:
+            args.extend([
+                    '-o',
+                    'InheritFDs=%s' % (','.join(map(str, self.inheritFDs)),)])
 
+        if self.inheritSSLFDs:
+            args.extend([
+                    '-o',
+                    'InheritSSLFDs=%s' % (','.join(map(str, self.inheritSSLFDs)),)])
 
-
         return args
 
     def getHostLine(self, ssl=False):
@@ -133,6 +151,9 @@
         if ssl and self.sslPorts is not None:
             port = self.sslPorts
 
+        if self.inheritFDs or self.inheritSSLFDs:
+            port = [self.id]
+
         if port is None:
             raise ConfigurationError(
                 "Can not add a host without a port")
@@ -186,6 +207,28 @@
         except process.ProcessExitedAlready:
             pass
 
+    def startProcess(self, name):
+        if self.protocols.has_key(name):
+            return
+        p = self.protocols[name] = procmon.LoggingProtocol()
+        p.service = self
+        p.name = name
+        args, uid, gid, env = self.processes[name]
+        self.timeStarted[name] = time.time()
+
+        childFDs = { 0 : "w", 1 : "r", 2 : "r" }
+
+        # Examine args for -o InheritFDs= and -o InheritSSLFDs=
+        # Add any file descriptors listed in those args to the childFDs
+        # dictionary so those don't get closed across the spawn.
+        for i in xrange(len(args)-1):
+            if args[i] == "-o" and args[i+1].startswith("Inherit"):
+                for fd in map(int, args[i+1].split("=")[1].split(",")):
+                    childFDs[fd] = fd
+
+        reactor.spawnProcess(p, args[0], args, uid=uid, gid=gid, env=env,
+            childFDs=childFDs)
+
 def makeService_Combined(self, options):
     s = service.MultiService()
     monitor = DelayedStartupProcessMonitor()
@@ -205,6 +248,9 @@
 
     bindAddress = ['127.0.0.1']
 
+    inheritFDs = []
+    inheritSSLFDs = []
+
     # Attempt to calculate the number of processes to use
     # 1 per processor
 
@@ -234,19 +280,64 @@
         if config.BindSSLPorts:
             sslPort = config.BindSSLPorts
 
+
     if port[0] == 0:
         port = None
 
     if sslPort[0] == 0:
         sslPort = None
 
-    # If the load balancer isn't enabled, or if we only have one process
-    # We listen directly on the interfaces.
+    # If we only have one process, disable the software load balancer and
+    # listen directly on the interfaces.
 
-    if ((not config.MultiProcess['LoadBalancer']['Enabled']) or
-        (config.MultiProcess['ProcessCount'] == 1)):
+    if config.MultiProcess['ProcessCount'] == 1:
+        config.MultiProcess['LoadBalancer']['Enabled'] = False
         bindAddress = config.BindAddresses
 
+    elif not config.MultiProcess['LoadBalancer']['Enabled']:
+        # We are multiprocess but not using load balancer, open the socket(s)
+        # to be inherited by the slaves
+
+        if not config.BindAddresses:
+            config.BindAddresses = [""]
+
+        s._inheritedSockets = [] # keep a reference to these so they don't close
+
+        for bindAddress in config.BindAddresses:
+            if config.BindHTTPPorts:
+                if config.HTTPPort == 0:
+                    raise UsageError(
+                        "HTTPPort required if BindHTTPPorts is not empty"
+                    )
+            elif config.HTTPPort != 0:
+                config.BindHTTPPorts = [config.HTTPPort]
+
+            if config.BindSSLPorts:
+                if config.SSLPort == 0:
+                    raise UsageError(
+                        "SSLPort required if BindSSLPorts is not empty"
+                    )
+            elif config.SSLPort != 0:
+                config.BindSSLPorts = [config.SSLPort]
+
+            def _openSocket(addr, port):
+                log.info("Opening socket for inheritance at %s:%d" % (addr, port))
+                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                sock.setblocking(0)
+                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+                sock.bind((addr, port))
+                sock.listen(config.ListenBacklog)
+                s._inheritedSockets.append(sock)
+                return sock
+
+            for portNum in config.BindHTTPPorts:
+                sock = _openSocket(bindAddress, int(portNum))
+                inheritFDs.append(sock.fileno())
+
+            for portNum in config.BindSSLPorts:
+                sock = _openSocket(bindAddress, int(portNum))
+                inheritSSLFDs.append(sock.fileno())
+
     for p in xrange(0, config.MultiProcess['ProcessCount']):
         if config.MultiProcess['ProcessCount'] > 1:
             if port is not None:
@@ -255,11 +346,21 @@
             if sslPort is not None:
                 sslPort = [sslPort[0] + 1]
 
+        if inheritFDs:
+            port = None
+
+        if inheritSSLFDs:
+            sslPort = None
+
         process = TwistdSlaveProcess(config.Twisted['twistd'],
                                      self.tapname,
                                      options['config'],
+                                     p,
                                      bindAddress,
-                                     port, sslPort)
+                                     port, sslPort,
+                                     inheritFDs=inheritFDs,
+                                     inheritSSLFDs=inheritSSLFDs
+                                     )
 
         monitor.addProcess(process.getName(),
                            process.getCommandLine(),
@@ -340,8 +441,6 @@
                                      config.PythonDirector['pydir'],
                                      fname],
                            env=parentEnv)
-
-
     if config.Memcached["ServerEnabled"]:
         log.msg("Adding memcached service")
 

Modified: CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/twistedcaldav/config.py	2009-09-24 20:26:03 UTC (rev 4553)
+++ CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/twistedcaldav/config.py	2009-09-24 20:32:07 UTC (rev 4554)
@@ -100,6 +100,8 @@
     "BindAddresses": [],   # List of IP addresses to bind to [empty = all]
     "BindHTTPPorts": [],   # List of port numbers to bind to for HTTP [empty = same as "Port"]
     "BindSSLPorts" : [],   # List of port numbers to bind to for SSL [empty = same as "SSLPort"]
+    "InheritFDs"   : [],   # File descriptors to inherit for HTTP requests (empty = don't inherit)
+    "InheritSSLFDs": [],   # File descriptors to inherit for HTTPS requests (empty = don't inherit)
 
     #
     # Data store

Modified: CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/twistedcaldav/tap.py
===================================================================
--- CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/twistedcaldav/tap.py	2009-09-24 20:26:03 UTC (rev 4553)
+++ CalendarServer/branches/users/sagen/deployment-inherit-fds-4549/twistedcaldav/tap.py	2009-09-24 20:32:07 UTC (rev 4554)
@@ -16,10 +16,12 @@
 
 import os
 import stat
+import socket
 
 from zope.interface import implements
 
 from twisted.internet.address import IPv4Address
+from twisted.internet import tcp, ssl
 
 from twisted.python.log import FileLogObserver
 from twisted.python.usage import Options, UsageError
@@ -418,7 +420,56 @@
 
         self._context = ctx
 
+class InheritedPort(tcp.Port):
 
+    def __init__(self, fd, factory, reactor):
+        tcp.Port.__init__(self, 0, factory, reactor=reactor)
+        # MOR: careful because fromfd dup()'s the socket, so we need to
+        # make sure we don't leak file descriptors
+        self.socket = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+        self._realPortNumber = self.port = self.socket.getsockname()[1]
+
+    def createInternetSocket(self):
+        return self.socket
+
+    def startListening(self):
+        log.msg("%s starting on %s" % (self.factory.__class__, self._realPortNumber))
+        self.factory.doStart()
+        self.connected = 1
+        self.fileno = self.socket.fileno
+        self.numberAccepts = 100
+        self.startReading()
+
+class InheritedSSLPort(InheritedPort):
+    _socketShutdownMethod = 'sock_shutdown'
+
+    transport = ssl.Server
+
+    def __init__(self, fd, factory, ctxFactory, reactor):
+        InheritedPort.__init__(self, fd, factory, reactor)
+        self.ctxFactory = ctxFactory
+        self.socket = SSL.Connection(self.ctxFactory.getContext(), self.socket)
+
+    def _preMakeConnection(self, transport):
+        transport._startTLS()
+        return tcp.Port._preMakeConnection(self, transport)
+
+class InheritTCPServer(internet.TCPServer):
+
+    def _getPort(self):
+        from twisted.internet import reactor
+        port = InheritedPort(self.args[0], self.args[1], reactor)
+        port.startListening()
+        return port
+
+class InheritSSLServer(internet.SSLServer):
+
+    def _getPort(self):
+        from twisted.internet import reactor
+        port = InheritedSSLPort(self.args[0], self.args[1], self.args[2], reactor)
+        port.startListening()
+        return port
+
 class CalDAVServiceMaker(object):
     implements(IPlugin, service.IServiceMaker)
 
@@ -659,38 +710,21 @@
 
         config.addHook(updateChannel)
 
-        if not config.BindAddresses:
-            config.BindAddresses = [""]
 
-        for bindAddress in config.BindAddresses:
-            if config.BindHTTPPorts:
-                if config.HTTPPort == 0:
-                    raise UsageError(
-                        "HTTPPort required if BindHTTPPorts is not empty"
-                    )
-            elif config.HTTPPort != 0:
-                    config.BindHTTPPorts = [config.HTTPPort]
+        # If inheriting file descriptors from the master, use those to handle
+        # requests instead of opening ports.
 
-            if config.BindSSLPorts:
-                if config.SSLPort == 0:
-                    raise UsageError(
-                        "SSLPort required if BindSSLPorts is not empty"
-                    )
-            elif config.SSLPort != 0:
-                config.BindSSLPorts = [config.SSLPort]
-
-            for port in config.BindHTTPPorts:
-                log.info("Adding server at %s:%s" % (bindAddress, port))
-
-                httpService = internet.TCPServer(
-                    int(port), channel,
-                    interface=bindAddress,
+        if config.InheritFDs or config.InheritSSLFDs:
+            for fd in config.InheritFDs:
+                fd = int(fd)
+                inheritedService = InheritTCPServer(
+                    fd, channel,
                     backlog=config.ListenBacklog
                 )
-                httpService.setServiceParent(service)
+                inheritedService.setServiceParent(service)
 
-            for port in config.BindSSLPorts:
-                log.info("Adding SSL server at %s:%s" % (bindAddress, port))
+            for fd in config.InheritSSLFDs:
+                fd = int(fd)
 
                 try:
                     contextFactory = ChainingOpenSSLContextFactory(
@@ -701,15 +735,69 @@
                     )
                 except SSL.Error, e:
                     log.error("Unable to set up SSL context factory: %s" % (e,))
-                    log.error("Disabling SSL port: %s" % (port,))
                 else:
-                    httpsService = internet.SSLServer(
+                    inheritedService = InheritSSLServer(
+                        fd, channel,
+                        contextFactory,
+                        backlog=config.ListenBacklog
+                    )
+                    inheritedService.setServiceParent(service)
+
+
+        else: # Not inheriting, therefore open our own:
+
+            if not config.BindAddresses:
+                config.BindAddresses = [""]
+
+            for bindAddress in config.BindAddresses:
+                if config.BindHTTPPorts:
+                    if config.HTTPPort == 0:
+                        raise UsageError(
+                            "HTTPPort required if BindHTTPPorts is not empty"
+                        )
+                elif config.HTTPPort != 0:
+                    config.BindHTTPPorts = [config.HTTPPort]
+
+                if config.BindSSLPorts:
+                    if config.SSLPort == 0:
+                        raise UsageError(
+                            "SSLPort required if BindSSLPorts is not empty"
+                        )
+                elif config.SSLPort != 0:
+                    config.BindSSLPorts = [config.SSLPort]
+
+                for port in config.BindHTTPPorts:
+                    log.info("Adding server at %s:%s" % (bindAddress, port))
+
+                    httpService = internet.TCPServer(
                         int(port), channel,
-                        contextFactory, interface=bindAddress,
+                        interface=bindAddress,
                         backlog=config.ListenBacklog
                     )
-                    httpsService.setServiceParent(service)
+                    httpService.setServiceParent(service)
 
+                for port in config.BindSSLPorts:
+                    log.info("Adding SSL server at %s:%s" % (bindAddress, port))
+
+                    try:
+                        contextFactory = ChainingOpenSSLContextFactory(
+                            config.SSLPrivateKey,
+                            config.SSLCertificate,
+                            certificateChainFile=config.SSLAuthorityChain,
+                            passwdCallback=_getSSLPassphrase
+                        )
+                    except SSL.Error, e:
+                        log.error("Unable to set up SSL context factory: %s" % (e,))
+                        log.error("Disabling SSL port: %s" % (port,))
+                    else:
+                        httpsService = internet.SSLServer(
+                            int(port), channel,
+                            contextFactory, interface=bindAddress,
+                            backlog=config.ListenBacklog
+                        )
+                        httpsService.setServiceParent(service)
+
+
         # Change log level back to what it was before
         setLogLevelForNamespace(None, oldLogLevel)
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20090924/5d661708/attachment-0001.html>


More information about the calendarserver-changes mailing list