[CalendarServer-changes] [11487] CalendarServer/branches/users/glyph/hang-fix/twext
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jul 5 17:43:30 PDT 2013
Revision: 11487
http://trac.calendarserver.org//changeset/11487
Author: glyph at apple.com
Date: 2013-07-05 17:43:30 -0700 (Fri, 05 Jul 2013)
Log Message:
-----------
Make status watcher interface explicit and documented, and add an 'initialStatus' method so that we don't have to have all these None checks.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/hang-fix/twext/internet/sendfdport.py
CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py
CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py
CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py
Modified: CalendarServer/branches/users/glyph/hang-fix/twext/internet/sendfdport.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/internet/sendfdport.py 2013-07-06 00:43:28 UTC (rev 11486)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/internet/sendfdport.py 2013-07-06 00:43:30 UTC (rev 11487)
@@ -25,6 +25,8 @@
from socket import (socketpair, fromfd, error as SocketError, AF_UNIX,
SOCK_STREAM, SOCK_DGRAM)
+from zope.interface import Interface
+
from twisted.internet.abstract import FileDescriptor
from twisted.internet.protocol import Protocol, Factory
@@ -108,9 +110,9 @@
@type status: C{str}
"""
- def __init__(self, dispatcher, skt):
+ def __init__(self, dispatcher, skt, status):
FileDescriptor.__init__(self, dispatcher.reactor)
- self.status = None
+ self.status = status
self.dispatcher = dispatcher
self.skt = skt # XXX needs to be set non-blocking by somebody
self.fileno = skt.fileno
@@ -156,6 +158,69 @@
+class IStatusWatcher(Interface):
+ """
+ A provider of L{IStatusWatcher} tracks the I{status messages} reported by
+ the worker processes over their control sockets, and computes internal
+ I{status values} for those messages. The I{messages} are individual
+ octets, representing one of three operations. C{0} meaning "a new worker
+ process has started, with zero connections being processed", C{+} meaning
+ "I have received and am processing your request; I am confirming that my
+ requests-being-processed count has gone up by one", and C{-} meaning "I
+ have completed processing a request, my requests-being-processed count has
+ gone down by one". The I{status value} tracked by
+ L{_SubprocessSocket.status} is an integer, indicating the current
+ requests-being-processed value. (FIXME: the intended design here is
+ actually just that all I{this} object knows about is that
+ L{_SubprocessSocket.status} is an orderable value, and that this
+ C{statusWatcher} will compute appropriate values so the status that I{sorts
+ the least} is the socket to which new connections should be directed; also,
+ the format of the status messages is only known / understood by the
+ C{statusWatcher}, not the L{InheritedSocketDispatcher}. It's hard to
+ explain it in that manner though.)
+
+ @note: the intention of this interface is to eventually provide a broader
+ notion of what might constitute 'status', so the above explanation just
+ explains the current implementation, for expediency's sake, rather
+ than the somewhat more abstract language that would be accurate.
+ """
+
+ def initialStatus():
+ """
+ A new socket was created and added to the dispatcher. Compute an
+ initial value for its status.
+
+ @return: the new status.
+ """
+
+
+ def newConnectionStatus(previousStatus):
+ """
+ A new connection was sent to a given socket. Compute its status based
+ on the previous status of that socket.
+
+ @param previousStatus: A status value for the socket being sent work,
+ previously returned by one of the methods on this interface.
+
+ @return: the socket's status after incrementing its outstanding work.
+ """
+
+
+ def statusFromMessage(previousStatus, message):
+ """
+ A status message was received from a worker. Convert the previous status
+ value (returned from L{newConnectionStatus}, L{initialStatus}, or
+ L{statusFromMessage}) into a new status that reflects the message.
+
+ @param previousStatus: A status value for the socket being sent work,
+ previously returned by one of the methods on this interface.
+
+ @return: the socket's status after taking the reported message into
+ account.
+ """
+
+
+
class InheritedSocketDispatcher(object):
"""
Used by one or more L{InheritingProtocolFactory}s, this keeps track of a
@@ -165,29 +230,9 @@
L{InheritedSocketDispatcher} is therefore instantiated in the I{master
process}.
- @ivar statusWatcher: An object that tracks the I{status messages} reported
- by the worker processes over their control sockets, and computes
- internal I{status values} for those messages. The I{messages} are
- individual octets, representing one of three operations. C{0} meaning
- "a new worker process has started, with zero connections being
- processed", C{+} meaning "I have received and am processing your
- request; I am confirming that my requests-being-processed count has
- gone up by one", and C{-} meaning "I have completed processing a
- request, my requests-being-processed count has gone down by one". The
- I{status value} tracked by L{_SubprocessSocket.status} is an integer,
- indicating the current requests-being-processed value. (FIXME: the
- intended design here is actually just that all I{this} object knows
- about is that L{_SubprocessSocket.status} is an orderable value, and
- that this C{statusWatcher} will compute appropriate values so the
- status that I{sorts the least} is the socket to which new connections
- should be directed; also, the format of the status messages is only
- known / understood by the C{statusWatcher}, not the
- L{InheritedSocketDispatcher}. It's hard to explain it in that manner
- though.)
- @type statusWatcher: L{twext.web2.metafd.ConnectionLimiter} (FIXME: this
- should be a bit more abstract; declared in an interface or something,
- since we may in principle want to throttle connections from more than
- just HTTP.)
+ @ivar statusWatcher: The object which will handle status messages and
+ convert them into current statuses, as well as compute initial statuses.
+ @type statusWatcher: L{IStatusWatcher}
"""
def __init__(self, statusWatcher):
@@ -272,7 +317,7 @@
i, o = socketpair(AF_UNIX, SOCK_DGRAM)
i.setblocking(False)
o.setblocking(False)
- a = _SubprocessSocket(self, o)
+ a = _SubprocessSocket(self, o, self.statusWatcher.initialStatus())
self._subprocessSockets.append(a)
if self._isDispatching:
a.startReading()
Modified: CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py 2013-07-06 00:43:28 UTC (rev 11486)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py 2013-07-06 00:43:30 UTC (rev 11487)
@@ -1,3 +1,4 @@
+from twext.internet.sendfdport import IStatusWatcher
# -*- test-case-name: twext.internet.test.test_sendfdport -*-
##
# Copyright (c) 2010-2013 Apple Inc. All rights reserved.
@@ -65,20 +66,29 @@
+from zope.interface.verify import verifyClass
+from zope.interface import implementer
+
+def verifiedImplementer(interface):
+ def _(cls):
+ result = implementer(interface)(cls)
+ verifyClass(interface, result)
+ return result
+ return _
+
+
+
+@verifiedImplementer(IStatusWatcher)
class Watcher(object):
def __init__(self, q):
self.q = q
def newConnectionStatus(self, previous):
- if previous is None:
- previous = 0
return previous + 1
def statusFromMessage(self, previous, message):
- if previous is None:
- previous = 0
return previous - 1
@@ -86,7 +96,11 @@
self.q.append(list(statuses))
+ def initialStatus(self):
+ return 0
+
+
class InheritedSocketDispatcherTests(TestCase):
"""
Inherited socket dispatcher tests.
Modified: CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py 2013-07-06 00:43:28 UTC (rev 11486)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py 2013-07-06 00:43:30 UTC (rev 11487)
@@ -195,9 +195,17 @@
).setServiceParent(self)
- # implementation of implicit statusWatcher interface required by
- # InheritedSocketDispatcher
+ def intWithNoneAsZero(self, n):
+ if n is None:
+ return 0
+ return n
+ # IStatusWatcher
+ def initialStatus(self):
+ """
+ TODO
+ """
+
def statusFromMessage(self, previousStatus, message):
"""
Determine a subprocess socket's status from its previous status and a
@@ -257,22 +265,9 @@
f.myServer.myPort.startReading()
- def intWithNoneAsZero(self, x):
- """
- Convert 'x' to an C{int}, unless x is C{None}, in which case return 0.
- """
- if x is None:
- return 0
- else:
- return int(x)
-
-
@property
def outstandingRequests(self):
- outstanding = 0
- for status in self.dispatcher.statuses:
- outstanding += self.intWithNoneAsZero(status)
- return outstanding
+ return sum(map(self.intWithNoneAsZero, self.dispatcher.statuses))
Modified: CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py 2013-07-06 00:43:28 UTC (rev 11486)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py 2013-07-06 00:43:30 UTC (rev 11487)
@@ -28,7 +28,7 @@
from twext.web2.metafd import ReportingHTTPService, ConnectionLimiter
from twisted.internet.tcp import Server
from twisted.application.service import Service
-from twext.internet.sendfdport import InheritedSocketDispatcher
+
from twext.internet.test.test_sendfdport import ReaderAdder
from twisted.trial.unittest import TestCase
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130705/3b7d1aa4/attachment-0001.html>
More information about the calendarserver-changes
mailing list