[CalendarServer-changes] [6293] CalendarServer/trunk/calendarserver/tap
source_changes at macosforge.org
source_changes at macosforge.org
Tue Sep 14 20:31:12 PDT 2010
Revision: 6293
http://trac.macosforge.org/projects/calendarserver/changeset/6293
Author: glyph at apple.com
Date: 2010-09-14 20:31:11 -0700 (Tue, 14 Sep 2010)
Log Message:
-----------
Fix incompatibility with Twisted 10.1+, and improve test coverage for calendarserver.tap.caldav, as well as speeding up the tests a little by not depending on the real reactor at all.
(Plus, quiet some chatty tests by redirecting their output to the log.)
Modified Paths:
--------------
CalendarServer/trunk/calendarserver/tap/caldav.py
CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py 2010-09-15 03:15:56 UTC (rev 6292)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py 2010-09-15 03:31:11 UTC (rev 6293)
@@ -30,6 +30,8 @@
from subprocess import Popen, PIPE
from pwd import getpwuid, getpwnam
from grp import getgrnam
+from inspect import getargspec
+
from OpenSSL.SSL import Error as SSLError
import OpenSSL
@@ -40,13 +42,13 @@
from twisted.python.usage import Options, UsageError
from twisted.python.reflect import namedClass
from twisted.plugin import IPlugin
-from twisted.internet.defer import Deferred, gatherResults, succeed
+from twisted.internet.defer import gatherResults
from twisted.internet import error, reactor
-from twisted.internet.reactor import callLater, addSystemEventTrigger
+from twisted.internet.reactor import addSystemEventTrigger
from twisted.internet.process import ProcessExitedAlready
from twisted.internet.protocol import Protocol, Factory
from twisted.application.internet import TCPServer, UNIXServer
-from twisted.application.service import Service, MultiService, IServiceMaker
+from twisted.application.service import MultiService, IServiceMaker
from twisted.scripts.mktap import getid
from twisted.runner import procmon
from twext.web2.server import Site
@@ -1151,6 +1153,8 @@
# Record the port we were actually assigned
config.ControlPort = self._port.getHost().port
+
+
class DelayedStartupProcessMonitor(procmon.ProcessMonitor):
"""
A L{DelayedStartupProcessMonitor} is a L{procmon.ProcessMonitor} that
@@ -1159,6 +1163,10 @@
to determine their arguments as they are started up rather than entirely
ahead of time.
+ Also, unlike L{procmon.ProcessMonitor}, its C{stopService} returns a
+ L{Deferred} which fires only when all processes have shut down, to allow
+ for a clean service shutdown.
+
@ivar processObjects: a C{list} of L{TwistdSlaveProcess} to add using
C{self.addProcess} when this service starts up.
@@ -1170,14 +1178,35 @@
@ivar reactor: an L{IReactorProcess} for spawning processes, defaulting to
the global reactor.
+
+ @ivar delayInterval: the amount of time to wait between starting subsequent
+ processes.
+
+ @ivar stopping: a flag used to determine whether it is time to fire the
+ Deferreds that track service shutdown.
"""
+ _shouldPassReactor = (
+ len(getargspec(procmon.ProcessMonitor.__init__)[0]) > 1
+ )
+
def __init__(self, *args, **kwargs):
+ reactorToUse = kwargs.get("reactor", reactor)
+ if not self._shouldPassReactor:
+ # Try to do this the right way if we can, otherwise, let the tests
+ # monkeypatch. (Our superclass does not accept a 'reactor'
+ # argument in Twisted 10.0.0, but does in Twisted 10.1.0 and
+ # later.)
+ kwargs.pop('reactor', None)
procmon.ProcessMonitor.__init__(self, *args, **kwargs)
self.processObjects = []
self._extraFDs = {}
- self.reactor = reactor
+ self.reactor = reactorToUse
self.stopping = False
+ if config.MultiProcess.StaggeredStartup.Enabled:
+ self.delayInterval = config.MultiProcess.StaggeredStartup.Interval
+ else:
+ self.delayInterval = 0
def addProcessObject(self, process, env):
@@ -1193,65 +1222,32 @@
def startService(self):
- Service.startService(self)
-
# Now we're ready to build the command lines and actually add the
- # processes to procmon. This step must be done prior to setting
- # active to 1
+ # processes to procmon.
for processObject, env in self.processObjects:
name = processObject.getName()
- self.addProcess(
- name,
- processObject.getCommandLine(),
- env=env
- )
- self._extraFDs[name] = processObject.getFileDescriptors()
+ cmdline = processObject.getCommandLine()
+ filedes = processObject.getFileDescriptors()
+ self._extraFDs[name] = filedes
+ self.addProcess(name, cmdline, env=env)
+ procmon.ProcessMonitor.startService(self)
- self.active = 1
- delay = 0
- if config.MultiProcess.StaggeredStartup.Enabled:
- delay_interval = config.MultiProcess.StaggeredStartup.Interval
- else:
- delay_interval = 0
-
- for name in self.processes.keys():
- if name.startswith("caldav"):
- when = delay
- delay += delay_interval
- else:
- when = 0
- callLater(when, self.startProcess, name)
-
- self.consistency = callLater(
- self.consistencyDelay,
- self._checkConsistency
- )
-
def stopService(self):
"""
- Return a deferred that fires when all child processes have ended
+ Return a deferred that fires when all child processes have ended.
"""
-
- Service.stopService(self)
-
self.stopping = True
- self.active = 0
- self.consistency.cancel()
-
- self.deferreds = { }
- for name in self.processes.keys():
- self.deferreds[name] = Deferred()
- self.stopProcess(name)
-
+ self.deferreds = {}
+ procmon.ProcessMonitor.stopService(self)
return gatherResults(self.deferreds.values())
+
def processEnded(self, name):
"""
When a child process has ended it calls me so I can fire the
appropriate deferred which was created in stopService
"""
-
if self.stopping:
deferred = self.deferreds.get(name, None)
if deferred is not None:
@@ -1272,14 +1268,15 @@
if startswithname is None or name.startswith(startswithname):
self.signalProcess(signal, name)
+
def signalProcess(self, signal, name):
"""
- Send a signal to each monitored process
+ Send a signal to a single monitored process, by name, if that process
+ is running; otherwise, do nothing.
@param signal: the signal to send
@type signal: C{int}
- @param startswithname: is set only signal those processes
- whose name starts with this string
+ @param name: the name of the process to signal.
@type name: C{str}
"""
if not self.protocols.has_key(name):
@@ -1290,7 +1287,13 @@
except ProcessExitedAlready:
pass
- def startProcess(self, name):
+
+ def reallyStartProcess(self, name):
+ """
+ Actually start a process. (Re-implemented here rather than just using
+ the inherited implementation of startProcess because ProcessMonitor
+ doesn't allow customization of subprocess environment).
+ """
if self.protocols.has_key(name):
return
p = self.protocols[name] = DelayedStartupLoggingProtocol()
@@ -1308,43 +1311,25 @@
childFDs=childFDs
)
- def _forceStopProcess(self, proc, name):
- """
- After self.killTime seconds has passed, this method sends a KILL
- to the process. Clean up the now-called murder deferred.
- """
- try:
- # This deferred has fired so remove it to keep from trying to
- # cancel it later
- del self.murder[name]
- except KeyError:
- pass
- try:
- proc.signalProcess('KILL')
- except error.ProcessExitedAlready:
- pass
+ _pendingStarts = 0
- def stopProcess(self, name):
+ def startProcess(self, name):
"""
- Stop the process gently at first (TERM), but schedule a KILL for
- self.killTime seconds later.
+ Start process named 'name'. If another process has started recently,
+ wait until some time has passed before actually starting this process.
+
+ @param name: the name of the process to start.
"""
+ interval = (self.delayInterval * self._pendingStarts)
+ self._pendingStarts += 1
+ def delayedStart():
+ self._pendingStarts -= 1
+ self.reallyStartProcess(name)
+ self.reactor.callLater(interval, delayedStart)
- if not self.protocols.has_key(name):
- return
- proc = self.protocols[name].transport
- del self.protocols[name]
- try:
- proc.signalProcess('TERM')
- except error.ProcessExitedAlready:
- pass
- else:
- self.murder[name] = reactor.callLater(self.killTime, self._forceStopProcess, proc, name)
-
-
class DelayedStartupLineLogger(object):
"""
A line logger that can handle very long lines.
Modified: CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/test/test_caldav.py 2010-09-15 03:15:56 UTC (rev 6292)
+++ CalendarServer/trunk/calendarserver/tap/test/test_caldav.py 2010-09-15 03:31:11 UTC (rev 6293)
@@ -29,9 +29,10 @@
from twisted.python.reflect import namedAny
from twisted.python import log
+from twisted.internet.interfaces import IProcessTransport, IReactorProcess
from twisted.internet.protocol import ServerFactory
-from twisted.internet.defer import Deferred, inlineCallbacks
-from twisted.internet.interfaces import IProcessTransport, IReactorProcess
+from twisted.internet.defer import Deferred
+from twisted.internet.task import Clock
from twisted.application.service import IService
from twisted.application import internet
@@ -40,7 +41,7 @@
from twext.web2.log import LogWrapperResource
from twext.python.filepath import CachingFilePath as FilePath
-from twext.python.plistlib import writePlist
+from twext.python.plistlib import writePlist #@UnresolvedImport
from twext.internet.tcp import MaxAcceptTCPServer, MaxAcceptSSLServer
from twistedcaldav.config import config, ConfigDict
@@ -58,6 +59,7 @@
DelayedStartupProcessMonitor, DelayedStartupLineLogger, TwistdSlaveProcess
)
from calendarserver.provision.root import RootResource
+from StringIO import StringIO
# Points to top of source tree.
@@ -86,9 +88,9 @@
self.childFDs = childFDs
-class InMemoryProcessSpawner(object):
+class InMemoryProcessSpawner(Clock):
"""
- Stub out L{IReactorProcess.spawnProcess} so that we can examine the
+ Stub out L{IReactorProcess} and L{IReactorTime} so that we can examine the
interaction of L{DelayedStartupProcessMonitor} and the reactor.
"""
implements(IReactorProcess)
@@ -97,18 +99,25 @@
"""
Create some storage to hold on to all the fake processes spawned.
"""
+ super(InMemoryProcessSpawner, self).__init__()
self.processTransports = []
self.waiting = []
- def waitForOneProcess(self):
+
+ def waitForOneProcess(self, amount=10.0):
"""
- Return a L{Deferred} which will fire when spawnProcess has been
- invoked, with the L{IProcessTransport}.
+ Wait for an L{IProcessTransport} to be created by advancing the clock.
+ If none are created in the specified amount of time, raise an
+ AssertionError.
"""
- d = Deferred()
- self.waiting.append(d)
- return d
+ self.advance(amount)
+ if self.processTransports:
+ return self.processTransports.pop(0)
+ else:
+ print 'wth', self.calls
+ raise AssertionError("There were no process transports available.")
+
def spawnProcess(self, processProtocol, executable, args=(), env={},
path=None, uid=None, gid=None, usePTY=0,
childFDs=None):
@@ -117,13 +126,14 @@
processProtocol, executable, args, env, path, uid, gid, usePTY,
childFDs
)
+ transport.startedAt = self.seconds()
self.processTransports.append(transport)
if self.waiting:
self.waiting.pop(0).callback(transport)
return transport
-
+
class TestCalDAVOptions (CalDAVOptions):
"""
A fake implementation of CalDAVOptions that provides
@@ -136,6 +146,21 @@
pass
+ def loadConfiguration(self):
+ """
+ Simple wrapper to avoid printing during test runs.
+ """
+ oldout = sys.stdout
+ newout = sys.stdout = StringIO()
+ try:
+ return CalDAVOptions.loadConfiguration(self)
+ finally:
+ sys.stdout = oldout
+ log.msg(
+ "load configuration console output: %s" % newout.getvalue()
+ )
+
+
class CalDAVOptionsTest (TestCase):
"""
Test various parameters of our usage.Options subclass
@@ -901,6 +926,17 @@
Test cases for L{DelayedStartupProcessMonitor}.
"""
+ def useFakeReactor(self, fakeReactor):
+ """
+ Earlier versions of Twisted used a global reactor in
+ L{twisted.runner.procmon}, so we need to take that into account when
+ testing.
+ """
+ if not DelayedStartupProcessMonitor._shouldPassReactor:
+ import twisted.runner.procmon as inherited
+ self.patch(inherited, "reactor", fakeReactor)
+
+
def test_lineAfterLongLine(self):
"""
A "long" line of output from a monitored process (longer than
@@ -949,14 +985,14 @@
return d
- @inlineCallbacks
def test_acceptDescriptorInheritance(self):
"""
If a L{TwistdSlaveProcess} specifies some file descriptors to be
inherited, they should be inherited by the subprocess.
"""
- dspm = DelayedStartupProcessMonitor()
- dspm.reactor = InMemoryProcessSpawner()
+ dspm = DelayedStartupProcessMonitor()
+ imps = dspm.reactor = InMemoryProcessSpawner()
+ self.useFakeReactor(imps)
# Most arguments here will be ignored, so these are bogus values.
slave = TwistdSlaveProcess(
@@ -971,16 +1007,16 @@
dspm.addProcessObject(slave, {})
dspm.startService()
- self.addCleanup(dspm.consistency.cancel)
# We can easily stub out spawnProcess, because caldav calls it; the
# callLater calls buried in procmon itself are driven by the fake clock
# that waitForOneProcess advances.
- oneProcessTransport = yield dspm.reactor.waitForOneProcess()
+ oneProcessTransport = imps.waitForOneProcess()
self.assertEquals(oneProcessTransport.childFDs,
{0: 'w', 1: 'r', 2: 'r',
3: 3, 7: 7,
19: 19, 25: 25})
- @inlineCallbacks
+
+
def test_metaDescriptorInheritance(self):
"""
If a L{TwistdSlaveProcess} specifies a meta-file-descriptor to be
@@ -988,20 +1024,9 @@
configuration argument should be passed that indicates to the
subprocess.
"""
- dspm = DelayedStartupProcessMonitor()
- dspm.reactor = InMemoryProcessSpawner()
- class FakeFD:
- def __init__(self, n):
- self.fd = n
- def fileno(self):
- return self.fd
-
- class FakeDispatcher:
- n = 3
- def addSocket(self):
- self.n += 1
- return FakeFD(self.n)
-
+ dspm = DelayedStartupProcessMonitor()
+ imps = dspm.reactor = InMemoryProcessSpawner()
+ self.useFakeReactor(imps)
# Most arguments here will be ignored, so these are bogus values.
slave = TwistdSlaveProcess(
twistd = "bleh",
@@ -1014,8 +1039,7 @@
dspm.addProcessObject(slave, {})
dspm.startService()
- self.addCleanup(dspm.consistency.cancel)
- oneProcessTransport = yield dspm.reactor.waitForOneProcess()
+ oneProcessTransport = imps.waitForOneProcess()
self.assertIn("MetaFD=4", oneProcessTransport.args)
self.assertEquals(
oneProcessTransport.args[oneProcessTransport.args.index("MetaFD=4")-1],
@@ -1027,7 +1051,58 @@
4: 4})
+ def test_startServiceDelay(self):
+ """
+ Starting a L{DelayedStartupProcessMonitor} should result in the process
+ objects that have been added to it being started once per
+ delayInterval.
+ """
+ dspm = DelayedStartupProcessMonitor()
+ dspm.delayInterval = 3.0
+ imps = dspm.reactor = InMemoryProcessSpawner()
+ self.useFakeReactor(imps)
+ sampleCounter = range(0, 5)
+ for counter in sampleCounter:
+ slave = TwistdSlaveProcess(
+ twistd = "bleh",
+ tapname = "caldav",
+ configFile = "/does/not/exist",
+ id = counter * 10,
+ interfaces = '127.0.0.1',
+ metaSocket = FakeDispatcher().addSocket()
+ )
+ dspm.addProcessObject(slave, {"SAMPLE_ENV_COUNTER": str(counter)})
+ dspm.startService()
+ # Advance the clock a bunch of times, allowing us to time things with a
+ # comprehensible resolution.
+ imps.pump([0] + [dspm.delayInterval / 2.0] * len(sampleCounter) * 3)
+ expectedValues = [dspm.delayInterval * n for n in sampleCounter]
+ self.assertEquals([x.startedAt for x in imps.processTransports],
+ expectedValues)
+
+
+
+class FakeFD(object):
+
+ def __init__(self, n):
+ self.fd = n
+
+
+ def fileno(self):
+ return self.fd
+
+
+
+class FakeDispatcher(object):
+ n = 3
+
+ def addSocket(self):
+ self.n += 1
+ return FakeFD(self.n)
+
+
+
class TwistdSlaveProcessTests(TestCase):
"""
Tests for L{TwistdSlaveProcess}.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20100914/ddc69ba1/attachment-0001.html>
More information about the calendarserver-changes
mailing list