[CalendarServer-changes] [12003] CalendarServer/trunk/twext/application/masterchild.py

source_changes at macosforge.org source_changes at macosforge.org
Wed Mar 12 11:16:18 PDT 2014


Revision: 12003
          http://trac.calendarserver.org//changeset/12003
Author:   wsanchez at apple.com
Date:     2013-11-22 20:36:06 -0800 (Fri, 22 Nov 2013)
Log Message:
-----------
Work on the master

Modified Paths:
--------------
    CalendarServer/trunk/twext/application/masterchild.py

Modified: CalendarServer/trunk/twext/application/masterchild.py
===================================================================
--- CalendarServer/trunk/twext/application/masterchild.py	2013-11-23 02:27:34 UTC (rev 12002)
+++ CalendarServer/trunk/twext/application/masterchild.py	2013-11-23 04:36:06 UTC (rev 12003)
@@ -30,9 +30,10 @@
 from twisted.application.service import IServiceMaker
 from twisted.application.internet import TCPServer
 from twisted.protocols.policies import WrappingFactory, ProtocolWrapper
-
 from twisted.internet.protocol import ServerFactory
 from twisted.internet.protocol import ProcessProtocol
+
+from twext.python.log import Logger
 from twext.internet.sendfdport import InheritingProtocolFactory
 from twext.internet.sendfdport import InheritedSocketDispatcher
 from twext.internet.sendfdport import IStatusWatcher
@@ -40,22 +41,34 @@
 
 
 
-class MasterOptions (Options):
-    optParameters = [[
-        #"config", "f", DEFAULT_CONFIG_FILE, "Path to configuration file."
-    ]]
+class MasterOptions(Options):
+    """
+    Options for a master process.
+    """
 
-    # def __init__(self, *args, **kwargs):
-    #     super(Options, self).__init__(*args, **kwargs)
 
 
+class SpawningInheritingProtocolFactory(InheritingProtocolFactory):
+    def __init__(self, dispatcher, spawningService, description):
+        super(SpawningInheritingProtocolFactory, self).__init__(
+            dispatcher, description
+        )
+        self.spawningService = spawningService
 
+
+    def sendSocket(self, socketObject):
+        self.spawningService.socketWillArrive()
+        super(SpawningInheritingProtocolFactory, self).sendSocket(socketObject)
+
+
+
 @implementer(IServiceMaker)
 class MasterServiceMaker(object):
     def makeService(self, options):
         service = MultiService()
 
         port = 8000
+        childProtocol = "twext.protocols.echo.EchoProtocol"
 
         # Dispatcher
         statusWatcher = StatusWatcher()
@@ -66,8 +79,10 @@
         spawningService.setServiceParent(service)
 
         # TCP Service
-        description = b""  # UserInfo sent to the dispatcher
-        tcpFactory = InheritingProtocolFactory(dispatcher, description)
+        description = bytes(childProtocol)  # UserInfo sent to the dispatcher
+        tcpFactory = SpawningInheritingProtocolFactory(
+            dispatcher, spawningService, description
+        )
         tcpService = TCPServer(port, tcpFactory)
 
         tcpService.setServiceParent(service)
@@ -75,22 +90,22 @@
         return service
 
 
-#########
-# Subclass InheritingProtocolFactory
-# override sendSocket to decide when to spawn a child
 
+Child = namedtuple("Child", ("transport", "protocol"))
 
-# @implementer(IServiceMaker)
-# class ChildSpawningServiceMaker(object):
-#     def makeService(self, options):
-#         service = ChildSpawningService(args)
-#         return service
 
 
+class ChildSpawningService(Service, object):
+    log = Logger()
 
-class ChildSpawningService(Service, object):
-    def __init__(self, dispatcher, maxProcessCount=8):
+    def __init__(self, dispatcher, protocolName, maxProcessCount=8):
+        """
+        @param protocol: The name of the protocol for the child to use
+            to handle connections.
+        @type protocol: L{str} naming an L{IProtocol} implementer.
+        """
         self.dispatcher = dispatcher
+        self.protocolName = protocolName
         self.maxProcessCount = maxProcessCount
 
 
@@ -104,7 +119,20 @@
         del(self.children)
 
 
