[CalendarServer-changes] [12003] CalendarServer/trunk/twext/application/masterchild.py
source_changes at macosforge.org
source_changes at macosforge.org
Wed Mar 12 11:16:18 PDT 2014
Revision: 12003
http://trac.calendarserver.org//changeset/12003
Author: wsanchez at apple.com
Date: 2013-11-22 20:36:06 -0800 (Fri, 22 Nov 2013)
Log Message:
-----------
Work on the master
Modified Paths:
--------------
CalendarServer/trunk/twext/application/masterchild.py
Modified: CalendarServer/trunk/twext/application/masterchild.py
===================================================================
--- CalendarServer/trunk/twext/application/masterchild.py 2013-11-23 02:27:34 UTC (rev 12002)
+++ CalendarServer/trunk/twext/application/masterchild.py 2013-11-23 04:36:06 UTC (rev 12003)
@@ -30,9 +30,10 @@
from twisted.application.service import IServiceMaker
from twisted.application.internet import TCPServer
from twisted.protocols.policies import WrappingFactory, ProtocolWrapper
-
from twisted.internet.protocol import ServerFactory
from twisted.internet.protocol import ProcessProtocol
+
+from twext.python.log import Logger
from twext.internet.sendfdport import InheritingProtocolFactory
from twext.internet.sendfdport import InheritedSocketDispatcher
from twext.internet.sendfdport import IStatusWatcher
@@ -40,22 +41,34 @@
-class MasterOptions (Options):
- optParameters = [[
- #"config", "f", DEFAULT_CONFIG_FILE, "Path to configuration file."
- ]]
+class MasterOptions(Options):
+ """
+ Options for a master process.
+ """
- # def __init__(self, *args, **kwargs):
- # super(Options, self).__init__(*args, **kwargs)
+class SpawningInheritingProtocolFactory(InheritingProtocolFactory):
+ def __init__(self, dispatcher, spawningService, description):
+ super(SpawningInheritingProtocolFactory, self).__init__(
+ dispatcher, description
+ )
+ self.spawningService = spawningService
+
+ def sendSocket(self, socketObject):
+ self.spawningService.socketWillArrive()
+ super(SpawningInheritingProtocolFactory, self).sendSocket(socketObject)
+
+
+
@implementer(IServiceMaker)
class MasterServiceMaker(object):
def makeService(self, options):
service = MultiService()
port = 8000
+ childProtocol = "twext.protocols.echo.EchoProtocol"
# Dispatcher
statusWatcher = StatusWatcher()
@@ -66,8 +79,10 @@
spawningService.setServiceParent(service)
# TCP Service
- description = b"" # UserInfo sent to the dispatcher
- tcpFactory = InheritingProtocolFactory(dispatcher, description)
+ description = bytes(childProtocol) # UserInfo sent to the dispatcher
+ tcpFactory = SpawningInheritingProtocolFactory(
+ dispatcher, spawningService, description
+ )
tcpService = TCPServer(port, tcpFactory)
tcpService.setServiceParent(service)
@@ -75,22 +90,22 @@
return service
-#########
-# Subclass InheritingProtocolFactory
-# override sendSocket to decide when to spawn a child
+Child = namedtuple("Child", ("transport", "protocol"))
-# @implementer(IServiceMaker)
-# class ChildSpawningServiceMaker(object):
-# def makeService(self, options):
-# service = ChildSpawningService(args)
-# return service
+class ChildSpawningService(Service, object):
+ log = Logger()
-class ChildSpawningService(Service, object):
- def __init__(self, dispatcher, maxProcessCount=8):
+ def __init__(self, dispatcher, protocolName, maxProcessCount=8):
+ """
+ @param protocolName: The name of the protocol for the child to use
+ to handle connections.
+ @type protocolName: L{str} naming an L{IProtocol} implementer.
+ """
self.dispatcher = dispatcher
+ self.protocolName = protocolName
self.maxProcessCount = maxProcessCount
@@ -104,7 +119,20 @@
del(self.children)
- def spawnChild(self, arguments):
+ def socketWillArrive(self):
+ """
+ This method is where this service makes sure that there are
+ sufficient child processes available to handle additional
+ connections.
+ """
+ if len(self.children) == 0:
+ self.spawnChild()
+
+
+ def spawnChild(self):
+ """
+ Spawn a child process to handle connections.
+ """
from twisted.internet import reactor
inheritedSocket = self.dispatcher.addSocket()
@@ -112,45 +140,86 @@
processProtocol = ChildProcessProtocol(self, inheritedSocket)
- reactor.spawnProcess(
+ arguments = (
+ sys.executable, b"-c",
+ b"from twisted.scripts.twistd import run; run()",
+ b"--inherited-fd", b"3",
+ b"--protocol", self.protocolName,
+ )
+
+ transport = reactor.spawnProcess(
processProtocol,
- sys.executable,
- args=(
- sys.executable, b"-c",
- b"from twisted.scripts.twistd import run; run()",
- b"--inherited-fd", b"3",
- ) + tuple(arguments),
- env={},
+ sys.executable, arguments, env={},
childFDs={0: b"w", 1: b"r", 2: b"r", 3: inheritedFD}
)
- self.children.add(processProtocol)
+ child = Child(transport, processProtocol)
+ self.log.info(
+ u"Spawned child process ({child.transport.pid}) "
+ u"for protocol {protocol!r}: {arguments}",
+ child=child, protocol=self.protocolName, arguments=arguments,
+ )
- def childDidExit(self, child):
- self.children.remove(child)
- self.dispatcher.removeSocket(child.inheritedSocket)
+ self.children.add(child)
+ def childDidExit(self, processProtocol, reason):
+ """
+ Called by L{ChildProcessProtocol} to alert this service that a
+ child process has exited.
+ @param processProtocol: The processProtocol for the child that
+ exited.
+ @type processProtocol: L{ChildProcessProtocol}
+
+ @param reason: The reason that the child exited.
+ @type reason: L{Failure}
+ """
+ for child in self.children:
+ if child.protocol == processProtocol:
+ self.log.info(
+ u"Child process ({child.transport.pid}) exited: "
+ u"{reason}",
+ child=child, reason=reason,
+ )
+ self.children.remove(child)
+ break
+ else:
+ self.log.error(
+ u"No child for for process protocol",
+ processProtocol=processProtocol
+ )
+
+ try:
+ self.dispatcher.removeSocket(processProtocol.inheritedSocket)
+ except ValueError:
+ self.log.error(
+ u"No socket found for process protocol",
+ processProtocol=processProtocol
+ )
+
+
+
class ChildProcessProtocol(ProcessProtocol, object):
+ log = Logger()
+
def __init__(self, service, inheritedSocket):
self.service = service
self.inheritedSocket = inheritedSocket
def outReceived(self, data):
- super(ChildProcessProtocol, self).outReceived(data)
- # FIXME: log...
+ self.log.info(u"{data}", data=data)
def errReceived(self, data):
super(ChildProcessProtocol, self).errReceived(data)
- # FIXME: log...
+ self.log.error(u"{data}", data=data)
def processExited(self, reason):
- self.service.childDidExit(self)
+ self.service.childDidExit(self, reason)
@@ -262,6 +331,7 @@
def initialStatus():
return Status(sentCount=0, ackedCount=0)
+
@staticmethod
def newConnectionStatus(previousStatus):
return Status(
@@ -269,6 +339,7 @@
ackedCount=previousStatus.ackedCount,
)
+
@staticmethod
def statusFromMessage(previousStatus, message):
if message == b"-":
@@ -284,6 +355,7 @@
else:
raise AssertionError("Unknown message: {}".format(message))
+
@staticmethod
def closeCountFromStatus(previousStatus):
return (
@@ -296,7 +368,4 @@
-Status = namedtuple(
- "Status",
- ("sentCount", "ackedCount")
-)
+Status = namedtuple("Status", ("sentCount", "ackedCount"))
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140312/bdc62e4d/attachment.html>
More information about the calendarserver-changes mailing list