[CalendarServer-changes] [5430] CalendarServer/branches/users/wsanchez/deployment
source_changes at macosforge.org
source_changes at macosforge.org
Wed Mar 31 11:03:59 PDT 2010
Revision: 5430
http://trac.macosforge.org/projects/calendarserver/changeset/5430
Author: glyph at apple.com
Date: 2010-03-31 11:03:58 -0700 (Wed, 31 Mar 2010)
Log Message:
-----------
Merge in 'sendfd' changes so that the server will call accept() from a single process.
Modified Paths:
--------------
CalendarServer/branches/users/wsanchez/deployment/setup.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/tap.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_tap.py
Added Paths:
-----------
CalendarServer/branches/users/wsanchez/deployment/lib-patches/Twisted/twisted.web2.channel.http.patch
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/metafd.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfd.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfdport.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendmsg.c
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/pullpipe.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_sendmsg.py
Property Changed:
----------------
CalendarServer/branches/users/wsanchez/deployment/
Property changes on: CalendarServer/branches/users/wsanchez/deployment
___________________________________________________________________
Modified: svn:mergeinfo
- /CalendarServer/branches/users/sagen/deployment-inherit-fds-4571:4573-4709
/CalendarServer/branches/users/sagen/deployment-inspection:4927-4937
+ /CalendarServer/branches/users/glyph/deployment-plus-sendfd:5426-5429
/CalendarServer/branches/users/sagen/deployment-inherit-fds-4571:4573-4709
/CalendarServer/branches/users/sagen/deployment-inspection:4927-4937
Copied: CalendarServer/branches/users/wsanchez/deployment/lib-patches/Twisted/twisted.web2.channel.http.patch (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/lib-patches/Twisted/twisted.web2.channel.http.patch)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/lib-patches/Twisted/twisted.web2.channel.http.patch (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/lib-patches/Twisted/twisted.web2.channel.http.patch 2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,90 @@
+Index: twisted/web2/channel/http.py
+===================================================================
+--- twisted/web2/channel/http.py (revision 19773)
++++ twisted/web2/channel/http.py (working copy)
+@@ -658,7 +658,7 @@
+
+ def connectionMade(self):
+ self.setTimeout(self.inputTimeOut)
+- self.factory.outstandingRequests+=1
++ self.factory.addConnectedChannel(self)
+
+ def lineReceived(self, line):
+ if self._first_line:
+@@ -847,7 +847,7 @@
+ self.transport.loseConnection()
+
+ def connectionLost(self, reason):
+- self.factory.outstandingRequests-=1
++ self.factory.removeConnectedChannel(self)
+
+ self._writeLost = True
+ self.readConnectionLost()
+@@ -869,20 +869,33 @@
+ "please try again later.</body></html>")
+ self.transport.loseConnection()
+
++
++
+ class HTTPFactory(protocol.ServerFactory):
+- """Factory for HTTP server."""
++ """
++ Factory for HTTP server.
+
++ @ivar outstandingRequests: the number of currently connected HTTP channels.
++
++ @type outstandingRequests: C{int}
++
++ @ivar connectedChannels: all the channels that have currently active
++ connections.
++
++ @type connectedChannels: C{set} of L{HTTPChannel}
++ """
++
+ protocol = HTTPChannel
+
+ protocolArgs = None
+
+- outstandingRequests = 0
+-
+ def __init__(self, requestFactory, maxRequests=600, **kwargs):
+- self.maxRequests=maxRequests
++ self.maxRequests = maxRequests
+ self.protocolArgs = kwargs
+- self.protocolArgs['requestFactory']=requestFactory
+-
++ self.protocolArgs['requestFactory'] = requestFactory
++ self.connectedChannels = set()
++
++
+ def buildProtocol(self, addr):
+ if self.outstandingRequests >= self.maxRequests:
+ return OverloadedServerProtocol()
+@@ -893,4 +906,27 @@
+ setattr(p, arg, value)
+ return p
+
++
++ def addConnectedChannel(self, channel):
++ """
++ Add a connected channel to the set of currently connected channels and
++ increase the outstanding request count.
++ """
++ self.connectedChannels.add(channel)
++
++
++ def removeConnectedChannel(self, channel):
++ """
++ Remove a connected channel from the set of currently connected channels
++ and decrease the outstanding request count.
++ """
++ self.connectedChannels.remove(channel)
++
++
++ @property
++ def outstandingRequests(self):
++ return len(self.connectedChannels)
++
++
++
+ __all__ = ['HTTPFactory', ]
Modified: CalendarServer/branches/users/wsanchez/deployment/setup.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/setup.py 2010-03-31 17:55:47 UTC (rev 5429)
+++ CalendarServer/branches/users/wsanchez/deployment/setup.py 2010-03-31 18:03:58 UTC (rev 5430)
@@ -97,7 +97,10 @@
from distutils.core import Extension
-extensions = []
+extensions = [
+ Extension("twistedcaldav.sendmsg",
+ sources=["twistedcaldav/sendmsg.c"])
+]
if sys.platform == "darwin":
extensions.append(
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py 2010-03-31 17:55:47 UTC (rev 5429)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cluster.py 2010-03-31 18:03:58 UTC (rev 5430)
@@ -26,13 +26,17 @@
from twisted.internet import reactor, process
from twisted.internet.threads import deferToThread
from twisted.python.reflect import namedClass
+from twisted.python.usage import UsageError
from twistedcaldav.accesslog import AMPLoggingFactory, RotatingFileAccessLoggingObserver
from twistedcaldav.config import config, ConfigurationError
from twistedcaldav.util import getNCPU
from twistedcaldav.log import Logger
from twistedcaldav.directory.appleopendirectory import OpenDirectoryService
+OpenDirectoryService # Pyflakes
+from twistedcaldav.metafd import ConnectionLimiter
+
log = Logger()
serviceTemplate = """
@@ -62,7 +66,7 @@
def __init__(self, twistd, tapname, configFile, id,
interfaces, port, sslPort,
- inheritFDs=None, inheritSSLFDs=None):
+ inheritFDs=None, inheritSSLFDs=None, dispatcher=None):
self.twistd = twistd
@@ -75,8 +79,15 @@
self.ports = port
self.sslPorts = sslPort
- self.inheritFDs = inheritFDs
- self.inheritSSLFDs = inheritSSLFDs
+ def emptyIfNone(x):
+ if x is None:
+ return []
+ else:
+ return x
+ self.inheritFDs = emptyIfNone(inheritFDs)
+ self.inheritSSLFDs = emptyIfNone(inheritSSLFDs)
+ self.metaSocket = None
+ self.dispatcher = dispatcher
self.interfaces = interfaces
@@ -85,12 +96,36 @@
return '%s-%s' % (self.prefix, self.ports[0])
elif self.sslPorts is not None:
return '%s-%s' % (self.prefix, self.sslPorts[0])
- elif self.inheritFDs or self.inheritSSLFDs:
+ elif self.inheritFDs or self.inheritSSLFDs or self.dispatcher:
return '%s-%s' % (self.prefix, self.id)
raise ConfigurationError(
"Can't create TwistdSlaveProcess without a TCP Port")
+
+ def getMetaDescriptor(self):
+ """
+ Get the meta-socket file descriptor to inherit.
+ """
+ if self.metaSocket is None:
+ self.metaSocket = self.dispatcher.addSocket()
+ return self.metaSocket.fileno()
+
+
+ def getFileDescriptors(self):
+ """
+ @return: a mapping of file descriptor numbers for the new (child)
+ process to file descriptor numbers in the current (master) process.
+ """
+ fds = {}
+ maybeMetaFD = []
+ if self.dispatcher is not None:
+ maybeMetaFD.append(self.getMetaDescriptor())
+ for fd in self.inheritSSLFDs + self.inheritFDs + maybeMetaFD:
+ fds[fd] = fd
+ return fds
+
+
def getCommandLine(self):
args = [
sys.executable,
@@ -144,6 +179,13 @@
args.extend([
'-o',
'InheritSSLFDs=%s' % (','.join(map(str, self.inheritSSLFDs)),)])
+
+ if self.dispatcher is not None:
+ # XXX this FD is never closed in the parent. should it be?
+ # (should they *all* be?) -glyph
+ args.extend([
+ "-o", "MetaFD=%s" % (self.getMetaDescriptor(),)
+ ])
return args
@@ -157,7 +199,7 @@
if ssl and self.sslPorts is not None:
port = self.sslPorts
- if self.inheritFDs or self.inheritSSLFDs:
+ if self.inheritFDs or self.inheritSSLFDs or self.dispatcher:
port = [self.id]
if port is None:
@@ -169,42 +211,105 @@
'bindAddress': '127.0.0.1'}
+
class DelayedStartupProcessMonitor(procmon.ProcessMonitor):
+ """
+ A L{DelayedStartupProcessMonitor} is a L{procmon.ProcessMonitor} that
+ defers building its command lines until the service is actually ready to
+ start. It also specializes process-starting to allow for process objects
+ to determine their arguments as they are started up rather than entirely
+ ahead of time.
+ @ivar processObjects: a C{list} of L{TwistdSlaveProcess} to add using
+ C{self.addProcess} when this service starts up.
+
+ @ivar _extraFDs: a mapping from process names to extra file-descriptor
+ maps. (By default, all processes will have the standard stdio mapping,
+ so all file descriptors here should be >2.) This is updated during
+ L{DelayedStartupProcessMonitor.startService}, by inspecting the result
+ of L{TwistdSlaveProcess.getFileDescriptors}.
+
+ @ivar reactor: an L{IReactorProcess} for spawning processes, defaulting to
+ the global reactor.
+ """
+
+ def __init__(self, *args, **kwargs):
+ procmon.ProcessMonitor.__init__(self, *args, **kwargs)
+ self.processObjects = []
+ self._extraFDs = {}
+ self.reactor = reactor
+
+
+ def addProcessObject(self, process, env):
+ """
+ Add a process object to be run when this service is started.
+
+ @param env: a dictionary of environment variables.
+
+ @param process: a L{TwistdSlaveProcess} object to be started upon
+ service startup.
+ """
+ self.processObjects.append((process, env))
+
+
def startService(self):
service.Service.startService(self)
+
+ # Now we're ready to build the command lines and actually add the
+ # processes to procmon. This step must be done prior to setting
+ # active to 1
+ for processObject, env in self.processObjects:
+ name = processObject.getName()
+ self.addProcess(
+ name,
+ processObject.getCommandLine(),
+ env=env
+ )
+ self._extraFDs[name] = processObject.getFileDescriptors()
+
self.active = 1
delay = 0
- delay_interval = config.MultiProcess['StaggeredStartup']['Interval'] if config.MultiProcess['StaggeredStartup']['Enabled'] else 0
+
+ if config.MultiProcess.StaggeredStartup.Enabled:
+ delay_interval = config.MultiProcess.StaggeredStartup.Interval
+ else:
+ delay_interval = 0
+
for name in self.processes.keys():
- reactor.callLater(delay if name.startswith("caldav") else 0, self.startProcess, name)
if name.startswith("caldav"):
+ when = delay
delay += delay_interval
- self.consistency = reactor.callLater(self.consistencyDelay,
- self._checkConsistency)
+ else:
+ when = 0
+ reactor.callLater(when, self.startProcess, name)
- def signalAll(self, signal, startswithname=None, seconds=0):
+ self.consistency = reactor.callLater(
+ self.consistencyDelay,
+ self._checkConsistency
+ )
+
+ def signalAll(self, signal, startswithname=None):
"""
Send a signal to all child processes.
@param signal: the signal to send
@type signal: C{int}
- @param startswithname: is set only signal those processes whose name starts with this string
+ @param startswithname: if set, only signal those processes
+ whose name starts with this string
@type signal: C{str}
"""
- delay = 0
for name in self.processes.keys():
if startswithname is None or name.startswith(startswithname):
- reactor.callLater(delay, self.signalProcess, signal, name)
- delay += seconds
+ self.signalProcess(signal, name)
def signalProcess(self, signal, name):
"""
Send a signal to each monitored process
-
+
@param signal: the signal to send
@type signal: C{int}
- @param startswithname: is set only signal those processes whose name starts with this string
+ @param startswithname: if set, only signal those processes
+ whose name starts with this string
@type signal: C{str}
"""
if not self.protocols.has_key(name):
@@ -226,17 +331,15 @@
childFDs = { 0 : "w", 1 : "r", 2 : "r" }
- # Examine args for -o InheritFDs= and -o InheritSSLFDs=
- # Add any file descriptors listed in those args to the childFDs
- # dictionary so those don't get closed across the spawn.
- for i in xrange(len(args)-1):
- if args[i] == "-o" and args[i+1].startswith("Inherit"):
- for fd in map(int, args[i+1].split("=")[1].split(",")):
- childFDs[fd] = fd
+ childFDs.update(self._extraFDs.get(name, {}))
- reactor.spawnProcess(p, args[0], args, uid=uid, gid=gid, env=env,
- childFDs=childFDs)
+ self.reactor.spawnProcess(
+ p, args[0], args, uid=uid, gid=gid, env=env,
+ childFDs=childFDs
+ )
+
+
def makeService_Combined(self, options):
@@ -327,7 +430,13 @@
if not config.BindAddresses:
config.BindAddresses = [""]
- s._inheritedSockets = [] # keep a reference to these so they don't close
+ if config.UseMetaFD:
+ cl = ConnectionLimiter(config.MaxAccepts,
+ (config.MaxRequests *
+ config.MultiProcess.ProcessCount))
+ cl.setServiceParent(s)
+ else:
+ s._inheritedSockets = [] # keep a reference to these so they don't close
for bindAddress in config.BindAddresses:
if config.BindHTTPPorts:
@@ -346,23 +455,29 @@
elif config.SSLPort != 0:
config.BindSSLPorts = [config.SSLPort]
- def _openSocket(addr, port):
- log.info("Opening socket for inheritance at %s:%d" % (addr, port))
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setblocking(0)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- sock.bind((addr, port))
- sock.listen(config.ListenBacklog)
- s._inheritedSockets.append(sock)
- return sock
+ if config.UseMetaFD:
+ for ports, description in [(config.BindSSLPorts, "SSL"),
+ (config.BindHTTPPorts, "TCP")]:
+ for portNumber in ports:
+ cl.addPortService(description, portNumber, bindAddress, config.ListenBacklog)
+ else:
+ def _openSocket(addr, port):
+ log.info("Opening socket for inheritance at %s:%d" % (addr, port))
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setblocking(0)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind((addr, port))
+ sock.listen(config.ListenBacklog)
+ s._inheritedSockets.append(sock)
+ return sock
- for portNum in config.BindHTTPPorts:
- sock = _openSocket(bindAddress, int(portNum))
- inheritFDs.append(sock.fileno())
+ for portNum in config.BindHTTPPorts:
+ sock = _openSocket(bindAddress, int(portNum))
+ inheritFDs.append(sock.fileno())
- for portNum in config.BindSSLPorts:
- sock = _openSocket(bindAddress, int(portNum))
- inheritSSLFDs.append(sock.fileno())
+ for portNum in config.BindSSLPorts:
+ sock = _openSocket(bindAddress, int(portNum))
+ inheritSSLFDs.append(sock.fileno())
if not config.MultiProcess['LoadBalancer']['Enabled']:
bindAddress = config.BindAddresses
@@ -381,19 +496,24 @@
if inheritSSLFDs:
sslPort = None
+ if config.UseMetaFD:
+ port = None
+ sslPort = None
+ extraArgs = dict(dispatcher=cl.dispatcher)
+ else:
+ extraArgs = dict(inheritFDs=inheritFDs,
+ inheritSSLFDs=inheritSSLFDs)
+
process = TwistdSlaveProcess(config.Twisted['twistd'],
self.tapname,
options['config'],
p,
bindAddress,
port, sslPort,
- inheritFDs=inheritFDs,
- inheritSSLFDs=inheritSSLFDs
+ **extraArgs
)
- monitor.addProcess(process.getName(),
- process.getCommandLine(),
- env=parentEnv)
+ monitor.addProcessObject(process, parentEnv)
if config.HTTPPort:
hosts.append(process.getHostLine())
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py 2010-03-31 17:55:47 UTC (rev 5429)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py 2010-03-31 18:03:58 UTC (rev 5430)
@@ -92,7 +92,10 @@
"ServerHostName": "localhost", # Network host name.
"HTTPPort": 0, # HTTP port (0 to disable HTTP)
"SSLPort" : 0, # SSL port (0 to disable HTTPS)
-
+ "MetaFD": 0, # Inherited file descriptor to call recvmsg() on to receive sockets (none = don't inherit)
+
+ "UseMetaFD": True, # Use a 'meta' FD, i.e. an FD to transmit other
+ # FDs to slave processes.
#
# Network address configuration information
#
Copied: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/metafd.py (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/twistedcaldav/metafd.py)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/metafd.py (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/metafd.py 2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,253 @@
+
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twisted.internet.tcp import Server
+from twisted.application.internet import TCPServer
+
+from twisted.internet import reactor
+
+from twisted.application.service import MultiService, Service
+
+from twisted.web2.channel.http import HTTPFactory
+
+from twistedcaldav.sendfdport import (
+ InheritedPort, InheritedSocketDispatcher, InheritingProtocolFactory)
+
+
+
+class JustEnoughLikeAPort(object):
+ """
+ Fake out just enough of L{tcp.Port} to be acceptable to
+ L{tcp.Server}...
+ """
+ port = 'inherited'
+
+
+
+class ReportingHTTPService(Service, object):
+ """
+ Service which starts up an HTTP server that can report back to its parent
+ process via L{InheritedPort}.
+ """
+
+ _connectionCount = 0
+
+ def __init__(self, site, fd, contextFactory):
+ self.contextFactory = contextFactory
+ # Unlike other 'factory' constructions, config.MaxRequests and
+ # config.MaxAccepts are dealt with in the master process, so we don't
+ # need to propagate them here.
+ self.site = site
+ self.fd = fd
+
+
+ def startService(self):
+ """
+ Start reading on the inherited port.
+ """
+ Service.startService(self)
+ self.reportingFactory = ReportingHTTPFactory(self.site, vary=True)
+ self.reportingFactory.inheritedPort = InheritedPort(
+ self.fd, self.createTransport, self.reportingFactory
+ )
+ self.reportingFactory.inheritedPort.startReading()
+
+
+ def stopService(self):
+ """
+ Stop reading on the inherited port.
+ """
+ Service.stopService(self)
+ # XXX stopping should really be destructive, because otherwise we will
+ # always leak a file descriptor; i.e. this shouldn't be restartable.
+ # XXX this needs to return a Deferred.
+ self.reportingFactory.inheritedPort.stopReading()
+
+
+ def createTransport(self, skt, data, protocol):
+ """
+ Create a TCP transport, from a socket object passed by the parent.
+ """
+ self._connectionCount += 1
+ transport = Server(skt, protocol,
+ skt.getpeername(), JustEnoughLikeAPort,
+ self._connectionCount)
+ if data == 'SSL':
+ transport.startTLS(self.contextFactory)
+ transport.startReading()
+ return transport
+
+
+
+class ReportingHTTPFactory(HTTPFactory):
+ """
+ An L{HTTPFactory} which reports its status to a
+ L{twistedcaldav.sendfdport.InheritedPort}.
+
+ @ivar inheritedPort: an L{InheritedPort} to report status (the current
+ number of outstanding connections) to. Since this - the
+ L{ReportingHTTPFactory} - needs to be instantiated to be passed to
+ L{InheritedPort}'s constructor, this attribute must be set afterwards
+ but before any connections have occurred.
+ """
+
+ def _report(self, message):
+ """
+ Report a status message to the parent.
+ """
+ self.inheritedPort.reportStatus(message)
+
+
+ def addConnectedChannel(self, channel):
+ """
+ Add the connected channel, and report the current number of open
+ channels to the listening socket in the parent process.
+ """
+ HTTPFactory.addConnectedChannel(self, channel)
+ self._report("+")
+
+
+ def removeConnectedChannel(self, channel):
+ """
+ Remove the connected channel, and report the current number of open
+ channels to the listening socket in the parent process.
+ """
+ HTTPFactory.removeConnectedChannel(self, channel)
+ self._report("-")
+
+
+
+class ConnectionLimiter(MultiService, object):
+ """
+ Connection limiter for use with L{InheritedSocketDispatcher}.
+
+ This depends on statuses being reported by L{ReportingHTTPFactory}
+ """
+
+ def __init__(self, maxAccepts, maxRequests):
+ """
+ Create a L{ConnectionLimiter} with an associated dispatcher and
+ list of factories.
+ """
+ MultiService.__init__(self)
+ self.factories = []
+ # XXX dispatcher needs to be a service, so that it can shut down its
+ # sub-sockets.
+ self.dispatcher = InheritedSocketDispatcher(self)
+ self.maxAccepts = maxAccepts
+ self.maxRequests = maxRequests
+
+
+ def addPortService(self, description, port, interface, backlog):
+ """
+ Add a L{TCPServer} to bind a TCP port to a socket description.
+ """
+ lipf = LimitingInheritingProtocolFactory(self, description)
+ self.factories.append(lipf)
+ svr = TCPServer(
+ port, lipf,
+ interface=interface,
+ backlog=backlog
+ )
+ svr.setServiceParent(self)
+ lipf.serverService = svr
+
+
+ # implementation of implicit statusWatcher interface required by
+ # InheritedSocketDispatcher
+
+ def statusFromMessage(self, previousStatus, message):
+ """
+ Determine a subprocess socket's status from its previous status and a
+ status message.
+ """
+ if message == '-':
+ result = self.intWithNoneAsZero(previousStatus) - 1
+ # A connection has gone away in a subprocess; we should start
+ # accepting connections again if we paused (see
+ # newConnectionStatus)
+ for f in self.factories:
+ f.serverService._port.startReading()
+ else:
+ # '+' is just an acknowledgement of newConnectionStatus, so we can
+ # ignore it.
+ result = self.intWithNoneAsZero(previousStatus)
+ return result
+
+
+ def newConnectionStatus(self, previousStatus):
+ """
+ Determine the effect of a new connection being sent on a subprocess
+ socket.
+ """
+ current = self.outstandingRequests + 1
+ maximum = self.maxRequests
+ overloaded = (current >= maximum)
+ if overloaded:
+ for f in self.factories:
+ f.serverService._port.stopReading()
+
+ result = self.intWithNoneAsZero(previousStatus) + 1
+ return result
+
+
+ def intWithNoneAsZero(self, x):
+ """
+ Convert 'x' to an C{int}, unless x is C{None}, in which case return 0.
+ """
+ if x is None:
+ return 0
+ else:
+ return int(x)
+
+
+ @property
+ def outstandingRequests(self):
+ outstanding = 0
+ for status in self.dispatcher.statuses:
+ outstanding += self.intWithNoneAsZero(status)
+ return outstanding
+
+
+
+class LimitingInheritingProtocolFactory(InheritingProtocolFactory):
+ """
+ An L{InheritingProtocolFactory} that supports the implicit factory contract
+ required by L{MaxAcceptTCPServer}/L{MaxAcceptTCPPort}.
+
+ @ivar outstandingRequests: a read-only property for the number of currently
+ active connections.
+
+ @ivar maxAccepts: The maximum number of times to call 'accept()' in a
+ single reactor loop iteration.
+
+ @ivar maxRequests: The maximum number of concurrent connections to accept
+ at once - note that this is for the I{entire server}, whereas the
+ value in the configuration file is for only a single process.
+ """
+
+ def __init__(self, limiter, description):
+ super(LimitingInheritingProtocolFactory, self).__init__(
+ limiter.dispatcher, description)
+ self.limiter = limiter
+ self.maxAccepts = limiter.maxAccepts
+ self.maxRequests = limiter.maxRequests
+
+
+ @property
+ def outstandingRequests(self):
+ return self.limiter.outstandingRequests
Copied: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfd.py (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/twistedcaldav/sendfd.py)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfd.py (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfd.py 2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,66 @@
+# -*- test-case-name: twext.python.test.test_sendmsg -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from struct import pack, unpack
+from socket import SOL_SOCKET
+
+from twistedcaldav.sendmsg import sendmsg, recvmsg, SCM_RIGHTS
+
+def sendfd(socketfd, fd, description):
+ """
+ Send the given FD to another process via L{sendmsg} on the given C{AF_UNIX}
+ socket.
+
+ @param socketfd: An C{AF_UNIX} socket, attached to another process waiting
+ to receive sockets via the ancillary data mechanism in L{sendmsg}.
+
+ @type socketfd: C{int}
+
+ @param fd: A file descriptor to be sent to the other process.
+
+ @type fd: C{int}
+
+ @param description: a string describing the socket that was passed.
+
+ @type description: C{str}
+ """
+ sendmsg(
+ socketfd, description, 0, [(SOL_SOCKET, SCM_RIGHTS, pack("i", fd))]
+ )
+
+
+def recvfd(socketfd):
+ """
+ Receive a file descriptor from a L{sendmsg} message on the given C{AF_UNIX}
+ socket.
+
+ @param socketfd: An C{AF_UNIX} socket, attached to another process waiting
+ to send sockets via the ancillary data mechanism in L{sendmsg}.
+
+ @type socketfd: C{int}
+
+ @return: a 2-tuple of (new file descriptor, description).
+
+ @rtype: 2-tuple of (C{int}, C{str})
+ """
+ data, flags, ancillary = recvmsg(socketfd)
+ [(cmsg_level, cmsg_type, packedFD)] = ancillary
+ # cmsg_level and cmsg_type really need to be SOL_SOCKET / SCM_RIGHTS, but
+ # since those are the *only* standard values, there's not much point in
+ # checking.
+ [unpackedFD] = unpack("i", packedFD)
+ return (unpackedFD, data)
Copied: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfdport.py (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/twistedcaldav/sendfdport.py)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfdport.py (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendfdport.py 2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,303 @@
+# -*- test-case-name: twext.internet.test.test_sendfdport -*-
+##
+# Copyright (c) 2005-2009 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Implementation of a TCP/SSL port that uses sendmsg/recvmsg as implemented by
+L{twistedcaldav.sendfd}.
+"""
+
+from os import close
+from errno import EAGAIN, ENOBUFS
+from socket import (socketpair, fromfd, error as SocketError,
+ AF_INET, AF_UNIX, SOCK_STREAM, SOCK_DGRAM)
+
+from twisted.python import log
+
+from twisted.internet.abstract import FileDescriptor
+from twisted.internet.protocol import Protocol, Factory
+
+from twistedcaldav.sendmsg import sendmsg, recvmsg
+from twistedcaldav.sendfd import sendfd, recvfd
+
+class InheritingProtocol(Protocol, object):
+ """
+ When a connection comes in on this protocol, stop reading and writing, and
+ dispatch the socket to another process via its factory.
+ """
+
+ def connectionMade(self):
+ """
+ A connection was received; transmit the file descriptor to another
+ process via L{InheritingProtocolFactory} and remove my transport from
+ the reactor.
+ """
+ self.transport.stopReading()
+ self.transport.stopWriting()
+ skt = self.transport.getHandle()
+ self.factory.sendSocket(skt)
+
+
+
+class InheritingProtocolFactory(Factory, object):
+ """
+ In the 'master' process, make one of these and hook it up to the sockets
+ where you want to hear stuff.
+
+ @ivar dispatcher: an L{InheritedSocketDispatcher} to use to dispatch
+ incoming connections to an appropriate subprocess.
+
+ @ivar description: the string to send along with connections received on
+ this factory.
+ """
+
+ protocol = InheritingProtocol
+
+ def __init__(self, dispatcher, description):
+ self.dispatcher = dispatcher
+ self.description = description
+
+
+ def sendSocket(self, socketObject):
+ """
+ Send the given socket object on to my dispatcher.
+ """
+ self.dispatcher.sendFileDescriptor(socketObject, self.description)
+
+
+
+class _SubprocessSocket(FileDescriptor, object):
+ """
+ A socket in the master process pointing at a file descriptor that can be
+ used to transmit sockets to a subprocess.
+
+ @ivar skt: the UNIX socket used as the sendmsg() transport.
+
+ @ivar outgoingSocketQueue: an outgoing queue of sockets to send to the
+ subprocess.
+
+ @type outgoingSocketQueue: a C{list} of 2-tuples of C{(socket-object, str)}
+
+ @ivar status: a record of the last status message received (via recvmsg)
+ from the subprocess: this is an application-specific indication of how
+ ready this subprocess is to receive more connections. A typical usage
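+ would be to count the open connections: this is what is passed to
+ the dispatcher's status watcher (C{statusFromMessage} and
+ C{newConnectionStatus}) as the previous status.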
+
+ @type status: C{str}
+ """
+
+ def __init__(self, dispatcher, skt):
+ FileDescriptor.__init__(self, dispatcher.reactor)
+ self.status = None
+ self.dispatcher = dispatcher
+ self.skt = skt # XXX needs to be set non-blocking by somebody
+ self.fileno = skt.fileno
+ self.outgoingSocketQueue = []
+
+
+ def sendSocketToPeer(self, skt, description):
+ """
+ Enqueue a socket to send to the subprocess.
+ """
+ self.outgoingSocketQueue.append((skt, description))
+ self.startWriting()
+
+
+ def doRead(self):
+ """
+ Receive a status / health message and record it.
+ """
+ try:
+ data, flags, ancillary = recvmsg(self.skt.fileno())
+ except SocketError, se:
+ if se.errno not in (EAGAIN, ENOBUFS):
+ raise
+ else:
+ self.dispatcher.statusMessage(self, data)
+
+
+ def doWrite(self):
+ """
+ Transmit as many queued pending file descriptors as we can.
+ """
+ while self.outgoingSocketQueue:
+ skt, desc = self.outgoingSocketQueue.pop(0)
+ try:
+ sendfd(self.skt.fileno(), skt.fileno(), desc)
+ except SocketError, se:
+ if se.errno in (EAGAIN, ENOBUFS):
+ self.outgoingSocketQueue.insert(0, (skt, desc))
+ return
+ raise
+ else:
+ skt.close()
+ if not self.outgoingSocketQueue:
+ self.stopWriting()
+
+
+
+class InheritedSocketDispatcher(object):
+ """
+ Used by one or more L{InheritingProtocolFactory}s, this keeps track of a
+ list of available sockets in subprocesses and sends inbound connections
+ towards them.
+ """
+
+ def __init__(self, statusWatcher):
+ """
+ Create a socket dispatcher.
+ """
+ self._subprocessSockets = []
+ self.statusWatcher = statusWatcher
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+
+ @property
+ def statuses(self):
+ """
+ Yield the current status of all subprocess sockets.
+ """
+ for subsocket in self._subprocessSockets:
+ yield subsocket.status
+
+
+ def statusMessage(self, subsocket, message):
+ """
+ The status of a connection has changed; update all registered status
+ change listeners.
+ """
+ subsocket.status = self.statusWatcher.statusFromMessage(subsocket.status, message)
+
+
+ def sendFileDescriptor(self, skt, description):
+ """
+ A connection has been received. Dispatch it.
+
+ @param skt: a socket object
+
+ @param description: some text to identify to the subprocess's
+ L{InheritedPort} what type of transport to create for this socket.
+ """
+ self._subprocessSockets.sort(key=lambda conn: conn.status)
+ selectedSocket = self._subprocessSockets[0]
+ selectedSocket.sendSocketToPeer(skt, description)
+ # XXX Maybe want to send along 'description' or 'skt' or some
+ # properties thereof? -glyph
+ selectedSocket.status = self.statusWatcher.newConnectionStatus(selectedSocket.status)
+
+
+ def addSocket(self):
+ """
+ Add a C{sendmsg()}-oriented AF_UNIX socket to the pool of sockets being
+ used for transmitting file descriptors to child processes.
+
+ @return: a socket object for the receiving side; pass this object's
+ C{fileno()} as part of the C{childFDs} argument to
+ C{spawnProcess()}, then close it.
+ """
+ i, o = socketpair(AF_UNIX, SOCK_DGRAM)
+ a = _SubprocessSocket(self, o)
+ a.startReading()
+ self._subprocessSockets.append(a)
+ return i
+
+
+
+class InheritedPort(FileDescriptor, object):
+ """
+ Create this in the 'slave' process to handle incoming connections
+ dispatched via C{sendmsg}.
+ """
+
+ def __init__(self, fd, transportFactory, protocolFactory):
+ """
+ @param fd: a file descriptor
+
+ @type fd: C{int}
+
+ @param transportFactory: a 3-argument function that takes the socket
+ object produced from the file descriptor, the (non-ancillary) data
+ sent along with the incoming file descriptor, and the protocol
+ built along with it, and returns an L{ITransport} provider. Note
+ that this should NOT call C{makeConnection} on the protocol that it
+ produces, as this class will do that.
+
+ @param protocolFactory: an L{IProtocolFactory}
+ """
+ FileDescriptor.__init__(self)
+ self.fd = fd
+ self.transportFactory = transportFactory
+ self.protocolFactory = protocolFactory
+ self.statusQueue = []
+
+
+ def fileno(self):
+ """
+ Get the FD number for this socket.
+ """
+ return self.fd
+
+
+ def doRead(self):
+ """
+ A message is ready to read. Receive a file descriptor from our parent
+ process.
+ """
+ try:
+ fd, description = recvfd(self.fd)
+ except SocketError, se:
+ if se.errno != EAGAIN:
+ raise
+ else:
+ try:
+ skt = fromfd(fd, AF_INET, SOCK_STREAM)
+ # XXX it could be AF_UNIX, I guess? or even something else?
+ # should this be on the transportFactory's side of things?
+
+ close(fd) # fromfd() calls dup()
+ peeraddr = skt.getpeername()
+ protocol = self.protocolFactory.buildProtocol(peeraddr)
+ transport = self.transportFactory(skt, description, protocol)
+ protocol.makeConnection(transport)
+ except:
+ log.err()
+
+
+ def doWrite(self):
+ """
+ Write some data.
+ """
+ while self.statusQueue:
+ msg = self.statusQueue.pop(0)
+ try:
+ sendmsg(self.fd, msg, 0)
+ except SocketError, se:
+ if se.errno in (EAGAIN, ENOBUFS):
+ self.statusQueue.insert(0, msg)
+ return
+ raise
+ self.stopWriting()
+
+
+ def reportStatus(self, statusMessage):
+ """
+ Report a status message to the L{_SubprocessSocket} monitoring this
+ L{InheritedPort}'s health in the master process.
+ """
+ self.statusQueue.append(statusMessage)
+ self.startWriting()
+
Copied: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendmsg.c (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/twistedcaldav/sendmsg.c)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendmsg.c (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/sendmsg.c 2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,341 @@
+/*
+ * Copyright (c) 2010 Apple Inc. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <Python.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <signal.h>
+
+/* Exception class used to report sendmsg(2) / recvmsg(2) failures; set to
+   Python's socket.error by initsendmsg(). */
+PyObject *sendmsg_socket_error;
+
+/* Forward declarations so the method table below can reference them. */
+static PyObject *sendmsg_sendmsg(PyObject *self, PyObject *args, PyObject *keywds);
+static PyObject *sendmsg_recvmsg(PyObject *self, PyObject *args, PyObject *keywds);
+
+/* Module method table: both functions take positional and keyword
+   arguments and have no docstring; the all-NULL entry terminates it. */
+static PyMethodDef sendmsg_methods[] = {
+ {"sendmsg", (PyCFunction) sendmsg_sendmsg, METH_VARARGS | METH_KEYWORDS,
+ NULL},
+ {"recvmsg", (PyCFunction) sendmsg_recvmsg, METH_VARARGS | METH_KEYWORDS,
+ NULL},
+ {NULL, NULL, 0, NULL}
+};
+
+
+/*
+ * Module initialisation: create the "sendmsg" module, export the SCM_*
+ * ancillary-message type constants that this platform defines, and look up
+ * socket.error so that failures can be reported with it.  On any failure the
+ * function returns early, leaving the Python exception set.
+ */
+PyMODINIT_FUNC initsendmsg(void) {
+    PyObject *module;
+    PyObject *socket_module;
+
+    sendmsg_socket_error = NULL; /* Make sure that this has a known value
+                                    before doing anything that might exit. */
+
+    module = Py_InitModule("sendmsg", sendmsg_methods);
+
+    if (!module) {
+        return;
+    }
+
+    /*
+      The following is the only value mentioned by POSIX:
+      http://www.opengroup.org/onlinepubs/9699919799/basedefs/sys_socket.h.html
+    */
+
+    if (-1 == PyModule_AddIntConstant(module, "SCM_RIGHTS", SCM_RIGHTS)) {
+        return;
+    }
+
+
+    /* BSD, Darwin, Hurd */
+#if defined(SCM_CREDS)
+    if (-1 == PyModule_AddIntConstant(module, "SCM_CREDS", SCM_CREDS)) {
+        return;
+    }
+#endif
+
+    /* Linux */
+#if defined(SCM_CREDENTIALS)
+    if (-1 == PyModule_AddIntConstant(module, "SCM_CREDENTIALS", SCM_CREDENTIALS)) {
+        return;
+    }
+#endif
+
+    /* Apparently everywhere, but not standardized. */
+#if defined(SCM_TIMESTAMP)
+    if (-1 == PyModule_AddIntConstant(module, "SCM_TIMESTAMP", SCM_TIMESTAMP)) {
+        return;
+    }
+#endif
+
+    /* PyImport_ImportModule() returns a new reference; keep it in its own
+       variable so that it can be released once socket.error is fetched. */
+    socket_module = PyImport_ImportModule("socket");
+    if (!socket_module) {
+        return;
+    }
+
+    sendmsg_socket_error = PyObject_GetAttrString(socket_module, "error");
+    Py_DECREF(socket_module);
+    if (!sendmsg_socket_error) {
+        return;
+    }
+}
+
+/*
+ * sendmsg(fd, data, flags=0, ancillary=None) -> int
+ *
+ * Send `data` on the socket `fd` with sendmsg(2).  `ancillary`, if given,
+ * must be a list of (level, type, data) tuples, each of which becomes one
+ * control message.  Returns the value returned by sendmsg(2); raises
+ * socket.error if the call fails and TypeError if `ancillary` is not a list.
+ */
+static PyObject *sendmsg_sendmsg(PyObject *self, PyObject *args, PyObject *keywds) {
+
+    int fd;
+    int flags = 0;
+    int sendmsg_result;
+    char *payload = NULL;
+    int payload_len = 0;
+    struct msghdr message_header;
+    struct iovec iov[1];
+    PyObject *ancillary = NULL;
+    static char *kwlist[] = {"fd", "data", "flags", "ancillary", NULL};
+
+    /* Without PY_SSIZE_T_CLEAN the "#" length is stored through an int *, so
+       it must not be pointed at the (wider) size_t iov_len directly. */
+    if (!PyArg_ParseTupleAndKeywords(
+            args, keywds, "it#|iO:sendmsg", kwlist,
+            &fd,
+            &payload,
+            &payload_len,
+            &flags,
+            &ancillary)) {
+        return NULL;
+    }
+
+    iov[0].iov_base = payload;
+    iov[0].iov_len = (size_t) payload_len;
+
+    message_header.msg_name = NULL;
+    message_header.msg_namelen = 0;
+
+    message_header.msg_iov = iov;
+    message_header.msg_iovlen = 1;
+
+    message_header.msg_control = NULL;
+    message_header.msg_controllen = 0;
+
+    message_header.msg_flags = 0;
+
+    if (ancillary) {
+        PyObject *iterator;
+        PyObject *item;
+        size_t all_data_len = 0;
+
+        if (!PyList_Check(ancillary)) {
+            PyErr_Format(PyExc_TypeError,
+                         "sendmsg argument 3 expected list, got %s",
+                         ancillary->ob_type->tp_name);
+            return NULL;
+        }
+
+        iterator = PyObject_GetIter(ancillary);
+        if (iterator == NULL) {
+            return NULL;
+        }
+
+        /* First we need to know how big the buffer needs to be in order to
+           have enough space for all of the messages. */
+        while ( (item = PyIter_Next(iterator)) ) {
+            int data_len, type, level;
+            char *data;
+            if (!PyArg_ParseTuple(item, "iit#:sendmsg ancillary data (level, type, data)",
+                                  &level,
+                                  &type,
+                                  &data,
+                                  &data_len)) {
+                Py_DECREF(item);
+                Py_DECREF(iterator);
+                return NULL;
+            }
+            all_data_len += CMSG_SPACE(data_len);
+
+            Py_DECREF(item);
+        }
+
+        Py_DECREF(iterator);
+
+        if (PyErr_Occurred()) {
+            return NULL;
+        }
+
+        /* An empty list needs no control buffer at all (and malloc(0) may
+           legitimately return NULL). */
+        if (all_data_len > 0) {
+            struct cmsghdr *control_message;
+            unsigned char *control_end;
+
+            /* Zero-filled so that padding bytes are defined and so that
+               CMSG_NXTHDR never inspects an uninitialised header. */
+            message_header.msg_control = calloc(1, all_data_len);
+            if (!message_header.msg_control) {
+                PyErr_NoMemory();
+                return NULL;
+            }
+            message_header.msg_controllen = all_data_len;
+            control_end =
+                (unsigned char *) message_header.msg_control + all_data_len;
+
+            iterator = PyObject_GetIter(ancillary); /* again */
+
+            if (!iterator) {
+                free(message_header.msg_control);
+                return NULL;
+            }
+
+            /* Unpack the tuples into the control message. */
+            control_message = CMSG_FIRSTHDR(&message_header);
+            while ( (item = PyIter_Next(iterator)) ) {
+                int data_len, type, level;
+                char *data;
+
+                if (!PyArg_ParseTuple(item,
+                                      "iit#:sendmsg ancillary data (level, type, data)",
+                                      &level,
+                                      &type,
+                                      &data,
+                                      &data_len)) {
+                    Py_DECREF(item);
+                    Py_DECREF(iterator);
+                    free(message_header.msg_control);
+                    return NULL;
+                }
+
+                /* We allocated enough space for everything seen in the first
+                   pass; if this item does not fit, the list changed in
+                   between.  (CMSG_NXTHDR returns NULL after the last item,
+                   so this is checked before use, not after advancing.) */
+                if (control_message == NULL ||
+                    (unsigned char *) control_message + CMSG_SPACE(data_len)
+                        > control_end) {
+                    PyErr_SetString(PyExc_RuntimeError,
+                                    "sendmsg ancillary data changed size "
+                                    "during the call");
+                    Py_DECREF(item);
+                    Py_DECREF(iterator);
+                    free(message_header.msg_control);
+                    return NULL;
+                }
+
+                control_message->cmsg_level = level;
+                control_message->cmsg_type = type;
+                control_message->cmsg_len = CMSG_LEN(data_len);
+
+                memcpy(CMSG_DATA(control_message), data, data_len);
+
+                Py_DECREF(item);
+
+                control_message = CMSG_NXTHDR(&message_header, control_message);
+            }
+
+            Py_DECREF(iterator);
+
+            if (PyErr_Occurred()) {
+                free(message_header.msg_control);
+                return NULL;
+            }
+        }
+    }
+
+    sendmsg_result = sendmsg(fd, &message_header, flags);
+
+    if (sendmsg_result < 0) {
+        /* Build the exception before free(), which may disturb errno. */
+        PyErr_SetFromErrno(sendmsg_socket_error);
+        free(message_header.msg_control);
+        return NULL;
+    }
+
+    /* The control buffer is ours; release it on success too.  free(NULL)
+       is a no-op when no ancillary data was supplied. */
+    free(message_header.msg_control);
+
+    return Py_BuildValue("i", sendmsg_result);
+}
+
+/*
+ * recvmsg(fd, flags=0, maxsize=8192, cmsg_size=4096)
+ *     -> (data, flags, ancillary)
+ *
+ * Receive one message from the socket `fd` with recvmsg(2).  `maxsize` is
+ * the size of the data buffer and `cmsg_size` the payload size the control
+ * buffer is dimensioned for.  Returns the received bytes, the msg_flags
+ * reported by the kernel, and a list of (level, type, data) tuples, one per
+ * control message.  Raises socket.error if recvmsg(2) fails and ValueError
+ * if a size is negative.
+ */
+static PyObject *sendmsg_recvmsg(PyObject *self, PyObject *args, PyObject *keywds) {
+    int fd = -1;
+    int flags = 0;
+    /* Parsed as plain ints because the "i" format unit stores through an
+       int *; they are widened to size_t only after validation. */
+    int maxsize_arg = 8192;
+    int cmsg_size_arg = 4*1024;
+    size_t maxsize;
+    size_t cmsg_size;
+    int recvmsg_result;
+    struct msghdr message_header;
+    struct cmsghdr *control_message;
+    struct iovec iov[1];
+    char *cmsgbuf;
+    PyObject *ancillary;
+    PyObject *final_result = NULL;
+
+    static char *kwlist[] = {"fd", "flags", "maxsize", "cmsg_size", NULL};
+
+    if (!PyArg_ParseTupleAndKeywords(args, keywds, "i|iii:recvmsg", kwlist,
+                                     &fd, &flags, &maxsize_arg,
+                                     &cmsg_size_arg)) {
+        return NULL;
+    }
+
+    if (maxsize_arg < 0 || cmsg_size_arg < 0) {
+        PyErr_SetString(PyExc_ValueError,
+                        "recvmsg maxsize and cmsg_size must not be negative");
+        return NULL;
+    }
+
+    maxsize = (size_t) maxsize_arg;
+    cmsg_size = CMSG_SPACE((size_t) cmsg_size_arg);
+
+    message_header.msg_name = NULL;
+    message_header.msg_namelen = 0;
+    message_header.msg_flags = 0;
+
+    iov[0].iov_len = maxsize;
+    /* Never ask for zero bytes: malloc(0) may return NULL, which would be
+       misreported as an out-of-memory condition. */
+    iov[0].iov_base = malloc(maxsize ? maxsize : 1);
+
+    if (!iov[0].iov_base) {
+        PyErr_NoMemory();
+        return NULL;
+    }
+
+    message_header.msg_iov = iov;
+    message_header.msg_iovlen = 1;
+
+    cmsgbuf = malloc(cmsg_size);
+
+    if (!cmsgbuf) {
+        free(iov[0].iov_base);
+        PyErr_NoMemory();
+        return NULL;
+    }
+
+    memset(cmsgbuf, 0, cmsg_size);
+    message_header.msg_control = cmsgbuf;
+    message_header.msg_controllen = cmsg_size;
+
+    recvmsg_result = recvmsg(fd, &message_header, flags);
+    if (recvmsg_result < 0) {
+        PyErr_SetFromErrno(sendmsg_socket_error);
+        goto finished;
+    }
+
+    ancillary = PyList_New(0);
+    if (!ancillary) {
+        goto finished;
+    }
+
+    for (control_message = CMSG_FIRSTHDR(&message_header);
+         control_message;
+         control_message = CMSG_NXTHDR(&message_header,
+                                       control_message)) {
+        PyObject *entry;
+        size_t header_len;
+        size_t data_len;
+
+        /* Some platforms apparently always fill out the ancillary data
+           structure with a single bogus value if none is provided; ignore it,
+           if that is the case. */
+
+        if ((!(control_message->cmsg_level)) &&
+            (!(control_message->cmsg_type))) {
+            continue;
+        }
+
+        /* The payload starts at CMSG_DATA(), which is not necessarily
+           sizeof(struct cmsghdr) bytes in because of alignment padding. */
+        header_len = (size_t) ((unsigned char *) CMSG_DATA(control_message) -
+                               (unsigned char *) control_message);
+        data_len = 0;
+        if ((size_t) control_message->cmsg_len > header_len) {
+            data_len = (size_t) control_message->cmsg_len - header_len;
+        }
+
+        /* "s#" consumes an int length, so pass exactly that through the
+           variable argument list. */
+        entry = Py_BuildValue(
+            "(iis#)",
+            control_message->cmsg_level,
+            control_message->cmsg_type,
+            CMSG_DATA(control_message),
+            (int) data_len);
+
+        if (!entry) {
+            Py_DECREF(ancillary);
+            goto finished;
+        }
+
+        if (PyList_Append(ancillary, entry) < 0) {
+            Py_DECREF(ancillary);
+            Py_DECREF(entry);
+            goto finished;
+        } else {
+            Py_DECREF(entry);
+        }
+    }
+
+    final_result = Py_BuildValue(
+        "s#iO",
+        iov[0].iov_base,
+        recvmsg_result,
+        message_header.msg_flags,
+        ancillary);
+
+    /* "O" took its own reference to the list. */
+    Py_DECREF(ancillary);
+
+  finished:
+    free(iov[0].iov_base);
+    free(cmsgbuf);
+    return final_result;
+}
+
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/tap.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/tap.py 2010-03-31 17:55:47 UTC (rev 5429)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/tap.py 2010-03-31 18:03:58 UTC (rev 5430)
@@ -60,6 +60,8 @@
from twistedcaldav import memcachepool
from twistedcaldav.notify import installNotificationClient
+from twistedcaldav.metafd import ReportingHTTPService
+
log = Logger()
try:
@@ -504,6 +506,20 @@
calendarResourceClass = CalendarHomeProvisioningFile
timezoneServiceResourceClass = TimezoneServiceFile
+
+ def createContextFactory(self):
+ """
+ Create an SSL context factory for use with any SSL socket talking to
+ this server.
+ """
+ return ChainingOpenSSLContextFactory(
+ config.SSLPrivateKey,
+ config.SSLCertificate,
+ certificateChainFile=config.SSLAuthorityChain,
+ passwdCallback=_getSSLPassphrase
+ )
+
+
def makeService_Slave(self, options):
#
# Change default log level to "info" as its useful to have
@@ -742,12 +758,7 @@
fd = int(fd)
try:
- contextFactory = ChainingOpenSSLContextFactory(
- config.SSLPrivateKey,
- config.SSLCertificate,
- certificateChainFile=config.SSLAuthorityChain,
- passwdCallback=_getSSLPassphrase
- )
+ contextFactory = self.createContextFactory()
except SSL.Error, e:
log.error("Unable to set up SSL context factory: %s" % (e,))
else:
@@ -758,7 +769,16 @@
)
inheritedService.setServiceParent(service)
+ elif config.MetaFD:
+ # Inherit a single socket to receive accept()ed connections via
+ # recvmsg() and SCM_RIGHTS.
+
+ fd = int(config.MetaFD)
+ ReportingHTTPService(
+ site, fd, self.createContextFactory()
+ ).setServiceParent(service)
+
else: # Not inheriting, therefore open our own:
channel = HTTP503LoggingFactory(
@@ -800,12 +820,7 @@
log.info("Adding SSL server at %s:%s" % (bindAddress, port))
try:
- contextFactory = ChainingOpenSSLContextFactory(
- config.SSLPrivateKey,
- config.SSLCertificate,
- certificateChainFile=config.SSLAuthorityChain,
- passwdCallback=_getSSLPassphrase
- )
+ contextFactory = self.createContextFactory()
except SSL.Error, e:
log.error("Unable to set up SSL context factory: %s" % (e,))
log.error("Disabling SSL port: %s" % (port,))
Copied: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/pullpipe.py (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/twistedcaldav/test/pullpipe.py)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/pullpipe.py (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/pullpipe.py 2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,26 @@
+#!/usr/bin/python
+# -*- test-case-name: twistedcaldav.test.test_sendmsg -*-
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+if __name__ == '__main__':
+ import sys, os
+ from twistedcaldav.sendfd import recvfd
+ fd, description = recvfd(int(sys.argv[1]))
+ os.write(fd, "Test fixture data: %s.\n" % (description,))
+ os.close(fd)
+
+
Copied: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_sendmsg.py (from rev 5429, CalendarServer/branches/users/glyph/deployment-plus-sendfd/twistedcaldav/test/test_sendmsg.py)
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_sendmsg.py (rev 0)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_sendmsg.py 2010-03-31 18:03:58 UTC (rev 5430)
@@ -0,0 +1,172 @@
+##
+# Copyright (c) 2010 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import socket
+from os import pipe, read, close, environ
+from twisted.python.filepath import FilePath
+import sys
+
+from twisted.internet.defer import Deferred
+from twisted.internet.error import ProcessDone
+from twisted.trial.unittest import TestCase
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet import reactor
+
+from twistedcaldav.sendmsg import sendmsg, recvmsg
+from twistedcaldav.sendfd import sendfd
+from twisted.internet.protocol import ProcessProtocol
+
+from twistedcaldav.test.test_tap import CleanupHelper
+
+class ExitedWithStderr(Exception):
+ """
+ A process exited with some stderr.
+ """
+
+ def __str__(self):
+ """
+ Dump the errors in a pretty way in the event of a subprocess traceback.
+ """
+ return '\n'.join([''] + list(self.args))
+
+
+class StartStopProcessProtocol(ProcessProtocol):
+ """
+ An L{IProcessProtocol} with a Deferred for events where the subprocess
+ starts and stops.
+ """
+
+ def __init__(self):
+ self.started = Deferred()
+ self.stopped = Deferred()
+ self.output = ''
+ self.errors = ''
+
+ def connectionMade(self):
+ self.started.callback(self.transport)
+
+ def outReceived(self, data):
+ self.output += data
+
+ def errReceived(self, data):
+ self.errors += data
+
+ def processEnded(self, reason):
+ if reason.check(ProcessDone):
+ self.stopped.callback(self.output)
+ else:
+ self.stopped.errback(ExitedWithStderr(
+ self.errors, self.output))
+
+
+
+def bootReactor():
+ """
+ Yield this from a trial test to bootstrap the reactor in order to avoid
+ PotentialZombieWarning, for tests that use subprocesses. This hack will no
+ longer be necessary in Twisted 10.1, since U{the underlying bug was fixed
+ <http://twistedmatrix.com/trac/ticket/2078>}.
+ """
+ d = Deferred()
+ reactor.callLater(0, d.callback, None)
+ return d
+
+
+
+class SendmsgTestCase(CleanupHelper, TestCase):
+ """
+ Tests for sendmsg extension module and associated file-descriptor sending
+ functionality in L{twistedcaldav.sendfd}.
+ """
+
+ def setUp(self):
+ """
+ Create a pair of UNIX sockets, stored as C{self.input} and
+ C{self.output}, and arrange for both to be closed at teardown.
+ """
+ CleanupHelper.setUp(self)
+ self.input, self.output = socket.socketpair(socket.AF_UNIX)
+ def closePair():
+ self.input.close()
+ self.output.close()
+
+ self.addCleanup(closePair)
+
+
+ def test_roundtrip(self):
+ """
+ L{recvmsg} will retrieve a message sent via L{sendmsg}.
+ """
+ sendmsg(self.input.fileno(), "hello, world!", 0)
+
+ result = recvmsg(fd=self.output.fileno())
+ # Expected result is (data, flags, ancillary list).
+ self.assertEquals(result, ("hello, world!", 0, []))
+
+
+ def test_wrongTypeAncillary(self):
+ """
+ L{sendmsg} will show a helpful exception message when given the wrong
+ type of object for the 'ancillary' argument.
+ """
+ error = self.assertRaises(TypeError,
+ sendmsg, self.input.fileno(),
+ "hello, world!", 0, 4321)
+ self.assertEquals(str(error),
+ "sendmsg argument 3 expected list, got int")
+
+
+ def spawn(self, script):
+ """
+ Start a script that is a peer of this test as a subprocess.
+
+ @param script: the module name of the script in this directory (no
+ package prefix, no '.py')
+ @type script: C{str}
+
+ @rtype: L{StartStopProcessProtocol}
+ """
+ sspp = StartStopProcessProtocol()
+ # The script receives the number of the receiving socket as its only
+ # argument; childFDs maps that descriptor to the same number in the
+ # child.
+ reactor.spawnProcess(
+ sspp, sys.executable, [
+ sys.executable,
+ FilePath(__file__).sibling(script + ".py").path,
+ str(self.output.fileno()),
+ ],
+ environ,
+ childFDs={0: "w", 1: "r", 2: "r",
+ self.output.fileno(): self.output.fileno()}
+ )
+ return sspp
+
+
+ @inlineCallbacks
+ def test_sendSubProcessFD(self):
+ """
+ Calling L{sendmsg} with SOL_SOCKET, SCM_RIGHTS, and a platform-endian
+ packed file descriptor number should send that file descriptor to a
+ different process, where it can be retrieved by using L{recvmsg}.
+ """
+ yield bootReactor()
+ sspp = self.spawn("pullpipe")
+ yield sspp.started
+ pipeOut, pipeIn = pipe()
+ self.addCleanup(lambda : close(pipeOut))
+ # Send the write end of the pipe to the subprocess, then drop our own
+ # copy of it.
+ sendfd(self.input.fileno(), pipeIn, "blonk")
+ close(pipeIn)
+ yield sspp.stopped
+ self.assertEquals(read(pipeOut, 1024), "Test fixture data: blonk.\n")
+ # Make sure that the pipe is actually closed now.
+ self.assertEquals(read(pipeOut, 1024), "")
+
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_tap.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_tap.py 2010-03-31 17:55:47 UTC (rev 5429)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/test/test_tap.py 2010-03-31 18:03:58 UTC (rev 5430)
@@ -15,14 +15,21 @@
##
import os
+import sys
from copy import deepcopy
-from twisted.trial import unittest
+from zope.interface import implements
+
from twistedcaldav.test.util import TestCase
from twisted.python.usage import Options, UsageError
from twisted.python.util import sibpath
from twisted.python.reflect import namedAny
+from twisted.python.filepath import FilePath
+
+from twisted.internet.interfaces import IProcessTransport, IReactorProcess
+from twisted.internet.defer import Deferred, inlineCallbacks
+
from twisted.application.service import IService
from twisted.application import internet
@@ -30,6 +37,9 @@
from twisted.web2.log import LogWrapperResource
from twistedcaldav.tap import CalDAVOptions, CalDAVServiceMaker
+
+from twistedcaldav.cluster import DelayedStartupProcessMonitor, TwistdSlaveProcess
+
from twistedcaldav import tap
from twistedcaldav.config import config
@@ -40,7 +50,68 @@
from twistedcaldav.directory.sudo import SudoDirectoryService
from twistedcaldav.directory.directory import UnknownRecordTypeError
+class NotAProcessTransport(object):
+ """
+ Simple L{IProcessTransport} stub.
+ """
+ implements(IProcessTransport)
+ def __init__(self, processProtocol, executable, args, env, path,
+ uid, gid, usePTY, childFDs):
+ """
+ Hold on to all the attributes passed to spawnProcess.
+ """
+ self.processProtocol = processProtocol
+ self.executable = executable
+ self.args = args
+ self.env = env
+ self.path = path
+ self.uid = uid
+ self.gid = gid
+ self.usePTY = usePTY
+ self.childFDs = childFDs
+
+
+class InMemoryProcessSpawner(object):
+ """
+ Stub out L{IReactorProcess.spawnProcess} so that we can examine the
+ interaction of L{DelayedStartupProcessMonitor} and the reactor.
+ """
+ implements(IReactorProcess)
+
+ def __init__(self):
+ """
+ Create some storage to hold on to all the fake processes spawned.
+ """
+ self.processTransports = []
+ self.waiting = []
+
+
+ def waitForOneProcess(self):
+ """
+ Return a L{Deferred} which will fire when spawnProcess has been
+ invoked, with the L{IProcessTransport}.
+ """
+ d = Deferred()
+ self.waiting.append(d)
+ return d
+
+
+ def spawnProcess(self, processProtocol, executable, args=(), env={},
+ path=None, uid=None, gid=None, usePTY=0,
+ childFDs=None):
+
+ transport = NotAProcessTransport(
+ processProtocol, executable, args, env, path, uid, gid, usePTY,
+ childFDs
+ )
+ self.processTransports.append(transport)
+ if self.waiting:
+ self.waiting.pop(0).callback(transport)
+ return transport
+
+
+
class TestCalDAVOptions(CalDAVOptions):
"""
A fake implementation of CalDAVOptions that provides
@@ -673,3 +744,162 @@
self.failUnless(isinstance(
realDirectory,
configuredDirectory))
+
+
+
+
+class DummyProcessObject(object):
+ """
+ Simple stub for the Process Object API that will run a test script.
+
+ This is a stand in for L{TwistdSlaveProcess}.
+ """
+
+ def __init__(self, scriptname, *args):
+ self.scriptname = scriptname
+ self.args = list(args)
+
+
+ def getCommandLine(self):
+ """
+ Get the command line to invoke this script.
+ """
+ return [sys.executable,
+ FilePath(__file__).sibling(self.scriptname).path] + self.args
+
+
+ def getFileDescriptors(self):
+ """
+ Return a dummy, empty mapping of file descriptors.
+ """
+ return {}
+
+
+ def getName(self):
+ """
+ Get a dummy name.
+ """
+ return 'Dummy'
+
+
+class CleanupHelper(object):
+ """
+ Emulate a more recent version of Twisted by providing addCleanup.
+ """
+
+ def setUp(self):
+ """
+ Emulate a more recent version of Twisted; initialize a list of
+ callables to run during cleanup.
+ """
+ self.cleanups = []
+
+
+ def addCleanup(self, thunk):
+ """
+ Emulate a more recent version of Trial; add a method to be run during
+ teardown.
+ """
+ self.cleanups.append(thunk)
+
+
+ def tearDown(self):
+ """
+ Emulate a more recent version of Trial; run all registered cleanup
+ functions.
+ """
+ from twisted.python import log
+ for cleanup in self.cleanups:
+ try:
+ cleanup()
+ except:
+ log.err()
+
+
+
+
+class DelayedStartupProcessMonitorTests(CleanupHelper, TestCase):
+ """
+ Test cases for L{DelayedStartupProcessMonitor}.
+ """
+
+ @inlineCallbacks
+ def test_acceptDescriptorInheritance(self):
+ """
+ If a L{TwistdSlaveProcess} specifies some file descriptors to be
+ inherited, they should be inherited by the subprocess.
+ """
+ dspm = DelayedStartupProcessMonitor()
+ # Replace the reactor so that spawnProcess calls are only recorded.
+ dspm.reactor = InMemoryProcessSpawner()
+
+ # Most arguments here will be ignored, so these are bogus values.
+ slave = TwistdSlaveProcess(
+ twistd = "bleh",
+ tapname = "caldav",
+ configFile = "/does/not/exist",
+ id = 10,
+ interfaces = '127.0.0.1',
+ port = None,
+ sslPort = None,
+ inheritFDs = [3, 7],
+ inheritSSLFDs = [19, 25],
+ )
+
+ dspm.addProcessObject(slave, {})
+ dspm.startService()
+ self.addCleanup(dspm.consistency.cancel)
+ # We can easily stub out spawnProcess, because caldav calls it, but a
+ # bunch of callLater calls are buried in procmon itself, so we need to
+ # use the real clock.
+ oneProcessTransport = yield dspm.reactor.waitForOneProcess()
+ # Standard streams plus every inherited descriptor, each mapped to the
+ # same number in the child.
+ self.assertEquals(oneProcessTransport.childFDs,
+ {0: 'w', 1: 'r', 2: 'r',
+ 3: 3, 7: 7,
+ 19: 19, 25: 25})
+ @inlineCallbacks
+ def test_metaDescriptorInheritance(self):
+ """
+ If a L{TwistdSlaveProcess} specifies a meta-file-descriptor to be
+ inherited, it should be inherited by the subprocess, and a
+ configuration argument (C{-o MetaFD=<n>}) should be passed that
+ indicates its number to the subprocess.
+ """
+ dspm = DelayedStartupProcessMonitor()
+ dspm.reactor = InMemoryProcessSpawner()
+ # Stand-in for a socket: only fileno() is provided.
+ class FakeFD:
+ def __init__(self, n):
+ self.fd = n
+ def fileno(self):
+ return self.fd
+
+ # Stand-in dispatcher: hands out FakeFDs numbered 4, 5, ...
+ class FakeDispatcher:
+ n = 3
+ def addSocket(self):
+ self.n += 1
+ return FakeFD(self.n)
+
+ # Most arguments here will be ignored, so these are bogus values.
+ slave = TwistdSlaveProcess(
+ twistd = "bleh",
+ tapname = "caldav",
+ configFile = "/does/not/exist",
+ id = 10,
+ port = None,
+ sslPort = None,
+ interfaces = '127.0.0.1',
+ dispatcher = FakeDispatcher()
+ )
+
+ dspm.addProcessObject(slave, {})
+ dspm.startService()
+ self.addCleanup(dspm.consistency.cancel)
+ oneProcessTransport = yield dspm.reactor.waitForOneProcess()
+ self.assertIn("MetaFD=4", oneProcessTransport.args)
+ # The argument immediately before "MetaFD=4" must be the '-o' switch.
+ self.assertEquals(
+ oneProcessTransport.args[oneProcessTransport.args.index("MetaFD=4")-1],
+ '-o',
+ "MetaFD argument was not passed as an option"
+ )
+ self.assertEquals(oneProcessTransport.childFDs,
+ {0: 'w', 1: 'r', 2: 'r',
+ 4: 4})
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100331/e2361a29/attachment-0001.html>
More information about the calendarserver-changes
mailing list