[CalendarServer-changes] [5400] CalendarServer/branches/users/glyph/sendfdport

source_changes at macosforge.org source_changes at macosforge.org
Thu Mar 25 15:55:52 PDT 2010


Revision: 5400
          http://trac.macosforge.org/projects/calendarserver/changeset/5400
Author:   glyph at apple.com
Date:     2010-03-25 15:55:52 -0700 (Thu, 25 Mar 2010)
Log Message:
-----------
enough to start up without tracebacks, but not quite enough to actually finish a connection

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py
    CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/test/test_caldav.py
    CalendarServer/branches/users/glyph/sendfdport/twext/internet/tcp.py
    CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py
    CalendarServer/branches/users/glyph/sendfdport/twistedcaldav/stdconfig.py

Added Paths:
-----------
    CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py

Modified: CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py	2010-03-25 18:39:01 UTC (rev 5399)
+++ CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py	2010-03-25 22:55:52 UTC (rev 5400)
@@ -53,8 +53,12 @@
 from twext.python.log import logLevelForNamespace, setLogLevelForNamespace
 from twext.internet.ssl import ChainingOpenSSLContextFactory
 from twext.internet.tcp import MaxAcceptTCPServer, MaxAcceptSSLServer
-from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
+from twext.internet.sendfdport import (
+    InheritedSocketDispatcher, InheritingProtocolFactory)
 
+from twext.web2.channel.http import (
+    LimitingHTTPFactory, SSLRedirectRequest, ReportingHTTPFactory)
+
 try:
     from twistedcaldav.version import version
 except ImportError:
@@ -635,8 +639,31 @@
         elif config.MetaFD:
             fd = int(config.MetaFD)
 
-            # XXX sendmsg()-FD case
+            def myTransportFactory(skt, data, protocol):
+                from twisted.internet.tcp import Server
+                from twisted.internet import reactor
+                transport = Server(skt, protocol,
+                                   skt.getpeername(), skt.getsockname(),
+                                   4321, reactor)
+                if data == 'SSL':
+                    transport.startTLS(self.createContextFactory())
+                transport.startReading()
+                return transport
 
+            from twext.internet.sendfdport import InheritedPort
+            # Unlike other 'factory' constructions, config.MaxRequests and
+            # config.MaxAccepts are dealt with in the master process, so we
+            # don't need to propagate them here.
+            reportingFactory = ReportingHTTPFactory(site, vary=True)
+
+            reportingFactory.inheritedPort = InheritedPort(
+                fd, myTransportFactory, reportingFactory
+            )
+
+            # XXX for correctness, we need a service here, not just a Port;
+            # this should be in startService.
+            reportingFactory.inheritedPort.startReading()
+
         else: # Not inheriting, therefore we open our own:
 
             if not config.BindAddresses:
@@ -790,6 +817,17 @@
 
         s._inheritedSockets = [] # keep a reference to these so they don't close
 
+        if config.UseMetaFD:
+            def sortAsInts(x):
+                # "int" by itself isn't quite good enough, unfortunately,
+                # because it can't handle None...
+                if x is None:
+                    return 0
+                else:
+                    return int(x)
+                                
+            dispatcher = InheritedSocketDispatcher(sortAsInts)
+
         for bindAddress in config.BindAddresses:
             if config.BindHTTPPorts:
                 if config.HTTPPort == 0:
@@ -807,36 +845,53 @@
             elif config.SSLPort != 0:
                 config.BindSSLPorts = [config.SSLPort]
 
-            def _openSocket(addr, port):
-                log.info("Opening socket for inheritance at %s:%d" % (addr, port))
-                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-                sock.setblocking(0)
-                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-                sock.bind((addr, port))
-                sock.listen(config.ListenBacklog)
-                s._inheritedSockets.append(sock)
-                return sock
+            if config.UseMetaFD:
+                # XXX no automated tests for this whole block.  How to test it?
 
