[CalendarServer-changes] [11489] CalendarServer/branches/users/glyph/hang-fix/twext

source_changes at macosforge.org source_changes at macosforge.org
Fri Jul 5 17:43:34 PDT 2013


Revision: 11489
          http://trac.calendarserver.org//changeset/11489
Author:   glyph at apple.com
Date:     2013-07-05 17:43:34 -0700 (Fri, 05 Jul 2013)
Log Message:
-----------
Remove some overly-specific white-box tests in favor of those which test the inter-process interface.  Stop taking unacknowledged requests into account when computing the 'outstandingRequests' attribute in the master, since processes which have been restarted might have connections which never arrived.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py
    CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py
    CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py

Modified: CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py	2013-07-06 00:43:32 UTC (rev 11488)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py	2013-07-06 00:43:34 UTC (rev 11489)
@@ -137,56 +137,6 @@
                           dispatcher._subprocessSockets)
 
 
-    def test_sendFileDescriptorSorting(self):
-        """
-        Make sure InheritedSocketDispatcher.sendFileDescriptor sorts sockets
-        with status None higher than those with int status values.
-        """
-        dispatcher = self.dispatcher
-        dispatcher.addSocket()
-        dispatcher.addSocket()
-        dispatcher.addSocket()
-
-        sockets = dispatcher._subprocessSockets[:]
-
-        # Check that 0 is preferred over None
-        sockets[0].status = 0
-        sockets[1].status = 1
-        sockets[2].status = None
-
-        dispatcher.sendFileDescriptor(None, "")
-
-        self.assertEqual(sockets[0].status, 1)
-        self.assertEqual(sockets[1].status, 1)
-        self.assertEqual(sockets[2].status, None)
-
-        dispatcher.sendFileDescriptor(None, "")
-
-        self.assertEqual(sockets[0].status, 1)
-        self.assertEqual(sockets[1].status, 1)
-        self.assertEqual(sockets[2].status, 1)
-
-        # Check that after going to 1 and back to 0 that is still preferred
-        # over None
-        sockets[0].status = 0
-        sockets[1].status = 1
-        sockets[2].status = None
-
-        dispatcher.sendFileDescriptor(None, "")
-
-        self.assertEqual(sockets[0].status, 1)
-        self.assertEqual(sockets[1].status, 1)
-        self.assertEqual(sockets[2].status, None)
-
-        sockets[1].status = 0
-
-        dispatcher.sendFileDescriptor(None, "")
-
-        self.assertEqual(sockets[0].status, 1)
-        self.assertEqual(sockets[1].status, 1)
-        self.assertEqual(sockets[2].status, None)
-
-
     def test_statusesChangedOnNewConnection(self):
         """
         L{InheritedSocketDispatcher.sendFileDescriptor} will update its

Modified: CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py	2013-07-06 00:43:32 UTC (rev 11488)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py	2013-07-06 00:43:34 UTC (rev 11489)
@@ -15,6 +15,14 @@
 # limitations under the License.
 ##
 
+"""
+Implementation of dispatching HTTP connections to child processes using
+L{twext.internet.sendfdport.InheritedSocketDispatcher}.
+"""
+from __future__ import print_function
+
+from functools import total_ordering
+
 from twext.internet.sendfdport import (
     InheritedPort, InheritedSocketDispatcher, InheritingProtocolFactory)
 from twext.internet.tcp import MaxAcceptTCPServer
@@ -152,6 +160,73 @@
 
 
 
+@total_ordering
+class WorkerStatus(object):
+    """
+    The status of a worker process.
+    """
+
+    def __init__(self, acknowledged=0, unacknowledged=0, started=0):
+        """
+        Create a L{WorkerStatus} with a number of sent connections and a
+        number of un-acknowledged connections.
+
+        @param acknowledged: the number of connections which we know the
+            subprocess to be presently processing; i.e. those which have been
+            transmitted to the subprocess.
+
+        @param unacknowledged: The number of connections which we have sent to
+            the subprocess which have never received a status response (a
+            "C{+}" status message).
+
+        @param started: The number of times this worker has been started.
+        """
+        self.acknowledged = acknowledged
+        self.unacknowledged = unacknowledged
+        self.started = started
+
+
+    def restarted(self):
+        """
+        The L{WorkerStatus} derived from the current status of a process and
+        the fact that it just restarted.
+        """
+        return self.__class__(0, self.unacknowledged, self.started + 1)
+
+
+    def _tuplify(self):
+        return (self.acknowledged, self.unacknowledged, self.started)
+
+
+    def __lt__(self, other):
+        if not isinstance(other, WorkerStatus):
+            return NotImplemented
+        return self._tuplify() < other._tuplify()
+
+
+    def __eq__(self, other):
+        if not isinstance(other, WorkerStatus):
+            return NotImplemented
+        return self._tuplify() == other._tuplify()
+
+
+    def __add__(self, other):
+        if not isinstance(other, WorkerStatus):
+            return NotImplemented
+        return self.__class__(self.acknowledged + other.acknowledged,
+                              self.unacknowledged + other.unacknowledged,
+                              self.started + other.started)
+
+
+    def __sub__(self, other):
+        if not isinstance(other, WorkerStatus):
+            return NotImplemented
+        return self + self.__class__(-other.acknowledged,
+                                     -other.unacknowledged,
+                                     -other.started)
+
+
+
 class ConnectionLimiter(MultiService, object):
     """
     Connection limiter for use with L{InheritedSocketDispatcher}.
