[CalendarServer-changes] [8393] CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/ tap

source_changes at macosforge.org source_changes at macosforge.org
Sat Dec 3 00:39:23 PST 2011


Revision: 8393
          http://trac.macosforge.org/projects/calendarserver/changeset/8393
Author:   glyph at apple.com
Date:     2011-12-03 00:39:23 -0800 (Sat, 03 Dec 2011)
Log Message:
-----------
split out some functionality to prevent the tap module from more unwieldy

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py
    CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/util.py

Added Paths:
-----------
    CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/cfgchild.py

Modified: CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py	2011-12-03 08:39:13 UTC (rev 8392)
+++ CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py	2011-12-03 08:39:23 UTC (rev 8393)
@@ -39,18 +39,13 @@
 from twisted.python.log import FileLogObserver, ILogObserver
 from twisted.python.logfile import LogFile
 from twisted.python.usage import Options, UsageError
-from twisted.python.reflect import namedAny, qual
 
 from twisted.internet.defer import gatherResults, Deferred
-from twisted.internet.defer import inlineCallbacks, returnValue
 
-from twisted.internet import reactor as _reactor
 from twisted.internet.process import ProcessExitedAlready
 from twisted.internet.protocol import Protocol, Factory
 from twisted.internet.protocol import ProcessProtocol
 
-from twisted.protocols.amp import AMP, Command, String, Integer#, ListOf
-
 from twisted.application.internet import TCPServer, UNIXServer
 from twisted.application.service import MultiService, IServiceMaker
 from twisted.application.service import Service
@@ -65,13 +60,10 @@
 from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
 from twext.web2.metafd import ConnectionLimiter, ReportingHTTPService
 
-from txdav.common.datastore.upgrade.migrate import (
-    UpgradeToDatabaseService, StoreSpawnerService, swapAMP
-)
-
 from txdav.common.datastore.upgrade.sql.upgrade import (
     UpgradeDatabaseSchemaService, UpgradeDatabaseDataService,
 )
+from txdav.common.datastore.upgrade.migrate import UpgradeToDatabaseService
 
 from twistedcaldav.config import ConfigurationError
 from twistedcaldav.config import config
@@ -86,7 +78,6 @@
 from twext.enterprise.ienterprise import POSTGRES_DIALECT
 from twext.enterprise.ienterprise import ORACLE_DIALECT
 from twext.enterprise.adbapi2 import ConnectionPool
-from twext.enterprise.adbapi2 import ConnectionPoolConnection
 
 try:
     from twistedcaldav.authkerb import NegotiateCredentialFactory
@@ -94,14 +85,17 @@
 except ImportError:
     NegotiateCredentialFactory = None
 
+from calendarserver.tap.util import ConnectionDispenser
+
 from calendarserver.accesslog import AMPCommonAccessLoggingObserver
 from calendarserver.accesslog import AMPLoggingFactory
 from calendarserver.accesslog import RotatingFileAccessLoggingObserver
 from calendarserver.tap.util import getRootResource, computeProcessCount
-from calendarserver.tap.util import ConnectionWithPeer
+
 from calendarserver.tap.util import storeFromConfig
 from calendarserver.tap.util import pgConnectorFromConfig
 from calendarserver.tap.util import oracleConnectorFromConfig
+from calendarserver.tap.cfgchild import ConfiguredChildSpawner
 from calendarserver.tools.util import checkDirectory
 
 try:
@@ -926,7 +920,7 @@
                                 PostDBImportService(config, store, mainService),
                                 store, uid=overrideUID, gid=overrideGID,
                                 spawner=ConfiguredChildSpawner(
-                                    self, ConnectionDispenser(cp)
+                                    self, ConnectionDispenser(cp), config
                                 ),
                                 parallel=config.MultiProcess.ProcessCount
                             ),
@@ -1170,35 +1164,6 @@
 
 
 
