[CalendarServer-changes] [11310] CalendarServer/trunk/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Wed Jun 5 14:14:53 PDT 2013
Revision: 11310
http://trac.calendarserver.org//changeset/11310
Author: glyph at apple.com
Date: 2013-06-05 14:14:53 -0700 (Wed, 05 Jun 2013)
Log Message:
-----------
Merge q-delete-no-concurrency; make sure that deleted queue items don't run twice.
Modified Paths:
--------------
CalendarServer/trunk/twext/enterprise/dal/record.py
CalendarServer/trunk/twext/enterprise/dal/test/test_record.py
CalendarServer/trunk/twext/enterprise/test/test_queue.py
Modified: CalendarServer/trunk/twext/enterprise/dal/record.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/record.py 2013-06-05 18:42:14 UTC (rev 11309)
+++ CalendarServer/trunk/twext/enterprise/dal/record.py 2013-06-05 21:14:53 UTC (rev 11310)
@@ -258,12 +258,13 @@
"""
Delete this row from the database.
- @return: a L{Deferred} which fires when the underlying row has been
- deleted.
+ @return: a L{Deferred} which fires with C{None} when the underlying row
+ has been deleted, or fails with L{NoSuchRecord} if the underlying
+ row was already deleted.
"""
return Delete(From=self.table,
Where=self._primaryKeyComparison(self._primaryKeyValue())
- ).on(self.transaction)
+ ).on(self.transaction, raiseOnZeroRowCount=NoSuchRecord)
@inlineCallbacks
Modified: CalendarServer/trunk/twext/enterprise/dal/test/test_record.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/dal/test/test_record.py 2013-06-05 18:42:14 UTC (rev 11309)
+++ CalendarServer/trunk/twext/enterprise/dal/test/test_record.py 2013-06-05 21:14:53 UTC (rev 11310)
@@ -30,6 +30,7 @@
from twext.enterprise.dal.test.test_parseschema import SchemaTestHelper
from twext.enterprise.dal.syntax import SchemaSyntax
from twisted.internet.defer import gatherResults
+from twisted.internet.defer import returnValue
from twext.enterprise.fixtures import buildConnectionPool
# from twext.enterprise.dal.syntax import
@@ -125,7 +126,7 @@
def test_simpleDelete(self):
"""
When a record object is deleted, a row with a matching primary key will
- be created in the database.
+ be deleted in the database.
"""
txn = self.pool.connection()
def mkrow(beta, gamma):
@@ -140,6 +141,30 @@
@inlineCallbacks
+ def oneRowCommitted(self, beta=123, gamma=u'456'):
+ """
+ Create, commit, and return one L{TestRecord}.
+ """
+ txn = self.pool.connection(self.id())
+ row = yield TestRecord.create(txn, beta=beta, gamma=gamma)
+ yield txn.commit()
+ returnValue(row)
+
+
+ @inlineCallbacks
+ def test_deleteWhenDeleted(self):
+ """
+ When a record object is deleted, if it's already been deleted, it will
+ raise L{NoSuchRecord}.
+ """
+ row = yield self.oneRowCommitted()
+ txn = self.pool.connection(self.id())
+ newRow = yield TestRecord.load(txn, row.beta)
+ yield newRow.delete()
+ self.failUnlessFailure(newRow.delete(), NoSuchRecord)
+
+
+ @inlineCallbacks
def test_cantCreateWithoutRequiredValues(self):
"""
When a L{Record} object is created without required values, it raises a
Modified: CalendarServer/trunk/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_queue.py 2013-06-05 18:42:14 UTC (rev 11309)
+++ CalendarServer/trunk/twext/enterprise/test/test_queue.py 2013-06-05 21:14:53 UTC (rev 11310)
@@ -54,6 +54,7 @@
from zope.interface.verify import verifyObject
from twisted.test.proto_helpers import StringTransport, MemoryReactor
from twext.enterprise.fixtures import SteppablePoolHelper
+from twisted.internet.defer import returnValue
from twext.enterprise.queue import _BaseQueuer, NonPerformingQueuer
import twext.enterprise.queue
@@ -157,7 +158,8 @@
schemaText = SQL("""
create table DUMMY_WORK_ITEM (WORK_ID integer primary key,
NOT_BEFORE timestamp,
- A integer, B integer);
+ A integer, B integer,
+ DELETE_ON_LOAD integer default 0);
create table DUMMY_WORK_DONE (WORK_ID integer primary key,
A_PLUS_B integer);
""")
@@ -194,7 +196,24 @@
aPlusB=self.a + self.b)
+ @classmethod
+ @inlineCallbacks
+ def load(cls, txn, *a, **kw):
+ """
+ Load L{DummyWorkItem} as normal... unless the loaded item has
+ C{DELETE_ON_LOAD} set, in which case, do a deletion of this same row in
+ a concurrent transaction, then commit it.
+ """
+ self = yield super(DummyWorkItem, cls).load(txn, *a, **kw)
+ if self.deleteOnLoad:
+ otherTransaction = txn.concurrently()
+ otherSelf = yield super(DummyWorkItem, cls).load(txn, *a, **kw)
+ yield otherSelf.delete()
+ yield otherTransaction.commit()
+ returnValue(self)
+
+
class SchemaAMPTests(TestCase):
"""
Tests for L{SchemaAMP} faithfully relaying tables across the wire.
@@ -626,19 +645,26 @@
return txn.execSQL(schemaText)
yield inTransaction(lambda: self.store.newTransaction("bonus schema"),
doit)
+ def indirectedTransactionFactory(*a):
+ """
+ Allow tests to replace 'self.store.newTransaction' to provide
+ fixtures with extra methods on a test-by-test basis.
+ """
+ return self.store.newTransaction(*a)
def deschema():
@inlineCallbacks
def deletestuff(txn):
for stmt in dropSQL:
yield txn.execSQL(stmt)
- return inTransaction(self.store.newTransaction, deletestuff)
+ return inTransaction(lambda *a: self.store.newTransaction(*a),
+ deletestuff)
self.addCleanup(deschema)
from twisted.internet import reactor
self.node1 = PeerConnectionPool(
- reactor, self.store.newTransaction, 0, schema)
+ reactor, indirectedTransactionFactory, 0, schema)
self.node2 = PeerConnectionPool(
- reactor, self.store.newTransaction, 0, schema)
+ reactor, indirectedTransactionFactory, 0, schema)
class FireMeService(Service, object):
def __init__(self, d):
@@ -681,9 +707,9 @@
"""
# TODO: this exact test should run against LocalQueuer as well.
def operation(txn):
- # TODO: how does 'enqueue' get associated with the transaction? This
- # is not the fact with a raw t.w.enterprise transaction. Should
- # probably do something with components.
+ # TODO: how does 'enqueue' get associated with the transaction?
+ # This is not the fact with a raw t.w.enterprise transaction.
+ # Should probably do something with components.
return txn.enqueue(DummyWorkItem, a=3, b=4, workID=4321,
notBefore=datetime.datetime.utcnow())
result = yield inTransaction(self.store.newTransaction, operation)
@@ -697,6 +723,41 @@
self.assertEquals(rows, [[4321, 7]])
+ @inlineCallbacks
+ def test_noWorkDoneWhenConcurrentlyDeleted(self):
+ """
+ When a L{WorkItem} is concurrently deleted by another transaction, it
+ should I{not} perform its work.
+ """
+ # Provide access to a method called 'concurrently' everything using
+ original = self.store.newTransaction
+ def decorate(*a, **k):
+ result = original(*a, **k)
+ result.concurrently = self.store.newTransaction
+ return result
+ self.store.newTransaction = decorate
+
+ def operation(txn):
+ return txn.enqueue(DummyWorkItem, a=30, b=40, workID=5678,
+ deleteOnLoad=1,
+ notBefore=datetime.datetime.utcnow())
+ proposal = yield inTransaction(self.store.newTransaction, operation)
+ yield proposal.whenExecuted()
+ # Sanity check on the concurrent deletion.
+ def op2(txn):
+ return Select([schema.DUMMY_WORK_ITEM.WORK_ID],
+ From=schema.DUMMY_WORK_ITEM).on(txn)
+ rows = yield inTransaction(self.store.newTransaction, op2)
+ self.assertEquals(rows, [])
+ def op3(txn):
+ return Select([schema.DUMMY_WORK_DONE.WORK_ID,
+ schema.DUMMY_WORK_DONE.A_PLUS_B],
+ From=schema.DUMMY_WORK_DONE).on(txn)
+ rows = yield inTransaction(self.store.newTransaction, op3)
+ self.assertEquals(rows, [])
+
+
+
class DummyProposal(object):
def __init__(self, *ignored):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130605/ccf8c938/attachment.html>
More information about the calendarserver-changes
mailing list