[CalendarServer-changes] [5414] CalendarServer/branches/users/glyph/sendfdport

source_changes at macosforge.org source_changes at macosforge.org
Tue Mar 30 11:31:29 PDT 2010


Revision: 5414
          http://trac.macosforge.org/projects/calendarserver/changeset/5414
Author:   glyph at apple.com
Date:     2010-03-30 11:31:28 -0700 (Tue, 30 Mar 2010)
Log Message:
-----------
Put the HTTP-specific code into a separate module (twistedcaldav/metafd.py).

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py

Added Paths:
-----------
    CalendarServer/branches/users/glyph/sendfdport/twistedcaldav/metafd.py

Modified: CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py	2010-03-30 17:27:52 UTC (rev 5413)
+++ CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py	2010-03-30 18:31:28 UTC (rev 5414)
@@ -53,11 +53,8 @@
 from twext.python.log import logLevelForNamespace, setLogLevelForNamespace
 from twext.internet.ssl import ChainingOpenSSLContextFactory
 from twext.internet.tcp import MaxAcceptTCPServer, MaxAcceptSSLServer
-from twext.internet.sendfdport import (
-    InheritedSocketDispatcher, InheritingProtocolFactory)
 
-from twext.web2.channel.http import (
-    LimitingHTTPFactory, SSLRedirectRequest, HTTPFactory)
+from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
 
 try:
     from twistedcaldav.version import version
@@ -80,6 +77,8 @@
 from twistedcaldav.upgrade import upgradeData
 from twistedcaldav.util import getNCPU
 
+from twistedcaldav.metafd import ConnectionLimiter, ReportingHTTPService
+
 try:
     from twistedcaldav.authkerb import NegotiateCredentialFactory
     NegotiateCredentialFactory  # pacify pyflakes
@@ -639,39 +638,12 @@
         elif config.MetaFD:
             fd = int(config.MetaFD)
 
-            def myTransportFactory(skt, data, protocol):
-                from twisted.internet.tcp import Server
-                from twisted.internet import reactor
+            ReportingHTTPService(
+                site, fd, self.createContextFactory()
+            ).setServiceParent(service)
 
-                class JustEnoughLikeAPort(object):
-                    """
-                    Fake out just enough of L{tcp.Port} to be acceptable to
-                    L{tcp.Server}...
-                    """
-                    _realPortNumber = 'inherited'
+            # XXX put the code back
 
-                transport = Server(skt, protocol,
-                                   skt.getpeername(), JustEnoughLikeAPort,
-                                   4321, reactor)
-                if data == 'SSL':
-                    transport.startTLS(self.createContextFactory())
-                transport.startReading()
-                return transport
-
-            from twext.internet.sendfdport import InheritedPort
-            # Unlike other 'factory' constructions, config.MaxRequests and
-            # config.MaxAccepts are dealt with in the master process, so we
-            # don't need to propagate them here.
-            reportingFactory = ReportingHTTPFactory(site, vary=True)
-
-            reportingFactory.inheritedPort = InheritedPort(
-                fd, myTransportFactory, reportingFactory
-            )
-
-            # XXX for correctness, we need a service here, not just a Port;
-            # this should be in startService.
-            reportingFactory.inheritedPort.startReading()
-
         else: # Not inheriting, therefore we open our own:
 
             if not config.BindAddresses:
@@ -823,12 +795,13 @@
         inheritFDs = []
         inheritSSLFDs = []
 
-        s._inheritedSockets = [] # keep a reference to these so they don't close
-
         if config.UseMetaFD:
             cl = ConnectionLimiter(config.MaxAccepts,
                                    (config.MaxRequests *
                                     config.MultiProcess.ProcessCount))
+            cl.setServiceParent(s)
+        else:
+            s._inheritedSockets = [] # keep a reference to these so they don't close
 
         for bindAddress in config.BindAddresses:
             if config.BindHTTPPorts:
@@ -851,11 +824,7 @@
                 for ports, description in [(config.BindSSLPorts, "SSL"),
                                            (config.BindHTTPPorts, "TCP")]:
                     for port in ports:
-                        MaxAcceptTCPServer(
-                            port, cl.createFactory(description),
-                            interface=bindAddress,
-                            backlog=config.ListenBacklog
-                        ).setServiceParent(s)
+                        cl.addPortService(description, port, bindAddress, config.ListenBacklog)
             else:
                 def _openSocket(addr, port):
                     log.info("Opening socket for inheritance at %s:%d" % (addr, port))