-class ConnectionDispenser(object):
-    """
-    Object taht can dispense already-connected file descriptors, for use with
-    subprocess spawning.
-    """
-    # Very long term FIXME: this mechanism should ideally be eliminated, by
-    # making all subprocesses have a single stdio AMP connection that
-    # multiplexes between multiple protocols.
-
-    def __init__(self, connectionPool):
-        self.pool = connectionPool
-
-
-    def dispense(self):
-        """
-        Dispense a file descriptor, already connected to a server, for a
-        client.
-        """
-        # FIXME: these sockets need to be re-dispensed when the process is
-        # respawned, and they currently won't be.
-        c, s = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
-        protocol = ConnectionPoolConnection(self.pool)
-        transport = ConnectionWithPeer(s, protocol)
-        protocol.makeConnection(transport)
-        transport.startReading()
-        return c
-
-
-
 class TwistdSlaveProcess(object):
     """
     A L{TwistdSlaveProcess} is information about how to start a slave process
@@ -1342,128 +1307,7 @@
         return args
 
 
-class ConfigureChild(Command):
-    """
-    Configure a child process, most especially with all the information that it
-    needs in order to construct a data store.
-    """
 
-    arguments = [
-        # The name of the class to delegate to once configuration is complete.
-        ("delegateTo", String()),
-        ("pidFile", String()),
-        ("logID", String()),
-        ("configFile", String()),
-
-        # computed value determined only in master, so needs to be propagated
-        # to be correct.
-        ("processCount", Integer()),
-
-        ## only needed for request processing, and we're not using this
-        ## facility for that work (yet)
-        # ("inheritFDs", ListOf(Integer())),
-        # ("inheritSSLFDs", ListOf(Integer())),
-        # ("metaFD", String(optional=True)),
-
-        ## shared connection pool!
-        ("connectionPoolFD", Integer(optional=True)),
-    ]
-
-
-
-class ChildConfigurator(AMP):
-    """
-    Protocol which can configure a child process.
-    """
-
-    @ConfigureChild.responder
-    def conf(self, delegateTo, pidFile, logID, configFile, processCount,
-             connectionPoolFD=None):
-        """
-        Load the current config file into this child process, create a store
-        based on it, and delegate to the upgrade logic.
-        """
-        # Load the configuration file.
-        config.load(configFile)
-
-        # Adjust the child's configuration to add all the relevant options for
-        # the store that won't be mentioned in the config file.
-        changedConfig = dict(
-            LogID            = logID,
-            PIDFile          = pidFile,
-            MultiProcess     = dict(
-                ProcessCount = processCount
-            )
-        )
-        if connectionPoolFD is not None:
-            changedConfig.update(DBAMPFD=connectionPoolFD)
-        config.updateDefaults(changedConfig)
-
-        # Construct and start database pool and store.
-        pool, txnf = getDBPool(config)
-        if pool is not None:
-            pool.startService()
-            _reactor.addSystemEventTrigger(
-                "before", "shutdown", pool.stopService
-            )
-        dbstore = storeFromConfig(config, txnf)
-
-        # Finally, construct the class we're supposed to delegate to.
-        delegateClass = namedAny(delegateTo)
-        swapAMP(self, delegateClass(dbstore))
-        return {}
-
-
-
-class ConfiguredChildSpawner(StoreSpawnerService):
-    """
-    L{StoreSpawnerService} that will load a full configuration into each child.
-    """
-
-    def __init__(self, maker, options, dispenser):
-        """
-        Create a L{ConfiguredChildSpawner}.
-
-        @param maker: a L{CalDAVServiceMaker} instance that supplies the
-            configuration.
-
-        @param options: a L{CalDAVOptions} containing the command-line options
-            for the subprocess.
-
-        @param dispenser: a L{ConnectionDispenser} or C{None}.
-        """
-        self.nextID = 0
-        self.maker = maker
-        self.dispenser
-
-
-    @inlineCallbacks
-    def spawnWithStore(self, here, there):
-        """
-        Spawn the child with a store based on a configuration.
-        """
-        thisID = self.nextID
-        self.nextID += 1
-        if self.dispenser is not None:
-            poolfd = self.dispenser.dispense()
-            childFDs = {poolfd: poolfd}
-        else:
-            childFDs = None
-        controller = yield self.spawn(
-            AMP(), ChildConfigurator, childFDs=childFDs
-        )
-        yield controller.callRemote(
-            ConfigureChild,
-            delegateTo=qual(there),
-            pidfile="%s-migrator-%s" % (self.maker.tapname, thisID),
-            logID="migrator-%s" % (thisID,),
-            configFile=self.options['config'],
-            processCount=config.MultiProcess.processCount,
-        )
-        returnValue(swapAMP(controller, here))
-
-
-
 class ControlPortTCPServer(TCPServer):
     """ This TCPServer retrieves the port number that was actually assigned
         when the service was started, and stores that into config.ControlPort
