[CalendarServer-changes] [10277] CalendarServer/branches/users/glyph/queue-locking-and-timing
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jan 4 16:39:46 PST 2013
Revision: 10277
http://trac.calendarserver.org//changeset/10277
Author: glyph at apple.com
Date: 2013-01-04 16:39:46 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Test for lost work which fails with error.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
Property Changed:
----------------
CalendarServer/branches/users/glyph/queue-locking-and-timing/
Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py 2013-01-05 00:39:45 UTC (rev 10276)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py 2013-01-05 00:39:46 UTC (rev 10277)
@@ -23,6 +23,8 @@
# TODO: There should be a store-building utility within twext.enterprise.
from twisted.protocols.amp import Command
+from twisted.internet.task import Clock
+
from txdav.common.datastore.test.util import buildStore
from twext.enterprise.dal.syntax import SchemaSyntax, Select
@@ -33,7 +35,7 @@
from twisted.trial.unittest import TestCase
from twisted.internet.defer import (
- Deferred, inlineCallbacks, gatherResults, passthru
+ Deferred, inlineCallbacks, gatherResults, passthru, returnValue
)
from twisted.application.service import Service, MultiService
@@ -46,9 +48,32 @@
from twext.enterprise.dal.record import Record
from twext.enterprise.queue import ConnectionFromPeerNode
+from twext.enterprise.fixtures import buildConnectionPool
from zope.interface.verify import verifyObject
from twisted.test.proto_helpers import StringTransport
+def transactionally(transactionCreator):
+    """
+    Build a decorator that runs the decorated function in a transaction right
+    away, so that the decorated name ends up bound to a L{Deferred} rather
+    than to the function itself.
+
+    Example::
+
+        @transactionally(connectionPool.connection)
+        @inlineCallbacks
+        def it(txn):
+            yield txn.doSomething()
+        it.addCallback(firedWhenDone)
+
+    @param transactionCreator: A 0-arg callable that returns an
+        L{IAsyncTransaction}.
+
+    @return: a 1-argument decorator which hands C{transactionCreator} and the
+        decorated function to L{inTransaction} and returns its result.
+    """
+    def runImmediately(operation):
+        return inTransaction(transactionCreator, operation)
+    return runImmediately
+
+
+
class UtilityTests(TestCase):
"""
Tests for supporting utilities.
@@ -104,9 +129,18 @@
create table DUMMY_WORK_ITEM (WORK_ID integer primary key,
NOT_BEFORE timestamp,
A integer, B integer);
- create table DUMMY_WORK_DONE (WORK_ID integer, A_PLUS_B integer);
+ create table DUMMY_WORK_DONE (WORK_ID integer primary key,
+ A_PLUS_B integer);
""")
+# Extra DDL defining a NODE_INFO table (HOSTNAME, PID, PORT, TIME; primary key
+# on HOSTNAME + PORT).  test_notBeforeWhenCheckingForLostWork appends this to
+# schemaText when it builds its connection pool.
+nodeSchema = SQL("""
+ create table NODE_INFO (HOSTNAME varchar(255) not null,
+ PID integer not null,
+ PORT integer not null,
+ TIME timestamp default current_timestamp not null,
+ primary key (HOSTNAME, PORT));
+""")
+
schema = SchemaSyntax(SimpleSchemaHelper().schemaFromString(schemaText))
dropSQL = ["drop table {name}".format(name=table.model.name)
@@ -284,7 +318,55 @@
self.assertEquals(performResult, [None])
+    @inlineCallbacks
+    def test_notBeforeWhenCheckingForLostWork(self):
+        """
+        L{PeerConnectionPool._periodicLostWorkCheck} should execute any
+        outstanding work items, but only those that are expired.
+
+        Three work items are inserted directly: one due exactly at the fake
+        current time, one due more than C{queueProcessTimeout} seconds before
+        it, and one due 1000 days after it.  Only the overdue item (3 + 4) is
+        expected to have been performed.
+        """
+        dbpool = buildConnectionPool(self, schemaText + nodeSchema)
+        # An arbitrary point in time.
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        # datetime has no direct "seconds since the epoch" accessor, so derive
+        # it from a timedelta.  C{timedelta.seconds} alone is only the
+        # within-day remainder (0..86399); the whole days have to be added in
+        # as well, otherwise the clock lands on 1970-01-01 instead of fakeNow.
+        elapsed = fakeNow - datetime.datetime.utcfromtimestamp(0)
+        sinceEpoch = elapsed.days * 24 * 60 * 60 + elapsed.seconds
+        clock = Clock()
+        clock.advance(sinceEpoch)
+        qpool = PeerConnectionPool(clock, dbpool.connection, 0, schema)
+        # Let's create a couple of work items directly, not via the enqueue
+        # method, so that they exist but nobody will try to immediately execute
+        # them.
+
+        @transactionally(dbpool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # First, one that's right now.
+            yield DummyWorkItem.create(txn, a=1, b=2, notBefore=fakeNow)
+
+            # Next, create one that's actually far enough into the past to run.
+            yield DummyWorkItem.create(
+                txn, a=3, b=4, notBefore=(
+                    # Schedule it in the past so that it should have already
+                    # run.
+                    fakeNow - datetime.timedelta(
+                        seconds=qpool.queueProcessTimeout + 20
+                    )
+                )
+            )
+
+            # Finally, one that's actually scheduled for the future.
+            yield DummyWorkItem.create(
+                txn, a=10, b=20,
+                notBefore=fakeNow + datetime.timedelta(days=1000)
+            )
+        # C{setup} is already the result of running the transaction, not a
+        # function; wait for it to finish.
+        yield setup
+        yield qpool._periodicLostWorkCheck()
+
+        @transactionally(dbpool.connection)
+        def check(txn):
+            return DummyWorkDone.all(txn)
+        every = yield check
+        self.assertEquals([x.aPlusB for x in every], [7])
+
+
+
class HalfConnection(object):
def __init__(self, protocol):
self.protocol = protocol
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/2318a7e3/attachment.html>
More information about the calendarserver-changes
mailing list