-            for portNum in config.BindHTTPPorts:
-                sock = _openSocket(bindAddress, int(portNum))
-                inheritFDs.append(sock.fileno())
+                for ports, description in [(config.BindSSLPorts, "SSL"),
+                                           (config.BindHTTPPorts, "TCP")]:
+                    for port in ports:
+                        TCPServer(
+                            port, InheritingProtocolFactory(dispatcher, description),
+                            interface=bindAddress,
+                            backlog=config.ListenBacklog
+                        ).setServiceParent(s)
+                # Okay, now for each subprocess I need to add a thing to the dispatcher
+            else:
+                def _openSocket(addr, port):
+                    log.info("Opening socket for inheritance at %s:%d" % (addr, port))
+                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                    sock.setblocking(0)
+                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+                    sock.bind((addr, port))
+                    sock.listen(config.ListenBacklog)
+                    s._inheritedSockets.append(sock)
+                    return sock
 
-            for portNum in config.BindSSLPorts:
-                sock = _openSocket(bindAddress, int(portNum))
-                inheritSSLFDs.append(sock.fileno())
+                for portNum in config.BindHTTPPorts:
+                    sock = _openSocket(bindAddress, int(portNum))
+                    inheritFDs.append(sock.fileno())
 
+                for portNum in config.BindSSLPorts:
+                    sock = _openSocket(bindAddress, int(portNum))
+                    inheritSSLFDs.append(sock.fileno())
 
+
+
         for p in xrange(0, config.MultiProcess.ProcessCount):
+            if config.UseMetaFD:
+                extraArgs = dict(metaFD=dispatcher.addSocket())
+            else:
+                extraArgs = dict(inheritFDs=inheritFDs,
+                                 inheritSSLFDs=inheritSSLFDs)
             process = TwistdSlaveProcess(
                 sys.argv[0],
                 self.tapname,
                 options["config"],
                 p,
                 config.BindAddresses,
-                inheritFDs=inheritFDs,
-                inheritSSLFDs=inheritSSLFDs
+                **extraArgs
             )
-
             monitor.addProcessObject(process, parentEnv)
 
 
@@ -971,11 +1026,16 @@
 
     @ivar inheritFDs: File descriptors to be inherited for calling accept() on
         in the subprocess.
-    @type inheritFDs: C{list} of C{int}
+    @type inheritFDs: C{list} of C{int}, or C{None}
 
     @ivar inheritSSLFDs: File descriptors to be inherited for calling accept()
         on in the subprocess, and speaking TLS on the resulting sockets.
-    @type inheritSSLFDs: C{list} of C{int}
+    @type inheritSSLFDs: C{list} of C{int}, or C{None}
+
+    @ivar metaFD: a UNIX socket which will be used to send file descriptors
+        down to the slave process.
+
+    @type metaFD: L{socket.socket}, or C{None}
     """
     prefix = "caldav"
 
@@ -1012,7 +1072,7 @@
         fds = {}
         maybeMetaFD = []
         if self.metaFD:
-            maybeMetaFD.append(self.metaFD)
+            maybeMetaFD.append(self.metaFD.fileno())
         for fd in self.inheritSSLFDs + self.inheritFDs + maybeMetaFD:
             fds[fd] = fd
         return fds
@@ -1067,8 +1127,10 @@
             ])
 
         if self.metaFD:
+            # XXX this FD is never closed in the parent.  should it be?
+            # (should they *all* be?) -glyph
             args.extend([
-                    "-o", "MetaFD=%s" % (self.metaFD,)
+                    "-o", "MetaFD=%s" % (self.metaFD.fileno(),)
                 ])
 
         return args
@@ -1091,7 +1153,7 @@
     start.  It also specializes process-starting to allow for process objects
     to
 
-    @ivar processObjects: a list of L{TwistdSlaveProcess} to add using
+    @ivar processObjects: a C{list} of L{TwistdSlaveProcess} to add using
         C{self.addProcess} when this service starts up.
 
     @ivar _extraFDs: a mapping from process names to extra file-descriptor
@@ -1106,9 +1168,6 @@
 
     def __init__(self, *args, **kwargs):
         procmon.ProcessMonitor.__init__(self, *args, **kwargs)
-
-        # processObjects stores TwistdSlaveProcesses which need to have their
-        # command-lines determined just in time
         self.processObjects = []
         self._extraFDs = {}
         from twisted.internet import reactor
@@ -1214,6 +1273,7 @@
         )
 
 
+
 class DelayedStartupLineLogger(object):
     """
     A line logger that can handle very long lines.

