[CalendarServer-changes] [5413] CalendarServer/branches/users/glyph/sendfdport

source_changes at macosforge.org source_changes at macosforge.org
Tue Mar 30 10:27:55 PDT 2010


Revision: 5413
          http://trac.macosforge.org/projects/calendarserver/changeset/5413
Author:   glyph at apple.com
Date:     2010-03-30 10:27:52 -0700 (Tue, 30 Mar 2010)
Log Message:
-----------
refactor the interface between sendfdport and application code; move the limiting code into one place, and honor MaxRequests again

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py
    CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py
    CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py

Modified: CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py	2010-03-30 15:55:15 UTC (rev 5412)
+++ CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py	2010-03-30 17:27:52 UTC (rev 5413)
@@ -57,7 +57,7 @@
     InheritedSocketDispatcher, InheritingProtocolFactory)
 
 from twext.web2.channel.http import (
-    LimitingHTTPFactory, SSLRedirectRequest, ReportingHTTPFactory)
+    LimitingHTTPFactory, SSLRedirectRequest, HTTPFactory)
 
 try:
     from twistedcaldav.version import version
@@ -826,15 +826,9 @@
         s._inheritedSockets = [] # keep a reference to these so they don't close
 
         if config.UseMetaFD:
-            def sortAsInts(x):
-                # "int" by itself isn't quite good enough, unfortunately,
-                # because it can't handle None...
-                if x is None:
-                    return 0
-                else:
-                    return int(x)
-                                
-            dispatcher = InheritedSocketDispatcher(sortAsInts)
+            cl = ConnectionLimiter(config.MaxAccepts,
+                                   (config.MaxRequests *
+                                    config.MultiProcess.ProcessCount))
 
         for bindAddress in config.BindAddresses:
             if config.BindHTTPPorts:
@@ -854,17 +848,14 @@
                 config.BindSSLPorts = [config.SSLPort]
 
             if config.UseMetaFD:
-                # XXX no automated tests for this whole block.  How to test it?
-
                 for ports, description in [(config.BindSSLPorts, "SSL"),
                                            (config.BindHTTPPorts, "TCP")]:
                     for port in ports:
-                        TCPServer(
-                            port, InheritingProtocolFactory(dispatcher, description),
+                        MaxAcceptTCPServer(
+                            port, cl.createFactory(description),
                             interface=bindAddress,
                             backlog=config.ListenBacklog
                         ).setServiceParent(s)
-                # Okay, now for each subprocess I need to add a thing to the dispatcher
             else:
                 def _openSocket(addr, port):
                     log.info("Opening socket for inheritance at %s:%d" % (addr, port))
@@ -884,11 +875,9 @@
                     sock = _openSocket(bindAddress, int(portNum))
                     inheritSSLFDs.append(sock.fileno())
 
-
-
         for p in xrange(0, config.MultiProcess.ProcessCount):
             if config.UseMetaFD:
-                extraArgs = dict(metaFD=dispatcher.addSocket())
+                extraArgs = dict(metaFD=cl.dispatcher.addSocket())
             else:
                 extraArgs = dict(inheritFDs=inheritFDs,
                                  inheritSSLFDs=inheritSSLFDs)
@@ -902,8 +891,6 @@
             )
             monitor.addProcessObject(process, parentEnv)
 
-
-
         for name, pool in config.Memcached.Pools.items():
             if pool.ServerEnabled:
                 self.log_info("Adding memcached service for pool: %s" % (name,))
@@ -1026,6 +1013,161 @@
 
 
 
