[CalendarServer-changes] [10285] CalendarServer/branches/users/glyph/queue-locking-and-timing

source_changes at macosforge.org source_changes at macosforge.org
Fri Jan 4 16:39:57 PST 2013


Revision: 10285
          http://trac.calendarserver.org//changeset/10285
Author:   glyph at apple.com
Date:     2013-01-04 16:39:57 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Make that test pass, albeit at the expense of twext.enterprise.test.test_queue.PeerConnectionPoolIntegrationTests

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:39:55 UTC (rev 10284)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py	2013-01-05 00:39:57 UTC (rev 10285)
@@ -192,6 +192,14 @@
 
 
 
+def astimestamp(v):
+    """
+    Convert the given datetime to a POSIX timestamp.
+    """
+    return (v - datetime.utcfromtimestamp(0)).total_seconds()
+
+
+
 class TableSyntaxByName(Argument):
     """
     Serialize and deserialize L{TableSyntax} objects for an AMP protocol with
@@ -900,7 +908,8 @@
     another node, perhaps in the future.
 
     @ivar _chooser: The object which will choose where the work in this
-        proposal gets performed.
+        proposal gets performed.  This must have both a C{choosePerformer}
+        method and a C{reactor} attribute, providing an L{IReactorTime}.
     @type _chooser: L{PeerConnectionPool} or L{LocalQueuer}
 
     @ivar txn: The transaction where the work will be enqueued.
@@ -936,14 +945,20 @@
             @self.txn.postCommit
             def whenDone():
                 self._whenCommitted.callback(self)
-                performer = self._chooser.choosePerformer()
-                @passthru(performer.performWork(item.table, item.workID)
-                          .addCallback)
-                def performed(result):
-                    self._whenExecuted.callback(self)
-                @performed.addErrback
-                def notPerformed(why):
-                    self._whenExecuted.errback(why)
+                def maybeLater():
+                    performer = self._chooser.choosePerformer()
+                    @passthru(performer.performWork(item.table, item.workID)
+                              .addCallback)
+                    def performed(result):
+                        self._whenExecuted.callback(self)
+                    @performed.addErrback
+                    def notPerformed(why):
+                        self._whenExecuted.errback(why)
+                reactor = self._chooser.reactor
+                when = astimestamp(item.notBefore) - reactor.seconds()
+                # TODO: Track the returned DelayedCall so it can be stopped when
+                # the service stops.
+                self._chooser.reactor.callLater(when, maybeLater)
             @self.txn.postAbort
             def whenFailed():
                 self._whenCommitted.errback(TransactionFailed)
@@ -1388,8 +1403,11 @@
     """
     implements(IQueuer)
 
-    def __init__(self, txnFactory):
+    def __init__(self, txnFactory, reactor=None):
         self.txnFactory = txnFactory
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
 
 
     def choosePerformer(self):

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py	2013-01-05 00:39:55 UTC (rev 10284)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py	2013-01-05 00:39:57 UTC (rev 10285)
@@ -31,7 +31,9 @@
 from twext.enterprise.dal.record import fromTable
 from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
 
-from twext.enterprise.queue import inTransaction, PeerConnectionPool, WorkItem
+from twext.enterprise.queue import (
+    inTransaction, PeerConnectionPool, WorkItem, astimestamp
+)
 
 from twisted.trial.unittest import TestCase
 from twisted.internet.defer import (
@@ -74,14 +76,6 @@
 
 
 
-def astimestamp(v):
-    """
-    Convert the given datetime to a POSIX timestamp.
-    """
-    return (v - datetime.datetime.utcfromtimestamp(0)).total_seconds()
-
-
-
 class UtilityTests(TestCase):
     """
     Tests for supporting utilities.
@@ -395,7 +389,7 @@
         @transactionally(dbpool.connection)
         def check(txn):
             return qpool.enqueueWork(
-                txn, DummyWorkItem,
+                txn, DummyWorkItem, a=3, b=9,
                 notBefore=datetime.datetime(2012, 12, 12, 12, 12, 20)
             ).whenProposed()
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/38e73904/attachment.html>


More information about the calendarserver-changes mailing list