[CalendarServer-changes] [12043] CalendarServer/trunk/twext/application/masterchild.py

source_changes at macosforge.org source_changes at macosforge.org
Wed Mar 12 11:19:45 PDT 2014


Revision: 12043
          http://trac.calendarserver.org//changeset/12043
Author:   wsanchez at apple.com
Date:     2013-12-09 14:01:25 -0800 (Mon, 09 Dec 2013)
Log Message:
-----------
Change args to allow for multiple protocols in the future.
Define a MasterService.

Modified Paths:
--------------
    CalendarServer/trunk/twext/application/masterchild.py

Modified: CalendarServer/trunk/twext/application/masterchild.py
===================================================================
--- CalendarServer/trunk/twext/application/masterchild.py	2013-12-07 05:24:08 UTC (rev 12042)
+++ CalendarServer/trunk/twext/application/masterchild.py	2013-12-09 22:01:25 UTC (rev 12043)
@@ -47,6 +47,7 @@
 from twisted.application.service import IServiceMaker
 from twisted.application.internet import TCPServer
 from twisted.protocols.policies import WrappingFactory, ProtocolWrapper
+from twisted.internet.protocol import Protocol
 from twisted.internet.protocol import ServerFactory
 from twisted.internet.protocol import ProcessProtocol
 
@@ -67,23 +68,32 @@
 
     def opt_protocol(self, value):
         """
-        Protocol
+        Protocol and port (specify as proto:port).
         """
         try:
-            namedClass(value)
+            protocol, port = value.split(":")
+        except ValueError:
+            if ":" in value:
+                raise UsageError("Invalid protocol argument.")
+            else:
+                raise UsageError("Port is required in protocol argument.")
+
+        # Validate protocol name
+        try:
+            protocolClass = namedClass(protocol)
         except (ValueError, AttributeError):
-            raise UsageError("Unknown protocol: {0}".format(value))
+            raise UsageError("Unknown protocol: {0}".format(protocol))
 
-        self["protocol"] = value
+        try:
+            if not issubclass(protocolClass, Protocol):
+                raise TypeError()
+        except TypeError:
+            raise UsageError("Not a protocol: {0}".format(protocol))
 
-
-    def opt_port(self, value):
-        """
-        Inherited file descriptor
-        """
+        # Validate port number
         try:
             try:
-                port = int(value)
+                port = int(port)
             except ValueError:
                 raise ValueError("not an integer")
 
@@ -92,14 +102,31 @@
 
         except ValueError as e:
             raise UsageError(
-                "Invalid port number {0!r}: {1}".format(value, e)
+                "Invalid port number {0}: {1}".format(port, e)
             )
 
-        self["port"] = port
+        protocols = self.setdefault("protocol", [])
 
+        for (otherProtocol, otherPort) in protocols:
+            # FIXME: Raise here because we don't properly handle multiple
+            # protocols yet.
+            raise UsageError("Only one protocol may be specified.")
 
+            if otherPort == port:
+                if otherProtocol == protocol:
+                    return
+
+                raise UsageError(
+                    "Port {0} cannot be registered more than once "
+                    "for different protocols: ({1}, {2})",
+                    otherProtocol, protocol
+                )
+
+        protocols.append((protocol, port))
+
+
     def postOptions(self):
-        for parameter in ("protocol", "port"):
+        for parameter in ("protocol",):
             if parameter not in self:
                 raise UsageError("{0} parameter is required".format(parameter))
 
@@ -123,31 +150,119 @@
 
 
 
-@implementer(IServiceMaker)
-class MasterServiceMaker(object):
+@implementer(IStatusWatcher)
+class MasterService(MultiService, object):
     """
-    Master process service maker.
+    Service for master processes.
     """
 
-    def makeService(self, options):
-        service = MultiService()
+    log = Logger()
 
+
+    def __init__(self):
+        MultiService.__init__(self)
+
         # Dispatcher
-        statusWatcher = StatusWatcher()
-        dispatcher = InheritedSocketDispatcher(statusWatcher)
+        self.dispatcher = InheritedSocketDispatcher(self)
 
         # Child Processes
-        spawningService = ChildSpawningService(dispatcher)
-        spawningService.setServiceParent(service)
+        log.info("Setting up master/child spawning service...")
+        self.spawningService = ChildSpawningService(self.dispatcher)
+        self.spawningService.setServiceParent(self)
 
+
+    def addProtocol(self, protocol, port):
+        log.info(
+            "Setting service for protocol {protocol!r} on port {port}...",
+            protocol=protocol, port=port,
+        )
+
         # TCP Service
         tcpFactory = SpawningInheritingProtocolFactory(
-            dispatcher, spawningService, options["protocol"]
+            self.dispatcher, self.spawningService, protocol
         )