Modified: CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/test/test_caldav.py	2010-03-25 18:39:01 UTC (rev 5399)
+++ CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/test/test_caldav.py	2010-03-25 22:55:52 UTC (rev 5400)
@@ -987,6 +987,9 @@
         """
         dspm         = DelayedStartupProcessMonitor()
         dspm.reactor = InMemoryProcessSpawner()
+        class FakeFD:
+            def fileno(self):
+                return 4
 
         # Most arguments here will be ignored, so these are bogus values.
         slave = TwistdSlaveProcess(
@@ -995,19 +998,19 @@
             configFile    = "/does/not/exist",
             id            = 10,
             interfaces    = '127.0.0.1',
-            metaFD        = 4
+            metaFD        = FakeFD()
         )
 
         dspm.addProcessObject(slave, {})
         dspm.startService()
         self.addCleanup(dspm.consistency.cancel)
         oneProcessTransport = yield dspm.reactor.waitForOneProcess()
-        self.assertEquals(oneProcessTransport.childFDs,
-                          {0: 'w', 1: 'r', 2: 'r',
-                           4: 4})
         self.assertIn("MetaFD=4", oneProcessTransport.args)
         self.assertEquals(
             oneProcessTransport.args[oneProcessTransport.args.index("MetaFD=4")-1],
             '-o',
             "MetaFD argument was not passed as an option"
         )
+        self.assertEquals(oneProcessTransport.childFDs,
+                          {0: 'w', 1: 'r', 2: 'r',
+                           4: 4})

Added: CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py	                        (rev 0)
+++ CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py	2010-03-25 22:55:52 UTC (rev 5400)
@@ -0,0 +1,287 @@
+# -*- test-case-name: twext.internet.test.test_sendfdport -*-
+##
+# Copyright (c) 2005-2009 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Implementation of a TCP/SSL port that uses sendmsg/recvmsg as implemented by
+L{twext.python.sendfd}.
+"""
+
+from os import close
+from errno import EAGAIN
+from socket import socketpair, fromfd, error as SocketError, AF_INET, SOCK_STREAM
+
+from twisted.python import log
+
+from twisted.internet.abstract import FileDescriptor
+from twisted.internet.protocol import Protocol, Factory
+
+from twext.python.sendmsg import sendmsg, recvmsg
+from twext.python.sendfd import sendfd, recvfd
+
+class InheritingProtocol(Protocol, object):
+    """
+    When a connection comes in on this protocol, stop reading and writing, and
+    dispatch the socket to another process via its factory.
+    """
+
+    def connectionMade(self):
+        """
+        A connection was received; transmit the file descriptor to another
+        process via L{InheritingProtocolFactory} and remove my transport from
+        the reactor.
+        """
+        self.transport.stopReading()
+        self.transport.stopWriting()
+        skt = self.transport.getHandle()
+        # actually I want to retrieve a child from a *pool* of potentially
+        # available children.  i have to make that determination based on the
+        # number of connections each child is currently handling.  which means
+        # the logic shouldn't live here.  where should it live?  in this spot,
+        # I just need an FD.
+        self.factory.sendSocket(skt)
+
+
+
+class InheritingProtocolFactory(Factory, object):
+    """
+    In the 'master' process, make one of these and hook it up to the sockets
+    where you want to hear stuff.
+
+    @ivar dispatcher: an L{InheritedSocketDispatcher} to use to dispatch
+        incoming connections to an appropriate subprocess.
+
+    @ivar description: the string to send along with connections received on
+        this factory.
+    """
+
+    protocol = InheritingProtocol
+
+    def __init__(self, dispatcher, description):
+        self.dispatcher = dispatcher
+        self.description = description
+
+
+    def sendSocket(self, socketObject):
+        """
+        Send the given socket object on to my dispatcher.
+        """
+        self.dispatcher.sendFileDescriptor(socketObject, self.description)
+
+
+
+class _AvailableConnection(FileDescriptor, object):
+    """
+    A socket in the master process pointing at a file descriptor that can be
+    used to transmit sockets to a subprocess.
+
+    @ivar skt: the UNIX socket used as the sendmsg() transport.
+
+    @ivar outgoingSocketQueue: an outgoing queue of sockets to send to the
+        subprocess.
+
+    @type outgoingSocketQueue: a C{list} of 2-tuples of C{(socket-object, str)}
+
+    @ivar status: a record of the last status message received (via recvmsg)
+        from the subprocess: this is an application-specific indication of how
+        ready this subprocess is to receive more connections.  Typically a count
+        of open connections; passed to the dispatcher's C{fitnessFunction}.
+
+    @type status: C{str}
+    """
+
+    def __init__(self, reactor, skt):
+        FileDescriptor.__init__(self, reactor)
+        self.status = None
+        self.skt = skt          # XXX needs to be set non-blocking by somebody
+        self.fileno = skt.fileno
+        self.outgoingSocketQueue = []
+
+
+    def sendSocketToPeer(self, skt, description):
+        """
+        Enqueue a socket to send to the subprocess.
+        """
+        self.outgoingSocketQueue.append((skt, description))
+        self.startWriting()
+
+
+    def doRead(self):
+        """
+        Receive a status / health message and record it.
+        """
+        try:
+            data, flags, ancillary = recvmsg(self.fd)
+        except SocketError:
+            pass                # handle EAGAIN, etc
+        else:
+            self.status = data
+
+
+    def doWrite(self):
+        """
+        Transmit as many queued pending file descriptors as we can.
+        """
+        while self.outgoingSocketQueue:
+            skt, desc = self.outgoingSocketQueue.pop(0)
+            try:
+                sendfd(self.skt.fileno(), skt.fileno(), desc)
+            except SocketError, se:
+                if se.errno == EAGAIN:
+                    self.outgoingSocketQueue.insert(0, (skt, desc))
+                    return
+                raise
+        if not self.outgoingSocketQueue:
+            self.stopWriting()
+
+
+
+class InheritedSocketDispatcher(object):
+    """
+    Used by one or more L{InheritingProtocolFactory}s, this keeps track of a
+    list of available sockets in subprocesses and sends inbound connections towards them.
+
+    @ivar fitnessFunction: a function used to evaluate status messages received
+        on available subprocess connections.  a 1-argument function which
+        accepts a string - or C{None}, if no status has been reported - and
+        returns something sortable.  this will be used to sort all available
+        status messages; the lowest sorting result will be used to handle the
+        new connection.
+    """
+
+    def __init__(self, fitnessFunction):
+        """
+        Create a socket dispatcher.
+        """
+        self.availableConnections = []
+        self.fitnessFunction = fitnessFunction
+        from twisted.internet import reactor
+        self.reactor = reactor
+
+
+    def sendFileDescriptor(self, skt, description):
+        """
+        A connection has been received.  Dispatch it.
+
+        @param skt: a socket object
+
+        @param description: some text to identify to the subprocess's
+            L{InheritedPort} what type of transport to create for this socket.
+        """
+        self.availableConnections.sort(key=lambda conn:
+                                           self.fitnessFunction(conn.status))
+
+
+    def addSocket(self):
+        """
+        Add a C{sendmsg()}-oriented AF_UNIX socket to the pool of sockets being
+        used for transmitting file descriptors to child processes.
+
+        @return: a socket object for the receiving side; pass this object's
+            C{fileno()} as part of the C{childFDs} argument to
+            C{spawnProcess()}, then close it.
+        """
+        i, o = socketpair()
+        a = _AvailableConnection(self.reactor, o)
+        self.availableConnections.append(a)
+        return i
+
+
+
+class InheritedPort(FileDescriptor, object):
+    """
+    Create this in the 'slave' process to handle incoming connections
+    dispatched via C{sendmsg}.
+    """
+
+    def __init__(self, fd, transportFactory, protocolFactory):
+        """
+        @param fd: a file descriptor
+
+        @type fd: C{int}
+        
+        @param transportFactory: a 3-argument function that takes the socket
+            object produced from the file descriptor, the (non-ancillary) data
+            sent along with the incoming file descriptor, and the protocol
+            built along with it, and returns an L{ITransport} provider.  Note
+            that this should NOT call C{makeConnection} on the protocol that it
+            produces, as this class will do that.
+
+        @param protocolFactory: an L{IProtocolFactory}
+        """
+        FileDescriptor.__init__(self)
+        self.fd = fd
+        self.transportFactory = transportFactory
+        self.protocolFactory = protocolFactory
+
+
+    def fileno(self):
+        """
+        Get the FD number for this socket.
+        """
+        return self.fd
+
+
+    def doRead(self):
+        """
+        A message is ready to read.  Receive a file descriptor from our parent
+        process.
+        """
+        try:
+            fd, description = recvfd(self.fd)
+        except SocketError, se:
+            if se.errno != EAGAIN:
+                raise
+        else:
+            try:
+                skt = fromfd(fd, AF_INET, SOCK_STREAM)
+                # XXX it could be AF_UNIX, I guess?  or even something else?
+                # should this be on the transportFactory's side of things?
+
+                close(fd)       # fromfd() calls dup()
+                peeraddr = skt.getpeername()
+                protocol = self.protocolFactory.buildProtocol(peeraddr)
+                transport = self.transportFactory(skt, description, protocol)
+                protocol.makeConnection(transport)
+            except:
+                log.err()
+
+
+    def doWrite(self):
+        """
+        Send as many queued status messages to the master process as we can.
+        """
+        while self.statusQueue:
+            msg = self.statusQueue.pop(0)
+            try:
+                sendmsg(self.fd, msg, 0)
+            except SocketError, se:
+                if se.errno == EAGAIN:
+                    self.statusQueue.insert(0, msg)
+                    return
+                raise
+        self.stopWriting()
+
+
+    def reportStatus(self, statusMessage):
+        """
+        Report a status message to the L{_AvailableConnection} monitoring this
+        L{InheritedPort}'s health in the master process.
+        """
+        # XXX this has got to be invoked from 
+        self.statusQueue.append(statusMessage)
+        self.startWriting()
+        

