[CalendarServer-changes] [11882] CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise

source_changes at macosforge.org
Wed Mar 12 11:17:29 PDT 2014


Revision: 11882
          http://trac.calendarserver.org//changeset/11882
Author:   glyph at apple.com
Date:     2013-11-04 12:07:30 -0800 (Mon, 04 Nov 2013)
Log Message:
-----------
Cleanups for coding standard issues.

Modified Paths:
--------------
    CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/queue.py
    CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/test/test_queue.py
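
The hunks below are mechanical style fixes. As a rough illustration of the
conventions being enforced (a sketch with hypothetical names, not code taken
from this patch): lines are wrapped at 79 columns, methods and top-level
definitions are separated by two blank lines, statement-terminating
semicolons are dropped, and binary operators such as '%' get surrounding
spaces.

    # Sketch only -- 'Example' and its methods are hypothetical.

    class Example(object):
        def __init__(self, schema, receiver=None, locator=None):
            # Long calls are wrapped so no line exceeds 79 columns.
            super(Example, self).__init__()
            self.callbacks = set()
            self.schema = schema


        def register(self, callback):
            self.callbacks.add(callback)  # no trailing semicolon


        def check(self, seconds):
            if seconds < 0:
                raise ValueError("%s<0: " % (seconds,))  # spaces around %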

Modified: CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/queue.py
===================================================================
--- CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/queue.py	2013-11-04 20:06:35 UTC (rev 11881)
+++ CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/queue.py	2013-11-04 20:07:30 UTC (rev 11882)
@@ -149,7 +149,8 @@
     NodeTable.addColumn("PORT", SQLType("integer", None))
     NodeTable.addColumn("TIME", SQLType("timestamp", None)).setDefaultValue(
         # Note: in the real data structure, this is actually a not-cleaned-up
-        # sqlparse internal data structure, but it *should* look closer to this.
+        # sqlparse internal data structure, but it *should* look closer to
+        # this.
         ProcedureCall("timezone", ["UTC", NamedValue('CURRENT_TIMESTAMP')])
     )
     for column in NodeTable.columns:
@@ -677,8 +678,8 @@
     """
 
     def __init__(self, peerPool, boxReceiver=None, locator=None):
-        super(ConnectionFromWorker, self).__init__(peerPool.schema, boxReceiver,
-                                                   locator)
+        super(ConnectionFromWorker, self).__init__(peerPool.schema,
+                                                   boxReceiver, locator)
         self.peerPool = peerPool
         self._load = 0
 
@@ -830,9 +831,9 @@
             workItem = yield workItemClass.load(txn, workID)
             if workItem.group is not None:
                 yield NamedLock.acquire(txn, workItem.group)
-            # TODO: what if we fail?  error-handling should be recorded someplace,
-            # the row should probably be marked, re-tries should be triggerable
-            # administratively.
+            # TODO: what if we fail?  error-handling should be recorded
+            # someplace, the row should probably be marked, re-tries should be
+            # triggerable administratively.
             yield workItem.delete()
             # TODO: verify that workID is the primary key someplace.
             yield workItem.doWork()
@@ -865,9 +866,6 @@
 
 
 
-
-
-
 class WorkerFactory(Factory, object):
     """
     Factory, to be used as the client to connect from the worker to the
@@ -967,8 +965,8 @@
                         self._whenExecuted.errback(why)
                 reactor = self._chooser.reactor
                 when = max(0, astimestamp(item.notBefore) - reactor.seconds())
-                # TODO: Track the returned DelayedCall so it can be stopped when
-                # the service stops.
+                # TODO: Track the returned DelayedCall so it can be stopped
+                # when the service stops.
                 self._chooser.reactor.callLater(when, maybeLater)
             @self.txn.postAbort
             def whenFailed():
@@ -1023,6 +1021,8 @@
         """
         return _cloneDeferred(self._whenCommitted)
 
+
+
 class _BaseQueuer(object):
     implements(IQueuer)
 
@@ -1030,13 +1030,16 @@
         super(_BaseQueuer, self).__init__()
         self.proposalCallbacks = set()
 
+
     def callWithNewProposals(self, callback):
-        self.proposalCallbacks.add(callback);
+        self.proposalCallbacks.add(callback)
 
+
     def transferProposalCallbacks(self, newQueuer):
         newQueuer.proposalCallbacks = self.proposalCallbacks
         return newQueuer
 
+
     def enqueueWork(self, txn, workItemType, **kw):
         """
         There is some work to do.  Do it, someplace else, ideally in parallel.
@@ -1061,6 +1064,7 @@
         return wp
 
 
+
 class PeerConnectionPool(_BaseQueuer, MultiService, object):
     """
     Each node has a L{PeerConnectionPool} connecting it to all the other nodes
@@ -1140,7 +1144,7 @@
         self.mappedPeers = {}
         self.schema = schema
         self._startingUp = None
-        self._listeningPortObject = None
+        self._listeningPort = None
         self._lastSeenTotalNodes = 1
         self._lastSeenNodeIndex = 1
 
@@ -1197,7 +1201,8 @@
         A peer has requested us to perform some work; choose a work performer
         local to this node, and then execute it.
         """
-        return self.choosePerformer(onlyLocally=True).performWork(table, workID)
+        performer = self.choosePerformer(onlyLocally=True)
+        return performer.performWork(table, workID)
 
 
     def allWorkItemTypes(self):
@@ -1225,8 +1230,8 @@
 
         @return: the maximum number of other L{PeerConnectionPool} instances
             that may be connected to the database described by
-            C{self.transactionFactory}.  Note that this is not the current count
-            by connectivity, but the count according to the database.
+            C{self.transactionFactory}.  Note that this is not the current
+            count by connectivity, but the count according to the database.
         @rtype: L{int}
         """
         # TODO
@@ -1277,7 +1282,6 @@
                                            overdueItem.workID)
         return inTransaction(self.transactionFactory, workCheck)
 
-
     _currentWorkDeferred = None
     _lostWorkCheckCall = None
 
@@ -1315,10 +1319,10 @@
         @inlineCallbacks
         def startup(txn):
             endpoint = TCP4ServerEndpoint(self.reactor, self.ampPort)
-            # If this fails, the failure mode is going to be ugly, just like all
-            # conflicted-port failures.  But, at least it won't proceed.
-            self._listeningPortObject = yield endpoint.listen(self.peerFactory())
-            self.ampPort = self._listeningPortObject.getHost().port
+            # If this fails, the failure mode is going to be ugly, just like
+            # all conflicted-port failures.  But, at least it won't proceed.
+            self._listeningPort = yield endpoint.listen(self.peerFactory())
+            self.ampPort = self._listeningPort.getHost().port
             yield Lock.exclusive(NodeInfo.table).on(txn)
             nodes = yield self.activeNodes(txn)
             selves = [node for node in nodes
@@ -1354,8 +1358,8 @@
         yield super(PeerConnectionPool, self).stopService()
         if self._startingUp is not None:
             yield self._startingUp
-        if self._listeningPortObject is not None:
-            yield self._listeningPortObject.stopListening()
+        if self._listeningPort is not None:
+            yield self._listeningPort.stopListening()
         if self._lostWorkCheckCall is not None:
             self._lostWorkCheckCall.cancel()
         if self._currentWorkDeferred is not None:
@@ -1430,8 +1434,6 @@
 
 
 
-
-
 class LocalQueuer(_BaseQueuer):
     """
     When work is enqueued with this queuer, it is just executed locally.
@@ -1458,7 +1460,8 @@
     """
     Implementor of C{performWork} that doesn't actual perform any work.  This
     is used in the case where you want to be able to enqueue work for someone
-    else to do, but not take on any work yourself (such as a command line tool).
+    else to do, but not take on any work yourself (such as a command line
+    tool).
     """
     implements(_IWorkPerformer)
 
@@ -1469,6 +1472,7 @@
         return succeed(None)
 
 
+
 class NonPerformingQueuer(_BaseQueuer):
     """
     When work is enqueued with this queuer, it is never executed locally.
@@ -1487,4 +1491,4 @@
         """
         Choose to perform the work locally.
         """
-        return NonPerformer()
\ No newline at end of file
+        return NonPerformer()

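One of the hunks above renames _listeningPortObject to _listeningPort: the
attribute holds the IListeningPort that endpoint.listen() fires with, and it
is what stopService() later calls stopListening() on. A minimal sketch of
that lifecycle, using standard Twisted endpoint APIs (the echo factory here
is hypothetical, not from this patch):

    from twisted.internet import reactor
    from twisted.internet.defer import inlineCallbacks
    from twisted.internet.endpoints import TCP4ServerEndpoint
    from twisted.internet.protocol import Factory, Protocol


    class Echo(Protocol):
        def dataReceived(self, data):
            self.transport.write(data)


    @inlineCallbacks
    def runServer(portNumber):
        endpoint = TCP4ServerEndpoint(reactor, portNumber)
        # listen() fires with an IListeningPort.  Passing port 0 asks the
        # OS to pick one, so the bound port is read back afterwards, just
        # as startup() does with self.ampPort above.
        listeningPort = yield endpoint.listen(Factory.forProtocol(Echo))
        boundPort = listeningPort.getHost().port
        # ... and on shutdown, mirroring stopService():
        yield listeningPort.stopListening()
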
Modified: CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/test/test_queue.py
===================================================================
--- CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/test/test_queue.py	2013-11-04 20:06:35 UTC (rev 11881)
+++ CalendarServer/branches/users/glyph/whenNotProposed/twext/enterprise/test/test_queue.py	2013-11-04 20:07:30 UTC (rev 11882)
@@ -67,7 +67,7 @@
 
     def callLater(self, _seconds, _f, *args, **kw):
         if _seconds < 0:
-            raise ValueError("%s<0: "%(_seconds,))
+            raise ValueError("%s<0: " % (_seconds,))
         return super(Clock, self).callLater(_seconds, _f, *args, **kw)
 
 
@@ -393,7 +393,8 @@
             # Next, create one that's actually far enough into the past to run.
             yield DummyWorkItem.create(
                 txn, a=3, b=4, notBefore=(
-                    # Schedule it in the past so that it should have already run.
+                    # Schedule it in the past so that it should have already
+                    # run.
                     fakeNow - datetime.timedelta(
                         seconds=qpool.queueProcessTimeout + 20
                     )
@@ -619,6 +620,7 @@
         self.receiver, self.sender = self.sender, self.receiver
         return result
 
+
     def flush(self, turns=10):
         """
         Keep relaying data until there's no more.
@@ -718,7 +720,7 @@
         def op2(txn):
             return Select([schema.DUMMY_WORK_DONE.WORK_ID,
                            schema.DUMMY_WORK_DONE.A_PLUS_B],
-                           From=schema.DUMMY_WORK_DONE).on(txn)
+                          From=schema.DUMMY_WORK_DONE).on(txn)
         rows = yield inTransaction(self.store.newTransaction, op2)
         self.assertEquals(rows, [[4321, 7]])
 
@@ -729,7 +731,7 @@
         When a L{WorkItem} is concurrently deleted by another transaction, it
         should I{not} perform its work.
         """
-        # Provide access to a method called 'concurrently' everything using 
+        # Provide access to a method called 'concurrently' everything using
         original = self.store.newTransaction
         def decorate(*a, **k):
             result = original(*a, **k)
@@ -746,13 +748,13 @@
         # Sanity check on the concurrent deletion.
         def op2(txn):
             return Select([schema.DUMMY_WORK_ITEM.WORK_ID],
-                           From=schema.DUMMY_WORK_ITEM).on(txn)
+                          From=schema.DUMMY_WORK_ITEM).on(txn)
         rows = yield inTransaction(self.store.newTransaction, op2)
         self.assertEquals(rows, [])
         def op3(txn):
             return Select([schema.DUMMY_WORK_DONE.WORK_ID,
                            schema.DUMMY_WORK_DONE.A_PLUS_B],
-                           From=schema.DUMMY_WORK_DONE).on(txn)
+                          From=schema.DUMMY_WORK_DONE).on(txn)
         rows = yield inTransaction(self.store.newTransaction, op3)
         self.assertEquals(rows, [])
 
@@ -763,18 +765,23 @@
     def __init__(self, *ignored):
         pass
 
+
     def _start(self):
         pass
 
+
+
 class BaseQueuerTests(TestCase):
 
     def setUp(self):
         self.proposal = None
         self.patch(twext.enterprise.queue, "WorkProposal", DummyProposal)
 
+
     def _proposalCallback(self, proposal):
         self.proposal = proposal
 
+
     def test_proposalCallbacks(self):
         queuer = _BaseQueuer()
         queuer.callWithNewProposals(self._proposalCallback)
@@ -783,6 +790,7 @@
         self.assertNotEqual(self.proposal, None)
 
 
+
 class NonPerformingQueuerTests(TestCase):
 
     @inlineCallbacks
@@ -791,5 +799,3 @@
         performer = queuer.choosePerformer()
         result = (yield performer.performWork(None, None))
         self.assertEquals(result, None)
-
-
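
For reference, the first test hunk above adjusts operator spacing in a
Clock subclass that rejects negative delays. A standalone sketch of that
guard (twisted.internet.task.Clock is the real class; the subclass mirrors
the one in test_queue.py):

    from twisted.internet.task import Clock as _Clock


    class Clock(_Clock):
        def callLater(self, _seconds, _f, *args, **kw):
            # Fail loudly on negative delays instead of scheduling them
            # immediately, so tests catch bad notBefore arithmetic.
            if _seconds < 0:
                raise ValueError("%s<0: " % (_seconds,))
            return super(Clock, self).callLater(_seconds, _f, *args, **kw)


    clock = Clock()
    clock.callLater(1.5, lambda: None)
    clock.advance(2)  # deterministically fires the scheduled call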