@@ -195,17 +270,15 @@
         ).setServiceParent(self)
 
 
-    def intWithNoneAsZero(self, n):
-        if n is None:
-            return 0
-        return n
     # IStatusWatcher
 
     def initialStatus(self):
         """
-        TODO
+        The status of a new worker added to the pool.
         """
+        return WorkerStatus()
 
+
     def statusFromMessage(self, previousStatus, message):
         """
         Determine a subprocess socket's status from its previous status and a
@@ -215,25 +288,19 @@
             # A connection has gone away in a subprocess; we should start
             # accepting connections again if we paused (see
             # newConnectionStatus)
-            result = self.intWithNoneAsZero(previousStatus) - 1
-            if result < 0:
-                log.error("metafd: trying to decrement status below zero")
-                result = 0
+            return previousStatus - WorkerStatus(acknowledged=1)
         elif message == '0':
-            # A new process just started accepting new connections; zero
-            # out its expected load, but only if previous status is still
-            # None
-            result = 0 if previousStatus is None else previousStatus
-            if previousStatus is None:
-                result = 0
-            else:
-                log.error("metafd: trying to zero status that is not None")
-                result = previousStatus
+            # A new process just started accepting new connections.  It might
+            # still have some unacknowledged connections, but any connections
+            # that it acknowledged working on are now completed.  (We have no
+            # way of knowing whether the acknowledged connections were acted
+            # upon or dropped, so we have to treat that number with a healthy
+            # amount of skepticism.)
+            return previousStatus.restarted()
         else:
-            # '+' is just an acknowledgement of newConnectionStatus, so we can
-            # ignore it.
-            result = self.intWithNoneAsZero(previousStatus)
-        return result
+            # '+' acknowledges that the subprocess has taken on the work.
+            return previousStatus + WorkerStatus(acknowledged=1,
+                                                 unacknowledged=-1)
 
 
     def newConnectionStatus(self, previousStatus):
@@ -241,8 +308,7 @@
         Determine the effect of a new connection being sent on a subprocess
         socket.
         """
-        result = self.intWithNoneAsZero(previousStatus) + 1
-        return result
+        return previousStatus + WorkerStatus(unacknowledged=1)
 
 
     def statusesChanged(self, statuses):
@@ -254,7 +320,9 @@
         C{self.dispatcher.statuses} attribute, which is what
         C{self.outstandingRequests} uses to compute it.)
         """
-        current = self.outstandingRequests + 1
+        current = sum(status.acknowledged
+                      for status in self.dispatcher.statuses)
+        self._outstandingRequests = current # preserve for or= field in log
         maximum = self.maxRequests
         overloaded = (current >= maximum)
         if overloaded:
@@ -265,9 +333,10 @@
                 f.myServer.myPort.startReading()
 
 
-    @property
+    _outstandingRequests = 0
+    @property # make read-only
     def outstandingRequests(self):
-        return sum(map(self.intWithNoneAsZero, self.dispatcher.statuses))
+        return self._outstandingRequests
 
 
 

Modified: CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py	2013-07-06 00:43:32 UTC (rev 11488)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py	2013-07-06 00:43:34 UTC (rev 11489)
@@ -153,36 +153,6 @@
     Tests for L{ConnectionLimiter}
     """
 
-    def test_statusFromMessage(self):
-        """
-        L{ConnectionLimiter.statusFromMessage} will not return a value below
-        zero, and a "0" status - a new process reporting its launching - will
-        leave the previous status value the same, on the assumption that the 0
-        is simply being reported late, and the master has given the worker some
-        work to do before it reports its initial status.
-        """
-
-        cl = ConnectionLimiter(2, 20)
-
-        # "0" Zeroing out does not overwrite legitimate count
-        self.assertEqual(cl.statusFromMessage(None, "0"), 0)
-        self.assertEqual(cl.statusFromMessage(0, "0"), 0)
-        self.assertEqual(cl.statusFromMessage(1, "0"), 1)
-        self.assertEqual(cl.statusFromMessage(2, "0"), 2)
-
-        # "-" No negative counts
-        self.assertEqual(cl.statusFromMessage(None, "-"), 0)
-        self.assertEqual(cl.statusFromMessage(0, "-"), 0)
-        self.assertEqual(cl.statusFromMessage(1, "-"), 0)
-        self.assertEqual(cl.statusFromMessage(2, "-"), 1)
-
-        # "+" No change
-        self.assertEqual(cl.statusFromMessage(None, "+"), 0)
-        self.assertEqual(cl.statusFromMessage(0, "+"), 0)
-        self.assertEqual(cl.statusFromMessage(1, "+"), 1)
-        self.assertEqual(cl.statusFromMessage(2, "+"), 2)
-
-
     def test_loadReducedStartsReadingAgain(self):
         """
         L{ConnectionLimiter.statusesChanged} determines whether the current
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130705/84c46d3c/attachment-0001.html>


More information about the calendarserver-changes mailing list