[CalendarServer-changes] [11489] CalendarServer/branches/users/glyph/hang-fix/twext
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jul 5 17:43:34 PDT 2013
Revision: 11489
http://trac.calendarserver.org//changeset/11489
Author: glyph at apple.com
Date: 2013-07-05 17:43:34 -0700 (Fri, 05 Jul 2013)
Log Message:
-----------
Remove some overly-specific white-box tests in favor of those which test the inter-process interface. Stop taking unacknowledged requests into account when computing the 'outstandingRequests' attribute in the master, since processes which have been restarted might have connections which never arrived.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py
CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py
CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py
Modified: CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py 2013-07-06 00:43:32 UTC (rev 11488)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/internet/test/test_sendfdport.py 2013-07-06 00:43:34 UTC (rev 11489)
@@ -137,56 +137,6 @@
dispatcher._subprocessSockets)
- def test_sendFileDescriptorSorting(self):
- """
- Make sure InheritedSocketDispatcher.sendFileDescriptor sorts sockets
- with status None higher than those with int status values.
- """
- dispatcher = self.dispatcher
- dispatcher.addSocket()
- dispatcher.addSocket()
- dispatcher.addSocket()
-
- sockets = dispatcher._subprocessSockets[:]
-
- # Check that 0 is preferred over None
- sockets[0].status = 0
- sockets[1].status = 1
- sockets[2].status = None
-
- dispatcher.sendFileDescriptor(None, "")
-
- self.assertEqual(sockets[0].status, 1)
- self.assertEqual(sockets[1].status, 1)
- self.assertEqual(sockets[2].status, None)
-
- dispatcher.sendFileDescriptor(None, "")
-
- self.assertEqual(sockets[0].status, 1)
- self.assertEqual(sockets[1].status, 1)
- self.assertEqual(sockets[2].status, 1)
-
- # Check that after going to 1 and back to 0 that is still preferred
- # over None
- sockets[0].status = 0
- sockets[1].status = 1
- sockets[2].status = None
-
- dispatcher.sendFileDescriptor(None, "")
-
- self.assertEqual(sockets[0].status, 1)
- self.assertEqual(sockets[1].status, 1)
- self.assertEqual(sockets[2].status, None)
-
- sockets[1].status = 0
-
- dispatcher.sendFileDescriptor(None, "")
-
- self.assertEqual(sockets[0].status, 1)
- self.assertEqual(sockets[1].status, 1)
- self.assertEqual(sockets[2].status, None)
-
-
def test_statusesChangedOnNewConnection(self):
"""
L{InheritedSocketDispatcher.sendFileDescriptor} will update its
Modified: CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py 2013-07-06 00:43:32 UTC (rev 11488)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/web2/metafd.py 2013-07-06 00:43:34 UTC (rev 11489)
@@ -15,6 +15,14 @@
# limitations under the License.
##
+"""
+Implementation of dispatching HTTP connections to child processes using
+L{twext.internet.sendfdport.InheritedSocketDispatcher}.
+"""
+from __future__ import print_function
+
+from functools import total_ordering
+
from twext.internet.sendfdport import (
InheritedPort, InheritedSocketDispatcher, InheritingProtocolFactory)
from twext.internet.tcp import MaxAcceptTCPServer
@@ -152,6 +160,73 @@
+ at total_ordering
+class WorkerStatus(object):
+ """
+ The status of a worker process.
+ """
+
+ def __init__(self, acknowledged=0, unacknowledged=0, started=0):
+ """
+ Create a L{ConnectionStatus} with a number of sent connections and a
+ number of un-acknowledged connections.
+
+ @param acknowledged: the number of connections which we know the
+ subprocess to be presently processing; i.e. those which have been
+ transmitted to the subprocess.
+
+ @param unacknowledged: The number of connections which we have sent to
+ the subprocess which have never received a status response (a
+ "C{+}" status message).
+
+ @param started: The number of times this worker has been started.
+ """
+ self.acknowledged = acknowledged
+ self.unacknowledged = unacknowledged
+ self.started = started
+
+
+ def restarted(self):
+ """
+ The L{WorkerStatus} derived from the current status of a process and
+ the fact that it just restarted.
+ """
+ return self.__class__(0, self.unacknowledged, self.started + 1)
+
+
+ def _tuplify(self):
+ return (self.acknowledged, self.unacknowledged, self.started)
+
+
+ def __lt__(self, other):
+ if not isinstance(other, WorkerStatus):
+ return NotImplemented
+ return self._tuplify() < other._tuplify()
+
+
+ def __eq__(self, other):
+ if not isinstance(other, WorkerStatus):
+ return NotImplemented
+ return self._tuplify() == other._tuplify()
+
+
+ def __add__(self, other):
+ if not isinstance(other, WorkerStatus):
+ return NotImplemented
+ return self.__class__(self.acknowledged + other.acknowledged,
+ self.unacknowledged + other.unacknowledged,
+ self.started + other.started)
+
+
+ def __sub__(self, other):
+ if not isinstance(other, WorkerStatus):
+ return NotImplemented
+ return self + self.__class__(-other.acknowledged,
+ -other.unacknowledged,
+ -other.started)
+
+
+
class ConnectionLimiter(MultiService, object):
"""
Connection limiter for use with L{InheritedSocketDispatcher}.
@@ -195,17 +270,15 @@
).setServiceParent(self)
- def intWithNoneAsZero(self, n):
- if n is None:
- return 0
- return n
# IStatusWatcher
def initialStatus(self):
"""
- TODO
+ The status of a new worker added to the pool.
"""
+ return WorkerStatus()
+
def statusFromMessage(self, previousStatus, message):
"""
Determine a subprocess socket's status from its previous status and a
@@ -215,25 +288,19 @@
# A connection has gone away in a subprocess; we should start
# accepting connections again if we paused (see
# newConnectionStatus)
- result = self.intWithNoneAsZero(previousStatus) - 1
- if result < 0:
- log.error("metafd: trying to decrement status below zero")
- result = 0
+ return previousStatus - WorkerStatus(acknowledged=1)
elif message == '0':
- # A new process just started accepting new connections; zero
- # out its expected load, but only if previous status is still
- # None
- result = 0 if previousStatus is None else previousStatus
- if previousStatus is None:
- result = 0
- else:
- log.error("metafd: trying to zero status that is not None")
- result = previousStatus
+ # A new process just started accepting new connections. It might
+ # still have some unacknowledged connections, but any connections
+ # that it acknowledged working on are now completed. (We have no
+ # way of knowing whether the acknowledged connections were acted
+ # upon or dropped, so we have to treat that number with a healthy
+ # amount of skepticism.)
+ return previousStatus.restarted()
else:
- # '+' is just an acknowledgement of newConnectionStatus, so we can
- # ignore it.
- result = self.intWithNoneAsZero(previousStatus)
- return result
+ # '+' acknowledges that the subprocess has taken on the work.
+ return previousStatus + WorkerStatus(acknowledged=1,
+ unacknowledged=-1)
def newConnectionStatus(self, previousStatus):
@@ -241,8 +308,7 @@
Determine the effect of a new connection being sent on a subprocess
socket.
"""
- result = self.intWithNoneAsZero(previousStatus) + 1
- return result
+ return previousStatus + WorkerStatus(unacknowledged=1)
def statusesChanged(self, statuses):
@@ -254,7 +320,9 @@
C{self.dispatcher.statuses} attribute, which is what
C{self.outstandingRequests} uses to compute it.)
"""
- current = self.outstandingRequests + 1
+ current = sum(status.acknowledged
+ for status in self.dispatcher.statuses)
+ self._outstandingRequests = current # preserve for or= field in log
maximum = self.maxRequests
overloaded = (current >= maximum)
if overloaded:
@@ -265,9 +333,10 @@
f.myServer.myPort.startReading()
- @property
+ _outstandingRequests = 0
+ @property # make read-only
def outstandingRequests(self):
- return sum(map(self.intWithNoneAsZero, self.dispatcher.statuses))
+ return self._outstandingRequests
Modified: CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py
===================================================================
--- CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py 2013-07-06 00:43:32 UTC (rev 11488)
+++ CalendarServer/branches/users/glyph/hang-fix/twext/web2/test/test_metafd.py 2013-07-06 00:43:34 UTC (rev 11489)
@@ -153,36 +153,6 @@
Tests for L{ConnectionLimiter}
"""
- def test_statusFromMessage(self):
- """
- L{ConnectionLimiter.statusFromMessage} will not return a value below
- zero, and a "0" status - a new process reporting its launching - will
- leave the previous status value the same, on the assumption that the 0
- is simply being reported late, and the master has given the worker some
- work to do before it reports its initial status.
- """
-
- cl = ConnectionLimiter(2, 20)
-
- # "0" Zeroing out does not overwrite legitimate count
- self.assertEqual(cl.statusFromMessage(None, "0"), 0)
- self.assertEqual(cl.statusFromMessage(0, "0"), 0)
- self.assertEqual(cl.statusFromMessage(1, "0"), 1)
- self.assertEqual(cl.statusFromMessage(2, "0"), 2)
-
- # "-" No negative counts
- self.assertEqual(cl.statusFromMessage(None, "-"), 0)
- self.assertEqual(cl.statusFromMessage(0, "-"), 0)
- self.assertEqual(cl.statusFromMessage(1, "-"), 0)
- self.assertEqual(cl.statusFromMessage(2, "-"), 1)
-
- # "+" No change
- self.assertEqual(cl.statusFromMessage(None, "+"), 0)
- self.assertEqual(cl.statusFromMessage(0, "+"), 0)
- self.assertEqual(cl.statusFromMessage(1, "+"), 1)
- self.assertEqual(cl.statusFromMessage(2, "+"), 2)
-
-
def test_loadReducedStartsReadingAgain(self):
"""
L{ConnectionLimiter.statusesChanged} determines whether the current
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130705/84c46d3c/attachment-0001.html>
More information about the calendarserver-changes
mailing list