[CalendarServer-changes] [8581] CalendarServer/branches/users/glyph/parallel-upgrade_to_1
From: source_changes at macosforge.org
Tue Jan 24 00:59:05 PST 2012
Revision: 8581
http://trac.macosforge.org/projects/calendarserver/changeset/8581
Author: glyph at apple.com
Date: 2012-01-24 00:59:05 -0800 (Tue, 24 Jan 2012)
Log Message:
-----------
Do the home-data-fixing parts of upgrade_to_1 in parallel.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/parallel-upgrade_to_1/calendarserver/tap/caldav.py
CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twistedcaldav/upgrade.py
Modified: CalendarServer/branches/users/glyph/parallel-upgrade_to_1/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade_to_1/calendarserver/tap/caldav.py 2012-01-24 08:58:32 UTC (rev 8580)
+++ CalendarServer/branches/users/glyph/parallel-upgrade_to_1/calendarserver/tap/caldav.py 2012-01-24 08:59:05 UTC (rev 8581)
@@ -918,17 +918,16 @@
parallel = config.MultiProcess.ProcessCount
else:
parallel = 0
+ spawner = ConfiguredChildSpawner(self, dispenser, config)
upgradeSvc = UpgradeFileSystemFormatService(
- config,
+ config, spawner, parallel,
UpgradeDatabaseSchemaService.wrapService(
UpgradeDatabaseDataService.wrapService(
UpgradeToDatabaseService.wrapService(
CachingFilePath(config.DocumentRoot),
PostDBImportService(config, store, mainService),
store, uid=overrideUID, gid=overrideGID,
- spawner=ConfiguredChildSpawner(
- self, dispenser, config
- ),
+ spawner=spawner,
parallel=parallel
),
store, uid=overrideUID, gid=overrideGID,
Modified: CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twistedcaldav/upgrade.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twistedcaldav/upgrade.py 2012-01-24 08:58:32 UTC (rev 8580)
+++ CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twistedcaldav/upgrade.py 2012-01-24 08:59:05 UTC (rev 8581)
@@ -17,7 +17,7 @@
from __future__ import with_statement
-import xattr, os, zlib, hashlib, datetime, pwd, grp, shutil, errno
+import xattr, os, zlib, hashlib, datetime, pwd, grp, shutil, errno, operator
from zlib import compress
from cPickle import loads as unpickle, UnpicklingError
@@ -40,16 +40,21 @@
from twisted.application.service import Service
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, succeed, returnValue
+from twisted.internet.defer import (
+ inlineCallbacks, succeed, returnValue, gatherResults
+)
from twisted.python.reflect import namedAny
from twisted.python.reflect import namedClass
from txdav.caldav.datastore.index_file import db_basename
+from twisted.protocols.amp import AMP, Command, String, Boolean
+
from calendarserver.tap.util import getRootResource, FakeRequest, directoryFromConfig
from calendarserver.tools.resources import migrateResources
from calendarserver.tools.util import getDirectory
+from twext.python.parallel import Parallelizer
deadPropertyXattrPrefix = namedAny(
"txdav.base.propertystore.xattr.PropertyStore.deadPropertyXattrPrefix"
@@ -81,102 +86,150 @@
return uid, gid
-#
-# upgrade_to_1
-#
-# Upconverts data from any calendar server version prior to data format 1
-#
-@inlineCallbacks
-def upgrade_to_1(config, spawner, directory):
+def fixBadQuotes(data):
+ if (
+ data.find('\\"') != -1 or
+ data.find('\\\r\n "') != -1 or
+ data.find('\r\n \r\n "') != -1
+ ):
+ # Fix by continuously replacing \" with " until no more
+ # replacements occur
+ while True:
+ newData = data.replace('\\"', '"').replace('\\\r\n "', '\r\n "').replace('\r\n \r\n "', '\r\n "')
+ if newData == data:
+ break
+ else:
+ data = newData
- errorOccurred = False
+ return data, True
+ else:
+ return data, False
- def fixBadQuotes(data):
- if (
- data.find('\\"') != -1 or
- data.find('\\\r\n "') != -1 or
- data.find('\r\n \r\n "') != -1
- ):
- # Fix by continuously replacing \" with " until no more
- # replacements occur
- while True:
- newData = data.replace('\\"', '"').replace('\\\r\n "', '\r\n "').replace('\r\n \r\n "', '\r\n "')
- if newData == data:
- break
- else:
- data = newData
- return data, True
- else:
- return data, False
+def upgradeCalendarCollection(calPath, directory, cuaCache):
+ errorOccurred = False
+ collectionUpdated = False
+ for resource in os.listdir(calPath):
+ if resource.startswith("."):
+ continue
+ resPath = os.path.join(calPath, resource)
- def upgradeCalendarCollection(calPath, directory, cuaCache):
+ if os.path.isdir(resPath):
+ # Skip directories
+ continue
- errorOccurred = False
- collectionUpdated = False
+ log.debug("Processing: %s" % (resPath,))
+ needsRewrite = False
+ with open(resPath) as res:
+ data = res.read()
- for resource in os.listdir(calPath):
+ try:
+ data, fixed = fixBadQuotes(data)
+ if fixed:
+ log.warn("Fixing bad quotes in %s" % (resPath,))
+ needsRewrite = True
+ except Exception, e:
+ log.error("Error while fixing bad quotes in %s: %s" %
+ (resPath, e))
+ errorOccurred = True
+ continue
- if resource.startswith("."):
+ try:
+ data, fixed = removeIllegalCharacters(data)
+ if fixed:
+ log.warn("Removing illegal characters in %s" % (resPath,))
+ needsRewrite = True
+ except Exception, e:
+ log.error("Error while removing illegal characters in %s: %s" %
+ (resPath, e))
+ errorOccurred = True
continue
- resPath = os.path.join(calPath, resource)
-
- if os.path.isdir(resPath):
- # Skip directories
+ try:
+ data, fixed = normalizeCUAddrs(data, directory, cuaCache)
+ if fixed:
+ log.debug("Normalized CUAddrs in %s" % (resPath,))
+ needsRewrite = True
+ except Exception, e:
+ log.error("Error while normalizing %s: %s" %
+ (resPath, e))
+ errorOccurred = True
continue
- log.debug("Processing: %s" % (resPath,))
- needsRewrite = False
- with open(resPath) as res:
- data = res.read()
+ if needsRewrite:
+ with open(resPath, "w") as res:
+ res.write(data)
- try:
- data, fixed = fixBadQuotes(data)
- if fixed:
- log.warn("Fixing bad quotes in %s" % (resPath,))
- needsRewrite = True
- except Exception, e:
- log.error("Error while fixing bad quotes in %s: %s" %
- (resPath, e))
- errorOccurred = True
- continue
+ md5value = "<?xml version='1.0' encoding='UTF-8'?>\r\n<getcontentmd5 xmlns='http://twistedmatrix.com/xml_namespace/dav/'>%s</getcontentmd5>\r\n" % (hashlib.md5(data).hexdigest(),)
+ md5value = zlib.compress(md5value)
+ try:
+ xattr.setxattr(resPath, xattrname("{http:%2F%2Ftwistedmatrix.com%2Fxml_namespace%2Fdav%2F}getcontentmd5"), md5value)
+ except IOError, ioe:
+ if ioe.errno == errno.EOPNOTSUPP:
+ # On non-native xattr systems we cannot do this,
+ # but those systems will typically not be migrating
+ # from pre-v1
+ pass
+ except:
+ raise
- try:
- data, fixed = removeIllegalCharacters(data)
- if fixed:
- log.warn("Removing illegal characters in %s" % (resPath,))
- needsRewrite = True
- except Exception, e:
- log.error("Error while removing illegal characters in %s: %s" %
- (resPath, e))
- errorOccurred = True
- continue
+ collectionUpdated = True
- try:
- data, fixed = normalizeCUAddrs(data, directory, cuaCache)
- if fixed:
- log.debug("Normalized CUAddrs in %s" % (resPath,))
- needsRewrite = True
- except Exception, e:
- log.error("Error while normalizing %s: %s" %
- (resPath, e))
- errorOccurred = True
- continue
- if needsRewrite:
- with open(resPath, "w") as res:
- res.write(data)
+ if collectionUpdated:
+ ctagValue = "<?xml version='1.0' encoding='UTF-8'?>\r\n<getctag xmlns='http://calendarserver.org/ns/'>%s</getctag>\r\n" % (str(datetime.datetime.now()),)
+ ctagValue = zlib.compress(ctagValue)
+ try:
+ xattr.setxattr(calPath, xattrname("{http:%2F%2Fcalendarserver.org%2Fns%2F}getctag"), ctagValue)
+ except IOError, ioe:
+ if ioe.errno == errno.EOPNOTSUPP:
+ # On non-native xattr systems we cannot do this,
+ # but those systems will typically not be migrating
+ # from pre-v1
+ pass
+ except:
+ raise
- md5value = "<?xml version='1.0' encoding='UTF-8'?>\r\n<getcontentmd5 xmlns='http://twistedmatrix.com/xml_namespace/dav/'>%s</getcontentmd5>\r\n" % (hashlib.md5(data).hexdigest(),)
- md5value = zlib.compress(md5value)
+ return errorOccurred
+
+
+
+def upgradeCalendarHome(homePath, directory, cuaCache):
+
+ errorOccurred = False
+
+ log.debug("Upgrading calendar home: %s" % (homePath,))
+
+ try:
+ for cal in os.listdir(homePath):
+ calPath = os.path.join(homePath, cal)
+ if not os.path.isdir(calPath):
+ # Skip non-directories; these might have been uploaded by a
+ # random DAV client, they can't be calendar collections.
+ continue
+ if cal == 'notifications':
+ # Delete the old, now obsolete, notifications directory.
+ rmdir(calPath)
+ continue
+ log.debug("Upgrading calendar: %s" % (calPath,))
+ if not upgradeCalendarCollection(calPath, directory, cuaCache):
+ errorOccurred = True
+
+ # Change the calendar-free-busy-set xattrs of the inbox to the
+ # __uids__/<guid> form
+ if cal == "inbox":
try:
- xattr.setxattr(resPath, xattrname("{http:%2F%2Ftwistedmatrix.com%2Fxml_namespace%2Fdav%2F}getcontentmd5"), md5value)
+ for attr, value in xattr.xattr(calPath).iteritems():
+ if attr == xattrname("{urn:ietf:params:xml:ns:caldav}calendar-free-busy-set"):
+ value = updateFreeBusySet(value, directory)
+ if value is not None:
+ # Need to write the xattr back to disk
+ xattr.setxattr(calPath, attr, value)
except IOError, ioe:
if ioe.errno == errno.EOPNOTSUPP:
# On non-native xattr systems we cannot do this,
@@ -185,75 +238,61 @@
pass
except:
raise
+ except Exception, e:
+ log.error("Failed to upgrade calendar home %s: %s" % (homePath, e))
+ raise
- collectionUpdated = True
+ return errorOccurred
- if collectionUpdated:
- ctagValue = "<?xml version='1.0' encoding='UTF-8'?>\r\n<getctag xmlns='http://calendarserver.org/ns/'>%s</getctag>\r\n" % (str(datetime.datetime.now()),)
- ctagValue = zlib.compress(ctagValue)
- try:
- xattr.setxattr(calPath, xattrname("{http:%2F%2Fcalendarserver.org%2Fns%2F}getctag"), ctagValue)
- except IOError, ioe:
- if ioe.errno == errno.EOPNOTSUPP:
- # On non-native xattr systems we cannot do this,
- # but those systems will typically not be migrating
- # from pre-v1
- pass
- except:
- raise
- return errorOccurred
+class UpgradeOneHome(Command):
+ arguments = [('path', String())]
+ response = [('succeeded', Boolean())]
- def upgradeCalendarHome(homePath, directory, cuaCache):
- errorOccurred = False
+class To1Driver(AMP):
+ """
+ Upgrade driver which runs in the parent process.
+ """
- log.debug("Upgrading calendar home: %s" % (homePath,))
+ def upgradeHomeInHelper(self, path):
+ return self.callRemote(UpgradeOneHome, path=path).addCallback(
+ operator.itemgetter("succeeded")
+ )
- try:
- for cal in os.listdir(homePath):
- calPath = os.path.join(homePath, cal)
- if not os.path.isdir(calPath):
- # Skip non-directories; these might have been uploaded by a
- # random DAV client, they can't be calendar collections.
- continue
- if cal == 'notifications':
- # Delete the old, now obsolete, notifications directory.
- rmdir(calPath)
- continue
- log.debug("Upgrading calendar: %s" % (calPath,))
- if not upgradeCalendarCollection(calPath, directory, cuaCache):
- errorOccurred = True
- # Change the calendar-free-busy-set xattrs of the inbox to the
- # __uids__/<guid> form
- if cal == "inbox":
- try:
- for attr, value in xattr.xattr(calPath).iteritems():
- if attr == xattrname("{urn:ietf:params:xml:ns:caldav}calendar-free-busy-set"):
- value = updateFreeBusySet(value, directory)
- if value is not None:
- # Need to write the xattr back to disk
- xattr.setxattr(calPath, attr, value)
- except IOError, ioe:
- if ioe.errno == errno.EOPNOTSUPP:
- # On non-native xattr systems we cannot do this,
- # but those systems will typically not be migrating
- # from pre-v1
- pass
- except:
- raise
+class To1Home(AMP):
+ """
+ Upgrade worker which runs in dedicated subprocesses.
+ """
- except Exception, e:
- log.error("Failed to upgrade calendar home %s: %s" % (homePath, e))
- raise
+ def __init__(self, config):
+ super(To1Home, self).__init__()
+ self.directory = getDirectory(config)
+ self.cuaCache = {}
- return errorOccurred
+ @UpgradeOneHome.responder
+ def upgradeOne(self, path):
+ result = upgradeCalendarHome(path, self.directory, self.cuaCache)
+ return dict(succeeded=result)
+
+
+@inlineCallbacks
+def upgrade_to_1(config, spawner, parallel, directory):
+ """
+ Upconvert data from any calendar server version prior to data format 1.
+ """
+ errorOccurred = []
+ def setError(f=None):
+ if f is not None:
+ log.err(f)
+ errorOccurred.append(True)
+
def doProxyDatabaseMoveUpgrade(config, uid=-1, gid=-1):
# See if the new one is already present
oldFilename = ".db.calendaruserproxy"
@@ -467,6 +506,12 @@
os.chown(inboxItemsFile, uid, gid)
if total:
+ if parallel:
+ spawner.startService()
+ parallelizer = Parallelizer((yield gatherResults(
+ [spawner.spawnWithConfig(config, To1Driver(), To1Home)
+ for x in xrange(parallel)]
+ )))
log.warn("Processing %d calendar homes in %s" % (total, uidHomes))
# Upgrade calendar homes in the new location:
@@ -484,15 +529,30 @@
# Skip non-directories
continue
- if not upgradeCalendarHome(homePath,
- directory, cuaCache):
- errorOccurred = True
+ if parallel:
+ def doIt(driver, hp=homePath):
+ d = driver.upgradeHomeInHelper(hp)
+ def itWorked(succeeded):
+ if not succeeded:
+ setError()
+ return succeeded
+ d.addCallback(itWorked)
+ d.addErrback(setError)
+ return d
+ yield parallelizer.do(doIt)
+ else:
+ if not upgradeCalendarHome(
+ homePath, directory, cuaCache
+ ):
+ setError()
count += 1
if count % 10 == 0:
log.warn("Processed calendar home %d of %d"
% (count, total))
-
+ if parallel:
+ yield parallelizer.done()
+ yield spawner.stopService()
log.warn("Done processing calendar homes")
yield migrateResourceInfo(config, directory, uid, gid)
@@ -522,7 +582,7 @@
cal = Component.fromString(data)
def lookupFunction(cuaddr):
-
+ from twisted.python.failure import Failure
# Return cached results, if any.
if cuaCache.has_key(cuaddr):
return cuaCache[cuaddr]
@@ -559,7 +619,7 @@
@inlineCallbacks
-def upgrade_to_2(config, spawner, directory):
+def upgrade_to_2(config, spawner, parallel, directory):
errorOccurred = False
@@ -675,7 +735,7 @@
]
@inlineCallbacks
-def upgradeData(config, spawner=None):
+def upgradeData(config, spawner=None, parallel=0):
directory = getDirectory()
@@ -707,7 +767,7 @@
for version, method in upgradeMethods:
if onDiskVersion < version:
log.warn("Upgrading to version %d" % (version,))
- (yield method(config, spawner, directory))
+ (yield method(config, spawner, parallel, directory))
log.warn("Upgraded to version %d" % (version,))
with open(versionFilePath, "w") as verFile:
verFile.write(str(version))
@@ -899,12 +959,14 @@
Upgrade filesystem from previous versions.
"""
- def __init__(self, config, service):
+ def __init__(self, spawner, parallel, config, service):
"""
Initialize the service.
"""
self.wrappedService = service
self.config = config
+ self.spawner = spawner
+ self.parallel = parallel
@inlineCallbacks
@@ -921,7 +983,7 @@
memcacheEnabled = self.config.Memcached.Pools.Default.ClientEnabled
self.config.Memcached.Pools.Default.ClientEnabled = False
- yield upgradeData(self.config)
+ yield upgradeData(self.config, self.spawner, self.parallel)
# Restore memcached client setting
self.config.Memcached.Pools.Default.ClientEnabled = memcacheEnabled
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120124/d682b7a8/attachment-0001.html>
More information about the calendarserver-changes mailing list