[CalendarServer-changes] [11310] CalendarServer/trunk/twext/enterprise

source_changes at macosforge.org source_changes at macosforge.org
Wed Jun 5 14:14:53 PDT 2013


Revision: 11310
          http://trac.calendarserver.org//changeset/11310
Author:   glyph at apple.com
Date:     2013-06-05 14:14:53 -0700 (Wed, 05 Jun 2013)
Log Message:
-----------
Merge q-delete-no-concurrency; make sure that deleted queue items don't run twice.

Modified Paths:
--------------
    CalendarServer/trunk/twext/enterprise/dal/record.py
    CalendarServer/trunk/twext/enterprise/dal/test/test_record.py
    CalendarServer/trunk/twext/enterprise/test/test_queue.py

Modified: CalendarServer/trunk/twext/enterprise/dal/record.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/record.py	2013-06-05 18:42:14 UTC (rev 11309)
+++ CalendarServer/trunk/twext/enterprise/dal/record.py	2013-06-05 21:14:53 UTC (rev 11310)
@@ -258,12 +258,13 @@
         """
         Delete this row from the database.
 
-        @return: a L{Deferred} which fires when the underlying row has been
-            deleted.
+        @return: a L{Deferred} which fires with C{None} when the underlying row
+            has been deleted, or fails with L{NoSuchRecord} if the underlying
+            row was already deleted.
         """
         return Delete(From=self.table,
                       Where=self._primaryKeyComparison(self._primaryKeyValue())
-                      ).on(self.transaction)
+                      ).on(self.transaction, raiseOnZeroRowCount=NoSuchRecord)
 
 
     @inlineCallbacks

Modified: CalendarServer/trunk/twext/enterprise/dal/test/test_record.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_record.py	2013-06-05 18:42:14 UTC (rev 11309)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_record.py	2013-06-05 21:14:53 UTC (rev 11310)
@@ -30,6 +30,7 @@
 from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
 from twext.enterprise.dal.syntax import SchemaSyntax
 from twisted.internet.defer import gatherResults
+from twisted.internet.defer import returnValue
 from twext.enterprise.fixtures import buildConnectionPool
 
 # from twext.enterprise.dal.syntax import
@@ -125,7 +126,7 @@
     def test_simpleDelete(self):
         """
         When a record object is deleted, a row with a matching primary key will
-        be created in the database.
+        be deleted in the database.
         """
         txn = self.pool.connection()
         def mkrow(beta, gamma):
@@ -140,6 +141,30 @@
 
 
     @inlineCallbacks
+    def oneRowCommitted(self, beta=123, gamma=u'456'):
+        """
+        Create, commit, and return one L{TestRecord}.
+        """
+        txn = self.pool.connection(self.id())
+        row = yield TestRecord.create(txn, beta=beta, gamma=gamma)
+        yield txn.commit()
+        returnValue(row)
+
+
+    @inlineCallbacks
+    def test_deleteWhenDeleted(self):
+        """
+        When a record object is deleted, if it's already been deleted, it will
+        raise L{NoSuchRecord}.
+        """
+        row = yield self.oneRowCommitted()
+        txn = self.pool.connection(self.id())
+        newRow = yield TestRecord.load(txn, row.beta)
+        yield newRow.delete()
+        self.failUnlessFailure(newRow.delete(), NoSuchRecord)
+
+
+    @inlineCallbacks
     def test_cantCreateWithoutRequiredValues(self):
         """
         When a L{Record} object is created without required values, it raises a

Modified: CalendarServer/trunk/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_queue.py	2013-06-05 18:42:14 UTC (rev 11309)
+++ CalendarServer/trunk/twext/enterprise/test/test_queue.py	2013-06-05 21:14:53 UTC (rev 11310)
@@ -54,6 +54,7 @@
 from zope.interface.verify import verifyObject
 from twisted.test.proto_helpers import StringTransport, MemoryReactor
 from twext.enterprise.fixtures import SteppablePoolHelper
+from twisted.internet.defer import returnValue
 
 from twext.enterprise.queue import _BaseQueuer, NonPerformingQueuer
 import twext.enterprise.queue
@@ -157,7 +158,8 @@
 schemaText = SQL("""
     create table DUMMY_WORK_ITEM (WORK_ID integer primary key,
                                   NOT_BEFORE timestamp,
-                                  A integer, B integer);
+                                  A integer, B integer,
+                                  DELETE_ON_LOAD integer default 0);
     create table DUMMY_WORK_DONE (WORK_ID integer primary key,
                                   A_PLUS_B integer);
 """)
@@ -194,7 +196,24 @@
                                     aPlusB=self.a + self.b)
 
 
+    @classmethod
+    @inlineCallbacks
+    def load(cls, txn, *a, **kw):
+        """
+        Load L{DummyWorkItem} as normal...  unless the loaded item has
+        C{DELETE_ON_LOAD} set, in which case, do a deletion of this same row in
+        a concurrent transaction, then commit it.
+        """
+        self = yield super(DummyWorkItem, cls).load(txn, *a, **kw)
+        if self.deleteOnLoad:
+            otherTransaction = txn.concurrently()
+            otherSelf = yield super(DummyWorkItem, cls).load(txn, *a, **kw)
+            yield otherSelf.delete()
+            yield otherTransaction.commit()
+        returnValue(self)
 
+
+
 class SchemaAMPTests(TestCase):
     """
     Tests for L{SchemaAMP} faithfully relaying tables across the wire.