+
+
+class ReportingHTTPFactory(HTTPFactory):
+    """
+    An L{HTTPFactory} which reports its status to a
+    L{twext.internet.sendfdport.InheritedPort}.
+
+    @ivar inheritedPort: an L{InheritedPort} to report status (the current
+        number of outstanding connections) to.  Since this - the
+        L{ReportingHTTPFactory} - needs to be instantiated to be passed to
+        L{InheritedPort}'s constructor, this attribute must be set afterwards
+        but before any connections have occurred.
+    """
+
+    def _report(self, message):
+        """
+        Report a status message to the parent.
+        """
+        self.inheritedPort.reportStatus(message)
+
+
+    def addConnectedChannel(self, channel):
+        """
+        Add the connected channel, and report the current number of open
+        channels to the listening socket in the parent process.
+        """
+        HTTPFactory.addConnectedChannel(self, channel)
+        self._report("+")
+
+
+    def removeConnectedChannel(self, channel):
+        """
+        Remove the connected channel, and report the current number of open
+        channels to the listening socket in the parent process.
+        """
+        HTTPFactory.removeConnectedChannel(self, channel)
+        self._report("-")
+
+
+
+class ConnectionLimiter(object):
+    """
+    Connection limiter for use with L{InheritedSocketDispatcher}.
+
+    This depends on statuses being reported by L{ReportingHTTPFactory}
+    """
+
+    def __init__(self, maxAccepts, maxRequests):
+        """
+        Create a L{ConnectionLimiter} with an associated dispatcher and
+        list of factories.
+        """
+        self.factories = []
+        self.dispatcher = InheritedSocketDispatcher(self)
+        self.maxAccepts = maxAccepts
+        self.maxRequests = maxRequests
+
+
+    # implementation of implicit statusWatcher interface required by
+    # InheritedSocketDispatcher
+
+    def statusFromMessage(self, previousStatus, message):
+        """
+        Determine a subprocess socket's status from its previous status and a
+        status message.
+        """
+        if message == '-':
+            result = self.intWithNoneAsZero(previousStatus) - 1
+            # A connection has gone away in a subprocess; we should start
+            # accepting connections again if we paused (see
+            # newConnectionStatus)
+            for f in self.factories:
+                f.myServer.myPort.startReading()
+        else:
+            # '+' is just an acknowledgement of newConnectionStatus, so we can
+            # ignore it.
+            result = self.intWithNoneAsZero(previousStatus)
+        return result
+
+
+    def newConnectionStatus(self, previousStatus):
+        """
+        Determine the effect of a new connection being sent on a subprocess
+        socket.
+        """
+        current = self.outstandingRequests + 1
+        maximum = (config.MaxRequests *
+                   config.MultiProcess.ProcessCount)
+        overloaded = (current >= maximum)
+        if overloaded:
+            for f in self.factories:
+                f.myServer.myPort.stopReading()
+
+        result = self.intWithNoneAsZero(previousStatus) + 1
+        return result
+
+
+    def createFactory(self, description):
+        """
+        Create a L{LimitingInheritingProtocolFactory}.
+        """
+        l = LimitingInheritingProtocolFactory(self, description)
+        self.factories.append(l)
+        return l
+
+
+    def intWithNoneAsZero(self, x):
+        """
+        Convert 'x' to an C{int}, unless x is C{None}, in which case return 0.
+        """
+        if x is None:
+            return 0
+        else:
+            return int(x)
+
+
+    @property
+    def outstandingRequests(self):
+        outstanding = 0
+        for status in self.dispatcher.statuses:
+            outstanding += self.intWithNoneAsZero(status)
+        return outstanding
+
+
+
+class LimitingInheritingProtocolFactory(InheritingProtocolFactory):
+    """
+    An L{InheritingProtocolFactory} that supports the implicit factory contract
+    required by L{MaxAcceptTCPServer}/L{MaxAcceptTCPPort}.
+
+    @ivar outstandingRequests: a read-only property for the number of currently
+        active connections.
+
+    @ivar maxAccepts: The maximum number of times to call 'accept()' in a
+        single reactor loop iteration.
+
+    @ivar maxRequests: The maximum number of concurrent connections to accept
+        at once - note that this is for the I{entire server}, whereas the
+        value in the configuration file is for only a single process.
+    """
+
+    def __init__(self, limiter, description):
+        super(LimitingInheritingProtocolFactory, self).__init__(
+            limiter.dispatcher, description)
+        self.limiter = limiter
+        self.maxAccepts = limiter.maxAccepts
+        self.maxRequests = limiter.maxRequests
+
+
+    @property
+    def outstandingRequests(self):
+        return self.limiter.outstandingRequests
+
+
+
 class TwistdSlaveProcess(object):
     """
     A L{TwistdSlaveProcess} is information about how to start a slave process

Modified: CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py	2010-03-30 15:55:15 UTC (rev 5412)
+++ CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py	2010-03-30 17:27:52 UTC (rev 5413)
@@ -21,8 +21,9 @@
 """
 
 from os import close
