[CalendarServer-changes] [8377] CalendarServer/branches/users/glyph/parallel-upgrade/twext/internet/ spawnsvc.py

source_changes at macosforge.org source_changes at macosforge.org
Sat Dec 3 00:36:41 PST 2011


Revision: 8377
          http://trac.macosforge.org/projects/calendarserver/changeset/8377
Author:   glyph at apple.com
Date:     2011-12-03 00:36:40 -0800 (Sat, 03 Dec 2011)
Log Message:
-----------
Subprocess spawning service.

Added Paths:
-----------
    CalendarServer/branches/users/glyph/parallel-upgrade/twext/internet/spawnsvc.py

Added: CalendarServer/branches/users/glyph/parallel-upgrade/twext/internet/spawnsvc.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade/twext/internet/spawnsvc.py	                        (rev 0)
+++ CalendarServer/branches/users/glyph/parallel-upgrade/twext/internet/spawnsvc.py	2011-12-03 08:36:40 UTC (rev 8377)
@@ -0,0 +1,218 @@
+##
+# Copyright (c) 2011 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Utility service that can spawn subprocesses.
+"""
+
+import os
+import sys
+
+from twisted.python.reflect import namedAny
+from twisted.internet.stdio import StandardIO
+
+if __name__ == '__main__':
+
+    there = sys.argv[1]
+    protocolClass = namedAny(there)
+    proto = protocolClass()
+    origLost = proto.connectionLost
+    def goodbye(reason):
+        """
+        Stop the process if stdin is closed.
+        """
+        reactor.stop()
+        return origLost(reason)
+    proto.connectionLost = goodbye
+    StandardIO(proto)
+    from twisted.internet import reactor
+    reactor.run()
+    os._exit(0)
+
+
+import sys
+
+from zope.interface import implements
+
+from twisted.internet.interfaces import ITransport, IPushProducer, IConsumer
+
+from twisted.application.service import Service
+from twisted.python.reflect import qual
+from twisted.internet.protocol import ProcessProtocol
+from twisted.internet.defer import Deferred, succeed
+
+
+class BridgeTransport(object):
+
+    implements(ITransport, IPushProducer, IConsumer)
+
+    def __init__(self, processTransport):
+        self.transport = processTransport
+
+
+    def __getattr__(self, name):
+        return getattr(self.transport, name)
+
+
+    def getPeer(self):
+        return "Peer:PID:" + str(self.transport.pid)
+
+
+    def getHost(self):
+        return "Host:PID:" + str(self.transport.pid)
+
+
+
+class BridgeProtocol(ProcessProtocol):
+    """
+    A protocol for a bridge.
+
+    @ivar service: a L{SpawnerService} that created this L{BridgeProtocol}
+
+    @ivar protocol: a reference to the L{IProtocol}.
+    """
+
+    def __init__(self, service, protocol, killTimeout=15.0):
+        self.service = service
+        self.protocol = protocol
+        self.killTimeout = killTimeout
+        self.service.addBridge(self)
+
+
+    def connectionMade(self):
+        """
+        The subprocess was started.
+        """
+        self.protocol.makeConnection(BridgeTransport(self.transport))
+
+
+    def outReceived(self, data):
+        """
+        Some data was received to standard output; relay it to the protocol.
+        """
+        self.protocol.dataReceived(data)
+
+
+    _killTimeout = None
+    def eventuallyStop(self):
+        """
+        Eventually stop this subprocess.  Send it a SIGTERM, and if it hasn't
+        stopped by C{self.killTimeout} seconds, send it a SIGKILL.
+        """
+        self.transport.signalProcess('TERM')
+        def reallyStop():
+            self.transport.signalProcess("KILL")
+            self._killTimeout = None
+        self._killTimeout = (
+            self.service.reactor.callLater(self.killTimeout, reallyStop)
+        )
+
+
+    def processEnded(self, reason):
+        """
+        The process has ended; notify the service that this bridge has stopped.
+        """
+        if self._killTimeout is not None:
+            self._killTimeout.cancel()
+        self.protocol.connectionLost(reason)
+        self.service.removeBridge(self)
+
+
+
+class SpawnerService(Service, object):
+    """
+    Process to spawn services and then shut them down.
+
+    @ivar reactor: an L{IReactorProcess}/L{IReactorTime}
+
+    @ivar pendingSpawns: a C{list} of 2-C{tuple}s of hereProto, thereProto.
+
+    @ivar bridges: a C{list} of L{BridgeProtocol} instances.
+    """
+
+    def __init__(self, reactor=None):
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+        self.pendingSpawns = []
+        self.bridges = []
+
+
+    def spawn(self, hereProto, thereProto):
+        """
+        Spawn a subprocess.
+
+        @param hereProto: a L{Protocol} instance to listen in this process.
+
+        @param thereProto: a top-level class or function.
+
+        @return: hereProto
+        """
+        if not self.running:
+            self.pendingSpawns.append((hereProto, thereProto))
+            return
+        name = qual(thereProto)
+        self.reactor.spawnProcess(
+            BridgeProtocol(self, hereProto),
+            sys.executable, [sys.executable, '-m', __name__, name], os.environ
+        )
+        return hereProto
+
+
+    def startService(self):
+        """
+        Start the service; spawn any processes previously started with spawn().
+        """
+        super(SpawnerService, self).startService()
+        for spawn in self.pendingSpawns:
+            self.spawn(*spawn)
+
+
+    def addBridge(self, bridge):
+        """
+        Add a L{BridgeProtocol} to the list to be tracked.
+        """
+        self.bridges.append(bridge)
+
+
+    def removeBridge(self, bridge):
+        """
+        The process controlled by a L{BridgeProtocol} has terminated; remove it
+        from the active list, and fire any outstanding Deferred.
+
+        @param bridge: the protocol which has ended.
+        """
+        self.bridges.remove(bridge)
+        if self._stopAllDeferred is not None:
+            if len(self.bridges) == 0:
+                self._stopAllDeferred.callback(None)
+                self._stopAllDeferred = False
+
+
+    def stopService(self):
+        """
+        Stop the service.
+        """
+        super(SpawnerService, self).stopService()
+        if self.bridges:
+            self._stopAllDeferred = Deferred()
+            for bridge in self.bridges:
+                bridge.eventuallyStop()
+            return self._stopAllDeferred
+        return succeed(None)
+
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111203/c3d50009/attachment-0001.html>


More information about the calendarserver-changes mailing list