[CalendarServer-changes] [6522] CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap

source_changes at macosforge.org source_changes at macosforge.org
Mon Nov 1 14:21:50 PDT 2010


Revision: 6522
          http://trac.macosforge.org/projects/calendarserver/changeset/6522
Author:   glyph at apple.com
Date:     2010-11-01 14:21:47 -0700 (Mon, 01 Nov 2010)
Log Message:
-----------
Refactor DelayedStartupProcessMonitor to be a stand-alone service rather than a subclass of procmon.ProcessMonitor.  (It does so much itself that there is not much point in continuing to inherit.)

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/caldav.py
    CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/test/test_caldav.py

Modified: CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/caldav.py	2010-11-01 21:21:27 UTC (rev 6521)
+++ CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/caldav.py	2010-11-01 21:21:47 UTC (rev 6522)
@@ -29,7 +29,6 @@
 from subprocess import Popen, PIPE
 from pwd import getpwuid, getpwnam
 from grp import getgrnam
-from inspect import getargspec
 import OpenSSL
 from OpenSSL.SSL import Error as SSLError
 
@@ -41,13 +40,14 @@
 from twisted.python.reflect import namedClass
 from twisted.plugin import IPlugin
 from twisted.internet.defer import gatherResults
-from twisted.internet import reactor
+from twisted.internet import reactor as _reactor
 from twisted.internet.reactor import addSystemEventTrigger
 from twisted.internet.process import ProcessExitedAlready
 from twisted.internet.protocol import Protocol, Factory
+from twisted.internet.protocol import ProcessProtocol
 from twisted.application.internet import TCPServer, UNIXServer
 from twisted.application.service import MultiService, IServiceMaker
-from twisted.runner import procmon
+from twisted.application.service import Service
 
 import twext
 from twext.web2.server import Site
@@ -1221,7 +1221,7 @@
 
 
 
-class DelayedStartupProcessMonitor(procmon.ProcessMonitor):
+class DelayedStartupProcessMonitor(Service, object):
     """
     A L{DelayedStartupProcessMonitor} is a L{procmon.ProcessMonitor} that
     defers building its command lines until the service is actually ready to
@@ -1252,22 +1252,22 @@
         Deferreds that track service shutdown.
     """
 
-    _shouldPassReactor = (
-        len(getargspec(procmon.ProcessMonitor.__init__)[0]) > 1
-    )
+    threshold = 1
+    killTime = 5
+    minRestartDelay = 1
+    maxRestartDelay = 3600
 
-    def __init__(self, *args, **kwargs):
-        reactorToUse = kwargs.get("reactor", reactor)
-        if not self._shouldPassReactor:
-            # Try to do this the right way if we can, otherwise, let the tests
-            # monkeypatch.  (Our superclass does not accept a 'reactor'
-            # argument in Twisted 10.0.0, but does in Twisted 10.1.0 and
-            # later.)
-            kwargs.pop('reactor', None)
-        procmon.ProcessMonitor.__init__(self, *args, **kwargs)
+    def __init__(self, reactor=_reactor):
+        super(DelayedStartupProcessMonitor, self).__init__()
+        self._reactor = reactor
+        self.processes = {}
+        self.protocols = {}
+        self.delay = {}
+        self.timeStarted = {}
+        self.murder = {}
+        self.restart = {}
         self.processObjects = []
         self._extraFDs = {}
-        self.reactor = reactorToUse
         self.stopping = False
         if config.MultiProcess.StaggeredStartup.Enabled:
             self.delayInterval = config.MultiProcess.StaggeredStartup.Interval
@@ -1275,6 +1275,39 @@
             self.delayInterval = 0
 
 
