[CalendarServer-changes] [5400] CalendarServer/branches/users/glyph/sendfdport
source_changes at macosforge.org
source_changes at macosforge.org
Thu Mar 25 15:55:52 PDT 2010
Revision: 5400
http://trac.macosforge.org/projects/calendarserver/changeset/5400
Author: glyph at apple.com
Date: 2010-03-25 15:55:52 -0700 (Thu, 25 Mar 2010)
Log Message:
-----------
enough to start up without tracebacks, but not quite enough to actually finish a connection
Modified Paths:
--------------
CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py
CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/test/test_caldav.py
CalendarServer/branches/users/glyph/sendfdport/twext/internet/tcp.py
CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py
CalendarServer/branches/users/glyph/sendfdport/twistedcaldav/stdconfig.py
Added Paths:
-----------
CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py
Modified: CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py 2010-03-25 18:39:01 UTC (rev 5399)
+++ CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py 2010-03-25 22:55:52 UTC (rev 5400)
@@ -53,8 +53,12 @@
from twext.python.log import logLevelForNamespace, setLogLevelForNamespace
from twext.internet.ssl import ChainingOpenSSLContextFactory
from twext.internet.tcp import MaxAcceptTCPServer, MaxAcceptSSLServer
-from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
+from twext.internet.sendfdport import (
+ InheritedSocketDispatcher, InheritingProtocolFactory)
+from twext.web2.channel.http import (
+ LimitingHTTPFactory, SSLRedirectRequest, ReportingHTTPFactory)
+
try:
from twistedcaldav.version import version
except ImportError:
@@ -635,8 +639,31 @@
elif config.MetaFD:
fd = int(config.MetaFD)
- # XXX sendmsg()-FD case
+ def myTransportFactory(skt, data, protocol):
+ from twisted.internet.tcp import Server
+ from twisted.internet import reactor
+ transport = Server(skt, protocol,
+ skt.getpeername(), skt.getsockname(),
+ 4321, reactor)
+ if data == 'SSL':
+ transport.startTLS(self.createContextFactory())
+ transport.startReading()
+ return transport
+ from twext.internet.sendfdport import InheritedPort
+ # Unlike other 'factory' constructions, config.MaxRequests and
+ # config.MaxAccepts are dealt with in the master process, so we
+ # don't need to propagate them here.
+ reportingFactory = ReportingHTTPFactory(site, vary=True)
+
+ reportingFactory.inheritedPort = InheritedPort(
+ fd, myTransportFactory, reportingFactory
+ )
+
+ # XXX for correctness, we need a service here, not just a Port;
+ # this should be in startService.
+ reportingFactory.inheritedPort.startReading()
+
else: # Not inheriting, therefore we open our own:
if not config.BindAddresses:
@@ -790,6 +817,17 @@
s._inheritedSockets = [] # keep a reference to these so they don't close
+ if config.UseMetaFD:
+ def sortAsInts(x):
+ # "int" by itself isn't quite good enough, unfortunately,
+ # because it can't handle None...
+ if x is None:
+ return 0
+ else:
+ return int(x)
+
+ dispatcher = InheritedSocketDispatcher(sortAsInts)
+
for bindAddress in config.BindAddresses:
if config.BindHTTPPorts:
if config.HTTPPort == 0:
@@ -807,36 +845,53 @@
elif config.SSLPort != 0:
config.BindSSLPorts = [config.SSLPort]
- def _openSocket(addr, port):
- log.info("Opening socket for inheritance at %s:%d" % (addr, port))
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setblocking(0)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind((addr, port))
- sock.listen(config.ListenBacklog)
- s._inheritedSockets.append(sock)
- return sock
+ if config.UseMetaFD:
+ # XXX no automated tests for this whole block. How to test it?
- for portNum in config.BindHTTPPorts:
- sock = _openSocket(bindAddress, int(portNum))
- inheritFDs.append(sock.fileno())
+ for ports, description in [(config.BindSSLPorts, "SSL"),
+ (config.BindHTTPPorts, "TCP")]:
+ for port in ports:
+ TCPServer(
+ port, InheritingProtocolFactory(dispatcher, description),
+ interface=bindAddress,
+ backlog=config.ListenBacklog
+ ).setServiceParent(s)
+ # Okay, now for each subprocess I need to add a thing to the dispatcher
+ else:
+ def _openSocket(addr, port):
+ log.info("Opening socket for inheritance at %s:%d" % (addr, port))
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setblocking(0)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((addr, port))
+ sock.listen(config.ListenBacklog)
+ s._inheritedSockets.append(sock)
+ return sock
- for portNum in config.BindSSLPorts:
- sock = _openSocket(bindAddress, int(portNum))
- inheritSSLFDs.append(sock.fileno())
+ for portNum in config.BindHTTPPorts:
+ sock = _openSocket(bindAddress, int(portNum))
+ inheritFDs.append(sock.fileno())
+ for portNum in config.BindSSLPorts:
+ sock = _openSocket(bindAddress, int(portNum))
+ inheritSSLFDs.append(sock.fileno())
+
+
for p in xrange(0, config.MultiProcess.ProcessCount):
+ if config.UseMetaFD:
+ extraArgs = dict(metaFD=dispatcher.addSocket())
+ else:
+ extraArgs = dict(inheritFDs=inheritFDs,
+ inheritSSLFDs=inheritSSLFDs)
process = TwistdSlaveProcess(
sys.argv[0],
self.tapname,
options["config"],
p,
config.BindAddresses,
- inheritFDs=inheritFDs,
- inheritSSLFDs=inheritSSLFDs
+ **extraArgs
)
-
monitor.addProcessObject(process, parentEnv)
@@ -971,11 +1026,16 @@
@ivar inheritFDs: File descriptors to be inherited for calling accept() on
in the subprocess.
- @type inheritFDs: C{list} of C{int}
+ @type inheritFDs: C{list} of C{int}, or C{None}
@ivar inheritSSLFDs: File descriptors to be inherited for calling accept()
on in the subprocess, and speaking TLS on the resulting sockets.
- @type inheritSSLFDs: C{list} of C{int}
+ @type inheritSSLFDs: C{list} of C{int}, or C{None}
+
+ @ivar metaFD: a UNIX socket which will be used to send file descriptors
+ down to the slave process.
+
+ @type metaFD: L{socket.socket}, or C{None}
"""
prefix = "caldav"
@@ -1012,7 +1072,7 @@
fds = {}
maybeMetaFD = []
if self.metaFD:
- maybeMetaFD.append(self.metaFD)
+ maybeMetaFD.append(self.metaFD.fileno())
for fd in self.inheritSSLFDs + self.inheritFDs + maybeMetaFD:
fds[fd] = fd
return fds
@@ -1067,8 +1127,10 @@
])
if self.metaFD:
+ # XXX this FD is never closed in the parent. should it be?
+ # (should they *all* be?) -glyph
args.extend([
- "-o", "MetaFD=%s" % (self.metaFD,)
+ "-o", "MetaFD=%s" % (self.metaFD.fileno(),)
])
return args
@@ -1091,7 +1153,7 @@
start. It also specializes process-starting to allow for process objects
to
- @ivar processObjects: a list of L{TwistdSlaveProcess} to add using
+ @ivar processObjects: a C{list} of L{TwistdSlaveProcess} to add using
C{self.addProcess} when this service starts up.
@ivar _extraFDs: a mapping from process names to extra file-descriptor
@@ -1106,9 +1168,6 @@
def __init__(self, *args, **kwargs):
procmon.ProcessMonitor.__init__(self, *args, **kwargs)
-
- # processObjects stores TwistdSlaveProcesses which need to have their
- # command-lines determined just in time
self.processObjects = []
self._extraFDs = {}
from twisted.internet import reactor
@@ -1214,6 +1273,7 @@
)
+
class DelayedStartupLineLogger(object):
"""
A line logger that can handle very long lines.
Modified: CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/test/test_caldav.py 2010-03-25 18:39:01 UTC (rev 5399)
+++ CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/test/test_caldav.py 2010-03-25 22:55:52 UTC (rev 5400)
@@ -987,6 +987,9 @@
"""
dspm = DelayedStartupProcessMonitor()
dspm.reactor = InMemoryProcessSpawner()
+ class FakeFD:
+ def fileno(self):
+ return 4
# Most arguments here will be ignored, so these are bogus values.
slave = TwistdSlaveProcess(
@@ -995,19 +998,19 @@
configFile = "/does/not/exist",
id = 10,
interfaces = '127.0.0.1',
- metaFD = 4
+ metaFD = FakeFD()
)
dspm.addProcessObject(slave, {})
dspm.startService()
self.addCleanup(dspm.consistency.cancel)
oneProcessTransport = yield dspm.reactor.waitForOneProcess()
- self.assertEquals(oneProcessTransport.childFDs,
- {0: 'w', 1: 'r', 2: 'r',
- 4: 4})
self.assertIn("MetaFD=4", oneProcessTransport.args)
self.assertEquals(
oneProcessTransport.args[oneProcessTransport.args.index("MetaFD=4")-1],
'-o',
"MetaFD argument was not passed as an option"
)
+ self.assertEquals(oneProcessTransport.childFDs,
+ {0: 'w', 1: 'r', 2: 'r',
+ 4: 4})
Added: CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py (rev 0)
+++ CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py 2010-03-25 22:55:52 UTC (rev 5400)
@@ -0,0 +1,287 @@
+# -*- test-case-name: twext.internet.test.test_sendfdport -*-
+##
+# Copyright (c) 2005-2009 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Implementation of a TCP/SSL port that uses sendmsg/recvmsg as implemented by
+L{twext.python.sendfd}.
+"""
+
+from os import close
+from errno import EAGAIN
+from socket import socketpair, fromfd, error as SocketError, AF_INET, SOCK_STREAM
+
+from twisted.python import log
+
+from twisted.internet.abstract import FileDescriptor
+from twisted.internet.protocol import Protocol, Factory
+
+from twext.python.sendmsg import sendmsg, recvmsg
+from twext.python.sendfd import sendfd, recvfd
+
+class InheritingProtocol(Protocol, object):
+ """
+ When a connection comes in on this protocol, stop reading and writing, and
+ dispatch the socket to another process via its factory.
+ """
+
+ def connectionMade(self):
+ """
+ A connection was received; transmit the file descriptor to another
+ process via L{InheritingProtocolFactory} and remove my transport from
+ the reactor.
+ """
+ self.transport.stopReading()
+ self.transport.stopWriting()
+ skt = self.transport.getHandle()
+ # actually I want to retrieve a child from a *pool* of potentially
+ # available children. i have to make that determination based on the
+ # number of connections each child is currently handling. which means
+ # the logic shouldn't live here. where should it live? in this spot,
+ # I just need an FD.
+ self.factory.sendSocket(skt)
+
+
+
+class InheritingProtocolFactory(Factory, object):
+ """
+ In the 'master' process, make one of these and hook it up to the sockets
+ where you want to hear stuff.
+
+ @ivar dispatcher: an L{InheritedSocketDispatcher} to use to dispatch
+ incoming connections to an appropriate subprocess.
+
+ @ivar description: the string to send along with connections received on
+ this factory.
+ """
+
+ protocol = InheritingProtocol
+
+ def __init__(self, dispatcher, description):
+ self.dispatcher = dispatcher
+ self.description = description
+
+
+ def sendSocket(self, socketObject):
+ """
+ Send the given socket object on to my dispatcher.
+ """
+ self.dispatcher.sendFileDescriptor(socketObject, self.description)
+
+
+
+class _AvailableConnection(FileDescriptor, object):
+ """
+ A socket in the master process pointing at a file descriptor that can be
+ used to transmit sockets to a subprocess.
+
+ @ivar skt: the UNIX socket used as the sendmsg() transport.
+
+ @ivar outgoingSocketQueue: an outgoing queue of sockets to send to the
+ subprocess.
+
+ @ivar outgoingSocketQueue: a C{list} of 2-tuples of C{(socket-object, str)}
+
+ @ivar status: a record of the last status message received (via recvmsg)
+ from the subprocess: this is an application-specific indication of how
+ ready this subprocess is to receive more connections. A typical usage
+ would be to count the open connections: this is what is passed to
+
+ @type status: C{str}
+ """
+
+ def __init__(self, reactor, skt):
+ FileDescriptor.__init__(self, reactor)
+ self.status = None
+ self.skt = skt # XXX needs to be set non-blocking by somebody
+ self.fileno = skt.fileno
+ self.outgoingSocketQueue = []
+
+
+ def sendSocketToPeer(self, skt, description):
+ """
+ Enqueue a socket to send to the subprocess.
+ """
+ self.outgoingSocketQueue.append((skt, description))
+ self.startWriting()
+
+
+ def doRead(self):
+ """
+ Receive a status / health message and record it.
+ """
+ try:
+ data, flags, ancillary = recvmsg(self.fd)
+ except SocketError:
+ pass # handle EAGAIN, etc
+ else:
+ self.status = data
+
+
+ def doWrite(self):
+ """
+ Transmit as many queued pending file descriptors as we can.
+ """
+ while self.outgoingSocketQueue:
+ skt, desc = self.outgoingSocketQueue.pop(0)
+ try:
+ sendfd(self.skt.fileno(), skt.fileno(), desc)
+ except SocketError, se:
+ if se.errno == EAGAIN:
+ self.outgoingSocketQueue.insert(0, (skt, desc))
+ return
+ raise
+ if not self.outgoingSocketQueue:
+ self.stopWriting()
+
+
+
+class InheritedSocketDispatcher(object):
+ """
+ Used by one or more L{InheritingProtocolFactory}s, this keeps track of a
+ list of available sockets in subprocesses and sends inbound connections towards them.
+
+ @ivar fitnessFunction: a function used to evaluate status messages received
+ on available subprocess connections. a 1-argument function which
+ accepts a string - or C{None}, if no status has been reported - and
+ returns something sortable. this will be used to sort all available
+ status messages; the lowest sorting result will be used to handle the
+ new connection.
+ """
+
+ def __init__(self, fitnessFunction):
+ """
+ Create a socket dispatcher.
+ """
+ self.availableConnections = []
+ self.fitnessFunction = fitnessFunction
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+
+ def sendFileDescriptor(self, skt, description):
+ """
+ A connection has been received. Dispatch it.
+
+ @param skt: a socket object
+
+ @param description: some text to identify to the subprocess's
+ L{InheritedPort} what type of transport to create for this socket.
+ """
+ self.availableConnections.sort(key=lambda conn:
+ self.fitnessFunction(conn.status))
+
+
+ def addSocket(self):
+ """
+ Add a C{sendmsg()}-oriented AF_UNIX socket to the pool of sockets being
+ used for transmitting file descriptors to child processes.
+
+ @return: a socket object for the receiving side; pass this object's
+ C{fileno()} as part of the C{childFDs} argument to
+ C{spawnProcess()}, then close it.
+ """
+ i, o = socketpair()
+ a = _AvailableConnection(self.reactor, o)
+ self.availableConnections.append(a)
+ return i
+
+
+
+class InheritedPort(FileDescriptor, object):
+ """
+ Create this in the 'slave' process to handle incoming connections
+ dispatched via C{sendmsg}.
+ """
+
+ def __init__(self, fd, transportFactory, protocolFactory):
+ """
+ @param fd: a file descriptor
+
+ @type fd: C{int}
+
+ @param transportFactory: a 3-argument function that takes the socket
+ object produced from the file descriptor, the (non-ancillary) data
+ sent along with the incoming file descriptor, and the protocol
+ built along with it, and returns an L{ITransport} provider. Note
+ that this should NOT call C{makeConnection} on the protocol that it
+ produces, as this class will do that.
+
+ @param protocolFactory: an L{IProtocolFactory}
+ """
+ FileDescriptor.__init__(self)
+ self.fd = fd
+ self.transportFactory = transportFactory
+ self.protocolFactory = protocolFactory
+
+
+ def fileno(self):
+ """
+ Get the FD number for this socket.
+ """
+ return self.fd
+
+
+ def doRead(self):
+ """
+ A message is ready to read. Receive a file descriptor from our parent
+ process.
+ """
+ try:
+ fd, description = recvfd(self.fd)
+ except SocketError, se:
+ if se.errno != EAGAIN:
+ raise
+ else:
+ try:
+ skt = fromfd(fd, AF_INET, SOCK_STREAM)
+ # XXX it could be AF_UNIX, I guess? or even something else?
+ # should this be on the transportFactory's side of things?
+
+ close(fd) # fromfd() calls dup()
+ peeraddr = skt.getpeername()
+ protocol = self.protocolFactory.buildProtocol(peeraddr)
+ transport = self.transportFactory(skt, description, protocol)
+ protocol.makeConnection(transport)
+ except:
+ log.err()
+
+
+ def doWrite(self):
+ """
+ Write some data.
+ """
+ while self.statusQueue:
+ msg = self.statusQueue.pop(0)
+ try:
+ sendmsg(self.fd, msg, 0)
+ except SocketError, se:
+ if se.errno == EAGAIN:
+ self.statusQueue.insert(0, msg)
+ return
+ raise
+ self.stopWriting()
+
+
+ def reportStatus(self, statusMessage):
+ """
+ Report a status message to the L{_AvailableConnection} monitoring this
+ L{InheritedPort}'s health in the master process.
+ """
+ # XXX this has got to be invoked from
+ self.statusQueue.append(statusMessage)
+ self.startWriting()
+
Modified: CalendarServer/branches/users/glyph/sendfdport/twext/internet/tcp.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twext/internet/tcp.py 2010-03-25 18:39:01 UTC (rev 5399)
+++ CalendarServer/branches/users/glyph/sendfdport/twext/internet/tcp.py 2010-03-25 22:55:52 UTC (rev 5400)
@@ -39,21 +39,28 @@
Mixin for resetting maxAccepts.
"""
def doRead(self):
- self.numberAccepts = min(self.factory.maxRequests - self.factory.outstandingRequests, self.factory.maxAccepts)
+ self.numberAccepts = min(
+ self.factory.maxRequests - self.factory.outstandingRequests,
+ self.factory.maxAccepts
+ )
tcp.Port.doRead(self)
+
+
class MaxAcceptTCPPort(MaxAcceptPortMixin, tcp.Port):
"""
Use for non-inheriting tcp ports.
"""
- pass
+
+
class MaxAcceptSSLPort(MaxAcceptPortMixin, ssl.Port):
"""
Use for non-inheriting SSL ports.
"""
- pass
+
+
class InheritedTCPPort(MaxAcceptTCPPort):
"""
A tcp port which uses an inherited file descriptor.
@@ -100,6 +107,9 @@
"""
TCP server which will uses MaxAcceptTCPPorts (and optionally,
inherited ports)
+
+ @ivar myPort: When running, this is set to the L{IListeningPort} being
+ managed by this service.
"""
def __init__(self, *args, **kwargs):
Modified: CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py 2010-03-25 18:39:01 UTC (rev 5399)
+++ CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py 2010-03-25 22:55:52 UTC (rev 5400)
@@ -739,7 +739,7 @@
def connectionMade(self):
self.setTimeout(self.inputTimeOut)
- self.factory.outstandingRequests+=1
+ self.factory.addConnectedChannel(self)
def lineReceived(self, line):
if self._first_line:
@@ -928,7 +928,7 @@
self.transport.loseConnection()
def connectionLost(self, reason):
- self.factory.outstandingRequests-=1
+ self.factory.removeConnectedChannel(self)
self._writeLost = True
self.readConnectionLost()
@@ -951,19 +951,30 @@
self.transport.loseConnection()
class HTTPFactory(protocol.ServerFactory):
- """Factory for HTTP server."""
+ """
+ Factory for HTTP server.
+ @ivar outstandingRequests: the number of currently connected HTTP channels.
+
+ @type outstandingRequests: C{int}
+
+ @ivar connectedChannels: all the channels that have currently active
+ connections.
+
+ @type connectedChannels: C{set} of L{HTTPChannel}
+ """
+
protocol = HTTPChannel
protocolArgs = None
- outstandingRequests = 0
-
def __init__(self, requestFactory, maxRequests=600, **kwargs):
- self.maxRequests=maxRequests
+ self.maxRequests = maxRequests
self.protocolArgs = kwargs
- self.protocolArgs['requestFactory']=requestFactory
-
+ self.protocolArgs['requestFactory'] = requestFactory
+ self.connectedChannels = set()
+
+
def buildProtocol(self, addr):
if self.outstandingRequests >= self.maxRequests:
return OverloadedServerProtocol()
@@ -975,6 +986,27 @@
return p
+ def addConnectedChannel(self, channel):
+ """
+ Add a connected channel to the set of currently connected channels and
+ increase the outstanding request count.
+ """
+ self.connectedChannels.add(channel)
+
+
+ def removeConnectedChannel(self, channel):
+ """
+ Remove a connected channel from the set of currently connected channels
+ and decrease the outstanding request count.
+ """
+ self.connectedChannels.remove(channel)
+
+
+ @property
+ def outstandingRequests(self):
+ return len(self.connectedChannels)
+
+
class HTTP503LoggingFactory (HTTPFactory):
"""
Factory for HTTP server which emits a 503 response when overloaded.
@@ -1087,40 +1119,89 @@
-class LimitingHTTPChannel(HTTPChannel):
- """ HTTPChannel that takes itself out of the reactor once it has enough
- requests in flight.
+class LimitingHTTPFactory(HTTPFactory):
"""
+ HTTPFactory which stores maxAccepts on behalf of the MaxAcceptPortMixin
- def connectionMade(self):
- HTTPChannel.connectionMade(self)
- if self.factory.outstandingRequests >= self.factory.maxRequests:
- self.factory.myServer.myPort.stopReading()
-
- def connectionLost(self, reason):
- HTTPChannel.connectionLost(self, reason)
- if self.factory.outstandingRequests < self.factory.maxRequests:
- self.factory.myServer.myPort.startReading()
-
-class LimitingHTTPFactory(HTTPFactory):
- """ HTTPFactory which stores maxAccepts on behalf of the MaxAcceptPortMixin
+ @ivar myServer: a reference to a L{MaxAcceptTCPServer} that this
+ L{LimitingHTTPFactory} will limit. This must be set externally.
"""
- protocol = LimitingHTTPChannel
-
def __init__(self, requestFactory, maxRequests=600, maxAccepts=100,
**kwargs):
HTTPFactory.__init__(self, requestFactory, maxRequests, **kwargs)
self.maxAccepts = maxAccepts
def buildProtocol(self, addr):
-
+ """
+ Override L{HTTPFactory.buildProtocol} in order to avoid ever returning
+ an L{OverloadedServerProtocol}; this should be handled in other ways.
+ """
p = protocol.ServerFactory.buildProtocol(self, addr)
for arg, value in self.protocolArgs.iteritems():
setattr(p, arg, value)
return p
+ def addConnectedChannel(self, channel):
+ """
+ Override L{HTTPFactory.addConnectedChannel} to pause listening on the
+ socket when there are too many outstanding channels.
+ """
+ HTTPFactory.addConnectedChannel(self, channel)
+ if self.outstandingRequests >= self.maxRequests:
+ self.myServer.myPort.stopReading()
+
+ def removeConnectedChannel(self, channel):
+ """
+ Override L{HTTPFactory.addConnectedChannel} to resume listening on the
+ socket when there are too many outstanding channels.
+ """
+ HTTPFactory.removeConnectedChannel(self, channel)
+ if self.outstandingRequests < self.maxRequests:
+ self.myServer.myPort.startReading()
+
+
+
+class ReportingHTTPFactory(HTTPFactory):
+ """
+ An L{HTTPFactory} which reports its status to a
+ L{twext.internet.sendfdport.InheritedPort}.
+
+ @ivar inheritedPort: an L{InheritedPort} to report status (the current
+ number of outstanding connections) to. Since this - the
+ L{ReportingHTTPFactory} - needs to be instantiated to be passed to
+ L{InheritedPort}'s constructor, this attribute must be set afterwards
+ but before any connections have occurred.
+ """
+
+ def _report(self):
+ """
+ Report the current number of open channels to the listening socket in
+ the parent process.
+ """
+ self.inheritedPort.reportStatus(str(self.outstandingRequests))
+
+
+ def addConnectedChannel(self, channel):
+ """
+ Add the connected channel, and report the current number of open
+ channels to the listening socket in the parent process.
+ """
+ HTTPFactory.addConnectedChannel(self, channel)
+ self._report()
+
+
+ def removeConnectedChannel(self, channel):
+ """
+ Remove the connected channel, and report the current number of open
+ channels to the listening socket in the parent process.
+ """
+ HTTPFactory.removeConnectedChannel(self, channel)
+ self._report()
+
+
+
__all__ = [
"HTTPFactory",
"HTTP503LoggingFactory",
Modified: CalendarServer/branches/users/glyph/sendfdport/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twistedcaldav/stdconfig.py 2010-03-25 18:39:01 UTC (rev 5399)
+++ CalendarServer/branches/users/glyph/sendfdport/twistedcaldav/stdconfig.py 2010-03-25 22:55:52 UTC (rev 5400)
@@ -143,6 +143,9 @@
"InheritSSLFDs": [], # File descriptors to inherit for HTTPS requests (empty = don't inherit)
"MetaFD": 0, # Inherited file descriptor to call recvmsg() on to recive sockets (none = don't inherit)
+ "UseMetaFD": False, # Use a 'meta' FD, i.e. an FD to transmit other
+ # FDs to slave processes.
+
#
# Types of service provided
#
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100325/82f48f18/attachment-0001.html>
More information about the calendarserver-changes
mailing list