[CalendarServer-changes] [8378] CalendarServer/branches/users/glyph/parallel-upgrade/txdav/common/ datastore/upgrade/migrate.py

source_changes at macosforge.org source_changes at macosforge.org
Sat Dec 3 00:36:57 PST 2011


Revision: 8378
          http://trac.macosforge.org/projects/calendarserver/changeset/8378
Author:   glyph at apple.com
Date:     2011-12-03 00:36:57 -0800 (Sat, 03 Dec 2011)
Log Message:
-----------
Upgrade process, modulo propagation of configuration

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/parallel-upgrade/txdav/common/datastore/upgrade/migrate.py

Modified: CalendarServer/branches/users/glyph/parallel-upgrade/txdav/common/datastore/upgrade/migrate.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade/txdav/common/datastore/upgrade/migrate.py	2011-12-03 08:36:40 UTC (rev 8377)
+++ CalendarServer/branches/users/glyph/parallel-upgrade/txdav/common/datastore/upgrade/migrate.py	2011-12-03 08:36:57 UTC (rev 8378)
@@ -24,18 +24,155 @@
 import xattr
 
 from twext.python.log import LoggingMixIn
+
 from twisted.application.service import Service
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import maybeDeferred, DeferredList
 from twisted.python.runtime import platform
+from twisted.python import log
+from twext.python.filepath import CachingFilePath
 
+from twisted.protocols.amp import AMP, Command, String
+
+from twext.internet.spawnsvc import SpawnerService
+
 from txdav.caldav.datastore.util import migrateHome as migrateCalendarHome
 from txdav.carddav.datastore.util import migrateHome as migrateAddressbookHome
 from txdav.common.datastore.file import CommonDataStore as FileStore, TOPPATHS
 from txdav.base.propertystore.xattr import PropertyStore as XattrPropertyStore
-from txdav.base.propertystore.appledouble_xattr import (
-    PropertyStore as AppleDoubleStore)
 
