[CalendarServer-changes] [11487] CalendarServer/branches/users/glyph/hang-fix/twext

source_changes at macosforge.org source_changes at macosforge.org
Fri Jul 5 17:43:30 PDT 2013


Revision: 11487
          http://trac.calendarserver.org//changeset/11487
Author:   glyph at apple.com
Date:     2013-07-05 17:43:30 -0700 (Fri, 05 Jul 2013)
Log Message:
-----------
Make status watcher interface explicit and documented, and add an 'initialStatus' method so that we don't have to have all these None checks.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/hang-fix/twext/internet/sendfdport.py
    CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py
    CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py
    CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py

Modified: CalendarServer/branches/users/glyph/hang-fix/twext/internet/sendfdport.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/internet/sendfdport.py	2013-07-06 00:43:28 UTC (rev 11486)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/internet/sendfdport.py	2013-07-06 00:43:30 UTC (rev 11487)
@@ -25,6 +25,8 @@
 from socket import (socketpair, fromfd, error as SocketError, AF_UNIX,
                     SOCK_STREAM, SOCK_DGRAM)
 
+from zope.interface import Interface
+
 from twisted.internet.abstract import FileDescriptor
 from twisted.internet.protocol import Protocol, Factory
 
@@ -108,9 +110,9 @@
     @type status: C{str}
     """
 
-    def __init__(self, dispatcher, skt):
+    def __init__(self, dispatcher, skt, status):
         FileDescriptor.__init__(self, dispatcher.reactor)
-        self.status = None
+        self.status = status
         self.dispatcher = dispatcher
         self.skt = skt          # XXX needs to be set non-blocking by somebody
         self.fileno = skt.fileno
@@ -156,6 +158,69 @@
 
 
 
+class IStatusWatcher(Interface):
+    """
+    A provider of L{IStatusWatcher} tracks the I{status messages} reported by
+    the worker processes over their control sockets, and computes internal
+    I{status values} for those messages.  The I{messages} are individual
+    octets, representing one of three operations.  C{0} meaning "a new worker
+    process has started, with zero connections being processed", C{+} meaning
+    "I have received and am processing your request; I am confirming that my
+    requests-being-processed count has gone up by one", and C{-} meaning "I
+    have completed processing a request, my requests-being-processed count has
+    gone down by one".  The I{status value} tracked by
+    L{_SubprocessSocket.status} is an integer, indicating the current
+    requests-being-processed value.  (FIXME: the intended design here is
+    actually just that all I{this} object knows about is that
+    L{_SubprocessSocket.status} is an orderable value, and that this
+    C{statusWatcher} will compute appropriate values so the status that I{sorts
+    the least} is the socket to which new connections should be directed; also,
+    the format of the status messages is only known / understood by the
+    C{statusWatcher}, not the L{InheritedSocketDispatcher}.  It's hard to
+    explain it in that manner though.)
+
+    @note: the intention of this interface is to eventually provide a broader
+        notion of what might constitute 'status', so the above explanation just
+        explains the current implementation, for expediency's sake, rather
+        than the somewhat more abstract language that would be accurate.
+    """
+
+    def initialStatus():
+        """
+        A new socket was created and added to the dispatcher.  Compute an
+        initial value for its status.
+
+        @return: the new status.
+        """
+
+
+    def newConnectionStatus(previousStatus):
+        """
+        A new connection was sent to a given socket.  Compute its status based
+        on the previous status of that socket.
+
+        @param previousStatus: A status value for the socket being sent work,
+            previously returned by one of the methods on this interface.
+
+        @return: the socket's status after incrementing its outstanding work.
+        """
+
+
+    def statusFromMessage(previousStatus, message):
+        """
+        A status message was received from a worker.  Convert the previous
+        status (returned from L{newConnectionStatus}, L{initialStatus}, or
+        L{statusFromMessage}) into one that takes the message into account.
+
+        @param previousStatus: A status value for the socket being sent work,
+            previously returned by one of the methods on this interface.
+
+        @return: the socket's status after taking the reported message into
+            account.
+        """
+
+
+
 class InheritedSocketDispatcher(object):
     """
     Used by one or more L{InheritingProtocolFactory}s, this keeps track of a
@@ -165,29 +230,9 @@
     L{InheritedSocketDispatcher} is therefore insantiated in the I{master
     process}.
 
