[CalendarServer-changes] [8389] CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/ tap/caldav.py

source_changes at macosforge.org source_changes at macosforge.org
Sat Dec 3 00:38:45 PST 2011


Revision: 8389
          http://trac.macosforge.org/projects/calendarserver/changeset/8389
Author:   glyph at apple.com
Date:     2011-12-03 00:38:45 -0800 (Sat, 03 Dec 2011)
Log Message:
-----------
more complete propagation of values to subprocess

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py

Modified: CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py	2011-12-03 08:38:35 UTC (rev 8388)
+++ CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py	2011-12-03 08:38:45 UTC (rev 8389)
@@ -39,8 +39,11 @@
 from twisted.python.log import FileLogObserver, ILogObserver
 from twisted.python.logfile import LogFile
 from twisted.python.usage import Options, UsageError
+from twisted.python.reflect import namedAny, qual
 
 from twisted.internet.defer import gatherResults, Deferred
+from twisted.internet.defer import inlineCallbacks, returnValue
+
 from twisted.internet import reactor as _reactor
 from twisted.internet.process import ProcessExitedAlready
 from twisted.internet.protocol import Protocol, Factory
@@ -62,11 +65,14 @@
 from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
 from twext.web2.metafd import ConnectionLimiter, ReportingHTTPService
 
-from txdav.common.datastore.upgrade.migrate import UpgradeToDatabaseService
-from txdav.common.datastore.upgrade.migrate import StoreSpawnerService
-from txdav.common.datastore.upgrade.sql.upgrade import UpgradeDatabaseSchemaService,\
-    UpgradeDatabaseDataService
+from txdav.common.datastore.upgrade.migrate import (
+    UpgradeToDatabaseService, StoreSpawnerService, swapAMP
+)
 
+from txdav.common.datastore.upgrade.sql.upgrade import (
+    UpgradeDatabaseSchemaService, UpgradeDatabaseDataService,
+)
+
 from twistedcaldav.config import ConfigurationError
 from twistedcaldav.config import config
 from twistedcaldav.localization import processLocalizationFiles
@@ -214,6 +220,7 @@
 
         self.overrides = {}
 
+
     @staticmethod
     def coerceOption(configDict, key, value):
         """
@@ -239,6 +246,7 @@
 
         return value
 
+
     @classmethod
     def setOverride(cls, configDict, path, value, overrideDict):
         """
@@ -899,6 +907,12 @@
 
         @rtype: L{IService}
         """
+
+        # FIXME: this is replicating the logic of getDBPool(), except for the
+        # part where the pgServiceFromConfig service is actually started here,
+        # and discarded in that function.  This should be refactored to simply
+        # use getDBPool.
+
         if config.UseDatabase:
 
             if os.getuid() == 0: # Only override if root
@@ -947,13 +961,18 @@
             cp.setServiceParent(ms)
             store = storeFromConfig(config, cp.connection)
             mainService = createMainService(cp, store)
-            upgradeSvc = UpgradeFileSystemFormatService(config,
+            upgradeSvc = UpgradeFileSystemFormatService(
+                config,
                 UpgradeDatabaseSchemaService.wrapService(
                     UpgradeDatabaseDataService.wrapService(
                         UpgradeToDatabaseService.wrapService(
                             CachingFilePath(config.DocumentRoot),
                             PostDBImportService(config, store, mainService),
-                            store, uid=uid, gid=gid
+                            store, uid=uid, gid=gid,
+                            spawner=ConfiguredChildSpawner(
+                                self, ConnectionDispenser(cp)
+                            ),
+                            parallel=config.MultiProcess.ProcessCount
                         ),
                         store, uid=uid, gid=gid
                     ),
@@ -1038,6 +1057,7 @@
         # Calculate the number of processes to spawn
         #
         if config.MultiProcess.ProcessCount == 0:
+            # TODO: this should probably be happening in a configuration hook.
             processCount = computeProcessCount(
                 config.MultiProcess.MinProcessCount,
                 config.MultiProcess.PerCPU,
@@ -1157,6 +1177,13 @@
 
 
 class ConnectionDispenser(object):
+    """
+    Object taht can dispense already-connected file descriptors, for use with
+    subprocess spawning.
+    """
+    # Very long term FIXME: this mechanism should ideally be eliminated, by
+    # making all subprocesses have a single stdio AMP connection that
+    # multiplexes between multiple protocols.
 
     def __init__(self, connectionPool):
         self.pool = connectionPool
@@ -1334,8 +1361,9 @@
         ("logID", String()),
         ("configFile", String()),
 
-        ## same as in config; no need to propagate it
-        # ("processCount", Integer()),
+        # computed value determined only in master, so needs to be propagated
+        # to be correct.
+        ("processCount", Integer()),
 
         ## only needed for request processing
         # ("inheritFDs", ListOf(Integer())),
@@ -1353,24 +1381,37 @@
     """
 
     @ConfigureChild.responder