+    def addProcess(self, name, args, uid=None, gid=None, env={}):
+        """
+        Add a new monitored process and start it immediately if the
+        L{DelayedStartupProcessMonitor} service is running.
+
+        Note that args are passed to the system call, not to the shell. If
+        running the shell is desired, the common idiom is to use
+        C{ProcessMonitor.addProcess("name", ['/bin/sh', '-c', shell_script])}
+
+        @param name: A name for this process.  This value must be
+            unique across all processes added to this monitor.
+        @type name: C{str}
+        @param args: The argv sequence for the process to launch.
+        @param uid: The user ID to use to run the process.  If C{None},
+            the current UID is used.
+        @type uid: C{int}
+        @param gid: The group ID to use to run the process.  If C{None},
+            the current GID is used.
+        @type gid: C{int}
+        @param env: The environment to give to the launched process. See
+            L{IReactorProcess.spawnProcess}'s C{env} parameter.
+        @type env: C{dict}
+        @raises: C{KeyError} if a process with the given name already
+            exists
+        """
+        if name in self.processes:
+            raise KeyError("remove %s first" % (name,))
+        self.processes[name] = args, uid, gid, env
+        self.delay[name] = self.minRestartDelay
+        if self.running:
+            self.startProcess(name)
+
+
     def addProcessObject(self, process, env):
         """
         Add a process object to be run when this service is started.
@@ -1296,7 +1329,9 @@
             filedes = processObject.getFileDescriptors()
             self._extraFDs[name] = filedes
             self.addProcess(name, cmdline, env=env)
-        procmon.ProcessMonitor.startService(self)
+        super(DelayedStartupProcessMonitor, self).startService()
+        for name in self.processes:
+            self.startProcess(name)
 
 
     def stopService(self):
@@ -1305,21 +1340,96 @@
         """
         self.stopping = True
         self.deferreds = {}
-        procmon.ProcessMonitor.stopService(self)
+        super(DelayedStartupProcessMonitor, self).stopService()
+
+        # Cancel any outstanding restarts
+        for name, delayedCall in self.restart.items():
+            if delayedCall.active():
+                delayedCall.cancel()
+
+        for name in self.processes:
+            self.stopProcess(name)
         return gatherResults(self.deferreds.values())
 
 
+    def removeProcess(self, name):
+        """
+        Stop the named process and remove it from the list of monitored
+        processes.
+
+        @type name: C{str}
+        @param name: A string that uniquely identifies the process.
+        """
+        self.stopProcess(name)
+        del self.processes[name]
+
+
+    def stopProcess(self, name):
+        """
+        @param name: The name of the process to be stopped
+        """
+        if name not in self.processes:
+            raise KeyError('Unrecognized process name: %s' % (name,))
+
+        proto = self.protocols.get(name, None)
+        if proto is not None:
+            proc = proto.transport
+            try:
+                proc.signalProcess('TERM')
+            except ProcessExitedAlready:
+                pass
+            else:
+                self.murder[name] = self._reactor.callLater(
+                                            self.killTime,
+                                            self._forceStopProcess, proc)
+
+
     def processEnded(self, name):
         """
         When a child process has ended it calls me so I can fire the
         appropriate deferred which was created in stopService
         """
+        # Cancel the scheduled _forceStopProcess function if the process
+        # dies naturally
+        if name in self.murder:
+            if self.murder[name].active():
+                self.murder[name].cancel()
+            del self.murder[name]
+
+        del self.protocols[name]
+
+        if self._reactor.seconds() - self.timeStarted[name] < self.threshold:
+            # The process died too fast - backoff
+            nextDelay = self.delay[name]
+            self.delay[name] = min(self.delay[name] * 2, self.maxRestartDelay)
+
+        else:
+            # Process had been running for a significant amount of time
+            # restart immediately
+            nextDelay = 0
+            self.delay[name] = self.minRestartDelay
+
+        # Schedule a process restart if the service is running
+        if self.running and name in self.processes:
+            self.restart[name] = self._reactor.callLater(nextDelay,
+                                                         self.startProcess,
+                                                         name)
         if self.stopping:
             deferred = self.deferreds.get(name, None)
             if deferred is not None:
                 deferred.callback(None)
 
 
+    def _forceStopProcess(self, proc):
+        """
+        @param proc: An L{IProcessTransport} provider
+        """
+        try:
+            proc.signalProcess('KILL')
+        except ProcessExitedAlready:
+            pass
+
+
     def signalAll(self, signal, startswithname=None):
         """
         Send a signal to all child processes.
@@ -1372,7 +1482,7 @@
 
         childFDs.update(self._extraFDs.get(name, {}))
 
-        self.reactor.spawnProcess(
+        self._reactor.spawnProcess(
             p, args[0], args, uid=uid, gid=gid, env=env,
             childFDs=childFDs
         )
@@ -1392,10 +1502,38 @@
         def delayedStart():
             self._pendingStarts -= 1
             self.reallyStartProcess(name)
-        self.reactor.callLater(interval, delayedStart)
+        self._reactor.callLater(interval, delayedStart)
 
 
+    def restartAll(self):
+        """
+        Restart all processes. This is useful for third party management
+        services to allow a user to restart servers because of an outside change
+        in circumstances -- for example, a new version of a library is
+        installed.
+        """
+        for name in self.processes:
+            self.stopProcess(name)
 
+
+    def __repr__(self):
+        l = []
+        for name, proc in self.processes.items():
+            uidgid = ''
+            if proc[1] is not None:
+                uidgid = str(proc[1])
+            if proc[2] is not None:
+                uidgid += ':'+str(proc[2])
+
+            if uidgid:
+                uidgid = '(' + uidgid + ')'
+            l.append('%r%s: %r' % (name, uidgid, proc[0]))
+        return ('<' + self.__class__.__name__ + ' '
+                + ' '.join(l)
+                + '>')
+
+
+
 class DelayedStartupLineLogger(object):
     """
     A line logger that can handle very long lines.
