[CalendarServer-changes] [14327] twext/trunk/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Mon Jan 19 17:42:50 PST 2015
Revision: 14327
http://trac.calendarserver.org//changeset/14327
Author: sagen at apple.com
Date: 2015-01-19 17:42:50 -0800 (Mon, 19 Jan 2015)
Log Message:
-----------
Make PeerConnectionPool configurable to not take on work itself (e.g. so the master process will give all work to the workers, and not take on work prior to spawning the workers).
Modified Paths:
--------------
twext/trunk/twext/enterprise/jobqueue.py
twext/trunk/twext/enterprise/test/test_jobqueue.py
Modified: twext/trunk/twext/enterprise/jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/jobqueue.py 2015-01-19 18:50:50 UTC (rev 14326)
+++ twext/trunk/twext/enterprise/jobqueue.py 2015-01-20 01:42:50 UTC (rev 14327)
@@ -1428,7 +1428,7 @@
"""
current = sum(worker.currentLoad for worker in self.workers)
total = len(self.workers) * self.maximumLoadPerWorker
- return ((current * 100) / total) if total else 0
+ return ((current * 100) / total) if total else 100
def eachWorkerLoad(self):
@@ -1863,7 +1863,7 @@
highPriorityLevel = 80 # Percentage load level above which only high priority jobs are processed
mediumPriorityLevel = 50 # Percentage load level above which high and medium priority jobs are processed
- def __init__(self, reactor, transactionFactory, ampPort):
+ def __init__(self, reactor, transactionFactory, ampPort, useWorkerPool=True):
"""
Initialize a L{PeerConnectionPool}.
@@ -1876,6 +1876,9 @@
@param transactionFactory: a 0- or 1-argument callable that produces an
L{IAsyncTransaction}
+
+ @param useWorkerPool: Whether to use a worker pool to manage load
+ or instead take on all work ourselves (e.g. in single process mode)
"""
super(PeerConnectionPool, self).__init__()
self.reactor = reactor
@@ -1884,7 +1887,7 @@
self.pid = self.getpid()
self.ampPort = ampPort
self.thisProcess = None
- self.workerPool = WorkerConnectionPool()
+ self.workerPool = WorkerConnectionPool() if useWorkerPool else None
self.peers = []
self.mappedPeers = {}
self._startingUp = None
@@ -1904,7 +1907,7 @@
def totalLoad(self):
- return self.workerPool.allWorkerLoad()
+ return self.workerPool.allWorkerLoad() if self.workerPool else 0
def workerListenerFactory(self):
@@ -1935,15 +1938,19 @@
@rtype: L{_IJobPerformer} L{ConnectionFromPeerNode} or
L{WorkerConnectionPool}
"""
- if self.workerPool.hasAvailableCapacity():
- return self.workerPool
+ if self.workerPool:
- if self.peers and not onlyLocally:
- return sorted(self.peers, key=lambda p: p.currentLoadEstimate())[0]
- else:
- return LocalPerformer(self.transactionFactory)
+ if self.workerPool.hasAvailableCapacity():
+ return self.workerPool
+ if self.peers and not onlyLocally:
+ return sorted(self.peers, key=lambda p: p.currentLoadEstimate())[0]
+ else:
+ raise JobFailedError("No capacity for work")
+ return LocalPerformer(self.transactionFactory)
+
+
def performJobForPeer(self, job):
"""
A peer has requested us to perform a job; choose a job performer
@@ -2007,7 +2014,8 @@
# FIXME: need to include capacity of other nodes. For now we only check
# our own capacity and stop processing if too busy. Other nodes that
# are not busy will pick up work.
- level = self.workerPool.loadLevel()
+ # If no workerPool, set level to 0, taking on all work.
+ level = 0 if self.workerPool is None else self.workerPool.loadLevel()
# Check overload level first
if level > self.overloadLevel:
@@ -2081,8 +2089,8 @@
yield txn.commit()
if nextJob is not None:
- peer = self.choosePerformer(onlyLocally=True)
try:
+ peer = self.choosePerformer(onlyLocally=True)
# Send the job over but DO NOT block on the response - that will ensure
# we can do stuff in parallel
peer.performJob(nextJob.descriptor())
Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py
===================================================================
--- twext/trunk/twext/enterprise/test/test_jobqueue.py 2015-01-19 18:50:50 UTC (rev 14326)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py 2015-01-20 01:42:50 UTC (rev 14327)
@@ -43,7 +43,7 @@
ConnectionFromPeerNode,
_BaseQueuer, NonPerformingQueuer, JobItem,
WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM,
- JobDescriptor, SingletonWorkItem
+ JobDescriptor, SingletonWorkItem, JobFailedError
)
import twext.enterprise.jobqueue
@@ -408,7 +408,7 @@
sinceEpoch = astimestamp(fakeNow)
clock = Clock()
clock.advance(sinceEpoch)
- qpool = PeerConnectionPool(clock, dbpool.connection, 0)
+ qpool = PeerConnectionPool(clock, dbpool.connection, 0, useWorkerPool=False)
realChoosePerformer = qpool.choosePerformer
performerChosen = []
@@ -668,7 +668,7 @@
then = datetime.datetime(2012, 12, 12, 12, 12, 12)
reactor.advance(astimestamp(then))
cph.setUp(self)
- qpool = PeerConnectionPool(reactor, cph.pool.connection, 4321)
+ qpool = PeerConnectionPool(reactor, cph.pool.connection, 4321, useWorkerPool=False)
realChoosePerformer = qpool.choosePerformer
performerChosen = []
@@ -694,9 +694,21 @@
or outgoing), then it chooses an implementation of C{performJob} that
simply executes the work locally.
"""
+
+ # If we're using worker pool, this should raise
+ try:
+ self.pcp.choosePerformer()
+ except JobFailedError:
+ pass
+ else:
+ self.fail("Didn't raise JobFailedError")
+
+ # If we're not using worker pool, we should get back LocalPerformer
+ self.pcp = PeerConnectionPool(None, None, 4321, useWorkerPool=False)
self.checkPerformer(LocalPerformer)
+
def test_choosingPerformerWithLocalCapacity(self):
"""
If L{PeerConnectionPool.choosePerformer} is invoked when some workers
@@ -704,6 +716,10 @@
performer.
"""
# Give it some local capacity.
+
+ # In this case we want pcp to have a workerPool, so create a new pcp
+ # for this test
+ self.pcp = PeerConnectionPool(None, None, 4321)
wlf = self.pcp.workerListenerFactory()
proto = wlf.buildProtocol(None)
proto.makeConnection(StringTransport())
@@ -945,7 +961,7 @@
then = datetime.datetime(2012, 12, 12, 12, 12, 0)
reactor.advance(astimestamp(then))
cph.setUp(self)
- pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321)
+ pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321, useWorkerPool=False)
now = then + datetime.timedelta(seconds=20)
@transactionally(cph.pool.connection)
@@ -1163,9 +1179,9 @@
self.addCleanup(deschema)
self.node1 = PeerConnectionPool(
- reactor, indirectedTransactionFactory, 0)
+ reactor, indirectedTransactionFactory, 0, useWorkerPool=False)
self.node2 = PeerConnectionPool(
- reactor, indirectedTransactionFactory, 0)
+ reactor, indirectedTransactionFactory, 0, useWorkerPool=False)
class FireMeService(Service, object):
def __init__(self, d):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150119/549f1cac/attachment.html>
More information about the calendarserver-changes
mailing list