[CalendarServer-changes] [8389] CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py
source_changes at macosforge.org
source_changes at macosforge.org
Sat Dec 3 00:38:45 PST 2011
Revision: 8389
http://trac.macosforge.org/projects/calendarserver/changeset/8389
Author: glyph at apple.com
Date: 2011-12-03 00:38:45 -0800 (Sat, 03 Dec 2011)
Log Message:
-----------
more complete propagation of values to subprocess
Modified Paths:
--------------
CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py
Modified: CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py 2011-12-03 08:38:35 UTC (rev 8388)
+++ CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py 2011-12-03 08:38:45 UTC (rev 8389)
@@ -39,8 +39,11 @@
from twisted.python.log import FileLogObserver, ILogObserver
from twisted.python.logfile import LogFile
from twisted.python.usage import Options, UsageError
+from twisted.python.reflect import namedAny, qual
from twisted.internet.defer import gatherResults, Deferred
+from twisted.internet.defer import inlineCallbacks, returnValue
+
from twisted.internet import reactor as _reactor
from twisted.internet.process import ProcessExitedAlready
from twisted.internet.protocol import Protocol, Factory
@@ -62,11 +65,14 @@
from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
from twext.web2.metafd import ConnectionLimiter, ReportingHTTPService
-from txdav.common.datastore.upgrade.migrate import UpgradeToDatabaseService
-from txdav.common.datastore.upgrade.migrate import StoreSpawnerService
-from txdav.common.datastore.upgrade.sql.upgrade import UpgradeDatabaseSchemaService,\
- UpgradeDatabaseDataService
+from txdav.common.datastore.upgrade.migrate import (
+ UpgradeToDatabaseService, StoreSpawnerService, swapAMP
+)
+from txdav.common.datastore.upgrade.sql.upgrade import (
+ UpgradeDatabaseSchemaService, UpgradeDatabaseDataService,
+)
+
from twistedcaldav.config import ConfigurationError
from twistedcaldav.config import config
from twistedcaldav.localization import processLocalizationFiles
@@ -214,6 +220,7 @@
self.overrides = {}
+
@staticmethod
def coerceOption(configDict, key, value):
"""
@@ -239,6 +246,7 @@
return value
+
@classmethod
def setOverride(cls, configDict, path, value, overrideDict):
"""
@@ -899,6 +907,12 @@
@rtype: L{IService}
"""
+
+ # FIXME: this is replicating the logic of getDBPool(), except for the
+ # part where the pgServiceFromConfig service is actually started here,
+ # and discarded in that function. This should be refactored to simply
+ # use getDBPool.
+
if config.UseDatabase:
if os.getuid() == 0: # Only override if root
@@ -947,13 +961,18 @@
cp.setServiceParent(ms)
store = storeFromConfig(config, cp.connection)
mainService = createMainService(cp, store)
- upgradeSvc = UpgradeFileSystemFormatService(config,
+ upgradeSvc = UpgradeFileSystemFormatService(
+ config,
UpgradeDatabaseSchemaService.wrapService(
UpgradeDatabaseDataService.wrapService(
UpgradeToDatabaseService.wrapService(
CachingFilePath(config.DocumentRoot),
PostDBImportService(config, store, mainService),
- store, uid=uid, gid=gid
+ store, uid=uid, gid=gid,
+ spawner=ConfiguredChildSpawner(
+ self, ConnectionDispenser(cp)
+ ),
+ parallel=config.MultiProcess.ProcessCount
),
store, uid=uid, gid=gid
),
@@ -1038,6 +1057,7 @@
# Calculate the number of processes to spawn
#
if config.MultiProcess.ProcessCount == 0:
+ # TODO: this should probably be happening in a configuration hook.
processCount = computeProcessCount(
config.MultiProcess.MinProcessCount,
config.MultiProcess.PerCPU,
@@ -1157,6 +1177,13 @@
class ConnectionDispenser(object):
+ """
+ Object that can dispense already-connected file descriptors, for use with
+ subprocess spawning.
+ """
+ # Very long term FIXME: this mechanism should ideally be eliminated, by
+ # making all subprocesses have a single stdio AMP connection that
+ # multiplexes between multiple protocols.
def __init__(self, connectionPool):
self.pool = connectionPool
@@ -1334,8 +1361,9 @@
("logID", String()),
("configFile", String()),
- ## same as in config; no need to propagate it
- # ("processCount", Integer()),
+ # computed value determined only in master, so needs to be propagated
+ # to be correct.
+ ("processCount", Integer()),
## only needed for request processing
# ("inheritFDs", ListOf(Integer())),
@@ -1353,24 +1381,37 @@
"""
@ConfigureChild.responder
- def conf(self, delegateTo, pidfile, logID, configFile, connectionPoolFD=None):
+ def conf(self, delegateTo, pidfile, logID, configFile, processCount,
+ connectionPoolFD=None):
"""
- Do the configuration.
+ Load the current config file into this child process, create a store
+ based on it, and delegate to the upgrade logic.
"""
- # This stuff needs to be done by somebody in caldavd.py
- from twistedcaldav.config import config
- from calendarserver.tap.util import getDBPool, storeFromConfig
+ # Load the configuration file.
config.load(configFile)
+
+ # Adjust the child's configuration to add all the relevant options for
+ # the store that won't be mentioned in the config file.
+ config.updateDefaults(dict(
+ LogID = logID,
+ PIDFile = pidfile,
+ DBAMPFD = connectionPoolFD,
+ MultiProcess = dict(
+ ProcessCount = processCount
+ )
+ ))
+
pool, txnf = getDBPool(config)
if pool is not None:
- from twisted.internet import reactor
pool.startService()
- reactor.addSystemEventTrigger("before", "shutdown",
- pool.stopService)
- # XXX: SharedConnectionPool needs to be relayed out of band, as
- # calendarserver.tap.caldav does with its own thing.
+ _reactor.addSystemEventTrigger(
+ "before", "shutdown", pool.stopService
+ )
dbstore = storeFromConfig(config, txnf)
- dbstore.setMigrating(True)
+ delegateClass = namedAny(delegateTo)
+ swapAMP(self, delegateClass(dbstore))
+ if connectionPoolFD is not None:
+ pass
return {}
@@ -1380,11 +1421,48 @@
L{StoreSpawnerService} that will load a full configuration into each child.
"""
+ def __init__(self, maker, options, dispenser):
+ """
+ Create a L{ConfiguredChildSpawner}.
+
+ @param maker: a L{CalDAVServiceMaker} instance that supplies the
+ configuration.
+
+ @param options: a L{CalDAVOptions} containing the command-line options
+ for the subprocess.
+
+ @param dispenser: a L{ConnectionDispenser} or C{None}.
+ """
+ self.nextID = 0
+ self.maker = maker
+ self.dispenser
+
+
+ @inlineCallbacks
def spawnWithStore(self, here, there):
"""
Spawn the child with a store based on a configuration.
"""
- return {}
+ thisID = self.nextID
+ self.nextID += 1
+ if self.dispenser is not None:
+ poolfd = self.dispenser.dispense()
+ childFDs = {poolfd: poolfd}
+ else:
+ childFDs = None
+ controller = yield self.spawn(
+ AMP(), ChildConfigurator, childFDs=childFDs
+ )
+ yield controller.callRemote(
+ ConfigureChild,
+ delegateTo=qual(there),
+ pidfile="%s-migrator-%s" % (self.maker.tapname,
+ thisID),
+ logID="migrator-%s" % (thisID,),
+ configFile=self.options['config'],
+ processCount=config.MultiProcess.processCount,
+ )
+ returnValue(swapAMP(controller, here))
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111203/44a49ca6/attachment-0001.html>
More information about the calendarserver-changes
mailing list