@@ -1013,161 +982,6 @@
 
 
 
-
-
-class ReportingHTTPFactory(HTTPFactory):
-    """
-    An L{HTTPFactory} which reports its status to a
-    L{twext.internet.sendfdport.InheritedPort}.
-
-    @ivar inheritedPort: an L{InheritedPort} to report status (the current
-        number of outstanding connections) to.  Since this - the
-        L{ReportingHTTPFactory} - needs to be instantiated to be passed to
-        L{InheritedPort}'s constructor, this attribute must be set afterwards
-        but before any connections have occurred.
-    """
-
-    def _report(self, message):
-        """
-        Report a status message to the parent.
-        """
-        self.inheritedPort.reportStatus(message)
-
-
-    def addConnectedChannel(self, channel):
-        """
-        Add the connected channel, and report the current number of open
-        channels to the listening socket in the parent process.
-        """
-        HTTPFactory.addConnectedChannel(self, channel)
-        self._report("+")
-
-
-    def removeConnectedChannel(self, channel):
-        """
-        Remove the connected channel, and report the current number of open
-        channels to the listening socket in the parent process.
-        """
-        HTTPFactory.removeConnectedChannel(self, channel)
-        self._report("-")
-
-
-
-class ConnectionLimiter(object):
-    """
-    Connection limiter for use with L{InheritedSocketDispatcher}.
-
-    This depends on statuses being reported by L{ReportingHTTPFactory}
-    """
-
-    def __init__(self, maxAccepts, maxRequests):
-        """
-        Create a L{ConnectionLimiter} with an associated dispatcher and
-        list of factories.
-        """
-        self.factories = []
-        self.dispatcher = InheritedSocketDispatcher(self)
-        self.maxAccepts = maxAccepts
-        self.maxRequests = maxRequests
-
-
-    # implementation of implicit statusWatcher interface required by
-    # InheritedSocketDispatcher
-
-    def statusFromMessage(self, previousStatus, message):
-        """
-        Determine a subprocess socket's status from its previous status and a
-        status message.
-        """
-        if message == '-':
-            result = self.intWithNoneAsZero(previousStatus) - 1
-            # A connection has gone away in a subprocess; we should start
-            # accepting connections again if we paused (see
-            # newConnectionStatus)
-            for f in self.factories:
-                f.myServer.myPort.startReading()
-        else:
-            # '+' is just an acknowledgement of newConnectionStatus, so we can
-            # ignore it.
-            result = self.intWithNoneAsZero(previousStatus)
-        return result
-
-
-    def newConnectionStatus(self, previousStatus):
-        """
-        Determine the effect of a new connection being sent on a subprocess
-        socket.
-        """
-        current = self.outstandingRequests + 1
-        maximum = (config.MaxRequests *
-                   config.MultiProcess.ProcessCount)
-        overloaded = (current >= maximum)
-        if overloaded:
-            for f in self.factories:
-                f.myServer.myPort.stopReading()
-
-        result = self.intWithNoneAsZero(previousStatus) + 1
-        return result
-
-
-    def createFactory(self, description):
-        """
-        Create a L{LimitingInheritingProtocolFactory}.
-        """
-        l = LimitingInheritingProtocolFactory(self, description)
-        self.factories.append(l)
-        return l
-
-
-    def intWithNoneAsZero(self, x):
-        """
-        Convert 'x' to an C{int}, unless x is C{None}, in which case return 0.
-        """
-        if x is None:
-            return 0
-        else:
-            return int(x)
-
-
-    @property
-    def outstandingRequests(self):
-        outstanding = 0
-        for status in self.dispatcher.statuses:
-            outstanding += self.intWithNoneAsZero(status)
-        return outstanding
-
-
-
-class LimitingInheritingProtocolFactory(InheritingProtocolFactory):
-    """
-    An L{InheritingProtocolFactory} that supports the implicit factory contract
-    required by L{MaxAcceptTCPServer}/L{MaxAcceptTCPPort}.
-
-    @ivar outstandingRequests: a read-only property for the number of currently
-        active connections.
-
-    @ivar maxAccepts: The maximum number of times to call 'accept()' in a
-        single reactor loop iteration.
-
-    @ivar maxRequests: The maximum number of concurrent connections to accept
-        at once - note that this is for the I{entire server}, whereas the
-        value in the configuration file is for only a single process.
-    """
-
-    def __init__(self, limiter, description):
-        super(LimitingInheritingProtocolFactory, self).__init__(
-            limiter.dispatcher, description)
-        self.limiter = limiter
-        self.maxAccepts = limiter.maxAccepts
-        self.maxRequests = limiter.maxRequests
-
-
-    @property
-    def outstandingRequests(self):
-        return self.limiter.outstandingRequests
-
-
-
 class TwistdSlaveProcess(object):
     """
     A L{TwistdSlaveProcess} is information about how to start a slave process

Added: CalendarServer/branches/users/glyph/sendfdport/twistedcaldav/metafd.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twistedcaldav/metafd.py	                        (rev 0)
+++ CalendarServer/branches/users/glyph/sendfdport/twistedcaldav/metafd.py	2010-03-30 18:31:28 UTC (rev 5414)
@@ -0,0 +1,248 @@
+
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twisted.internet.tcp import Server
+from twext.internet.tcp import MaxAcceptTCPServer
+
+from twisted.internet import reactor
+
+from twisted.application.service import MultiService, Service
+
+from twext.web2.channel.http import HTTPFactory
+
+from twext.internet.sendfdport import (
+    InheritedPort, InheritedSocketDispatcher, InheritingProtocolFactory)
+
+
+
+class JustEnoughLikeAPort(object):
+    """
+    Fake out just enough of L{tcp.Port} (only C{_realPortNumber}) to be
+    acceptable to L{tcp.Server}; the class itself is passed, not an instance.
+    """
+    _realPortNumber = 'inherited'
+
+
+
+class ReportingHTTPService(Service, object):
+    """
+    Service which starts up an HTTP server that can report back to its parent
+    process via L{InheritedPort}.
+    """
+
+    def __init__(self, site, fd, contextFactory):
+        self.contextFactory = contextFactory
+        # No config.MaxRequests / config.MaxAccepts are taken here: those
+        # limits are dealt with in the master process, so the factory built in
+        # startService does not need them propagated.
+        self.site = site
+        self.fd = fd
+
+
+    def startService(self):
+        """
+        Build the reporting factory and start reading on the inherited port.
+        """
+        Service.startService(self)
+        self.reportingFactory = ReportingHTTPFactory(self.site, vary=True)
+        self.reportingFactory.inheritedPort = InheritedPort(
+            self.fd, self.createTransport, self.reportingFactory
+        )
+        self.reportingFactory.inheritedPort.startReading()
+
+
+    def stopService(self):
+        """
+        Stop reading on the inherited port.
+        """
+        Service.stopService(self)
+        # XXX stopping should really be destructive, because otherwise we will
+        # always leak a file descriptor; i.e. this shouldn't be restartable.
+        # XXX this needs to return a Deferred.
+        self.reportingFactory.inheritedPort.stopReading()
+
+
+    def createTransport(self, skt, data, protocol):
+        """
+        Make a TCP transport from a parent-passed socket; TLS if data is 'SSL'.
+        """
+        transport = Server(skt, protocol,
+                           skt.getpeername(), JustEnoughLikeAPort,
+                           4321, reactor)
+        if data == 'SSL':
+            transport.startTLS(self.contextFactory)
+        transport.startReading()
+        return transport
+
+
+
+class ReportingHTTPFactory(HTTPFactory):
+    """
+    An L{HTTPFactory} which reports its status to a
+    L{twext.internet.sendfdport.InheritedPort}.
+
+    @ivar inheritedPort: an L{InheritedPort} to report status changes to (a
+        "+" when a channel connects, a "-" when one disconnects).  Since this -
+        the L{ReportingHTTPFactory} - needs to be instantiated to be passed to
+        L{InheritedPort}'s constructor, this attribute must be set afterwards
+        but before any connections have occurred.
+    """
+
+    def _report(self, message):
+        """
+        Report a status message to the parent.
+        """
+        self.inheritedPort.reportStatus(message)
+
+
+    def addConnectedChannel(self, channel):
+        """
+        Add the connected channel, then send a "+" status message to the
+        parent process via the inherited port.
+        """
+        HTTPFactory.addConnectedChannel(self, channel)
+        self._report("+")
+
+
+    def removeConnectedChannel(self, channel):
+        """
+        Remove the connected channel, then send a "-" status message to the
+        parent process via the inherited port.
+        """
+        HTTPFactory.removeConnectedChannel(self, channel)
+        self._report("-")
+
+
+
+class ConnectionLimiter(MultiService, object):
+    """
+    Connection limiter for use with L{InheritedSocketDispatcher}.
+
+    This depends on statuses being reported by L{ReportingHTTPFactory}
+    """
+
+    def __init__(self, maxAccepts, maxRequests):
+        """
+        Create a L{ConnectionLimiter} with an associated dispatcher and
+        list of factories.
+        """
+        MultiService.__init__(self)
+        self.factories = []
+        # XXX dispatcher needs to be a service, so that it can shut down its
+        # sub-sockets.
+        self.dispatcher = InheritedSocketDispatcher(self)
+        self.maxAccepts = maxAccepts
+        self.maxRequests = maxRequests
+
+
+    def addPortService(self, description, port, interface, backlog):
+        """
+        Add a L{MaxAcceptTCPServer} to bind a TCP port to a socket description.
+        """
+        lipf = LimitingInheritingProtocolFactory(self, description)
+        self.factories.append(lipf)
+        MaxAcceptTCPServer(
+            port, lipf,
+            interface=interface,
+            backlog=backlog
+        ).setServiceParent(self)
+
+
+    # implementation of implicit statusWatcher interface required by
+    # InheritedSocketDispatcher
+
+    def statusFromMessage(self, previousStatus, message):
+        """
+        Determine a subprocess socket's status from its previous status and a
+        status message.
+        """
+        if message == '-':
+            result = self.intWithNoneAsZero(previousStatus) - 1
+            # A connection has gone away in a subprocess; we should start
+            # accepting connections again if we paused (see
+            # newConnectionStatus)
+            for f in self.factories:
+                f.myServer.myPort.startReading()
+        else:
+            # '+' is just an acknowledgement of newConnectionStatus, so we can
+            # ignore it.
+            result = self.intWithNoneAsZero(previousStatus)
+        return result
+
+
+    def newConnectionStatus(self, previousStatus):
+        """
+        Count a new connection sent to a subprocess socket, and stop reading
+        on every listening port once the total reaches C{maxRequests}.
+        """
+        current = self.outstandingRequests + 1
+        maximum = self.maxRequests
+        overloaded = (current >= maximum)
+        if overloaded:
+            for f in self.factories:
+                f.myServer.myPort.stopReading()
+
+        result = self.intWithNoneAsZero(previousStatus) + 1
+        return result
+
+
+    def intWithNoneAsZero(self, x):
+        """
+        Convert 'x' to an C{int}, unless x is C{None}, in which case return 0.
+        """
+        if x is None:
+            return 0
+        else:
+            return int(x)
+
+
+    @property
+    def outstandingRequests(self):
+        outstanding = 0
+        for status in self.dispatcher.statuses:
+            outstanding += self.intWithNoneAsZero(status)
+        return outstanding
+
+
+
+class LimitingInheritingProtocolFactory(InheritingProtocolFactory):
+    """
+    An L{InheritingProtocolFactory} that supports the implicit factory contract
+    required by L{MaxAcceptTCPServer}/L{MaxAcceptTCPPort}.
+
+    @ivar outstandingRequests: a read-only property for the number of currently
+        active connections, as totalled by the L{ConnectionLimiter}.
+
+    @ivar maxAccepts: The maximum number of times to call 'accept()' in a
+        single reactor loop iteration.
+
+    @ivar maxRequests: The maximum number of concurrent connections to accept
+        at once - note that this is for the I{entire server}, whereas the
+        value in the configuration file is for only a single process.
+    """
+
+    def __init__(self, limiter, description):
+        super(LimitingInheritingProtocolFactory, self).__init__(
+            limiter.dispatcher, description)
+        self.limiter = limiter
+        self.maxAccepts = limiter.maxAccepts
+        self.maxRequests = limiter.maxRequests
+
+
+    @property
+    def outstandingRequests(self):
+        return self.limiter.outstandingRequests
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100330/f6975a1c/attachment-0001.html>


More information about the calendarserver-changes mailing list