[CalendarServer-changes] [11672] CalendarServer/trunk/twext

source_changes at macosforge.org source_changes at macosforge.org
Wed Sep 11 17:09:20 PDT 2013


Revision: 11672
          http://trac.calendarserver.org//changeset/11672
Author:   glyph at apple.com
Date:     2013-09-11 17:09:20 -0700 (Wed, 11 Sep 2013)
Log Message:
-----------
Address an issue where sockets may be closed too early by the master process.

Modified Paths:
--------------
    CalendarServer/trunk/twext/internet/sendfdport.py
    CalendarServer/trunk/twext/internet/test/test_sendfdport.py
    CalendarServer/trunk/twext/web2/metafd.py
    CalendarServer/trunk/twext/web2/test/test_metafd.py

Modified: CalendarServer/trunk/twext/internet/sendfdport.py
===================================================================
--- CalendarServer/trunk/twext/internet/sendfdport.py	2013-09-11 22:02:06 UTC (rev 11671)
+++ CalendarServer/trunk/twext/internet/sendfdport.py	2013-09-12 00:09:20 UTC (rev 11672)
@@ -95,6 +95,7 @@
     used to transmit sockets to a subprocess.
 
     @ivar skt: the UNIX socket used as the sendmsg() transport.
+    @type skt: L{socket.socket}
 
     @ivar outgoingSocketQueue: an outgoing queue of sockets to send to the
         subprocess, along with their descriptions (strings describing their
@@ -107,7 +108,11 @@
         from the subprocess: this is an application-specific indication of how
         ready this subprocess is to receive more connections.  A typical usage
         would be to count the open connections: this is what is passed to
-    @type status: C{str}
+    @type status: See L{IStatusWatcher} for an explanation of which methods
+        determine this type.
+
+    @ivar dispatcher: The socket dispatcher that owns this L{_SubprocessSocket}
+    @type dispatcher: L{InheritedSocketDispatcher}
     """
 
     def __init__(self, dispatcher, skt, status):
@@ -117,6 +122,7 @@
         self.skt = skt          # XXX needs to be set non-blocking by somebody
         self.fileno = skt.fileno
         self.outgoingSocketQueue = []
+        self.pendingCloseSocketQueue = []
 
 
     def sendSocketToPeer(self, skt, description):
@@ -127,7 +133,7 @@
         self.startWriting()
 
 
-    def doRead(self):
+    def doRead(self, recvmsg=recvmsg):
         """
         Receive a status / health message and record it.
         """
@@ -137,10 +143,12 @@
             if se.errno not in (EAGAIN, ENOBUFS):
                 raise
         else:
-            self.dispatcher.statusMessage(self, data)
+            closeCount = self.dispatcher.statusMessage(self, data)
+            for ignored in xrange(closeCount):
+                self.pendingCloseSocketQueue.pop(0).close()
 
 
-    def doWrite(self):
+    def doWrite(self, sendfd=sendfd):
         """
         Transmit as many queued pending file descriptors as we can.
         """
@@ -154,8 +162,8 @@
                     return
                 raise
 
-            # Always close the socket on this end
-            skt.close()
+            # Ready to close this socket; wait until it is acknowledged.
+            self.pendingCloseSocketQueue.append(skt)
 
         if not self.outgoingSocketQueue:
             self.stopWriting()
@@ -197,6 +205,7 @@
         @return: the new status.
         """
 
+
     def newConnectionStatus(previousStatus): #@NoSelf
         """
         A new connection was sent to a given socket.  Compute its status based
@@ -208,6 +217,7 @@
         @return: the socket's status after incrementing its outstanding work.
         """
 
+
     def statusFromMessage(previousStatus, message): #@NoSelf
         """
         A status message was received by a worker.  Convert the previous status
@@ -222,7 +232,18 @@
         """
 
 
+    def closeCountFromStatus(previousStatus): #@NoSelf
+        """
+        Based on a status previously returned from a method on this
+        L{IStatusWatcher}, determine how many sockets may be closed.
 
+        @return: a 2-tuple of C{number of sockets that may safely be closed},
+            C{new status}.
+        @rtype: 2-tuple of (C{int}, C{<opaque>})
+        """
+
+
+
 class InheritedSocketDispatcher(object):
     """
     Used by one or more L{InheritingProtocolFactory}s, this keeps track of a
@@ -262,10 +283,11 @@
         The status of a connection has changed; update all registered status
         change listeners.
         """
-        subsocket.status = self.statusWatcher.statusFromMessage(
-            subsocket.status, message
-        )
-        self.statusWatcher.statusesChanged(self.statuses)
+        watcher = self.statusWatcher
+        status = watcher.statusFromMessage(subsocket.status, message)
+        closeCount, subsocket.status = watcher.closeCountFromStatus(status)
+        watcher.statusesChanged(self.statuses)
+        return closeCount
 
 
     def sendFileDescriptor(self, skt, description):
@@ -293,7 +315,7 @@
         # XXX Maybe want to send along 'description' or 'skt' or some
         # properties thereof? -glyph
         selectedSocket.status = self.statusWatcher.newConnectionStatus(
-           selectedSocket.status
+            selectedSocket.status
         )
         self.statusWatcher.statusesChanged(self.statuses)
 
@@ -307,7 +329,7 @@
             subSocket.startReading()
 
 
-    def addSocket(self):
+    def addSocket(self, socketpair=lambda: socketpair(AF_UNIX, SOCK_DGRAM)):
         """
         Add a C{sendmsg()}-oriented AF_UNIX socket to the pool of sockets being
         used for transmitting file descriptors to child processes.
@@ -316,7 +338,7 @@
             C{fileno()} as part of the C{childFDs} argument to
             C{spawnProcess()}, then close it.
         """
-        i, o = socketpair(AF_UNIX, SOCK_DGRAM)
+        i, o = socketpair()
         i.setblocking(False)
         o.setblocking(False)
         a = _SubprocessSocket(self, o, self.statusWatcher.initialStatus())

Modified: CalendarServer/trunk/twext/internet/test/test_sendfdport.py
===================================================================
--- CalendarServer/trunk/twext/internet/test/test_sendfdport.py	2013-09-11 22:02:06 UTC (rev 11671)
+++ CalendarServer/trunk/twext/internet/test/test_sendfdport.py	2013-09-12 00:09:20 UTC (rev 11672)
@@ -23,14 +23,25 @@
 import os
 import fcntl
 
+from zope.interface.verify import verifyClass
+from zope.interface import implementer
+
 from twext.internet.sendfdport import InheritedSocketDispatcher
 
 from twext.web2.metafd import ConnectionLimiter
 from twisted.internet.interfaces import IReactorFDSet
 from twisted.trial.unittest import TestCase
-from zope.interface import implementer
 
-@implementer(IReactorFDSet)
+def verifiedImplementer(interface):
+    def _(cls):
+        result = implementer(interface)(cls)
+        verifyClass(interface, result)
+        return result
+    return _
+
+
+
+@verifiedImplementer(IReactorFDSet)
 class ReaderAdder(object):
 
     def __init__(self):
@@ -50,7 +61,23 @@
         self.writers.append(writer)
 
 
+    def removeAll(self):
+        self.__init__()
 
+
+    def getWriters(self):
+        return self.writers[:]
+
+
+    def removeReader(self, reader):
+        self.readers.remove(reader)
+
+
+    def removeWriter(self, writer):
+        self.writers.remove(writer)
+
+
+
 def isNonBlocking(skt):
     """
     Determine if the given socket is blocking or not.
@@ -66,22 +93,11 @@
 
 
 
-from zope.interface.verify import verifyClass
-from zope.interface import implementer
-
-def verifiedImplementer(interface):
-    def _(cls):
-        result = implementer(interface)(cls)
-        verifyClass(interface, result)
-        return result
-    return _
-
-
-
 @verifiedImplementer(IStatusWatcher)
 class Watcher(object):
     def __init__(self, q):
         self.q = q
+        self._closeCounter = 1
 
 
     def newConnectionStatus(self, previous):
@@ -100,7 +116,13 @@
         return 0
 
 
+    def closeCountFromStatus(self, status):
+        result = (self._closeCounter, status)
+        self._closeCounter += 1
+        return result
 
+
+
 class InheritedSocketDispatcherTests(TestCase):
     """
     Inherited socket dispatcher tests.
@@ -110,6 +132,51 @@
         self.dispatcher.reactor = ReaderAdder()
 
 
+    def test_closeSomeSockets(self):
+        """
+        L{InheritedSocketDispatcher} determines how many sockets to close from
+        L{IStatusWatcher.closeCountFromStatus}.
+        """
+        self.dispatcher.statusWatcher = Watcher([])
+        class SocketForClosing(object):
+            blocking = True
+            closed = False
+            def setblocking(self, b):
+                self.blocking = b
+            def fileno(self):
+                return object()
+            def close(self):
+                self.closed = True
+
+        one = SocketForClosing()
+        two = SocketForClosing()
+        three = SocketForClosing()
+
+        self.dispatcher.addSocket(
+            lambda: (SocketForClosing(), SocketForClosing())
+        )
+
+        self.dispatcher.sendFileDescriptor(one, "one")
+        self.dispatcher.sendFileDescriptor(two, "two")
+        self.dispatcher.sendFileDescriptor(three, "three")
+        def sendfd(unixSocket, tcpSocket, description):
+            pass
+        # Put something into the socket-close queue.
+        self.dispatcher._subprocessSockets[0].doWrite(sendfd)
+        # Nothing closed yet.
+        self.assertEquals(one.closed, False)
+        self.assertEquals(two.closed, False)
+        self.assertEquals(three.closed, False)
+
+        def recvmsg(fileno):
+            return 'data', 0, 0
+        self.dispatcher._subprocessSockets[0].doRead(recvmsg)
+        # One socket closed.
+        self.assertEquals(one.closed, True)
+        self.assertEquals(two.closed, False)
+        self.assertEquals(three.closed, False)
+
+
     def test_nonBlocking(self):
         """
         Creating a L{_SubprocessSocket} via
@@ -165,6 +232,7 @@
         message = "whatever"
         # Need to have a socket that will accept the descriptors.
         dispatcher.addSocket()
-        dispatcher.statusMessage(dispatcher._subprocessSockets[0], message)
-        dispatcher.statusMessage(dispatcher._subprocessSockets[0], message)
+        subskt = dispatcher._subprocessSockets[0]
+        dispatcher.statusMessage(subskt, message)
+        dispatcher.statusMessage(subskt, message)
         self.assertEquals(q, [[-1], [-2]])

Modified: CalendarServer/trunk/twext/web2/metafd.py
===================================================================
--- CalendarServer/trunk/twext/web2/metafd.py	2013-09-11 22:02:06 UTC (rev 11671)
+++ CalendarServer/trunk/twext/web2/metafd.py	2013-09-12 00:09:20 UTC (rev 11672)
@@ -23,6 +23,8 @@
 
 from functools import total_ordering
 
+from zope.interface import implementer
+
 from twext.internet.sendfdport import (
     InheritedPort, InheritedSocketDispatcher, InheritingProtocolFactory)
 from twext.internet.tcp import MaxAcceptTCPServer
@@ -32,6 +34,7 @@
 from twisted.internet import reactor
 from twisted.python.util import FancyStrMixin
 from twisted.internet.tcp import Server
+from twext.internet.sendfdport import IStatusWatcher
 
 log = Logger()
 
@@ -167,10 +170,11 @@
     The status of a worker process.
     """
 
-    showAttributes = "acknowledged unacknowledged started abandoned".split()
+    showAttributes = ("acknowledged unacknowledged started abandoned unclosed"
+                      .split())
 
     def __init__(self, acknowledged=0, unacknowledged=0, started=0,
-                 abandoned=0):
+                 abandoned=0, unclosed=0):
         """
         Create a L{ConnectionStatus} with a number of sent connections and a
         number of un-acknowledged connections.
@@ -188,11 +192,15 @@
             worker restarted.
 
         @param started: The number of times this worker has been started.
+
+        @param unclosed: The number of sockets which have been sent to the
+            subprocess but not yet closed.
         """
         self.acknowledged = acknowledged
         self.unacknowledged = unacknowledged
         self.started = started
         self.abandoned = abandoned
