[CalendarServer-changes] [11898] CalendarServer/trunk

source_changes at macosforge.org
Wed Mar 12 11:19:56 PDT 2014


Revision: 11898
          http://trac.calendarserver.org//changeset/11898
Author:   glyph at apple.com
Date:     2013-11-06 15:36:01 -0800 (Wed, 06 Nov 2013)
Log Message:
-----------
Propagate failures to enqueue database work to the whenProposed Deferred, instead of leaving that Deferred un-fired forever and thereby hanging otherwise-correct work-queue code that waits on it.
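
For context on the fix below: the old code attached only a success callback to
the Deferred returned by WorkItem creation, so if the INSERT failed, the
failure was silently dropped and the Deferred returned by whenProposed() never
fired, hanging any caller that (correctly) waited on it.  A minimal standalone
sketch of the before/after pattern (toy names, not the actual queue.py code):

    from twisted.internet.defer import Deferred

    def propose(creationDeferred):
        """
        Return a Deferred that fires when creation succeeds and fails when
        creation fails.
        """
        proposed = Deferred()

        def whenCreated(item):
            proposed.callback(item)

        def whenNotCreated(failure):
            # This errback is the fix: without it, a creation failure would
            # leave 'proposed' pending forever.
            proposed.errback(failure)

        # addCallbacks() wires up both outcomes at once; the old
        # @passthru(...addCallback) decoration only handled success.
        creationDeferred.addCallbacks(whenCreated, whenNotCreated)
        return proposed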

Modified Paths:
--------------
    CalendarServer/trunk/twext/enterprise/fixtures.py
    CalendarServer/trunk/twext/enterprise/queue.py
    CalendarServer/trunk/twext/enterprise/test/test_queue.py

Property Changed:
----------------
    CalendarServer/trunk/


Property changes on: CalendarServer/trunk
___________________________________________________________________
Modified: svn:mergeinfo
   - /CalDAVTester/trunk:11193-11198
/CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/release/CalendarServer-5.1-dev:11846
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/fix-no-ischedule:11607-11871
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/performance-tweaks:11824-11836
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/store-scheduling:10876-11129
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/gaya/sharedgroups-3:11088-11204
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/enforce-max-requests:11640-11643
/CalendarServer/branches/users/glyph/hang-fix:11465-11491
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/launchd-wrapper-bis:11413-11436
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/log-cleanups:11691-11731
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop:11060-11065
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/warning-cleanups:11347-11357
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593
   + /CalDAVTester/trunk:11193-11198
/CalendarServer/branches/config-separation:4379-4443
/CalendarServer/branches/egg-info-351:4589-4625
/CalendarServer/branches/generic-sqlstore:6167-6191
/CalendarServer/branches/new-store:5594-5934
/CalendarServer/branches/new-store-no-caldavfile:5911-5935
/CalendarServer/branches/new-store-no-caldavfile-2:5936-5981
/CalendarServer/branches/release/CalendarServer-4.3-dev:10180-10190,10192
/CalendarServer/branches/release/CalendarServer-5.1-dev:11846
/CalendarServer/branches/users/cdaboo/batchupload-6699:6700-7198
/CalendarServer/branches/users/cdaboo/cached-subscription-calendars-5692:5693-5702
/CalendarServer/branches/users/cdaboo/component-set-fixes:8130-8346
/CalendarServer/branches/users/cdaboo/directory-cache-on-demand-3627:3628-3644
/CalendarServer/branches/users/cdaboo/fix-no-ischedule:11607-11871
/CalendarServer/branches/users/cdaboo/implicituidrace:8137-8141
/CalendarServer/branches/users/cdaboo/ischedule-dkim:9747-9979
/CalendarServer/branches/users/cdaboo/managed-attachments:9985-10145
/CalendarServer/branches/users/cdaboo/more-sharing-5591:5592-5601
/CalendarServer/branches/users/cdaboo/partition-4464:4465-4957
/CalendarServer/branches/users/cdaboo/performance-tweaks:11824-11836
/CalendarServer/branches/users/cdaboo/pods:7297-7377
/CalendarServer/branches/users/cdaboo/pycalendar:7085-7206
/CalendarServer/branches/users/cdaboo/pycard:7227-7237
/CalendarServer/branches/users/cdaboo/queued-attendee-refreshes:7740-8287
/CalendarServer/branches/users/cdaboo/relative-config-paths-5070:5071-5105
/CalendarServer/branches/users/cdaboo/shared-calendars-5187:5188-5440
/CalendarServer/branches/users/cdaboo/store-scheduling:10876-11129
/CalendarServer/branches/users/cdaboo/timezones:7443-7699
/CalendarServer/branches/users/cdaboo/txn-debugging:8730-8743
/CalendarServer/branches/users/gaya/sharedgroups-3:11088-11204
/CalendarServer/branches/users/glyph/always-abort-txn-on-error:9958-9969
/CalendarServer/branches/users/glyph/case-insensitive-uid:8772-8805
/CalendarServer/branches/users/glyph/conn-limit:6574-6577
/CalendarServer/branches/users/glyph/contacts-server-merge:4971-5080
/CalendarServer/branches/users/glyph/dalify:6932-7023
/CalendarServer/branches/users/glyph/db-reconnect:6824-6876
/CalendarServer/branches/users/glyph/deploybuild:7563-7572
/CalendarServer/branches/users/glyph/digest-auth-redux:10624-10635
/CalendarServer/branches/users/glyph/disable-quota:7718-7727
/CalendarServer/branches/users/glyph/dont-start-postgres:6592-6614
/CalendarServer/branches/users/glyph/enforce-max-requests:11640-11643
/CalendarServer/branches/users/glyph/hang-fix:11465-11491
/CalendarServer/branches/users/glyph/imip-and-admin-html:7866-7984
/CalendarServer/branches/users/glyph/ipv6-client:9054-9105
/CalendarServer/branches/users/glyph/launchd-wrapper-bis:11413-11436
/CalendarServer/branches/users/glyph/linux-tests:6893-6900
/CalendarServer/branches/users/glyph/log-cleanups:11691-11731
/CalendarServer/branches/users/glyph/migrate-merge:8690-8713
/CalendarServer/branches/users/glyph/misc-portability-fixes:7365-7374
/CalendarServer/branches/users/glyph/more-deferreds-6:6322-6368
/CalendarServer/branches/users/glyph/more-deferreds-7:6369-6445
/CalendarServer/branches/users/glyph/multiget-delete:8321-8330
/CalendarServer/branches/users/glyph/new-export:7444-7485
/CalendarServer/branches/users/glyph/one-home-list-api:10048-10073
/CalendarServer/branches/users/glyph/oracle:7106-7155
/CalendarServer/branches/users/glyph/oracle-nulls:7340-7351
/CalendarServer/branches/users/glyph/other-html:8062-8091
/CalendarServer/branches/users/glyph/parallel-sim:8240-8251
/CalendarServer/branches/users/glyph/parallel-upgrade:8376-8400
/CalendarServer/branches/users/glyph/parallel-upgrade_to_1:8571-8583
/CalendarServer/branches/users/glyph/q:9560-9688
/CalendarServer/branches/users/glyph/queue-locking-and-timing:10204-10289
/CalendarServer/branches/users/glyph/quota:7604-7637
/CalendarServer/branches/users/glyph/sendfdport:5388-5424
/CalendarServer/branches/users/glyph/shared-pool-fixes:8436-8443
/CalendarServer/branches/users/glyph/shared-pool-take2:8155-8174
/CalendarServer/branches/users/glyph/sharedpool:6490-6550
/CalendarServer/branches/users/glyph/sharing-api:9192-9205
/CalendarServer/branches/users/glyph/skip-lonely-vtimezones:8524-8535
/CalendarServer/branches/users/glyph/sql-store:5929-6073
/CalendarServer/branches/users/glyph/start-service-start-loop:11060-11065
/CalendarServer/branches/users/glyph/subtransactions:7248-7258
/CalendarServer/branches/users/glyph/table-alias:8651-8664
/CalendarServer/branches/users/glyph/uidexport:7673-7676
/CalendarServer/branches/users/glyph/unshare-when-access-revoked:10562-10595
/CalendarServer/branches/users/glyph/use-system-twisted:5084-5149
/CalendarServer/branches/users/glyph/uuid-normalize:9268-9296
/CalendarServer/branches/users/glyph/warning-cleanups:11347-11357
/CalendarServer/branches/users/glyph/whenNotProposed:11881-11897
/CalendarServer/branches/users/glyph/xattrs-from-files:7757-7769
/CalendarServer/branches/users/sagen/applepush:8126-8184
/CalendarServer/branches/users/sagen/inboxitems:7380-7381
/CalendarServer/branches/users/sagen/locations-resources:5032-5051
/CalendarServer/branches/users/sagen/locations-resources-2:5052-5061
/CalendarServer/branches/users/sagen/purge_old_events:6735-6746
/CalendarServer/branches/users/sagen/resource-delegates-4038:4040-4067
/CalendarServer/branches/users/sagen/resource-delegates-4066:4068-4075
/CalendarServer/branches/users/sagen/resources-2:5084-5093
/CalendarServer/branches/users/sagen/testing:10827-10851,10853-10855
/CalendarServer/branches/users/wsanchez/transations:5515-5593

Modified: CalendarServer/trunk/twext/enterprise/fixtures.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/fixtures.py	2013-11-06 22:56:10 UTC (rev 11897)
+++ CalendarServer/trunk/twext/enterprise/fixtures.py	2013-11-06 23:36:01 UTC (rev 11898)
@@ -76,8 +76,8 @@
 
 def resultOf(deferred, propagate=False):
     """
-    Add a callback and errback which will capture the result of a L{Deferred} in
-    a list, and return that list.  If 'propagate' is True, pass through the
+    Add a callback and errback which will capture the result of a L{Deferred}
+    in a list, and return that list.  If 'propagate' is True, pass through the
     results.
     """
     results = []
@@ -551,6 +551,21 @@
         self._connectResultQueue.append(thunk)
 
 
+    def willConnectTo(self):
+        """
+        Queue a successful result for connect() and immediately add it as a
+        child to this L{ConnectionFactory}.
+
+        @return: a connection object
+        @rtype: L{FakeConnection}
+        """
+        aConnection = FakeConnection(self)
+        def thunk():
+            return aConnection
+        self._connectResultQueue.append(thunk)
+        return aConnection
+
+
     def willFail(self):
         """
         Used by tests to queue a successful result for connect().
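
The new willConnectTo() hands back the FakeConnection before connect() is ever
called, so a test can arm behavior on it in advance.  A hypothetical usage
sketch (mirroring test_whenProposedFailure in the test changes below;
executeWillFail comes from the same fixtures):

    cph = ConnectionPoolHelper()
    cph.setUp(test=self)
    connection = cph.factory.willConnectTo()   # handle to the next connection
    connection.executeWillFail(lambda: RuntimeError("db down"))  # arm failure
    txn = cph.createTransaction()              # will be served by 'connection'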

Modified: CalendarServer/trunk/twext/enterprise/queue.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/queue.py	2013-11-06 22:56:10 UTC (rev 11897)
+++ CalendarServer/trunk/twext/enterprise/queue.py	2013-11-06 23:36:01 UTC (rev 11898)
@@ -149,7 +149,8 @@
     NodeTable.addColumn("PORT", SQLType("integer", None))
     NodeTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
         # Note: in the real data structure, this is actually a not-cleaned-up
-        # sqlparse internal data structure, but it *should* look closer to this.
+        # sqlparse internal data structure, but it *should* look closer to
+        # this.
         ProcedureCall("timezone", ["UTC", NamedValue('CURRENT_TIMESTAMP')])
     )
     for column in NodeTable.columns:
@@ -677,8 +678,8 @@
     """
 
     def __init__(self, peerPool, boxReceiver=None, locator=None):
-        super(ConnectionFromWorker, self).__init__(peerPool.schema, boxReceiver,
-                                                   locator)
+        super(ConnectionFromWorker, self).__init__(peerPool.schema,
+                                                   boxReceiver, locator)
         self.peerPool = peerPool
         self._load = 0
 
@@ -830,9 +831,9 @@
             workItem = yield workItemClass.load(txn, workID)
             if workItem.group is not None:
                 yield NamedLock.acquire(txn, workItem.group)
-            # TODO: what if we fail?  error-handling should be recorded someplace,
-            # the row should probably be marked, re-tries should be triggerable
-            # administratively.
+            # TODO: what if we fail?  error-handling should be recorded
+            # someplace, the row should probably be marked, re-tries should be
+            # triggerable administratively.
             yield workItem.delete()
             # TODO: verify that workID is the primary key someplace.
             yield workItem.doWork()
@@ -865,9 +866,6 @@
 
 
 
-
-
-
 class WorkerFactory(Factory, object):
     """
     Factory, to be used as the client to connect from the worker to the
@@ -950,7 +948,7 @@
         waiting for the transaction where that addition was completed to
         commit, and asking the local node controller process to do the work.
         """
-        @passthru(self.workItemType.create(self.txn, **self.kw).addCallback)
+        created = self.workItemType.create(self.txn, **self.kw)
         def whenCreated(item):
             self._whenProposed.callback(self)
             @self.txn.postCommit
@@ -967,12 +965,15 @@
                         self._whenExecuted.errback(why)
                 reactor = self._chooser.reactor
                 when = max(0, astimestamp(item.notBefore) - reactor.seconds())
-                # TODO: Track the returned DelayedCall so it can be stopped when
-                # the service stops.
+                # TODO: Track the returned DelayedCall so it can be stopped
+                # when the service stops.
                 self._chooser.reactor.callLater(when, maybeLater)
             @self.txn.postAbort
             def whenFailed():
                 self._whenCommitted.errback(TransactionFailed)
+        def whenNotCreated(failure):
+            self._whenProposed.errback(failure)
+        created.addCallbacks(whenCreated, whenNotCreated)
 
 
     def whenExecuted(self):
@@ -1023,6 +1024,8 @@
         """
         return _cloneDeferred(self._whenCommitted)
 
+
+
 class _BaseQueuer(object):
     implements(IQueuer)
 
@@ -1030,13 +1033,16 @@
         super(_BaseQueuer, self).__init__()
         self.proposalCallbacks = set()
 
+
     def callWithNewProposals(self, callback):
-        self.proposalCallbacks.add(callback);
+        self.proposalCallbacks.add(callback)
 
+
     def transferProposalCallbacks(self, newQueuer):
         newQueuer.proposalCallbacks = self.proposalCallbacks
         return newQueuer
 
+
     def enqueueWork(self, txn, workItemType, **kw):
         """
         There is some work to do.  Do it, someplace else, ideally in parallel.
@@ -1061,6 +1067,7 @@
         return wp
 
 
+
 class PeerConnectionPool(_BaseQueuer, MultiService, object):
     """
     Each node has a L{PeerConnectionPool} connecting it to all the other nodes
@@ -1140,7 +1147,7 @@
         self.mappedPeers = {}
         self.schema = schema
         self._startingUp = None
-        self._listeningPortObject = None
+        self._listeningPort = None
         self._lastSeenTotalNodes = 1
         self._lastSeenNodeIndex = 1
 
@@ -1197,7 +1204,8 @@
         A peer has requested us to perform some work; choose a work performer
         local to this node, and then execute it.
         """
-        return self.choosePerformer(onlyLocally=True).performWork(table, workID)
+        performer = self.choosePerformer(onlyLocally=True)
+        return performer.performWork(table, workID)
 
 
     def allWorkItemTypes(self):
@@ -1225,8 +1233,8 @@
 
         @return: the maximum number of other L{PeerConnectionPool} instances
             that may be connected to the database described by
-            C{self.transactionFactory}.  Note that this is not the current count
-            by connectivity, but the count according to the database.
+            C{self.transactionFactory}.  Note that this is not the current
+            count by connectivity, but the count according to the database.
         @rtype: L{int}
         """
         # TODO
@@ -1277,7 +1285,6 @@
                                            overdueItem.workID)
         return inTransaction(self.transactionFactory, workCheck)
 
-
     _currentWorkDeferred = None
     _lostWorkCheckCall = None
 
@@ -1315,10 +1322,10 @@
         @inlineCallbacks
         def startup(txn):
             endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
-            # If this fails, the failure mode is going to be ugly, just like all
-            # conflicted-port failures.  But, at least it won't proceed.
-            self._listeningPortObject = yield endpoint.listen(self.peerFactory())
-            self.ampPort = self._listeningPortObject.getHost().port
+            # If this fails, the failure mode is going to be ugly, just like
+            # all conflicted-port failures.  But, at least it won't proceed.
+            self._listeningPort = yield endpoint.listen(self.peerFactory())
+            self.ampPort = self._listeningPort.getHost().port
             yield Lock.exclusive(NodeInfo.table).on(txn)
             nodes = yield self.activeNodes(txn)
             selves = [node for node in nodes
@@ -1354,8 +1361,8 @@
         yield super(PeerConnectionPool, self).stopService()
         if self._startingUp is not None:
             yield self._startingUp
-        if self._listeningPortObject is not None:
-            yield self._listeningPortObject.stopListening()
+        if self._listeningPort is not None:
+            yield self._listeningPort.stopListening()
         if self._lostWorkCheckCall is not None:
             self._lostWorkCheckCall.cancel()
         if self._currentWorkDeferred is not None:
@@ -1430,8 +1437,6 @@
 
 
 
-
-
 class LocalQueuer(_BaseQueuer):
     """
     When work is enqueued with this queuer, it is just executed locally.
@@ -1458,7 +1463,8 @@
     """
     Implementor of C{performWork} that doesn't actual perform any work.  This
     is used in the case where you want to be able to enqueue work for someone
-    else to do, but not take on any work yourself (such as a command line tool).
+    else to do, but not take on any work yourself (such as a command line
+    tool).
     """
     implements(_IWorkPerformer)
 
@@ -1469,6 +1475,7 @@
         return succeed(None)
 
 
+
 class NonPerformingQueuer(_BaseQueuer):
     """
     When work is enqueued with this queuer, it is never executed locally.
@@ -1487,4 +1494,4 @@
         """
         Choose to perform the work locally.
         """
-        return NonPerformer()
\ No newline at end of file
+        return NonPerformer()
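
With this change, enqueue failures finally reach application code.  A
hypothetical caller-side sketch (enqueueSafely is an invented helper; the
enqueueWork/whenProposed API is the one shown in this diff):

    from twisted.python import log

    def enqueueSafely(queuer, txn, workItemType, **kw):
        """
        Enqueue work and report, rather than hang on, enqueue failures.
        """
        proposal = queuer.enqueueWork(txn, workItemType, **kw)
        d = proposal.whenProposed()

        def proposed(wp):
            log.msg(format="work proposed: %(wp)r", wp=wp)

        def notProposed(why):
            # Before r11898 this errback could never fire; the Deferred
            # simply stayed pending forever.
            log.err(why, "failed to enqueue work")

        d.addCallbacks(proposed, notProposed)
        return d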

Modified: CalendarServer/trunk/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_queue.py	2013-11-06 22:56:10 UTC (rev 11897)
+++ CalendarServer/trunk/twext/enterprise/test/test_queue.py	2013-11-06 23:36:01 UTC (rev 11898)
@@ -36,6 +36,7 @@
 )
 
 from twisted.trial.unittest import TestCase
+from twisted.python.failure import Failure
 from twisted.internet.defer import (
     Deferred, inlineCallbacks, gatherResults, passthru#, returnValue
 )
@@ -55,6 +56,8 @@
 from twisted.test.proto_helpers import StringTransport, MemoryReactor
 from twext.enterprise.fixtures import SteppablePoolHelper
 from twisted.internet.defer import returnValue
+from twext.enterprise.queue import LocalQueuer
+from twext.enterprise.fixtures import ConnectionPoolHelper
 
 from twext.enterprise.queue import _BaseQueuer, NonPerformingQueuer
 import twext.enterprise.queue
@@ -67,7 +70,7 @@
 
     def callLater(self, _seconds, _f, *args, **kw):
         if _seconds < 0:
-            raise ValueError("%s<0: "%(_seconds,))
+            raise ValueError("%s<0: " % (_seconds,))
         return super(Clock, self).callLater(_seconds, _f, *args, **kw)
 
 
@@ -267,6 +270,56 @@
 
 
 
+class WorkProposalTests(TestCase):
+    """
+    Tests for L{WorkProposal}.
+    """
+
+    def test_whenProposedSuccess(self):
+        """
+        The L{Deferred} returned by L{WorkProposal.whenProposed} fires when the
+        SQL sent to the database has completed.
+        """
+        cph = ConnectionPoolHelper()
+        cph.setUp(test=self)
+        cph.pauseHolders()
+        lq = LocalQueuer(cph.createTransaction)
+        enqTxn = cph.createTransaction()
+        wp = lq.enqueueWork(enqTxn, DummyWorkItem, a=3, b=4)
+        d = wp.whenProposed()
+        r = cph.resultOf(d)
+        self.assertEquals(r, [])
+        cph.flushHolders()
+        self.assertEquals(len(r), 1)
+
+
+    def test_whenProposedFailure(self):
+        """
+        The L{Deferred} returned by L{WorkProposal.whenProposed} fails with an
+        errback when the SQL executed to create the WorkItem row fails.
+        """
+        cph = ConnectionPoolHelper()
+        cph.setUp(self)
+        cph.pauseHolders()
+        firstConnection = cph.factory.willConnectTo()
+        enqTxn = cph.createTransaction()
+        # Execute some SQL on the connection before enqueueing the work-item so
+        # that we don't get the initial-statement.
+        enqTxn.execSQL("some sql")
+        lq = LocalQueuer(cph.createTransaction)
+        cph.flushHolders()
+        cph.pauseHolders()
+        wp = lq.enqueueWork(enqTxn, DummyWorkItem, a=3, b=4)
+        firstConnection.executeWillFail(lambda: RuntimeError("foo"))
+        d = wp.whenProposed()
+        r = cph.resultOf(d)
+        self.assertEquals(r, [])
+        cph.flushHolders()
+        self.assertEquals(len(r), 1)
+        self.assertIsInstance(r[0], Failure)
+
+
+
 class PeerConnectionPoolUnitTests(TestCase):
     """
     L{PeerConnectionPool} has many internal components.
@@ -393,7 +446,8 @@
             # Next, create one that's actually far enough into the past to run.
             yield DummyWorkItem.create(
                 txn, a=3, b=4, notBefore=(
-                    # Schedule it in the past so that it should have already run.
+                    # Schedule it in the past so that it should have already
+                    # run.
                     fakeNow - datetime.timedelta(
                         seconds=qpool.queueProcessTimeout + 20
                     )
@@ -619,6 +673,7 @@
         self.receiver, self.sender = self.sender, self.receiver
         return result
 
+
     def flush(self, turns=10):
         """
         Keep relaying data until there's no more.
@@ -718,7 +773,7 @@
         def op2(txn):
             return Select([schema.DUMMY_WORK_DONE.WORK_ID,
                            schema.DUMMY_WORK_DONE.A_PLUS_B],
-                           From=schema.DUMMY_WORK_DONE).on(txn)
+                          From=schema.DUMMY_WORK_DONE).on(txn)
         rows = yield inTransaction(self.store.newTransaction, op2)
         self.assertEquals(rows, [[4321, 7]])
 
@@ -729,7 +784,7 @@
         When a L{WorkItem} is concurrently deleted by another transaction, it
         should I{not} perform its work.
         """
-        # Provide access to a method called 'concurrently' everything using 
+        # Provide access to a method called 'concurrently' everything using
         original = self.store.newTransaction
         def decorate(*a, **k):
             result = original(*a, **k)
@@ -746,13 +801,13 @@
         # Sanity check on the concurrent deletion.
         def op2(txn):
             return Select([schema.DUMMY_WORK_ITEM.WORK_ID],
-                           From=schema.DUMMY_WORK_ITEM).on(txn)
+                          From=schema.DUMMY_WORK_ITEM).on(txn)
         rows = yield inTransaction(self.store.newTransaction, op2)
         self.assertEquals(rows, [])
         def op3(txn):
             return Select([schema.DUMMY_WORK_DONE.WORK_ID,
                            schema.DUMMY_WORK_DONE.A_PLUS_B],
-                           From=schema.DUMMY_WORK_DONE).on(txn)
+                          From=schema.DUMMY_WORK_DONE).on(txn)
         rows = yield inTransaction(self.store.newTransaction, op3)
         self.assertEquals(rows, [])
 
@@ -763,18 +818,23 @@
     def __init__(self, *ignored):
         pass
 
+
     def _start(self):
         pass
 
+
+
 class BaseQueuerTests(TestCase):
 
     def setUp(self):
         self.proposal = None
         self.patch(twext.enterprise.queue, "WorkProposal", DummyProposal)
 
+
     def _proposalCallback(self, proposal):
         self.proposal = proposal
 
+
     def test_proposalCallbacks(self):
         queuer = _BaseQueuer()
         queuer.callWithNewProposals(self._proposalCallback)
@@ -783,6 +843,7 @@
         self.assertNotEqual(self.proposal, None)
 
 
+
 class NonPerformingQueuerTests(TestCase):
 
     @inlineCallbacks
@@ -791,5 +852,3 @@
         performer = queuer.choosePerformer()
         result = (yield performer.performWork(None, None))
         self.assertEquals(result, None)
-
-
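
The success and failure tests above share the fixtures' capture-then-flush
rhythm: resultOf() returns a list that stays empty while the pool's holders
are paused, and flushHolders() then runs the queued connection work so the
Deferred settles.  Schematically (assuming the fixture behavior shown above):

    r = cph.resultOf(d)     # [] while holders are paused: nothing has fired
    cph.flushHolders()      # run the queued connection work
    assert len(r) == 1      # the Deferred has now fired exactly once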