[CalendarServer-changes] [8581] CalendarServer/branches/users/glyph/parallel-upgrade_to_1

source_changes at macosforge.org source_changes at macosforge.org
Tue Jan 24 00:59:05 PST 2012


Revision: 8581
          http://trac.macosforge.org/projects/calendarserver/changeset/8581
Author:   glyph at apple.com
Date:     2012-01-24 00:59:05 -0800 (Tue, 24 Jan 2012)
Log Message:
-----------
Do the home-data-fixing parts of upgrade_to_1 in parallel.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/parallel-upgrade_to_1/calendarserver/tap/caldav.py
    CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twistedcaldav/upgrade.py

Modified: CalendarServer/branches/users/glyph/parallel-upgrade_to_1/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade_to_1/calendarserver/tap/caldav.py	2012-01-24 08:58:32 UTC (rev 8580)
+++ CalendarServer/branches/users/glyph/parallel-upgrade_to_1/calendarserver/tap/caldav.py	2012-01-24 08:59:05 UTC (rev 8581)
@@ -918,17 +918,16 @@
                     parallel = config.MultiProcess.ProcessCount
                 else:
                     parallel = 0
+                spawner = ConfiguredChildSpawner(self, dispenser, config)
                 upgradeSvc = UpgradeFileSystemFormatService(
-                    config,
+                    config, spawner, parallel,
                     UpgradeDatabaseSchemaService.wrapService(
                         UpgradeDatabaseDataService.wrapService(
                             UpgradeToDatabaseService.wrapService(
                                 CachingFilePath(config.DocumentRoot),
                                 PostDBImportService(config, store, mainService),
                                 store, uid=overrideUID, gid=overrideGID,
-                                spawner=ConfiguredChildSpawner(
-                                    self, dispenser, config
-                                ),
+                                spawner=spawner,
                                 parallel=parallel
                             ),
                             store, uid=overrideUID, gid=overrideGID,

Modified: CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twistedcaldav/upgrade.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twistedcaldav/upgrade.py	2012-01-24 08:58:32 UTC (rev 8580)
+++ CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twistedcaldav/upgrade.py	2012-01-24 08:59:05 UTC (rev 8581)
@@ -17,7 +17,7 @@
 
 from __future__ import with_statement
 
-import xattr, os, zlib, hashlib, datetime, pwd, grp, shutil, errno
+import xattr, os, zlib, hashlib, datetime, pwd, grp, shutil, errno, operator
 from zlib import compress
 from cPickle import loads as unpickle, UnpicklingError
 
@@ -40,16 +40,21 @@
 
 from twisted.application.service import Service
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, succeed, returnValue
+from twisted.internet.defer import (
+    inlineCallbacks, succeed, returnValue, gatherResults
+)
 from twisted.python.reflect import namedAny
 from twisted.python.reflect import namedClass
 
 from txdav.caldav.datastore.index_file import db_basename
 
+from twisted.protocols.amp import AMP, Command, String, Boolean
+
 from calendarserver.tap.util import getRootResource, FakeRequest, directoryFromConfig
 from calendarserver.tools.resources import migrateResources
 from calendarserver.tools.util import getDirectory
 
+from twext.python.parallel import Parallelizer
 
 deadPropertyXattrPrefix = namedAny(
     "txdav.base.propertystore.xattr.PropertyStore.deadPropertyXattrPrefix"
@@ -81,102 +86,150 @@
 
     return uid, gid
 
-#
-# upgrade_to_1
-#
-# Upconverts data from any calendar server version prior to data format 1
-#
 
-@inlineCallbacks
-def upgrade_to_1(config, spawner, directory):
+def fixBadQuotes(data):
+    if (
+        data.find('\\"') != -1 or
+        data.find('\\\r\n "') != -1 or
+        data.find('\r\n \r\n "') != -1
+    ):
+        # Fix by continuously replacing \" with " until no more
+        # replacements occur
+        while True:
+            newData = data.replace('\\"', '"').replace('\\\r\n "', '\r\n "').replace('\r\n \r\n "', '\r\n "')
+            if newData == data:
+                break
+            else:
+                data = newData
 
-    errorOccurred = False
+        return data, True
+    else:
+        return data, False
 
-    def fixBadQuotes(data):
-        if (
-            data.find('\\"') != -1 or
-            data.find('\\\r\n "') != -1 or
-            data.find('\r\n \r\n "') != -1
-        ):
-            # Fix by continuously replacing \" with " until no more
-            # replacements occur
-            while True:
-                newData = data.replace('\\"', '"').replace('\\\r\n "', '\r\n "').replace('\r\n \r\n "', '\r\n "')
-                if newData == data:
-                    break
-                else:
-                    data = newData
 
-            return data, True
-        else:
-            return data, False
 
+def upgradeCalendarCollection(calPath, directory, cuaCache):
+    errorOccurred = False
+    collectionUpdated = False
 
+    for resource in os.listdir(calPath):
 
+        if resource.startswith("."):
+            continue
 
+        resPath = os.path.join(calPath, resource)
 
-    def upgradeCalendarCollection(calPath, directory, cuaCache):
+        if os.path.isdir(resPath):
+            # Skip directories
+            continue
 
-        errorOccurred = False
-        collectionUpdated = False
+        log.debug("Processing: %s" % (resPath,))
+        needsRewrite = False
+        with open(resPath) as res:
+            data = res.read()
 
-        for resource in os.listdir(calPath):
+            try:
+                data, fixed = fixBadQuotes(data)
+                if fixed:
+                    log.warn("Fixing bad quotes in %s" % (resPath,))
+                    needsRewrite = True
+            except Exception, e:
+                log.error("Error while fixing bad quotes in %s: %s" %
+                    (resPath, e))
+                errorOccurred = True
+                continue
 
-            if resource.startswith("."):
+            try:
+                data, fixed = removeIllegalCharacters(data)
+                if fixed:
+                    log.warn("Removing illegal characters in %s" % (resPath,))
+                    needsRewrite = True
+            except Exception, e:
+                log.error("Error while removing illegal characters in %s: %s" %
+                    (resPath, e))
+                errorOccurred = True
                 continue
 
-            resPath = os.path.join(calPath, resource)
-
-            if os.path.isdir(resPath):
-                # Skip directories
+            try:
+                data, fixed = normalizeCUAddrs(data, directory, cuaCache)
+                if fixed:
+                    log.debug("Normalized CUAddrs in %s" % (resPath,))
+                    needsRewrite = True
+            except Exception, e:
+                log.error("Error while normalizing %s: %s" %
+                    (resPath, e))
+                errorOccurred = True
                 continue
 
-            log.debug("Processing: %s" % (resPath,))
-            needsRewrite = False
-            with open(resPath) as res:
-                data = res.read()
+        if needsRewrite:
+            with open(resPath, "w") as res:
+                res.write(data)
 
-                try:
-                    data, fixed = fixBadQuotes(data)
-                    if fixed:
-                        log.warn("Fixing bad quotes in %s" % (resPath,))
-                        needsRewrite = True
-                except Exception, e:
-                    log.error("Error while fixing bad quotes in %s: %s" %
-                        (resPath, e))
-                    errorOccurred = True
-                    continue
+            md5value = "<?xml version='1.0' encoding='UTF-8'?>\r\n<getcontentmd5 xmlns='http://twistedmatrix.com/xml_namespace/dav/'>%s</getcontentmd5>\r\n" % (hashlib.md5(data).hexdigest(),)
+            md5value = zlib.compress(md5value)
+            try:
+                xattr.setxattr(resPath, xattrname("{http:%2F%2Ftwistedmatrix.com%2Fxml_namespace%2Fdav%2F}getcontentmd5"), md5value)
+            except IOError, ioe:
+                if ioe.errno == errno.EOPNOTSUPP:
+                    # On non-native xattr systems we cannot do this,
+                    # but those systems will typically not be migrating
+                    # from pre-v1
+                    pass
+            except:
+                raise
 
-                try:
-                    data, fixed = removeIllegalCharacters(data)
-                    if fixed:
-                        log.warn("Removing illegal characters in %s" % (resPath,))
-                        needsRewrite = True
-                except Exception, e:
-                    log.error("Error while removing illegal characters in %s: %s" %
-                        (resPath, e))
-                    errorOccurred = True
-                    continue
+            collectionUpdated = True
 
-                try:
-                    data, fixed = normalizeCUAddrs(data, directory, cuaCache)
-                    if fixed:
-                        log.debug("Normalized CUAddrs in %s" % (resPath,))
-                        needsRewrite = True
-                except Exception, e:
-                    log.error("Error while normalizing %s: %s" %
-                        (resPath, e))
-                    errorOccurred = True
-                    continue
 
-            if needsRewrite:
-                with open(resPath, "w") as res:
-                    res.write(data)
+    if collectionUpdated:
+        ctagValue = "<?xml version='1.0' encoding='UTF-8'?>\r\n<getctag xmlns='http://calendarserver.org/ns/'>%s</getctag>\r\n" % (str(datetime.datetime.now()),)
+        ctagValue = zlib.compress(ctagValue)
+        try:
+            xattr.setxattr(calPath, xattrname("{http:%2F%2Fcalendarserver.org%2Fns%2F}getctag"), ctagValue)
+        except IOError, ioe:
+            if ioe.errno == errno.EOPNOTSUPP:
+                # On non-native xattr systems we cannot do this,
+                # but those systems will typically not be migrating
+                # from pre-v1
+                pass
+        except:
+            raise
 
-                md5value = "<?xml version='1.0' encoding='UTF-8'?>\r\n<getcontentmd5 xmlns='http://twistedmatrix.com/xml_namespace/dav/'>%s</getcontentmd5>\r\n" % (hashlib.md5(data).hexdigest(),)
-                md5value = zlib.compress(md5value)
+    return errorOccurred
+
+
+
+def upgradeCalendarHome(homePath, directory, cuaCache):
+
+    errorOccurred = False
+
+    log.debug("Upgrading calendar home: %s" % (homePath,))
+
+    try:
+        for cal in os.listdir(homePath):
+            calPath = os.path.join(homePath, cal)
+            if not os.path.isdir(calPath):
+                # Skip non-directories; these might have been uploaded by a
+                # random DAV client, they can't be calendar collections.
+                continue
+            if cal == 'notifications':
+                # Delete the old, now obsolete, notifications directory.
+                rmdir(calPath)
+                continue
+            log.debug("Upgrading calendar: %s" % (calPath,))
+            if not upgradeCalendarCollection(calPath, directory, cuaCache):
+                errorOccurred = True
+
+            # Change the calendar-free-busy-set xattrs of the inbox to the
+            # __uids__/<guid> form
+            if cal == "inbox":
                 try:
-                    xattr.setxattr(resPath, xattrname("{http:%2F%2Ftwistedmatrix.com%2Fxml_namespace%2Fdav%2F}getcontentmd5"), md5value)
+                    for attr, value in xattr.xattr(calPath).iteritems():
+                        if attr == xattrname("{urn:ietf:params:xml:ns:caldav}calendar-free-busy-set"):
+                            value = updateFreeBusySet(value, directory)
+                            if value is not None:
+                                # Need to write the xattr back to disk
+                                xattr.setxattr(calPath, attr, value)
                 except IOError, ioe:
                     if ioe.errno == errno.EOPNOTSUPP:
                         # On non-native xattr systems we cannot do this,
@@ -185,75 +238,61 @@
                         pass
                 except:
                     raise
+    except Exception, e:
+        log.error("Failed to upgrade calendar home %s: %s" % (homePath, e))
+        raise
 
-                collectionUpdated = True
+    return errorOccurred
 
 
-        if collectionUpdated:
-            ctagValue = "<?xml version='1.0' encoding='UTF-8'?>\r\n<getctag xmlns='http://calendarserver.org/ns/'>%s</getctag>\r\n" % (str(datetime.datetime.now()),)
-            ctagValue = zlib.compress(ctagValue)
-            try:
-                xattr.setxattr(calPath, xattrname("{http:%2F%2Fcalendarserver.org%2Fns%2F}getctag"), ctagValue)
-            except IOError, ioe:
-                if ioe.errno == errno.EOPNOTSUPP:
-                    # On non-native xattr systems we cannot do this,
-                    # but those systems will typically not be migrating
-                    # from pre-v1
-                    pass
-            except:
-                raise
 
-        return errorOccurred
+class UpgradeOneHome(Command):
+    arguments = [('path', String())]
+    response = [('succeeded', Boolean())]
 
 
-    def upgradeCalendarHome(homePath, directory, cuaCache):
 
-        errorOccurred = False
+class To1Driver(AMP):
+    """
+    Upgrade driver which runs in the parent process.
+    """
 
-        log.debug("Upgrading calendar home: %s" % (homePath,))
+    def upgradeHomeInHelper(self, path):
+        return self.callRemote(UpgradeOneHome, path=path).addCallback(
+            operator.itemgetter("succeeded")
+        )
 
-        try:
-            for cal in os.listdir(homePath):
-                calPath = os.path.join(homePath, cal)
-                if not os.path.isdir(calPath):
-                    # Skip non-directories; these might have been uploaded by a
-                    # random DAV client, they can't be calendar collections.
-                    continue
-                if cal == 'notifications':
-                    # Delete the old, now obsolete, notifications directory.
-                    rmdir(calPath)
-                    continue
-                log.debug("Upgrading calendar: %s" % (calPath,))
-                if not upgradeCalendarCollection(calPath, directory, cuaCache):
-                    errorOccurred = True
 
-                # Change the calendar-free-busy-set xattrs of the inbox to the
-                # __uids__/<guid> form
-                if cal == "inbox":
-                    try:
-                        for attr, value in xattr.xattr(calPath).iteritems():
-                            if attr == xattrname("{urn:ietf:params:xml:ns:caldav}calendar-free-busy-set"):
-                                value = updateFreeBusySet(value, directory)
-                                if value is not None:
-                                    # Need to write the xattr back to disk
-                                    xattr.setxattr(calPath, attr, value)
-                    except IOError, ioe:
-                        if ioe.errno == errno.EOPNOTSUPP:
-                            # On non-native xattr systems we cannot do this,
-                            # but those systems will typically not be migrating
-                            # from pre-v1
-                            pass
-                    except:
-                        raise
 
+class To1Home(AMP):
+    """
+    Upgrade worker which runs in dedicated subprocesses.
+    """
 
-        except Exception, e:
-            log.error("Failed to upgrade calendar home %s: %s" % (homePath, e))
-            raise
+    def __init__(self, config):
+        super(To1Home, self).__init__()
+        self.directory = getDirectory(config)
+        self.cuaCache = {}
 
-        return errorOccurred
 
+    @UpgradeOneHome.responder
+    def upgradeOne(self, path):
+        result = upgradeCalendarHome(path, self.directory, self.cuaCache)
+        return dict(succeeded=result)
 
+
+
+@inlineCallbacks
+def upgrade_to_1(config, spawner, parallel, directory):
+    """
+    Upconvert data from any calendar server version prior to data format 1.
+    """
+    errorOccurred = []
+    def setError(f=None):
+        if f is not None:
+            log.err(f)
+        errorOccurred.append(True)
+
     def doProxyDatabaseMoveUpgrade(config, uid=-1, gid=-1):
         # See if the new one is already present
         oldFilename = ".db.calendaruserproxy"
@@ -467,6 +506,12 @@
                 os.chown(inboxItemsFile, uid, gid)
 
             if total:
+                if parallel:
+                    spawner.startService()
+                    parallelizer = Parallelizer((yield gatherResults(
+                        [spawner.spawnWithConfig(config, To1Driver(), To1Home)
+                         for x in xrange(parallel)]
+                    )))
                 log.warn("Processing %d calendar homes in %s" % (total, uidHomes))
 
                 # Upgrade calendar homes in the new location:
@@ -484,15 +529,30 @@
                                         # Skip non-directories
                                         continue
 
-                                    if not upgradeCalendarHome(homePath,
-                                        directory, cuaCache):
-                                        errorOccurred = True
+                                    if parallel:
+                                        def doIt(driver, hp=homePath):
+                                            d = driver.upgradeHomeInHelper(hp)
+                                            def itWorked(succeeded):
+                                                if not succeeded:
+                                                    setError()
+                                                return succeeded
+                                            d.addCallback(itWorked)
+                                            d.addErrback(setError)
+                                            return d
+                                        yield parallelizer.do(doIt)
+                                    else:
+                                        if not upgradeCalendarHome(
+                                            homePath, directory, cuaCache
+                                        ):
+                                            setError()
 
                                     count += 1
                                     if count % 10 == 0:
                                         log.warn("Processed calendar home %d of %d"
                                             % (count, total))
-
+                if parallel:
+                    yield parallelizer.done()
+                    yield spawner.stopService()
                 log.warn("Done processing calendar homes")
 
     yield migrateResourceInfo(config, directory, uid, gid)
@@ -522,7 +582,7 @@
     cal = Component.fromString(data)
 
     def lookupFunction(cuaddr):
-
+        from twisted.python.failure import Failure
         # Return cached results, if any.
         if cuaCache.has_key(cuaddr):
             return cuaCache[cuaddr]
@@ -559,7 +619,7 @@
 
 
 @inlineCallbacks
-def upgrade_to_2(config, spawner, directory):
+def upgrade_to_2(config, spawner, parallel, directory):
     
     errorOccurred = False
 
@@ -675,7 +735,7 @@
 ]
 
 @inlineCallbacks
-def upgradeData(config, spawner=None):
+def upgradeData(config, spawner=None, parallel=0):
 
     directory = getDirectory()
 
@@ -707,7 +767,7 @@
     for version, method in upgradeMethods:
         if onDiskVersion < version:
             log.warn("Upgrading to version %d" % (version,))
-            (yield method(config, spawner, directory))
+            (yield method(config, spawner, parallel, directory))
             log.warn("Upgraded to version %d" % (version,))
             with open(versionFilePath, "w") as verFile:
                 verFile.write(str(version))
@@ -899,12 +959,14 @@
     Upgrade filesystem from previous versions.
     """
 
-    def __init__(self, config, service):
+    def __init__(self, spawner, parallel, config, service):
         """
         Initialize the service.
         """
         self.wrappedService = service
         self.config = config
+        self.spawner = spawner
+        self.parallel = parallel
 
 
     @inlineCallbacks
@@ -921,7 +983,7 @@
         memcacheEnabled = self.config.Memcached.Pools.Default.ClientEnabled
         self.config.Memcached.Pools.Default.ClientEnabled = False
 
-        yield upgradeData(self.config)
+        yield upgradeData(self.config, self.spawner, self.parallel)
 
         # Restore memcached client setting
         self.config.Memcached.Pools.Default.ClientEnabled = memcacheEnabled
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120124/d682b7a8/attachment-0001.html>


More information about the calendarserver-changes mailing list