[CalendarServer-changes] [8573] CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext/ python

source_changes at macosforge.org source_changes at macosforge.org
Tue Jan 24 00:55:10 PST 2012


Revision: 8573
          http://trac.macosforge.org/projects/calendarserver/changeset/8573
Author:   glyph at apple.com
Date:     2012-01-24 00:55:10 -0800 (Tue, 24 Jan 2012)
Log Message:
-----------
utility for inlineCallbacks functions that want to do some work in parallel with an accompanying set of resources, but not too much.

Added Paths:
-----------
    CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext/python/parallel.py
    CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext/python/test/test_parallel.py

Added: CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext/python/parallel.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext/python/parallel.py	                        (rev 0)
+++ CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext/python/parallel.py	2012-01-24 08:55:10 UTC (rev 8573)
@@ -0,0 +1,94 @@
+# -*- test-case-name: twext.python.test.test_parallel -*-
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Utilities for parallelizing tasks.
+"""
+
+from twisted.internet.defer import inlineCallbacks, DeferredList, returnValue
+
+class Parallelizer(object):
+    """
+    Do some operation with a degree of parallelism, using a set of resources
+    which may each only be used for one task at a time, given some underlying
+    API that returns L{Deferreds}.
+
+    @ivar available: A list of available resources from the C{resources}
+        constructor parameter.
+
+    @ivar busy: A list of resources which are currently being used by
+        operations.
+    """
+
+    def __init__(self, resources):
+        """
+        Initialize a L{Parallelizer} with a list of objects that will be passed
+        to the callables sent to L{Parallelizer.do}.
+
+        @param resources: objects which may be of any arbitrary type.
+        @type resources: C{list}
+        """
+        self.available = list(resources)
+        self.busy = []
+        self.activeDeferreds = []
+
+
+    @inlineCallbacks
+    def do(self, operation):
+        """
+        Call C{operation} with one of the resources in C{self.available},
+        removing that value for use by other callers of C{do} until the task
+        performed by C{operation} is complete (in other words, the L{Deferred}
+        returned by C{operation} has fired).
+
+        @param operation: a 1-argument callable taking a resource from
+            C{self.active} and returning a L{Deferred} when it's done using
+            that resource.
+        @type operation: C{callable}
+
+        @return: a L{Deferred} that fires as soon as there are resources
+            available such that this task can be I{started} - not completed.
+        """
+        if not self.available:
+            yield DeferredList(self.activeDeferreds, fireOnOneCallback=True,
+                               fireOnOneErrback=True)
+        active = self.available.pop(0)
+        self.busy.append(active)
+        o = operation(active)
+        def andFinally(whatever):
+            self.activeDeferreds.remove(o)
+            self.busy.remove(active)
+            self.available.append(active)
+            return whatever
+        self.activeDeferreds.append(o)
+        o.addBoth(andFinally)
+        returnValue(None)
+
+
+    def done(self):
+        """
+        Wait until all operations started by L{Parallelizer.do} are completed.
+
+        @return: a L{Deferred} that fires (with C{None}) when all the currently
+            pending work on this L{Parallelizer} is completed and C{busy} is
+            empty again.
+        """
+        return (DeferredList(self.activeDeferreds)
+                .addCallback(lambda ignored: None))
+
+
+

Added: CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext/python/test/test_parallel.py
===================================================================
--- CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext/python/test/test_parallel.py	                        (rev 0)
+++ CalendarServer/branches/users/glyph/parallel-upgrade_to_1/twext/python/test/test_parallel.py	2012-01-24 08:55:10 UTC (rev 8573)
@@ -0,0 +1,63 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Tests for L{twext.python.parallel}.
+"""
+
+from twisted.internet.defer import Deferred
+
+from twext.python.parallel import Parallelizer
+
+from twisted.trial.unittest import TestCase
+
+class ParallelizerTests(TestCase):
+    """
+    Tests for L{Parallelizer}.
+    """
+
+    def test_doAndDone(self):
+        """
+        Blanket catch-all test.  (TODO: split this up into more nice
+        fine-grained tests.)
+        """
+        d1 = Deferred()
+        d2 = Deferred()
+        d3 = Deferred()
+        d4 = Deferred()
+        doing = []
+        done = []
+        allDone = []
+        p = Parallelizer(['a', 'b', 'c'])
+        p.do(lambda a: doing.append(a) or d1).addCallback(done.append)
+        p.do(lambda b: doing.append(b) or d2).addCallback(done.append)
+        p.do(lambda c: doing.append(c) or d3).addCallback(done.append)
+        p.do(lambda b1: doing.append(b1) or d4).addCallback(done.append)
+        p.done().addCallback(allDone.append)
+        self.assertEqual(allDone, [])
+        self.assertEqual(doing, ['a', 'b', 'c'])
+        self.assertEqual(done, [None, None, None])
+        d2.callback(1)
+        self.assertEqual(doing, ['a', 'b', 'c', 'b'])
+        self.assertEqual(done, [None, None, None, None])
+        self.assertEqual(allDone, [])
+        d3.callback(2)
+        d4.callback(3)
+        d1.callback(4)
+        self.assertEqual(done, [None, None, None, None])
+        self.assertEqual(allDone, [None])
+
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120124/1f1692df/attachment-0001.html>


More information about the calendarserver-changes mailing list