[CalendarServer-changes] [5413] CalendarServer/branches/users/glyph/sendfdport
source_changes at macosforge.org
source_changes at macosforge.org
Tue Mar 30 10:27:55 PDT 2010
Revision: 5413
http://trac.macosforge.org/projects/calendarserver/changeset/5413
Author: glyph at apple.com
Date: 2010-03-30 10:27:52 -0700 (Tue, 30 Mar 2010)
Log Message:
-----------
refactor the interface between sendfdport and application code; move the limiting code into one place, and honor MaxRequests again
Modified Paths:
--------------
CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py
CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py
CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py
Modified: CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py 2010-03-30 15:55:15 UTC (rev 5412)
+++ CalendarServer/branches/users/glyph/sendfdport/calendarserver/tap/caldav.py 2010-03-30 17:27:52 UTC (rev 5413)
@@ -57,7 +57,7 @@
InheritedSocketDispatcher, InheritingProtocolFactory)
from twext.web2.channel.http import (
- LimitingHTTPFactory, SSLRedirectRequest, ReportingHTTPFactory)
+ LimitingHTTPFactory, SSLRedirectRequest, HTTPFactory)
try:
from twistedcaldav.version import version
@@ -826,15 +826,9 @@
s._inheritedSockets = [] # keep a reference to these so they don't close
if config.UseMetaFD:
- def sortAsInts(x):
- # "int" by itself isn't quite good enough, unfortunately,
- # because it can't handle None...
- if x is None:
- return 0
- else:
- return int(x)
-
- dispatcher = InheritedSocketDispatcher(sortAsInts)
+ cl = ConnectionLimiter(config.MaxAccepts,
+ (config.MaxRequests *
+ config.MultiProcess.ProcessCount))
for bindAddress in config.BindAddresses:
if config.BindHTTPPorts:
@@ -854,17 +848,14 @@
config.BindSSLPorts = [config.SSLPort]
if config.UseMetaFD:
- # XXX no automated tests for this whole block. How to test it?
-
for ports, description in [(config.BindSSLPorts, "SSL"),
(config.BindHTTPPorts, "TCP")]:
for port in ports:
- TCPServer(
- port, InheritingProtocolFactory(dispatcher, description),
+ MaxAcceptTCPServer(
+ port, cl.createFactory(description),
interface=bindAddress,
backlog=config.ListenBacklog
).setServiceParent(s)
- # Okay, now for each subprocess I need to add a thing to the dispatcher
else:
def _openSocket(addr, port):
log.info("Opening socket for inheritance at %s:%d" % (addr, port))
@@ -884,11 +875,9 @@
sock = _openSocket(bindAddress, int(portNum))
inheritSSLFDs.append(sock.fileno())
-
-
for p in xrange(0, config.MultiProcess.ProcessCount):
if config.UseMetaFD:
- extraArgs = dict(metaFD=dispatcher.addSocket())
+ extraArgs = dict(metaFD=cl.dispatcher.addSocket())
else:
extraArgs = dict(inheritFDs=inheritFDs,
inheritSSLFDs=inheritSSLFDs)
@@ -902,8 +891,6 @@
)
monitor.addProcessObject(process, parentEnv)
-
-
for name, pool in config.Memcached.Pools.items():
if pool.ServerEnabled:
self.log_info("Adding memcached service for pool: %s" % (name,))
@@ -1026,6 +1013,161 @@
+
+
+class ReportingHTTPFactory(HTTPFactory):
+ """
+ An L{HTTPFactory} which reports its status to a
+ L{twext.internet.sendfdport.InheritedPort}.
+
+ @ivar inheritedPort: an L{InheritedPort} to report status (the current
+ number of outstanding connections) to. Since this - the
+ L{ReportingHTTPFactory} - needs to be instantiated to be passed to
+ L{InheritedPort}'s constructor, this attribute must be set afterwards
+ but before any connections have occurred.
+ """
+
+ def _report(self, message):
+ """
+ Report a status message to the parent.
+ """
+ self.inheritedPort.reportStatus(message)
+
+
+ def addConnectedChannel(self, channel):
+ """
+ Add the connected channel, and report the current number of open
+ channels to the listening socket in the parent process.
+ """
+ HTTPFactory.addConnectedChannel(self, channel)
+ self._report("+")
+
+
+ def removeConnectedChannel(self, channel):
+ """
+ Remove the connected channel, and report the current number of open
+ channels to the listening socket in the parent process.
+ """
+ HTTPFactory.removeConnectedChannel(self, channel)
+ self._report("-")
+
+
+
+class ConnectionLimiter(object):
+ """
+ Connection limiter for use with L{InheritedSocketDispatcher}.
+
+ This depends on statuses being reported by L{ReportingHTTPFactory}.
+ """
+
+ def __init__(self, maxAccepts, maxRequests):
+ """
+ Create a L{ConnectionLimiter} with an associated dispatcher and
+ list of factories.
+ """
+ self.factories = []
+ self.dispatcher = InheritedSocketDispatcher(self)
+ self.maxAccepts = maxAccepts
+ self.maxRequests = maxRequests
+
+
+ # implementation of implicit statusWatcher interface required by
+ # InheritedSocketDispatcher
+
+ def statusFromMessage(self, previousStatus, message):
+ """
+ Determine a subprocess socket's status from its previous status and a
+ status message.
+ """
+ if message == '-':
+ result = self.intWithNoneAsZero(previousStatus) - 1
+ # A connection has gone away in a subprocess; we should start
+ # accepting connections again if we paused (see
+ # newConnectionStatus)
+ for f in self.factories:
+ f.myServer.myPort.startReading()
+ else:
+ # '+' is just an acknowledgement of newConnectionStatus, so we can
+ # ignore it.
+ result = self.intWithNoneAsZero(previousStatus)
+ return result
+
+
+ def newConnectionStatus(self, previousStatus):
+ """
+ Determine the effect of a new connection being sent on a subprocess
+ socket.
+ """
+ current = self.outstandingRequests + 1
+ maximum = (config.MaxRequests *
+ config.MultiProcess.ProcessCount)
+ overloaded = (current >= maximum)
+ if overloaded:
+ for f in self.factories:
+ f.myServer.myPort.stopReading()
+
+ result = self.intWithNoneAsZero(previousStatus) + 1
+ return result
+
+
+ def createFactory(self, description):
+ """
+ Create a L{LimitingInheritingProtocolFactory}.
+ """
+ l = LimitingInheritingProtocolFactory(self, description)
+ self.factories.append(l)
+ return l
+
+
+ def intWithNoneAsZero(self, x):
+ """
+ Convert 'x' to an C{int}, unless x is C{None}, in which case return 0.
+ """
+ if x is None:
+ return 0
+ else:
+ return int(x)
+
+
+ @property
+ def outstandingRequests(self):
+ outstanding = 0
+ for status in self.dispatcher.statuses:
+ outstanding += self.intWithNoneAsZero(status)
+ return outstanding
+
+
+
+class LimitingInheritingProtocolFactory(InheritingProtocolFactory):
+ """
+ An L{InheritingProtocolFactory} that supports the implicit factory contract
+ required by L{MaxAcceptTCPServer}/L{MaxAcceptTCPPort}.
+
+ @ivar outstandingRequests: a read-only property for the number of currently
+ active connections.
+
+ @ivar maxAccepts: The maximum number of times to call 'accept()' in a
+ single reactor loop iteration.
+
+ @ivar maxRequests: The maximum number of concurrent connections to accept
+ at once - note that this is for the I{entire server}, whereas the
+ value in the configuration file is for only a single process.
+ """
+
+ def __init__(self, limiter, description):
+ super(LimitingInheritingProtocolFactory, self).__init__(
+ limiter.dispatcher, description)
+ self.limiter = limiter
+ self.maxAccepts = limiter.maxAccepts
+ self.maxRequests = limiter.maxRequests
+
+
+ @property
+ def outstandingRequests(self):
+ return self.limiter.outstandingRequests
+
+
+
class TwistdSlaveProcess(object):
"""
A L{TwistdSlaveProcess} is information about how to start a slave process
Modified: CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py 2010-03-30 15:55:15 UTC (rev 5412)
+++ CalendarServer/branches/users/glyph/sendfdport/twext/internet/sendfdport.py 2010-03-30 17:27:52 UTC (rev 5413)
@@ -21,8 +21,9 @@
"""
from os import close
-from errno import EAGAIN
-from socket import socketpair, fromfd, error as SocketError, AF_INET, SOCK_STREAM
+from errno import EAGAIN, ENOBUFS
+from socket import (socketpair, fromfd, error as SocketError,
+ AF_INET, AF_UNIX, SOCK_STREAM, SOCK_DGRAM)
from twisted.python import log
@@ -47,11 +48,6 @@
self.transport.stopReading()
self.transport.stopWriting()
skt = self.transport.getHandle()
- # actually I want to retrieve a child from a *pool* of potentially
- # available children. i have to make that determination based on the
- # number of connections each child is currently handling. which means
- # the logic shouldn't live here. where should it live? in this spot,
- # I just need an FD.
self.factory.sendSocket(skt)
@@ -83,7 +79,7 @@
-class _AvailableConnection(FileDescriptor, object):
+class _SubprocessSocket(FileDescriptor, object):
"""
A socket in the master process pointing at a file descriptor that can be
used to transmit sockets to a subprocess.
@@ -103,9 +99,10 @@
@type status: C{str}
"""
- def __init__(self, reactor, skt):
- FileDescriptor.__init__(self, reactor)
+ def __init__(self, dispatcher, skt):
+ FileDescriptor.__init__(self, dispatcher.reactor)
self.status = None
+ self.dispatcher = dispatcher
self.skt = skt # XXX needs to be set non-blocking by somebody
self.fileno = skt.fileno
self.outgoingSocketQueue = []
@@ -124,11 +121,12 @@
Receive a status / health message and record it.
"""
try:
- data, flags, ancillary = recvmsg(self.fd)
- except SocketError:
- pass # handle EAGAIN, etc
+ data, flags, ancillary = recvmsg(self.skt.fileno())
+ except SocketError, se:
+ if se.errno not in (EAGAIN, ENOBUFS):
+ raise
else:
- self.status = data
+ self.dispatcher.statusMessage(self, data)
def doWrite(self):
@@ -140,7 +138,7 @@
try:
sendfd(self.skt.fileno(), skt.fileno(), desc)
except SocketError, se:
- if se.errno == EAGAIN:
+ if se.errno in (EAGAIN, ENOBUFS):
self.outgoingSocketQueue.insert(0, (skt, desc))
return
raise
@@ -152,26 +150,37 @@
class InheritedSocketDispatcher(object):
"""
Used by one or more L{InheritingProtocolFactory}s, this keeps track of a
- list of available sockets in subprocesses and sends inbound connections towards them.
-
- @ivar fitnessFunction: a function used to evaluate status messages received
- on available subprocess connections. a 1-argument function which
- accepts a string - or C{None}, if no status has been reported - and
- returns something sortable. this will be used to sort all available
- status messages; the lowest sorting result will be used to handle the
- new connection.
+ list of available sockets in subprocesses and sends inbound connections
+ towards them.
"""
- def __init__(self, fitnessFunction):
+ def __init__(self, statusWatcher):
"""
Create a socket dispatcher.
"""
- self.availableConnections = []
- self.fitnessFunction = fitnessFunction
+ self._subprocessSockets = []
+ self.statusWatcher = statusWatcher
from twisted.internet import reactor
self.reactor = reactor
+ @property
+ def statuses(self):
+ """
+ Yield the current status of all subprocess sockets.
+ """
+ for subsocket in self._subprocessSockets:
+ yield subsocket.status
+
+
+ def statusMessage(self, subsocket, message):
+ """
+ The status of a connection has changed; update all registered status
+ change listeners.
+ """
+ subsocket.status = self.statusWatcher.statusFromMessage(subsocket.status, message)
+
+
def sendFileDescriptor(self, skt, description):
"""
A connection has been received. Dispatch it.
@@ -181,9 +190,12 @@
@param description: some text to identify to the subprocess's
L{InheritedPort} what type of transport to create for this socket.
"""
- self.availableConnections.sort(key=lambda conn:
- self.fitnessFunction(conn.status))
- self.availableConnections[0].sendSocketToPeer(skt, description)
+ self._subprocessSockets.sort(key=lambda conn: conn.status)
+ selectedSocket = self._subprocessSockets[0]
+ selectedSocket.sendSocketToPeer(skt, description)
+ # XXX Maybe want to send along 'description' or 'skt' or some
+ # properties thereof? -glyph
+ selectedSocket.status = self.statusWatcher.newConnectionStatus(selectedSocket.status)
def addSocket(self):
@@ -195,9 +207,10 @@
C{fileno()} as part of the C{childFDs} argument to
C{spawnProcess()}, then close it.
"""
- i, o = socketpair()
- a = _AvailableConnection(self.reactor, o)
- self.availableConnections.append(a)
+ i, o = socketpair(AF_UNIX, SOCK_DGRAM)
+ a = _SubprocessSocket(self, o)
+ a.startReading()
+ self._subprocessSockets.append(a)
return i
@@ -271,7 +284,7 @@
try:
sendmsg(self.fd, msg, 0)
except SocketError, se:
- if se.errno == EAGAIN:
+ if se.errno in (EAGAIN, ENOBUFS):
self.statusQueue.insert(0, msg)
return
raise
@@ -280,10 +293,9 @@
def reportStatus(self, statusMessage):
"""
- Report a status message to the L{_AvailableConnection} monitoring this
+ Report a status message to the L{_SubprocessSocket} monitoring this
L{InheritedPort}'s health in the master process.
"""
- # XXX this has got to be invoked from
self.statusQueue.append(statusMessage)
self.startWriting()
Modified: CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py
===================================================================
--- CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py 2010-03-30 15:55:15 UTC (rev 5412)
+++ CalendarServer/branches/users/glyph/sendfdport/twext/web2/channel/http.py 2010-03-30 17:27:52 UTC (rev 5413)
@@ -950,6 +950,8 @@
"please try again later.</body></html>")
self.transport.loseConnection()
+
+
class HTTPFactory(protocol.ServerFactory):
"""
Factory for HTTP server.
@@ -1007,6 +1009,7 @@
return len(self.connectedChannels)
+
class HTTP503LoggingFactory (HTTPFactory):
"""
Factory for HTTP server which emits a 503 response when overloaded.
@@ -1163,45 +1166,6 @@
-class ReportingHTTPFactory(HTTPFactory):
- """
- An L{HTTPFactory} which reports its status to a
- L{twext.internet.sendfdport.InheritedPort}.
-
- @ivar inheritedPort: an L{InheritedPort} to report status (the current
- number of outstanding connections) to. Since this - the
- L{ReportingHTTPFactory} - needs to be instantiated to be passed to
- L{InheritedPort}'s constructor, this attribute must be set afterwards
- but before any connections have occurred.
- """
-
- def _report(self):
- """
- Report the current number of open channels to the listening socket in
- the parent process.
- """
- self.inheritedPort.reportStatus(str(self.outstandingRequests))
-
-
- def addConnectedChannel(self, channel):
- """
- Add the connected channel, and report the current number of open
- channels to the listening socket in the parent process.
- """
- HTTPFactory.addConnectedChannel(self, channel)
- self._report()
-
-
- def removeConnectedChannel(self, channel):
- """
- Remove the connected channel, and report the current number of open
- channels to the listening socket in the parent process.
- """
- HTTPFactory.removeConnectedChannel(self, channel)
- self._report()
-
-
-
__all__ = [
"HTTPFactory",
"HTTP503LoggingFactory",
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100330/96e5eaca/attachment-0001.html>
More information about the calendarserver-changes mailing list