[CalendarServer-changes] [6293] CalendarServer/trunk/calendarserver/tap

source_changes at macosforge.org source_changes at macosforge.org
Tue Sep 14 20:31:12 PDT 2010


Revision: 6293
          http://trac.macosforge.org/projects/calendarserver/changeset/6293
Author:   glyph at apple.com
Date:     2010-09-14 20:31:11 -0700 (Tue, 14 Sep 2010)
Log Message:
-----------
Fix incompatibility with Twisted 10.1+, improve test coverage for calendarserver.tap.caldav, and speed up the tests a little by not depending on the real reactor at all.

(Plus, quiet some chatty tests by redirecting their output to the log.)

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/calendarserver/tap/test/test_caldav.py

Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2010-09-15 03:15:56 UTC (rev 6292)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2010-09-15 03:31:11 UTC (rev 6293)
@@ -30,6 +30,8 @@
 from subprocess import Popen, PIPE
 from pwd import getpwuid, getpwnam
 from grp import getgrnam
+from inspect import getargspec
+
 from OpenSSL.SSL import Error as SSLError
 import OpenSSL
 
@@ -40,13 +42,13 @@
 from twisted.python.usage import Options, UsageError
 from twisted.python.reflect import namedClass
 from twisted.plugin import IPlugin
-from twisted.internet.defer import Deferred, gatherResults, succeed
+from twisted.internet.defer import gatherResults
 from twisted.internet import error, reactor
-from twisted.internet.reactor import callLater, addSystemEventTrigger
+from twisted.internet.reactor import addSystemEventTrigger
 from twisted.internet.process import ProcessExitedAlready
 from twisted.internet.protocol import Protocol, Factory
 from twisted.application.internet import TCPServer, UNIXServer
-from twisted.application.service import Service, MultiService, IServiceMaker
+from twisted.application.service import MultiService, IServiceMaker
 from twisted.scripts.mktap import getid
 from twisted.runner import procmon
 from twext.web2.server import Site
@@ -1151,6 +1153,8 @@
         # Record the port we were actually assigned
         config.ControlPort = self._port.getHost().port
 
+
+
 class DelayedStartupProcessMonitor(procmon.ProcessMonitor):
     """
     A L{DelayedStartupProcessMonitor} is a L{procmon.ProcessMonitor} that
@@ -1159,6 +1163,10 @@
     to determine their arguments as they are started up rather than entirely
     ahead of time.
 
+    Also, unlike L{procmon.ProcessMonitor}, its C{stopService} returns a
+    L{Deferred} which fires only when all processes have shut down, to allow
+    for a clean service shutdown.
+
     @ivar processObjects: a C{list} of L{TwistdSlaveProcess} to add using
         C{self.addProcess} when this service starts up.
 
@@ -1170,14 +1178,35 @@
 
     @ivar reactor: an L{IReactorProcess} for spawning processes, defaulting to
         the global reactor.
+
+    @ivar delayInterval: the amount of time to wait between starting subsequent
+        processes.
+
+    @ivar stopping: a flag used to determine whether it is time to fire the
+        Deferreds that track service shutdown.
     """
 
+    _shouldPassReactor = (
+        len(getargspec(procmon.ProcessMonitor.__init__)[0]) > 1
+    )
+
     def __init__(self, *args, **kwargs):
+        reactorToUse = kwargs.get("reactor", reactor)
+        if not self._shouldPassReactor:
+            # Try to do this the right way if we can, otherwise, let the tests
+            # monkeypatch.  (Our superclass does not accept a 'reactor'
+            # argument in Twisted 10.0.0, but does in Twisted 10.1.0 and
+            # later.)
+            kwargs.pop('reactor', None)
         procmon.ProcessMonitor.__init__(self, *args, **kwargs)
         self.processObjects = []
         self._extraFDs = {}
-        self.reactor = reactor
+        self.reactor = reactorToUse
         self.stopping = False
+        if config.MultiProcess.StaggeredStartup.Enabled:
+            self.delayInterval = config.MultiProcess.StaggeredStartup.Interval
+        else:
+            self.delayInterval = 0
 
 
     def addProcessObject(self, process, env):
@@ -1193,65 +1222,32 @@
 
 
     def startService(self):
-        Service.startService(self)
-
         # Now we're ready to build the command lines and actualy add the
-        # processes to procmon.  This step must be done prior to setting
-        # active to 1
+        # processes to procmon.
         for processObject, env in self.processObjects:
             name = processObject.getName()
