[CalendarServer-changes] [11109] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Fri Apr 26 20:20:20 PDT 2013
Revision: 11109
http://trac.calendarserver.org/changeset/11109
Author: sagen at apple.com
Date: 2013-04-26 20:20:20 -0700 (Fri, 26 Apr 2013)
Log Message:
-----------
Clean up the upgrade code, changing from wrapped Services to sequentially executed steps
Modified Paths:
--------------
CalendarServer/trunk/bin/caldavd
CalendarServer/trunk/calendarserver/tap/caldav.py
CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
CalendarServer/trunk/calendarserver/tap/test/test_util.py
CalendarServer/trunk/calendarserver/tap/util.py
CalendarServer/trunk/calendarserver/tools/ampnotifications.py
CalendarServer/trunk/calendarserver/tools/cmdline.py
CalendarServer/trunk/calendarserver/tools/gateway.py
CalendarServer/trunk/calendarserver/tools/principals.py
CalendarServer/trunk/calendarserver/tools/purge.py
CalendarServer/trunk/calendarserver/tools/push.py
CalendarServer/trunk/twistedcaldav/stdconfig.py
CalendarServer/trunk/twistedcaldav/test/test_upgrade.py
CalendarServer/trunk/twistedcaldav/upgrade.py
CalendarServer/trunk/txdav/common/datastore/upgrade/migrate.py
CalendarServer/trunk/txdav/common/datastore/upgrade/sql/others/test/test_attachment_migration.py
CalendarServer/trunk/txdav/common/datastore/upgrade/sql/test/test_upgrade.py
CalendarServer/trunk/txdav/common/datastore/upgrade/sql/upgrade.py
CalendarServer/trunk/txdav/common/datastore/upgrade/test/test_migrate.py
Modified: CalendarServer/trunk/bin/caldavd
===================================================================
--- CalendarServer/trunk/bin/caldavd 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/bin/caldavd 2013-04-27 03:20:20 UTC (rev 11109)
@@ -94,12 +94,15 @@
echo " -t Process type (Master, Slave or Combined)";
echo " -p Path to the desired pstats file.";
echo " -R The Twisted Reactor to run [${reactor}]";
+ echo " -o Pass option through to server";
if [ "${1-}" == "-" ]; then return 0; fi;
exit 64;
}
-while getopts 'hXLu:g:f:T:P:t:p:R:' option; do
+extra="-o FailIfUpgradeNeeded=False";
+
+while getopts 'hXLu:g:f:T:P:t:p:R:o:' option; do
case "${option}" in
'?') usage; ;;
'h') usage -; exit 0; ;;
@@ -114,6 +117,7 @@
'p') twistd_profile="--profiler=cprofile-cpu --profile=${OPTARG}/master.pstats --savestats";
profile="-o Profiling/Enabled=True -o Profiling/BaseDirectory=${OPTARG}"; ;;
'R') twistd_reactor="--reactor=${OPTARG}"; child_reactor="-o Twisted/reactor=${OPTARG}"; ;;
+ 'o') extra="${extra} -o ${OPTARG}"; ;;
esac;
done;
@@ -125,6 +129,5 @@
export PYTHONPATH
-extra="-o FailIfUpgradeNeeded=False";
exec "${python}" "${twistdpath}" ${twistd_profile} ${twistd_reactor} ${daemonize} ${username} ${groupname} "${plugin_name}" ${configfile} ${service_type} ${errorlogenabled} ${profile} ${child_reactor} ${extra};
Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -43,7 +43,7 @@
from twisted.python.logfile import LogFile
from twisted.python.usage import Options, UsageError
-from twisted.internet.defer import gatherResults, Deferred, inlineCallbacks
+from twisted.internet.defer import gatherResults, Deferred, inlineCallbacks, succeed
from twisted.internet.process import ProcessExitedAlready
from twisted.internet.protocol import Protocol, Factory
@@ -66,18 +66,19 @@
from txdav.common.datastore.sql_tables import schema
from txdav.common.datastore.upgrade.sql.upgrade import (
- UpgradeDatabaseSchemaService, UpgradeDatabaseDataService, UpgradeDatabaseOtherService,
+ UpgradeDatabaseSchemaStep, UpgradeDatabaseDataStep, UpgradeDatabaseOtherStep,
)
-from txdav.common.datastore.upgrade.migrate import UpgradeToDatabaseService
+from txdav.common.datastore.upgrade.migrate import UpgradeToDatabaseStep
from twistedcaldav.directory import calendaruserproxy
from twistedcaldav.directory.directory import GroupMembershipCacheUpdater
from twistedcaldav.localization import processLocalizationFiles
from twistedcaldav import memcachepool
-from twistedcaldav.upgrade import UpgradeFileSystemFormatService, PostDBImportService
+from twistedcaldav.upgrade import UpgradeFileSystemFormatStep, PostDBImportStep
from calendarserver.tap.util import pgServiceFromConfig, getDBPool, MemoryLimitService
from calendarserver.tap.util import directoryFromConfig, checkDirectories
+from calendarserver.tap.util import Stepper
from twext.enterprise.ienterprise import POSTGRES_DIALECT
from twext.enterprise.ienterprise import ORACLE_DIALECT
@@ -105,7 +106,6 @@
from calendarserver.tap.util import storeFromConfig
from calendarserver.tap.util import pgConnectorFromConfig
from calendarserver.tap.util import oracleConnectorFromConfig
-from calendarserver.tap.cfgchild import ConfiguredChildSpawner
from calendarserver.push.notifier import PushDistributor
from calendarserver.push.amppush import AMPPushMaster, AMPPushForwarder
from calendarserver.push.applepush import ApplePushNotifierService
@@ -542,7 +542,123 @@
MultiService.stopService(self)
+class PreProcessingService(Service):
+ """
+ A Service responsible for running any work that needs to be finished prior
+ to the main service starting. Once that work is done, it instantiates the
+ main service and adds it to the Service hierarchy (specifically to its
+ parent). If the final work step does not return a Failure, that is an
+ indication the store is ready and it is passed to the main service.
+ Otherwise, None is passed to the main service in place of a store. This
+ is mostly useful in the case of command line utilities that need to do
+ something different if the store is not available (e.g. utilities that
+ aren't allowed to upgrade the database).
+ """
+ def __init__(self, serviceCreator, connectionPool, store, logObserver,
+ reactor=None):
+ """
+ @param serviceCreator: callable which will be passed the connection
+ pool, store, and log observer, and should return a Service
+ @param connectionPool: connection pool to pass to serviceCreator
+ @param store: the store object being processed
+ @param logObserver: log observer to pass to serviceCreator
+ """
+ self.serviceCreator = serviceCreator
+ self.connectionPool = connectionPool
+ self.store = store
+ self.logObserver = logObserver
+ self.stepper = Stepper()
+
+ if reactor is None:
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+ def stepWithResult(self, result):
+ """
+ The final "step"; if we get here we know our store is ready, so
+ we create the main service and pass in the store.
+ """
+ service = self.serviceCreator(self.connectionPool, self.store,
+ self.logObserver)
+ if self.parent is not None:
+ self.reactor.callLater(0, service.setServiceParent, self.parent)
+ return succeed(None)
+
+ def stepWithFailure(self, failure):
+ """
+ The final "step", but if we get here we know our store is not ready,
+ so we create the main service and pass in a None for the store.
+ """
+ try:
+ service = self.serviceCreator(self.connectionPool, None,
+ self.logObserver)
+ if self.parent is not None:
+ self.reactor.callLater(0, service.setServiceParent, self.parent)
+ except StoreNotAvailable:
+ self.reactor.stop()
+
+ return succeed(None)
+
+ def addStep(self, step):
+ """
+ Hand the step to our Stepper
+
+ @param step: an object implementing stepWithResult( )
+ """
+ self.stepper.addStep(step)
+ return self
+
+ def startService(self):
+ """
+ Add ourself as the final step, and then tell the coordinator to start
+ working on each step one at a time.
+ """
+ self.addStep(self)
+ self.stepper.start()
+
+
+class PostUpgradeStopRequested(Exception):
+ """
+ Raised when we've been asked to stop just after upgrade has completed.
+ """
+
+class StoreNotAvailable(Exception):
+ """
+ Raised when we want to give up because the store is not available
+ """
+
+class QuitAfterUpgradeStep(object):
+
+ def __init__(self, triggerFile, reactor=None):
+ self.triggerFile = triggerFile
+ if reactor is None:
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+ def removeTriggerFile(self):
+ try:
+ os.remove(self.triggerFile)
+ except OSError:
+ pass
+
+ def stepWithResult(self, result):
+ if os.path.exists(self.triggerFile):
+ self.removeTriggerFile()
+ self.reactor.stop()
+ raise PostUpgradeStopRequested()
+ else:
+ return succeed(result)
+
+ def stepWithFailure(self, failure):
+ if os.path.exists(self.triggerFile):
+ self.removeTriggerFile()
+ self.reactor.stop()
+ raise PostUpgradeStopRequested()
+ else:
+ return failure
+
+
class CalDAVServiceMaker (LoggingMixIn):
implements(IPlugin, IServiceMaker)
@@ -957,31 +1073,12 @@
configuration. Memcached will be spawned automatically.
"""
def slaveSvcCreator(pool, store, logObserver):
+
+ if store is None:
+ raise StoreNotAvailable()
+
result = self.requestProcessingService(options, store, logObserver)
- # Optionally launch memcached. Note, this is not going through a
- # ProcessMonitor because there is code elsewhere that needs to
- # access memcached before startService() gets called, so we're just
- # directly using Popen to spawn memcached.
- for name, pool in config.Memcached.Pools.items():
- if pool.ServerEnabled:
- self.log_info(
- "Adding memcached service for pool: %s" % (name,)
- )
- memcachedArgv = [
- config.Memcached.memcached,
- "-p", str(pool.Port),
- "-l", pool.BindAddress,
- "-U", "0",
- ]
- if config.Memcached.MaxMemory is not 0:
- memcachedArgv.extend(
- ["-m", str(config.Memcached.MaxMemory)]
- )
- if config.UserName:
- memcachedArgv.extend(["-u", config.UserName])
- memcachedArgv.extend(config.Memcached.Options)
- Popen(memcachedArgv)
# Optionally set up push notifications
pushDistributor = None
@@ -1043,6 +1140,30 @@
config.AccessLogFile,
)
+ # Optionally launch memcached. Note, this is not going through a
+ # ProcessMonitor because there is code elsewhere that needs to
+ # access memcached before startService() gets called, so we're just
+ # directly using Popen to spawn memcached.
+ for name, pool in config.Memcached.Pools.items():
+ if pool.ServerEnabled:
+ self.log_info(
+ "Adding memcached service for pool: %s" % (name,)
+ )
+ memcachedArgv = [
+ config.Memcached.memcached,
+ "-p", str(pool.Port),
+ "-l", pool.BindAddress,
+ "-U", "0",
+ ]
+ if config.Memcached.MaxMemory is not 0:
+ memcachedArgv.extend(
+ ["-m", str(config.Memcached.MaxMemory)]
+ )
+ if config.UserName:
+ memcachedArgv.extend(["-u", config.UserName])
+ memcachedArgv.extend(config.Memcached.Options)
+ Popen(memcachedArgv)
+
return self.storageService(slaveSvcCreator, logObserver, uid=uid, gid=gid)
@@ -1076,7 +1197,7 @@
not require any particular setup, then this may return the
C{mainService} argument.
- @type mainService: C{callable} that takes C{(connectionPool, store)}
+ @type createMainService: C{callable} that takes C{(connectionPool, store)}
and returns L{IService}
@param uid: the user ID to run the backend as, if this process is
@@ -1100,42 +1221,62 @@
maxConnections=config.MaxDBConnectionsPerPool)
cp.setServiceParent(ms)
store = storeFromConfig(config, cp.connection)
- mainService = createMainService(cp, store, logObserver)
- if config.SharedConnectionPool:
- dispenser = ConnectionDispenser(cp)
- else:
- dispenser = None
- if config.ParallelUpgrades:
- parallel = config.MultiProcess.ProcessCount
- else:
- parallel = 0
- spawner = ConfiguredChildSpawner(self, dispenser, config)
- if getattr(self, "doPostImport", True):
- postImport = PostDBImportService(config, store, mainService)
- else:
- postImport = mainService
- upgradeSvc = UpgradeFileSystemFormatService(
- config, spawner, parallel,
- UpgradeDatabaseSchemaService.wrapService(
- UpgradeDatabaseDataService.wrapService(
- UpgradeToDatabaseService.wrapService(
- CachingFilePath(config.DocumentRoot),
- UpgradeDatabaseOtherService.wrapService(
- postImport,
- store, uid=overrideUID, gid=overrideGID,
- ),
- store, uid=overrideUID, gid=overrideGID,
- spawner=spawner, merge=config.MergeUpgrades,
- parallel=parallel
- ),
- store, uid=overrideUID, gid=overrideGID,
+
+ pps = PreProcessingService(createMainService, cp, store,
+ logObserver)
+
+ # The following "steps" will run sequentially when the service
+ # hierarchy is started. If any of the steps raise an exception
+ # the subsequent steps' stepWithFailure methods will be called
+ # instead, until one of them returns a non-Failure.
+
+ # Still need this for Snow Leopard support
+ pps.addStep(
+ UpgradeFileSystemFormatStep(config)
+ )
+
+ pps.addStep(
+ UpgradeDatabaseSchemaStep(
+ store, uid=overrideUID, gid=overrideGID,
+ failIfUpgradeNeeded=config.FailIfUpgradeNeeded
+ )
+ )
+
+ pps.addStep(
+ UpgradeDatabaseDataStep(
+ store, uid=overrideUID, gid=overrideGID
+ )
+ )
+
+ pps.addStep(
+ UpgradeToDatabaseStep(
+ UpgradeToDatabaseStep.fileStoreFromPath(
+ CachingFilePath(config.DocumentRoot)
),
store, uid=overrideUID, gid=overrideGID,
- failIfUpgradeNeeded=config.FailIfUpgradeNeeded,
+ merge=config.MergeUpgrades
)
)
- upgradeSvc.setServiceParent(ms)
+
+ pps.addStep(
+ UpgradeDatabaseOtherStep(
+ store, uid=overrideUID, gid=overrideGID
+ )
+ )
+
+ # Conditionally stop after upgrade at this point
+ pps.addStep(
+ QuitAfterUpgradeStep(config.StopAfterUpgradeTriggerFile)
+ )
+
+ pps.addStep(
+ PostDBImportStep(store, config,
+ getattr(self, "doPostImport", True)
+ )
+ )
+ pps.setServiceParent(ms)
return ms
+
return subServiceFactory
# FIXME: this is replicating the logic of getDBPool(), except for the
@@ -1359,6 +1500,9 @@
# filesystem to the database (if that's necessary, and there is
# filesystem data in need of upgrading).
def spawnerSvcCreator(pool, store, ignored):
+ if store is None:
+ raise StoreNotAvailable()
+
from twisted.internet import reactor
pool = PeerConnectionPool(reactor, store.newTransaction,
7654, schema)
Modified: CalendarServer/trunk/calendarserver/tap/test/test_caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/test/test_caldav.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/calendarserver/tap/test/test_caldav.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -33,7 +33,7 @@
from twisted.internet.interfaces import IProcessTransport, IReactorProcess
from twisted.internet.protocol import ServerFactory
-from twisted.internet.defer import Deferred, inlineCallbacks, passthru
+from twisted.internet.defer import Deferred, inlineCallbacks, passthru, succeed
from twisted.internet.task import Clock
from twisted.internet import reactor
@@ -60,7 +60,8 @@
from calendarserver.tap.caldav import (
CalDAVOptions, CalDAVServiceMaker, CalDAVService, GroupOwnedUNIXServer,
DelayedStartupProcessMonitor, DelayedStartupLineLogger, TwistdSlaveProcess,
- _CONTROL_SERVICE_NAME, getSystemIDs
+ _CONTROL_SERVICE_NAME, getSystemIDs, PreProcessingService,
+ QuitAfterUpgradeStep
)
from calendarserver.provision.root import RootResource
from twext.enterprise.queue import PeerConnectionPool, LocalQueuer
@@ -335,8 +336,6 @@
self.config.Memcached.Pools.Default.ServerEnabled = False
self.config.DirectoryAddressBook.Enabled = False
- self.config.SudoersFile = ""
-
if self.configOptions:
self.config.update(self.configOptions)
@@ -857,22 +856,6 @@
))
-sudoersFile = """<?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
-<plist version="1.0">
-<dict>
- <key>users</key>
- <array>
- <dict>
- <key>password</key>
- <string>superuser</string>
- <key>username</key>
- <string>superuser</string>
- </dict>
- </array>
-</dict>
-</plist>
-"""
class DirectoryServiceTest(BaseServiceMakerTests):
"""
@@ -1308,3 +1291,122 @@
If names are provided, use the IDs corresponding to those names
"""
self.assertEquals(self._wrappedFunction()("exists", "exists"), (42, 43))
+
+
+#
+# Tests for PreProcessingService
+#
+
+class Step(object):
+
+ def __init__(self, recordCallback, shouldFail):
+ self._recordCallback = recordCallback
+ self._shouldFail = shouldFail
+
+ def stepWithResult(self, result):
+ self._recordCallback(self.successValue, None)
+ if self._shouldFail:
+ 1/0
+ return succeed(result)
+
+ def stepWithFailure(self, failure):
+ self._recordCallback(self.errorValue, failure)
+ if self._shouldFail:
+ return failure
+
+class StepOne(Step):
+ successValue = "one success"
+ errorValue = "one failure"
+
+class StepTwo(Step):
+ successValue = "two success"
+ errorValue = "two failure"
+
+class StepThree(Step):
+ successValue = "three success"
+ errorValue = "three failure"
+
+class StepFour(Step):
+ successValue = "four success"
+ errorValue = "four failure"
+
+class PreProcessingServiceTestCase(TestCase):
+
+ def fakeServiceCreator(self, cp, store, lo):
+ self.history.append(("serviceCreator", store))
+
+ def setUp(self):
+ self.history = []
+ self.clock = Clock()
+ self.pps = PreProcessingService(self.fakeServiceCreator, None, "store",
+ None, reactor=self.clock)
+
+ def _record(self, value, failure):
+ self.history.append(value)
+
+ def test_allSuccess(self):
+ self.pps.addStep(
+ StepOne(self._record, False)
+ ).addStep(
+ StepTwo(self._record, False)
+ ).addStep(
+ StepThree(self._record, False)
+ ).addStep(
+ StepFour(self._record, False)
+ )
+ self.pps.startService()
+ self.assertEquals(self.history,
+ ['one success', 'two success', 'three success', 'four success',
+ ('serviceCreator', 'store')])
+
+ def test_allFailure(self):
+ self.pps.addStep(
+ StepOne(self._record, True)
+ ).addStep(
+ StepTwo(self._record, True)
+ ).addStep(
+ StepThree(self._record, True)
+ ).addStep(
+ StepFour(self._record, True)
+ )
+ self.pps.startService()
+ self.assertEquals(self.history,
+ ['one success', 'two failure', 'three failure', 'four failure',
+ ('serviceCreator', None)])
+
+ def test_partialFailure(self):
+ self.pps.addStep(
+ StepOne(self._record, True)
+ ).addStep(
+ StepTwo(self._record, False)
+ ).addStep(
+ StepThree(self._record, True)
+ ).addStep(
+ StepFour(self._record, False)
+ )
+ self.pps.startService()
+ self.assertEquals(self.history,
+ ['one success', 'two failure', 'three success', 'four failure',
+ ('serviceCreator', 'store')])
+
+ def test_quitAfterUpgradeStep(self):
+ triggerFileName = "stop_after_upgrade"
+ triggerFile = FilePath(triggerFileName)
+ self.pps.addStep(
+ StepOne(self._record, False)
+ ).addStep(
+ StepTwo(self._record, False)
+ ).addStep(
+ QuitAfterUpgradeStep(triggerFile.path, reactor=self.clock)
+ ).addStep(
+ StepFour(self._record, True)
+ )
+ triggerFile.setContent("")
+ self.pps.startService()
+ self.assertEquals(self.history,
+ ['one success', 'two success', 'four failure',
+ ('serviceCreator', None)])
+ self.assertFalse(triggerFile.exists())
+
+
+
Modified: CalendarServer/trunk/calendarserver/tap/test/test_util.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/test/test_util.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/calendarserver/tap/test/test_util.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -14,12 +14,13 @@
# limitations under the License.
##
-from calendarserver.tap.util import directoryFromConfig, MemoryLimitService
+from calendarserver.tap.util import directoryFromConfig, MemoryLimitService, Stepper
from twistedcaldav.util import computeProcessCount
from twistedcaldav.test.util import TestCase
from twistedcaldav.config import config
from twistedcaldav.directory.augment import AugmentXMLDB
from twisted.internet.task import Clock
+from twisted.internet.defer import succeed, inlineCallbacks
class ProcessCountTestCase(TestCase):
@@ -141,3 +142,91 @@
processMonitor.history = []
clock.advance(10)
self.assertEquals(processMonitor.history, ['process #1', 'process #2', 'process #3'])
+
+
+
+#
+# Tests for Stepper
+#
+
+class Step(object):
+
+ def __init__(self, recordCallback, shouldFail):
+ self._recordCallback = recordCallback
+ self._shouldFail = shouldFail
+
+ def stepWithResult(self, result):
+ self._recordCallback(self.successValue, None)
+ if self._shouldFail:
+ 1/0
+ return succeed(result)
+
+ def stepWithFailure(self, failure):
+ self._recordCallback(self.errorValue, failure)
+ if self._shouldFail:
+ return failure
+
+class StepOne(Step):
+ successValue = "one success"
+ errorValue = "one failure"
+
+class StepTwo(Step):
+ successValue = "two success"
+ errorValue = "two failure"
+
+class StepThree(Step):
+ successValue = "three success"
+ errorValue = "three failure"
+
+class StepFour(Step):
+ successValue = "four success"
+ errorValue = "four failure"
+
+ # def stepWithFailure(self, failure):
+ # Step.stepWithFailure(self, failure)
+ # return succeed(None)
+
+class StepperTestCase(TestCase):
+
+ def setUp(self):
+ self.history = []
+ self.stepper = Stepper()
+
+ def _record(self, value, failure):
+ self.history.append(value)
+
+ @inlineCallbacks
+ def test_allSuccess(self):
+ self.stepper.addStep(
+ StepOne(self._record, False)
+ ).addStep(
+ StepTwo(self._record, False)
+ ).addStep(
+ StepThree(self._record, False)
+ ).addStep(
+ StepFour(self._record, False)
+ )
+ result = (yield self.stepper.start("abc"))
+ self.assertEquals(result, "abc") # original result passed through
+ self.assertEquals(self.history,
+ ['one success', 'two success', 'three success', 'four success'])
+
+ def test_allFailure(self):
+ self.stepper.addStep(StepOne(self._record, True))
+ self.stepper.addStep(StepTwo(self._record, True))
+ self.stepper.addStep(StepThree(self._record, True))
+ self.stepper.addStep(StepFour(self._record, True))
+ self.failUnlessFailure(self.stepper.start(), ZeroDivisionError)
+ self.assertEquals(self.history,
+ ['one success', 'two failure', 'three failure', 'four failure'])
+
+ @inlineCallbacks
+ def test_partialFailure(self):
+ self.stepper.addStep(StepOne(self._record, True))
+ self.stepper.addStep(StepTwo(self._record, False))
+ self.stepper.addStep(StepThree(self._record, True))
+ self.stepper.addStep(StepFour(self._record, False))
+ result = (yield self.stepper.start("abc"))
+ self.assertEquals(result, None) # original result is gone
+ self.assertEquals(self.history,
+ ['one success', 'two failure', 'three success', 'four failure'])
Modified: CalendarServer/trunk/calendarserver/tap/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/util.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/calendarserver/tap/util.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -41,11 +41,12 @@
from twisted.application.service import Service
from twisted.cred.portal import Portal
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, succeed
from twisted.internet import reactor as _reactor
from twisted.internet.reactor import addSystemEventTrigger
from twisted.internet.tcp import Connection
from twisted.python.reflect import namedClass
+# from twisted.python.failure import Failure
from twistedcaldav.bind import doBind
from twistedcaldav.directory import calendaruserproxy
@@ -987,3 +988,105 @@
access=os.W_OK,
create=(0770, config.UserName, config.GroupName),
)
+
+
+
+
+class Stepper(object):
+ """
+ Manages the sequential, deferred execution of "steps" which are objects
+ implementing these methods:
+
+ - stepWithResult(result)
+ @param result: the result returned from the previous step
+ @returns: Deferred
+
+ - stepWithFailure(failure)
+ @param failure: a Failure encapsulating the exception from the
+ previous step
+ @returns: Failure to continue down the errback chain, or a
+ Deferred returning a non-Failure to switch back to the
+ callback chain
+
+ "Step" objects are added in order by calling addStep(), and when start()
+ is called, the Stepper will call the stepWithResult() of the first step.
+ If stepWithResult() doesn't raise an Exception, the Stepper will call the
+ next step's stepWithResult(). If a stepWithResult() raises an Exception,
+ the Stepper will call the next step's stepWithFailure() -- if it's
+ implemented -- passing it a Failure object. If the stepWithFailure()
+ decides it can handle the Failure and proceed, it can return a non-Failure
+ which is an indicator to the Stepper to call the next step's
+ stepWithResult().
+
+ TODO: Create an IStep interface (?)
+ """
+
+ def __init__(self):
+ self.steps = []
+ self.failure = None
+ self.result = None
+ self.running = False
+
+ def addStep(self, step):
+ """
+ Adds a step object to the ordered list of steps
+
+ @param step: the object to add
+ @type step: an object implementing stepWithResult()
+
+ @return: the Stepper object itself so addStep() calls can be chained
+ """
+ if self.running:
+ raise RuntimeError("Can't add step after start")
+ self.steps.append(step)
+ return self
+
+ def defaultStepWithResult(self, result):
+ return succeed(result)
+
+ def defaultStepWithFailure(self, failure):
+ log.warn(failure)
+ return failure
+
+ # def protectStep(self, callback):
+ # def _protected(result):
+ # try:
+ # return callback(result)
+ # except Exception, e:
+ # # TODO: how to turn Exception into Failure
+ # return Failure()
+ # return _protected
+
+ def start(self, result=None):
+ """
+ Begin executing the added steps in sequence. If a step object
+ does not implement a stepWithResult/stepWithFailure method, a
+ default implementation will be used.
+
+ @param result: an optional value to pass to the first step
+ @return: the Deferred that will fire when steps are done
+ """
+ self.running = True
+ self.deferred = Deferred()
+
+ for step in self.steps:
+
+ # See if we need to use a default implementation of the step methods:
+ if hasattr(step, "stepWithResult"):
+ callBack = step.stepWithResult
+ # callBack = self.protectStep(step.stepWithResult)
+ else:
+ callBack = self.defaultStepWithResult
+ if hasattr(step, "stepWithFailure"):
+ errBack = step.stepWithFailure
+ else:
+ errBack = self.defaultStepWithFailure
+
+ # Add callbacks to the Deferred
+ self.deferred.addCallbacks(callBack, errBack)
+
+ # Get things going
+ self.deferred.callback(result)
+
+ return self.deferred
+
Modified: CalendarServer/trunk/calendarserver/tools/ampnotifications.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/ampnotifications.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/calendarserver/tools/ampnotifications.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -58,7 +58,7 @@
class WorkerService(Service):
def __init__(self, store):
- self._store = store
+ self.store = store
@inlineCallbacks
Modified: CalendarServer/trunk/calendarserver/tools/cmdline.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/cmdline.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/calendarserver/tools/cmdline.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -25,11 +25,11 @@
from twext.python.log import StandardIOObserver
from twistedcaldav.config import ConfigurationError
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, succeed
+from twisted.application.service import Service
import sys
from calendarserver.tap.util import getRootResource
-from twisted.application.service import Service
from errno import ENOENT, EACCES
from twext.enterprise.queue import NonPerformingQueuer
@@ -110,15 +110,13 @@
class WorkerService(Service):
def __init__(self, store):
- self._store = store
- # Work can be queued but will not be performed by the command line tool
- store.queuer = NonPerformingQueuer()
+ self.store = store
def rootResource(self):
try:
from twistedcaldav.config import config
- rootResource = getRootResource(config, self._store)
+ rootResource = getRootResource(config, self.store)
except OSError, e:
if e.errno == ENOENT:
# Trying to re-write resources.xml but its parent directory does
@@ -137,11 +135,19 @@
raise
return rootResource
+
@inlineCallbacks
def startService(self):
+
from twisted.internet import reactor
try:
- yield self.doWork()
+ # Work can be queued but will not be performed by the command
+ # line tool
+ if self.store is not None:
+ self.store.queuer = NonPerformingQueuer()
+ yield self.doWork()
+ else:
+ yield self.doWorkWithoutStore()
except ConfigurationError, ce:
sys.stderr.write("Error: %s\n" % (str(ce),))
except Exception, e:
@@ -150,3 +156,11 @@
finally:
reactor.stop()
+ def doWorkWithoutStore(self):
+ """
+ Subclasses can override doWorkWithoutStore if there is any work they
+ can accomplish without access to the store, or if they want to emit
+ their own error message.
+ """
+ sys.stderr.write("Error: Data store is not available\n")
+ return succeed(None)
Modified: CalendarServer/trunk/calendarserver/tools/gateway.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/gateway.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/calendarserver/tools/gateway.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -24,7 +24,7 @@
from twext.python.plistlib import readPlistFromString, writePlistToString
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, succeed
from twistedcaldav.directory.directory import DirectoryError
from txdav.xml import element as davxml
@@ -73,12 +73,18 @@
"""
rootResource = self.rootResource()
directory = rootResource.getDirectory()
- runner = Runner(rootResource, directory, self._store, self.commands)
+ runner = Runner(rootResource, directory, self.store, self.commands)
if runner.validate():
yield runner.run()
+ def doWorkWithoutStore(self):
+ respondWithError("Database is not available")
+ return succeed(None)
+
+
+
def main():
try:
Modified: CalendarServer/trunk/calendarserver/tools/principals.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/principals.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/calendarserver/tools/principals.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -113,7 +113,7 @@
if self.function is not None:
rootResource = self.rootResource()
directory = rootResource.getDirectory()
- yield self.function(rootResource, directory, self._store, *self.params)
+ yield self.function(rootResource, directory, self.store, *self.params)
def main():
Modified: CalendarServer/trunk/calendarserver/tools/purge.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/purge.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/calendarserver/tools/purge.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -186,7 +186,7 @@
if self.dryrun:
if self.verbose:
print("(Dry run) Searching for old events...")
- txn = self._store.newTransaction(label="Find old events")
+ txn = self.store.newTransaction(label="Find old events")
oldEvents = (yield txn.eventsOlderThan(self.cutoff))
eventCount = len(oldEvents)
if self.verbose:
@@ -204,7 +204,7 @@
numEventsRemoved = -1
totalRemoved = 0
while numEventsRemoved:
- txn = self._store.newTransaction(label="Remove old events")
+ txn = self.store.newTransaction(label="Remove old events")
numEventsRemoved = (yield txn.removeOldEvents(self.cutoff, batchSize=self.batchSize))
(yield txn.commit())
if numEventsRemoved:
@@ -396,7 +396,7 @@
if self.verbose:
print("(Dry run) Searching for orphaned attachments...")
- txn = self._store.newTransaction(label="Find orphaned attachments")
+ txn = self.store.newTransaction(label="Find orphaned attachments")
orphans = (yield txn.orphanedAttachments(self.uuid))
returnValue(orphans)
@@ -406,7 +406,7 @@
if self.verbose:
print("(Dry run) Searching for old dropbox attachments...")
- txn = self._store.newTransaction(label="Find old dropbox attachments")
+ txn = self.store.newTransaction(label="Find old dropbox attachments")
cutoffs = (yield txn.oldDropboxAttachments(self.cutoff, self.uuid))
yield txn.commit()
@@ -418,7 +418,7 @@
if self.verbose:
print("(Dry run) Searching for old managed attachments...")
- txn = self._store.newTransaction(label="Find old managed attachments")
+ txn = self.store.newTransaction(label="Find old managed attachments")
cutoffs = (yield txn.oldManagedAttachments(self.cutoff, self.uuid))
yield txn.commit()
@@ -500,7 +500,7 @@
numOrphansRemoved = -1
totalRemoved = 0
while numOrphansRemoved:
- txn = self._store.newTransaction(label="Remove orphaned attachments")
+ txn = self.store.newTransaction(label="Remove orphaned attachments")
numOrphansRemoved = (yield txn.removeOrphanedAttachments(self.uuid, batchSize=self.batchSize))
yield txn.commit()
if numOrphansRemoved:
@@ -531,7 +531,7 @@
numOldRemoved = -1
totalRemoved = 0
while numOldRemoved:
- txn = self._store.newTransaction(label="Remove old dropbox attachments")
+ txn = self.store.newTransaction(label="Remove old dropbox attachments")
numOldRemoved = (yield txn.removeOldDropboxAttachments(self.cutoff, self.uuid, batchSize=self.batchSize))
yield txn.commit()
if numOldRemoved:
@@ -562,7 +562,7 @@
numOldRemoved = -1
totalRemoved = 0
while numOldRemoved:
- txn = self._store.newTransaction(label="Remove old managed attachments")
+ txn = self.store.newTransaction(label="Remove old managed attachments")
numOldRemoved = (yield txn.removeOldManagedAttachments(self.cutoff, self.uuid, batchSize=self.batchSize))
yield txn.commit()
if numOldRemoved:
@@ -768,7 +768,7 @@
)
# See if calendar home is provisioned
- txn = self._store.newTransaction()
+ txn = self.store.newTransaction()
storeCalHome = (yield txn.calendarHomeWithUID(uid))
calHomeProvisioned = storeCalHome is not None
@@ -937,7 +937,7 @@
raise e
try:
- txn = self._store.newTransaction()
+ txn = self.store.newTransaction()
# Remove empty calendar collections (and calendar home if no more
# calendars)
@@ -1129,7 +1129,7 @@
proxyFor = (yield principal.proxyFor(proxyType == "write"))
for other in proxyFor:
assignments.append((principal.record.uid, proxyType, other.record.uid))
- (yield removeProxy(self.root, self.directory, self._store, other, principal))
+ (yield removeProxy(self.root, self.directory, self.store, other, principal))
subPrincipal = principal.getChild("calendar-proxy-" + proxyType)
proxies = (yield subPrincipal.readProperty(davxml.GroupMemberSet, None))
Modified: CalendarServer/trunk/calendarserver/tools/push.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/push.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/calendarserver/tools/push.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -34,12 +34,12 @@
class WorkerService(Service):
def __init__(self, store):
- self._store = store
+ self.store = store
def rootResource(self):
try:
- rootResource = getRootResource(config, self._store)
+ rootResource = getRootResource(config, self.store)
except OSError, e:
if e.errno == ENOENT:
# Trying to re-write resources.xml but its parent directory does
@@ -81,7 +81,7 @@
def doWork(self):
rootResource = self.rootResource()
directory = rootResource.getDirectory()
- return displayAPNSubscriptions(self._store, directory, rootResource,
+ return displayAPNSubscriptions(self.store, directory, rootResource,
self.users)
Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -308,6 +308,8 @@
"FailIfUpgradeNeeded" : True, # Set to True to prevent the server or utility tools
# tools from running if the database needs a schema
# upgrade.
+ "StopAfterUpgradeTriggerFile" : "stop_after_upgrade", # if this file exists
+ # in ConfigRoot, stop the service after finishing upgrade phase
#
# Types of service provided
@@ -1063,7 +1065,7 @@
("DataRoot", "DatabaseRoot"),
("DataRoot", "AttachmentsRoot"),
("DataRoot", ("TimezoneService", "BasePath",)),
- ("ConfigRoot", "SudoersFile"),
+ ("ConfigRoot", "StopAfterUpgradeTriggerFile"),
("ConfigRoot", ("Scheduling", "iSchedule", "DNSDebug",)),
("ConfigRoot", ("Scheduling", "iSchedule", "DKIM", "PrivateKeyFile",)),
("ConfigRoot", ("Scheduling", "iSchedule", "DKIM", "PublicKeyFile",)),
Modified: CalendarServer/trunk/twistedcaldav/test/test_upgrade.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_upgrade.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/twistedcaldav/test/test_upgrade.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -1708,15 +1708,3 @@
return True
except ValueError:
return False
-
-
-
-class ParallelUpgradeTests(UpgradeTests):
- """
- Tests for upgradeData in parallel.
- """
-
- def doUpgrade(self, config):
- from txdav.common.datastore.upgrade.test.test_migrate import StubSpawner
- spawner = StubSpawner(config)
- return upgradeData(config, spawner, 2)
Modified: CalendarServer/trunk/twistedcaldav/upgrade.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/upgrade.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/twistedcaldav/upgrade.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -31,6 +31,7 @@
from zlib import compress
from cPickle import loads as unpickle, UnpicklingError
+
from twext.python.log import Logger
from txdav.xml import element
from twext.web2.dav.fileop import rmdir
@@ -51,10 +52,8 @@
from twistedcaldav.scheduling.scheduler import DirectScheduler
from twistedcaldav.util import normalizationLookup
-from twisted.application.service import Service
-from twisted.internet import reactor
from twisted.internet.defer import (
- inlineCallbacks, succeed, returnValue, gatherResults
+ inlineCallbacks, succeed, returnValue
)
from twisted.python.reflect import namedAny
from twisted.python.reflect import namedClass
@@ -67,7 +66,6 @@
from calendarserver.tools.resources import migrateResources
from calendarserver.tools.util import getDirectory
-from twext.python.parallel import Parallelizer
from twistedcaldav.scheduling.imip.mailgateway import migrateTokensToStore
from twistedcaldav.scheduling.imip.inbound import scheduleNextMailPoll
@@ -302,7 +300,7 @@
@inlineCallbacks
-def upgrade_to_1(config, spawner, parallel, directory):
+def upgrade_to_1(config, directory):
"""
Upconvert data from any calendar server version prior to data format 1.
"""
@@ -533,12 +531,6 @@
os.chown(inboxItemsFile, uid, gid)
if total:
- if parallel:
- spawner.startService()
- parallelizer = Parallelizer((yield gatherResults(
- [spawner.spawnWithConfig(config, To1Driver(), To1Home)
- for _ignore_x in xrange(parallel)]
- )))
log.warn("Processing %d calendar homes in %s" % (total, uidHomes))
# Upgrade calendar homes in the new location:
@@ -556,30 +548,15 @@
# Skip non-directories
continue
- if parallel:
- def doIt(driver, hp=homePath):
- d = driver.upgradeHomeInHelper(hp)
- def itWorked(succeeded):
- if not succeeded:
- setError()
- return succeeded
- d.addCallback(itWorked)
- d.addErrback(setError)
- return d
- yield parallelizer.do(doIt)
- else:
- if not upgradeCalendarHome(
- homePath, directory, cuaCache
- ):
- setError()
+ if not upgradeCalendarHome(
+ homePath, directory, cuaCache
+ ):
+ setError()
count += 1
if count % 10 == 0:
log.warn("Processed calendar home %d of %d"
% (count, total))
- if parallel:
- yield parallelizer.done()
- yield spawner.stopService()
log.warn("Done processing calendar homes")
triggerPath = os.path.join(config.ServerRoot, TRIGGER_FILE)
@@ -632,7 +609,7 @@
@inlineCallbacks
-def upgrade_to_2(config, spawner, parallel, directory):
+def upgrade_to_2(config, directory):
def renameProxyDB():
#
@@ -746,7 +723,7 @@
]
@inlineCallbacks
-def upgradeData(config, spawner=None, parallel=0):
+def upgradeData(config):
directory = getDirectory()
@@ -784,7 +761,7 @@
for version, method in upgradeMethods:
if onDiskVersion < version:
log.warn("Upgrading to version %d" % (version,))
- (yield method(config, spawner, parallel, directory))
+ (yield method(config, directory))
log.warn("Upgraded to version %d" % (version,))
with open(versionFilePath, "w") as verFile:
verFile.write(str(version))
@@ -981,19 +958,16 @@
-class UpgradeFileSystemFormatService(Service, object):
+class UpgradeFileSystemFormatStep(object):
"""
Upgrade filesystem from previous versions.
"""
- def __init__(self, config, spawner, parallel, service):
+ def __init__(self, config):
"""
Initialize the service.
"""
- self.wrappedService = service
self.config = config
- self.spawner = spawner
- self.parallel = parallel
@inlineCallbacks
@@ -1010,26 +984,25 @@
memcacheEnabled = self.config.Memcached.Pools.Default.ClientEnabled
self.config.Memcached.Pools.Default.ClientEnabled = False
- yield upgradeData(self.config, self.spawner, self.parallel)
+ yield upgradeData(self.config)
# Restore memcached client setting
self.config.Memcached.Pools.Default.ClientEnabled = memcacheEnabled
- # see http://twistedmatrix.com/trac/ticket/4649
- reactor.callLater(0, self.wrappedService.setServiceParent, self.parent)
+ returnValue(None)
- def startService(self):
+ def stepWithResult(self, result):
"""
- Start the service.
+ Execute the step.
"""
- self.doUpgrade()
+ return self.doUpgrade()
-class PostDBImportService(Service, object):
+class PostDBImportStep(object):
"""
- Service which runs after database import but before workers are spawned
+ Step which runs after database import but before workers are spawned
(except memcached will be running at this point)
The jobs carried out here are:
@@ -1038,67 +1011,69 @@
2. Processing non-implicit inbox items
"""
- def __init__(self, config, store, service):
+
+ def __init__(self, store, config, doPostImport):
"""
Initialize the service.
"""
- self.wrappedService = service
self.store = store
self.config = config
+ self.doPostImport = doPostImport
@inlineCallbacks
- def startService(self):
- """
- Start the service.
- """
+ def stepWithResult(self, result):
+ if self.doPostImport:
- directory = directoryFromConfig(self.config)
+ directory = directoryFromConfig(self.config)
- # Load proxy assignments from XML if specified
- if self.config.ProxyLoadFromFile:
- proxydbClass = namedClass(self.config.ProxyDBService.type)
- calendaruserproxy.ProxyDBService = proxydbClass(
- **self.config.ProxyDBService.params)
- loader = XMLCalendarUserProxyLoader(self.config.ProxyLoadFromFile)
- yield loader.updateProxyDB()
-
- # Populate the group membership cache
- if (self.config.GroupCaching.Enabled and
- self.config.GroupCaching.EnableUpdater):
- proxydb = calendaruserproxy.ProxyDBService
- if proxydb is None:
+ # Load proxy assignments from XML if specified
+ if self.config.ProxyLoadFromFile:
proxydbClass = namedClass(self.config.ProxyDBService.type)
- proxydb = proxydbClass(**self.config.ProxyDBService.params)
+ calendaruserproxy.ProxyDBService = proxydbClass(
+ **self.config.ProxyDBService.params)
+ loader = XMLCalendarUserProxyLoader(self.config.ProxyLoadFromFile)
+ yield loader.updateProxyDB()
- updater = GroupMembershipCacheUpdater(proxydb,
- directory,
- self.config.GroupCaching.UpdateSeconds,
- self.config.GroupCaching.ExpireSeconds,
- namespace=self.config.GroupCaching.MemcachedPool,
- useExternalProxies=self.config.GroupCaching.UseExternalProxies)
- yield updater.updateCache(fast=True)
- # Set in motion the work queue based updates:
- yield scheduleNextGroupCachingUpdate(self.store, 0)
+ # Populate the group membership cache
+ if (self.config.GroupCaching.Enabled and
+ self.config.GroupCaching.EnableUpdater):
+ proxydb = calendaruserproxy.ProxyDBService
+ if proxydb is None:
+ proxydbClass = namedClass(self.config.ProxyDBService.type)
+ proxydb = proxydbClass(**self.config.ProxyDBService.params)
- uid, gid = getCalendarServerIDs(self.config)
- dbPath = os.path.join(self.config.DataRoot, "proxies.sqlite")
- if os.path.exists(dbPath):
- os.chown(dbPath, uid, gid)
+ updater = GroupMembershipCacheUpdater(proxydb,
+ directory,
+ self.config.GroupCaching.UpdateSeconds,
+ self.config.GroupCaching.ExpireSeconds,
+ namespace=self.config.GroupCaching.MemcachedPool,
+ useExternalProxies=self.config.GroupCaching.UseExternalProxies)
+ yield updater.updateCache(fast=True)
- # Process old inbox items
- self.store.setMigrating(True)
- yield self.processInboxItems()
- self.store.setMigrating(False)
+ uid, gid = getCalendarServerIDs(self.config)
+ dbPath = os.path.join(self.config.DataRoot, "proxies.sqlite")
+ if os.path.exists(dbPath):
+ os.chown(dbPath, uid, gid)
- # Migrate mail tokens from sqlite to store
- yield migrateTokensToStore(self.config.DataRoot, self.store)
- # Set mail polling in motion
- if self.config.Scheduling.iMIP.Enabled:
- yield scheduleNextMailPoll(self.store, 0)
+ # Process old inbox items
+ self.store.setMigrating(True)
+ yield self.processInboxItems()
+ self.store.setMigrating(False)
+ # Migrate mail tokens from sqlite to store
+ yield migrateTokensToStore(self.config.DataRoot, self.store)
+ # Set mail polling in motion
+ if self.config.Scheduling.iMIP.Enabled:
+ yield scheduleNextMailPoll(self.store, 0)
+ if (self.config.GroupCaching.Enabled and
+ self.config.GroupCaching.EnableUpdater):
+ # Set in motion the work queue based updates:
+ yield scheduleNextGroupCachingUpdate(self.store, 0)
+
+
@inlineCallbacks
def processInboxItems(self):
"""
@@ -1225,7 +1200,6 @@
os.remove(inboxItemsList)
log.info("Completed inbox item processing.")
- reactor.callLater(0, self.wrappedService.setServiceParent, self.parent)
@inlineCallbacks
Modified: CalendarServer/trunk/txdav/common/datastore/upgrade/migrate.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/upgrade/migrate.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/txdav/common/datastore/upgrade/migrate.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -29,13 +29,10 @@
from twisted.python.runtime import platform
from twisted.python.reflect import namedAny, qual
-from twisted.application.service import Service
-from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, returnValue
-from twisted.internet.defer import maybeDeferred, gatherResults
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
+from twisted.internet.defer import maybeDeferred
from twext.python.filepath import CachingFilePath
-from twext.python.parallel import Parallelizer
from twext.internet.spawnsvc import SpawnerService
from twisted.protocols.amp import AMP, Command, String, Boolean
@@ -201,7 +198,7 @@
@Configure.responder
def configure(self, filename, appropriateStoreClass, merge):
subsvc = None
- self.upgrader = UpgradeToDatabaseService(
+ self.upgrader = UpgradeToDatabaseStep(
FileStore(
CachingFilePath(filename), None, True, True,
propertyStoreClass=namedAny(appropriateStoreClass)
@@ -231,45 +228,39 @@
-class UpgradeToDatabaseService(Service, LoggingMixIn, object):
+class UpgradeToDatabaseStep(LoggingMixIn, object):
"""
Upgrade resources from a filesystem store to a database store.
"""
- @classmethod
- def wrapService(cls, path, service, store, uid=None, gid=None,
- parallel=0, spawner=None, merge=False):
+ def __init__(self, fileStore, sqlStore, uid=None, gid=None, merge=False):
"""
- Create an L{UpgradeToDatabaseService} if there are still file-based
+ Create an L{UpgradeToDatabaseStep} if there are still file-based
calendar or addressbook homes remaining in the given path.
- @param path: a path pointing at the document root, where the file-based
- data-store is located.
- @type path: L{CachingFilePath}
+ @param sqlStore: the SQL storage service.
- @param service: the service to wrap. This service should be started
- when the upgrade is complete. (This is accomplished by returning
- it directly when no upgrade needs to be done, and by adding it to
- the service hierarchy when the upgrade completes; assuming that the
- service parent of the resulting service will be set to a
- L{MultiService} or similar.)
-
- @param store: the SQL storage service.
-
- @type service: L{IService}
-
- @param parallel: The number of parallel subprocesses that should manage
- the upgrade.
-
- @param spawner: a concrete L{StoreSpawnerService} subclass that will be
- used to spawn helper processes.
-
@param merge: merge filesystem homes into SQL homes, rather than
skipping them.
@return: a service
@rtype: L{IService}
"""
+
+ self.fileStore = fileStore
+ self.sqlStore = sqlStore
+ self.uid = uid
+ self.gid = gid
+ self.merge = merge
+
+ @classmethod
+ def fileStoreFromPath(cls, path):
+ """
+ @param path: a path pointing at the document root, where the file-based
+ data-store is located.
+ @type path: L{CachingFilePath}
+ """
+
# TODO: TOPPATHS should be computed based on enabled flags in 'store',
# not hard coded.
for homeType in TOPPATHS:
@@ -304,31 +295,11 @@
appropriateStoreClass = AppleDoubleStore
- self = cls(
- FileStore(path, None, True, True,
- propertyStoreClass=appropriateStoreClass),
- store, service, uid=uid, gid=gid,
- parallel=parallel, spawner=spawner, merge=merge
- )
- return self
- return service
+ return FileStore(path, None, True, True,
+ propertyStoreClass=appropriateStoreClass)
+ return None
- def __init__(self, fileStore, sqlStore, service, uid=None, gid=None,
- parallel=0, spawner=None, merge=False):
- """
- Initialize the service.
- """
- self.wrappedService = service
- self.fileStore = fileStore
- self.sqlStore = sqlStore
- self.uid = uid
- self.gid = gid
- self.parallel = parallel
- self.spawner = spawner
- self.merge = merge
-
-
@inlineCallbacks
def migrateOneHome(self, fileTxn, homeType, fileHome):
"""
@@ -373,38 +344,6 @@
@return: a Deferred which fires when the migration is complete.
"""
self.sqlStore.setMigrating(True)
- parallel = self.parallel
- if parallel:
- self.log_warn("Starting %d upgrade helper processes." %
- (parallel,))
- spawner = self.spawner
- spawner.startService()
- drivers = yield gatherResults(
- [spawner.spawnWithStore(UpgradeDriver(self),
- UpgradeHelperProcess)
- for x in xrange(parallel)]
- )
- # Wait for all subprocesses to be fully configured before
- # continuing, but let them configure in any order.
- self.log_warn("Configuring upgrade helper processes.")
-
- # FIXME: abstraction violations galore here; not too important,
- # since fileStore and this code are part of the same conceptual
- # unit, but if these become more independent there should probably
- # be a store-serialization API so that this code doesn't need to
- # know the intimate details of the fileStore implementation.
- # (Alternately, wrapService could just hold on to the details that
- # it used to construct the service in the first place.)
- yield gatherResults(
- [driver.configure(self.fileStore._path.path,
- self.fileStore._propertyStoreClass)
- for driver in drivers]
- )
- self.log_warn("Upgrade helpers ready.")
- parallelizer = Parallelizer(drivers)
- else:
- parallelizer = None
-
self.log_warn("Beginning filesystem -> database upgrade.")
for homeType, eachFunc in [
@@ -413,13 +352,10 @@
]:
yield eachFunc(
lambda txn, home: self._upgradeAction(
- txn, home, homeType, parallel, parallelizer
+ txn, home, homeType
)
)
- if parallel:
- yield parallelizer.done()
-
for homeType in TOPPATHS:
homesPath = self.fileStore._path.child(homeType)
if homesPath.isdir():
@@ -438,38 +374,19 @@
self.sqlStore.setMigrating(False)
- if parallel:
- self.log_warn("Stopping upgrade helper processes.")
- yield spawner.stopService()
- self.log_warn("Upgrade helpers all stopped.")
self.log_warn(
"Filesystem upgrade complete, launching database service."
)
- wrapped = self.wrappedService
- if wrapped is not None:
- # see http://twistedmatrix.com/trac/ticket/4649
- reactor.callLater(0, wrapped.setServiceParent, self.parent)
@inlineCallbacks
- def _upgradeAction(self, fileTxn, fileHome, homeType, parallel,
- parallelizer):
+ def _upgradeAction(self, fileTxn, fileHome, homeType):
uid = fileHome.uid()
self.log_warn("Migrating %s UID %r" % (homeType, uid))
- if parallel:
- @inlineCallbacks
- def doOneUpgrade(driver, fileUID=uid, homeType=homeType):
- yield driver.oneUpgrade(fileUID, homeType)
- self.log_warn("Completed migration of %s uid %r" %
- (homeType, fileUID))
- yield parallelizer.do(doOneUpgrade)
- else:
- yield self.migrateOneHome(fileTxn, homeType, fileHome)
+ yield self.migrateOneHome(fileTxn, homeType, fileHome)
-
- def startService(self):
- """
- Start the service.
- """
- self.doMigration()
+ def stepWithResult(self, result):
+ if self.fileStore is None:
+ return succeed(None)
+ return self.doMigration()
Modified: CalendarServer/trunk/txdav/common/datastore/upgrade/sql/others/test/test_attachment_migration.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/upgrade/sql/others/test/test_attachment_migration.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/txdav/common/datastore/upgrade/sql/others/test/test_attachment_migration.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -26,7 +26,7 @@
from txdav.common.datastore.test.util import theStoreBuilder, \
StubNotifierFactory
from txdav.common.datastore.upgrade.sql.others import attachment_migration
-from txdav.common.datastore.upgrade.sql.upgrade import UpgradeDatabaseOtherService
+from txdav.common.datastore.upgrade.sql.upgrade import UpgradeDatabaseOtherStep
"""
Tests for L{txdav.common.datastore.upgrade.sql.upgrade}.
@@ -35,7 +35,7 @@
class AttachmentMigrationTests(TestCase):
"""
- Tests for L{UpgradeDatabaseSchemaService}.
+ Tests for L{UpgradeDatabaseSchemaStep}.
"""
@inlineCallbacks
@@ -80,7 +80,7 @@
store = (yield self._initStore())
- upgrader = UpgradeDatabaseOtherService(store, None)
+ upgrader = UpgradeDatabaseOtherStep(store)
yield attachment_migration.doUpgrade(upgrader)
self.assertFalse(didUpgrade[0])
@@ -108,7 +108,7 @@
store = (yield self._initStore())
- upgrader = UpgradeDatabaseOtherService(store, None)
+ upgrader = UpgradeDatabaseOtherStep(store)
yield attachment_migration.doUpgrade(upgrader)
self.assertTrue(didUpgrade[0])
@@ -139,7 +139,7 @@
yield txn.setCalendarserverValue("MANAGED-ATTACHMENTS", "1")
yield txn.commit()
- upgrader = UpgradeDatabaseOtherService(store, None)
+ upgrader = UpgradeDatabaseOtherStep(store)
yield attachment_migration.doUpgrade(upgrader)
self.assertFalse(didUpgrade[0])
@@ -167,7 +167,7 @@
store = (yield self._initStore(False))
- upgrader = UpgradeDatabaseOtherService(store, None)
+ upgrader = UpgradeDatabaseOtherStep(store)
yield attachment_migration.doUpgrade(upgrader)
self.assertFalse(didUpgrade[0])
Modified: CalendarServer/trunk/txdav/common/datastore/upgrade/sql/test/test_upgrade.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/upgrade/sql/test/test_upgrade.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/txdav/common/datastore/upgrade/sql/test/test_upgrade.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -23,13 +23,13 @@
from twisted.python.modules import getModule
from twisted.trial.unittest import TestCase
from txdav.common.datastore.test.util import theStoreBuilder, StubNotifierFactory
-from txdav.common.datastore.upgrade.sql.upgrade import UpgradeDatabaseSchemaService, \
- UpgradeDatabaseDataService
+from txdav.common.datastore.upgrade.sql.upgrade import UpgradeDatabaseSchemaStep, \
+ UpgradeDatabaseDataStep
import re
class SchemaUpgradeTests(TestCase):
"""
- Tests for L{UpgradeDatabaseSchemaService}.
+ Tests for L{UpgradeDatabaseSchemaStep}.
"""
def _getSchemaVersion(self, fp, versionKey):
@@ -45,7 +45,7 @@
def test_scanUpgradeFiles(self):
- upgrader = UpgradeDatabaseSchemaService(None, None)
+ upgrader = UpgradeDatabaseSchemaStep(None)
upgrader.schemaLocation = getModule(__name__).filePath.sibling("fake_schema1")
files = upgrader.scanForUpgradeFiles("fake_dialect")
@@ -66,7 +66,7 @@
def test_determineUpgradeSequence(self):
- upgrader = UpgradeDatabaseSchemaService(None, None)
+ upgrader = UpgradeDatabaseSchemaStep(None)
upgrader.schemaLocation = getModule(__name__).filePath.sibling("fake_schema1")
files = upgrader.scanForUpgradeFiles("fake_dialect")
@@ -104,7 +104,7 @@
"""
for dialect in (POSTGRES_DIALECT, ORACLE_DIALECT,):
- upgrader = UpgradeDatabaseSchemaService(None, None)
+ upgrader = UpgradeDatabaseSchemaStep(None)
files = upgrader.scanForUpgradeFiles(dialect)
current_version = self._getSchemaVersion(upgrader.schemaLocation.child("current.sql"), "VERSION")
@@ -120,7 +120,7 @@
# """
#
# for dialect in (POSTGRES_DIALECT, ORACLE_DIALECT,):
-# upgrader = UpgradeDatabaseSchemaService(None, None)
+# upgrader = UpgradeDatabaseSchemaStep(None)
# files = upgrader.scanForUpgradeFiles(dialect)
# for _ignore_from, _ignore_to, fp in files:
# result = upgrader.getDataUpgrade(fp)
@@ -175,12 +175,12 @@
self.addCleanup(_cleanupOldSchema)
- test_upgrader = UpgradeDatabaseSchemaService(None, None)
+ test_upgrader = UpgradeDatabaseSchemaStep(None)
expected_version = self._getSchemaVersion(test_upgrader.schemaLocation.child("current.sql"), "VERSION")
for child in test_upgrader.schemaLocation.child("old").child(POSTGRES_DIALECT).globChildren("*.sql"):
# Upgrade allowed
- upgrader = UpgradeDatabaseSchemaService(store, None)
+ upgrader = UpgradeDatabaseSchemaStep(store)
yield _loadOldSchema(child)
yield upgrader.databaseUpgrade()
new_version = yield _loadVersion()
@@ -189,7 +189,7 @@
self.assertEqual(new_version, expected_version)
# Upgrade disallowed
- upgrader = UpgradeDatabaseSchemaService(store, None, failIfUpgradeNeeded=True, stopOnFail=False)
+ upgrader = UpgradeDatabaseSchemaStep(store, failIfUpgradeNeeded=True, stopOnFail=False)
yield _loadOldSchema(child)
old_version = yield _loadVersion()
try:
@@ -257,14 +257,14 @@
self.addCleanup(_cleanupOldData)
- test_upgrader = UpgradeDatabaseSchemaService(None, None)
+ test_upgrader = UpgradeDatabaseSchemaStep(None)
expected_version = self._getSchemaVersion(test_upgrader.schemaLocation.child("current.sql"), "CALENDAR-DATAVERSION")
versions = set()
for child in test_upgrader.schemaLocation.child("old").child(POSTGRES_DIALECT).globChildren("*.sql"):
versions.add(self._getSchemaVersion(child, "CALENDAR-DATAVERSION"))
for oldVersion in sorted(versions):
- upgrader = UpgradeDatabaseDataService(store, None)
+ upgrader = UpgradeDatabaseDataStep(store)
yield _loadOldData(test_upgrader.schemaLocation.child("current.sql"), oldVersion)
yield upgrader.databaseUpgrade()
new_version = yield _loadVersion()
Modified: CalendarServer/trunk/txdav/common/datastore/upgrade/sql/upgrade.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/upgrade/sql/upgrade.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/txdav/common/datastore/upgrade/sql/upgrade.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -24,7 +24,6 @@
from twext.python.log import LoggingMixIn
-from twisted.application.service import Service
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.python.modules import getModule
@@ -32,7 +31,7 @@
from txdav.common.datastore.upgrade.sql.others import attachment_migration
-class UpgradeDatabaseCoreService(Service, LoggingMixIn, object):
+class UpgradeDatabaseCoreStep(LoggingMixIn, object):
"""
Base class for either schema or data upgrades on the database.
@@ -42,45 +41,12 @@
@ivar sqlStore: The store to operate on.
@type sqlStore: L{txdav.idav.IDataStore}
-
- @ivar wrappedService: Wrapped L{IService} that will be started after this
- L{UpgradeDatabaseSchemaService}'s work is done and the database schema
- of C{sqlStore} is fully upgraded. This may also be specified as
- C{None}, in which case no service will be started.
-
- @type wrappedService: L{IService} or C{NoneType}
"""
- @classmethod
- def wrapService(cls, service, store, uid=None, gid=None, **kwargs):
+ def __init__(self, sqlStore, uid=None, gid=None, failIfUpgradeNeeded=False, stopOnFail=True):
"""
- Create an L{UpgradeDatabaseSchemaService} when starting the database
- so we can check the schema version and do any upgrades.
-
- @param service: the service to wrap. This service should be started
- when the upgrade is complete. (This is accomplished by returning
- it directly when no upgrade needs to be done, and by adding it to
- the service hierarchy when the upgrade completes; assuming that the
- service parent of the resulting service will be set to a
- L{MultiService} or similar.)
-
- @param store: the SQL storage service.
-
- @type service: L{IService}
-
- @type store: L{txdav.idav.IDataStore}
-
- @return: a service
- @rtype: L{IService}
- """
- return cls(store, service, uid=uid, gid=gid, **kwargs)
-
-
- def __init__(self, sqlStore, service, uid=None, gid=None, failIfUpgradeNeeded=False, stopOnFail=True):
- """
Initialize the service.
"""
- self.wrappedService = service
self.sqlStore = sqlStore
self.uid = uid
self.gid = gid
@@ -95,11 +61,11 @@
self.defaultKeyValue = None
- def startService(self):
+ def stepWithResult(self, result):
"""
Start the service.
"""
- self.databaseUpgrade()
+ return self.databaseUpgrade()
@inlineCallbacks
@@ -123,6 +89,7 @@
elif self.failIfUpgradeNeeded:
if self.stopOnFail:
reactor.stop()
+ # TODO: change this exception to be upgrade-specific
raise RuntimeError("Database upgrade is needed but not allowed.")
else:
self.sqlStore.setUpgrading(True)
@@ -131,9 +98,7 @@
self.log_warn("Database %s check complete." % (self.versionDescriptor,))
- # see http://twistedmatrix.com/trac/ticket/4649
- if self.wrappedService is not None:
- reactor.callLater(0, self.wrappedService.setServiceParent, self.parent)
+ returnValue(None)
@inlineCallbacks
@@ -263,7 +228,7 @@
-class UpgradeDatabaseSchemaService(UpgradeDatabaseCoreService):
+class UpgradeDatabaseSchemaStep(UpgradeDatabaseCoreStep):
"""
Checks and upgrades the database schema. This assumes there are a bunch of
upgrade files in sql syntax that we can execute against the database to
@@ -272,23 +237,15 @@
@ivar sqlStore: The store to operate on.
@type sqlStore: L{txdav.idav.IDataStore}
-
- @ivar wrappedService: Wrapped L{IService} that will be started after this
- L{UpgradeDatabaseSchemaService}'s work is done and the database schema
- of C{sqlStore} is fully upgraded. This may also be specified as
- C{None}, in which case no service will be started.
-
- @type wrappedService: L{IService} or C{NoneType}
"""
- def __init__(self, sqlStore, service, **kwargs):
+ def __init__(self, sqlStore, **kwargs):
"""
Initialize the service.
@param sqlStore: The store to operate on. Can be C{None} when doing unit tests.
- @param service: Wrapped service. Can be C{None} when doing unit tests.
"""
- super(UpgradeDatabaseSchemaService, self).__init__(sqlStore, service, **kwargs)
+ super(UpgradeDatabaseSchemaStep, self).__init__(sqlStore, **kwargs)
self.versionKey = "VERSION"
self.versionDescriptor = "schema"
@@ -316,7 +273,7 @@
-class UpgradeDatabaseDataService(UpgradeDatabaseCoreService):
+class UpgradeDatabaseDataStep(UpgradeDatabaseCoreStep):
"""
Checks and upgrades the database data. This assumes there are a bunch of
upgrade python modules that we can execute against the database to
@@ -325,23 +282,15 @@
@ivar sqlStore: The store to operate on.
@type sqlStore: L{txdav.idav.IDataStore}
-
- @ivar wrappedService: Wrapped L{IService} that will be started after this
- L{UpgradeDatabaseSchemaService}'s work is done and the database schema
- of C{sqlStore} is fully upgraded. This may also be specified as
- C{None}, in which case no service will be started.
-
- @type wrappedService: L{IService} or C{NoneType}
"""
- def __init__(self, sqlStore, service, **kwargs):
+ def __init__(self, sqlStore, **kwargs):
"""
- Initialize the service.
+ Initialize the Step.
@param sqlStore: The store to operate on. Can be C{None} when doing unit tests.
- @param service: Wrapped service. Can be C{None} when doing unit tests.
"""
- super(UpgradeDatabaseDataService, self).__init__(sqlStore, service, **kwargs)
+ super(UpgradeDatabaseDataStep, self).__init__(sqlStore, **kwargs)
self.versionKey = "CALENDAR-DATAVERSION"
self.versionDescriptor = "data"
@@ -373,27 +322,22 @@
-class UpgradeDatabaseOtherService(UpgradeDatabaseCoreService):
+class UpgradeDatabaseOtherStep(UpgradeDatabaseCoreStep):
"""
Do any other upgrade behaviors once all the schema, data, file migration upgraders
are done.
@ivar sqlStore: The store to operate on.
@type sqlStore: L{txdav.idav.IDataStore}
-
- @ivar wrappedService: Wrapped L{IService} that will be started after this
- L{UpgradeDatabaseOtherService}'s work is done
- @type wrappedService: L{IService} or C{NoneType}
"""
- def __init__(self, sqlStore, service, **kwargs):
+ def __init__(self, sqlStore, **kwargs):
"""
- Initialize the service.
+ Initialize the Step.
@param sqlStore: The store to operate on. Can be C{None} when doing unit tests.
- @param service: Wrapped service. Can be C{None} when doing unit tests.
"""
- super(UpgradeDatabaseOtherService, self).__init__(sqlStore, service, **kwargs)
+ super(UpgradeDatabaseOtherStep, self).__init__(sqlStore, **kwargs)
self.versionDescriptor = "other upgrades"
@@ -415,6 +359,4 @@
self.log_warn("Database %s check complete." % (self.versionDescriptor,))
- # see http://twistedmatrix.com/trac/ticket/4649
- if self.wrappedService is not None:
- reactor.callLater(0, self.wrappedService.setServiceParent, self.parent)
+ returnValue(None)
Modified: CalendarServer/trunk/txdav/common/datastore/upgrade/test/test_migrate.py
===================================================================
--- CalendarServer/trunk/txdav/common/datastore/upgrade/test/test_migrate.py 2013-04-27 02:33:05 UTC (rev 11108)
+++ CalendarServer/trunk/txdav/common/datastore/upgrade/test/test_migrate.py 2013-04-27 03:20:20 UTC (rev 11109)
@@ -23,7 +23,6 @@
from twext.python.filepath import CachingFilePath
from twext.web2.http_headers import MimeType
-from twisted.application.service import Service, MultiService
from twisted.internet.defer import inlineCallbacks, Deferred, returnValue
from twisted.internet.protocol import Protocol
from twisted.protocols.amp import AMP, Command, String
@@ -42,7 +41,7 @@
populateCalendarsFrom, StubNotifierFactory, resetCalendarMD5s, \
populateAddressBooksFrom, resetAddressBookMD5s, deriveValue, \
withSpecialValue
-from txdav.common.datastore.upgrade.migrate import UpgradeToDatabaseService, \
+from txdav.common.datastore.upgrade.migrate import UpgradeToDatabaseStep, \
StoreSpawnerService, swapAMP
import copy
@@ -127,18 +126,9 @@
class HomeMigrationTests(TestCase):
"""
- Tests for L{UpgradeToDatabaseService}.
+ Tests for L{UpgradeToDatabaseStep}.
"""
- def createUpgradeService(self):
- """
- Create an upgrade service.
- """
- return UpgradeToDatabaseService(
- self.fileStore, self.sqlStore, self.stubService
- )
-
-
@inlineCallbacks
def setUp(self):
"""
@@ -154,30 +144,7 @@
self.sqlStore = yield theStoreBuilder.buildStore(
self, StubNotifierFactory()
)
- subStarted = self.subStarted = Deferred()
- class StubService(Service, object):
- def startService(self):
- super(StubService, self).startService()
- if not subStarted.called:
- subStarted.callback(None)
- from twisted.python import log
- def justOnce(evt):
- if evt.get('isError') and not hasattr(subStarted, 'result'):
- subStarted.errback(
- evt.get('failure',
- RuntimeError("error starting up (see log)"))
- )
- log.addObserver(justOnce)
- def cleanObserver():
- try:
- log.removeObserver(justOnce)
- except ValueError:
- pass # x not in list, I don't care.
- self.addCleanup(cleanObserver)
- self.stubService = StubService()
- self.topService = MultiService()
- self.upgrader = self.createUpgradeService()
- self.upgrader.setServiceParent(self.topService)
+ self.upgrader = UpgradeToDatabaseStep(self.fileStore, self.sqlStore)
requirements = CommonTests.requirements
extras = deriveValue(self, "extraRequirements", lambda t: {})
@@ -246,10 +213,9 @@
don't have a UID and can't be stored properly in the database, so they
should not be migrated.
"""
- self.topService.startService()
+ yield self.upgrader.stepWithResult(None)
txn = self.sqlStore.newTransaction()
self.addCleanup(txn.commit)
- yield self.subStarted
self.assertIdentical(
None,
(yield (yield (yield
@@ -265,9 +231,7 @@
L{UpgradeToDatabaseService.startService} will do the upgrade, then
start its dependent service by adding it to its service hierarchy.
"""
- self.topService.startService()
- yield self.subStarted
- self.assertEquals(self.stubService.running, True)
+ yield self.upgrader.stepWithResult(None)
txn = self.sqlStore.newTransaction()
self.addCleanup(txn.commit)
for uid in CommonTests.requirements:
@@ -301,8 +265,7 @@
startTxn = self.sqlStore.newTransaction("populate empty sample")
yield startTxn.calendarHomeWithUID("home1", create=True)
yield startTxn.commit()
- self.topService.startService()
- yield self.subStarted
+ yield self.upgrader.stepWithResult(None)
vrfyTxn = self.sqlStore.newTransaction("verify sample still empty")
self.addCleanup(vrfyTxn.commit)
home = yield vrfyTxn.calendarHomeWithUID("home1")
@@ -358,8 +321,7 @@
transport.write(someAttachmentData)
yield transport.loseConnection()
yield maybeCommit()
- self.topService.startService()
- yield self.subStarted
+ yield self.upgrader.stepWithResult(None)
committed = []
txn = self.sqlStore.newTransaction()
outObject = yield getSampleObj()
@@ -383,9 +345,7 @@
L{UpgradeToDatabaseService.startService} will do the upgrade, then
start its dependent service by adding it to its service hierarchy.
"""
- self.topService.startService()
- yield self.subStarted
- self.assertEquals(self.stubService.running, True)
+ yield self.upgrader.stepWithResult(None)
txn = self.sqlStore.newTransaction()
self.addCleanup(txn.commit)
for uid in ABCommonTests.requirements:
@@ -407,20 +367,3 @@
):
object = (yield adbk.addressbookObjectWithName(name))
self.assertEquals(object.md5(), md5)
-
-
-
-class ParallelHomeMigrationTests(HomeMigrationTests):
- """
- Tests for home migrations running in parallel. Functionally this should be
- the same, so it's just a store created slightly differently.
- """
-
- def createUpgradeService(self):
- """
- Create an upgrade service.
- """
- return UpgradeToDatabaseService(
- self.fileStore, self.sqlStore, self.stubService,
- parallel=2, spawner=StubSpawner()
- )
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130426/ac10bc75/attachment-0001.html>
More information about the calendarserver-changes mailing list