-        tcpService = TCPServer(options["port"], tcpFactory)
+        tcpService = TCPServer(port, tcpFactory)
 
-        tcpService.setServiceParent(service)
+        tcpService.setServiceParent(self)
 
+
+    def startService(self):
+        """
+        Start up multiservice, then start up the dispatcher.
+        """
+        super(MasterService, self).startService()
+        self.dispatcher.startDispatching()
+
+
+    # IStatusWatcher
+
+    @staticmethod
+    def initialStatus():
+        log.info("Status: init")
+        return ChildStatus(sentCount=0, ackedCount=0)
+
+
+    @staticmethod
+    def newConnectionStatus(previousStatus):
+        log.info("Status: {0} new".format(previousStatus))
+        return ChildStatus(
+            sentCount=previousStatus.sentCount + 1,
+            ackedCount=previousStatus.ackedCount,
+        )
+
+
+    @staticmethod
+    def statusFromMessage(previousStatus, message):
+        log.info("Status: {0}{1!r}".format(previousStatus, message))
+        if message == b"-":
+            return ChildStatus(
+                sentCount=previousStatus.sentCount - 1,
+                ackedCount=previousStatus.ackedCount,
+            )
+        elif message == b"+":
+            return ChildStatus(
+                sentCount=previousStatus.sentCount,
+                ackedCount=previousStatus.ackedCount + 1,
+            )
+        else:
+            raise AssertionError("Unknown message: {}".format(message))
+
+
+    @staticmethod
+    def closeCountFromStatus(previousStatus):
+        log.info("Status: {0} close".format(previousStatus))
+        return (
+            previousStatus.ackedCount,
+            ChildStatus(
+                sentCount=previousStatus.sentCount,
+                ackedCount=0,
+            )
+        )
+
+
+    @staticmethod
+    def statusesChanged(statuses):
+        log.info("Status changed: {0}".format(tuple(statuses)))
+        # FIXME: This isn't in IStatusWatcher, but is called by
+        # InheritedSocketDispatcher.
+        pass
+
+
+
+@implementer(IServiceMaker)
+class MasterServiceMaker(object):
+    """
+    Master process service maker.
+    """
+    log = Logger()
+
+
+    def makeService(self, options):
+        service = MasterService()
+
+        for protocol, port in options["protocol"]:
+            service.addProtocol(protocol, port)
+
         return service
 
 
@@ -444,69 +559,10 @@
 
 
 
-class Status(object):
+class ChildStatus(object):
     def __init__(self, sentCount, ackedCount):
         self.sentCount = sentCount
         self.ackedCount = ackedCount
 
     def __repr__(self):
         return "({self.sentCount},{self.ackedCount})".format(self=self)
-
-
-
-@implementer(IStatusWatcher)
-class StatusWatcher(object):
-    """
-    This enables the dispatcher to keep track of how many connections are in
-    flight for each child.
-    """
-    @staticmethod
-    def initialStatus():
-        log.info("Status: init")
-        return Status(sentCount=0, ackedCount=0)
-
-
-    @staticmethod
-    def newConnectionStatus(previousStatus):
-        log.info("Status: {0} new".format(previousStatus))
-        return Status(
-            sentCount=previousStatus.sentCount + 1,
-            ackedCount=previousStatus.ackedCount,
-        )
-
-
-    @staticmethod
-    def statusFromMessage(previousStatus, message):
-        log.info("Status: {0}{1!r}".format(previousStatus, message))
-        if message == b"-":
-            return Status(
-                sentCount=previousStatus.sentCount - 1,
-                ackedCount=previousStatus.ackedCount,
-            )
-        elif message == b"+":
-            return Status(
-                sentCount=previousStatus.sentCount,
-                ackedCount=previousStatus.ackedCount + 1,
-            )
-        else:
-            raise AssertionError("Unknown message: {}".format(message))
-
-
-    @staticmethod
-    def closeCountFromStatus(previousStatus):
-        log.info("Status: {0} close".format(previousStatus))
-        return (
-            previousStatus.ackedCount,
-            Status(
-                sentCount=previousStatus.sentCount,
-                ackedCount=0,
-            )
-        )
-
-
-    @staticmethod
-    def statusesChanged(statuses):
-        log.info("Status changed: {0}".format(tuple(statuses)))
-        # FIXME: This isn't in IStatusWatcher, but is called by
-        # InheritedSocketDispatcher.
-        pass
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140312/2fb54544/attachment.html>


More information about the calendarserver-changes mailing list