+        self.unclosed = unclosed
 
 
     def effective(self):
@@ -211,8 +219,7 @@
 
 
     def _tuplify(self):
-        return (self.acknowledged, self.unacknowledged, self.started,
-                self.abandoned)
+        return tuple(getattr(self, attr) for attr in self.showAttributes)
 
 
     def __lt__(self, other):
@@ -230,22 +237,20 @@
     def __add__(self, other):
         if not isinstance(other, WorkerStatus):
             return NotImplemented
-        return self.__class__(self.acknowledged + other.acknowledged,
-                              self.unacknowledged + other.unacknowledged,
-                              self.started + other.started,
-                              self.abandoned + other.abandoned)
+        a = self._tuplify()
+        b = other._tuplify()
+        c = [a1 + b1 for (a1, b1) in zip(a, b)]
+        return self.__class__(*c)
 
 
     def __sub__(self, other):
         if not isinstance(other, WorkerStatus):
             return NotImplemented
-        return self + self.__class__(-other.acknowledged,
-                                     -other.unacknowledged,
-                                     -other.started,
-                                     -other.abandoned)
+        return self + self.__class__(*[-x for x in other._tuplify()])
 
 
 
+@implementer(IStatusWatcher)
 class ConnectionLimiter(MultiService, object):
     """
     Connection limiter for use with L{InheritedSocketDispatcher}.
@@ -253,6 +258,8 @@
     This depends on statuses being reported by L{ReportingHTTPFactory}
     """
 