-from errno import EAGAIN
-from socket import socketpair, fromfd, error as SocketError, AF_INET, SOCK_STREAM
+from errno import EAGAIN, ENOBUFS
+from socket import (socketpair, fromfd, error as SocketError,
+                    AF_INET, AF_UNIX, SOCK_STREAM, SOCK_DGRAM)
 
 from twisted.python import log
 
@@ -47,11 +48,6 @@
         self.transport.stopReading()
         self.transport.stopWriting()
         skt = self.transport.getHandle()
-        # actually I want to retrieve a child from a *pool* of potentially
-        # available children.  i have to make that determination based on the
-        # number of connections each child is currently handling.  which means
-        # the logic shouldn't live here.  where should it live?  in this spot,
-        # I just need an FD.
         self.factory.sendSocket(skt)
 
 
@@ -83,7 +79,7 @@
 
 
 
-class _AvailableConnection(FileDescriptor, object):
+class _SubprocessSocket(FileDescriptor, object):
     """
     A socket in the master process pointing at a file descriptor that can be
     used to transmit sockets to a subprocess.
@@ -103,9 +99,10 @@
     @type status: C{str}
     """
 
-    def __init__(self, reactor, skt):
-        FileDescriptor.__init__(self, reactor)
+    def __init__(self, dispatcher, skt):
+        FileDescriptor.__init__(self, dispatcher.reactor)
         self.status = None
+        self.dispatcher = dispatcher
         self.skt = skt          # XXX needs to be set non-blocking by somebody
         self.fileno = skt.fileno
         self.outgoingSocketQueue = []
@@ -124,11 +121,12 @@
         Receive a status / health message and record it.
         """
         try:
-            data, flags, ancillary = recvmsg(self.fd)
-        except SocketError:
-            pass                # handle EAGAIN, etc
+            data, flags, ancillary = recvmsg(self.skt.fileno())
+        except SocketError, se:
+            if se.errno not in (EAGAIN, ENOBUFS):
+                raise
         else:
-            self.status = data
+            self.dispatcher.statusMessage(self, data)
 
 
     def doWrite(self):
@@ -140,7 +138,7 @@
             try:
                 sendfd(self.skt.fileno(), skt.fileno(), desc)
             except SocketError, se:
-                if se.errno == EAGAIN:
+                if se.errno in (EAGAIN, ENOBUFS):
                     self.outgoingSocketQueue.insert(0, (skt, desc))
                     return
                 raise
@@ -152,26 +150,37 @@
 class InheritedSocketDispatcher(object):
     """
     Used by one or more L{InheritingProtocolFactory}s, this keeps track of a
-    list of available sockets in subprocesses and sends inbound connections towards them.
-
-    @ivar fitnessFunction: a function used to evaluate status messages received
-        on available subprocess connections.  a 1-argument function which
-        accepts a string - or C{None}, if no status has been reported - and
-        returns something sortable.  this will be used to sort all available
-        status messages; the lowest sorting result will be used to handle the
-        new connection.
+    list of available sockets in subprocesses and sends inbound connections
+    towards them.
     """
 
-    def __init__(self, fitnessFunction):
+    def __init__(self, statusWatcher):
         """
         Create a socket dispatcher.
         """
-        self.availableConnections = []
-        self.fitnessFunction = fitnessFunction
+        self._subprocessSockets = []
+        self.statusWatcher = statusWatcher
         from twisted.internet import reactor
         self.reactor = reactor
 
 
+    @property
+    def statuses(self):
+        """
+        Yield the current status of all subprocess sockets.
+        """
+        for subsocket in self._subprocessSockets:
+            yield subsocket.status
+
+
+    def statusMessage(self, subsocket, message):
+        """
+        The status of a connection has changed; update all registered status
+        change listeners.
+        """
+        subsocket.status = self.statusWatcher.statusFromMessage(subsocket.status, message)
+
+
     def sendFileDescriptor(self, skt, description):
         """
         A connection has been received.  Dispatch it.