@@ -1448,28 +1586,42 @@
 
 
 
-class DelayedStartupLoggingProtocol(procmon.LoggingProtocol, object):
+class DelayedStartupLoggingProtocol(ProcessProtocol):
     """
     Logging protocol that handles lines which are too long.
     """
 
+    service = None
+    name = None
+    empty = 1
+
     def connectionMade(self):
         """
         Replace the superclass's output monitoring logic with one that can
         handle lineLengthExceeded.
         """
-        super(DelayedStartupLoggingProtocol, self).connectionMade()
         self.output = DelayedStartupLineLogger()
+        self.output.makeConnection(self.transport)
         self.output.tag = self.name
 
+
+    def outReceived(self, data):
+        self.output.dataReceived(data)
+        self.empty = data[-1] == '\n'
+
+    errReceived = outReceived
+
+
     def processEnded(self, reason):
         """
         Let the service know that this child process has ended
         """
-        procmon.LoggingProtocol.processEnded(self, reason)
+        if not self.empty:
+            self.output.dataReceived('\n')
         self.service.processEnded(self.name)
 
 
+
 def getSSLPassphrase(*ignored):
 
     if not config.SSLPrivateKey:

Modified: CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/test/test_caldav.py	2010-11-01 21:21:27 UTC (rev 6521)
+++ CalendarServer/branches/users/glyph/sharedpool/calendarserver/tap/test/test_caldav.py	2010-11-01 21:21:47 UTC (rev 6522)
@@ -932,17 +932,6 @@
     Test cases for L{DelayedStartupProcessMonitor}.
     """
 
-    def useFakeReactor(self, fakeReactor):
-        """
-        Earlier versions of Twisted used a global reactor in
-        L{twisted.runner.procmon}, so we need to take that into account when
-        testing.
-        """
-        if not DelayedStartupProcessMonitor._shouldPassReactor:
-            import twisted.runner.procmon as inherited
-            self.patch(inherited, "reactor", fakeReactor)
-
-
     def test_lineAfterLongLine(self):
         """
         A "long" line of output from a monitored process (longer than
@@ -972,7 +961,7 @@
                 logged.append(event)
                 if m == '[Dummy] z':
                     d.callback("done")
-            
+
         log.addObserver(tempObserver)
         self.addCleanup(log.removeObserver, tempObserver)
         d = Deferred()
@@ -996,9 +985,8 @@
         If a L{TwistdSlaveProcess} specifies some file descriptors to be
         inherited, they should be inherited by the subprocess.
         """
-        dspm                = DelayedStartupProcessMonitor()
-        imps = dspm.reactor = InMemoryProcessSpawner()
-        self.useFakeReactor(imps)
+        imps = InMemoryProcessSpawner()
+        dspm = DelayedStartupProcessMonitor(imps)
 
         # Most arguments here will be ignored, so these are bogus values.
         slave = TwistdSlaveProcess(
@@ -1030,9 +1018,8 @@
         configuration argument should be passed that indicates to the
         subprocess.
         """
-        dspm                = DelayedStartupProcessMonitor()
-        imps = dspm.reactor = InMemoryProcessSpawner()
-        self.useFakeReactor(imps)
+        imps = InMemoryProcessSpawner()
+        dspm = DelayedStartupProcessMonitor(imps)
         # Most arguments here will be ignored, so these are bogus values.
         slave = TwistdSlaveProcess(
             twistd     = "bleh",
@@ -1063,10 +1050,9 @@
         objects that have been added to it being started once per
         delayInterval.
         """
-        dspm                = DelayedStartupProcessMonitor()
+        imps = InMemoryProcessSpawner()
+        dspm = DelayedStartupProcessMonitor(imps)
         dspm.delayInterval = 3.0
-        imps = dspm.reactor = InMemoryProcessSpawner()
-        self.useFakeReactor(imps)
         sampleCounter = range(0, 5)
         for counter in sampleCounter:
             slave = TwistdSlaveProcess(
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20101101/2a5fc682/attachment-0001.html>


More information about the calendarserver-changes mailing list