[CalendarServer-changes] [12044] CalendarServer/trunk/twext/application/masterchild.py

source_changes at macosforge.org source_changes at macosforge.org
Wed Mar 12 11:17:02 PDT 2014


Revision: 12044
          http://trac.calendarserver.org//changeset/12044
Author:   wsanchez at apple.com
Date:     2013-12-09 14:31:51 -0800 (Mon, 09 Dec 2013)
Log Message:
-----------
Borrow more from metafd.py

Modified Paths:
--------------
    CalendarServer/trunk/twext/application/masterchild.py

Modified: CalendarServer/trunk/twext/application/masterchild.py
===================================================================
--- CalendarServer/trunk/twext/application/masterchild.py	2013-12-09 22:01:25 UTC (rev 12043)
+++ CalendarServer/trunk/twext/application/masterchild.py	2013-12-09 22:31:51 UTC (rev 12044)
@@ -37,12 +37,14 @@
 import sys
 from os import close, unlink
 from tempfile import mkstemp
+from functools import total_ordering
 
 from zope.interface import implementer
 
 from twisted.python.sendmsg import getsockfam
 from twisted.python.usage import Options, UsageError
 from twisted.python.reflect import namedClass
+from twisted.python.util import FancyStrMixin
 from twisted.application.service import MultiService, Service
 from twisted.application.service import IServiceMaker
 from twisted.application.internet import TCPServer
@@ -166,13 +168,13 @@
         self.dispatcher = InheritedSocketDispatcher(self)
 
         # Child Processes
-        log.info("Setting up master/child spawning service...")
+        self.log.info("Setting up master/child spawning service...")
         self.spawningService = ChildSpawningService(self.dispatcher)
         self.spawningService.setServiceParent(self)
 
 
     def addProtocol(self, protocol, port):
