<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[11882] CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise</title>
</head>
<body>
<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; }
#msg dl a { font-weight: bold}
#msg dl a:link { color:#fc3; }
#msg dl a:active { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
#patch .lines, .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org/changeset/11882">11882</a></dd>
<dt>Author</dt> <dd>glyph@apple.com</dd>
<dt>Date</dt> <dd>2013-11-04 12:07:30 -0800 (Mon, 04 Nov 2013)</dd>
</dl>
<h3>Log Message</h3>
<pre>Cleanups for coding standard issues.</pre>
<h3>Modified Paths</h3>
<ul>
<li><a href="#CalendarServerbranchesusersglyphwhenNotProposedtwextenterprisequeuepy">CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/queue.py</a></li>
<li><a href="#CalendarServerbranchesusersglyphwhenNotProposedtwextenterprisetesttest_queuepy">CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/test/test_queue.py</a></li>
</ul>
</div>
<div id="patch">
<h3>Diff</h3>
<a id="CalendarServerbranchesusersglyphwhenNotProposedtwextenterprisequeuepy"></a>
<div class="modfile"><h4>Modified: CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/queue.py (11881 => 11882)</h4>
<pre class="diff"><span>
<span class="info">--- CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/queue.py        2013-11-04 20:06:35 UTC (rev 11881)
+++ CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/queue.py        2013-11-04 20:07:30 UTC (rev 11882)
</span><span class="lines">@@ -149,7 +149,8 @@
</span><span class="cx"> NodeTable.addColumn("PORT", SQLType("integer", None))
</span><span class="cx"> NodeTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
</span><span class="cx"> # Note: in the real data structure, this is actually a not-cleaned-up
</span><del>- # sqlparse internal data structure, but it *should* look closer to this.
</del><ins>+ # sqlparse internal data structure, but it *should* look closer to
+ # this.
</ins><span class="cx"> ProcedureCall("timezone", ["UTC", NamedValue('CURRENT_TIMESTAMP')])
</span><span class="cx"> )
</span><span class="cx"> for column in NodeTable.columns:
</span><span class="lines">@@ -677,8 +678,8 @@
</span><span class="cx"> """
</span><span class="cx">
</span><span class="cx"> def __init__(self, peerPool, boxReceiver=None, locator=None):
</span><del>- super(ConnectionFromWorker, self).__init__(peerPool.schema, boxReceiver,
- locator)
</del><ins>+ super(ConnectionFromWorker, self).__init__(peerPool.schema,
+ boxReceiver, locator)
</ins><span class="cx"> self.peerPool = peerPool
</span><span class="cx"> self._load = 0
</span><span class="cx">
</span><span class="lines">@@ -830,9 +831,9 @@
</span><span class="cx"> workItem = yield workItemClass.load(txn, workID)
</span><span class="cx"> if workItem.group is not None:
</span><span class="cx"> yield NamedLock.acquire(txn, workItem.group)
</span><del>- # TODO: what if we fail? error-handling should be recorded someplace,
- # the row should probably be marked, re-tries should be triggerable
- # administratively.
</del><ins>+ # TODO: what if we fail? error-handling should be recorded
+ # someplace, the row should probably be marked, re-tries should be
+ # triggerable administratively.
</ins><span class="cx"> yield workItem.delete()
</span><span class="cx"> # TODO: verify that workID is the primary key someplace.
</span><span class="cx"> yield workItem.doWork()
</span><span class="lines">@@ -865,9 +866,6 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><del>-
-
-
</del><span class="cx"> class WorkerFactory(Factory, object):
</span><span class="cx"> """
</span><span class="cx"> Factory, to be used as the client to connect from the worker to the
</span><span class="lines">@@ -967,8 +965,8 @@
</span><span class="cx"> self._whenExecuted.errback(why)
</span><span class="cx"> reactor = self._chooser.reactor
</span><span class="cx"> when = max(0, astimestamp(item.notBefore) - reactor.seconds())
</span><del>- # TODO: Track the returned DelayedCall so it can be stopped when
- # the service stops.
</del><ins>+ # TODO: Track the returned DelayedCall so it can be stopped
+ # when the service stops.
</ins><span class="cx"> self._chooser.reactor.callLater(when, maybeLater)
</span><span class="cx"> @self.txn.postAbort
</span><span class="cx"> def whenFailed():
</span><span class="lines">@@ -1023,6 +1021,8 @@
</span><span class="cx"> """
</span><span class="cx"> return _cloneDeferred(self._whenCommitted)
</span><span class="cx">
</span><ins>+
+
</ins><span class="cx"> class _BaseQueuer(object):
</span><span class="cx"> implements(IQueuer)
</span><span class="cx">
</span><span class="lines">@@ -1030,13 +1030,16 @@
</span><span class="cx"> super(_BaseQueuer, self).__init__()
</span><span class="cx"> self.proposalCallbacks = set()
</span><span class="cx">
</span><ins>+
</ins><span class="cx"> def callWithNewProposals(self, callback):
</span><del>- self.proposalCallbacks.add(callback);
</del><ins>+ self.proposalCallbacks.add(callback)
</ins><span class="cx">
</span><ins>+
</ins><span class="cx"> def transferProposalCallbacks(self, newQueuer):
</span><span class="cx"> newQueuer.proposalCallbacks = self.proposalCallbacks
</span><span class="cx"> return newQueuer
</span><span class="cx">
</span><ins>+
</ins><span class="cx"> def enqueueWork(self, txn, workItemType, **kw):
</span><span class="cx"> """
</span><span class="cx"> There is some work to do. Do it, someplace else, ideally in parallel.
</span><span class="lines">@@ -1061,6 +1064,7 @@
</span><span class="cx"> return wp
</span><span class="cx">
</span><span class="cx">
</span><ins>+
</ins><span class="cx"> class PeerConnectionPool(_BaseQueuer, MultiService, object):
</span><span class="cx"> """
</span><span class="cx"> Each node has a L{PeerConnectionPool} connecting it to all the other nodes
</span><span class="lines">@@ -1140,7 +1144,7 @@
</span><span class="cx"> self.mappedPeers = {}
</span><span class="cx"> self.schema = schema
</span><span class="cx"> self._startingUp = None
</span><del>- self._listeningPortObject = None
</del><ins>+ self._listeningPort = None
</ins><span class="cx"> self._lastSeenTotalNodes = 1
</span><span class="cx"> self._lastSeenNodeIndex = 1
</span><span class="cx">
</span><span class="lines">@@ -1197,7 +1201,8 @@
</span><span class="cx"> A peer has requested us to perform some work; choose a work performer
</span><span class="cx"> local to this node, and then execute it.
</span><span class="cx"> """
</span><del>- return self.choosePerformer(onlyLocally=True).performWork(table, workID)
</del><ins>+ performer = self.choosePerformer(onlyLocally=True)
+ return performer.performWork(table, workID)
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> def allWorkItemTypes(self):
</span><span class="lines">@@ -1225,8 +1230,8 @@
</span><span class="cx">
</span><span class="cx"> @return: the maximum number of other L{PeerConnectionPool} instances
</span><span class="cx"> that may be connected to the database described by
</span><del>- C{self.transactionFactory}. Note that this is not the current count
- by connectivity, but the count according to the database.
</del><ins>+ C{self.transactionFactory}. Note that this is not the current
+ count by connectivity, but the count according to the database.
</ins><span class="cx"> @rtype: L{int}
</span><span class="cx"> """
</span><span class="cx"> # TODO
</span><span class="lines">@@ -1277,7 +1282,6 @@
</span><span class="cx"> overdueItem.workID)
</span><span class="cx"> return inTransaction(self.transactionFactory, workCheck)
</span><span class="cx">
</span><del>-
</del><span class="cx"> _currentWorkDeferred = None
</span><span class="cx"> _lostWorkCheckCall = None
</span><span class="cx">
</span><span class="lines">@@ -1315,10 +1319,10 @@
</span><span class="cx"> @inlineCallbacks
</span><span class="cx"> def startup(txn):
</span><span class="cx"> endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
</span><del>- # If this fails, the failure mode is going to be ugly, just like all
- # conflicted-port failures. But, at least it won't proceed.
- self._listeningPortObject = yield endpoint.listen(self.peerFactory())
- self.ampPort = self._listeningPortObject.getHost().port
</del><ins>+ # If this fails, the failure mode is going to be ugly, just like
+ # all conflicted-port failures. But, at least it won't proceed.
+ self._listeningPort = yield endpoint.listen(self.peerFactory())
+ self.ampPort = self._listeningPort.getHost().port
</ins><span class="cx"> yield Lock.exclusive(NodeInfo.table).on(txn)
</span><span class="cx"> nodes = yield self.activeNodes(txn)
</span><span class="cx"> selves = [node for node in nodes
</span><span class="lines">@@ -1354,8 +1358,8 @@
</span><span class="cx"> yield super(PeerConnectionPool, self).stopService()
</span><span class="cx"> if self._startingUp is not None:
</span><span class="cx"> yield self._startingUp
</span><del>- if self._listeningPortObject is not None:
- yield self._listeningPortObject.stopListening()
</del><ins>+ if self._listeningPort is not None:
+ yield self._listeningPort.stopListening()
</ins><span class="cx"> if self._lostWorkCheckCall is not None:
</span><span class="cx"> self._lostWorkCheckCall.cancel()
</span><span class="cx"> if self._currentWorkDeferred is not None:
</span><span class="lines">@@ -1430,8 +1434,6 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx">
</span><del>-
-
</del><span class="cx"> class LocalQueuer(_BaseQueuer):
</span><span class="cx"> """
</span><span class="cx"> When work is enqueued with this queuer, it is just executed locally.
</span><span class="lines">@@ -1458,7 +1460,8 @@
</span><span class="cx"> """
</span><span class="cx"> Implementor of C{performWork} that doesn't actual perform any work. This
</span><span class="cx"> is used in the case where you want to be able to enqueue work for someone
</span><del>- else to do, but not take on any work yourself (such as a command line tool).
</del><ins>+ else to do, but not take on any work yourself (such as a command line
+ tool).
</ins><span class="cx"> """
</span><span class="cx"> implements(_IWorkPerformer)
</span><span class="cx">
</span><span class="lines">@@ -1469,6 +1472,7 @@
</span><span class="cx"> return succeed(None)
</span><span class="cx">
</span><span class="cx">
</span><ins>+
</ins><span class="cx"> class NonPerformingQueuer(_BaseQueuer):
</span><span class="cx"> """
</span><span class="cx"> When work is enqueued with this queuer, it is never executed locally.
</span><span class="lines">@@ -1487,4 +1491,4 @@
</span><span class="cx"> """
</span><span class="cx"> Choose to perform the work locally.
</span><span class="cx"> """
</span><del>- return NonPerformer()
</del><span class="cx">\ No newline at end of file
</span><ins>+ return NonPerformer()
</ins></span></pre></div>
<a id="CalendarServerbranchesusersglyphwhenNotProposedtwextenterprisetesttest_queuepy"></a>
<div class="modfile"><h4>Modified: CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/test/test_queue.py (11881 => 11882)</h4>
<pre class="diff"><span>
<span class="info">--- CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/test/test_queue.py        2013-11-04 20:06:35 UTC (rev 11881)
+++ CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/test/test_queue.py        2013-11-04 20:07:30 UTC (rev 11882)
</span><span class="lines">@@ -67,7 +67,7 @@
</span><span class="cx">
</span><span class="cx"> def callLater(self, _seconds, _f, *args, **kw):
</span><span class="cx"> if _seconds &lt; 0:
</span><del>- raise ValueError("%s&lt;0: "%(_seconds,))
</del><ins>+ raise ValueError("%s&lt;0: " % (_seconds,))
</ins><span class="cx"> return super(Clock, self).callLater(_seconds, _f, *args, **kw)
</span><span class="cx">
</span><span class="cx">
</span><span class="lines">@@ -393,7 +393,8 @@
</span><span class="cx"> # Next, create one that's actually far enough into the past to run.
</span><span class="cx"> yield DummyWorkItem.create(
</span><span class="cx"> txn, a=3, b=4, notBefore=(
</span><del>- # Schedule it in the past so that it should have already run.
</del><ins>+ # Schedule it in the past so that it should have already
+ # run.
</ins><span class="cx"> fakeNow - datetime.timedelta(
</span><span class="cx"> seconds=qpool.queueProcessTimeout + 20
</span><span class="cx"> )
</span><span class="lines">@@ -619,6 +620,7 @@
</span><span class="cx"> self.receiver, self.sender = self.sender, self.receiver
</span><span class="cx"> return result
</span><span class="cx">
</span><ins>+
</ins><span class="cx"> def flush(self, turns=10):
</span><span class="cx"> """
</span><span class="cx"> Keep relaying data until there's no more.
</span><span class="lines">@@ -718,7 +720,7 @@
</span><span class="cx"> def op2(txn):
</span><span class="cx"> return Select([schema.DUMMY_WORK_DONE.WORK_ID,
</span><span class="cx"> schema.DUMMY_WORK_DONE.A_PLUS_B],
</span><del>- From=schema.DUMMY_WORK_DONE).on(txn)
</del><ins>+ From=schema.DUMMY_WORK_DONE).on(txn)
</ins><span class="cx"> rows = yield inTransaction(self.store.newTransaction, op2)
</span><span class="cx"> self.assertEquals(rows, [[4321, 7]])
</span><span class="cx">
</span><span class="lines">@@ -729,7 +731,7 @@
</span><span class="cx"> When a L{WorkItem} is concurrently deleted by another transaction, it
</span><span class="cx"> should I{not} perform its work.
</span><span class="cx"> """
</span><del>- # Provide access to a method called 'concurrently' everything using
</del><ins>+ # Provide access to a method called 'concurrently' everything using
</ins><span class="cx"> original = self.store.newTransaction
</span><span class="cx"> def decorate(*a, **k):
</span><span class="cx"> result = original(*a, **k)
</span><span class="lines">@@ -746,13 +748,13 @@
</span><span class="cx"> # Sanity check on the concurrent deletion.
</span><span class="cx"> def op2(txn):
</span><span class="cx"> return Select([schema.DUMMY_WORK_ITEM.WORK_ID],
</span><del>- From=schema.DUMMY_WORK_ITEM).on(txn)
</del><ins>+ From=schema.DUMMY_WORK_ITEM).on(txn)
</ins><span class="cx"> rows = yield inTransaction(self.store.newTransaction, op2)
</span><span class="cx"> self.assertEquals(rows, [])
</span><span class="cx"> def op3(txn):
</span><span class="cx"> return Select([schema.DUMMY_WORK_DONE.WORK_ID,
</span><span class="cx"> schema.DUMMY_WORK_DONE.A_PLUS_B],
</span><del>- From=schema.DUMMY_WORK_DONE).on(txn)
</del><ins>+ From=schema.DUMMY_WORK_DONE).on(txn)
</ins><span class="cx"> rows = yield inTransaction(self.store.newTransaction, op3)
</span><span class="cx"> self.assertEquals(rows, [])
</span><span class="cx">
</span><span class="lines">@@ -763,18 +765,23 @@
</span><span class="cx"> def __init__(self, *ignored):
</span><span class="cx"> pass
</span><span class="cx">
</span><ins>+
</ins><span class="cx"> def _start(self):
</span><span class="cx"> pass
</span><span class="cx">
</span><ins>+
+
</ins><span class="cx"> class BaseQueuerTests(TestCase):
</span><span class="cx">
</span><span class="cx"> def setUp(self):
</span><span class="cx"> self.proposal = None
</span><span class="cx"> self.patch(twext.enterprise.queue, "WorkProposal", DummyProposal)
</span><span class="cx">
</span><ins>+
</ins><span class="cx"> def _proposalCallback(self, proposal):
</span><span class="cx"> self.proposal = proposal
</span><span class="cx">
</span><ins>+
</ins><span class="cx"> def test_proposalCallbacks(self):
</span><span class="cx"> queuer = _BaseQueuer()
</span><span class="cx"> queuer.callWithNewProposals(self._proposalCallback)
</span><span class="lines">@@ -783,6 +790,7 @@
</span><span class="cx"> self.assertNotEqual(self.proposal, None)
</span><span class="cx">
</span><span class="cx">
</span><ins>+
</ins><span class="cx"> class NonPerformingQueuerTests(TestCase):
</span><span class="cx">
</span><span class="cx"> @inlineCallbacks
</span><span class="lines">@@ -791,5 +799,3 @@
</span><span class="cx"> performer = queuer.choosePerformer()
</span><span class="cx"> result = (yield performer.performWork(None, None))
</span><span class="cx"> self.assertEquals(result, None)
</span><del>-
-
</del></span></pre>
</div>
</div>
</body>
</html>