<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[14327] twext/trunk/twext/enterprise</title>
</head>
<body>

<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt;  }
#msg dl a { font-weight: bold}
#msg dl a:link    { color:#fc3; }
#msg dl a:active  { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff  {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
#patch .lines, .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org//changeset/14327">14327</a></dd>
<dt>Author</dt> <dd>sagen@apple.com</dd>
<dt>Date</dt> <dd>2015-01-19 17:42:50 -0800 (Mon, 19 Jan 2015)</dd>
</dl>

<h3>Log Message</h3>
<pre>Make PeerConnectionPool configurable to not take on work itself (e.g. so the master process will give all work to the workers, and not take on work prior to spawning the workers</pre>

<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextenterprisejobqueuepy">twext/trunk/twext/enterprise/jobqueue.py</a></li>
<li><a href="#twexttrunktwextenterprisetesttest_jobqueuepy">twext/trunk/twext/enterprise/test/test_jobqueue.py</a></li>
</ul>

</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextenterprisejobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/jobqueue.py (14326 => 14327)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/jobqueue.py        2015-01-19 18:50:50 UTC (rev 14326)
+++ twext/trunk/twext/enterprise/jobqueue.py        2015-01-20 01:42:50 UTC (rev 14327)
</span><span class="lines">@@ -1428,7 +1428,7 @@
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         current = sum(worker.currentLoad for worker in self.workers)
</span><span class="cx">         total = len(self.workers) * self.maximumLoadPerWorker
</span><del>-        return ((current * 100) / total) if total else 0
</del><ins>+        return ((current * 100) / total) if total else 100
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     def eachWorkerLoad(self):
</span><span class="lines">@@ -1863,7 +1863,7 @@
</span><span class="cx">     highPriorityLevel = 80      # Percentage load level above which only high priority jobs are processed
</span><span class="cx">     mediumPriorityLevel = 50    # Percentage load level above which high and medium priority jobs are processed
</span><span class="cx"> 
</span><del>-    def __init__(self, reactor, transactionFactory, ampPort):
</del><ins>+    def __init__(self, reactor, transactionFactory, ampPort, useWorkerPool=True):
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Initialize a L{PeerConnectionPool}.
</span><span class="cx"> 
</span><span class="lines">@@ -1876,6 +1876,9 @@
</span><span class="cx"> 
</span><span class="cx">         @param transactionFactory: a 0- or 1-argument callable that produces an
</span><span class="cx">             L{IAsyncTransaction}
</span><ins>+
+        @param useWorkerPool:  Whether to use a worker pool to manage load
+            or instead take on all work ourselves (e.g. in single process mode)
</ins><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         super(PeerConnectionPool, self).__init__()
</span><span class="cx">         self.reactor = reactor
</span><span class="lines">@@ -1884,7 +1887,7 @@
</span><span class="cx">         self.pid = self.getpid()
</span><span class="cx">         self.ampPort = ampPort
</span><span class="cx">         self.thisProcess = None
</span><del>-        self.workerPool = WorkerConnectionPool()
</del><ins>+        self.workerPool = WorkerConnectionPool() if useWorkerPool else None
</ins><span class="cx">         self.peers = []
</span><span class="cx">         self.mappedPeers = {}
</span><span class="cx">         self._startingUp = None
</span><span class="lines">@@ -1904,7 +1907,7 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     def totalLoad(self):
</span><del>-        return self.workerPool.allWorkerLoad()
</del><ins>+        return self.workerPool.allWorkerLoad() if self.workerPool else 0
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     def workerListenerFactory(self):
</span><span class="lines">@@ -1935,15 +1938,19 @@
</span><span class="cx">         @rtype: L{_IJobPerformer} L{ConnectionFromPeerNode} or
</span><span class="cx">             L{WorkerConnectionPool}
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        if self.workerPool.hasAvailableCapacity():
-            return self.workerPool
</del><ins>+        if self.workerPool:
</ins><span class="cx"> 
</span><del>-        if self.peers and not onlyLocally:
-            return sorted(self.peers, key=lambda p: p.currentLoadEstimate())[0]
-        else:
-            return LocalPerformer(self.transactionFactory)
</del><ins>+            if self.workerPool.hasAvailableCapacity():
+                return self.workerPool
</ins><span class="cx"> 
</span><ins>+            if self.peers and not onlyLocally:
+                return sorted(self.peers, key=lambda p: p.currentLoadEstimate())[0]
+            else:
+                raise JobFailedError(&quot;No capacity for work&quot;)
</ins><span class="cx"> 
</span><ins>+        return LocalPerformer(self.transactionFactory)
+
+
</ins><span class="cx">     def performJobForPeer(self, job):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         A peer has requested us to perform a job; choose a job performer
</span><span class="lines">@@ -2007,7 +2014,8 @@
</span><span class="cx">             # FIXME: need to include capacity of other nodes. For now we only check
</span><span class="cx">             # our own capacity and stop processing if too busy. Other nodes that
</span><span class="cx">             # are not busy will pick up work.
</span><del>-            level = self.workerPool.loadLevel()
</del><ins>+            # If no workerPool, set level to 0, taking on all work.
+            level = 0 if self.workerPool is None else self.workerPool.loadLevel()
</ins><span class="cx"> 
</span><span class="cx">             # Check overload level first
</span><span class="cx">             if level &gt; self.overloadLevel:
</span><span class="lines">@@ -2081,8 +2089,8 @@
</span><span class="cx">                     yield txn.commit()
</span><span class="cx"> 
</span><span class="cx">             if nextJob is not None:
</span><del>-                peer = self.choosePerformer(onlyLocally=True)
</del><span class="cx">                 try:
</span><ins>+                    peer = self.choosePerformer(onlyLocally=True)
</ins><span class="cx">                     # Send the job over but DO NOT block on the response - that will ensure
</span><span class="cx">                     # we can do stuff in parallel
</span><span class="cx">                     peer.performJob(nextJob.descriptor())
</span></span></pre></div>
<a id="twexttrunktwextenterprisetesttest_jobqueuepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/enterprise/test/test_jobqueue.py (14326 => 14327)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/enterprise/test/test_jobqueue.py        2015-01-19 18:50:50 UTC (rev 14326)
+++ twext/trunk/twext/enterprise/test/test_jobqueue.py        2015-01-20 01:42:50 UTC (rev 14327)
</span><span class="lines">@@ -43,7 +43,7 @@
</span><span class="cx">     ConnectionFromPeerNode,
</span><span class="cx">     _BaseQueuer, NonPerformingQueuer, JobItem,
</span><span class="cx">     WORK_PRIORITY_LOW, WORK_PRIORITY_HIGH, WORK_PRIORITY_MEDIUM,
</span><del>-    JobDescriptor, SingletonWorkItem
</del><ins>+    JobDescriptor, SingletonWorkItem, JobFailedError
</ins><span class="cx"> )
</span><span class="cx"> import twext.enterprise.jobqueue
</span><span class="cx"> 
</span><span class="lines">@@ -408,7 +408,7 @@
</span><span class="cx">         sinceEpoch = astimestamp(fakeNow)
</span><span class="cx">         clock = Clock()
</span><span class="cx">         clock.advance(sinceEpoch)
</span><del>-        qpool = PeerConnectionPool(clock, dbpool.connection, 0)
</del><ins>+        qpool = PeerConnectionPool(clock, dbpool.connection, 0, useWorkerPool=False)
</ins><span class="cx">         realChoosePerformer = qpool.choosePerformer
</span><span class="cx">         performerChosen = []
</span><span class="cx"> 
</span><span class="lines">@@ -668,7 +668,7 @@
</span><span class="cx">         then = datetime.datetime(2012, 12, 12, 12, 12, 12)
</span><span class="cx">         reactor.advance(astimestamp(then))
</span><span class="cx">         cph.setUp(self)
</span><del>-        qpool = PeerConnectionPool(reactor, cph.pool.connection, 4321)
</del><ins>+        qpool = PeerConnectionPool(reactor, cph.pool.connection, 4321, useWorkerPool=False)
</ins><span class="cx"> 
</span><span class="cx">         realChoosePerformer = qpool.choosePerformer
</span><span class="cx">         performerChosen = []
</span><span class="lines">@@ -694,9 +694,21 @@
</span><span class="cx">         or outgoing), then it chooses an implementation of C{performJob} that
</span><span class="cx">         simply executes the work locally.
</span><span class="cx">         &quot;&quot;&quot;
</span><ins>+
+        # If we're using worker pool, this should raise
+        try:
+            self.pcp.choosePerformer()
+        except JobFailedError:
+            pass
+        else:
+            self.fail(&quot;Didn't raise JobFailedError&quot;)
+
+        # If we're not using worker pool, we should get back LocalPerformer
+        self.pcp = PeerConnectionPool(None, None, 4321, useWorkerPool=False)
</ins><span class="cx">         self.checkPerformer(LocalPerformer)
</span><span class="cx"> 
</span><span class="cx"> 
</span><ins>+
</ins><span class="cx">     def test_choosingPerformerWithLocalCapacity(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         If L{PeerConnectionPool.choosePerformer} is invoked when some workers
</span><span class="lines">@@ -704,6 +716,10 @@
</span><span class="cx">         performer.
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         # Give it some local capacity.
</span><ins>+
+        # In this case we want pcp to have a workerPool, so create a new pcp
+        # for this test
+        self.pcp = PeerConnectionPool(None, None, 4321)
</ins><span class="cx">         wlf = self.pcp.workerListenerFactory()
</span><span class="cx">         proto = wlf.buildProtocol(None)
</span><span class="cx">         proto.makeConnection(StringTransport())
</span><span class="lines">@@ -945,7 +961,7 @@
</span><span class="cx">         then = datetime.datetime(2012, 12, 12, 12, 12, 0)
</span><span class="cx">         reactor.advance(astimestamp(then))
</span><span class="cx">         cph.setUp(self)
</span><del>-        pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321)
</del><ins>+        pcp = PeerConnectionPool(reactor, cph.pool.connection, 4321, useWorkerPool=False)
</ins><span class="cx">         now = then + datetime.timedelta(seconds=20)
</span><span class="cx"> 
</span><span class="cx">         @transactionally(cph.pool.connection)
</span><span class="lines">@@ -1163,9 +1179,9 @@
</span><span class="cx">         self.addCleanup(deschema)
</span><span class="cx"> 
</span><span class="cx">         self.node1 = PeerConnectionPool(
</span><del>-            reactor, indirectedTransactionFactory, 0)
</del><ins>+            reactor, indirectedTransactionFactory, 0, useWorkerPool=False)
</ins><span class="cx">         self.node2 = PeerConnectionPool(
</span><del>-            reactor, indirectedTransactionFactory, 0)
</del><ins>+            reactor, indirectedTransactionFactory, 0, useWorkerPool=False)
</ins><span class="cx"> 
</span><span class="cx">         class FireMeService(Service, object):
</span><span class="cx">             def __init__(self, d):
</span></span></pre>
</div>
</div>

</body>
</html>