@@ -1503,8 +1347,10 @@
     minRestartDelay = 1
     maxRestartDelay = 3600
 
-    def __init__(self, reactor=_reactor):
+    def __init__(self, reactor=None):
         super(DelayedStartupProcessMonitor, self).__init__()
+        if reactor is None:
+            from twisted.internet import reactor
         self._reactor = reactor
         self.processes = {}
         self.protocols = {}

Added: CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/cfgchild.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/cfgchild.py	                        (rev 0)
+++ CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/cfgchild.py	2011-12-03 08:39:23 UTC (rev 8393)
@@ -0,0 +1,172 @@
+##
+# Copyright (c) 2005-2011 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tools for spawning general-purpose child processes that have a store devrived
+from a .
+"""
+
+__all__ = [
+    # Only the spawner service is really interesting; the other parts are
+    # internal implementation details which shouldn't be needed outside this
+    # file.
+    'ConfiguredChildSpawner',
+]
+
+from twisted.python.reflect import namedAny, qual
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.protocols.amp import AMP, Command, String, Integer#, ListOf
+from txdav.common.datastore.upgrade.migrate import (
+    StoreSpawnerService, swapAMP
+)
+
+from calendarserver.tap.util import getDBPool, storeFromConfig
+
+class ConfigureChild(Command):
+    """
+    Configure a child process, most especially with all the information that it
+    needs in order to construct a data store.
+    """
+
+    arguments = [
+        # The name of the class to delegate to once configuration is complete.
+        ("delegateTo", String()),
+        ("pidFile", String()),
+        ("logID", String()),
+        ("configFile", String()),
+
+        # computed value determined only in master, so needs to be propagated
+        # to be correct.
+        ("processCount", Integer()),
+
+        ## only needed for request processing, and we're not using this
+        ## facility for that work (yet)
+        # ("inheritFDs", ListOf(Integer())),
+        # ("inheritSSLFDs", ListOf(Integer())),
+        # ("metaFD", String(optional=True)),
+
+        ## shared connection pool!
+        ("connectionPoolFD", Integer(optional=True)),
+    ]
+
+
+
+class ChildConfigurator(AMP):
+    """
+    Protocol which can configure a child process.
+    """
+
+    def __init__(self, config=None):
+        """
+        Optionally accept a configuration for testing, but normally created in
+        the subprocess configuration-free.
+        """
+        super(AMP, self).__init__()
+        if config is None:
+            from twistedcaldav.config import config
+        self.config = config
+
+
+    @ConfigureChild.responder
+    def conf(self, delegateTo, pidFile, logID, configFile, processCount,
+             connectionPoolFD=None):
+        """
+        Load the current config file into this child process, create a store
+        based on it, and delegate to the upgrade logic.
+        """
+        # Load the configuration file.
+        self.config.load(configFile)
+
+        # Adjust the child's configuration to add all the relevant options for
+        # the store that won't be mentioned in the config file.
+        changedConfig = dict(
+            LogID            = logID,
+            PIDFile          = pidFile,
+            MultiProcess     = dict(
+                ProcessCount = processCount
+            )
+        )
+        if connectionPoolFD is not None:
+            changedConfig.update(DBAMPFD=connectionPoolFD)
+        self.config.updateDefaults(changedConfig)
+
+        # Construct and start database pool and store.
+        pool, txnf = getDBPool(self.config)
+        if pool is not None:
+            from twisted.internet import reactor
+            pool.startService()
+            reactor.addSystemEventTrigger(
+                "before", "shutdown", pool.stopService
+            )
+        dbstore = storeFromConfig(self.config, txnf)
+
+        # Finally, construct the class we're supposed to delegate to.
+        delegateClass = namedAny(delegateTo)
+        swapAMP(self, delegateClass(dbstore))
+        return {}
+
+
+
+class ConfiguredChildSpawner(StoreSpawnerService):
+    """
+    L{StoreSpawnerService} that will load a full configuration into each child.
+    """
+
+    def __init__(self, maker, dispenser, config):
+        """
+        Create a L{ConfiguredChildSpawner}.
+
+        @param maker: a L{CalDAVServiceMaker} instance that supplies the
+            configuration.
+
+        @param dispenser: a L{calendarserver.tap.ConnectionDispenser} or C{None}.
+
+        @param config: the L{twistedcaldav.config.Config} to use to configure
+            the subprocess.
+        """
+        self.nextID = 0
+        self.maker = maker
+        self.dispenser = dispenser
+        self.config = config
+
+
+    @inlineCallbacks
+    def spawnWithStore(self, here, there):
+        """
+        Spawn the child with a store based on a configuration.
+        """
+        thisID = self.nextID
+        self.nextID += 1
+        if self.dispenser is not None:
+            poolfd = self.dispenser.dispense()
+            childFDs = {poolfd: poolfd}
+        else:
+            childFDs = None
+        controller = yield self.spawn(
+            AMP(), ChildConfigurator, childFDs=childFDs
+        )
+        yield controller.callRemote(
+            ConfigureChild,
+            delegateTo=qual(there),
+            pidfile="%s-migrator-%s" % (self.maker.tapname, thisID),
+            logID="migrator-%s" % (thisID,),
+            configFile=self.config.getProvider().getConfigFileName(),
+            processCount=self.config.MultiProcess.processCount,
+        )
+        returnValue(swapAMP(controller, here))
+
+
+

Modified: CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/util.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/util.py	2011-12-03 08:39:13 UTC (rev 8392)
+++ CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/util.py	2011-12-03 08:39:23 UTC (rev 8393)
@@ -28,7 +28,7 @@
 import errno
 import os
 from time import sleep
-from socket import fromfd, AF_UNIX, SOCK_STREAM
+from socket import fromfd, AF_UNIX, SOCK_STREAM, socketpair
 
 from twext.python.filepath import CachingFilePath as FilePath
 from twext.python.log import Logger
@@ -67,7 +67,7 @@
 from twistedcaldav.util import getMemorySize, getNCPU
 from twext.enterprise.ienterprise import POSTGRES_DIALECT
 from twext.enterprise.ienterprise import ORACLE_DIALECT
-from twext.enterprise.adbapi2 import ConnectionPool
+from twext.enterprise.adbapi2 import ConnectionPool, ConnectionPoolConnection
 
 try:
     from twistedcaldav.authkerb import NegotiateCredentialFactory
@@ -163,7 +163,8 @@
 
 def transactionFactoryFromFD(dbampfd, dialect, paramstyle):
     """
