[CalendarServer-changes] [10285] CalendarServer/branches/users/glyph/queue-locking-and-timing
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jan 4 16:39:57 PST 2013
Revision: 10285
http://trac.calendarserver.org//changeset/10285
Author: glyph at apple.com
Date: 2013-01-04 16:39:57 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Make that test pass, albeit at the expense of twext.enterprise.test.test_queue.PeerConnectionPoolIntegrationTests
Modified Paths:
--------------
CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/queue-locking-and-timing/
Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py 2013-01-05 00:39:55 UTC (rev 10284)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/queue.py 2013-01-05 00:39:57 UTC (rev 10285)
@@ -192,6 +192,14 @@
+def astimestamp(v):
+ """
+ Convert the given datetime to a POSIX timestamp.
+ """
+ return (v - datetime.utcfromtimestamp(0)).total_seconds()
+
+
+
class TableSyntaxByName(Argument):
"""
Serialize and deserialize L{TableSyntax} objects for an AMP protocol with
@@ -900,7 +908,8 @@
another node, perhaps in the future.
@ivar _chooser: The object which will choose where the work in this
- proposal gets performed.
+ proposal gets performed. This must have both a C{choosePerformer}
+ method and a C{reactor} attribute, providing an L{IReactorTime}.
@type _chooser: L{PeerConnectionPool} or L{LocalQueuer}
@ivar txn: The transaction where the work will be enqueued.
@@ -936,14 +945,20 @@
@self.txn.postCommit
def whenDone():
self._whenCommitted.callback(self)
- performer = self._chooser.choosePerformer()
- @passthru(performer.performWork(item.table, item.workID)
- .addCallback)
- def performed(result):
- self._whenExecuted.callback(self)
- @performed.addErrback
- def notPerformed(why):
- self._whenExecuted.errback(why)
+ def maybeLater():
+ performer = self._chooser.choosePerformer()
+ @passthru(performer.performWork(item.table, item.workID)
+ .addCallback)
+ def performed(result):
+ self._whenExecuted.callback(self)
+ @performed.addErrback
+ def notPerformed(why):
+ self._whenExecuted.errback(why)
+ reactor = self._chooser.reactor
+ when = astimestamp(item.notBefore) - reactor.seconds()
+ # TODO: Track the returned DelayedCall so it can be stopped when
+ # the service stops.
+ self._chooser.reactor.callLater(when, maybeLater)
@self.txn.postAbort
def whenFailed():
self._whenCommitted.errback(TransactionFailed)
@@ -1388,8 +1403,11 @@
"""
implements(IQueuer)
- def __init__(self, txnFactory):
+ def __init__(self, txnFactory, reactor=None):
self.txnFactory = txnFactory
+ if reactor is None:
+ from twisted.internet import reactor
+ self.reactor = reactor
def choosePerformer(self):
Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py 2013-01-05 00:39:55 UTC (rev 10284)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py 2013-01-05 00:39:57 UTC (rev 10285)
@@ -31,7 +31,9 @@
from twext.enterprise.dal.record import fromTable
from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
-from twext.enterprise.queue import inTransaction, PeerConnectionPool, WorkItem
+from twext.enterprise.queue import (
+ inTransaction, PeerConnectionPool, WorkItem, astimestamp
+)
from twisted.trial.unittest import TestCase
from twisted.internet.defer import (
@@ -74,14 +76,6 @@
-def astimestamp(v):
- """
- Convert the given datetime to a POSIX timestamp.
- """
- return (v - datetime.datetime.utcfromtimestamp(0)).total_seconds()
-
-
-
class UtilityTests(TestCase):
"""
Tests for supporting utilities.
@@ -395,7 +389,7 @@
@transactionally(dbpool.connection)
def check(txn):
return qpool.enqueueWork(
- txn, DummyWorkItem,
+ txn, DummyWorkItem, a=3, b=9,
notBefore=datetime.datetime(2012, 12, 12, 12, 12, 20)
).whenProposed()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/38e73904/attachment.html>
More information about the calendarserver-changes
mailing list