[CalendarServer-changes] [14327] twext/trunk/twext/enterprise

source_changes at macosforge.org
Mon Jan 19 17:42:50 PST 2015


Revision: 14327
          http://trac.calendarserver.org//changeset/14327
Author:   sagen at apple.com
Date:     2015-01-19 17:42:50 -0800 (Mon, 19 Jan 2015)
Log Message:
-----------
Make PeerConnectionPool configurable so that it does not take on work itself (e.g. so the master process gives all work to the workers rather than taking on work before spawning them).

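For illustration (this snippet is not part of the commit), a caller that
wants the pool to run jobs in-process would construct it roughly as
follows; the reactor, transaction factory, and AMP port here are
placeholders:

    from twext.enterprise.jobqueue import PeerConnectionPool

    # Default: useWorkerPool=True keeps a WorkerConnectionPool and
    # dispatches jobs to worker processes, as before.
    pool = PeerConnectionPool(reactor, transactionFactory, ampPort)

    # New in this revision: with useWorkerPool=False the pool has no
    # workerPool, so choosePerformer() returns a LocalPerformer and the
    # process performs the work itself (single-process mode).
    localPool = PeerConnectionPool(
        reactor, transactionFactory, ampPort, useWorkerPool=False
    )
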
Modified Paths:
--------------
    twext/trunk/twext/enterprise/jobqueue.py
    twext/trunk/twext/enterprise/test/test_jobqueue.py

Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py	2015-01-19 18:50:50 UTC (rev 14326)
+++ twext/trunk/twext/enterprise/jobqueue.py	2015-01-20 01:42:50 UTC (rev 14327)
@@ -1428,7 +1428,7 @@
         """
         current = sum(worker.currentLoad for worker in self.workers)
         total = len(self.workers) * self.maximumLoadPerWorker
-        return ((current * 100) / total) if total else 0
+        return ((current * 100) / total) if total else 100
 
 
     def eachWorkerLoad(self):
@@ -1863,7 +1863,7 @@
     highPriorityLevel = 80      # Percentage load level above which only high priority jobs are processed
     mediumPriorityLevel = 50    # Percentage load level above which high and medium priority jobs are processed
 
-    def __init__(self, reactor, transactionFactory, ampPort):
+    def __init__(self, reactor, transactionFactory, ampPort, useWorkerPool=True):
         """
         Initialize a L{PeerConnectionPool}.
 
@@ -1876,6 +1876,9 @@
 
         @param transactionFactory: a 0- or 1-argument callable that produces an
             L{IAsyncTransaction}
+
+        @param useWorkerPool:  Whether to use a worker pool to manage load
+            or instead take on all work ourselves (e.g. in single process mode)
         """
         super(PeerConnectionPool, self).__init__()
         self.reactor = reactor
@@ -1884,7 +1887,7 @@
         self.pid = self.getpid()
         self.ampPort = ampPort
         self.thisProcess = None
-        self.workerPool = WorkerConnectionPool()
+        self.workerPool = WorkerConnectionPool() if useWorkerPool else None
         self.peers = []
         self.mappedPeers = {}
         self._startingUp = None
@@ -1904,7 +1907,7 @@
 
 
     def totalLoad(self):
-        return self.workerPool.allWorkerLoad()
+        return self.workerPool.allWorkerLoad() if self.workerPool else 0
 
 
     def workerListenerFactory(self):
@@ -1935,15 +1938,19 @@
         @rtype: L{_IJobPerformer} L{ConnectionFromPeerNode} or
             L{WorkerConnectionPool}
         """
-        if self.workerPool.hasAvailableCapacity():
-            return self.workerPool
+        if self.workerPool:
 
-        if self.peers and not onlyLocally:
-            return sorted(self.peers, key=lambda p: p.currentLoadEstimate())[0]
-        else:
-            return LocalPerformer(self.transactionFactory)
+            if self.workerPool.hasAvailableCapacity():
+                return self.workerPool
 
+            if self.peers and not onlyLocally:
+                return sorted(self.peers, key=lambda p: p.currentLoadEstimate())[0]
+            else:
+                raise JobFailedError("No capacity for work")
 
+        return LocalPerformer(self.transactionFactory)
+
+
     def performJobForPeer(self, job):
         """
         A peer has requested us to perform a job; choose a job performer
@@ -2007,7 +2014,8 @@
             # FIXME: need to include capacity of other nodes. For now we only check
             # our own capacity and stop processing if too busy. Other nodes that
             # are not busy will pick up work.
-            level = self.workerPool.loadLevel()
+            # If no workerPool, set level to 0, taking on all work.
+            level = 0 if self.workerPool is None else self.workerPool.loadLevel()
 
             # Check overload level first
             if level > self.overloadLevel:
@@ -2081,8 +2089,8 @@
                     yield txn.commit()
 
             if nextJob is not None:
-                peer = self.choosePerformer(onlyLocally=True)
                 try:
+                    peer = self.choosePerformer(onlyLocally=True)
                     # Send the job over but DO NOT block on the response - that will ensure
                     # we can do stuff in parallel
                     peer.performJob(nextJob.descriptor())

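A note for callers outside this diff (sketch only; "pool" and "job" are
placeholder names): when a worker pool is in use but has no spare
capacity and no peer can take the work, choosePerformer() now raises
JobFailedError instead of falling back to a LocalPerformer, so call
sites may want a guard along these lines:

    from twext.enterprise.jobqueue import JobFailedError

    try:
        performer = pool.choosePerformer(onlyLocally=True)
        # Hand the job over without blocking on the response.
        performer.performJob(job.descriptor())
    except JobFailedError:
        # No local or peer capacity right now; leave the job queued so a
        # later work check (or another node) can pick it up.
        pass
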
Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py	2015-01-19 18:50:50 UTC (rev 14326)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py	2015-01-20 01:42:50 UTC (rev 14327)
@@ -43,7 +43,7 @@
     ConnectionFromPeerNode,
     _BaseQueuer, NonPerformingQueuer, JobItem,
     WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM,
-    JobDescriptor, SingletonWorkItem
+    JobDescriptor, SingletonWorkItem, JobFailedError
 )
 import twext.enterprise.jobqueue
 
@@ -408,7 +408,7 @@
         sinceEpoch = astimestamp(fakeNow)
         clock = Clock()
         clock.advance(sinceEpoch)
-        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
+        qpool = PeerConnectionPool(clock, dbpool.connection, 0, useWorkerPool=False)
         realChoosePerformer = qpool.choosePerformer
         performerChosen = []
 
@@ -668,7 +668,7 @@
         then = datetime.datetime(2012, 12, 12, 12, 12, 12)
         reactor.advance(astimestamp(then))
         cph.setUp(self)
-        qpool = PeerConnectionPool(reactor, cph.pool.connection, 4321)
+        qpool = PeerConnectionPool(reactor, cph.pool.connection, 4321, useWorkerPool=False)
 
         realChoosePerformer = qpool.choosePerformer
         performerChosen = []
@@ -694,9 +694,21 @@
         or outgoing), then it chooses an implementation of C{performJob} that
         simply executes the work locally.
         """
+
+        # If we're using worker pool, this should raise
+        try:
+            self.pcp.choosePerformer()
+        except JobFailedError:
+            pass
+        else:
+            self.fail("Didn't raise JobFailedError")
+
+        # If we're not using worker pool, we should get back LocalPerformer
+        self.pcp = PeerConnectionPool(None, None, 4321, useWorkerPool=False)
         self.checkPerformer(LocalPerformer)
 
 
+
     def test_choosingPerformerWithLocalCapacity(self):
         """
         If L{PeerConnectionPool.choosePerformer} is invoked when some workers
@@ -704,6 +716,10 @@
         performer.
         """
         # Give it some local capacity.
+
+        # In this case we want pcp to have a workerPool, so create a new pcp
+        # for this test
+        self.pcp = PeerConnectionPool(None, None, 4321)
         wlf = self.pcp.workerListenerFactory()
         proto = wlf.buildProtocol(None)
         proto.makeConnection(StringTransport())
@@ -945,7 +961,7 @@
         then = datetime.datetime(2012, 12, 12, 12, 12, 0)
         reactor.advance(astimestamp(then))
         cph.setUp(self)
-        pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321)
+        pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321, useWorkerPool=False)
         now = then + datetime.timedelta(seconds=20)
 
         @transactionally(cph.pool.connection)
@@ -1163,9 +1179,9 @@
         self.addCleanup(deschema)
 
         self.node1 = PeerConnectionPool(
-            reactor, indirectedTransactionFactory, 0)
+            reactor, indirectedTransactionFactory, 0, useWorkerPool=False)
         self.node2 = PeerConnectionPool(
-            reactor, indirectedTransactionFactory, 0)
+            reactor, indirectedTransactionFactory, 0, useWorkerPool=False)
 
         class FireMeService(Service, object):
             def __init__(self, d):