-        log.info(
+        self.log.info(
             "Setting service for protocol {protocol!r} on port {port}...",
             protocol=protocol, port=port,
         )
@@ -199,56 +201,77 @@
     @staticmethod
     def initialStatus():
         log.info("Status: init")
-        return ChildStatus(sentCount=0, ackedCount=0)
 
+        return ChildStatus()
 
+
     @staticmethod
     def newConnectionStatus(previousStatus):
         log.info("Status: {0} new".format(previousStatus))
-        return ChildStatus(
-            sentCount=previousStatus.sentCount + 1,
-            ackedCount=previousStatus.ackedCount,
-        )
 
+        return previousStatus + ChildStatus(unacknowledged=1)
 
+
     @staticmethod
     def statusFromMessage(previousStatus, message):
-        log.info("Status: {0}{1!r}".format(previousStatus, message))
-        if message == b"-":
-            return ChildStatus(
-                sentCount=previousStatus.sentCount - 1,
-                ackedCount=previousStatus.ackedCount,
+        log.info("Status: {0} {1!r}".format(previousStatus, message))
+
+        if message == "-":
+            # A connection has gone away in a subprocess; we should start
+            # accepting connections again if we paused (see
+            # newConnectionStatus)
+            return previousStatus - ChildStatus(acknowledged=1)
+
+        elif message == "0":
+            # A new process just started accepting new connections.  It might
+            # still have some unacknowledged connections, but any connections
+            # that it acknowledged working on are now completed.  (We have no
+            # way of knowing whether the acknowledged connections were acted
+            # upon or dropped, so we have to treat that number with a healthy
+            # amount of skepticism.)
+            return previousStatus.restarted()
+
+        elif message == "+":
+            # Acknowledges that the subprocess has taken on the work.
+            return (
+                previousStatus +
+                ChildStatus(acknowledged=1, unacknowledged=-1, unclosed=1)
             )
-        elif message == b"+":
-            return ChildStatus(
-                sentCount=previousStatus.sentCount,
-                ackedCount=previousStatus.ackedCount + 1,
-            )
+
         else:
-            raise AssertionError("Unknown message: {}".format(message))
+            raise AssertionError("Unknown message: {0}".format(message))
 
 
     @staticmethod
     def closeCountFromStatus(previousStatus):
         log.info("Status: {0} close".format(previousStatus))
-        return (
-            previousStatus.ackedCount,
-            ChildStatus(
-                sentCount=previousStatus.sentCount,
-                ackedCount=0,
-            )
-        )
 
+        toClose = previousStatus.unclosed
+        return (toClose, previousStatus - ChildStatus(unclosed=toClose))
 
-    @staticmethod
-    def statusesChanged(statuses):
-        log.info("Status changed: {0}".format(tuple(statuses)))
+
+    def statusesChanged(self, statuses):
         # FIXME: This isn't in IStatusWatcher, but is called by
         # InheritedSocketDispatcher.
-        pass
 
+        self.log.info("Status changed: {0}".format(tuple(statuses)))
 
+        # current = sum(
+        #     status.effective()
+        #     for status in self.dispatcher.statuses
+        # )
 
+        # maximum = self.maxRequests
+        # overloaded = (current >= maximum)
+
+        # for f in self.factories:
+        #     if overloaded:
+        #         f.loadAboveMaximum()
+        #     else:
+        #         f.loadNominal()
+
+
+
 @implementer(IServiceMaker)
 class MasterServiceMaker(object):
     """
@@ -496,6 +519,9 @@
     Service for child processes.
     """
 
+    log = Logger()
+
+
     def __init__(self, fd, protocolFactory):
         self.fd = fd
         self.protocolFactory = protocolFactory
@@ -530,7 +556,7 @@
 
         factory = self.wrappedProtocolFactory
         factory.inheritedPort.reportStatus("+")
-        log.info("{factory.inheritedPort.statusQueue}", factory=factory)
+        self.log.info("{factory.inheritedPort.statusQueue}", factory=factory)
 
         socketFD = socket.fileno()
         transport = reactor.adoptStreamConnection(
@@ -543,8 +569,11 @@
 
 
 class ReportingProtocolWrapper(ProtocolWrapper, object):
+    log = Logger()
+
+
     def connectionLost(self, reason):
-        log.info("CONNECTION LOST")
+        self.log.info("CONNECTION LOST")
         self.factory.inheritedPort.reportStatus("-")
         return super(ReportingProtocolWrapper, self).connectionLost(reason)
 
@@ -559,10 +588,101 @@
 
 
 
-class ChildStatus(object):
-    def __init__(self, sentCount, ackedCount):
-        self.sentCount = sentCount
-        self.ackedCount = ackedCount
+ at total_ordering
+class ChildStatus(FancyStrMixin, object):
+    """
+    The status of a child process.
+    """
 
-    def __repr__(self):
-        return "({self.sentCount},{self.ackedCount})".format(self=self)
+    showAttributes = (
+        "acknowledged",
+        "unacknowledged",
+        "started",
+        "abandoned",
+        "unclosed",
+    )
+
+
+    def __init__(self, acknowledged=0, unacknowledged=0, started=0,
+                 abandoned=0, unclosed=0):
+        """
+        Create a L{ConnectionStatus} with a number of sent connections and a
+        number of un-acknowledged connections.
+
+        @param acknowledged: the number of connections which we know the
+            subprocess to be presently processing; i.e. those which have been
+            transmitted to the subprocess.
+
+        @param unacknowledged: The number of connections which we have sent to
+            the subprocess which have never received a status response (a
+            "C{+}" status message).
+
+        @param abandoned: The number of connections which have been sent to
+            this worker, but were not acknowledged at the moment that the
+            worker restarted.
+
+        @param started: The number of times this worker has been started.
+
+        @param unclosed: The number of sockets which have been sent to the
+            subprocess but not yet closed.
+        """
+        self.acknowledged = acknowledged
+        self.unacknowledged = unacknowledged
+        self.started = started
+        self.abandoned = abandoned
+        self.unclosed = unclosed
+
+
+    def effectiveLoad(self):
+        """
+        The current effective load.
+        """
+        return self.acknowledged + self.unacknowledged
+
+
+    def restarted(self):
+        """
+        The L{ChildStatus} derived from the current status of a process and
+        the fact that it just restarted.
+        """
+        return self.__class__(0, 0, self.started + 1, self.unacknowledged)
+
+
+    def _tuplify(self):
+        return tuple(getattr(self, attr) for attr in self.showAttributes)
+
+
+    def __lt__(self, other):
+        if not isinstance(other, ChildStatus):
+            return NotImplemented
+
+        return self.effectiveLoad() < other.effectiveLoad()
+
+
+    def __eq__(self, other):
+        if not isinstance(other, ChildStatus):
+            return NotImplemented
+
+        return self._tuplify() == other._tuplify()
+
+
+    def __add__(self, other):
+        if not isinstance(other, ChildStatus):
+            return NotImplemented
+
+        a = self._tuplify()
+        b = other._tuplify()
+        sum = [a1 + b1 for (a1, b1) in zip(a, b)]
+
+        return self.__class__(*sum)
+
+
+    def __sub__(self, other):
+        if not isinstance(other, ChildStatus):
+            return NotImplemented
+
+        a = self._tuplify()
+        b = other._tuplify()
+        difference = [a1 - b1 for (a1, b1) in zip(a, b)]
+
+        return self + self.__class__(*difference)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140312/f72175c9/attachment.html>


More information about the calendarserver-changes mailing list