[CalendarServer-changes] [12044] CalendarServer/trunk/twext/application/masterchild.py
source_changes at macosforge.org
source_changes at macosforge.org
Wed Mar 12 11:17:02 PDT 2014
Revision: 12044
http://trac.calendarserver.org//changeset/12044
Author: wsanchez at apple.com
Date: 2013-12-09 14:31:51 -0800 (Mon, 09 Dec 2013)
Log Message:
-----------
Borrow more from metafd.py
Modified Paths:
--------------
CalendarServer/trunk/twext/application/masterchild.py
Modified: CalendarServer/trunk/twext/application/masterchild.py
===================================================================
--- CalendarServer/trunk/twext/application/masterchild.py 2013-12-09 22:01:25 UTC (rev 12043)
+++ CalendarServer/trunk/twext/application/masterchild.py 2013-12-09 22:31:51 UTC (rev 12044)
@@ -37,12 +37,14 @@
import sys
from os import close, unlink
from tempfile import mkstemp
+from functools import total_ordering
from zope.interface import implementer
from twisted.python.sendmsg import getsockfam
from twisted.python.usage import Options, UsageError
from twisted.python.reflect import namedClass
+from twisted.python.util import FancyStrMixin
from twisted.application.service import MultiService, Service
from twisted.application.service import IServiceMaker
from twisted.application.internet import TCPServer
@@ -166,13 +168,13 @@
self.dispatcher = InheritedSocketDispatcher(self)
# Child Processes
- log.info("Setting up master/child spawning service...")
+ self.log.info("Setting up master/child spawning service...")
self.spawningService = ChildSpawningService(self.dispatcher)
self.spawningService.setServiceParent(self)
def addProtocol(self, protocol, port):
- log.info(
+ self.log.info(
"Setting service for protocol {protocol!r} on port {port}...",
protocol=protocol, port=port,
)
@@ -199,56 +201,77 @@
@staticmethod
def initialStatus():
log.info("Status: init")
- return ChildStatus(sentCount=0, ackedCount=0)
+ return ChildStatus()
+
@staticmethod
def newConnectionStatus(previousStatus):
log.info("Status: {0} new".format(previousStatus))
- return ChildStatus(
- sentCount=previousStatus.sentCount + 1,
- ackedCount=previousStatus.ackedCount,
- )
+ return previousStatus + ChildStatus(unacknowledged=1)
+
@staticmethod
def statusFromMessage(previousStatus, message):
- log.info("Status: {0}{1!r}".format(previousStatus, message))
- if message == b"-":
- return ChildStatus(
- sentCount=previousStatus.sentCount - 1,
- ackedCount=previousStatus.ackedCount,
+ log.info("Status: {0} {1!r}".format(previousStatus, message))
+
+ if message == "-":
+ # A connection has gone away in a subprocess; we should start
+ # accepting connections again if we paused (see
+ # newConnectionStatus)
+ return previousStatus - ChildStatus(acknowledged=1)
+
+ elif message == "0":
+ # A new process just started accepting new connections. It might
+ # still have some unacknowledged connections, but any connections
+ # that it acknowledged working on are now completed. (We have no
+ # way of knowing whether the acknowledged connections were acted
+ # upon or dropped, so we have to treat that number with a healthy
+ # amount of skepticism.)
+ return previousStatus.restarted()
+
+ elif message == "+":
+ # Acknowledges that the subprocess has taken on the work.
+ return (
+ previousStatus +
+ ChildStatus(acknowledged=1, unacknowledged=-1, unclosed=1)
)
- elif message == b"+":
- return ChildStatus(
- sentCount=previousStatus.sentCount,
- ackedCount=previousStatus.ackedCount + 1,
- )
+
else:
- raise AssertionError("Unknown message: {}".format(message))
+ raise AssertionError("Unknown message: {0}".format(message))
@staticmethod
def closeCountFromStatus(previousStatus):
log.info("Status: {0} close".format(previousStatus))
- return (
- previousStatus.ackedCount,
- ChildStatus(
- sentCount=previousStatus.sentCount,
- ackedCount=0,
- )
- )
+ toClose = previousStatus.unclosed
+ return (toClose, previousStatus - ChildStatus(unclosed=toClose))
- @staticmethod
- def statusesChanged(statuses):
- log.info("Status changed: {0}".format(tuple(statuses)))
+
+ def statusesChanged(self, statuses):
# FIXME: This isn't in IStatusWatcher, but is called by
# InheritedSocketDispatcher.
- pass
+ self.log.info("Status changed: {0}".format(tuple(statuses)))
+ # current = sum(
+ # status.effective()
+ # for status in self.dispatcher.statuses
+ # )
+ # maximum = self.maxRequests
+ # overloaded = (current >= maximum)
+
+ # for f in self.factories:
+ # if overloaded:
+ # f.loadAboveMaximum()
+ # else:
+ # f.loadNominal()
+
+
+
@implementer(IServiceMaker)
class MasterServiceMaker(object):
"""
@@ -496,6 +519,9 @@
Service for child processes.
"""
+ log = Logger()
+
+
def __init__(self, fd, protocolFactory):
self.fd = fd
self.protocolFactory = protocolFactory
@@ -530,7 +556,7 @@
factory = self.wrappedProtocolFactory
factory.inheritedPort.reportStatus("+")
- log.info("{factory.inheritedPort.statusQueue}", factory=factory)
+ self.log.info("{factory.inheritedPort.statusQueue}", factory=factory)
socketFD = socket.fileno()
transport = reactor.adoptStreamConnection(
@@ -543,8 +569,11 @@
class ReportingProtocolWrapper(ProtocolWrapper, object):
+ log = Logger()
+
+
def connectionLost(self, reason):
- log.info("CONNECTION LOST")
+ self.log.info("CONNECTION LOST")
self.factory.inheritedPort.reportStatus("-")
return super(ReportingProtocolWrapper, self).connectionLost(reason)
@@ -559,10 +588,101 @@
-class ChildStatus(object):
- def __init__(self, sentCount, ackedCount):
- self.sentCount = sentCount
- self.ackedCount = ackedCount
+@total_ordering
+class ChildStatus(FancyStrMixin, object):
+ """
+ The status of a child process.
+ """
- def __repr__(self):
- return "({self.sentCount},{self.ackedCount})".format(self=self)
+ showAttributes = (
+ "acknowledged",
+ "unacknowledged",
+ "started",
+ "abandoned",
+ "unclosed",
+ )
+
+
+ def __init__(self, acknowledged=0, unacknowledged=0, started=0,
+ abandoned=0, unclosed=0):
+ """
+ Create a L{ChildStatus} with a number of sent connections and a
+ number of un-acknowledged connections.
+
+ @param acknowledged: the number of connections which we know the
+ subprocess to be presently processing; i.e. those which have been
+ transmitted to the subprocess.
+
+ @param unacknowledged: The number of connections which we have sent to
+ the subprocess which have never received a status response (a
+ "C{+}" status message).
+
+ @param abandoned: The number of connections which have been sent to
+ this worker, but were not acknowledged at the moment that the
+ worker restarted.
+
+ @param started: The number of times this worker has been started.
+
+ @param unclosed: The number of sockets which have been sent to the
+ subprocess but not yet closed.
+ """
+ self.acknowledged = acknowledged
+ self.unacknowledged = unacknowledged
+ self.started = started
+ self.abandoned = abandoned
+ self.unclosed = unclosed
+
+
+ def effectiveLoad(self):
+ """
+ The current effective load.
+ """
+ return self.acknowledged + self.unacknowledged
+
+
+ def restarted(self):
+ """
+ The L{ChildStatus} derived from the current status of a process and
+ the fact that it just restarted.
+ """
+ return self.__class__(0, 0, self.started + 1, self.unacknowledged)
+
+
+ def _tuplify(self):
+ return tuple(getattr(self, attr) for attr in self.showAttributes)
+
+
+ def __lt__(self, other):
+ if not isinstance(other, ChildStatus):
+ return NotImplemented
+
+ return self.effectiveLoad() < other.effectiveLoad()
+
+
+ def __eq__(self, other):
+ if not isinstance(other, ChildStatus):
+ return NotImplemented
+
+ return self._tuplify() == other._tuplify()
+
+
+ def __add__(self, other):
+ if not isinstance(other, ChildStatus):
+ return NotImplemented
+
+ a = self._tuplify()
+ b = other._tuplify()
+ sum = [a1 + b1 for (a1, b1) in zip(a, b)]
+
+ return self.__class__(*sum)
+
+
+    def __sub__(self, other):
+        """
+        Subtract another L{ChildStatus} from this one, field by field.
+
+        @param other: the status whose counters are subtracted from this
+            status's counters.
+
+        @return: a new L{ChildStatus} whose attributes (in
+            C{showAttributes} order) are this status's values minus
+            C{other}'s, or C{NotImplemented} if C{other} is not a
+            L{ChildStatus}.
+        """
+        if not isinstance(other, ChildStatus):
+            return NotImplemented
+
+        a = self._tuplify()
+        b = other._tuplify()
+        difference = [a1 - b1 for (a1, b1) in zip(a, b)]
+
+        # The element-wise difference is already the result.  Adding it back
+        # onto self, as this method previously did, produced
+        # (2 * self - other) instead of (self - other).
+        return self.__class__(*difference)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140312/f72175c9/attachment.html>
More information about the calendarserver-changes
mailing list