[CalendarServer-changes] [7330] CalendarServer/trunk/twext/enterprise
source_changes at macosforge.org
source_changes at macosforge.org
Tue Apr 19 22:11:38 PDT 2011
Revision: 7330
http://trac.macosforge.org/projects/calendarserver/changeset/7330
Author: glyph at apple.com
Date: 2011-04-19 22:11:38 -0700 (Tue, 19 Apr 2011)
Log Message:
-----------
fix & test for threadpool starvation bug
Modified Paths:
--------------
CalendarServer/trunk/twext/enterprise/adbapi2.py
CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
Modified: CalendarServer/trunk/twext/enterprise/adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/adbapi2.py 2011-04-20 04:47:45 UTC (rev 7329)
+++ CalendarServer/trunk/twext/enterprise/adbapi2.py 2011-04-20 05:11:38 UTC (rev 7330)
@@ -703,8 +703,13 @@
def startService(self):
"""
- No startup necessary.
+ Increase the thread pool size of the reactor by the number of threads
+ that this service may consume. This is important because unlike most
+ L{IReactorThreads} users, the connection work units are very long-lived
+ and block until this service has been stopped.
"""
+ tp = self.reactor.getThreadPool()
+ self.reactor.suggestThreadPoolSize(tp.max + self.maxConnections)
@inlineCallbacks
@@ -742,7 +747,10 @@
# independently submitted from .abort() / .close().
yield self._free.pop()._releaseConnection()
+ tp = self.reactor.getThreadPool()
+ self.reactor.suggestThreadPoolSize(tp.max - self.maxConnections)
+
def _createHolder(self):
"""
Create a L{ThreadHolder}. (Test hook.)
Modified: CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py
===================================================================
--- CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2011-04-20 04:47:45 UTC (rev 7329)
+++ CalendarServer/trunk/twext/enterprise/test/test_adbapi2.py 2011-04-20 05:11:38 UTC (rev 7330)
@@ -20,6 +20,9 @@
from itertools import count
+from zope.interface.verify import verifyClass
+
+from twisted.python.threadpool import ThreadPool
from twisted.trial.unittest import TestCase
from twisted.internet.defer import execute
@@ -28,6 +31,8 @@
from twisted.internet.defer import Deferred
from twext.enterprise.ienterprise import ConnectionError
from twext.enterprise.ienterprise import AlreadyFinishedError
+from twisted.internet.interfaces import IReactorThreads
+from zope.interface.declarations import implements
from twext.enterprise.adbapi2 import ConnectionPool
@@ -324,6 +329,81 @@
+class ClockWithThreads(Clock):
+ """
+ A testing reactor that supplies L{IReactorTime} and L{IReactorThreads}.
+ """
+ implements(IReactorThreads)
+
+ def __init__(self):
+ super(ClockWithThreads, self).__init__()
+ self._pool = ThreadPool()
+
+
+ def getThreadPool(self):
+ """
+ Get the threadpool.
+ """
+ return self._pool
+
+
+ def suggestThreadPoolSize(self, size):
+ """
+ Approximate the behavior of a 'real' reactor.
+ """
+ self._pool.adjustPoolsize(maxthreads=size)
+
+
+ def callInThread(self, thunk, *a, **kw):
+ """
+ No implementation.
+ """
+
+
+ def callFromThread(self, thunk, *a, **kw):
+ """
+ No implementation.
+ """
+
+
+verifyClass(IReactorThreads, ClockWithThreads)
+
+
+
+class ConnectionPoolBootTestss(TestCase):
+ """
+ Tests for the start-up phase of L{ConnectionPool}.
+ """
+
+ def test_threadCount(self):
+ """
+ The reactor associated with a L{ConnectionPool} will have its maximum
+ thread count adjusted when L{ConnectionPool.startService} is called, to
+ accommodate for L{ConnectionPool.maxConnections} additional threads.
+
+ Stopping the service should restore it to its original value, so that a
+ repeatedly re-started L{ConnectionPool} will not cause the thread
+ ceiling to grow without bound.
+ """
+ defaultMax = 27
+ connsMax = 45
+ combinedMax = defaultMax + connsMax
+ pool = ConnectionPool(None, maxConnections=connsMax)
+ pool.reactor = ClockWithThreads()
+ threadpool = pool.reactor.getThreadPool()
+ pool.reactor.suggestThreadPoolSize(defaultMax)
+ self.assertEquals(threadpool.max, defaultMax)
+ pool.startService()
+ self.assertEquals(threadpool.max, combinedMax)
+ justChecking = []
+ pool.stopService().addCallback(justChecking.append)
+ # No SQL run, so no threads started, so this deferred should fire
+ # immediately. If not, we're in big trouble, so sanity check.
+ self.assertEquals(justChecking, [None])
+ self.assertEquals(threadpool.max, defaultMax)
+
+
+
class ConnectionPoolTests(TestCase):
"""
Tests for L{ConnectionPool}.
@@ -340,7 +420,7 @@
self.pool = ConnectionPool(self.factory.connect,
maxConnections=2)
self.pool._createHolder = self.makeAHolder
- self.clock = self.pool.reactor = Clock()
+ self.clock = self.pool.reactor = ClockWithThreads()
self.pool.startService()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110419/541f9699/attachment.html>
More information about the calendarserver-changes
mailing list