[CalendarServer-changes] [10277] CalendarServer/branches/users/glyph/queue-locking-and-timing

source_changes at macosforge.org source_changes at macosforge.org
Fri Jan 4 16:39:46 PST 2013


Revision: 10277
          http://trac.calendarserver.org/changeset/10277
Author:   glyph at apple.com
Date:     2013-01-04 16:39:46 -0800 (Fri, 04 Jan 2013)
Log Message:
-----------
Test for lost work which fails with error.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py

Property Changed:
----------------
    CalendarServer/branches/users/glyph/queue-locking-and-timing/

Modified: CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py	2013-01-05 00:39:45 UTC (rev 10276)
+++ CalendarServer/branches/users/glyph/queue-locking-and-timing/twext/enterprise/test/test_queue.py	2013-01-05 00:39:46 UTC (rev 10277)
@@ -23,6 +23,8 @@
 # TODO: There should be a store-building utility within twext.enterprise.
 
 from twisted.protocols.amp import Command
+from twisted.internet.task import Clock
+
 from txdav.common.datastore.test.util import buildStore
 
 from twext.enterprise.dal.syntax import SchemaSyntax, Select
@@ -33,7 +35,7 @@
 
 from twisted.trial.unittest import TestCase
 from twisted.internet.defer import (
-    Deferred, inlineCallbacks, gatherResults, passthru
+    Deferred, inlineCallbacks, gatherResults, passthru, returnValue
 )
 
 from twisted.application.service import Service, MultiService
@@ -46,9 +48,32 @@
 from twext.enterprise.dal.record import Record
 
 from twext.enterprise.queue import ConnectionFromPeerNode
+from twext.enterprise.fixtures import buildConnectionPool
 from zope.interface.verify import verifyObject
 from twisted.test.proto_helpers import StringTransport
 
+def transactionally(transactionCreator):
+    """
+    Decorator: perform the decorated function immediately in a transaction
+    (via L{inTransaction}), binding its name to the resulting L{Deferred}.
+
+    Use like so::
+
+        @transactionally(connectionPool.connection)
+        @inlineCallbacks
+        def it(txn):
+            yield txn.doSomething()
+        it.addCallback(firedWhenDone)
+
+    @param transactionCreator: A 0-arg callable that returns an
+        L{IAsyncTransaction}; handed unchanged to L{inTransaction}.
+    """
+    def thunk(operation):
+        return inTransaction(transactionCreator, operation)
+    return thunk
+
+
+
 class UtilityTests(TestCase):
     """
     Tests for supporting utilities.
@@ -104,9 +129,18 @@
     create table DUMMY_WORK_ITEM (WORK_ID integer primary key,
                                   NOT_BEFORE timestamp,
                                   A integer, B integer);
-    create table DUMMY_WORK_DONE (WORK_ID integer, A_PLUS_B integer);
+    create table DUMMY_WORK_DONE (WORK_ID integer primary key,
+                                  A_PLUS_B integer);
 """)
 
+nodeSchema = SQL("""
+    create table NODE_INFO (HOSTNAME varchar(255) not null,
+                            PID integer not null,
+                            PORT integer not null,
+                            TIME timestamp default current_timestamp not null,
+                            primary key (HOSTNAME, PORT));
+""")
+
 schema = SchemaSyntax(SimpleSchemaHelper().schemaFromString(schemaText))
 
 dropSQL = ["drop table {name}".format(name=table.model.name)
@@ -284,7 +318,55 @@
         self.assertEquals(performResult, [None])
 
 
+    @inlineCallbacks
+    def test_notBeforeWhenCheckingForLostWork(self):
+        """
+        L{PeerConnectionPool._periodicLostWorkCheck} should run only overdue
+        work: not items due right now, nor items due in the future.
+        """
+        dbpool = buildConnectionPool(self, schemaText + nodeSchema)
+        # An arbitrary point in time, mirrored onto the fake clock below.
+        fakeNow = datetime.datetime(2012, 12, 12, 12, 12, 12)
+        epoch = datetime.datetime.utcfromtimestamp(0)
+        sinceEpoch = (fakeNow - epoch).total_seconds()  # .seconds drops days
+        clock = Clock()
+        clock.advance(sinceEpoch)
+        qpool = PeerConnectionPool(clock, dbpool.connection, 0, schema)

+        # Let's create a couple of work items directly, not via the enqueue
+        # method, so that they exist but nobody will try to immediately execute
+        # them.
+
+        @transactionally(dbpool.connection)
+        @inlineCallbacks
+        def setup(txn):
+            # First, one that's due right now.
+            yield DummyWorkItem.create(txn, a=1, b=2, notBefore=fakeNow)
+
+            # Next, create one that's actually far enough into the past to run.
+            yield DummyWorkItem.create(
+                txn, a=3, b=4, notBefore=(
+                    # Schedule it in the past so that it should have already run.
+                    fakeNow - datetime.timedelta(
+                        seconds=qpool.queueProcessTimeout + 20
+                    )
+                )
+            )
+
+            # Finally, one that's scheduled for the future (1000 days out).
+            yield DummyWorkItem.create(
+                txn, a=10, b=20, notBefore=fakeNow + datetime.timedelta(1000)
+            )
+        yield setup
+        yield qpool._periodicLostWorkCheck()
+        @transactionally(dbpool.connection)
+        def check(txn):
+            return DummyWorkDone.all(txn)
+        every = yield check
+        self.assertEquals([x.aPlusB for x in every], [7])  # only 3 + 4 ran
+
+
+
 class HalfConnection(object):
     def __init__(self, protocol):
         self.protocol = protocol
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130104/2318a7e3/attachment.html>


More information about the calendarserver-changes mailing list