-    def spawnChild(self, arguments):
+    def socketWillArrive(self):
+        """
+        This method is where this service makes sure that there are
+        sufficient child processes available to handle additional
+        connections.
+        """
+        if len(self.children) == 0:
+            self.spawnChild()
+
+
+    def spawnChild(self):
+        """
+        Spawn a child process to handle connections.
+        """
         from twisted.internet import reactor
 
         inheritedSocket = self.dispatcher.addSocket()
@@ -112,45 +140,86 @@
 
         processProtocol = ChildProcessProtocol(self, inheritedSocket)
 
-        reactor.spawnProcess(
+        arguments = (
+            sys.executable, b"-c",
+            b"from twisted.scripts.twistd import run; run()",
+            b"--inherited-fd", b"3",
+            b"--protocol", self.protocolName,
+        )
+
+        transport = reactor.spawnProcess(
             processProtocol,
-            sys.executable,
-            args=(
-                sys.executable, b"-c",
-                b"from twisted.scripts.twistd import run; run()",
-                b"--inherited-fd", b"3",
-            ) + tuple(arguments),
-            env={},
+            sys.executable, arguments, env={},
             childFDs={0: b"w", 1: b"r", 2: b"r", 3: inheritedFD}
         )
 
-        self.children.add(processProtocol)
+        child = Child(transport, processProtocol)
 
+        self.log.info(
+            u"Spawned child process ({child.transport.pid}) "
+            u"for protocol {protocol!r}: {arguments}",
+            child=child, protocol=self.protocolName, arguments=arguments,
+        )
 
-    def childDidExit(self, child):
-        self.children.remove(child)
-        self.dispatcher.removeSocket(child.inheritedSocket)
+        self.children.add(child)
 
 
+    def childDidExit(self, processProtocol, reason):
+        """
+        Called by L{ChildProcessProtocol} to alert this service that a
+        child process has exited.
 
+        @param processProtocol: The processProtocol for the child that
+            exited.
+        @type processProtocol: L{ChildProcessProtocol}
+
+        @param reason: The reason that the child exited.
+        @type reason: L{Failure}
+        """
+        for child in self.children:
+            if child.protocol == processProtocol:
+                self.log.info(
+                    u"Child process ({child.transport.pid}) exited: "
+                    u"{reason}",
+                    child=child, reason=reason,
+                )
+                self.children.remove(child)
+                break
+        else:
+            self.log.error(
+                u"No child for for process protocol",
+                processProtocol=processProtocol
+            )
+
+        try:
+            self.dispatcher.removeSocket(processProtocol.inheritedSocket)
+        except ValueError:
+            self.log.error(
+                u"No socket found for process protocol",
+                processProtocol=processProtocol
+            )
+
+
+
 class ChildProcessProtocol(ProcessProtocol, object):
+    log = Logger()
+
     def __init__(self, service, inheritedSocket):
         self.service = service
         self.inheritedSocket = inheritedSocket
 
 
     def outReceived(self, data):
-        super(ChildProcessProtocol, self).outReceived(data)
-        # FIXME: log...
+        self.log.info(u"{data}", data=data)
 
 
     def errReceived(self, data):
         super(ChildProcessProtocol, self).errReceived(data)
-        # FIXME: log...
+        self.log.error(u"{data}", data=data)
 
 
     def processExited(self, reason):
-        self.service.childDidExit(self)
+        self.service.childDidExit(self, reason)
 
 
 
@@ -262,6 +331,7 @@
     def initialStatus():
         return Status(sentCount=0, ackedCount=0)
 
+
     @staticmethod
     def newConnectionStatus(previousStatus):
         return Status(
@@ -269,6 +339,7 @@
             ackedCount=previousStatus.ackedCount,
         )
 
+
     @staticmethod
     def statusFromMessage(previousStatus, message):
         if message == b"-":
@@ -284,6 +355,7 @@
         else:
             raise AssertionError("Unknown message: {}".format(message))
 
+
     @staticmethod
     def closeCountFromStatus(previousStatus):
         return (
@@ -296,7 +368,4 @@
 
 
 
-Status = namedtuple(
-    "Status",
-    ("sentCount", "ackedCount")
-)
+Status = namedtuple("Status", ("sentCount", "ackedCount"))
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140312/bdc62e4d/attachment.html>


More information about the calendarserver-changes mailing list