+    _outstandingRequests = 0
+
     def __init__(self, maxAccepts, maxRequests):
         """
         Create a L{ConnectionLimiter} with an associated dispatcher and
@@ -319,9 +326,18 @@
         else:
             # '+' acknowledges that the subprocess has taken on the work.
             return previousStatus + WorkerStatus(acknowledged=1,
-                                                 unacknowledged=-1)
+                                                 unacknowledged=-1,
+                                                 unclosed=1)
 
 
+    def closeCountFromStatus(self, status):
+        """
+        Determine the number of sockets to close from the current status.
+        """
+        toClose = status.unclosed
+        return (toClose, status - WorkerStatus(unclosed=toClose))
+
+
     def newConnectionStatus(self, previousStatus):
         """
         Determine the effect of a new connection being sent on a subprocess
@@ -352,7 +368,6 @@
                 f.myServer.myPort.startReading()
 
 
-    _outstandingRequests = 0
     @property # make read-only
     def outstandingRequests(self):
         return self._outstandingRequests

Modified: CalendarServer/trunk/twext/web2/test/test_metafd.py
===================================================================
--- CalendarServer/trunk/twext/web2/test/test_metafd.py	2013-09-11 22:02:06 UTC (rev 11671)
+++ CalendarServer/trunk/twext/web2/test/test_metafd.py	2013-09-12 00:09:20 UTC (rev 11672)
@@ -61,6 +61,7 @@
         return ("4.3.2.1", 4321)
 
 