+from txdav.base.propertystore.appledouble_xattr import (PropertyStore
+                                                        as AppleDoubleStore)
+
+
+homeTypeLookup = {
+    "calendar": (migrateCalendarHome,
+                 lambda txn: txn.calendarHomeWithUID),
+    "addressbook": (migrateAddressbookHome,
+                    lambda txn: txn.addressbookHomeWithUID)
+}
+
+
+
+class Configure(Command):
+    """
+    Configure the upgrade helper process.
+    """
+
+    arguments = [("filename", String())]
+
+
+    
+class OneUpgrade(Command):
+    """
+    Upgrade a single calendar home.
+    """
+
+    arguments = [("uid", String()),
+                 ("homeType", String())]
+
+
+
+class LogIt(Command):
+    """
+    Log a message.
+    """
+    arguments = [("message", String())]
+
+
+
+class UpgradeDriver(AMP):
+    """
+    Helper protocol which runs in the master process doing the upgrade.
+    """
+
+    def __init__(self, upgradeService):
+        super(UpgradeDriver, self).__init__()
+        self.service = upgradeService
+
+
+    def configure(self):
+        from twistedcaldav.config import config
+        return self.callRemote(Configure,
+                               filename=config._provider.getConfigFileName()
+                               or "")
+
+
+    def oneUpgrade(self, uid, homeType):
+        return self.callRemote(OneUpgrade, uid=uid, homeType=homeType)
+
+
+    @LogIt.responder
+    def logIt(self, message):
+        """
+        Log a message from the subprocess.
+        """
+        log.msg("Subprocess: " + message)
+        return {}
+
+
+
+def logFailures(thunk):
+    """
+    Decorator which logs 
+    """
+    def real(self, *a, **kw):
+        d = maybeDeferred(thunk, self, *a, **kw)
+        def logit(failure):
+            self.callRemote(LogIt, message=failure.getTraceback())
+            return failure
+        d.addErrback(logit)
+        return d
+    return real
+
+
+class UpgradeHelperProcess(AMP):
+    """
+    Helper protocol which runs in a subprocess to upgrade.
+    """
+
+    @Configure.responder
+    @logFailures
+    def configure(self, filename):
+        from twistedcaldav.config import config
+        from calendarserver.tap.util import getDBPool, storeFromConfig
+        config.load(filename)
+        subsvc = object()
+        pool, txnf = getDBPool(config)
+        if pool is not None:
+            pool.startService()
+            reactor.addSystemEventTrigger("before", "shutdown",
+                                          pool.stopService)
+        # XXX: SharedConnectionPool needs to be relayed out of band, as
+        # calendarserver.tap.caldav does with its own thing.
+        dbstore = storeFromConfig(config, txnf)
+        dbstore.setMigrating(True)
+        self.upgrader = UpgradeToDatabaseService.wrapService(
+            CachingFilePath(config.DocumentRoot), subsvc, dbstore
+        )
+        return {}
+
+
+    @OneUpgrade.responder
+    @logFailures
+    def oneUpgrade(self, uid, homeType):
+        """
+        Upgrade one calendar home.
+        """
+        migrateFunc, destFunc = homeTypeLookup[homeType]
+        fileTxn = self.upgrader.fileStore.newTransaction()
+        return (
+            maybeDeferred(destFunc(fileTxn), uid)
+            .addCallback(
+                lambda fileHome:
+                self.upgrader.migrateOneHome(fileTxn, homeType, fileHome)
+            )
+            .addCallback(lambda ignored: {})
+        )
+
+
+
 class UpgradeToDatabaseService(Service, LoggingMixIn, object):
     """
     Upgrade resources from a filesystem store to a database store.
@@ -108,7 +245,8 @@
         return service
 
 
-    def __init__(self, fileStore, sqlStore, service, uid=None, gid=None):
+    def __init__(self, fileStore, sqlStore, service, uid=None, gid=None,
+                 parallel=5):
         """
         Initialize the service.
         """
@@ -117,9 +255,38 @@
         self.sqlStore = sqlStore
         self.uid = uid
         self.gid = gid
+        self.parallel = parallel
 
 
     @inlineCallbacks
+    def migrateOneHome(self, fileTxn, homeType, fileHome):
+        """
+        Migrate an individual calendar or addressbook home.
+        """
+        migrateFunc, destFunc = homeTypeLookup.get(homeType)
+        uid = fileHome.uid()
+        self.log_warn("Migrating %s UID %r" % (homeType, uid))
+        sqlTxn = self.sqlStore.newTransaction()
+        homeGetter = destFunc(sqlTxn)
+        if (yield homeGetter(uid, create=False)) is not None:
+            self.log_warn(
+                "%s home %r already existed not migrating" % (
+                    homeType, uid))
+            yield sqlTxn.abort()
+            yield fileTxn.commit()
+            returnValue(None)
+        sqlHome = yield homeGetter(uid, create=True)
+        yield migrateFunc(fileHome, sqlHome)
+        yield fileTxn.commit()
+        yield sqlTxn.commit()
+        # Remove file home after migration. FIXME: instead, this should be a
+        # public remove...HomeWithUID() API for de-provisioning.  (If we had
+        # this, this would simply be a store-to-store migrator rather than a
+        # filesystem-to-database upgrade.)
+        fileHome._path.remove()
+
+
+    @inlineCallbacks
     def doMigration(self):
         """
         Do the migration.  Called by C{startService}, but a different method
