[CalendarServer-changes] [12037] CalendarServer/trunk/twext/application/masterchild.py

source_changes at macosforge.org source_changes at macosforge.org
Wed Mar 12 11:16:42 PDT 2014


Revision: 12037
          http://trac.calendarserver.org//changeset/12037
Author:   wsanchez at apple.com
Date:     2013-12-04 19:55:04 -0800 (Wed, 04 Dec 2013)
Log Message:
-----------
Echo works.
Reporting doesn't.

Modified Paths:
--------------
    CalendarServer/trunk/twext/application/masterchild.py

Modified: CalendarServer/trunk/twext/application/masterchild.py
===================================================================
--- CalendarServer/trunk/twext/application/masterchild.py	2013-12-05 02:50:05 UTC (rev 12036)
+++ CalendarServer/trunk/twext/application/masterchild.py	2013-12-05 03:55:04 UTC (rev 12037)
@@ -20,6 +20,12 @@
 child processes.
 """
 
+# python -c 'from twisted.scripts.twistd import run; run()' \
+#   -n -l - master --protocol=twext.protocols.echo.EchoProtocol --port=8080
+
+from __future__ import print_function
+
+
 __all__ = [
     "MasterOptions",
     "MasterServiceMaker",
@@ -29,6 +35,8 @@
 
 
 import sys
+from os import close, unlink
+from tempfile import mkstemp
 
 from zope.interface import implementer
 
@@ -48,8 +56,10 @@
 from twext.internet.sendfdport import IStatusWatcher
 from twext.internet.sendfdport import InheritedPort
 
+log = Logger()
 
 
+
 class MasterOptions(Options):
     """
     Options for a master process.
@@ -60,11 +70,11 @@
         Protocol
         """
         try:
-            protocol = namedClass(value)
+            namedClass(value)
         except (ValueError, AttributeError):
             raise UsageError("Unknown protocol: {0}".format(value))
 
-        self["protocol"] = protocol
+        self["protocol"] = value
 
 
     def opt_port(self, value):
@@ -122,9 +132,6 @@
     def makeService(self, options):
         service = MultiService()
 
-        port = options["port"]
-        childProtocol = options["protocol"]
-
         # Dispatcher
         statusWatcher = StatusWatcher()
         dispatcher = InheritedSocketDispatcher(statusWatcher)
@@ -135,9 +142,9 @@
 
         # TCP Service
         tcpFactory = SpawningInheritingProtocolFactory(
-            dispatcher, spawningService, childProtocol
+            dispatcher, spawningService, options["protocol"]
         )
-        tcpService = TCPServer(port, tcpFactory)
+        tcpService = TCPServer(options["port"], tcpFactory)
 
         tcpService.setServiceParent(service)
 
@@ -207,16 +214,27 @@
 
         processProtocol = ChildProcessProtocol(self, inheritedSocket)
 
+        # Annoyingly, twistd *has* to make a pid file.
+        pidFileFD, pidFileName = mkstemp()
+        close(pidFileFD)
+        unlink(pidFileName)
+
         arguments = (
             sys.executable, b"-c",
             b"from twisted.scripts.twistd import run; run()",
-            b"--pidfile", b"/dev/null",
-            b"--logfile", b"-",
+            b"--pidfile", pidFileName,
+            b"--nodaemon", b"--logfile", b"-",
             self.pluginName,
             b"--inherited-fd=3",
             b"--protocol", protocolName,
         )
 
+        self.log.debug(
+            u"Spawning child process for protocol {protocol!r} "
+            u"with arguments: {arguments}",
+            protocol=protocolName, arguments=arguments,
+        )
+
         transport = reactor.spawnProcess(
             processProtocol,
             sys.executable, arguments, env={
@@ -228,8 +246,8 @@
         child = ChildProcess(transport, processProtocol)
 
         self.log.info(
-            u"Spawned child process ({child.transport.pid}) "
-            u"for protocol {protocol!r}: {arguments}",
+            u"Spawned child process #{child.transport.pid} "
+            u"for protocol {protocol!r}",
             child=child, protocol=protocolName, arguments=arguments,
         )
 
@@ -369,13 +387,23 @@
 
 
     def startService(self):
-        self.wrappedProtocolFactory = ReportingWrapperFactory(
+        factory = ReportingWrapperFactory(
             self.protocolFactory, self.fd, self.createTransport
         )
+        self.wrappedProtocolFactory = factory
+
+        factory.inheritedPort.startReading()
+        factory.inheritedPort.reportStatus("0")
+
         return super(ChildService, self).startService()
 
 
     def stopService(self):
+        factory = self.wrappedProtocolFactory
+
+        factory.inheritedPort.stopReading()
+        factory.allConnectionsClosed()
+
         return super(ChildService, self).stopService()
 
 
@@ -385,17 +413,23 @@
         """
         from twisted.internet import reactor
 
