[CalendarServer-changes] [11898] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Wed Mar 12 11:19:56 PDT 2014
Revision: 11898
http://trac.calendarserver.org//changeset/11898
Author: glyph at apple.com
Date: 2013-11-06 15:36:01 -0800 (Wed, 06 Nov 2013)
Log Message:
-----------
Propagate failures to enqueue database work to the whenProposed Deferred, instead of leaving that Deferred un-fired forever and hanging correct work-queue code.
Modified Paths:
--------------
CalendarServer/trunk/twext/enterprise/fixtures.py
CalendarServer/trunk/twext/enterprise/queue.py
CalendarServer/trunk/twext/enterprise/test/test_queue.py
Property Changed:
----------------
CalendarServer/trunk/
Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
- /CalDAVTester/trunk:11193-11198
/CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/release/CalendarServer-5.1-dev:11846
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/fix-no-ischedule:11607-11871
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/performance-tweaks:11824-11836
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/store-scheduling:10876-11129
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/gaya/sharedgroups-3:11088-11204
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/enforce-max-requests:11640-11643
/CalendarServer/branches/users/glyph/hang-fix:11465-11491
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/launchd-wrapper-bis:11413-11436
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/log-cleanups:11691-11731
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop:11060-11065
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/warning-cleanups:11347-11357
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593
+ /CalDAVTester/trunk:11193-11198
/CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/release/CalendarServer-5.1-dev:11846
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/fix-no-ischedule:11607-11871
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/performance-tweaks:11824-11836
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/store-scheduling:10876-11129
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/gaya/sharedgroups-3:11088-11204
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/enforce-max-requests:11640-11643
/CalendarServer/branches/users/glyph/hang-fix:11465-11491
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/launchd-wrapper-bis:11413-11436
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/log-cleanups:11691-11731
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop:11060-11065
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/warning-cleanups:11347-11357
/CalendarServer/branches/users/glyph/whenNotProposed:11881-11897
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593
Modified: CalendarServer/trunk/twext/enterprise/fixtures.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/fixtures.py 2013-11-06 22:56:10 UTC (rev 11897)
+++ CalendarServer/trunk/twext/enterprise/fixtures.py 2013-11-06 23:36:01 UTC (rev 11898)
@@ -76,8 +76,8 @@
def resultOf(deferred, propagate=False):
"""
- Add a callback and errback which will capture the result of a L{Deferred} in
- a list, and return that list. If 'propagate' is True, pass through the
+ Add a callback and errback which will capture the result of a L{Deferred}
+ in a list, and return that list. If 'propagate' is True, pass through the
results.
"""
results = []
@@ -551,6 +551,21 @@
self._connectResultQueue.append(thunk)
+ def willConnectTo(self):
+ """
+ Queue a successful result for connect() and immediately add it as a
+ child to this L{ConnectionFactory}.
+
+ @return: a connection object
+ @rtype: L{FakeConnection}
+ """
+ aConnection = FakeConnection(self)
+ def thunk():
+ return aConnection
+ self._connectResultQueue.append(thunk)
+ return aConnection
+
+
def willFail(self):
"""
Used by tests to queue a successful result for connect().
Modified: CalendarServer/trunk/twext/enterprise/queue.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/queue.py 2013-11-06 22:56:10 UTC (rev 11897)
+++ CalendarServer/trunk/twext/enterprise/queue.py 2013-11-06 23:36:01 UTC (rev 11898)
@@ -149,7 +149,8 @@
NodeTable.addColumn("PORT", SQLType("integer", None))
NodeTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
# Note: in the real data structure, this is actually a not-cleaned-up
- # sqlparse internal data structure, but it *should* look closer to this.
+ # sqlparse internal data structure, but it *should* look closer to
+ # this.
ProcedureCall("timezone", ["UTC", NamedValue('CURRENT_TIMESTAMP')])
)
for column in NodeTable.columns:
@@ -677,8 +678,8 @@
"""
def __init__(self, peerPool, boxReceiver=None, locator=None):
- super(ConnectionFromWorker, self).__init__(peerPool.schema, boxReceiver,
- locator)
+ super(ConnectionFromWorker, self).__init__(peerPool.schema,
+ boxReceiver, locator)
self.peerPool = peerPool
self._load = 0
@@ -830,9 +831,9 @@
workItem = yield workItemClass.load(txn, workID)
if workItem.group is not None:
yield NamedLock.acquire(txn, workItem.group)
- # TODO: what if we fail? error-handling should be recorded someplace,
- # the row should probably be marked, re-tries should be triggerable
- # administratively.
+ # TODO: what if we fail? error-handling should be recorded
+ # someplace, the row should probably be marked, re-tries should be
+ # triggerable administratively.
yield workItem.delete()
# TODO: verify that workID is the primary key someplace.
yield workItem.doWork()
@@ -865,9 +866,6 @@
-
-
-
class WorkerFactory(Factory, object):
"""
Factory, to be used as the client to connect from the worker to the
@@ -950,7 +948,7 @@
waiting for the transaction where that addition was completed to
commit, and asking the local node controller process to do the work.
"""
- @passthru(self.workItemType.create(self.txn, **self.kw).addCallback)
+ created = self.workItemType.create(self.txn, **self.kw)
def whenCreated(item):
self._whenProposed.callback(self)
@self.txn.postCommit
@@ -967,12 +965,15 @@
self._whenExecuted.errback(why)
reactor = self._chooser.reactor
when = max(0, astimestamp(item.notBefore) - reactor.seconds())
- # TODO: Track the returned DelayedCall so it can be stopped when
- # the service stops.
+ # TODO: Track the returned DelayedCall so it can be stopped
+ # when the service stops.
self._chooser.reactor.callLater(when, maybeLater)
@self.txn.postAbort
def whenFailed():
self._whenCommitted.errback(TransactionFailed)
+ def whenNotCreated(failure):
+ self._whenProposed.errback(failure)
+ created.addCallbacks(whenCreated, whenNotCreated)
def whenExecuted(self):
@@ -1023,6 +1024,8 @@
"""
return _cloneDeferred(self._whenCommitted)
+
+
class _BaseQueuer(object):
implements(IQueuer)
@@ -1030,13 +1033,16 @@
super(_BaseQueuer, self).__init__()
self.proposalCallbacks = set()
+
def callWithNewProposals(self, callback):
- self.proposalCallbacks.add(callback);
+ self.proposalCallbacks.add(callback)
+
def transferProposalCallbacks(self, newQueuer):
newQueuer.proposalCallbacks = self.proposalCallbacks
return newQueuer
+
def enqueueWork(self, txn, workItemType, **kw):
"""
There is some work to do. Do it, someplace else, ideally in parallel.
@@ -1061,6 +1067,7 @@
return wp
+
class PeerConnectionPool(_BaseQueuer, MultiService, object):
"""
Each node has a L{PeerConnectionPool} connecting it to all the other nodes
@@ -1140,7 +1147,7 @@
self.mappedPeers = {}
self.schema = schema
self._startingUp = None
- self._listeningPortObject = None
+ self._listeningPort = None
self._lastSeenTotalNodes = 1
self._lastSeenNodeIndex = 1
@@ -1197,7 +1204,8 @@
A peer has requested us to perform some work; choose a work performer
local to this node, and then execute it.
"""
- return self.choosePerformer(onlyLocally=True).performWork(table, workID)
+ performer = self.choosePerformer(onlyLocally=True)
+ return performer.performWork(table, workID)
def allWorkItemTypes(self):
@@ -1225,8 +1233,8 @@
@return: the maximum number of other L{PeerConnectionPool} instances
that may be connected to the database described by
- C{self.transactionFactory}. Note that this is not the current count
- by connectivity, but the count according to the database.
+ C{self.transactionFactory}. Note that this is not the current
+ count by connectivity, but the count according to the database.
@rtype: L{int}
"""
# TODO
@@ -1277,7 +1285,6 @@
overdueItem.workID)
return inTransaction(self.transactionFactory, workCheck)
-
_currentWorkDeferred = None
_lostWorkCheckCall = None
@@ -1315,10 +1322,10 @@
@inlineCallbacks
def startup(txn):
endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
- # If this fails, the failure mode is going to be ugly, just like all
- # conflicted-port failures. But, at least it won't proceed.
- self._listeningPortObject = yield endpoint.listen(self.peerFactory())
- self.ampPort = self._listeningPortObject.getHost().port
+ # If this fails, the failure mode is going to be ugly, just like
+ # all conflicted-port failures. But, at least it won't proceed.
+ self._listeningPort = yield endpoint.listen(self.peerFactory())
+ self.ampPort = self._listeningPort.getHost().port
yield Lock.exclusive(NodeInfo.table).on(txn)
nodes = yield self.activeNodes(txn)
selves = [node for node in nodes
@@ -1354,8 +1361,8 @@
yield super(PeerConnectionPool, self).stopService()
if self._startingUp is not None:
yield self._startingUp
- if self._listeningPortObject is not None:
- yield self._listeningPortObject.stopListening()
+ if self._listeningPort is not None:
+ yield self._listeningPort.stopListening()
if self._lostWorkCheckCall is not None:
self._lostWorkCheckCall.cancel()
if self._currentWorkDeferred is not None:
@@ -1430,8 +1437,6 @@
-
-
class LocalQueuer(_BaseQueuer):
"""
When work is enqueued with this queuer, it is just executed locally.
@@ -1458,7 +1463,8 @@
"""
Implementor of C{performWork} that doesn't actual perform any work. This
is used in the case where you want to be able to enqueue work for someone
- else to do, but not take on any work yourself (such as a command line tool).
+ else to do, but not take on any work yourself (such as a command line
+ tool).
"""
implements(_IWorkPerformer)
@@ -1469,6 +1475,7 @@
return succeed(None)
+
class NonPerformingQueuer(_BaseQueuer):
"""
When work is enqueued with this queuer, it is never executed locally.
@@ -1487,4 +1494,4 @@
"""
Choose to perform the work locally.
"""
- return NonPerformer()
\ No newline at end of file
+ return NonPerformer()
Modified: CalendarServer/trunk/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_queue.py 2013-11-06 22:56:10 UTC (rev 11897)
+++ CalendarServer/trunk/twext/enterprise/test/test_queue.py 2013-11-06 23:36:01 UTC (rev 11898)
@@ -36,6 +36,7 @@
)
from twisted.trial.unittest import TestCase
+from twisted.python.failure import Failure
from twisted.internet.defer import (
Deferred, inlineCallbacks, gatherResults, passthru#, returnValue
)
@@ -55,6 +56,8 @@
from twisted.test.proto_helpers import StringTransport, MemoryReactor
from twext.enterprise.fixtures import SteppablePoolHelper
from twisted.internet.defer import returnValue
+from twext.enterprise.queue import LocalQueuer
+from twext.enterprise.fixtures import ConnectionPoolHelper
from twext.enterprise.queue import _BaseQueuer, NonPerformingQueuer
import twext.enterprise.queue
@@ -67,7 +70,7 @@
def callLater(self, _seconds, _f, *args, **kw):
if _seconds < 0:
- raise ValueError("%s<0: "%(_seconds,))
+ raise ValueError("%s<0: " % (_seconds,))
return super(Clock, self).callLater(_seconds, _f, *args, **kw)
@@ -267,6 +270,56 @@
+class WorkProposalTests(TestCase):
+ """
+ Tests for L{WorkProposal}.
+ """
+
+ def test_whenProposedSuccess(self):
+ """
+ The L{Deferred} returned by L{WorkProposal.whenProposed} fires when the
+ SQL sent to the database has completed.
+ """
+ cph = ConnectionPoolHelper()
+ cph.setUp(test=self)
+ cph.pauseHolders()
+ lq = LocalQueuer(cph.createTransaction)
+ enqTxn = cph.createTransaction()
+ wp = lq.enqueueWork(enqTxn, DummyWorkItem, a=3, b=4)
+ d = wp.whenProposed()
+ r = cph.resultOf(d)
+ self.assertEquals(r, [])
+ cph.flushHolders()
+ self.assertEquals(len(r), 1)
+
+
+ def test_whenProposedFailure(self):
+ """
+ The L{Deferred} returned by L{WorkProposal.whenProposed} fails with an
+ errback when the SQL executed to create the WorkItem row fails.
+ """
+ cph = ConnectionPoolHelper()
+ cph.setUp(self)
+ cph.pauseHolders()
+ firstConnection = cph.factory.willConnectTo()
+ enqTxn = cph.createTransaction()
+ # Execute some SQL on the connection before enqueueing the work-item so
+ # that we don't get the initial-statement.
+ enqTxn.execSQL("some sql")
+ lq = LocalQueuer(cph.createTransaction)
+ cph.flushHolders()
+ cph.pauseHolders()
+ wp = lq.enqueueWork(enqTxn, DummyWorkItem, a=3, b=4)
+ firstConnection.executeWillFail(lambda: RuntimeError("foo"))
+ d = wp.whenProposed()
+ r = cph.resultOf(d)
+ self.assertEquals(r, [])
+ cph.flushHolders()
+ self.assertEquals(len(r), 1)
+ self.assertIsInstance(r[0], Failure)
+
+
+
class PeerConnectionPoolUnitTests(TestCase):
"""
L{PeerConnectionPool} has many internal components.
@@ -393,7 +446,8 @@
# Next, create one that's actually far enough into the past to run.
yield DummyWorkItem.create(
txn, a=3, b=4, notBefore=(
- # Schedule it in the past so that it should have already run.
+ # Schedule it in the past so that it should have already
+ # run.
fakeNow - datetime.timedelta(
seconds=qpool.queueProcessTimeout + 20
)
@@ -619,6 +673,7 @@
self.receiver, self.sender = self.sender, self.receiver
return result
+
def flush(self, turns=10):
"""
Keep relaying data until there's no more.
@@ -718,7 +773,7 @@
def op2(txn):
return Select([schema.DUMMY_WORK_DONE.WORK_ID,
schema.DUMMY_WORK_DONE.A_PLUS_B],
- From=schema.DUMMY_WORK_DONE).on(txn)
+ From=schema.DUMMY_WORK_DONE).on(txn)
rows = yield inTransaction(self.store.newTransaction, op2)
self.assertEquals(rows, [[4321, 7]])
@@ -729,7 +784,7 @@
When a L{WorkItem} is concurrently deleted by another transaction, it
should I{not} perform its work.
"""
- # Provide access to a method called 'concurrently' everything using
+ # Provide access to a method called 'concurrently' everything using
original = self.store.newTransaction
def decorate(*a, **k):
result = original(*a, **k)
@@ -746,13 +801,13 @@
# Sanity check on the concurrent deletion.
def op2(txn):
return Select([schema.DUMMY_WORK_ITEM.WORK_ID],
- From=schema.DUMMY_WORK_ITEM).on(txn)
+ From=schema.DUMMY_WORK_ITEM).on(txn)
rows = yield inTransaction(self.store.newTransaction, op2)
self.assertEquals(rows, [])
def op3(txn):
return Select([schema.DUMMY_WORK_DONE.WORK_ID,
schema.DUMMY_WORK_DONE.A_PLUS_B],
- From=schema.DUMMY_WORK_DONE).on(txn)
+ From=schema.DUMMY_WORK_DONE).on(txn)
rows = yield inTransaction(self.store.newTransaction, op3)
self.assertEquals(rows, [])
@@ -763,18 +818,23 @@
def __init__(self, *ignored):
pass
+
def _start(self):
pass
+
+
class BaseQueuerTests(TestCase):
def setUp(self):
self.proposal = None
self.patch(twext.enterprise.queue, "WorkProposal", DummyProposal)
+
def _proposalCallback(self, proposal):
self.proposal = proposal
+
def test_proposalCallbacks(self):
queuer = _BaseQueuer()
queuer.callWithNewProposals(self._proposalCallback)
@@ -783,6 +843,7 @@
self.assertNotEqual(self.proposal, None)
+
class NonPerformingQueuerTests(TestCase):
@inlineCallbacks
@@ -791,5 +852,3 @@
performer = queuer.choosePerformer()
result = (yield performer.performWork(None, None))
self.assertEquals(result, None)
-
-
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140312/0edcc206/attachment.html>
More information about the calendarserver-changes
mailing list