Modified: CalendarServer/branches/users/glyph/sendfdport/twext/internet/tcp.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twext/internet/tcp.py	2010-03-25 18:39:01 UTC (rev 5399)
+++ CalendarServer/branches/users/glyph/sendfdport/twext/internet/tcp.py	2010-03-25 22:55:52 UTC (rev 5400)
@@ -39,21 +39,28 @@
     Mixin for resetting maxAccepts.
     """
     def doRead(self):
-        self.numberAccepts = min(self.factory.maxRequests - self.factory.outstandingRequests, self.factory.maxAccepts)
+        self.numberAccepts = min(
+            self.factory.maxRequests - self.factory.outstandingRequests,
+            self.factory.maxAccepts
+        )
         tcp.Port.doRead(self)
 
+
+
 class MaxAcceptTCPPort(MaxAcceptPortMixin, tcp.Port):
     """
     Use for non-inheriting tcp ports.
     """
-    pass
 
+
+
 class MaxAcceptSSLPort(MaxAcceptPortMixin, ssl.Port):
     """
     Use for non-inheriting SSL ports.
     """
-    pass
 
+
+
 class InheritedTCPPort(MaxAcceptTCPPort):
     """
     A tcp port which uses an inherited file descriptor.
@@ -100,6 +107,9 @@
     """
     TCP server which will uses MaxAcceptTCPPorts (and optionally,
     inherited ports)
+
+    @ivar myPort: When running, this is set to the L{IListeningPort} being
+        managed by this service.
     """
 
     def __init__(self, *args, **kwargs):