-    @ivar statusWatcher: An object that tracks the I{status messages} reported
-        by the worker processes over their control sockets, and computes
-        internal I{status values} for those messages.  The I{messages} are
-        individual octets, representing one of three operations.  C{0} meaning
-        "a new worker process has started, with zero connections being
-        processed", C{+} meaning "I have received and am processing your
-        request; I am confirming that my requests-being-processed count has
-        gone up by one", and C{-} meaning "I have completed processing a
-        request, my requests-being-processed count has gone down by one".  The
-        I{status value} tracked by L{_SubprocessSocket.status} is an integer,
-        indicating the current requests-being-processed value.  (FIXME: the
-        intended design here is actually just that all I{this} object knows
-        about is that L{_SubprocessSocket.status} is an orderable value, and
-        that this C{statusWatcher} will compute appropriate values so the
-        status that I{sorts the least} is the socket to which new connections
-        should be directed; also, the format of the status messages is only
-        known / understood by the C{statusWatcher}, not the
-        L{InheritedSocketDispatcher}.  It's hard to explain it in that manner
-        though.)
-    @type statusWatcher: L{twext.web2.metafd.ConnectionLimiter} (FIXME: this
-        should be a bit more abstract; declared in an interface or something,
-        since we may in principle want to throttle connections from more than
-        just HTTP.)
+    @ivar statusWatcher: The object which converts status messages into
+        current statuses, and computes the initial status of new sockets.
+    @type statusWatcher: L{IStatusWatcher}
     """
 
     def __init__(self, statusWatcher):
@@ -272,7 +317,7 @@
         i, o = socketpair(AF_UNIX, SOCK_DGRAM)
         i.setblocking(False)
         o.setblocking(False)
-        a = _SubprocessSocket(self, o)
+        a = _SubprocessSocket(self, o, self.statusWatcher.initialStatus())
         self._subprocessSockets.append(a)
         if self._isDispatching:
             a.startReading()

Modified: CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py	2013-07-06 00:43:28 UTC (rev 11486)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py	2013-07-06 00:43:30 UTC (rev 11487)
@@ -1,3 +1,4 @@
+from twext.internet.sendfdport import IStatusWatcher
 # -*- test-case-name: twext.internet.test.test_sendfdport -*-
 ##
 # Copyright (c) 2010-2013 Apple Inc. All rights reserved.
@@ -65,20 +66,29 @@
 
 
 
+from zope.interface.verify import verifyClass
+from zope.interface import implementer
+
+def verifiedImplementer(interface):
+    def _(cls):
+        result = implementer(interface)(cls)
+        verifyClass(interface, result)
+        return result
+    return _
+
+
+
+@verifiedImplementer(IStatusWatcher)
 class Watcher(object):
     def __init__(self, q):
         self.q = q
 
 
     def newConnectionStatus(self, previous):
-        if previous is None:
-            previous = 0
         return previous + 1
 
 
     def statusFromMessage(self, previous, message):
-        if previous is None:
-            previous = 0
         return previous - 1
 
 
@@ -86,7 +96,11 @@
         self.q.append(list(statuses))
 
 
+    def initialStatus(self):
+        return 0
 
+
+
 class InheritedSocketDispatcherTests(TestCase):
     """
     Inherited socket dispatcher tests.

Modified: CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py	2013-07-06 00:43:28 UTC (rev 11486)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py	2013-07-06 00:43:30 UTC (rev 11487)
@@ -195,9 +195,17 @@
         ).setServiceParent(self)
 
 
-    # implementation of implicit statusWatcher interface required by
-    # InheritedSocketDispatcher
+    def intWithNoneAsZero(self, n):
+        if n is None:
+            return 0
+        return n
+    # IStatusWatcher
 
+    def initialStatus(self):
+        """
+        TODO
+        """
+
     def statusFromMessage(self, previousStatus, message):
         """
         Determine a subprocess socket's status from its previous status and a
@@ -257,22 +265,9 @@
                 f.myServer.myPort.startReading()
 
 
-    def intWithNoneAsZero(self, x):
-        """
-        Convert 'x' to an C{int}, unless x is C{None}, in which case return 0.
-        """
-        if x is None:
-            return 0
-        else:
-            return int(x)
-
-
     @property
     def outstandingRequests(self):
-        outstanding = 0
-        for status in self.dispatcher.statuses:
-            outstanding += self.intWithNoneAsZero(status)
-        return outstanding
+        return sum(map(self.intWithNoneAsZero, self.dispatcher.statuses))
 
 
 

Modified: CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py	2013-07-06 00:43:28 UTC (rev 11486)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py	2013-07-06 00:43:30 UTC (rev 11487)
@@ -28,7 +28,7 @@
 from twext.web2.metafd import ReportingHTTPService, ConnectionLimiter
 from twisted.internet.tcp import Server
 from twisted.application.service import Service
-from twext.internet.sendfdport import InheritedSocketDispatcher
+
 from twext.internet.test.test_sendfdport import ReaderAdder
 from twisted.trial.unittest import TestCase
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130705/3b7d1aa4/attachment-0001.html>


More information about the calendarserver-changes mailing list