[CalendarServer-changes] [8378] CalendarServer/branches/users/glyph/parallel-upgrade/txdav/common/datastore/upgrade/migrate.py
source_changes at macosforge.org
source_changes at macosforge.org
Sat Dec 3 00:36:57 PST 2011
Revision: 8378
http://trac.macosforge.org/projects/calendarserver/changeset/8378
Author: glyph at apple.com
Date: 2011-12-03 00:36:57 -0800 (Sat, 03 Dec 2011)
Log Message:
-----------
Upgrade process, modulo propagation of configuration
Modified Paths:
--------------
CalendarServer/branches/users/glyph/parallel-upgrade/txdav/common/datastore/upgrade/migrate.py
Modified: CalendarServer/branches/users/glyph/parallel-upgrade/txdav/common/datastore/upgrade/migrate.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade/txdav/common/datastore/upgrade/migrate.py 2011-12-03 08:36:40 UTC (rev 8377)
+++ CalendarServer/branches/users/glyph/parallel-upgrade/txdav/common/datastore/upgrade/migrate.py 2011-12-03 08:36:57 UTC (rev 8378)
@@ -24,18 +24,155 @@
import xattr
from twext.python.log import LoggingMixIn
+
from twisted.application.service import Service
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import maybeDeferred, DeferredList
from twisted.python.runtime import platform
+from twisted.python import log
+from twext.python.filepath import CachingFilePath
+from twisted.protocols.amp import AMP, Command, String
+
+from twext.internet.spawnsvc import SpawnerService
+
from txdav.caldav.datastore.util import migrateHome as migrateCalendarHome
from txdav.carddav.datastore.util import migrateHome as migrateAddressbookHome
from txdav.common.datastore.file import CommonDataStore as FileStore, TOPPATHS
from txdav.base.propertystore.xattr import PropertyStore as XattrPropertyStore
-from txdav.base.propertystore.appledouble_xattr import (
- PropertyStore as AppleDoubleStore)
+from txdav.base.propertystore.appledouble_xattr import (PropertyStore
+ as AppleDoubleStore)
+
+
+homeTypeLookup = {
+ "calendar": (migrateCalendarHome,
+ lambda txn: txn.calendarHomeWithUID),
+ "addressbook": (migrateAddressbookHome,
+ lambda txn: txn.addressbookHomeWithUID)
+}
+
+
+
+class Configure(Command):
+ """
+ Configure the upgrade helper process.
+ """
+
+ arguments = [("filename", String())]
+
+
+
+class OneUpgrade(Command):
+ """
+ Upgrade a single calendar home.
+ """
+
+ arguments = [("uid", String()),
+ ("homeType", String())]
+
+
+
+class LogIt(Command):
+ """
+ Log a message.
+ """
+ arguments = [("message", String())]
+
+
+
+class UpgradeDriver(AMP):
+ """
+ Helper protocol which runs in the master process doing the upgrade.
+ """
+
+ def __init__(self, upgradeService):
+ super(UpgradeDriver, self).__init__()
+ self.service = upgradeService
+
+
+ def configure(self):
+ from twistedcaldav.config import config
+ return self.callRemote(Configure,
+ filename=config._provider.getConfigFileName()
+ or "")
+
+
+ def oneUpgrade(self, uid, homeType):
+ return self.callRemote(OneUpgrade, uid=uid, homeType=homeType)
+
+
+ @LogIt.responder
+ def logIt(self, message):
+ """
+ Log a message from the subprocess.
+ """
+ log.msg("Subprocess: " + message)
+ return {}
+
+
+
+def logFailures(thunk):
+ """
+ Decorator which logs
+ """
+ def real(self, *a, **kw):
+ d = maybeDeferred(thunk, self, *a, **kw)
+ def logit(failure):
+ self.callRemote(LogIt, message=failure.getTraceback())
+ return failure
+ d.addErrback(logit)
+ return d
+ return real
+
+
+class UpgradeHelperProcess(AMP):
+ """
+ Helper protocol which runs in a subprocess to upgrade.
+ """
+
+ @Configure.responder
+ @logFailures
+ def configure(self, filename):
+ from twistedcaldav.config import config
+ from calendarserver.tap.util import getDBPool, storeFromConfig
+ config.load(filename)
+ subsvc = object()
+ pool, txnf = getDBPool(config)
+ if pool is not None:
+ pool.startService()
+ reactor.addSystemEventTrigger("before", "shutdown",
+ pool.stopService)
+ # XXX: SharedConnectionPool needs to be relayed out of band, as
+ # calendarserver.tap.caldav does with its own thing.
+ dbstore = storeFromConfig(config, txnf)
+ dbstore.setMigrating(True)
+ self.upgrader = UpgradeToDatabaseService.wrapService(
+ CachingFilePath(config.DocumentRoot), subsvc, dbstore
+ )
+ return {}
+
+
+ @OneUpgrade.responder
+ @logFailures
+ def oneUpgrade(self, uid, homeType):
+ """
+ Upgrade one calendar home.
+ """
+ migrateFunc, destFunc = homeTypeLookup[homeType]
+ fileTxn = self.upgrader.fileStore.newTransaction()
+ return (
+ maybeDeferred(destFunc(fileTxn), uid)
+ .addCallback(
+ lambda fileHome:
+ self.upgrader.migrateOneHome(fileTxn, homeType, fileHome)
+ )
+ .addCallback(lambda ignored: {})
+ )
+
+
+
class UpgradeToDatabaseService(Service, LoggingMixIn, object):
"""
Upgrade resources from a filesystem store to a database store.
@@ -108,7 +245,8 @@
return service
- def __init__(self, fileStore, sqlStore, service, uid=None, gid=None):
+ def __init__(self, fileStore, sqlStore, service, uid=None, gid=None,
+ parallel=5):
"""
Initialize the service.
"""
@@ -117,9 +255,38 @@
self.sqlStore = sqlStore
self.uid = uid
self.gid = gid
+ self.parallel = parallel
@inlineCallbacks
+ def migrateOneHome(self, fileTxn, homeType, fileHome):
+ """
+ Migrate an individual calendar or addressbook home.
+ """
+ migrateFunc, destFunc = homeTypeLookup.get(homeType)
+ uid = fileHome.uid()
+ self.log_warn("Migrating %s UID %r" % (homeType, uid))
+ sqlTxn = self.sqlStore.newTransaction()
+ homeGetter = destFunc(sqlTxn)
+ if (yield homeGetter(uid, create=False)) is not None:
+ self.log_warn(
+ "%s home %r already existed not migrating" % (
+ homeType, uid))
+ yield sqlTxn.abort()
+ yield fileTxn.commit()
+ returnValue(None)
+ sqlHome = yield homeGetter(uid, create=True)
+ yield migrateFunc(fileHome, sqlHome)
+ yield fileTxn.commit()
+ yield sqlTxn.commit()
+ # Remove file home after migration. FIXME: instead, this should be a
+ # public remove...HomeWithUID() API for de-provisioning. (If we had
+ # this, this would simply be a store-to-store migrator rather than a
+ # filesystem-to-database upgrade.)
+ fileHome._path.remove()
+
+
+ @inlineCallbacks
def doMigration(self):
"""
Do the migration. Called by C{startService}, but a different method
@@ -127,48 +294,60 @@
@return: a Deferred which fires when the migration is complete.
"""
- self.log_warn("Beginning filesystem -> database upgrade.")
-
self.sqlStore.setMigrating(True)
+ parallel = self.parallel
+ if parallel:
+ self.log_warn("Starting upgrade helper processes.")
+ spawner = SpawnerService()
+ spawner.startService()
+ drivers = []
+ for value in xrange(parallel):
+ driver = spawner.spawn(UpgradeDriver(self),
+ UpgradeHelperProcess)
+ drivers.append(driver)
- for homeType, migrateFunc, eachFunc, destFunc, _ignore_topPathName in [
- ("calendar", migrateCalendarHome,
- self.fileStore.eachCalendarHome,
- lambda txn: txn.calendarHomeWithUID,
- "calendars"),
- ("addressbook", migrateAddressbookHome,
- self.fileStore.eachAddressbookHome,
- lambda txn: txn.addressbookHomeWithUID,
- "addressbooks")
+ # Wait for all subprocesses to be fully configured before
+ # continuing, but let them configure in any order.
+ self.log_warn("Configuring upgrade helper processes.")
+ yield DeferredList([driver.configure() for driver in drivers])
+ self.log_warn("Upgrade helpers ready.")
+
+ self.log_warn("Beginning filesystem -> database upgrade.")
+ inParallel = []
+ for homeType, eachFunc in [
+ ("calendar", self.fileStore.eachCalendarHome),
+ ("addressbook", self.fileStore.eachAddressbookHome),
]:
for fileTxn, fileHome in eachFunc():
uid = fileHome.uid()
self.log_warn("Migrating %s UID %r" % (homeType, uid))
- sqlTxn = self.sqlStore.newTransaction()
- homeGetter = destFunc(sqlTxn)
- if (yield homeGetter(uid, create=False)) is not None:
- self.log_warn(
- "%s home %r already existed not migrating" % (
- homeType, uid))
- yield sqlTxn.abort()
+ if parallel:
+ # No-op transaction here: make sure everything's unlocked
+ # before asking the subprocess to handle it.
yield fileTxn.commit()
- continue
- sqlHome = yield homeGetter(uid, create=True)
- if sqlHome is None:
- raise RuntimeError("THIS SHOULD NOT BE POSSIBLE.")
- yield migrateFunc(fileHome, sqlHome)
- yield fileTxn.commit()
- yield sqlTxn.commit()
- # FIXME: need a public remove...HomeWithUID() for de-
- # provisioning
-
- # Remove file home after migration
- fileHome._path.remove()
+ if not drivers:
+ # All the subprocesses are currently busy processing an
+ # upgrade. Wait for one to become available.
+ yield DeferredList(inParallel, fireOnOneCallback=True,
+ fireOnOneErrback=True)
+ busy = drivers.pop(0)
+ d = busy.oneUpgrade(fileHome.uid(), homeType)
+ inParallel.append(d)
+ def freeUp(result, d=d, busy=busy):
+ inParallel.remove(d)
+ drivers.append(busy)
+ return result
+ d.addBoth(freeUp)
+ else:
+ yield self.migrateOneHome(fileTxn, homeType, fileHome)
for homeType in TOPPATHS:
homesPath = self.fileStore._path.child(homeType)
if homesPath.isdir():
homesPath.remove()
+ if inParallel:
+ yield DeferredList(inParallel)
+
# Set attachment directory ownership. FIXME: is this still necessary
# since attachments started living outside the database directory
# created by initdb? default permissions might be correct now.
@@ -180,11 +359,15 @@
for fp in sqlAttachmentsPath.walk():
os.chown(fp.path, uid, gid)
- self.sqlStore.setMigrating(False)
+ self.sqlStore.setMigrating(False)
+
+ if parallel:
+ self.log_warn("Stopping upgrade helper processes.")
+ yield spawner.stopService()
+ self.log_warn("Upgrade helpers all stopped.")
self.log_warn(
"Filesystem upgrade complete, launching database service."
)
-
# see http://twistedmatrix.com/trac/ticket/4649
reactor.callLater(0, self.wrappedService.setServiceParent, self.parent)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111203/c6485e39/attachment-0001.html>
More information about the calendarserver-changes mailing list