[CalendarServer-changes] [6522] CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap
source_changes at macosforge.org
source_changes at macosforge.org
Mon Nov 1 14:21:50 PDT 2010
Revision: 6522
http://trac.macosforge.org/projects/calendarserver/changeset/6522
Author: glyph at apple.com
Date: 2010-11-01 14:21:47 -0700 (Mon, 01 Nov 2010)
Log Message:
-----------
refactor DelayedStartupProcessMonitor to be a stand-alone service, rather than using procmon. (It does so much itself, there's not much point to continuing to inherit.)
Modified Paths:
--------------
CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/caldav.py
CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/test/test_caldav.py
Modified: CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/caldav.py 2010-11-01 21:21:27 UTC (rev 6521)
+++ CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/caldav.py 2010-11-01 21:21:47 UTC (rev 6522)
@@ -29,7 +29,6 @@
from subprocess import Popen, PIPE
from pwd import getpwuid, getpwnam
from grp import getgrnam
-from inspect import getargspec
import OpenSSL
from OpenSSL.SSL import Error as SSLError
@@ -41,13 +40,14 @@
from twisted.python.reflect import namedClass
from twisted.plugin import IPlugin
from twisted.internet.defer import gatherResults
-from twisted.internet import reactor
+from twisted.internet import reactor as _reactor
from twisted.internet.reactor import addSystemEventTrigger
from twisted.internet.process import ProcessExitedAlready
from twisted.internet.protocol import Protocol, Factory
+from twisted.internet.protocol import ProcessProtocol
from twisted.application.internet import TCPServer, UNIXServer
from twisted.application.service import MultiService, IServiceMaker
-from twisted.runner import procmon
+from twisted.application.service import Service
import twext
from twext.web2.server import Site
@@ -1221,7 +1221,7 @@
-class DelayedStartupProcessMonitor(procmon.ProcessMonitor):
+class DelayedStartupProcessMonitor(Service, object):
"""
A L{DelayedStartupProcessMonitor} is a L{procmon.ProcessMonitor} that
defers building its command lines until the service is actually ready to
@@ -1252,22 +1252,22 @@
Deferreds that track service shutdown.
"""
- _shouldPassReactor = (
- len(getargspec(procmon.ProcessMonitor.__init__)[0]) > 1
- )
+ threshold = 1
+ killTime = 5
+ minRestartDelay = 1
+ maxRestartDelay = 3600
- def __init__(self, *args, **kwargs):
- reactorToUse = kwargs.get("reactor", reactor)
- if not self._shouldPassReactor:
- # Try to do this the right way if we can, otherwise, let the tests
- # monkeypatch. (Our superclass does not accept a 'reactor'
- # argument in Twisted 10.0.0, but does in Twisted 10.1.0 and
- # later.)
- kwargs.pop('reactor', None)
- procmon.ProcessMonitor.__init__(self, *args, **kwargs)
+ def __init__(self, reactor=_reactor):
+ super(DelayedStartupProcessMonitor, self).__init__()
+ self._reactor = reactor
+ self.processes = {}
+ self.protocols = {}
+ self.delay = {}
+ self.timeStarted = {}
+ self.murder = {}
+ self.restart = {}
self.processObjects = []
self._extraFDs = {}
- self.reactor = reactorToUse
self.stopping = False
if config.MultiProcess.StaggeredStartup.Enabled:
self.delayInterval = config.MultiProcess.StaggeredStartup.Interval
@@ -1275,6 +1275,39 @@
self.delayInterval = 0
+ def addProcess(self, name, args, uid=None, gid=None, env={}):
+ """
+ Add a new monitored process and start it immediately if the
+ L{DelayedStartupProcessMonitor} service is running.
+
+ Note that args are passed to the system call, not to the shell. If
+ running the shell is desired, the common idiom is to use
+ C{ProcessMonitor.addProcess("name", ['/bin/sh', '-c', shell_script])}
+
+ @param name: A name for this process. This value must be
+ unique across all processes added to this monitor.
+ @type name: C{str}
+ @param args: The argv sequence for the process to launch.
+ @param uid: The user ID to use to run the process. If C{None},
+ the current UID is used.
+ @type uid: C{int}
+ @param gid: The group ID to use to run the process. If C{None},
+ the current GID is used.
+ @type gid: C{int}
+ @param env: The environment to give to the launched process. See
+ L{IReactorProcess.spawnProcess}'s C{env} parameter.
+ @type env: C{dict}
+ @raises: C{KeyError} if a process with the given name already
+ exists
+ """
+ if name in self.processes:
+ raise KeyError("remove %s first" % (name,))
+ self.processes[name] = args, uid, gid, env
+ self.delay[name] = self.minRestartDelay
+ if self.running:
+ self.startProcess(name)
+
+
def addProcessObject(self, process, env):
"""
Add a process object to be run when this service is started.
@@ -1296,7 +1329,9 @@
filedes = processObject.getFileDescriptors()
self._extraFDs[name] = filedes
self.addProcess(name, cmdline, env=env)
- procmon.ProcessMonitor.startService(self)
+ super(DelayedStartupProcessMonitor, self).startService()
+ for name in self.processes:
+ self.startProcess(name)
def stopService(self):
@@ -1305,21 +1340,96 @@
"""
self.stopping = True
self.deferreds = {}
- procmon.ProcessMonitor.stopService(self)
+ super(DelayedStartupProcessMonitor, self).stopService()
+
+ # Cancel any outstanding restarts
+ for name, delayedCall in self.restart.items():
+ if delayedCall.active():
+ delayedCall.cancel()
+
+ for name in self.processes:
+ self.stopProcess(name)
return gatherResults(self.deferreds.values())
+ def removeProcess(self, name):
+ """
+ Stop the named process and remove it from the list of monitored
+ processes.
+
+ @type name: C{str}
+ @param name: A string that uniquely identifies the process.
+ """
+ self.stopProcess(name)
+ del self.processes[name]
+
+
+ def stopProcess(self, name):
+ """
+ @param name: The name of the process to be stopped
+ """
+ if name not in self.processes:
+ raise KeyError('Unrecognized process name: %s' % (name,))
+
+ proto = self.protocols.get(name, None)
+ if proto is not None:
+ proc = proto.transport
+ try:
+ proc.signalProcess('TERM')
+ except ProcessExitedAlready:
+ pass
+ else:
+ self.murder[name] = self._reactor.callLater(
+ self.killTime,
+ self._forceStopProcess, proc)
+
+
def processEnded(self, name):
"""
When a child process has ended it calls me so I can fire the
appropriate deferred which was created in stopService
"""
+ # Cancel the scheduled _forceStopProcess function if the process
+ # dies naturally
+ if name in self.murder:
+ if self.murder[name].active():
+ self.murder[name].cancel()
+ del self.murder[name]
+
+ del self.protocols[name]
+
+ if self._reactor.seconds() - self.timeStarted[name] < self.threshold:
+ # The process died too fast - backoff
+ nextDelay = self.delay[name]
+ self.delay[name] = min(self.delay[name] * 2, self.maxRestartDelay)
+
+ else:
+ # Process had been running for a significant amount of time
+ # restart immediately
+ nextDelay = 0
+ self.delay[name] = self.minRestartDelay
+
+ # Schedule a process restart if the service is running
+ if self.running and name in self.processes:
+ self.restart[name] = self._reactor.callLater(nextDelay,
+ self.startProcess,
+ name)
if self.stopping:
deferred = self.deferreds.get(name, None)
if deferred is not None:
deferred.callback(None)
+ def _forceStopProcess(self, proc):
+ """
+ @param proc: An L{IProcessTransport} provider
+ """
+ try:
+ proc.signalProcess('KILL')
+ except ProcessExitedAlready:
+ pass
+
+
def signalAll(self, signal, startswithname=None):
"""
Send a signal to all child processes.
@@ -1372,7 +1482,7 @@
childFDs.update(self._extraFDs.get(name, {}))
- self.reactor.spawnProcess(
+ self._reactor.spawnProcess(
p, args[0], args, uid=uid, gid=gid, env=env,
childFDs=childFDs
)
@@ -1392,10 +1502,38 @@
def delayedStart():
self._pendingStarts -= 1
self.reallyStartProcess(name)
- self.reactor.callLater(interval, delayedStart)
+ self._reactor.callLater(interval, delayedStart)
+ def restartAll(self):
+ """
+ Restart all processes. This is useful for third party management
+ services to allow a user to restart servers because of an outside change
+ in circumstances -- for example, a new version of a library is
+ installed.
+ """
+ for name in self.processes:
+ self.stopProcess(name)
+
+ def __repr__(self):
+ l = []
+ for name, proc in self.processes.items():
+ uidgid = ''
+ if proc[1] is not None:
+ uidgid = str(proc[1])
+ if proc[2] is not None:
+ uidgid += ':'+str(proc[2])
+
+ if uidgid:
+ uidgid = '(' + uidgid + ')'
+ l.append('%r%s: %r' % (name, uidgid, proc[0]))
+ return ('<' + self.__class__.__name__ + ' '
+ + ' '.join(l)
+ + '>')
+
+
+
class DelayedStartupLineLogger(object):
"""
A line logger that can handle very long lines.
@@ -1448,28 +1586,42 @@
-class DelayedStartupLoggingProtocol(procmon.LoggingProtocol, object):
+class DelayedStartupLoggingProtocol(ProcessProtocol):
"""
Logging protocol that handles lines which are too long.
"""
+ service = None
+ name = None
+ empty = 1
+
def connectionMade(self):
"""
Replace the superclass's output monitoring logic with one that can
handle lineLengthExceeded.
"""
- super(DelayedStartupLoggingProtocol, self).connectionMade()
self.output = DelayedStartupLineLogger()
+ self.output.makeConnection(self.transport)
self.output.tag = self.name
+
+ def outReceived(self, data):
+ self.output.dataReceived(data)
+ self.empty = data[-1] == '\n'
+
+ errReceived = outReceived
+
+
def processEnded(self, reason):
"""
Let the service know that this child process has ended
"""
- procmon.LoggingProtocol.processEnded(self, reason)
+ if not self.empty:
+ self.output.dataReceived('\n')
self.service.processEnded(self.name)
+
def getSSLPassphrase(*ignored):
if not config.SSLPrivateKey:
Modified: CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/test/test_caldav.py 2010-11-01 21:21:27 UTC (rev 6521)
+++ CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/test/test_caldav.py 2010-11-01 21:21:47 UTC (rev 6522)
@@ -932,17 +932,6 @@
Test cases for L{DelayedStartupProcessMonitor}.
"""
- def useFakeReactor(self, fakeReactor):
- """
- Earlier versions of Twisted used a global reactor in
- L{twisted.runner.procmon}, so we need to take that into account when
- testing.
- """
- if not DelayedStartupProcessMonitor._shouldPassReactor:
- import twisted.runner.procmon as inherited
- self.patch(inherited, "reactor", fakeReactor)
-
-
def test_lineAfterLongLine(self):
"""
A "long" line of output from a monitored process (longer than
@@ -972,7 +961,7 @@
logged.append(event)
if m == '[Dummy] z':
d.callback("done")
-
+
log.addObserver(tempObserver)
self.addCleanup(log.removeObserver, tempObserver)
d = Deferred()
@@ -996,9 +985,8 @@
If a L{TwistdSlaveProcess} specifies some file descriptors to be
inherited, they should be inherited by the subprocess.
"""
- dspm = DelayedStartupProcessMonitor()
- imps = dspm.reactor = InMemoryProcessSpawner()
- self.useFakeReactor(imps)
+ imps = InMemoryProcessSpawner()
+ dspm = DelayedStartupProcessMonitor(imps)
# Most arguments here will be ignored, so these are bogus values.
slave = TwistdSlaveProcess(
@@ -1030,9 +1018,8 @@
configuration argument should be passed that indicates to the
subprocess.
"""
- dspm = DelayedStartupProcessMonitor()
- imps = dspm.reactor = InMemoryProcessSpawner()
- self.useFakeReactor(imps)
+ imps = InMemoryProcessSpawner()
+ dspm = DelayedStartupProcessMonitor(imps)
# Most arguments here will be ignored, so these are bogus values.
slave = TwistdSlaveProcess(
twistd = "bleh",
@@ -1063,10 +1050,9 @@
objects that have been added to it being started once per
delayInterval.
"""
- dspm = DelayedStartupProcessMonitor()
+ imps = InMemoryProcessSpawner()
+ dspm = DelayedStartupProcessMonitor(imps)
dspm.delayInterval = 3.0
- imps = dspm.reactor = InMemoryProcessSpawner()
- self.useFakeReactor(imps)
sampleCounter = range(0, 5)
for counter in sampleCounter:
slave = TwistdSlaveProcess(
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20101101/2a5fc682/attachment-0001.html>
More information about the calendarserver-changes
mailing list