@@ -127,48 +294,60 @@
 
         @return: a Deferred which fires when the migration is complete.
         """
-        self.log_warn("Beginning filesystem -> database upgrade.")
-
         self.sqlStore.setMigrating(True)
+        parallel = self.parallel
+        if parallel:
+            self.log_warn("Starting upgrade helper processes.")
+            spawner = SpawnerService()
+            spawner.startService()
+            drivers = []
+            for value in xrange(parallel):
+                driver = spawner.spawn(UpgradeDriver(self),
+                                       UpgradeHelperProcess)
+                drivers.append(driver)
 
-        for homeType, migrateFunc, eachFunc, destFunc, _ignore_topPathName in [
-            ("calendar", migrateCalendarHome,
-                self.fileStore.eachCalendarHome,
-                lambda txn: txn.calendarHomeWithUID,
-                "calendars"),
-            ("addressbook", migrateAddressbookHome,
-                self.fileStore.eachAddressbookHome,
-                lambda txn: txn.addressbookHomeWithUID,
-                "addressbooks")
+            # Wait for all subprocesses to be fully configured before
+            # continuing, but let them configure in any order.
+            self.log_warn("Configuring upgrade helper processes.")
+            yield DeferredList([driver.configure() for driver in drivers])
+            self.log_warn("Upgrade helpers ready.")
+
+        self.log_warn("Beginning filesystem -> database upgrade.")
+        inParallel = []
+        for homeType, eachFunc in [
+                ("calendar", self.fileStore.eachCalendarHome),
+                ("addressbook", self.fileStore.eachAddressbookHome),
             ]:
             for fileTxn, fileHome in eachFunc():
                 uid = fileHome.uid()
                 self.log_warn("Migrating %s UID %r" % (homeType, uid))
-                sqlTxn = self.sqlStore.newTransaction()
-                homeGetter = destFunc(sqlTxn)
-                if (yield homeGetter(uid, create=False)) is not None:
-                    self.log_warn(
-                        "%s home %r already existed not migrating" % (
-                            homeType, uid))
-                    yield sqlTxn.abort()
+                if parallel:
+                    # No-op transaction here: make sure everything's unlocked
+                    # before asking the subprocess to handle it.
                     yield fileTxn.commit()
-                    continue
-                sqlHome = yield homeGetter(uid, create=True)
-                if sqlHome is None:
-                    raise RuntimeError("THIS SHOULD NOT BE POSSIBLE.")
-                yield migrateFunc(fileHome, sqlHome)
-                yield fileTxn.commit()
-                yield sqlTxn.commit()
-                # FIXME: need a public remove...HomeWithUID() for de-
-                # provisioning
-
-                # Remove file home after migration
-                fileHome._path.remove()
+                    if not drivers:
+                        # All the subprocesses are currently busy processing an
+                        # upgrade.  Wait for one to become available.
+                        yield DeferredList(inParallel, fireOnOneCallback=True,
+                                           fireOnOneErrback=True)
+                    busy = drivers.pop(0)
+                    d = busy.oneUpgrade(fileHome.uid(), homeType)
+                    inParallel.append(d)
+                    def freeUp(result, d=d, busy=busy):
+                        inParallel.remove(d)
+                        drivers.append(busy)
+                        return result
+                    d.addBoth(freeUp)
+                else:
+                    yield self.migrateOneHome(fileTxn, homeType, fileHome)
         for homeType in TOPPATHS:
             homesPath = self.fileStore._path.child(homeType)
             if homesPath.isdir():
                 homesPath.remove()
 
+        if inParallel:
+            yield DeferredList(inParallel)
+        
         # Set attachment directory ownership.  FIXME: is this still necessary
         # since attachments started living outside the database directory
         # created by initdb?  default permissions might be correct now.
@@ -180,11 +359,15 @@
             for fp in sqlAttachmentsPath.walk():
                 os.chown(fp.path, uid, gid)
 
-        self.sqlStore.setMigrating(False) 
+        self.sqlStore.setMigrating(False)
+
+        if parallel:
+            self.log_warn("Stopping upgrade helper processes.")
+            yield spawner.stopService()
+            self.log_warn("Upgrade helpers all stopped.")
         self.log_warn(
             "Filesystem upgrade complete, launching database service."
         )
-
         # see http://twistedmatrix.com/trac/ticket/4649
         reactor.callLater(0, self.wrappedService.setServiceParent, self.parent)
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111203/c6485e39/attachment-0001.html>


More information about the calendarserver-changes mailing list