[CalendarServer-changes] [8574] CalendarServer/branches/users/glyph/parallel-upgrade_to_1/txdav/ common/datastore/upgrade/migrate.py

source_changes at macosforge.org source_changes at macosforge.org
Tue Jan 24 00:55:38 PST 2012


Revision: 8574
          http://trac.macosforge.org/projects/calendarserver/changeset/8574
Author:   glyph at apple.com
Date:     2012-01-24 00:55:38 -0800 (Tue, 24 Jan 2012)
Log Message:
-----------
use Parallelizer, and be a bit more parallel

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/parallel-upgrade_to_1/txdav/common/datastore/upgrade/migrate.py

Modified: CalendarServer/branches/users/glyph/parallel-upgrade_to_1/txdav/common/datastore/upgrade/migrate.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade_to_1/txdav/common/datastore/upgrade/migrate.py	2012-01-24 08:55:10 UTC (rev 8573)
+++ CalendarServer/branches/users/glyph/parallel-upgrade_to_1/txdav/common/datastore/upgrade/migrate.py	2012-01-24 08:55:38 UTC (rev 8574)
@@ -32,9 +32,10 @@
 from twisted.application.service import Service
 from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks, returnValue
-from twisted.internet.defer import maybeDeferred, DeferredList
+from twisted.internet.defer import maybeDeferred, gatherResults
 
 from twext.python.filepath import CachingFilePath
+from twext.python.parallel import Parallelizer
 from twext.internet.spawnsvc import SpawnerService
 
 from twisted.protocols.amp import AMP, Command, String
@@ -339,12 +340,11 @@
                           (parallel,))
             spawner = self.spawner
             spawner.startService()
-            drivers = []
-            for value in xrange(parallel):
-                driver = yield spawner.spawnWithStore(UpgradeDriver(self),
-                                                      UpgradeHelperProcess)
-                drivers.append(driver)
-
+            drivers = yield gatherResults(
+                [spawner.spawnWithStore(UpgradeDriver(self),
+                                        UpgradeHelperProcess)
+                 for x in xrange(parallel)]
+            )
             # Wait for all subprocesses to be fully configured before
             # continuing, but let them configure in any order.
             self.log_warn("Configuring upgrade helper processes.")
@@ -356,13 +356,15 @@
             # know the intimate details of the fileStore implementation.
             # (Alternately, wrapService could just hold on to the details that
             # it used to construct the service in the first place.)
-            yield DeferredList([driver.configure(self.fileStore._path.path,
-                                                 self.fileStore._propertyStoreClass)
-                                for driver in drivers])
+            yield gatherResults(
+                [driver.configure(self.fileStore._path.path,
+                                  self.fileStore._propertyStoreClass)
+                 for driver in drivers]
+            )
             self.log_warn("Upgrade helpers ready.")
+            parallelizer = Parallelizer(drivers)
 
         self.log_warn("Beginning filesystem -> database upgrade.")
-        inParallel = []
         for homeType, eachFunc in [
                 ("calendar", self.fileStore.eachCalendarHome),
                 ("addressbook", self.fileStore.eachAddressbookHome),
@@ -374,27 +376,17 @@
                     # No-op transaction here: make sure everything's unlocked
                     # before asking the subprocess to handle it.
                     yield fileTxn.commit()
-                    if not drivers:
-                        # All the subprocesses are currently busy processing an
-                        # upgrade.  Wait for one to become available.
-                        yield DeferredList(inParallel, fireOnOneCallback=True,
-                                           fireOnOneErrback=True)
-                    busy = drivers.pop(0)
-                    d = busy.oneUpgrade(fileHome.uid(), homeType)
-                    inParallel.append(d)
-                    def freeUp(result, d=d, busy=busy, uid=uid,
-                               homeType=homeType):
-                        inParallel.remove(d)
-                        drivers.append(busy)
+                    @inlineCallbacks
+                    def doOneUpgrade(driver, fileUID=uid):
+                        yield driver.oneUpgrade(fileUID, homeType)
                         self.log_warn("Completed migration of %s uid %r" %
-                                      (homeType, uid))
-                        return result
-                    d.addBoth(freeUp)
+                                      (homeType, fileUID))
+                    yield parallelizer.do(doOneUpgrade)
                 else:
                     yield self.migrateOneHome(fileTxn, homeType, fileHome)
 
-        if inParallel:
-            yield DeferredList(inParallel)
+        if parallel:
+            yield parallelizer.done()
 
         for homeType in TOPPATHS:
             homesPath = self.fileStore._path.child(homeType)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120124/5ae21c79/attachment.html>


More information about the calendarserver-changes mailing list