-        self.wrappedFactory.inheritedPort.reportStatus("+")
+        factory = self.wrappedProtocolFactory
+        factory.inheritedPort.reportStatus("+")
+        log.info("{factory.inheritedPort.statusQueue}", factory=factory)
 
         socketFD = socket.fileno()
-        return reactor.adoptStreamConnection(
-            socketFD, getsockfam(socketFD), self.wrappedProtocolFactory
+        transport = reactor.adoptStreamConnection(
+            socketFD, getsockfam(socketFD), factory
         )
+        transport.startReading()
 
+        return transport
 
 
+
 class ReportingProtocolWrapper(ProtocolWrapper, object):
     def connectionLost(self, reason):
+        log.info("CONNECTION LOST")
         self.factory.inheritedPort.reportStatus("-")
         return super(ReportingProtocolWrapper, self).connectionLost(reason)
 
@@ -410,19 +444,31 @@
 
 
 
+class Status(object):
+    def __init__(self, sentCount, ackedCount):
+        self.sentCount = sentCount
+        self.ackedCount = ackedCount
+
+    def __repr__(self):
+        return "({self.sentCount},{self.ackedCount})".format(self=self)
+
+
+
 @implementer(IStatusWatcher)
 class StatusWatcher(object):
     """
-    This enabled the dispatcher to keep track of how many connections are in
+    This enables the dispatcher to keep track of how many connections are in
     flight for each child.
     """
     @staticmethod
     def initialStatus():
+        log.info("Status: init")
         return Status(sentCount=0, ackedCount=0)
 
 
     @staticmethod
     def newConnectionStatus(previousStatus):
+        log.info("Status: {0} new".format(previousStatus))
         return Status(
             sentCount=previousStatus.sentCount + 1,
             ackedCount=previousStatus.ackedCount,
@@ -431,6 +477,7 @@
 
     @staticmethod
     def statusFromMessage(previousStatus, message):
+        log.info("Status: {0}{1!r}".format(previousStatus, message))
         if message == b"-":
             return Status(
                 sentCount=previousStatus.sentCount - 1,
@@ -447,6 +494,7 @@
 
     @staticmethod
     def closeCountFromStatus(previousStatus):
+        log.info("Status: {0} close".format(previousStatus))
         return (
             previousStatus.ackedCount,
             Status(
@@ -458,13 +506,7 @@
 
     @staticmethod
     def statusesChanged(statuses):
-        # FIXME: THis isn't in IStatusWatcher, but is called by
+        log.info("Status changed: {0}".format(tuple(statuses)))
+        # FIXME: This isn't in IStatusWatcher, but is called by
         # InheritedSocketDispatcher.
         pass
-
-
-
-class Status(object):
-    def __init__(self, sentCount, ackedCount):
-        self.sentCount = sentCount
-        self.ackedCount = ackedCount
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140312/42d1bddd/attachment.html>


More information about the calendarserver-changes mailing list