[CalendarServer-changes] [11672] CalendarServer/trunk/twext
source_changes at macosforge.org
source_changes at macosforge.org
Wed Sep 11 17:09:20 PDT 2013
Revision: 11672
http://trac.calendarserver.org/changeset/11672
Author: glyph at apple.com
Date: 2013-09-11 17:09:20 -0700 (Wed, 11 Sep 2013)
Log Message:
-----------
Address an issue where sockets may be closed too early by the master process.
Modified Paths:
--------------
CalendarServer/trunk/twext/internet/sendfdport.py
CalendarServer/trunk/twext/internet/test/test_sendfdport.py
CalendarServer/trunk/twext/web2/metafd.py
CalendarServer/trunk/twext/web2/test/test_metafd.py
Modified: CalendarServer/trunk/twext/internet/sendfdport.py
===================================================================
--- CalendarServer/trunk/twext/internet/sendfdport.py 2013-09-11 22:02:06 UTC (rev 11671)
+++ CalendarServer/trunk/twext/internet/sendfdport.py 2013-09-12 00:09:20 UTC (rev 11672)
@@ -95,6 +95,7 @@
used to transmit sockets to a subprocess.
@ivar skt: the UNIX socket used as the sendmsg() transport.
+ @type skt: L{socket.socket}
@ivar outgoingSocketQueue: an outgoing queue of sockets to send to the
subprocess, along with their descriptions (strings describing their
@@ -107,7 +108,11 @@
from the subprocess: this is an application-specific indication of how
ready this subprocess is to receive more connections. A typical usage
would be to count the open connections: this is what is passed to
- @type status: C{str}
+ @type status: See L{IStatusWatcher} for an explanation of which methods
+ determine this type.
+
+ @ivar dispatcher: The socket dispatcher that owns this L{_SubprocessSocket}
+ @type dispatcher: L{InheritedSocketDispatcher}
"""
def __init__(self, dispatcher, skt, status):
@@ -117,6 +122,7 @@
self.skt = skt # XXX needs to be set non-blocking by somebody
self.fileno = skt.fileno
self.outgoingSocketQueue = []
+ self.pendingCloseSocketQueue = []
def sendSocketToPeer(self, skt, description):
@@ -127,7 +133,7 @@
self.startWriting()
- def doRead(self):
+ def doRead(self, recvmsg=recvmsg):
"""
Receive a status / health message and record it.
"""
@@ -137,10 +143,12 @@
if se.errno not in (EAGAIN, ENOBUFS):
raise
else:
- self.dispatcher.statusMessage(self, data)
+ closeCount = self.dispatcher.statusMessage(self, data)
+ for ignored in xrange(closeCount):
+ self.pendingCloseSocketQueue.pop(0).close()
- def doWrite(self):
+ def doWrite(self, sendfd=sendfd):
"""
Transmit as many queued pending file descriptors as we can.
"""
@@ -154,8 +162,8 @@
return
raise
- # Always close the socket on this end
- skt.close()
+ # Ready to close this socket; wait until it is acknowledged.
+ self.pendingCloseSocketQueue.append(skt)
if not self.outgoingSocketQueue:
self.stopWriting()
@@ -197,6 +205,7 @@
@return: the new status.
"""
+
def newConnectionStatus(previousStatus): #@NoSelf
"""
A new connection was sent to a given socket. Compute its status based
@@ -208,6 +217,7 @@
@return: the socket's status after incrementing its outstanding work.
"""
+
def statusFromMessage(previousStatus, message): #@NoSelf
"""
A status message was received by a worker. Convert the previous status
@@ -222,7 +232,18 @@
"""
+ def closeCountFromStatus(previousStatus): #@NoSelf
+ """
+ Based on a status previously returned from a method on this
+ L{IStatusWatcher}, determine how many sockets may be closed.
+ @return: a 2-tuple of C{number of sockets that may safely be closed},
+ C{new status}.
+ @rtype: 2-tuple of (C{int}, C{<opaque>})
+ """
+
+
+
class InheritedSocketDispatcher(object):
"""
Used by one or more L{InheritingProtocolFactory}s, this keeps track of a
@@ -262,10 +283,11 @@
The status of a connection has changed; update all registered status
change listeners.
"""
- subsocket.status = self.statusWatcher.statusFromMessage(
- subsocket.status, message
- )
- self.statusWatcher.statusesChanged(self.statuses)
+ watcher = self.statusWatcher
+ status = watcher.statusFromMessage(subsocket.status, message)
+ closeCount, subsocket.status = watcher.closeCountFromStatus(status)
+ watcher.statusesChanged(self.statuses)
+ return closeCount
def sendFileDescriptor(self, skt, description):
@@ -293,7 +315,7 @@
# XXX Maybe want to send along 'description' or 'skt' or some
# properties thereof? -glyph
selectedSocket.status = self.statusWatcher.newConnectionStatus(
- selectedSocket.status
+ selectedSocket.status
)
self.statusWatcher.statusesChanged(self.statuses)
@@ -307,7 +329,7 @@
subSocket.startReading()
- def addSocket(self):
+ def addSocket(self, socketpair=lambda: socketpair(AF_UNIX, SOCK_DGRAM)):
"""
Add a C{sendmsg()}-oriented AF_UNIX socket to the pool of sockets being
used for transmitting file descriptors to child processes.
@@ -316,7 +338,7 @@
C{fileno()} as part of the C{childFDs} argument to
C{spawnProcess()}, then close it.
"""
- i, o = socketpair(AF_UNIX, SOCK_DGRAM)
+ i, o = socketpair()
i.setblocking(False)
o.setblocking(False)
a = _SubprocessSocket(self, o, self.statusWatcher.initialStatus())
Modified: CalendarServer/trunk/twext/internet/test/test_sendfdport.py
===================================================================
--- CalendarServer/trunk/twext/internet/test/test_sendfdport.py 2013-09-11 22:02:06 UTC (rev 11671)
+++ CalendarServer/trunk/twext/internet/test/test_sendfdport.py 2013-09-12 00:09:20 UTC (rev 11672)
@@ -23,14 +23,25 @@
import os
import fcntl
+from zope.interface.verify import verifyClass
+from zope.interface import implementer
+
from twext.internet.sendfdport import InheritedSocketDispatcher
from twext.web2.metafd import ConnectionLimiter
from twisted.internet.interfaces import IReactorFDSet
from twisted.trial.unittest import TestCase
-from zope.interface import implementer
-@implementer(IReactorFDSet)
+def verifiedImplementer(interface):
+ def _(cls):
+ result = implementer(interface)(cls)
+ verifyClass(interface, result)
+ return result
+ return _
+
+
+
+@verifiedImplementer(IReactorFDSet)
class ReaderAdder(object):
def __init__(self):
@@ -50,7 +61,23 @@
self.writers.append(writer)
+ def removeAll(self):
+ self.__init__()
+
+ def getWriters(self):
+ return self.writers[:]
+
+
+ def removeReader(self, reader):
+ self.readers.remove(reader)
+
+
+ def removeWriter(self, writer):
+ self.writers.remove(writer)
+
+
+
def isNonBlocking(skt):
"""
Determine if the given socket is blocking or not.
@@ -66,22 +93,11 @@
-from zope.interface.verify import verifyClass
-from zope.interface import implementer
-
-def verifiedImplementer(interface):
- def _(cls):
- result = implementer(interface)(cls)
- verifyClass(interface, result)
- return result
- return _
-
-
-
@verifiedImplementer(IStatusWatcher)
class Watcher(object):
def __init__(self, q):
self.q = q
+ self._closeCounter = 1
def newConnectionStatus(self, previous):
@@ -100,7 +116,13 @@
return 0
+ def closeCountFromStatus(self, status):
+ result = (self._closeCounter, status)
+ self._closeCounter += 1
+ return result
+
+
class InheritedSocketDispatcherTests(TestCase):
"""
Inherited socket dispatcher tests.
@@ -110,6 +132,51 @@
self.dispatcher.reactor = ReaderAdder()
+ def test_closeSomeSockets(self):
+ """
+ L{InheritedSocketDispatcher} determines how many sockets to close from
+ L{IStatusWatcher.closeCountFromStatus}.
+ """
+ self.dispatcher.statusWatcher = Watcher([])
+ class SocketForClosing(object):
+ blocking = True
+ closed = False
+ def setblocking(self, b):
+ self.blocking = b
+ def fileno(self):
+ return object()
+ def close(self):
+ self.closed = True
+
+ one = SocketForClosing()
+ two = SocketForClosing()
+ three = SocketForClosing()
+
+ self.dispatcher.addSocket(
+ lambda: (SocketForClosing(), SocketForClosing())
+ )
+
+ self.dispatcher.sendFileDescriptor(one, "one")
+ self.dispatcher.sendFileDescriptor(two, "two")
+ self.dispatcher.sendFileDescriptor(three, "three")
+ def sendfd(unixSocket, tcpSocket, description):
+ pass
+ # Put something into the socket-close queue.
+ self.dispatcher._subprocessSockets[0].doWrite(sendfd)
+ # Nothing closed yet.
+ self.assertEquals(one.closed, False)
+ self.assertEquals(two.closed, False)
+ self.assertEquals(three.closed, False)
+
+ def recvmsg(fileno):
+ return 'data', 0, 0
+ self.dispatcher._subprocessSockets[0].doRead(recvmsg)
+ # One socket closed.
+ self.assertEquals(one.closed, True)
+ self.assertEquals(two.closed, False)
+ self.assertEquals(three.closed, False)
+
+
def test_nonBlocking(self):
"""
Creating a L{_SubprocessSocket} via
@@ -165,6 +232,7 @@
message = "whatever"
# Need to have a socket that will accept the descriptors.
dispatcher.addSocket()
- dispatcher.statusMessage(dispatcher._subprocessSockets[0], message)
- dispatcher.statusMessage(dispatcher._subprocessSockets[0], message)
+ subskt = dispatcher._subprocessSockets[0]
+ dispatcher.statusMessage(subskt, message)
+ dispatcher.statusMessage(subskt, message)
self.assertEquals(q, [[-1], [-2]])
Modified: CalendarServer/trunk/twext/web2/metafd.py
===================================================================
--- CalendarServer/trunk/twext/web2/metafd.py 2013-09-11 22:02:06 UTC (rev 11671)
+++ CalendarServer/trunk/twext/web2/metafd.py 2013-09-12 00:09:20 UTC (rev 11672)
@@ -23,6 +23,8 @@
from functools import total_ordering
+from zope.interface import implementer
+
from twext.internet.sendfdport import (
InheritedPort, InheritedSocketDispatcher, InheritingProtocolFactory)
from twext.internet.tcp import MaxAcceptTCPServer
@@ -32,6 +34,7 @@
from twisted.internet import reactor
from twisted.python.util import FancyStrMixin
from twisted.internet.tcp import Server
+from twext.internet.sendfdport import IStatusWatcher
log = Logger()
@@ -167,10 +170,11 @@
The status of a worker process.
"""
- showAttributes = "acknowledged unacknowledged started abandoned".split()
+ showAttributes = ("acknowledged unacknowledged started abandoned unclosed"
+ .split())
def __init__(self, acknowledged=0, unacknowledged=0, started=0,
- abandoned=0):
+ abandoned=0, unclosed=0):
"""
Create a L{ConnectionStatus} with a number of sent connections and a
number of un-acknowledged connections.
@@ -188,11 +192,15 @@
worker restarted.
@param started: The number of times this worker has been started.
+
+ @param unclosed: The number of sockets which have been sent to the
+ subprocess but not yet closed.
"""
self.acknowledged = acknowledged
self.unacknowledged = unacknowledged
self.started = started
self.abandoned = abandoned
+ self.unclosed = unclosed
def effective(self):
@@ -211,8 +219,7 @@
def _tuplify(self):
- return (self.acknowledged, self.unacknowledged, self.started,
- self.abandoned)
+ return tuple(getattr(self, attr) for attr in self.showAttributes)
def __lt__(self, other):
@@ -230,22 +237,20 @@
def __add__(self, other):
if not isinstance(other, WorkerStatus):
return NotImplemented
- return self.__class__(self.acknowledged + other.acknowledged,
- self.unacknowledged + other.unacknowledged,
- self.started + other.started,
- self.abandoned + other.abandoned)
+ a = self._tuplify()
+ b = other._tuplify()
+ c = [a1 + b1 for (a1, b1) in zip(a, b)]
+ return self.__class__(*c)
def __sub__(self, other):
if not isinstance(other, WorkerStatus):
return NotImplemented
- return self + self.__class__(-other.acknowledged,
- -other.unacknowledged,
- -other.started,
- -other.abandoned)
+ return self + self.__class__(*[-x for x in other._tuplify()])
+@implementer(IStatusWatcher)
class ConnectionLimiter(MultiService, object):
"""
Connection limiter for use with L{InheritedSocketDispatcher}.
@@ -253,6 +258,8 @@
This depends on statuses being reported by L{ReportingHTTPFactory}
"""
+ _outstandingRequests = 0
+
def __init__(self, maxAccepts, maxRequests):
"""
Create a L{ConnectionLimiter} with an associated dispatcher and
@@ -319,9 +326,18 @@
else:
# '+' acknowledges that the subprocess has taken on the work.
return previousStatus + WorkerStatus(acknowledged=1,
- unacknowledged=-1)
+ unacknowledged=-1,
+ unclosed=1)
+ def closeCountFromStatus(self, status):
+ """
+ Determine the number of sockets to close from the current status.
+ """
+ toClose = status.unclosed
+ return (toClose, status - WorkerStatus(unclosed=toClose))
+
+
def newConnectionStatus(self, previousStatus):
"""
Determine the effect of a new connection being sent on a subprocess
@@ -352,7 +368,6 @@
f.myServer.myPort.startReading()
- _outstandingRequests = 0
@property # make read-only
def outstandingRequests(self):
return self._outstandingRequests
Modified: CalendarServer/trunk/twext/web2/test/test_metafd.py
===================================================================
--- CalendarServer/trunk/twext/web2/test/test_metafd.py 2013-09-11 22:02:06 UTC (rev 11671)
+++ CalendarServer/trunk/twext/web2/test/test_metafd.py 2013-09-12 00:09:20 UTC (rev 11672)
@@ -61,6 +61,7 @@
return ("4.3.2.1", 4321)
+
class InheritedPortForTesting(sendfdport.InheritedPort):
"""
L{sendfdport.InheritedPort} subclass that prevents certain I/O operations
@@ -92,15 +93,19 @@
def startReading(self):
"Do nothing."
+
def stopReading(self):
"Do nothing."
+
def startWriting(self):
"Do nothing."
+
def stopWriting(self):
"Do nothing."
+
def __init__(self, *a, **kw):
super(ServerTransportForTesting, self).__init__(*a, **kw)
self.reactor = None
@@ -198,9 +203,9 @@
L{WorkerStatus.__repr__} will show all the values associated with the
status of the worker.
"""
- self.assertEquals(repr(WorkerStatus(1, 2, 3, 4)),
+ self.assertEquals(repr(WorkerStatus(1, 2, 3, 4, 5)),
"<WorkerStatus acknowledged=1 unacknowledged=2 "
- "started=3 abandoned=4>")
+ "started=3 abandoned=4 unclosed=5>")
@@ -237,7 +242,8 @@
def serverServiceMaker(port, factory, *a, **k):
s.factory = factory
s.myPort = NotAPort()
- s.myPort.startReading() # TODO: technically, should wait for startService
+ # TODO: technically, the following should wait for startService
+ s.myPort.startReading()
factory.myServer = s
return s
return serverServiceMaker
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130911/591cee5f/attachment-0001.html>
More information about the calendarserver-changes mailing list