@@ -181,9 +190,12 @@
         @param description: some text to identify to the subprocess's
             L{InheritedPort} what type of transport to create for this socket.
         """
-        self.availableConnections.sort(key=lambda conn:
-                                           self.fitnessFunction(conn.status))
-        self.availableConnections[0].sendSocketToPeer(skt, description)
+        self._subprocessSockets.sort(key=lambda conn: conn.status)
+        selectedSocket = self._subprocessSockets[0]
+        selectedSocket.sendSocketToPeer(skt, description)
+        # XXX Maybe want to send along 'description' or 'skt' or some
+        # properties thereof? -glyph
+        selectedSocket.status = self.statusWatcher.newConnectionStatus(selectedSocket.status)
 
 
     def addSocket(self):
@@ -195,9 +207,10 @@
             C{fileno()} as part of the C{childFDs} argument to
             C{spawnProcess()}, then close it.
         """
-        i, o = socketpair()
-        a = _AvailableConnection(self.reactor, o)
-        self.availableConnections.append(a)
+        i, o = socketpair(AF_UNIX, SOCK_DGRAM)
+        a = _SubprocessSocket(self, o)
+        a.startReading()
+        self._subprocessSockets.append(a)
         return i
 
 
@@ -271,7 +284,7 @@
             try:
                 sendmsg(self.fd, msg, 0)
             except SocketError, se:
-                if se.errno == EAGAIN:
+                if se.errno in (EAGAIN, ENOBUFS):
                     self.statusQueue.insert(0, msg)
                     return
                 raise
@@ -280,10 +293,9 @@
 
     def reportStatus(self, statusMessage):
         """
-        Report a status message to the L{_AvailableConnection} monitoring this
+        Report a status message to the L{_SubprocessSocket} monitoring this
         L{InheritedPort}'s health in the master process.
         """
-        # XXX this has got to be invoked from 
         self.statusQueue.append(statusMessage)
         self.startWriting()
         

Modified: CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py	2010-03-30 15:55:15 UTC (rev 5412)
+++ CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py	2010-03-30 17:27:52 UTC (rev 5413)
@@ -950,6 +950,8 @@
                              "please try again later.</body></html>")
         self.transport.loseConnection()
 
+
+
 class HTTPFactory(protocol.ServerFactory):
     """
     Factory for HTTP server.
@@ -1007,6 +1009,7 @@
         return len(self.connectedChannels)
 
 
+
 class HTTP503LoggingFactory (HTTPFactory):
     """
     Factory for HTTP server which emits a 503 response when overloaded.
@@ -1163,45 +1166,6 @@
 
 
 
-class ReportingHTTPFactory(HTTPFactory):
-    """
-    An L{HTTPFactory} which reports its status to a
-    L{twext.internet.sendfdport.InheritedPort}.
-
-    @ivar inheritedPort: an L{InheritedPort} to report status (the current
-        number of outstanding connections) to.  Since this - the
-        L{ReportingHTTPFactory} - needs to be instantiated to be passed to
-        L{InheritedPort}'s constructor, this attribute must be set afterwards
-        but before any connections have occurred.
-    """
-
-    def _report(self):
-        """
-        Report the current number of open channels to the listening socket in
-        the parent process.
-        """
-        self.inheritedPort.reportStatus(str(self.outstandingRequests))
-
-
-    def addConnectedChannel(self, channel):
-        """
-        Add the connected channel, and report the current number of open
-        channels to the listening socket in the parent process.
-        """
-        HTTPFactory.addConnectedChannel(self, channel)
-        self._report()
-
-
-    def removeConnectedChannel(self, channel):
-        """
-        Remove the connected channel, and report the current number of open
-        channels to the listening socket in the parent process.
-        """
-        HTTPFactory.removeConnectedChannel(self, channel)
-        self._report()
-
-
-
 __all__ = [
     "HTTPFactory",
     "HTTP503LoggingFactory",
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100330/96e5eaca/attachment-0001.html>


More information about the calendarserver-changes mailing list