-    Create a transaction factory from an inherited file descriptor.
+    Create a transaction factory from an inherited file descriptor, such as one
+    created by L{ConnectionDispenser}.
     """
     skt = fromfd(dbampfd, AF_UNIX, SOCK_STREAM)
     os.close(dbampfd)
@@ -175,6 +176,35 @@
 
 
 
+class ConnectionDispenser(object):
+    """
+    A L{ConnectionDispenser} can dispense already-connected file descriptors,
+    for use with subprocess spawning.
+    """
+    # Very long term FIXME: this mechanism should ideally be eliminated, by
+    # making all subprocesses have a single stdio AMP connection that
+    # multiplexes between multiple protocols.
+
+    def __init__(self, connectionPool):
+        self.pool = connectionPool
+
+
+    def dispense(self):
+        """
+        Dispense a socket object, already connected to a server, for a client
+        in a subprocess.
+        """
+        # FIXME: these sockets need to be re-dispensed when the process is
+        # respawned, and they currently won't be.
+        c, s = socketpair(AF_UNIX, SOCK_STREAM)
+        protocol = ConnectionPoolConnection(self.pool)
+        transport = ConnectionWithPeer(s, protocol)
+        protocol.makeConnection(transport)
+        transport.startReading()
+        return c
+
+
+
 def storeFromConfig(config, txnFactory):
     """
     Produce an L{IDataStore} from the given configuration, transaction factory,
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111203/acf2ad08/attachment-0001.html>


More information about the calendarserver-changes mailing list