-            self.addProcess(
-                name,
-                processObject.getCommandLine(),
-                env=env
-            )
-            self._extraFDs[name] = processObject.getFileDescriptors()
+            cmdline = processObject.getCommandLine()
+            filedes = processObject.getFileDescriptors()
+            self._extraFDs[name] = filedes
+            self.addProcess(name, cmdline, env=env)
+        procmon.ProcessMonitor.startService(self)
 
-        self.active = 1
-        delay = 0
 
-        if config.MultiProcess.StaggeredStartup.Enabled:
-            delay_interval = config.MultiProcess.StaggeredStartup.Interval
-        else:
-            delay_interval = 0
-
-        for name in self.processes.keys():
-            if name.startswith("caldav"):
-                when = delay
-                delay += delay_interval
-            else:
-                when = 0
-            callLater(when, self.startProcess, name)
-
-        self.consistency = callLater(
-            self.consistencyDelay,
-            self._checkConsistency
-        )
-
     def stopService(self):
         """
-        Return a deferred that fires when all child processes have ended
+        Return a deferred that fires when all child processes have ended.
         """
-
-        Service.stopService(self)
-
         self.stopping = True
-        self.active = 0
-        self.consistency.cancel()
-
-        self.deferreds = { }
-        for name in self.processes.keys():
-            self.deferreds[name] = Deferred()
-            self.stopProcess(name)
-
+        self.deferreds = {}
+        procmon.ProcessMonitor.stopService(self)
         return gatherResults(self.deferreds.values())
 
+
     def processEnded(self, name):
         """
         When a child process has ended it calls me so I can fire the
         appropriate deferred which was created in stopService
         """
-
         if self.stopping:
             deferred = self.deferreds.get(name, None)
             if deferred is not None:
@@ -1272,14 +1268,15 @@
             if startswithname is None or name.startswith(startswithname):
                 self.signalProcess(signal, name)
 
+
     def signalProcess(self, signal, name):
         """
-        Send a signal to each monitored process
+        Send a signal to a single monitored process, by name, if that process
+        is running; otherwise, do nothing.
 
         @param signal: the signal to send
         @type signal: C{int}
-        @param startswithname: is set only signal those processes
-            whose name starts with this string
+        @param name: the name of the process to signal.
         @type signal: C{str}
         """
         if not self.protocols.has_key(name):
@@ -1290,7 +1287,13 @@
         except ProcessExitedAlready:
             pass
 
-    def startProcess(self, name):
+
+    def reallyStartProcess(self, name):
+        """
+        Actually start a process.  (Re-implemented here rather than just using
+        the inherited implementation of startService because ProcessMonitor
+        doesn't allow customization of subprocess environment).
+        """
         if self.protocols.has_key(name):
             return
         p = self.protocols[name] = DelayedStartupLoggingProtocol()
@@ -1308,43 +1311,25 @@
             childFDs=childFDs
         )
 
-    def _forceStopProcess(self, proc, name):
-        """
-        After self.killTime seconds has passed, this method sends a KILL
-        to the process.  Clean up the now-called murder deferred.
-        """
-        try:
-            # This deferred has fired so remove it to keep from trying to
-            # cancel it later
-            del self.murder[name]
-        except KeyError:
-            pass
-        try:
-            proc.signalProcess('KILL')
-        except error.ProcessExitedAlready:
-            pass
 
+    _pendingStarts = 0
 
-    def stopProcess(self, name):
+    def startProcess(self, name):
         """
-        Stop the process gently at first (TERM), but schedule a KILL for
-        self.killTime seconds later.
+        Start process named 'name'.  If another process has started recently,
+        wait until some time has passed before actually starting this process.
+
+        @param name: the name of the process to start.
         """
+        interval = (self.delayInterval * self._pendingStarts)
+        self._pendingStarts += 1
+        def delayedStart():
+            self._pendingStarts -= 1
+            self.reallyStartProcess(name)
+        self.reactor.callLater(interval, delayedStart)
 