@@ -626,19 +645,26 @@
             return txn.execSQL(schemaText)
         yield inTransaction(lambda: self.store.newTransaction("bonus schema"),
                             doit)
+        def indirectedTransactionFactory(*a):
+            """
+            Allow tests to replace 'self.store.newTransaction' to provide
+            fixtures with extra methods on a test-by-test basis.
+            """
+            return self.store.newTransaction(*a)
         def deschema():
             @inlineCallbacks
             def deletestuff(txn):
                 for stmt in dropSQL:
                     yield txn.execSQL(stmt)
-            return inTransaction(self.store.newTransaction, deletestuff)
+            return inTransaction(lambda *a: self.store.newTransaction(*a),
+                                 deletestuff)
         self.addCleanup(deschema)
 
         from twisted.internet import reactor
         self.node1 = PeerConnectionPool(
-            reactor, self.store.newTransaction, 0, schema)
+            reactor, indirectedTransactionFactory, 0, schema)
         self.node2 = PeerConnectionPool(
-            reactor, self.store.newTransaction, 0, schema)
+            reactor, indirectedTransactionFactory, 0, schema)
 
         class FireMeService(Service, object):
             def __init__(self, d):
@@ -681,9 +707,9 @@
         """
         # TODO: this exact test should run against LocalQueuer as well.
         def operation(txn):
-            # TODO: how does 'enqueue' get associated with the transaction? This
-            # is not the fact with a raw t.w.enterprise transaction.  Should
-            # probably do something with components.
+            # TODO: how does 'enqueue' get associated with the transaction?
+            # This is not the fact with a raw t.w.enterprise transaction.
+            # Should probably do something with components.
             return txn.enqueue(DummyWorkItem, a=3, b=4, workID=4321,
                                notBefore=datetime.datetime.utcnow())
         result = yield inTransaction(self.store.newTransaction, operation)
@@ -697,6 +723,41 @@
         self.assertEquals(rows, [[4321, 7]])
 
 
+    @inlineCallbacks
+    def test_noWorkDoneWhenConcurrentlyDeleted(self):
+        """
+        When a L{WorkItem} is concurrently deleted by another transaction, it
+        should I{not} perform its work.
+        """
+        # Provide access to a method called 'concurrently' everything using 
+        original = self.store.newTransaction
+        def decorate(*a, **k):
+            result = original(*a, **k)
+            result.concurrently = self.store.newTransaction
+            return result
+        self.store.newTransaction = decorate
+
+        def operation(txn):
+            return txn.enqueue(DummyWorkItem, a=30, b=40, workID=5678,
+                               deleteOnLoad=1,
+                               notBefore=datetime.datetime.utcnow())
+        proposal = yield inTransaction(self.store.newTransaction, operation)
+        yield proposal.whenExecuted()
+        # Sanity check on the concurrent deletion.
+        def op2(txn):
+            return Select([schema.DUMMY_WORK_ITEM.WORK_ID],
+                           From=schema.DUMMY_WORK_ITEM).on(txn)
+        rows = yield inTransaction(self.store.newTransaction, op2)
+        self.assertEquals(rows, [])
+        def op3(txn):
+            return Select([schema.DUMMY_WORK_DONE.WORK_ID,
+                           schema.DUMMY_WORK_DONE.A_PLUS_B],
+                           From=schema.DUMMY_WORK_DONE).on(txn)
+        rows = yield inTransaction(self.store.newTransaction, op3)
+        self.assertEquals(rows, [])
+
+
+
 class DummyProposal(object):
 
     def __init__(self, *ignored):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130605/ccf8c938/attachment.html>


More information about the calendarserver-changes mailing list