[CalendarServer-changes] [2529] CalendarServer/branches/memcache-reconnect/twistedcaldav
source_changes at macosforge.org
source_changes at macosforge.org
Tue Jun 3 15:25:26 PDT 2008
Revision: 2529
http://trac.macosforge.org/projects/calendarserver/changeset/2529
Author: dreid at apple.com
Date: 2008-06-03 15:25:25 -0700 (Tue, 03 Jun 2008)
Log Message:
-----------
Some actual connection pooling.
Modified Paths:
--------------
CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py
CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py
CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py
CalendarServer/branches/memcache-reconnect/twistedcaldav/test/util.py
Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py 2008-06-03 20:48:15 UTC (rev 2528)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py 2008-06-03 22:25:25 UTC (rev 2529)
@@ -14,10 +14,11 @@
# limitations under the License.
##
-from twisted.internet.defer import Deferred
+from twisted.python.failure import Failure
+from twisted.internet.defer import Deferred, fail
from twisted.internet.protocol import ReconnectingClientFactory
-from twistedcaldav.memcache import MemCacheProtocol
+from twistedcaldav.memcache import MemCacheProtocol, NoSuchCommand
class PooledMemCacheProtocol(MemCacheProtocol):
@@ -26,17 +27,14 @@
to accept requests.
@ivar factory: A L{MemCacheClientFactory} instance.
- @ivar connectionPool: A managing connection pool that we notify of events.
"""
factory = None
- connectionPool = None
def connectionMade(self):
"""
- Notify our connectionPool that we're ready to accept connections.
+ Notify our factory that we're ready to accept connections.
"""
MemCacheProtocol.connectionMade(self)
- self.connectionPool.clientFree(self)
if self.factory.deferred is not None:
self.factory.deferred.callback(self)
@@ -94,8 +92,10 @@
Attach the C{self.connectionPool} to the protocol so it can tell it,
when we've connected.
"""
+ if self._protocolInstance is not None:
+ self.connectionPool.clientGone(self._protocolInstance)
+
self._protocolInstance = self.protocol()
- self._protocolInstance.connectionPool = self.connectionPool
self._protocolInstance.factory = self
return self._protocolInstance
@@ -137,6 +137,7 @@
self._busyClients = set([])
self._freeClients = set([])
+ self._commands = []
def _newClientConnection(self):
@@ -155,6 +156,38 @@
return factory.deferred
+ def _performRequestOnClient(self, client, command, *args, **kwargs):
+ """
+ Perform the given request on the given client.
+
+ @param client: A L{PooledMemCacheProtocol} that will be used to perform
+ the given request.
+
+ @param command: A C{str} representing an attribute of
+ L{MemCacheProtocol}.
+ @parma ar self.assertEquals(self.reactor.calls, [])
+gs: Any positional arguments that should be passed to
+ C{command}.
+ @param kwargs: Any keyword arguments that should be passed to
+ C{command}.
+
+ @return: A L{Deferred} that fires with the result of the given command.
+ """
+ def _freeClientAfterRequest(result):
+ self.clientFree(client)
+ return result
+
+ method = getattr(client, command, None)
+ if method is not None:
+ d = method(*args, **kwargs)
+ else:
+ d = fail(Failure(NoSuchCommand()))
+
+ d.addCallback(_freeClientAfterRequest)
+
+ return d
+
+
def performRequest(self, command, *args, **kwargs):
"""
Select an available client and perform the given request on it.
@@ -168,9 +201,38 @@
@return: A L{Deferred} that fires with the result of the given command.
"""
- d = self._newClientConnection()
+ if len(self._freeClients) > 0:
+ client = self._freeClients.pop()
+ self.clientBusy(client)
+
+ d = self._performRequestOnClient(
+ client, command, *args, **kwargs)
+
+ elif len(self._busyClients) >= self._maxClients:
+ d = Deferred()
+ self._commands.append((d, command, args, kwargs))
+
+ else:
+ d = self._newClientConnection()
+ d.addCallback(self._performRequestOnClient,
+ command, *args, **kwargs)
+
return d
+
+ def clientGone(self, client):
+ """
+ Notify that the given client is to be removed from the pool completely.
+
+ @param client: An instance of L{PooledMemCacheProtocol}.
+ """
+ if client in self._busyClients:
+ self._busyClients.remove(client)
+
+ elif client in self._freeClients:
+ self._freeClients.remove(client)
+
+
def clientBusy(self, client):
"""
Notify that the given client is being used to complete a request.
@@ -193,3 +255,20 @@
self._busyClients.remove(client)
self._freeClients.add(client)
+
+ if len(self._commands) > 0:
+ d, command, args, kwargs = self._commands.pop(0)
+ _ign_d = self.performRequest(
+ command, *args, **kwargs)
+
+ _ign_d.addCallback(d.callback)
+
+
+ def suggestMaxClients(self, maxClients):
+ """
+ Suggest the maximum number of concurrently connected clients.
+
+ @param maxClients: A C{int} indicating how many client connections we
+ should keep open.
+ """
+ self._maxClients = maxClients
Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py 2008-06-03 20:48:15 UTC (rev 2528)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py 2008-06-03 22:25:25 UTC (rev 2529)
@@ -32,7 +32,7 @@
from twistedcaldav.cache import MemcacheResponseCache
from twistedcaldav.cache import MemcacheChangeNotifier
-from twistedcaldav.test.util import InMemoryPropertyStore
+from twistedcaldav.test.util import InMemoryMemcacheProtocol
def _newCacheToken(self):
@@ -75,28 +75,6 @@
-class InMemoryMemcacheProtocol(object):
- def __init__(self):
- self._cache = {}
-
-
- def get(self, key):
- if key not in self._cache:
- return succeed((0, None))
-
- return succeed(self._cache[key])
-
-
- def set(self, key, value, flags=0, expireTime=0):
- try:
- self._cache[key] = (flags, value)
- return succeed(True)
-
- except Exception, err:
- return fail(Failure())
-
-
-
class StubURLResource(object):
def __init__(self, url):
self._url = url
Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py 2008-06-03 20:48:15 UTC (rev 2528)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py 2008-06-03 22:25:25 UTC (rev 2529)
@@ -21,6 +21,7 @@
from twisted.internet.interfaces import IConnector, IReactorTCP
from twisted.internet.address import IPv4Address
+from twistedcaldav.test.util import InMemoryMemcacheProtocol
from twistedcaldav.memcachepool import PooledMemCacheProtocol
from twistedcaldav.memcachepool import MemCacheClientFactory
from twistedcaldav.memcachepool import MemCachePool
@@ -35,8 +36,8 @@
of (status, client) tuples where status is C{'free'} or C{'busy'}
@ivar calls: A C{list} of C{tuple}s of the form C{(status, client)} where
- status is C{'free'} or C{'busy'} and client is the protocol instance
- that made the call.
+ status is C{'free'}, C{'busy'} or C{'gone'} and client is the protocol
+ instance that made the call.
"""
def __init__(self):
self.calls = []
@@ -56,7 +57,14 @@
self.calls.append(('busy', client))
+ def clientGone(self, client):
+ """
+ Record a C{'gone'} call for C{client}
+ """
+ self.calls.append(('gone', client))
+
+
class StubConnector(object):
"""
A stub L{IConnector} that can be used for testing.
@@ -98,21 +106,6 @@
"""
Tests for the L{PooledMemCacheProtocol}
"""
- def test_connectionMadeNotifiesPool(self):
- """
- Test that L{PooledMemCacheProtocol.connectionMade} notifies the
- connectionPool that it is free.
- """
- p = PooledMemCacheProtocol()
- p.factory = MemCacheClientFactory()
- p.connectionPool = StubConnectionPool()
- p.connectionMade()
-
- self.assertEquals(p.connectionPool.calls, [('free', p)])
-
- return p.factory.deferred
-
-
def test_connectionMadeFiresDeferred(self):
"""
Test that L{PooledMemCacheProtocol.connectionMade} fires the factory's
@@ -150,15 +143,6 @@
self.protocol = self.factory.buildProtocol(None)
- def test_buildProtocolGivesProtocolConnectionPool(self):
- """
- Test that L{MemCacheClientFactory.buildProtocol} gives it's
- protocol instance it's connectionPool.
- """
- self.assertEquals(self.factory.connectionPool,
- self.protocol.connectionPool)
-
-
def test_clientConnectionFailedNotifiesPool(self):
"""
Test that L{MemCacheClientFactory.clientConnectionFailed} notifies
@@ -179,6 +163,19 @@
[('busy', self.protocol)])
+ def test_buildProtocolRemovesExistingClient(self):
+ """
+ Test that L{MemCacheClientFactory.buildProtocol} notifies
+ the connectionPool when an old protocol instance is going away.
+
+ This will happen when we get reconnected. We'll remove the old protocol
+ and add a new one.
+ """
+ protocol = self.factory.buildProtocol(None)
+ self.assertEquals(self.factory.connectionPool.calls,
+ [('gone', self.protocol)])
+
+
def tearDown(self):
"""
Make sure the L{MemCacheClientFactory} isn't trying to reconnect
@@ -251,12 +248,48 @@
self.assertEquals(self.pool._freeClients, set([]))
+ def test_clientGoneRemovesFreeClient(self):
+ """
+ Test that a client in the free set gets removed when
+ L{MemCachePool.clientGone} is called.
+ """
+ p = MemCacheClientFactory().buildProtocol(None)
+ self.pool.clientFree(p)
+ self.assertEquals(self.pool._freeClients, set([p]))
+ self.assertEquals(self.pool._busyClients, set([]))
+
+ self.pool.clientGone(p)
+ self.assertEquals(self.pool._freeClients, set([]))
+
+
+ def test_clientGoneRemovesBusyClient(self):
+ """
+ Test that a client in the busy set gets removed when
+ L{MemCachePool.clientGone} is called.
+ """
+ p = MemCacheClientFactory().buildProtocol(None)
+ self.pool.clientBusy(p)
+ self.assertEquals(self.pool._busyClients, set([p]))
+ self.assertEquals(self.pool._freeClients, set([]))
+
+ self.pool.clientGone(p)
+ self.assertEquals(self.pool._busyClients, set([]))
+
+
def test_performRequestCreatesConnection(self):
"""
Test that L{MemCachePool.performRequest} on a fresh instance causes
a new connection to be created.
"""
+ def _checkResult(result):
+ self.assertEquals(result, (0, 'bar'))
+
+
+ p = InMemoryMemcacheProtocol()
+ p.set('foo', 'bar')
+
d = self.pool.performRequest('get', 'foo')
+ d.addCallback(_checkResult)
args, kwargs = self.reactor.calls.pop()
@@ -264,8 +297,86 @@
self.failUnless(isinstance(args[2], MemCacheClientFactory))
self.assertEquals(kwargs, {})
- factory = args[2]
- protocol = factory.buildProtocol(None)
- protocol.connectionMade()
+ args[2].deferred.callback(p)
return d
+
+
+ def test_performRequestUsesFreeConnection(self):
+ """
+ Test that L{MemCachePool.performRequest} doesn't create a new connection
+ to be created if there is a free connection.
+ """
+ def _checkResult(result):
+ self.assertEquals(result, (0, 'bar'))
+ self.assertEquals(self.reactor.calls, [])
+
+ p = InMemoryMemcacheProtocol()
+ p.set('foo', 'bar')
+
+ self.pool.clientFree(p)
+
+ d = self.pool.performRequest('get', 'foo')
+ d.addCallback(_checkResult)
+
+ return d
+
+
+ def test_performRequestMaxBusyQueuesRequest(self):
+ """
+ Test that L{MemCachePool.performRequest} queues the request if
+ all clients are busy.
+ """
+ def _checkResult(result):
+ self.assertEquals(result, (0, 'bar'))
+ self.assertEquals(self.reactor.calls, [])
+
+ p = InMemoryMemcacheProtocol()
+ p.set('foo', 'bar')
+
+ p1 = InMemoryMemcacheProtocol()
+ p1.set('foo', 'baz')
+
+ self.pool.suggestMaxClients(2)
+
+ self.pool.clientBusy(p)
+ self.pool.clientBusy(p1)
+
+ d = self.pool.performRequest('get', 'foo')
+ d.addCallback(_checkResult)
+
+ self.pool.clientFree(p)
+
+ return d
+
+
+ def test_performRequestCreatesConnectionsUntilMaxBusy(self):
+ """
+ Test that L{MemCachePool.performRequest} will create new connections
+ until it reaches the maximum number of busy clients.
+ """
+ def _checkResult(result):
+ self.assertEquals(result, (0, 'baz'))
+
+ self.pool.suggestMaxClients(2)
+
+ p = InMemoryMemcacheProtocol()
+ p.set('foo', 'bar')
+
+ p1 = InMemoryMemcacheProtocol()
+ p1.set('foo', 'baz')
+
+
+ self.pool.clientBusy(p)
+
+ d = self.pool.performRequest('get', 'foo')
+
+ args, kwargs = self.reactor.calls.pop()
+
+ self.assertEquals(args[:2], (MC_ADDRESS.host, MC_ADDRESS.port))
+ self.failUnless(isinstance(args[2], MemCacheClientFactory))
+ self.assertEquals(kwargs, {})
+
+ args[2].deferred.callback(p1)
+
+ return d
Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/test/util.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/test/util.py 2008-06-03 20:48:15 UTC (rev 2528)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/test/util.py 2008-06-03 22:25:25 UTC (rev 2529)
@@ -47,6 +47,29 @@
self._properties[property.qname()] = property
+
+class InMemoryMemcacheProtocol(object):
+ def __init__(self):
+ self._cache = {}
+
+
+ def get(self, key):
+ if key not in self._cache:
+ return succeed((0, None))
+
+ return succeed(self._cache[key])
+
+
+ def set(self, key, value, flags=0, expireTime=0):
+ try:
+ self._cache[key] = (flags, value)
+ return succeed(True)
+
+ except Exception, err:
+ return fail(Failure())
+
+
+
class StubCacheChangeNotifier(object):
def __init__(self, *args, **kwargs):
pass
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080603/2e95b5ad/attachment-0001.htm
More information about the calendarserver-changes
mailing list