-        if not self.protocols.has_key(name):
-            return
-        proc = self.protocols[name].transport
-        del self.protocols[name]
-        try:
-            proc.signalProcess('TERM')
-        except error.ProcessExitedAlready:
-            pass
-        else:
-            self.murder[name] = reactor.callLater(self.killTime, self._forceStopProcess, proc, name)
 
 
-
-
 class DelayedStartupLineLogger(object):
     """
     A line logger that can handle very long lines.

Modified: CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/test/test_caldav.py	2010-09-15 03:15:56 UTC (rev 6292)
+++ CalendarServer/trunk/calendarserver/tap/test/test_caldav.py	2010-09-15 03:31:11 UTC (rev 6293)
@@ -29,9 +29,10 @@
 from twisted.python.reflect import namedAny
 from twisted.python import log
 
+from twisted.internet.interfaces import IProcessTransport, IReactorProcess
 from twisted.internet.protocol import ServerFactory
-from twisted.internet.defer import Deferred, inlineCallbacks
-from twisted.internet.interfaces import IProcessTransport, IReactorProcess
+from twisted.internet.defer import Deferred
+from twisted.internet.task import Clock
 
 from twisted.application.service import IService
 from twisted.application import internet
@@ -40,7 +41,7 @@
 from twext.web2.log import LogWrapperResource
 from twext.python.filepath import CachingFilePath as FilePath
 
-from twext.python.plistlib import writePlist
+from twext.python.plistlib import writePlist #@UnresolvedImport
 from twext.internet.tcp import MaxAcceptTCPServer, MaxAcceptSSLServer
 
 from twistedcaldav.config import config, ConfigDict
@@ -58,6 +59,7 @@
     DelayedStartupProcessMonitor, DelayedStartupLineLogger, TwistdSlaveProcess
 )
 from calendarserver.provision.root import RootResource
+from StringIO import StringIO
 
 
 # Points to top of source tree.
@@ -86,9 +88,9 @@
         self.childFDs = childFDs
 
 
-class InMemoryProcessSpawner(object):
+class InMemoryProcessSpawner(Clock):
     """
-    Stub out L{IReactorProcess.spawnProcess} so that we can examine the
+    Stub out L{IReactorProcess} and L{IReactorClock} so that we can examine the
     interaction of L{DelayedStartupProcessMonitor} and the reactor.
     """
     implements(IReactorProcess)
@@ -97,18 +99,25 @@
         """
         Create some storage to hold on to all the fake processes spawned.
         """
+        super(InMemoryProcessSpawner, self).__init__()
         self.processTransports = []
         self.waiting = []
 
-    def waitForOneProcess(self):
+
+    def waitForOneProcess(self, amount=10.0):
         """
-        Return a L{Deferred} which will fire when spawnProcess has been
-        invoked, with the L{IProcessTransport}.
+        Wait for an L{IProcessTransport} to be created by advancing the clock.
+        If none are created in the specified amount of time, raise an
+        AssertionError.
         """
-        d = Deferred()
-        self.waiting.append(d)
-        return d
+        self.advance(amount)
+        if self.processTransports:
+            return self.processTransports.pop(0)
+        else:
+            print 'wth', self.calls
+            raise AssertionError("There were no process transports available.")
 
+
     def spawnProcess(self, processProtocol, executable, args=(), env={},
                      path=None, uid=None, gid=None, usePTY=0,
                      childFDs=None):
@@ -117,13 +126,14 @@
             processProtocol, executable, args, env, path, uid, gid, usePTY,
             childFDs
         )
+        transport.startedAt = self.seconds()
         self.processTransports.append(transport)
         if self.waiting:
             self.waiting.pop(0).callback(transport)
         return transport
-        
 
 
+
 class TestCalDAVOptions (CalDAVOptions):
     """
     A fake implementation of CalDAVOptions that provides
@@ -136,6 +146,21 @@
         pass
 
 
+    def loadConfiguration(self):
+        """
+        Simple wrapper to avoid printing during test runs.
+        """
+        oldout = sys.stdout
+        newout = sys.stdout = StringIO()
+        try:
+            return CalDAVOptions.loadConfiguration(self)
+        finally:
+            sys.stdout = oldout
+            log.msg(
+                "load configuration console output: %s" % newout.getvalue()
+            )
+
+
 class CalDAVOptionsTest (TestCase):
     """
     Test various parameters of our usage.Options subclass
@@ -901,6 +926,17 @@
     Test cases for L{DelayedStartupProcessMonitor}.
     """
 
