[CalendarServer-changes] [5430] CalendarServer/branches/users/wsanchez/deployment

source_changes at macosforge.org source_changes at macosforge.org
Wed Mar 31 11:03:59 PDT 2010


Revision: 5430
          http://trac.macosforge.org/projects/calendarserver/changeset/5430
Author:   glyph at apple.com
Date:     2010-03-31 11:03:58 -0700 (Wed, 31 Mar 2010)
Log Message:
-----------
Merge in 'sendfd' changes so that the server will call accept() from a single process.

Modified Paths:
--------------
    CalendarServer/branches/users/wsanchez/deployment/setup.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/tap.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_tap.py

Added Paths:
-----------
    CalendarServer/branches/users/wsanchez/deployment/lib-patches/Twisted/twisted.web2.channel.http.patch
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/metafd.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfd.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfdport.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendmsg.c
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/pullpipe.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_sendmsg.py

Property Changed:
----------------
    CalendarServer/branches/users/wsanchez/deployment/


Property changes on: CalendarServer/branches/users/wsanchez/deployment
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalendarServer/branches/users/sagen/deployment-inherit-fds-4571:4573-4709
/CalendarServer/branches/users/sagen/deployment-inspection:4927-4937
   + /CalendarServer/branches/users/glyph/deployment-plus-sendfd:5426-5429
/CalendarServer/branches/users/sagen/deployment-inherit-fds-4571:4573-4709
/CalendarServer/branches/users/sagen/deployment-inspection:4927-4937

Copied: CalendarServer/branches/users/wsanchez/deployment/lib-patches/Twisted/twisted.web2.channel.http.patch (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/lib-patches/Twisted/twisted.web2.channel.http.patch)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/lib-patches/Twisted/twisted.web2.channel.http.patch	                        (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/lib-patches/Twisted/twisted.web2.channel.http.patch	2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,90 @@
+Index: twisted/web2/channel/http.py
+===================================================================
+--- twisted/web2/channel/http.py	(revision 19773)
++++ twisted/web2/channel/http.py	(working copy)
+@@ -658,7 +658,7 @@
+         
+     def connectionMade(self):
+         self.setTimeout(self.inputTimeOut)
+-        self.factory.outstandingRequests+=1
++        self.factory.addConnectedChannel(self)
+     
+     def lineReceived(self, line):
+         if self._first_line:
+@@ -847,7 +847,7 @@
+             self.transport.loseConnection()
+         
+     def connectionLost(self, reason):
+-        self.factory.outstandingRequests-=1
++        self.factory.removeConnectedChannel(self)
+ 
+         self._writeLost = True
+         self.readConnectionLost()
+@@ -869,20 +869,33 @@
+                              "please try again later.</body></html>")
+         self.transport.loseConnection()
+ 
++
++
+ class HTTPFactory(protocol.ServerFactory):
+-    """Factory for HTTP server."""
++    """
++    Factory for HTTP server.
+ 
++    @ivar outstandingRequests: the number of currently connected HTTP channels.
++
++    @type outstandingRequests: C{int}
++
++    @ivar connectedChannels: all the channels that have currently active
++    connections.
++
++    @type connectedChannels: C{set} of L{HTTPChannel}
++    """
++
+     protocol = HTTPChannel
+     
+     protocolArgs = None
+ 
+-    outstandingRequests = 0
+-    
+     def __init__(self, requestFactory, maxRequests=600, **kwargs):
+-        self.maxRequests=maxRequests
++        self.maxRequests = maxRequests
+         self.protocolArgs = kwargs
+-        self.protocolArgs['requestFactory']=requestFactory
+-        
++        self.protocolArgs['requestFactory'] = requestFactory
++        self.connectedChannels = set()
++
++
+     def buildProtocol(self, addr):
+         if self.outstandingRequests >= self.maxRequests:
+             return OverloadedServerProtocol()
+@@ -893,4 +906,27 @@
+             setattr(p, arg, value)
+         return p
+ 
++ 
++    def addConnectedChannel(self, channel):
++        """
++        Add a connected channel to the set of currently connected channels and
++        increase the outstanding request count.
++        """
++        self.connectedChannels.add(channel)
++
++
++    def removeConnectedChannel(self, channel):
++        """
++        Remove a connected channel from the set of currently connected channels
++        and decrease the outstanding request count.
++        """
++        self.connectedChannels.remove(channel)
++
++
++    @property
++    def outstandingRequests(self):
++        return len(self.connectedChannels)
++
++
++
+ __all__ = ['HTTPFactory', ]

Modified: CalendarServer/branches/users/wsanchez/deployment/setup.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/setup.py	2010-03-31 17:55:47 UTC (rev 5429)
+++ CalendarServer/branches/users/wsanchez/deployment/setup.py	2010-03-31 18:03:58 UTC (rev 5430)
@@ -97,7 +97,10 @@
 
 from distutils.core import Extension
 