+
 class InheritedPortForTesting(sendfdport.InheritedPort):
     """
     L{sendfdport.InheritedPort} subclass that prevents certain I/O operations
@@ -92,15 +93,19 @@
     def startReading(self):
         "Do nothing."
 
+
     def stopReading(self):
         "Do nothing."
 
+
     def startWriting(self):
         "Do nothing."
 
+
     def stopWriting(self):
         "Do nothing."
 
+
     def __init__(self, *a, **kw):
         super(ServerTransportForTesting, self).__init__(*a, **kw)
         self.reactor = None
@@ -198,9 +203,9 @@
         L{WorkerStatus.__repr__} will show all the values associated with the
         status of the worker.
         """
-        self.assertEquals(repr(WorkerStatus(1, 2, 3, 4)),
+        self.assertEquals(repr(WorkerStatus(1, 2, 3, 4, 5)),
                           "<WorkerStatus acknowledged=1 unacknowledged=2 "
-                          "started=3 abandoned=4>")
+                          "started=3 abandoned=4 unclosed=5>")
 
 
 
@@ -237,7 +242,8 @@
         def serverServiceMaker(port, factory, *a, **k):
             s.factory = factory
             s.myPort = NotAPort()
-            s.myPort.startReading() # TODO: technically, should wait for startService
+            # TODO: technically, the following should wait for startService
+            s.myPort.startReading()
             factory.myServer = s
             return s
         return serverServiceMaker
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130911/591cee5f/attachment-0001.html>


More information about the calendarserver-changes mailing list