Modified: CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py	2010-03-25 18:39:01 UTC (rev 5399)
+++ CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py	2010-03-25 22:55:52 UTC (rev 5400)
@@ -739,7 +739,7 @@
         
     def connectionMade(self):
         self.setTimeout(self.inputTimeOut)
-        self.factory.outstandingRequests+=1
+        self.factory.addConnectedChannel(self)
     
     def lineReceived(self, line):
         if self._first_line:
@@ -928,7 +928,7 @@
             self.transport.loseConnection()
         
     def connectionLost(self, reason):
-        self.factory.outstandingRequests-=1
+        self.factory.removeConnectedChannel(self)
 
         self._writeLost = True
         self.readConnectionLost()
@@ -951,19 +951,30 @@
         self.transport.loseConnection()
 
 class HTTPFactory(protocol.ServerFactory):
-    """Factory for HTTP server."""
+    """
+    Factory for HTTP server.
 
+    @ivar outstandingRequests: the number of currently connected HTTP channels.
+
+    @type outstandingRequests: C{int}
+
+    @ivar connectedChannels: all the channels that have currently active
+    connections.
+
+    @type connectedChannels: C{set} of L{HTTPChannel}
+    """
+
     protocol = HTTPChannel
     
     protocolArgs = None
 
-    outstandingRequests = 0
-    
     def __init__(self, requestFactory, maxRequests=600, **kwargs):