-extensions = []
+extensions = [
+    Extension("twistedcaldav.sendmsg",
+              sources=["twistedcaldav/sendmsg.c"])
+]
 
 if sys.platform == "darwin":
     extensions.append(

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py	2010-03-31 17:55:47 UTC (rev 5429)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py	2010-03-31 18:03:58 UTC (rev 5430)
@@ -26,13 +26,17 @@
 from twisted.internet import reactor, process
 from twisted.internet.threads import deferToThread
 from twisted.python.reflect import namedClass
+from twisted.python.usage import UsageError
 
 from twistedcaldav.accesslog import AMPLoggingFactory, RotatingFileAccessLoggingObserver
 from twistedcaldav.config import config, ConfigurationError
 from twistedcaldav.util import getNCPU
 from twistedcaldav.log import Logger
 from twistedcaldav.directory.appleopendirectory import OpenDirectoryService
+OpenDirectoryService            # Pyflakes
 
+from twistedcaldav.metafd import ConnectionLimiter
+
 log = Logger()
 
 serviceTemplate = """
@@ -62,7 +66,7 @@
 
     def __init__(self, twistd, tapname, configFile, id,
                  interfaces, port, sslPort,
-                 inheritFDs=None, inheritSSLFDs=None):
+                 inheritFDs=None, inheritSSLFDs=None, dispatcher=None):
 
         self.twistd = twistd
 
@@ -75,8 +79,15 @@
         self.ports = port
         self.sslPorts = sslPort
 
-        self.inheritFDs = inheritFDs
-        self.inheritSSLFDs = inheritSSLFDs
+        def emptyIfNone(x):
+            if x is None:
+                return []
+            else:
+                return x
+        self.inheritFDs = emptyIfNone(inheritFDs)
+        self.inheritSSLFDs = emptyIfNone(inheritSSLFDs)
+        self.metaSocket = None
+        self.dispatcher = dispatcher
 
         self.interfaces = interfaces
 
@@ -85,12 +96,36 @@
             return '%s-%s' % (self.prefix, self.ports[0])
         elif self.sslPorts is not None:
             return '%s-%s' % (self.prefix, self.sslPorts[0])
-        elif self.inheritFDs or self.inheritSSLFDs:
+        elif self.inheritFDs or self.inheritSSLFDs or self.dispatcher:
             return '%s-%s' % (self.prefix, self.id)
 
         raise ConfigurationError(
             "Can't create TwistdSlaveProcess without a TCP Port")
 
+
+    def getMetaDescriptor(self):
+        """
+        Get the meta-socket file descriptor to inherit.
+        """
+        if self.metaSocket is None:
+            self.metaSocket = self.dispatcher.addSocket()
+        return self.metaSocket.fileno()
+
+
+    def getFileDescriptors(self):
+        """
+        @return: a mapping of file descriptor numbers for the new (child)
+            process to file descriptor numbers in the current (master) process.
+        """
+        fds = {}
+        maybeMetaFD = []
+        if self.dispatcher is not None:
+            maybeMetaFD.append(self.getMetaDescriptor())
+        for fd in self.inheritSSLFDs + self.inheritFDs + maybeMetaFD:
+            fds[fd] = fd
+        return fds
+
+
     def getCommandLine(self):
         args = [
             sys.executable,
@@ -144,6 +179,13 @@
             args.extend([
                     '-o',
                     'InheritSSLFDs=%s' % (','.join(map(str, self.inheritSSLFDs)),)])
+ 
+        if self.dispatcher is not None:
+            # XXX this FD is never closed in the parent.  should it be?
+            # (should they *all* be?) -glyph
+            args.extend([
+                    "-o", "MetaFD=%s" % (self.getMetaDescriptor(),)
+                ])
 
         return args
 
@@ -157,7 +199,7 @@
         if ssl and self.sslPorts is not None:
             port = self.sslPorts
 
-        if self.inheritFDs or self.inheritSSLFDs:
+        if self.inheritFDs or self.inheritSSLFDs or self.dispatcher:
             port = [self.id]
 
         if port is None:
@@ -169,42 +211,105 @@
                                'bindAddress': '127.0.0.1'}
 
 
+
 class DelayedStartupProcessMonitor(procmon.ProcessMonitor):
+    """
+    A L{DelayedStartupProcessMonitor} is a L{procmon.ProcessMonitor} that
+    defers building its command lines until the service is actually ready to
+    start.  It also specializes process-starting to allow for process objects
+    to determine their arguments as they are started up rather than entirely
+    ahead of time.
 
+    @ivar processObjects: a C{list} of L{TwistdSlaveProcess} to add using
+        C{self.addProcess} when this service starts up.
+
+    @ivar _extraFDs: a mapping from process names to extra file-descriptor
+        maps.  (By default, all processes will have the standard stdio mapping,
+        so all file descriptors here should be >2.)  This is updated during
+        L{DelayedStartupProcessMonitor.startService}, by inspecting the result
+        of L{TwistdSlaveProcess.getFileDescriptors}.
+
+    @ivar reactor: an L{IReactorProcess} for spawning processes, defaulting to
+        the global reactor.
+    """
+
+    def __init__(self, *args, **kwargs):
+        procmon.ProcessMonitor.__init__(self, *args, **kwargs)
+        self.processObjects = []
+        self._extraFDs = {}
+        self.reactor = reactor
+
+
+    def addProcessObject(self, process, env):
+        """
+        Add a process object to be run when this service is started.
+
+        @param env: a dictionary of environment variables.
+
+        @param process: a L{TwistdSlaveProcess} object to be started upon
+            service startup.
+        """
+        self.processObjects.append((process, env))
+
+
     def startService(self):
         service.Service.startService(self)
+
+        # Now we're ready to build the command lines and actually add the
+        # processes to procmon.  This step must be done prior to setting
+        # active to 1
+        for processObject, env in self.processObjects:
+            name = processObject.getName()
+            self.addProcess(
+                name,
+                processObject.getCommandLine(),
+                env=env
+            )
+            self._extraFDs[name] = processObject.getFileDescriptors()
+
         self.active = 1
         delay = 0
-        delay_interval = config.MultiProcess['StaggeredStartup']['Interval'] if config.MultiProcess['StaggeredStartup']['Enabled'] else 0 
+
+        if config.MultiProcess.StaggeredStartup.Enabled:
+            delay_interval = config.MultiProcess.StaggeredStartup.Interval
+        else:
+            delay_interval = 0
+
         for name in self.processes.keys():
-            reactor.callLater(delay if name.startswith("caldav") else 0, self.startProcess, name)
             if name.startswith("caldav"):
+                when = delay
                 delay += delay_interval
-        self.consistency = reactor.callLater(self.consistencyDelay,
-                                             self._checkConsistency)
+            else:
+                when = 0
+            reactor.callLater(when, self.startProcess, name)
 
-    def signalAll(self, signal, startswithname=None, seconds=0):
+        self.consistency = reactor.callLater(
+            self.consistencyDelay,
+            self._checkConsistency
+        )
+
+    def signalAll(self, signal, startswithname=None):
         """
         Send a signal to all child processes.
 
         @param signal: the signal to send
         @type signal: C{int}
-        @param startswithname: is set only signal those processes whose name starts with this string
+        @param startswithname: if set, only signal those processes
+            whose name starts with this string
         @type signal: C{str}
         """
-        delay = 0
         for name in self.processes.keys():
             if startswithname is None or name.startswith(startswithname):
-                reactor.callLater(delay, self.signalProcess, signal, name)
-                delay += seconds
+                self.signalProcess(signal, name)
 
     def signalProcess(self, signal, name):
         """
         Send a signal to each monitored process
-        
+
         @param signal: the signal to send
         @type signal: C{int}
-        @param startswithname: is set only signal those processes whose name starts with this string
+        @param startswithname: is set only signal those processes
+            whose name starts with this string
         @type signal: C{str}
         """
         if not self.protocols.has_key(name):
@@ -226,17 +331,15 @@
 
         childFDs = { 0 : "w", 1 : "r", 2 : "r" }
 
-        # Examine args for -o InheritFDs= and -o InheritSSLFDs=
-        # Add any file descriptors listed in those args to the childFDs
-        # dictionary so those don't get closed across the spawn.
-        for i in xrange(len(args)-1):
-            if args[i] == "-o" and args[i+1].startswith("Inherit"):
-                for fd in map(int, args[i+1].split("=")[1].split(",")):
-                    childFDs[fd] = fd
+        childFDs.update(self._extraFDs.get(name, {}))
 
-        reactor.spawnProcess(p, args[0], args, uid=uid, gid=gid, env=env,
-            childFDs=childFDs)
+        self.reactor.spawnProcess(
+            p, args[0], args, uid=uid, gid=gid, env=env,
+            childFDs=childFDs
+        )
 
+
+
 def makeService_Combined(self, options):
 
 
@@ -327,7 +430,13 @@
         if not config.BindAddresses:
             config.BindAddresses = [""]
 
-        s._inheritedSockets = [] # keep a reference to these so they don't close
+        if config.UseMetaFD:
+            cl = ConnectionLimiter(config.MaxAccepts,
+                                   (config.MaxRequests *
+                                    config.MultiProcess.ProcessCount))
+            cl.setServiceParent(s)
+        else:
+            s._inheritedSockets = [] # keep a reference to these so they don't close
 
         for bindAddress in config.BindAddresses:
             if config.BindHTTPPorts:
@@ -346,23 +455,29 @@
             elif config.SSLPort != 0:
                 config.BindSSLPorts = [config.SSLPort]
 
-            def _openSocket(addr, port):
-                log.info("Opening socket for inheritance at %s:%d" % (addr, port))
-                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                sock.setblocking(0)
-                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-                sock.bind((addr, port))
-                sock.listen(config.ListenBacklog)
-                s._inheritedSockets.append(sock)
-                return sock
+            if config.UseMetaFD:
+                for ports, description in [(config.BindSSLPorts, "SSL"),
+                                           (config.BindHTTPPorts, "TCP")]:
+                    for portNumber in ports:
+                        cl.addPortService(description, portNumber, bindAddress, config.ListenBacklog)
+            else:
+                def _openSocket(addr, port):
+                    log.info("Opening socket for inheritance at %s:%d" % (addr, port))
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    sock.setblocking(0)
+                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+                    sock.bind((addr, port))
+                    sock.listen(config.ListenBacklog)
+                    s._inheritedSockets.append(sock)
+                    return sock
 
-            for portNum in config.BindHTTPPorts:
-                sock = _openSocket(bindAddress, int(portNum))
-                inheritFDs.append(sock.fileno())
+                for portNum in config.BindHTTPPorts:
+                    sock = _openSocket(bindAddress, int(portNum))
+                    inheritFDs.append(sock.fileno())
 
-            for portNum in config.BindSSLPorts:
-                sock = _openSocket(bindAddress, int(portNum))
-                inheritSSLFDs.append(sock.fileno())
+                for portNum in config.BindSSLPorts:
+                    sock = _openSocket(bindAddress, int(portNum))
+                    inheritSSLFDs.append(sock.fileno())
 
     if not config.MultiProcess['LoadBalancer']['Enabled']:
         bindAddress = config.BindAddresses
@@ -381,19 +496,24 @@
         if inheritSSLFDs:
             sslPort = None
 
+        if config.UseMetaFD:
+            port = None
+            sslPort = None
+            extraArgs = dict(dispatcher=cl.dispatcher)
+        else:
+            extraArgs = dict(inheritFDs=inheritFDs,
+                             inheritSSLFDs=inheritSSLFDs)
+
         process = TwistdSlaveProcess(config.Twisted['twistd'],
                                      self.tapname,
                                      options['config'],
                                      p,
                                      bindAddress,
                                      port, sslPort,
-                                     inheritFDs=inheritFDs,
-                                     inheritSSLFDs=inheritSSLFDs
+                                     **extraArgs
                                      )
 
-        monitor.addProcess(process.getName(),
-                           process.getCommandLine(),
-                           env=parentEnv)
+        monitor.addProcessObject(process, parentEnv)
 
         if config.HTTPPort:
             hosts.append(process.getHostLine())

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py	2010-03-31 17:55:47 UTC (rev 5429)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py	2010-03-31 18:03:58 UTC (rev 5430)
@@ -92,7 +92,10 @@
     "ServerHostName": "localhost", # Network host name.
     "HTTPPort": 0,                 # HTTP port (0 to disable HTTP)
     "SSLPort" : 0,                 # SSL port (0 to disable HTTPS)
-
+    "MetaFD": 0,        # Inherited file descriptor to call recvmsg() on to receive sockets (none = don't inherit)
+                                                        
+    "UseMetaFD": True,         # Use a 'meta' FD, i.e. an FD to transmit other
+                               # FDs to slave processes.
     #
     # Network address configuration information
     #

Copied: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/metafd.py (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/twistedcaldav/metafd.py)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/metafd.py	                        (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/metafd.py	2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,253 @@
+
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twisted.internet.tcp import Server
+from twisted.application.internet import TCPServer
+
+from twisted.internet import reactor
+
+from twisted.application.service import MultiService, Service
+
+from twisted.web2.channel.http import HTTPFactory
+
+from twistedcaldav.sendfdport import (
+    InheritedPort, InheritedSocketDispatcher, InheritingProtocolFactory)
+
+
+
+class JustEnoughLikeAPort(object):
+    """
+    Fake out just enough of L{tcp.Port} to be acceptable to
+    L{tcp.Server}...
+    """
+    port = 'inherited'
+
+
+
+class ReportingHTTPService(Service, object):
+    """
+    Service which starts up an HTTP server that can report back to its parent
+    process via L{InheritedPort}.
+    """
+
+    _connectionCount = 0
+
+    def __init__(self, site, fd, contextFactory):
+        self.contextFactory = contextFactory
+        # Unlike other 'factory' constructions, config.MaxRequests and
+        # config.MaxAccepts are dealt with in the master process, so we don't
+        # need to propagate them here.
+        self.site = site
+        self.fd = fd
+
+
+    def startService(self):
+        """
+        Start reading on the inherited port.
+        """
+        Service.startService(self)
+        self.reportingFactory = ReportingHTTPFactory(self.site, vary=True)
+        self.reportingFactory.inheritedPort = InheritedPort(
+            self.fd, self.createTransport, self.reportingFactory
+        )
+        self.reportingFactory.inheritedPort.startReading()
+
+
+    def stopService(self):
+        """
+        Stop reading on the inherited port.
+        """
+        Service.stopService(self)
+        # XXX stopping should really be destructive, because otherwise we will
+        # always leak a file descriptor; i.e. this shouldn't be restartable.
+        # XXX this needs to return a Deferred.
+        self.reportingFactory.inheritedPort.stopReading()
+
+
+    def createTransport(self, skt, data, protocol):
+        """
+        Create a TCP transport, from a socket object passed by the parent.
+        """
+        self._connectionCount += 1
+        transport = Server(skt, protocol,
+                           skt.getpeername(), JustEnoughLikeAPort,
+                           self._connectionCount)
+        if data == 'SSL':
+            transport.startTLS(self.contextFactory)
+        transport.startReading()
+        return transport
+
+
+
+class ReportingHTTPFactory(HTTPFactory):
+    """
+    An L{HTTPFactory} which reports its status to a
+    L{twistedcaldav.sendfdport.InheritedPort}.
+
+    @ivar inheritedPort: an L{InheritedPort} to report status (the current
+        number of outstanding connections) to.  Since this - the
+        L{ReportingHTTPFactory} - needs to be instantiated to be passed to
+        L{InheritedPort}'s constructor, this attribute must be set afterwards
+        but before any connections have occurred.
+    """
+
+    def _report(self, message):
+        """
+        Report a status message to the parent.
+        """
+        self.inheritedPort.reportStatus(message)
+
+
+    def addConnectedChannel(self, channel):
+        """
+        Add the connected channel, and report the current number of open
+        channels to the listening socket in the parent process.
+        """
+        HTTPFactory.addConnectedChannel(self, channel)
+        self._report("+")
+
+
+    def removeConnectedChannel(self, channel):
+        """
+        Remove the connected channel, and report the current number of open
+        channels to the listening socket in the parent process.
+        """
+        HTTPFactory.removeConnectedChannel(self, channel)
+        self._report("-")
+
+
+
+class ConnectionLimiter(MultiService, object):
+    """
+    Connection limiter for use with L{InheritedSocketDispatcher}.
+
+    This depends on statuses being reported by L{ReportingHTTPFactory}
+    """
+
+    def __init__(self, maxAccepts, maxRequests):
+        """
+        Create a L{ConnectionLimiter} with an associated dispatcher and
+        list of factories.
+        """
+        MultiService.__init__(self)
+        self.factories = []
+        # XXX dispatcher needs to be a service, so that it can shut down its
+        # sub-sockets.
+        self.dispatcher = InheritedSocketDispatcher(self)
+        self.maxAccepts = maxAccepts
+        self.maxRequests = maxRequests
+
+
+    def addPortService(self, description, port, interface, backlog):
+        """
+        Add a L{TCPServer} to bind a TCP port to a socket description.
+        """
+        lipf = LimitingInheritingProtocolFactory(self, description)
+        self.factories.append(lipf)
+        svr = TCPServer(
+            port, lipf,
+            interface=interface,
+            backlog=backlog
+        )
+        svr.setServiceParent(self)
+        lipf.serverService = svr
+
+
+    # implementation of implicit statusWatcher interface required by
+    # InheritedSocketDispatcher
+
+    def statusFromMessage(self, previousStatus, message):
+        """
+        Determine a subprocess socket's status from its previous status and a
+        status message.
+        """
+        if message == '-':
+            result = self.intWithNoneAsZero(previousStatus) - 1
+            # A connection has gone away in a subprocess; we should start
+            # accepting connections again if we paused (see
+            # newConnectionStatus)
+            for f in self.factories:
+                f.serverService._port.startReading()
+        else:
+            # '+' is just an acknowledgement of newConnectionStatus, so we can
+            # ignore it.
+            result = self.intWithNoneAsZero(previousStatus)
+        return result
+
+
+    def newConnectionStatus(self, previousStatus):
+        """
+        Determine the effect of a new connection being sent on a subprocess
+        socket.
+        """
+        current = self.outstandingRequests + 1
+        maximum = self.maxRequests
+        overloaded = (current >= maximum)
+        if overloaded:
+            for f in self.factories:
+                f.serverService._port.stopReading()
+
+        result = self.intWithNoneAsZero(previousStatus) + 1
+        return result
+
+
+    def intWithNoneAsZero(self, x):
+        """
+        Convert 'x' to an C{int}, unless x is C{None}, in which case return 0.
+        """
+        if x is None:
+            return 0
+        else:
+            return int(x)
+
+
+    @property
+    def outstandingRequests(self):
+        outstanding = 0
+        for status in self.dispatcher.statuses:
+            outstanding += self.intWithNoneAsZero(status)
+        return outstanding
+
+
+
+class LimitingInheritingProtocolFactory(InheritingProtocolFactory):
+    """
+    An L{InheritingProtocolFactory} that supports the implicit factory contract
+    required by L{MaxAcceptTCPServer}/L{MaxAcceptTCPPort}.
+
+    @ivar outstandingRequests: a read-only property for the number of currently
+        active connections.
+
+    @ivar maxAccepts: The maximum number of times to call 'accept()' in a
+        single reactor loop iteration.
+
+    @ivar maxRequests: The maximum number of concurrent connections to accept
+        at once - note that this is for the I{entire server}, whereas the
+        value in the configuration file is for only a single process.
+    """
+
+    def __init__(self, limiter, description):
+        super(LimitingInheritingProtocolFactory, self).__init__(
+            limiter.dispatcher, description)
+        self.limiter = limiter
+        self.maxAccepts = limiter.maxAccepts
+        self.maxRequests = limiter.maxRequests
+
+
+    @property
+    def outstandingRequests(self):
+        return self.limiter.outstandingRequests

Copied: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfd.py (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/twistedcaldav/sendfd.py)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfd.py	                        (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfd.py	2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,66 @@
+# -*- test-case-name: twext.python.test.test_sendmsg -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from struct import pack, unpack
+from socket import SOL_SOCKET
+
+from twistedcaldav.sendmsg import sendmsg, recvmsg, SCM_RIGHTS
+
+def sendfd(socketfd, fd, description):
+    """
+    Send the given FD to another process via L{sendmsg} on the given C{AF_UNIX}
+    socket.
+
+    @param socketfd: An C{AF_UNIX} socket, attached to another process waiting
+        to receive sockets via the ancillary data mechanism in L{sendmsg}.
+
+    @type socketfd: C{int}
+
+    @param fd: A file descriptor to be sent to the other process.
+
+    @type fd: C{int}
+
+    @param description: a string describing the socket that was passed.
+
+    @type description: C{str}
+    """
+    sendmsg(
+        socketfd, description, 0, [(SOL_SOCKET, SCM_RIGHTS, pack("i", fd))]
+    )
+
+
+def recvfd(socketfd):
+    """
+    Receive a file descriptor from a L{sendmsg} message on the given C{AF_UNIX}
+    socket.
+
+    @param socketfd: An C{AF_UNIX} socket, attached to another process waiting
+        to send sockets via the ancillary data mechanism in L{sendmsg}.
+
+    @type socketfd: C{int}
+
+    @return: a 2-tuple of (new file descriptor, description).
+
+    @rtype: 2-tuple of (C{int}, C{str})
+    """
+    data, flags, ancillary = recvmsg(socketfd)
+    [(cmsg_level, cmsg_type, packedFD)] = ancillary
+    # cmsg_level and cmsg_type really need to be SOL_SOCKET / SCM_RIGHTS, but
+    # since those are the *only* standard values, there's not much point in
+    # checking.
+    [unpackedFD] = unpack("i", packedFD)
+    return (unpackedFD, data)

Copied: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfdport.py (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/twistedcaldav/sendfdport.py)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfdport.py	                        (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfdport.py	2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,303 @@
+# -*- test-case-name: twext.internet.test.test_sendfdport -*-
+##
+# Copyright (c) 2005-2009 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Implementation of a TCP/SSL port that uses sendmsg/recvmsg as implemented by
+L{twistedcaldav.sendmsg} and L{twistedcaldav.sendfd}.
+"""
+
+from os import close
+from errno import EAGAIN, ENOBUFS
+from socket import (socketpair, fromfd, error as SocketError,
+                    AF_INET, AF_UNIX, SOCK_STREAM, SOCK_DGRAM)
+
+from twisted.python import log
+
+from twisted.internet.abstract import FileDescriptor
+from twisted.internet.protocol import Protocol, Factory
+
+from twistedcaldav.sendmsg import sendmsg, recvmsg
+from twistedcaldav.sendfd import sendfd, recvfd
+
+class InheritingProtocol(Protocol, object):
+    """
+    When a connection comes in on this protocol, stop reading and writing, and
+    dispatch the socket to another process via its factory.
+    """
+
+    def connectionMade(self):
+        """
+        A connection was received; transmit the file descriptor to another
+        process via L{InheritingProtocolFactory} and remove my transport from
+        the reactor.
+        """
+        self.transport.stopReading()
+        self.transport.stopWriting()
+        skt = self.transport.getHandle()
+        self.factory.sendSocket(skt)
+
+
+
+class InheritingProtocolFactory(Factory, object):
+    """
+    In the 'master' process, make one of these and hook it up to the sockets
+    where you want to hear stuff.
+
+    @ivar dispatcher: an L{InheritedSocketDispatcher} to use to dispatch
+        incoming connections to an appropriate subprocess.
+
+    @ivar description: the string to send along with connections received on
+        this factory.
+    """
+
+    protocol = InheritingProtocol
+
+    def __init__(self, dispatcher, description):
+        self.dispatcher = dispatcher
+        self.description = description
+
+
+    def sendSocket(self, socketObject):
+        """
+        Send the given socket object on to my dispatcher.
+        """
+        self.dispatcher.sendFileDescriptor(socketObject, self.description)
+
+
+
+class _SubprocessSocket(FileDescriptor, object):
+    """
+    A socket in the master process pointing at a file descriptor that can be
+    used to transmit sockets to a subprocess.
+
+    @ivar skt: the UNIX socket used as the sendmsg() transport.
+
+    @ivar outgoingSocketQueue: an outgoing queue of sockets to send to the
+        subprocess.
+
+    @ivar outgoingSocketQueue: a C{list} of 2-tuples of C{(socket-object, str)}
+
+    @ivar status: a record of the last status message received (via recvmsg)
+        from the subprocess: this is an application-specific indication of how
+        ready this subprocess is to receive more connections.  A typical usage
+        would be to count the open connections: this is what is passed to 
+
+    @type status: C{str}
+    """
+
+    def __init__(self, dispatcher, skt):
+        FileDescriptor.__init__(self, dispatcher.reactor)
+        self.status = None
+        self.dispatcher = dispatcher
+        self.skt = skt          # XXX needs to be set non-blocking by somebody
+        self.fileno = skt.fileno
+        self.outgoingSocketQueue = []
+
+
+    def sendSocketToPeer(self, skt, description):
+        """
+        Enqueue a socket to send to the subprocess.
+        """
+        self.outgoingSocketQueue.append((skt, description))
+        self.startWriting()
+
+
+    def doRead(self):
+        """
+        Receive a status / health message and record it.
+        """
+        try:
+            data, flags, ancillary = recvmsg(self.skt.fileno())
+        except SocketError, se:
+            if se.errno not in (EAGAIN, ENOBUFS):
+                raise
+        else:
+            self.dispatcher.statusMessage(self, data)
+
+
+    def doWrite(self):
+        """
+        Transmit as many queued pending file descriptors as we can.
+        """
+        while self.outgoingSocketQueue:
+            skt, desc = self.outgoingSocketQueue.pop(0)
+            try:
+                sendfd(self.skt.fileno(), skt.fileno(), desc)
+            except SocketError, se:
+                if se.errno in (EAGAIN, ENOBUFS):
+                    self.outgoingSocketQueue.insert(0, (skt, desc))
+                    return
+                raise
+            else:
+                skt.close()
+        if not self.outgoingSocketQueue:
+            self.stopWriting()
+
+
+
+class InheritedSocketDispatcher(object):
+    """
+    Used by one or more L{InheritingProtocolFactory}s, this keeps track of a
+    list of available sockets in subprocesses and sends inbound connections
+    towards them.
+    """
+
+    def __init__(self, statusWatcher):
+        """
+        Create a socket dispatcher.
+        """
+        self._subprocessSockets = []
+        self.statusWatcher = statusWatcher
+        from twisted.internet import reactor
+        self.reactor = reactor
+
+
+    @property
+    def statuses(self):
+        """
+        Yield the current status of all subprocess sockets.
+        """
+        for subsocket in self._subprocessSockets:
+            yield subsocket.status
+
+
+    def statusMessage(self, subsocket, message):
+        """
+        The status of a connection has changed; update all registered status
+        change listeners.
+        """
+        subsocket.status = self.statusWatcher.statusFromMessage(subsocket.status, message)
+
+
+    def sendFileDescriptor(self, skt, description):
+        """
+        A connection has been received.  Dispatch it.
+
+        @param skt: a socket object
+
+        @param description: some text to identify to the subprocess's
+            L{InheritedPort} what type of transport to create for this socket.
+        """
+        self._subprocessSockets.sort(key=lambda conn: conn.status)
+        selectedSocket = self._subprocessSockets[0]
+        selectedSocket.sendSocketToPeer(skt, description)
+        # XXX Maybe want to send along 'description' or 'skt' or some
+        # properties thereof? -glyph
+        selectedSocket.status = self.statusWatcher.newConnectionStatus(selectedSocket.status)
+
+
+    def addSocket(self):
+        """
+        Add a C{sendmsg()}-oriented AF_UNIX socket to the pool of sockets being
+        used for transmitting file descriptors to child processes.
+
+        @return: a socket object for the receiving side; pass this object's
+            C{fileno()} as part of the C{childFDs} argument to
+            C{spawnProcess()}, then close it.
+        """
+        i, o = socketpair(AF_UNIX, SOCK_DGRAM)
+        a = _SubprocessSocket(self, o)
+        a.startReading()
+        self._subprocessSockets.append(a)
+        return i
+
+
+
+class InheritedPort(FileDescriptor, object):
+    """
+    Create this in the 'slave' process to handle incoming connections
+    dispatched via C{sendmsg}.
+    """
+
+    def __init__(self, fd, transportFactory, protocolFactory):
+        """
+        @param fd: a file descriptor
+
+        @type fd: C{int}
+        
+        @param transportFactory: a 3-argument function that takes the socket
+            object produced from the file descriptor, the (non-ancillary) data
+            sent along with the incoming file descriptor, and the protocol
+            built along with it, and returns an L{ITransport} provider.  Note
+            that this should NOT call C{makeConnection} on the protocol that it
+            produces, as this class will do that.
+
+        @param protocolFactory: an L{IProtocolFactory}
+        """
+        FileDescriptor.__init__(self)
+        self.fd = fd
+        self.transportFactory = transportFactory
+        self.protocolFactory = protocolFactory
+        self.statusQueue = []
+
+
+    def fileno(self):
+        """
+        Get the FD number for this socket.
+        """
+        return self.fd
+
+
+    def doRead(self):
+        """
+        A message is ready to read.  Receive a file descriptor from our parent
+        process.
+        """
+        try:
+            fd, description = recvfd(self.fd)
+        except SocketError, se:
+            if se.errno != EAGAIN:
+                raise
+        else:
+            try:
+                skt = fromfd(fd, AF_INET, SOCK_STREAM)
+                # XXX it could be AF_UNIX, I guess?  or even something else?
+                # should this be on the transportFactory's side of things?
+
+                close(fd)       # fromfd() calls dup()
+                peeraddr = skt.getpeername()
+                protocol = self.protocolFactory.buildProtocol(peeraddr)
+                transport = self.transportFactory(skt, description, protocol)
+                protocol.makeConnection(transport)
+            except:
+                log.err()
+
+
+    def doWrite(self):
+        """
+        Write some data.
+        """
+        while self.statusQueue:
+            msg = self.statusQueue.pop(0)
+            try:
+                sendmsg(self.fd, msg, 0)
+            except SocketError, se:
+                if se.errno in (EAGAIN, ENOBUFS):
+                    self.statusQueue.insert(0, msg)
+                    return
+                raise
+        self.stopWriting()
+
+
+    def reportStatus(self, statusMessage):
+        """
+        Report a status message to the L{_SubprocessSocket} monitoring this
+        L{InheritedPort}'s health in the master process.
+        """
+        self.statusQueue.append(statusMessage)
+        self.startWriting()
+        

Copied: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendmsg.c (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/twistedcaldav/sendmsg.c)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendmsg.c	                        (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendmsg.c	2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,341 @@
+/*
+ * Copyright (c) 2010 Apple Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <Python.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <signal.h>
+
+PyObject *sendmsg_socket_error;
+
+static PyObject *sendmsg_sendmsg(PyObject *self, PyObject *args, PyObject *keywds);
+static PyObject *sendmsg_recvmsg(PyObject *self, PyObject *args, PyObject *keywds);
+
+/* Method table for the "sendmsg" module: both functions accept positional
+   and keyword arguments and carry no docstring.  The all-NULL entry
+   terminates the table. */
+static PyMethodDef sendmsg_methods[] = {
+    {"sendmsg", (PyCFunction) sendmsg_sendmsg, METH_VARARGS | METH_KEYWORDS,
+     NULL},
+    {"recvmsg", (PyCFunction) sendmsg_recvmsg, METH_VARARGS | METH_KEYWORDS,
+     NULL},
+    {NULL, NULL, 0, NULL}
+};
+
+
+/*
+ * Module initializer: registers the "sendmsg" module, publishes the SCM_*
+ * constants available on this platform, and caches socket.error in
+ * sendmsg_socket_error so that failures can be raised as socket errors.
+ * On any failure it returns early, leaving the Python exception set.
+ */
+PyMODINIT_FUNC initsendmsg(void) {
+    PyObject *module;
+    PyObject *socket_module;
+
+    sendmsg_socket_error = NULL; /* Make sure that this has a known value
+                                    before doing anything that might exit. */
+
+    module = Py_InitModule("sendmsg", sendmsg_methods);
+
+    if (!module) {
+        return;
+    }
+
+    /*
+      The following is the only value mentioned by POSIX:
+      http://www.opengroup.org/onlinepubs/9699919799/basedefs/sys_socket.h.html
+    */
+
+    if (-1 == PyModule_AddIntConstant(module, "SCM_RIGHTS", SCM_RIGHTS)) {
+        return;
+    }
+
+
+    /* BSD, Darwin, Hurd */
+#if defined(SCM_CREDS)
+    if (-1 == PyModule_AddIntConstant(module, "SCM_CREDS", SCM_CREDS)) {
+        return;
+    }
+#endif
+
+    /* Linux */
+#if defined(SCM_CREDENTIALS)
+    if (-1 == PyModule_AddIntConstant(module, "SCM_CREDENTIALS", SCM_CREDENTIALS)) {
+        return;
+    }
+#endif
+
+    /* Apparently everywhere, but not standardized. */
+#if defined(SCM_TIMESTAMP)
+    if (-1 == PyModule_AddIntConstant(module, "SCM_TIMESTAMP", SCM_TIMESTAMP)) {
+        return;
+    }
+#endif
+
+    socket_module = PyImport_ImportModule("socket");
+    if (!socket_module) {
+        return;
+    }
+
+    /* On failure this stays NULL with the exception set. */
+    sendmsg_socket_error = PyObject_GetAttrString(socket_module, "error");
+
+    /* PyImport_ImportModule() returned a new reference; the original never
+       released it. */
+    Py_DECREF(socket_module);
+}
+
+/*
+ * sendmsg(fd, data, flags=0, ancillary=None) -> number of bytes sent
+ *
+ * Sends `data` on the socket `fd` with sendmsg(2).  `ancillary`, if given,
+ * must be a list of (level, type, data) tuples; each one becomes a control
+ * message.  Raises TypeError when `ancillary` is not a list and socket.error
+ * (via errno) when the system call fails.
+ */
+static PyObject *sendmsg_sendmsg(PyObject *self, PyObject *args, PyObject *keywds) {
+
+    int fd;
+    int flags = 0;
+    int sendmsg_result;
+    char *payload = NULL;
+    int payload_len = 0;
+    size_t all_data_len = 0;
+    struct msghdr message_header;
+    struct cmsghdr *control_message;
+    struct iovec iov[1];
+    PyObject *ancillary = NULL;
+    PyObject *iterator = NULL;
+    PyObject *item = NULL;
+    PyObject *result = NULL;
+    static char *kwlist[] = {"fd", "data", "flags", "ancillary", NULL};
+
+    /* "t#" stores the length as an int (PY_SSIZE_T_CLEAN is not defined), so
+       it must be parsed into an int and only then copied into the size_t
+       iov_len; parsing straight into iov_len left its upper bytes
+       uninitialized on LP64 platforms. */
+    if (!PyArg_ParseTupleAndKeywords(
+            args, keywds, "it#|iO:sendmsg", kwlist,
+            &fd,
+            &payload,
+            &payload_len,
+            &flags,
+            &ancillary)) {
+        return NULL;
+    }
+
+    iov[0].iov_base = payload;
+    iov[0].iov_len = (size_t) payload_len;
+
+    memset(&message_header, 0, sizeof(message_header));
+
+    message_header.msg_name = NULL;
+    message_header.msg_namelen = 0;
+
+    message_header.msg_iov = iov;
+    message_header.msg_iovlen = 1;
+
+    message_header.msg_control = NULL;
+    message_header.msg_controllen = 0;
+
+    message_header.msg_flags = 0;
+
+    if (ancillary) {
+
+        if (!PyList_Check(ancillary)) {
+            PyErr_Format(PyExc_TypeError,
+                         "sendmsg argument 3 expected list, got %s",
+                         ancillary->ob_type->tp_name);
+            return NULL;
+        }
+
+        /* First we need to know how big the buffer needs to be in order to
+           have enough space for all of the messages. */
+        iterator = PyObject_GetIter(ancillary);
+        if (iterator == NULL) {
+            return NULL;
+        }
+
+        while ( (item = PyIter_Next(iterator)) ) {
+            int item_len, type, level;
+            char *item_data;
+            int parsed = PyArg_ParseTuple(
+                item, "iit#:sendmsg ancillary data (level, type, data)",
+                &level,
+                &type,
+                &item_data,
+                &item_len);
+
+            Py_DECREF(item);
+            if (!parsed) {
+                Py_DECREF(iterator);
+                return NULL;
+            }
+            all_data_len += CMSG_SPACE(item_len);
+        }
+
+        Py_DECREF(iterator);
+        iterator = NULL;
+
+        /* PyIter_Next() also returns NULL when iteration itself failed. */
+        if (PyErr_Occurred()) {
+            return NULL;
+        }
+
+        /* Allocate the buffer for all of the ancillary elements.  Nothing is
+           allocated for an empty list, since malloc(0) may return NULL. */
+        if (all_data_len > 0) {
+            message_header.msg_control = malloc(all_data_len);
+            if (!message_header.msg_control) {
+                PyErr_NoMemory();
+                return NULL;
+            }
+            /* CMSG_NXTHDR() may inspect the header that follows the current
+               one, so the buffer must not contain garbage. */
+            memset(message_header.msg_control, 0, all_data_len);
+            message_header.msg_controllen = all_data_len;
+        }
+
+        iterator = PyObject_GetIter(ancillary); /* again */
+        if (!iterator) {
+            goto finished;
+        }
+
+        /* Unpack the tuples into the control message. */
+        control_message = CMSG_FIRSTHDR(&message_header);
+        while ( (item = PyIter_Next(iterator)) ) {
+            int item_len, type, level;
+            char *item_data;
+
+            if (!PyArg_ParseTuple(item,
+                                  "iit#:sendmsg ancillary data (level, type, data)",
+                                  &level,
+                                  &type,
+                                  &item_data,
+                                  &item_len)) {
+                Py_DECREF(item);
+                Py_DECREF(iterator);
+                goto finished;
+            }
+
+            /* We explicitly allocated enough space for all ancillary data
+               above, so running out of headers means the list changed
+               between the two passes.  The check is made before writing
+               (rather than asserted after advancing) because CMSG_NXTHDR()
+               legitimately returns NULL after the last item. */
+            if (!control_message) {
+                PyErr_SetString(PyExc_RuntimeError,
+                                "sendmsg ancillary data changed while "
+                                "being packed");
+                Py_DECREF(item);
+                Py_DECREF(iterator);
+                goto finished;
+            }
+
+            control_message->cmsg_level = level;
+            control_message->cmsg_type = type;
+            control_message->cmsg_len = CMSG_LEN(item_len);
+
+            memcpy(CMSG_DATA(control_message), item_data, item_len);
+
+            Py_DECREF(item);
+
+            control_message = CMSG_NXTHDR(&message_header, control_message);
+        }
+
+        Py_DECREF(iterator);
+        iterator = NULL;
+
+        if (PyErr_Occurred()) {
+            goto finished;
+        }
+    }
+
+    sendmsg_result = sendmsg(fd, &message_header, flags);
+
+    if (sendmsg_result < 0) {
+        /* Record errno before free() gets a chance to disturb it. */
+        PyErr_SetFromErrno(sendmsg_socket_error);
+        goto finished;
+    }
+
+    result = Py_BuildValue("i", sendmsg_result);
+
+  finished:
+    /* Released on every path; the original leaked this buffer whenever the
+       send succeeded.  free(NULL) is a no-op. */
+    free(message_header.msg_control);
+    return result;
+}
+
+/*
+ * recvmsg(fd, flags=0, maxsize=8192, cmsg_size=4096)
+ *     -> (data, flags, ancillary)
+ *
+ * Receives one message from the socket `fd` with recvmsg(2).  `data` is the
+ * received payload (at most `maxsize` bytes), `flags` is the msg_flags value
+ * reported by the system call, and `ancillary` is a list of
+ * (level, type, data) tuples, one per control message received.  Raises
+ * ValueError for a negative size and socket.error (via errno) when the
+ * system call fails.
+ */
+static PyObject *sendmsg_recvmsg(PyObject *self, PyObject *args, PyObject *keywds) {
+    int fd = -1;
+    int flags = 0;
+    /* Parsed with the "i" format unit, so these must be ints; the original
+       passed size_t pointers here. */
+    int maxsize = 8192;
+    int cmsg_size = 4*1024;
+    size_t cmsg_space;
+    int recvmsg_result;
+    struct msghdr message_header;
+    struct cmsghdr *control_message;
+    struct iovec iov[1];
+    char *cmsgbuf = NULL;
+    PyObject *ancillary = NULL;
+    PyObject *final_result = NULL;
+
+    static char *kwlist[] = {"fd", "flags", "maxsize", "cmsg_size", NULL};
+
+    if (!PyArg_ParseTupleAndKeywords(args, keywds, "i|iii:recvmsg", kwlist,
+                                     &fd, &flags, &maxsize, &cmsg_size)) {
+        return NULL;
+    }
+
+    if (maxsize < 0 || cmsg_size < 0) {
+        PyErr_SetString(PyExc_ValueError,
+                        "recvmsg maxsize and cmsg_size must not be negative");
+        return NULL;
+    }
+
+    cmsg_space = CMSG_SPACE(cmsg_size);
+
+    memset(&message_header, 0, sizeof(message_header));
+
+    message_header.msg_name = NULL;
+    message_header.msg_namelen = 0;
+
+    iov[0].iov_len = (size_t) maxsize;
+    /* malloc(0) may return NULL, which would look like an allocation
+       failure; always ask for at least one byte. */
+    iov[0].iov_base = malloc(maxsize ? (size_t) maxsize : 1);
+
+    if (!iov[0].iov_base) {
+        PyErr_NoMemory();
+        return NULL;
+    }
+
+    message_header.msg_iov = iov;
+    message_header.msg_iovlen = 1;
+
+    cmsgbuf = malloc(cmsg_space);
+
+    if (!cmsgbuf) {
+        free(iov[0].iov_base);
+        PyErr_NoMemory();
+        return NULL;
+    }
+
+    memset(cmsgbuf, 0, cmsg_space);
+    message_header.msg_control = cmsgbuf;
+    message_header.msg_controllen = cmsg_space;
+
+    recvmsg_result = recvmsg(fd, &message_header, flags);
+    if (recvmsg_result < 0) {
+        PyErr_SetFromErrno(sendmsg_socket_error);
+        goto finished;
+    }
+
+    ancillary = PyList_New(0);
+    if (!ancillary) {
+        goto finished;
+    }
+
+    for (control_message = CMSG_FIRSTHDR(&message_header);
+         control_message;
+         control_message = CMSG_NXTHDR(&message_header,
+                                       control_message)) {
+        PyObject *entry;
+
+        /* Some platforms apparently always fill out the ancillary data
+           structure with a single bogus value if none is provided; ignore it,
+           if that is the case. */
+
+        if ((!(control_message->cmsg_level)) &&
+            (!(control_message->cmsg_type))) {
+            continue;
+        }
+
+        /* A header shorter than an empty control message cannot carry a
+           payload; skip it rather than compute a negative length. */
+        if (control_message->cmsg_len < CMSG_LEN(0)) {
+            continue;
+        }
+
+        /* CMSG_LEN(0) is the offset of the payload within the control
+           message, including any alignment padding after the header; "s#"
+           takes its length as an int. */
+        entry = Py_BuildValue(
+            "(iis#)",
+            control_message->cmsg_level,
+            control_message->cmsg_type,
+            CMSG_DATA(control_message),
+            (int) (control_message->cmsg_len - CMSG_LEN(0)));
+
+        if (!entry) {
+            goto finished;
+        }
+
+        if (PyList_Append(ancillary, entry) < 0) {
+            Py_DECREF(entry);
+            goto finished;
+        }
+        Py_DECREF(entry);
+    }
+
+    /* "O" takes its own reference to the list; ours is dropped below. */
+    final_result = Py_BuildValue(
+        "s#iO",
+        iov[0].iov_base,
+        recvmsg_result,
+        message_header.msg_flags,
+        ancillary);
+
+  finished:
+    Py_XDECREF(ancillary);
+    free(iov[0].iov_base);
+    free(cmsgbuf);
+    return final_result;
+}
+

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/tap.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/tap.py	2010-03-31 17:55:47 UTC (rev 5429)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/tap.py	2010-03-31 18:03:58 UTC (rev 5430)
@@ -60,6 +60,8 @@
 from twistedcaldav import memcachepool
 from twistedcaldav.notify import installNotificationClient
 
+from twistedcaldav.metafd import ReportingHTTPService
+
 log = Logger()
 
 try:
@@ -504,6 +506,20 @@
     calendarResourceClass        = CalendarHomeProvisioningFile
     timezoneServiceResourceClass = TimezoneServiceFile
 
+
+    def createContextFactory(self):
+        """
+        Create an SSL context factory for use with any SSL socket talking to
+        this server.
+        """
+        return ChainingOpenSSLContextFactory(
+            config.SSLPrivateKey,
+            config.SSLCertificate,
+            certificateChainFile=config.SSLAuthorityChain,
+            passwdCallback=_getSSLPassphrase
+        )
+
+
     def makeService_Slave(self, options):
         #
         # Change default log level to "info" as its useful to have
@@ -742,12 +758,7 @@
                 fd = int(fd)
 
                 try:
-                    contextFactory = ChainingOpenSSLContextFactory(
-                        config.SSLPrivateKey,
-                        config.SSLCertificate,
-                        certificateChainFile=config.SSLAuthorityChain,
-                        passwdCallback=_getSSLPassphrase
-                    )
+                    contextFactory = self.createContextFactory()
                 except SSL.Error, e:
                     log.error("Unable to set up SSL context factory: %s" % (e,))
                 else:
@@ -758,7 +769,16 @@
                     )
                     inheritedService.setServiceParent(service)
 
+        elif config.MetaFD:
+            # Inherit a single socket to receive accept()ed connections via
+            # recvmsg() and SCM_RIGHTS.
+ 
+            fd = int(config.MetaFD)
 
+            ReportingHTTPService(
+                site, fd, self.createContextFactory()
+            ).setServiceParent(service)
+
         else: # Not inheriting, therefore open our own:
 
             channel = HTTP503LoggingFactory(
@@ -800,12 +820,7 @@
                     log.info("Adding SSL server at %s:%s" % (bindAddress, port))
 
                     try:
-                        contextFactory = ChainingOpenSSLContextFactory(
-                            config.SSLPrivateKey,
-                            config.SSLCertificate,
-                            certificateChainFile=config.SSLAuthorityChain,
-                            passwdCallback=_getSSLPassphrase
-                        )
+                        contextFactory = self.createContextFactory()
                     except SSL.Error, e:
                         log.error("Unable to set up SSL context factory: %s" % (e,))
                         log.error("Disabling SSL port: %s" % (port,))

Copied: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/pullpipe.py (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/twistedcaldav/test/pullpipe.py)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/pullpipe.py	                        (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/pullpipe.py	2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,26 @@
+#!/usr/bin/python
+# -*- test-case-name: twistedcaldav.test.test_sendmsg -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+if __name__ == '__main__':
+    import sys, os
+    from twistedcaldav.sendfd import recvfd
+    fd, description = recvfd(int(sys.argv[1]))
+    os.write(fd, "Test fixture data: %s.\n" % (description,))
+    os.close(fd)
+
+    

Copied: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_sendmsg.py (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/twistedcaldav/test/test_sendmsg.py)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_sendmsg.py	                        (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_sendmsg.py	2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,172 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import socket
+from os import pipe, read, close, environ
+from twisted.python.filepath import FilePath
+import sys
+
+from twisted.internet.defer import Deferred
+from twisted.internet.error import ProcessDone
+from twisted.trial.unittest import TestCase
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet import reactor
+
+from twistedcaldav.sendmsg import sendmsg, recvmsg
+from twistedcaldav.sendfd import sendfd
+from twisted.internet.protocol import ProcessProtocol
+
+from twistedcaldav.test.test_tap import CleanupHelper
+
+class ExitedWithStderr(Exception):
+    """
+    A process exited with some stderr.
+    """
+
+    def __str__(self):
+        """
+        Dump the errors in a pretty way in the event of a subprocess traceback.
+        """
+        return '\n'.join([''] + list(self.args))
+
+
+class StartStopProcessProtocol(ProcessProtocol):
+    """
+    An L{IProcessProtocol} with a Deferred for events where the subprocess
+    starts and stops.
+    """
+
+    def __init__(self):
+        self.started = Deferred()
+        self.stopped = Deferred()
+        self.output = ''
+        self.errors = ''
+
+    def connectionMade(self):
+        self.started.callback(self.transport)
+
+    def outReceived(self, data):
+        self.output += data
+
+    def errReceived(self, data):
+        self.errors += data
+
+    def processEnded(self, reason):
+        if reason.check(ProcessDone):
+            self.stopped.callback(self.output)
+        else:
+            self.stopped.errback(ExitedWithStderr(
+                    self.errors, self.output))
+
+
+
+def bootReactor():
+    """
+    Yield this from a trial test to bootstrap the reactor in order to avoid
+    PotentialZombieWarning, for tests that use subprocesses.  This hack will no
+    longer be necessary in Twisted 10.1, since U{the underlying bug was fixed
+    <http://twistedmatrix.com/trac/ticket/2078>}.
+    """
+    d = Deferred()
+    reactor.callLater(0, d.callback, None)
+    return d
+
+
+
+class SendmsgTestCase(CleanupHelper, TestCase):
+    """
+    Tests for sendmsg extension module and associated file-descriptor sending
+    functionality in L{twistedcaldav.sendfd}.
+    """
+
+    def setUp(self):
+        """
+        Create a pair of UNIX sockets, closed again at cleanup.
+        """
+        CleanupHelper.setUp(self)
+        self.input, self.output = socket.socketpair(socket.AF_UNIX)
+        def closePair():
+            self.input.close()
+            self.output.close()
+
+        self.addCleanup(closePair)
+
+
+    def test_roundtrip(self):
+        """
+        L{recvmsg} will retrieve a message sent via L{sendmsg}.
+        """
+        sendmsg(self.input.fileno(), "hello, world!", 0)
+
+        result = recvmsg(fd=self.output.fileno())
+        # (data, flags, ancillary): no flags and no ancillary data expected.
+        self.assertEquals(result, ("hello, world!", 0, []))
+
+
+    def test_wrongTypeAncillary(self):
+        """
+        L{sendmsg} will show a helpful exception message when given the wrong
+        type of object for the 'ancillary' argument.
+        """
+        error = self.assertRaises(TypeError,
+                                  sendmsg, self.input.fileno(),
+                                  "hello, world!", 0, 4321)
+        self.assertEquals(str(error),
+                          "sendmsg argument 3 expected list, got int")
+
+
+    def spawn(self, script):
+        """
+        Start a script that is a peer of this test as a subprocess, passing
+        it the receiving socket's descriptor number as its only argument.
+
+        @param script: the module name of the script in this directory (no
+            package prefix, no '.py')
+        @type script: C{str}
+
+        @rtype: L{StartStopProcessProtocol}
+        """
+        sspp = StartStopProcessProtocol()
+        reactor.spawnProcess(
+            sspp, sys.executable, [
+                sys.executable,
+                FilePath(__file__).sibling(script + ".py").path,
+                str(self.output.fileno()),
+            ],
+            environ,
+            # The receiving socket is mapped to the same descriptor number in
+            # the child, matching the number passed on the command line.
+            childFDs={0: "w", 1: "r", 2: "r",
+                      self.output.fileno(): self.output.fileno()}
+        )
+        return sspp
+
+
+    @inlineCallbacks
+    def test_sendSubProcessFD(self):
+        """
+        Calling L{sendmsg} with SOL_SOCKET, SCM_RIGHTS, and a platform-endian
+        packed file descriptor number should send that file descriptor to a
+        different process, where it can be retrieved by using L{recvmsg}.
+        """
+        yield bootReactor()
+        sspp = self.spawn("pullpipe")
+        yield sspp.started
+        pipeOut, pipeIn = pipe()
+        self.addCleanup(lambda : close(pipeOut))
+        sendfd(self.input.fileno(), pipeIn, "blonk")
+        # Close our copy of the write end so that, once the subprocess closes
+        # its copy too, reading the pipe reports end-of-file.
+        close(pipeIn)
+        yield sspp.stopped
+        self.assertEquals(read(pipeOut, 1024), "Test fixture data: blonk.\n")
+        # Make sure that the pipe is actually closed now.
+        self.assertEquals(read(pipeOut, 1024), "")
+

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_tap.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_tap.py	2010-03-31 17:55:47 UTC (rev 5429)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_tap.py	2010-03-31 18:03:58 UTC (rev 5430)
@@ -15,14 +15,21 @@
 ##
 
 import os
+import sys
 from copy import deepcopy
 
-from twisted.trial import unittest
+from zope.interface import implements
+
 from twistedcaldav.test.util import TestCase
 
 from twisted.python.usage import Options, UsageError
 from twisted.python.util import sibpath
 from twisted.python.reflect import namedAny
+from twisted.python.filepath import FilePath
+
+from twisted.internet.interfaces import IProcessTransport, IReactorProcess
+from twisted.internet.defer import Deferred, inlineCallbacks
+
 from twisted.application.service import IService
 from twisted.application import internet
 
@@ -30,6 +37,9 @@
 from twisted.web2.log import LogWrapperResource
 
 from twistedcaldav.tap import CalDAVOptions, CalDAVServiceMaker
+
+from twistedcaldav.cluster import DelayedStartupProcessMonitor, TwistdSlaveProcess
+
 from twistedcaldav import tap
 
 from twistedcaldav.config import config
@@ -40,7 +50,68 @@
 from twistedcaldav.directory.sudo import SudoDirectoryService
 from twistedcaldav.directory.directory import UnknownRecordTypeError
 
+class NotAProcessTransport(object):
+    """
+    Simple L{IProcessTransport} stub.
+    """
+    implements(IProcessTransport)
 
+    def __init__(self, processProtocol, executable, args, env, path,
+                 uid, gid, usePTY, childFDs):
+        """
+        Hold on to all the attributes passed to spawnProcess.
+        """
+        self.processProtocol = processProtocol
+        self.executable = executable
+        self.args = args
+        self.env = env
+        self.path = path
+        self.uid = uid
+        self.gid = gid
+        self.usePTY = usePTY
+        self.childFDs = childFDs
+
+
+class InMemoryProcessSpawner(object):
+    """
+    Stub out L{IReactorProcess.spawnProcess} so that we can examine the
+    interaction of L{DelayedStartupProcessMonitor} and the reactor.
+    """
+    implements(IReactorProcess)
+
+    def __init__(self):
+        """
+        Create some storage to hold on to all the fake processes spawned.
+        """
+        self.processTransports = []
+        self.waiting = []
+
+
+    def waitForOneProcess(self):
+        """
+        Return a L{Deferred} which will fire when spawnProcess has been
+        invoked, with the L{IProcessTransport}.
+        """
+        d = Deferred()
+        self.waiting.append(d)
+        return d
+        
+
+    def spawnProcess(self, processProtocol, executable, args=(), env={},
+                     path=None, uid=None, gid=None, usePTY=0,
+                     childFDs=None):
+
+        transport = NotAProcessTransport(
+            processProtocol, executable, args, env, path, uid, gid, usePTY,
+            childFDs
+        )
+        self.processTransports.append(transport)
+        if self.waiting:
+            self.waiting.pop(0).callback(transport)
+        return transport
+
+
+
 class TestCalDAVOptions(CalDAVOptions):
     """
     A fake implementation of CalDAVOptions that provides
@@ -673,3 +744,162 @@
         self.failUnless(isinstance(
                 realDirectory,
                 configuredDirectory))
+
+
+
+
+class DummyProcessObject(object):
+    """
+    Simple stub for the Process Object API that will run a test script.
+
+    This is a stand in for L{TwistdSlaveProcess}.
+    """
+
+    def __init__(self, scriptname, *args):
+        self.scriptname = scriptname
+        self.args = list(args)
+
+
+    def getCommandLine(self):
+        """
+        Get the command line to invoke this script.
+        """
+        return [sys.executable,
+                FilePath(__file__).sibling(self.scriptname).path] + self.args
+
+
+    def getFileDescriptors(self):
+        """
+        Return a dummy, empty mapping of file descriptors.
+        """
+        return {}
+
+
+    def getName(self):
+        """
+        Get a dummy name.
+        """
+        return 'Dummy'
+
+
+class CleanupHelper(object):
+    """
+    Emulate a more recent version of Twisted by providing addCleanup.
+    """
+
+    def setUp(self):
+        """
+        Emulate a more recent version of Twisted; initialize a list of
+        callables to run during cleanup.
+        """
+        self.cleanups = []
+
+
+    def addCleanup(self, thunk):
+        """
+        Emulate a more recent version of Trial; add a method to be run during
+        teardown.
+        """
+        self.cleanups.append(thunk)
+
+
+    def tearDown(self):
+        """
+        Emulate a more recent version of Trial; run all registered cleanup
+        functions.
+        """
+        from twisted.python import log
+        for cleanup in self.cleanups:
+            try:
+                cleanup()
+            except:
+                log.err()
+        
+
+
+
+class DelayedStartupProcessMonitorTests(CleanupHelper, TestCase):
+    """
+    Test cases for L{DelayedStartupProcessMonitor}.
+    """
+
+    @inlineCallbacks
+    def test_acceptDescriptorInheritance(self):
+        """
+        If a L{TwistdSlaveProcess} specifies some file descriptors to be
+        inherited, they should be inherited by the subprocess.
+        """
+        dspm         = DelayedStartupProcessMonitor()
+        dspm.reactor = InMemoryProcessSpawner()
+
+        # Most arguments here will be ignored, so these are bogus values.
+        slave = TwistdSlaveProcess(
+            twistd        = "bleh",
+            tapname       = "caldav",
+            configFile    = "/does/not/exist",
+            id            = 10,
+            interfaces    = '127.0.0.1',
+            port          = None,
+            sslPort       = None,
+            inheritFDs    = [3, 7],
+            inheritSSLFDs = [19, 25],
+        )
+
+        dspm.addProcessObject(slave, {})
+        dspm.startService()
+        self.addCleanup(dspm.consistency.cancel)
+        # We can easily stub out spawnProcess, because caldav calls it, but a
+        # bunch of callLater calls are buried in procmon itself, so we need to
+        # use the real clock.
+        oneProcessTransport = yield dspm.reactor.waitForOneProcess()
+        self.assertEquals(oneProcessTransport.childFDs,
+                          {0: 'w', 1: 'r', 2: 'r',
+                           3: 3, 7: 7,
+                           19: 19, 25: 25})
+    @inlineCallbacks
+    def test_metaDescriptorInheritance(self):
+        """
+        If a L{TwistdSlaveProcess} specifies a meta-file-descriptor to be
+        inherited, it should be inherited by the subprocess, and a
+        configuration argument should be passed that indicates its number
+        to the subprocess.
+        """
+        dspm         = DelayedStartupProcessMonitor()
+        dspm.reactor = InMemoryProcessSpawner()
+        class FakeFD:
+            def __init__(self, n):
+                self.fd = n
+            def fileno(self):
+                return self.fd
+
+        class FakeDispatcher:
+            n = 3
+            def addSocket(self):
+                self.n += 1
+                return FakeFD(self.n)
+
+        # Most arguments here will be ignored, so these are bogus values.
+        slave = TwistdSlaveProcess(
+            twistd     = "bleh",
+            tapname    = "caldav",
+            configFile = "/does/not/exist",
+            id         = 10,
+            port          = None,
+            sslPort       = None,
+            interfaces = '127.0.0.1',
+            dispatcher = FakeDispatcher()
+        )
+
+        dspm.addProcessObject(slave, {})
+        dspm.startService()
+        self.addCleanup(dspm.consistency.cancel)
+        oneProcessTransport = yield dspm.reactor.waitForOneProcess()
+        self.assertIn("MetaFD=4", oneProcessTransport.args)
+        self.assertEquals(
+            oneProcessTransport.args[oneProcessTransport.args.index("MetaFD=4")-1],
+            '-o',
+            "MetaFD argument was not passed as an option"
+        )
+        self.assertEquals(oneProcessTransport.childFDs,
+                          {0: 'w', 1: 'r', 2: 'r',
+                           4: 4})
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100331/e2361a29/attachment-0001.html>


More information about the calendarserver-changes mailing list