[CalendarServer-changes] [8393] CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/ tap
source_changes at macosforge.org
source_changes at macosforge.org
Sat Dec 3 00:39:23 PST 2011
Revision: 8393
http://trac.macosforge.org/projects/calendarserver/changeset/8393
Author: glyph at apple.com
Date: 2011-12-03 00:39:23 -0800 (Sat, 03 Dec 2011)
Log Message:
-----------
split out some functionality to prevent the tap module from more unwieldy
Modified Paths:
--------------
CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py
CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/util.py
Added Paths:
-----------
CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/cfgchild.py
Modified: CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py 2011-12-03 08:39:13 UTC (rev 8392)
+++ CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/caldav.py 2011-12-03 08:39:23 UTC (rev 8393)
@@ -39,18 +39,13 @@
from twisted.python.log import FileLogObserver, ILogObserver
from twisted.python.logfile import LogFile
from twisted.python.usage import Options, UsageError
-from twisted.python.reflect import namedAny, qual
from twisted.internet.defer import gatherResults, Deferred
-from twisted.internet.defer import inlineCallbacks, returnValue
-from twisted.internet import reactor as _reactor
from twisted.internet.process import ProcessExitedAlready
from twisted.internet.protocol import Protocol, Factory
from twisted.internet.protocol import ProcessProtocol
-from twisted.protocols.amp import AMP, Command, String, Integer#, ListOf
-
from twisted.application.internet import TCPServer, UNIXServer
from twisted.application.service import MultiService, IServiceMaker
from twisted.application.service import Service
@@ -65,13 +60,10 @@
from twext.web2.channel.http import LimitingHTTPFactory, SSLRedirectRequest
from twext.web2.metafd import ConnectionLimiter, ReportingHTTPService
-from txdav.common.datastore.upgrade.migrate import (
- UpgradeToDatabaseService, StoreSpawnerService, swapAMP
-)
-
from txdav.common.datastore.upgrade.sql.upgrade import (
UpgradeDatabaseSchemaService, UpgradeDatabaseDataService,
)
+from txdav.common.datastore.upgrade.migrate import UpgradeToDatabaseService
from twistedcaldav.config import ConfigurationError
from twistedcaldav.config import config
@@ -86,7 +78,6 @@
from twext.enterprise.ienterprise import POSTGRES_DIALECT
from twext.enterprise.ienterprise import ORACLE_DIALECT
from twext.enterprise.adbapi2 import ConnectionPool
-from twext.enterprise.adbapi2 import ConnectionPoolConnection
try:
from twistedcaldav.authkerb import NegotiateCredentialFactory
@@ -94,14 +85,17 @@
except ImportError:
NegotiateCredentialFactory = None
+from calendarserver.tap.util import ConnectionDispenser
+
from calendarserver.accesslog import AMPCommonAccessLoggingObserver
from calendarserver.accesslog import AMPLoggingFactory
from calendarserver.accesslog import RotatingFileAccessLoggingObserver
from calendarserver.tap.util import getRootResource, computeProcessCount
-from calendarserver.tap.util import ConnectionWithPeer
+
from calendarserver.tap.util import storeFromConfig
from calendarserver.tap.util import pgConnectorFromConfig
from calendarserver.tap.util import oracleConnectorFromConfig
+from calendarserver.tap.cfgchild import ConfiguredChildSpawner
from calendarserver.tools.util import checkDirectory
try:
@@ -926,7 +920,7 @@
PostDBImportService(config, store, mainService),
store, uid=overrideUID, gid=overrideGID,
spawner=ConfiguredChildSpawner(
- self, ConnectionDispenser(cp)
+ self, ConnectionDispenser(cp), config
),
parallel=config.MultiProcess.ProcessCount
),
@@ -1170,35 +1164,6 @@
-class ConnectionDispenser(object):
- """
- Object taht can dispense already-connected file descriptors, for use with
- subprocess spawning.
- """
- # Very long term FIXME: this mechanism should ideally be eliminated, by
- # making all subprocesses have a single stdio AMP connection that
- # multiplexes between multiple protocols.
-
- def __init__(self, connectionPool):
- self.pool = connectionPool
-
-
- def dispense(self):
- """
- Dispense a file descriptor, already connected to a server, for a
- client.
- """
- # FIXME: these sockets need to be re-dispensed when the process is
- # respawned, and they currently won't be.
- c, s = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)
- protocol = ConnectionPoolConnection(self.pool)
- transport = ConnectionWithPeer(s, protocol)
- protocol.makeConnection(transport)
- transport.startReading()
- return c
-
-
-
class TwistdSlaveProcess(object):
"""
A L{TwistdSlaveProcess} is information about how to start a slave process
@@ -1342,128 +1307,7 @@
return args
-class ConfigureChild(Command):
- """
- Configure a child process, most especially with all the information that it
- needs in order to construct a data store.
- """
- arguments = [
- # The name of the class to delegate to once configuration is complete.
- ("delegateTo", String()),
- ("pidFile", String()),
- ("logID", String()),
- ("configFile", String()),
-
- # computed value determined only in master, so needs to be propagated
- # to be correct.
- ("processCount", Integer()),
-
- ## only needed for request processing, and we're not using this
- ## facility for that work (yet)
- # ("inheritFDs", ListOf(Integer())),
- # ("inheritSSLFDs", ListOf(Integer())),
- # ("metaFD", String(optional=True)),
-
- ## shared connection pool!
- ("connectionPoolFD", Integer(optional=True)),
- ]
-
-
-
-class ChildConfigurator(AMP):
- """
- Protocol which can configure a child process.
- """
-
- @ConfigureChild.responder
- def conf(self, delegateTo, pidFile, logID, configFile, processCount,
- connectionPoolFD=None):
- """
- Load the current config file into this child process, create a store
- based on it, and delegate to the upgrade logic.
- """
- # Load the configuration file.
- config.load(configFile)
-
- # Adjust the child's configuration to add all the relevant options for
- # the store that won't be mentioned in the config file.
- changedConfig = dict(
- LogID = logID,
- PIDFile = pidFile,
- MultiProcess = dict(
- ProcessCount = processCount
- )
- )
- if connectionPoolFD is not None:
- changedConfig.update(DBAMPFD=connectionPoolFD)
- config.updateDefaults(changedConfig)
-
- # Construct and start database pool and store.
- pool, txnf = getDBPool(config)
- if pool is not None:
- pool.startService()
- _reactor.addSystemEventTrigger(
- "before", "shutdown", pool.stopService
- )
- dbstore = storeFromConfig(config, txnf)
-
- # Finally, construct the class we're supposed to delegate to.
- delegateClass = namedAny(delegateTo)
- swapAMP(self, delegateClass(dbstore))
- return {}
-
-
-
-class ConfiguredChildSpawner(StoreSpawnerService):
- """
- L{StoreSpawnerService} that will load a full configuration into each child.
- """
-
- def __init__(self, maker, options, dispenser):
- """
- Create a L{ConfiguredChildSpawner}.
-
- @param maker: a L{CalDAVServiceMaker} instance that supplies the
- configuration.
-
- @param options: a L{CalDAVOptions} containing the command-line options
- for the subprocess.
-
- @param dispenser: a L{ConnectionDispenser} or C{None}.
- """
- self.nextID = 0
- self.maker = maker
- self.dispenser
-
-
- @inlineCallbacks
- def spawnWithStore(self, here, there):
- """
- Spawn the child with a store based on a configuration.
- """
- thisID = self.nextID
- self.nextID += 1
- if self.dispenser is not None:
- poolfd = self.dispenser.dispense()
- childFDs = {poolfd: poolfd}
- else:
- childFDs = None
- controller = yield self.spawn(
- AMP(), ChildConfigurator, childFDs=childFDs
- )
- yield controller.callRemote(
- ConfigureChild,
- delegateTo=qual(there),
- pidfile="%s-migrator-%s" % (self.maker.tapname, thisID),
- logID="migrator-%s" % (thisID,),
- configFile=self.options['config'],
- processCount=config.MultiProcess.processCount,
- )
- returnValue(swapAMP(controller, here))
-
-
-
class ControlPortTCPServer(TCPServer):
""" This TCPServer retrieves the port number that was actually assigned
when the service was started, and stores that into config.ControlPort
@@ -1503,8 +1347,10 @@
minRestartDelay = 1
maxRestartDelay = 3600
- def __init__(self, reactor=_reactor):
+ def __init__(self, reactor=None):
super(DelayedStartupProcessMonitor, self).__init__()
+ if reactor is None:
+ from twisted.internet import reactor
self._reactor = reactor
self.processes = {}
self.protocols = {}
Added: CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/cfgchild.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/cfgchild.py (rev 0)
+++ CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/cfgchild.py 2011-12-03 08:39:23 UTC (rev 8393)
@@ -0,0 +1,172 @@
+##
+# Copyright (c) 2005-2011 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tools for spawning general-purpose child processes that have a store devrived
+from a .
+"""
+
+__all__ = [
+ # Only the spawner service is really interesting; the other parts are
+ # internal implementation details which shouldn't be needed outside this
+ # file.
+ 'ConfiguredChildSpawner',
+]
+
+from twisted.python.reflect import namedAny, qual
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.protocols.amp import AMP, Command, String, Integer#, ListOf
+from txdav.common.datastore.upgrade.migrate import (
+ StoreSpawnerService, swapAMP
+)
+
+from calendarserver.tap.util import getDBPool, storeFromConfig
+
+class ConfigureChild(Command):
+ """
+ Configure a child process, most especially with all the information that it
+ needs in order to construct a data store.
+ """
+
+ arguments = [
+ # The name of the class to delegate to once configuration is complete.
+ ("delegateTo", String()),
+ ("pidFile", String()),
+ ("logID", String()),
+ ("configFile", String()),
+
+ # computed value determined only in master, so needs to be propagated
+ # to be correct.
+ ("processCount", Integer()),
+
+ ## only needed for request processing, and we're not using this
+ ## facility for that work (yet)
+ # ("inheritFDs", ListOf(Integer())),
+ # ("inheritSSLFDs", ListOf(Integer())),
+ # ("metaFD", String(optional=True)),
+
+ ## shared connection pool!
+ ("connectionPoolFD", Integer(optional=True)),
+ ]
+
+
+
+class ChildConfigurator(AMP):
+ """
+ Protocol which can configure a child process.
+ """
+
+ def __init__(self, config=None):
+ """
+ Optionally accept a configuration for testing, but normally created in
+ the subprocess configuration-free.
+ """
+ super(AMP, self).__init__()
+ if config is None:
+ from twistedcaldav.config import config
+ self.config = config
+
+
+ @ConfigureChild.responder
+ def conf(self, delegateTo, pidFile, logID, configFile, processCount,
+ connectionPoolFD=None):
+ """
+ Load the current config file into this child process, create a store
+ based on it, and delegate to the upgrade logic.
+ """
+ # Load the configuration file.
+ self.config.load(configFile)
+
+ # Adjust the child's configuration to add all the relevant options for
+ # the store that won't be mentioned in the config file.
+ changedConfig = dict(
+ LogID = logID,
+ PIDFile = pidFile,
+ MultiProcess = dict(
+ ProcessCount = processCount
+ )
+ )
+ if connectionPoolFD is not None:
+ changedConfig.update(DBAMPFD=connectionPoolFD)
+ self.config.updateDefaults(changedConfig)
+
+ # Construct and start database pool and store.
+ pool, txnf = getDBPool(self.config)
+ if pool is not None:
+ from twisted.internet import reactor
+ pool.startService()
+ reactor.addSystemEventTrigger(
+ "before", "shutdown", pool.stopService
+ )
+ dbstore = storeFromConfig(self.config, txnf)
+
+ # Finally, construct the class we're supposed to delegate to.
+ delegateClass = namedAny(delegateTo)
+ swapAMP(self, delegateClass(dbstore))
+ return {}
+
+
+
+class ConfiguredChildSpawner(StoreSpawnerService):
+ """
+ L{StoreSpawnerService} that will load a full configuration into each child.
+ """
+
+ def __init__(self, maker, dispenser, config):
+ """
+ Create a L{ConfiguredChildSpawner}.
+
+ @param maker: a L{CalDAVServiceMaker} instance that supplies the
+ configuration.
+
+ @param dispenser: a L{calendarserver.tap.ConnectionDispenser} or C{None}.
+
+ @param config: the L{twistedcaldav.config.Config} to use to configure
+ the subprocess.
+ """
+ self.nextID = 0
+ self.maker = maker
+ self.dispenser = dispenser
+ self.config = config
+
+
+ @inlineCallbacks
+ def spawnWithStore(self, here, there):
+ """
+ Spawn the child with a store based on a configuration.
+ """
+ thisID = self.nextID
+ self.nextID += 1
+ if self.dispenser is not None:
+ poolfd = self.dispenser.dispense()
+ childFDs = {poolfd: poolfd}
+ else:
+ childFDs = None
+ controller = yield self.spawn(
+ AMP(), ChildConfigurator, childFDs=childFDs
+ )
+ yield controller.callRemote(
+ ConfigureChild,
+ delegateTo=qual(there),
+ pidfile="%s-migrator-%s" % (self.maker.tapname, thisID),
+ logID="migrator-%s" % (thisID,),
+ configFile=self.config.getProvider().getConfigFileName(),
+ processCount=self.config.MultiProcess.processCount,
+ )
+ returnValue(swapAMP(controller, here))
+
+
+
Modified: CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/util.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/util.py 2011-12-03 08:39:13 UTC (rev 8392)
+++ CalendarServer/branches/users/glyph/parallel-upgrade/calendarserver/tap/util.py 2011-12-03 08:39:23 UTC (rev 8393)
@@ -28,7 +28,7 @@
import errno
import os
from time import sleep
-from socket import fromfd, AF_UNIX, SOCK_STREAM
+from socket import fromfd, AF_UNIX, SOCK_STREAM, socketpair
from twext.python.filepath import CachingFilePath as FilePath
from twext.python.log import Logger
@@ -67,7 +67,7 @@
from twistedcaldav.util import getMemorySize, getNCPU
from twext.enterprise.ienterprise import POSTGRES_DIALECT
from twext.enterprise.ienterprise import ORACLE_DIALECT
-from twext.enterprise.adbapi2 import ConnectionPool
+from twext.enterprise.adbapi2 import ConnectionPool, ConnectionPoolConnection
try:
from twistedcaldav.authkerb import NegotiateCredentialFactory
@@ -163,7 +163,8 @@
def transactionFactoryFromFD(dbampfd, dialect, paramstyle):
"""
- Create a transaction factory from an inherited file descriptor.
+ Create a transaction factory from an inherited file descriptor, such as one
+ created by L{ConnectionDispenser}.
"""
skt = fromfd(dbampfd, AF_UNIX, SOCK_STREAM)
os.close(dbampfd)
@@ -175,6 +176,35 @@
+class ConnectionDispenser(object):
+ """
+ A L{ConnectionDispenser} can dispense already-connected file descriptors,
+ for use with subprocess spawning.
+ """
+ # Very long term FIXME: this mechanism should ideally be eliminated, by
+ # making all subprocesses have a single stdio AMP connection that
+ # multiplexes between multiple protocols.
+
+ def __init__(self, connectionPool):
+ self.pool = connectionPool
+
+
+ def dispense(self):
+ """
+ Dispense a socket object, already connected to a server, for a client
+ in a subprocess.
+ """
+ # FIXME: these sockets need to be re-dispensed when the process is
+ # respawned, and they currently won't be.
+ c, s = socketpair(AF_UNIX, SOCK_STREAM)
+ protocol = ConnectionPoolConnection(self.pool)
+ transport = ConnectionWithPeer(s, protocol)
+ protocol.makeConnection(transport)
+ transport.startReading()
+ return c
+
+
+
def storeFromConfig(config, txnFactory):
"""
Produce an L{IDataStore} from the given configuration, transaction factory,
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111203/acf2ad08/attachment-0001.html>
More information about the calendarserver-changes
mailing list