-        self.maxRequests=maxRequests
+        self.maxRequests = maxRequests
         self.protocolArgs = kwargs
-        self.protocolArgs['requestFactory']=requestFactory
-        
+        self.protocolArgs['requestFactory'] = requestFactory
+        self.connectedChannels = set()
+
+
     def buildProtocol(self, addr):
         if self.outstandingRequests >= self.maxRequests:
             return OverloadedServerProtocol()
@@ -975,6 +986,27 @@
         return p
 
 
+    def addConnectedChannel(self, channel):
+        """
+        Add a connected channel to the set of currently connected channels and
+        increase the outstanding request count.
+        """
+        self.connectedChannels.add(channel)
+
+
+    def removeConnectedChannel(self, channel):
+        """
+        Remove a connected channel from the set of currently connected channels
+        and decrease the outstanding request count.
+        """
+        self.connectedChannels.remove(channel)
+
+
+    @property
+    def outstandingRequests(self):
+        return len(self.connectedChannels)
+
+
 class HTTP503LoggingFactory (HTTPFactory):
     """
     Factory for HTTP server which emits a 503 response when overloaded.
@@ -1087,40 +1119,89 @@
 
 
 
-class LimitingHTTPChannel(HTTPChannel):
-    """ HTTPChannel that takes itself out of the reactor once it has enough
-        requests in flight.
+class LimitingHTTPFactory(HTTPFactory):
     """
+    HTTPFactory which stores maxAccepts on behalf of the MaxAcceptPortMixin
 
-    def connectionMade(self):
-        HTTPChannel.connectionMade(self)
-        if self.factory.outstandingRequests >= self.factory.maxRequests:
-            self.factory.myServer.myPort.stopReading()
-
-    def connectionLost(self, reason):
-        HTTPChannel.connectionLost(self, reason)
-        if self.factory.outstandingRequests < self.factory.maxRequests:
-            self.factory.myServer.myPort.startReading()
-
-class LimitingHTTPFactory(HTTPFactory):
-    """ HTTPFactory which stores maxAccepts on behalf of the MaxAcceptPortMixin
+    @ivar myServer: a reference to a L{MaxAcceptTCPServer} that this
+        L{LimitingHTTPFactory} will limit.  This must be set externally.
     """
 
-    protocol = LimitingHTTPChannel
-
     def __init__(self, requestFactory, maxRequests=600, maxAccepts=100,
         **kwargs):
         HTTPFactory.__init__(self, requestFactory, maxRequests, **kwargs)
         self.maxAccepts = maxAccepts
 
     def buildProtocol(self, addr):
-
+        """
+        Override L{HTTPFactory.buildProtocol} in order to avoid ever returning
+        an L{OverloadedServerProtocol}; this should be handled in other ways.
+        """
         p = protocol.ServerFactory.buildProtocol(self, addr)
         for arg, value in self.protocolArgs.iteritems():
             setattr(p, arg, value)
         return p
 
+    def addConnectedChannel(self, channel):
+        """
+        Override L{HTTPFactory.addConnectedChannel} to pause listening on the
+        socket when there are too many outstanding channels.
+        """
+        HTTPFactory.addConnectedChannel(self, channel)
+        if self.outstandingRequests >= self.maxRequests:
+            self.myServer.myPort.stopReading()
 
+
+    def removeConnectedChannel(self, channel):
+        """
+        Override L{HTTPFactory.removeConnectedChannel} to resume listening on
+        the socket once there are no longer too many outstanding channels.
+        """
+        HTTPFactory.removeConnectedChannel(self, channel)
+        if self.outstandingRequests < self.maxRequests:
+            self.myServer.myPort.startReading()
+
+
+
+class ReportingHTTPFactory(HTTPFactory):
+    """
+    An L{HTTPFactory} which reports its status to a
+    L{twext.internet.sendfdport.InheritedPort}.
+
+    @ivar inheritedPort: an L{InheritedPort} to report status (the current
+        number of outstanding connections) to.  Since this - the
+        L{ReportingHTTPFactory} - needs to be instantiated to be passed to
+        L{InheritedPort}'s constructor, this attribute must be set afterwards
+        but before any connections have occurred.
+    """
+
+    def _report(self):
+        """
+        Report the current number of open channels to the listening socket in
+        the parent process.
+        """
+        self.inheritedPort.reportStatus(str(self.outstandingRequests))
+
+
+    def addConnectedChannel(self, channel):
+        """
+        Add the connected channel, and report the current number of open
+        channels to the listening socket in the parent process.
+        """
+        HTTPFactory.addConnectedChannel(self, channel)
+        self._report()
+
+
+    def removeConnectedChannel(self, channel):
+        """
+        Remove the connected channel, and report the current number of open
+        channels to the listening socket in the parent process.
+        """
+        HTTPFactory.removeConnectedChannel(self, channel)
+        self._report()
+
+
+
 __all__ = [
     "HTTPFactory",
     "HTTP503LoggingFactory",

Modified: CalendarServer/branches/users/glyph/sendfdport/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twistedcaldav/stdconfig.py	2010-03-25 18:39:01 UTC (rev 5399)
+++ CalendarServer/branches/users/glyph/sendfdport/twistedcaldav/stdconfig.py	2010-03-25 22:55:52 UTC (rev 5400)
@@ -143,6 +143,9 @@
     "InheritSSLFDs": [],   # File descriptors to inherit for HTTPS requests (empty = don't inherit)
     "MetaFD": 0,        # Inherited file descriptor to call recvmsg() on to recive sockets (none = don't inherit)
 
+    "UseMetaFD": False,         # Use a 'meta' FD, i.e. an FD to transmit other
+                                # FDs to slave processes.
+
     #
     # Types of service provided
     #
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100325/82f48f18/attachment-0001.html>


More information about the calendarserver-changes mailing list