-    def conf(self, delegateTo, pidfile, logID, configFile, connectionPoolFD=None):
+    def conf(self, delegateTo, pidfile, logID, configFile, processCount,
+             connectionPoolFD=None):
         """
-        Do the configuration.
+        Load the current config file into this child process, create a store
+        based on it, and delegate to the upgrade logic.
         """
-        # This stuff needs to be done by somebody in caldavd.py
-        from twistedcaldav.config import config
-        from calendarserver.tap.util import getDBPool, storeFromConfig
+        # Load the configuration file.
         config.load(configFile)
+
+        # Adjust the child's configuration to add all the relevant options for
+        # the store that won't be mentioned in the config file.
+        config.updateDefaults(dict(
+            LogID            = logID,
+            PIDFile          = pidfile,
+            DBAMPFD          = connectionPoolFD,
+            MultiProcess     = dict(
+                ProcessCount = processCount
+            )
+        ))
+
         pool, txnf = getDBPool(config)
         if pool is not None:
-            from twisted.internet import reactor
             pool.startService()
-            reactor.addSystemEventTrigger("before", "shutdown",
-                                          pool.stopService)
-        # XXX: SharedConnectionPool needs to be relayed out of band, as
-        # calendarserver.tap.caldav does with its own thing.
+            _reactor.addSystemEventTrigger(
+                "before", "shutdown", pool.stopService
+            )
         dbstore = storeFromConfig(config, txnf)
-        dbstore.setMigrating(True)
+        delegateClass = namedAny(delegateTo)
+        swapAMP(self, delegateClass(dbstore))
+        if connectionPoolFD is not None:
+            pass
         return {}
 
 
@@ -1380,11 +1421,48 @@
     L{StoreSpawnerService} that will load a full configuration into each child.
     """
 
+    def __init__(self, maker, options, dispenser):
+        """
+        Create a L{ConfiguredChildSpawner}.
+
+        @param maker: a L{CalDAVServiceMaker} instance that supplies the
+            configuration.
+
+        @param options: a L{CalDAVOptions} containing the command-line options
+            for the subprocess.
+
+        @param dispenser: a L{ConnectionDispenser} or C{None}.
+        """
+        self.nextID = 0
+        self.maker = maker
+        self.dispenser
+
+
+    @inlineCallbacks
     def spawnWithStore(self, here, there):
         """
         Spawn the child with a store based on a configuration.
         """
-        return {}
+        thisID = self.nextID
+        self.nextID += 1
+        if self.dispenser is not None:
+            poolfd = self.dispenser.dispense()
+            childFDs = {poolfd: poolfd}
+        else:
+            childFDs = None
+        controller = yield self.spawn(
+            AMP(), ChildConfigurator, childFDs=childFDs
+        )
+        yield controller.callRemote(
+            ConfigureChild,
+            delegateTo=qual(there),
+            pidfile="%s-migrator-%s" % (self.maker.tapname,
+                                        thisID),
+            logID="migrator-%s" % (thisID,),
+            configFile=self.options['config'],
+            processCount=config.MultiProcess.processCount,
+        )
+        returnValue(swapAMP(controller, here))
 
 
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111203/44a49ca6/attachment-0001.html>


More information about the calendarserver-changes mailing list