+    def useFakeReactor(self, fakeReactor):
+        """
+        Earlier versions of Twisted used a global reactor in
+        L{twisted.runner.procmon}, so we need to take that into account when
+        testing.
+        """
+        if not DelayedStartupProcessMonitor._shouldPassReactor:
+            import twisted.runner.procmon as inherited
+            self.patch(inherited, "reactor", fakeReactor)
+
+
     def test_lineAfterLongLine(self):
         """
         A "long" line of output from a monitored process (longer than
@@ -949,14 +985,14 @@
         return d
 
 
-    @inlineCallbacks
     def test_acceptDescriptorInheritance(self):
         """
         If a L{TwistdSlaveProcess} specifies some file descriptors to be
         inherited, they should be inherited by the subprocess.
         """
-        dspm         = DelayedStartupProcessMonitor()
-        dspm.reactor = InMemoryProcessSpawner()
+        dspm                = DelayedStartupProcessMonitor()
+        imps = dspm.reactor = InMemoryProcessSpawner()
+        self.useFakeReactor(imps)
 
         # Most arguments here will be ignored, so these are bogus values.
         slave = TwistdSlaveProcess(
@@ -971,16 +1007,16 @@
 
         dspm.addProcessObject(slave, {})
         dspm.startService()
-        self.addCleanup(dspm.consistency.cancel)
         # We can easily stub out spawnProcess, because caldav calls it, but a
         # bunch of callLater calls are buried in procmon itself, so we need to
         # use the real clock.
-        oneProcessTransport = yield dspm.reactor.waitForOneProcess()
+        oneProcessTransport = imps.waitForOneProcess()
         self.assertEquals(oneProcessTransport.childFDs,
                           {0: 'w', 1: 'r', 2: 'r',
                            3: 3, 7: 7,
                            19: 19, 25: 25})
-    @inlineCallbacks
+
+
     def test_metaDescriptorInheritance(self):
         """
         If a L{TwistdSlaveProcess} specifies a meta-file-descriptor to be
@@ -988,20 +1024,9 @@
         configuration argument should be passed that indicates to the
         subprocess.
         """
-        dspm         = DelayedStartupProcessMonitor()
-        dspm.reactor = InMemoryProcessSpawner()
-        class FakeFD:
-            def __init__(self, n):
-                self.fd = n
-            def fileno(self):
-                return self.fd
-
-        class FakeDispatcher:
-            n = 3
-            def addSocket(self):
-                self.n += 1
-                return FakeFD(self.n)
-
+        dspm                = DelayedStartupProcessMonitor()
+        imps = dspm.reactor = InMemoryProcessSpawner()
+        self.useFakeReactor(imps)
         # Most arguments here will be ignored, so these are bogus values.
         slave = TwistdSlaveProcess(
             twistd     = "bleh",
@@ -1014,8 +1039,7 @@
 
         dspm.addProcessObject(slave, {})
         dspm.startService()
-        self.addCleanup(dspm.consistency.cancel)
-        oneProcessTransport = yield dspm.reactor.waitForOneProcess()
+        oneProcessTransport = imps.waitForOneProcess()
         self.assertIn("MetaFD=4", oneProcessTransport.args)
         self.assertEquals(
             oneProcessTransport.args[oneProcessTransport.args.index("MetaFD=4")-1],
@@ -1027,7 +1051,58 @@
                            4: 4})
 
 
+    def test_startServiceDelay(self):
+        """
+        Starting a L{DelayedStartupProcessMonitor} should result in the process
+        objects that have been added to it being started once per
+        delayInterval.
+        """
+        dspm                = DelayedStartupProcessMonitor()
+        dspm.delayInterval = 3.0
+        imps = dspm.reactor = InMemoryProcessSpawner()
+        self.useFakeReactor(imps)
+        sampleCounter = range(0, 5)
+        for counter in sampleCounter:
+            slave = TwistdSlaveProcess(
+                twistd     = "bleh",
+                tapname    = "caldav",
+                configFile = "/does/not/exist",
+                id         = counter * 10,
+                interfaces = '127.0.0.1',
+                metaSocket = FakeDispatcher().addSocket()
+            )
+            dspm.addProcessObject(slave, {"SAMPLE_ENV_COUNTER": str(counter)})
+        dspm.startService()
 
+        # Advance the clock a bunch of times, allowing us to time things with a
+        # comprehensible resolution.
+        imps.pump([0] + [dspm.delayInterval / 2.0] * len(sampleCounter) * 3)
+        expectedValues = [dspm.delayInterval * n for n in sampleCounter]
+        self.assertEquals([x.startedAt for x in imps.processTransports],
+                          expectedValues)
+
+
+
+class FakeFD(object):
+
+    def __init__(self, n):
+        self.fd = n
+
+    
+    def fileno(self):
+        return self.fd
+
+
+
+class FakeDispatcher(object):
+    n = 3
+
+    def addSocket(self):
+        self.n += 1
+        return FakeFD(self.n)
+
+
+
 class TwistdSlaveProcessTests(TestCase):
     """
     Tests for L{TwistdSlaveProcess}.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100914/ddc69ba1/attachment-0001.html>


More information about the calendarserver-changes mailing list