[CalendarServer-changes] [11882] CalendarServer/branches/users/glyph/whenNotProposed/twext/ enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Wed Mar 12 11:17:29 PDT 2014
Revision: 11882
http://trac.calendarserver.org//changeset/11882
Author: glyph at apple.com
Date: 2013-11-04 12:07:30 -0800 (Mon, 04 Nov 2013)
Log Message:
-----------
Cleanups for coding standard issues.
Modified Paths:
--------------
CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/queue.py
CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/test/test_queue.py
Modified: CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/queue.py 2013-11-04 20:06:35 UTC (rev 11881)
+++ CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/queue.py 2013-11-04 20:07:30 UTC (rev 11882)
@@ -149,7 +149,8 @@
NodeTable.addColumn("PORT", SQLType("integer", None))
NodeTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
# Note: in the real data structure, this is actually a not-cleaned-up
- # sqlparse internal data structure, but it *should* look closer to this.
+ # sqlparse internal data structure, but it *should* look closer to
+ # this.
ProcedureCall("timezone", ["UTC", NamedValue('CURRENT_TIMESTAMP')])
)
for column in NodeTable.columns:
@@ -677,8 +678,8 @@
"""
def __init__(self, peerPool, boxReceiver=None, locator=None):
- super(ConnectionFromWorker, self).__init__(peerPool.schema, boxReceiver,
- locator)
+ super(ConnectionFromWorker, self).__init__(peerPool.schema,
+ boxReceiver, locator)
self.peerPool = peerPool
self._load = 0
@@ -830,9 +831,9 @@
workItem = yield workItemClass.load(txn, workID)
if workItem.group is not None:
yield NamedLock.acquire(txn, workItem.group)
- # TODO: what if we fail? error-handling should be recorded someplace,
- # the row should probably be marked, re-tries should be triggerable
- # administratively.
+ # TODO: what if we fail? error-handling should be recorded
+ # someplace, the row should probably be marked, re-tries should be
+ # triggerable administratively.
yield workItem.delete()
# TODO: verify that workID is the primary key someplace.
yield workItem.doWork()
@@ -865,9 +866,6 @@
-
-
-
class WorkerFactory(Factory, object):
"""
Factory, to be used as the client to connect from the worker to the
@@ -967,8 +965,8 @@
self._whenExecuted.errback(why)
reactor = self._chooser.reactor
when = max(0, astimestamp(item.notBefore) - reactor.seconds())
- # TODO: Track the returned DelayedCall so it can be stopped when
- # the service stops.
+ # TODO: Track the returned DelayedCall so it can be stopped
+ # when the service stops.
self._chooser.reactor.callLater(when, maybeLater)
@self.txn.postAbort
def whenFailed():
@@ -1023,6 +1021,8 @@
"""
return _cloneDeferred(self._whenCommitted)
+
+
class _BaseQueuer(object):
implements(IQueuer)
@@ -1030,13 +1030,16 @@
super(_BaseQueuer, self).__init__()
self.proposalCallbacks = set()
+
def callWithNewProposals(self, callback):
- self.proposalCallbacks.add(callback);
+ self.proposalCallbacks.add(callback)
+
def transferProposalCallbacks(self, newQueuer):
newQueuer.proposalCallbacks = self.proposalCallbacks
return newQueuer
+
def enqueueWork(self, txn, workItemType, **kw):
"""
There is some work to do. Do it, someplace else, ideally in parallel.
@@ -1061,6 +1064,7 @@
return wp
+
class PeerConnectionPool(_BaseQueuer, MultiService, object):
"""
Each node has a L{PeerConnectionPool} connecting it to all the other nodes
@@ -1140,7 +1144,7 @@
self.mappedPeers = {}
self.schema = schema
self._startingUp = None
- self._listeningPortObject = None
+ self._listeningPort = None
self._lastSeenTotalNodes = 1
self._lastSeenNodeIndex = 1
@@ -1197,7 +1201,8 @@
A peer has requested us to perform some work; choose a work performer
local to this node, and then execute it.
"""
- return self.choosePerformer(onlyLocally=True).performWork(table, workID)
+ performer = self.choosePerformer(onlyLocally=True)
+ return performer.performWork(table, workID)
def allWorkItemTypes(self):
@@ -1225,8 +1230,8 @@
@return: the maximum number of other L{PeerConnectionPool} instances
that may be connected to the database described by
- C{self.transactionFactory}. Note that this is not the current count
- by connectivity, but the count according to the database.
+ C{self.transactionFactory}. Note that this is not the current
+ count by connectivity, but the count according to the database.
@rtype: L{int}
"""
# TODO
@@ -1277,7 +1282,6 @@
overdueItem.workID)
return inTransaction(self.transactionFactory, workCheck)
-
_currentWorkDeferred = None
_lostWorkCheckCall = None
@@ -1315,10 +1319,10 @@
@inlineCallbacks
def startup(txn):
endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
- # If this fails, the failure mode is going to be ugly, just like all
- # conflicted-port failures. But, at least it won't proceed.
- self._listeningPortObject = yield endpoint.listen(self.peerFactory())
- self.ampPort = self._listeningPortObject.getHost().port
+ # If this fails, the failure mode is going to be ugly, just like
+ # all conflicted-port failures. But, at least it won't proceed.
+ self._listeningPort = yield endpoint.listen(self.peerFactory())
+ self.ampPort = self._listeningPort.getHost().port
yield Lock.exclusive(NodeInfo.table).on(txn)
nodes = yield self.activeNodes(txn)
selves = [node for node in nodes
@@ -1354,8 +1358,8 @@
yield super(PeerConnectionPool, self).stopService()
if self._startingUp is not None:
yield self._startingUp
- if self._listeningPortObject is not None:
- yield self._listeningPortObject.stopListening()
+ if self._listeningPort is not None:
+ yield self._listeningPort.stopListening()
if self._lostWorkCheckCall is not None:
self._lostWorkCheckCall.cancel()
if self._currentWorkDeferred is not None:
@@ -1430,8 +1434,6 @@
-
-
class LocalQueuer(_BaseQueuer):
"""
When work is enqueued with this queuer, it is just executed locally.
@@ -1458,7 +1460,8 @@
"""
Implementor of C{performWork} that doesn't actual perform any work. This
is used in the case where you want to be able to enqueue work for someone
- else to do, but not take on any work yourself (such as a command line tool).
+ else to do, but not take on any work yourself (such as a command line
+ tool).
"""
implements(_IWorkPerformer)
@@ -1469,6 +1472,7 @@
return succeed(None)
+
class NonPerformingQueuer(_BaseQueuer):
"""
When work is enqueued with this queuer, it is never executed locally.
@@ -1487,4 +1491,4 @@
"""
Choose to perform the work locally.
"""
- return NonPerformer()
\ No newline at end of file
+ return NonPerformer()
Modified: CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/test/test_queue.py 2013-11-04 20:06:35 UTC (rev 11881)
+++ CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/test/test_queue.py 2013-11-04 20:07:30 UTC (rev 11882)
@@ -67,7 +67,7 @@
def callLater(self, _seconds, _f, *args, **kw):
if _seconds < 0:
- raise ValueError("%s<0: "%(_seconds,))
+ raise ValueError("%s<0: " % (_seconds,))
return super(Clock, self).callLater(_seconds, _f, *args, **kw)
@@ -393,7 +393,8 @@
# Next, create one that's actually far enough into the past to run.
yield DummyWorkItem.create(
txn, a=3, b=4, notBefore=(
- # Schedule it in the past so that it should have already run.
+ # Schedule it in the past so that it should have already
+ # run.
fakeNow - datetime.timedelta(
seconds=qpool.queueProcessTimeout + 20
)
@@ -619,6 +620,7 @@
self.receiver, self.sender = self.sender, self.receiver
return result
+
def flush(self, turns=10):
"""
Keep relaying data until there's no more.
@@ -718,7 +720,7 @@
def op2(txn):
return Select([schema.DUMMY_WORK_DONE.WORK_ID,
schema.DUMMY_WORK_DONE.A_PLUS_B],
- From=schema.DUMMY_WORK_DONE).on(txn)
+ From=schema.DUMMY_WORK_DONE).on(txn)
rows = yield inTransaction(self.store.newTransaction, op2)
self.assertEquals(rows, [[4321, 7]])
@@ -729,7 +731,7 @@
When a L{WorkItem} is concurrently deleted by another transaction, it
should I{not} perform its work.
"""
- # Provide access to a method called 'concurrently' everything using
+ # Provide access to a method called 'concurrently' everything using
original = self.store.newTransaction
def decorate(*a, **k):
result = original(*a, **k)
@@ -746,13 +748,13 @@
# Sanity check on the concurrent deletion.
def op2(txn):
return Select([schema.DUMMY_WORK_ITEM.WORK_ID],
- From=schema.DUMMY_WORK_ITEM).on(txn)
+ From=schema.DUMMY_WORK_ITEM).on(txn)
rows = yield inTransaction(self.store.newTransaction, op2)
self.assertEquals(rows, [])
def op3(txn):
return Select([schema.DUMMY_WORK_DONE.WORK_ID,
schema.DUMMY_WORK_DONE.A_PLUS_B],
- From=schema.DUMMY_WORK_DONE).on(txn)
+ From=schema.DUMMY_WORK_DONE).on(txn)
rows = yield inTransaction(self.store.newTransaction, op3)
self.assertEquals(rows, [])
@@ -763,18 +765,23 @@
def __init__(self, *ignored):
pass
+
def _start(self):
pass
+
+
class BaseQueuerTests(TestCase):
def setUp(self):
self.proposal = None
self.patch(twext.enterprise.queue, "WorkProposal", DummyProposal)
+
def _proposalCallback(self, proposal):
self.proposal = proposal
+
def test_proposalCallbacks(self):
queuer = _BaseQueuer()
queuer.callWithNewProposals(self._proposalCallback)
@@ -783,6 +790,7 @@
self.assertNotEqual(self.proposal, None)
+
class NonPerformingQueuerTests(TestCase):
@inlineCallbacks
@@ -791,5 +799,3 @@
performer = queuer.choosePerformer()
result = (yield performer.performWork(None, None))
self.assertEquals(result, None)
-
-
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140312/d9f7ab3b/attachment.html>
More information about the calendarserver-changes
mailing list