[CalendarServer-changes] [2529] CalendarServer/branches/memcache-reconnect/twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Tue Jun 3 15:25:26 PDT 2008


Revision: 2529
          http://trac.macosforge.org/projects/calendarserver/changeset/2529
Author:   dreid at apple.com
Date:     2008-06-03 15:25:25 -0700 (Tue, 03 Jun 2008)

Log Message:
-----------
Some actual connection pooling.

Modified Paths:
--------------
    CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py
    CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py
    CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py
    CalendarServer/branches/memcache-reconnect/twistedcaldav/test/util.py

Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py	2008-06-03 20:48:15 UTC (rev 2528)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py	2008-06-03 22:25:25 UTC (rev 2529)
@@ -14,10 +14,11 @@
 # limitations under the License.
 ##
 
-from twisted.internet.defer import Deferred
+from twisted.python.failure import Failure
+from twisted.internet.defer import Deferred, fail
 from twisted.internet.protocol import ReconnectingClientFactory
 
-from twistedcaldav.memcache import MemCacheProtocol
+from twistedcaldav.memcache import MemCacheProtocol, NoSuchCommand
 
 
 class PooledMemCacheProtocol(MemCacheProtocol):
@@ -26,17 +27,14 @@
     to accept requests.
 
     @ivar factory: A L{MemCacheClientFactory} instance.
-    @ivar connectionPool: A managing connection pool that we notify of events.
     """
     factory = None
-    connectionPool = None
 
     def connectionMade(self):
         """
-        Notify our connectionPool that we're ready to accept connections.
+        Notify our factory that we're ready to accept connections.
         """
         MemCacheProtocol.connectionMade(self)
-        self.connectionPool.clientFree(self)
 
         if self.factory.deferred is not None:
             self.factory.deferred.callback(self)
@@ -94,8 +92,10 @@
         Attach the C{self.connectionPool} to the protocol so it can tell it,
         when we've connected.
         """
+        if self._protocolInstance is not None:
+            self.connectionPool.clientGone(self._protocolInstance)
+
         self._protocolInstance = self.protocol()
-        self._protocolInstance.connectionPool = self.connectionPool
         self._protocolInstance.factory = self
         return self._protocolInstance
 
@@ -137,6 +137,7 @@
 
         self._busyClients = set([])
         self._freeClients = set([])
+        self._commands = []
 
 
     def _newClientConnection(self):
@@ -155,6 +156,38 @@
         return factory.deferred
 
 
+    def _performRequestOnClient(self, client, command, *args, **kwargs):
+        """
+        Perform the given request on the given client.
+
+        @param client: A L{PooledMemCacheProtocol} that will be used to perform
+            the given request.
+
+        @param command: A C{str} representing an attribute of
+            L{MemCacheProtocol}.
+
+        @param args: Any positional arguments that should be passed to
+            C{command}.
+        @param kwargs: Any keyword arguments that should be passed to
+            C{command}.
+
+        @return: A L{Deferred} that fires with the result of the given command.
+        """
+        def _freeClientAfterRequest(result):
+            self.clientFree(client)
+            return result
+
+        method = getattr(client, command, None)
+        if method is not None:
+            d = method(*args, **kwargs)
+        else:
+            d = fail(Failure(NoSuchCommand()))
+
+        d.addCallback(_freeClientAfterRequest)
+
+        return d
+
+
     def performRequest(self, command, *args, **kwargs):
         """
         Select an available client and perform the given request on it.
@@ -168,9 +201,38 @@
 
         @return: A L{Deferred} that fires with the result of the given command.
         """
-        d = self._newClientConnection()
+        if len(self._freeClients) > 0:
+            client = self._freeClients.pop()
+            self.clientBusy(client)
+
+            d = self._performRequestOnClient(
+                client, command, *args, **kwargs)
+
+        elif len(self._busyClients) >= self._maxClients:
+            d = Deferred()
+            self._commands.append((d, command, args, kwargs))
+
+        else:
+            d = self._newClientConnection()
+            d.addCallback(self._performRequestOnClient,
+                          command, *args, **kwargs)
+
         return d
 
+
+    def clientGone(self, client):
+        """
+        Notify that the given client is to be removed from the pool completely.
+
+        @param client: An instance of L{PooledMemCacheProtocol}.
+        """
+        if client in self._busyClients:
+            self._busyClients.remove(client)
+
+        elif client in self._freeClients:
+            self._freeClients.remove(client)
+
+
     def clientBusy(self, client):
         """
         Notify that the given client is being used to complete a request.
@@ -193,3 +255,20 @@
             self._busyClients.remove(client)
 
         self._freeClients.add(client)
+
+        if len(self._commands) > 0:
+            d, command, args, kwargs = self._commands.pop(0)
+            _ign_d = self.performRequest(
+                command, *args, **kwargs)
+
+            _ign_d.addCallback(d.callback)
+
+
+    def suggestMaxClients(self, maxClients):
+        """
+        Suggest the maximum number of concurrently connected clients.
+
+        @param maxClients: A C{int} indicating how many client connections we
+            should keep open.
+        """
+        self._maxClients = maxClients

Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py	2008-06-03 20:48:15 UTC (rev 2528)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py	2008-06-03 22:25:25 UTC (rev 2529)
@@ -32,7 +32,7 @@
 from twistedcaldav.cache import MemcacheResponseCache
 from twistedcaldav.cache import MemcacheChangeNotifier
 
-from twistedcaldav.test.util import InMemoryPropertyStore
+from twistedcaldav.test.util import InMemoryMemcacheProtocol
 
 
 def _newCacheToken(self):
@@ -75,28 +75,6 @@
 
 
 
-class InMemoryMemcacheProtocol(object):
-    def __init__(self):
-        self._cache = {}
-
-
-    def get(self, key):
-        if key not in self._cache:
-            return succeed((0, None))
-
-        return succeed(self._cache[key])
-
-
-    def set(self, key, value, flags=0, expireTime=0):
-        try:
-            self._cache[key] = (flags, value)
-            return succeed(True)
-
-        except Exception, err:
-            return fail(Failure())
-
-
-
 class StubURLResource(object):
     def __init__(self, url):
         self._url = url

Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py	2008-06-03 20:48:15 UTC (rev 2528)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py	2008-06-03 22:25:25 UTC (rev 2529)
@@ -21,6 +21,7 @@
 from twisted.internet.interfaces import IConnector, IReactorTCP
 from twisted.internet.address import IPv4Address
 
+from twistedcaldav.test.util import InMemoryMemcacheProtocol
 from twistedcaldav.memcachepool import PooledMemCacheProtocol
 from twistedcaldav.memcachepool import MemCacheClientFactory
 from twistedcaldav.memcachepool import MemCachePool
@@ -35,8 +36,8 @@
     of (status, client) tuples where status is C{'free'} or C{'busy'}
 
     @ivar calls: A C{list} of C{tuple}s of the form C{(status, client)} where
-        status is C{'free'} or C{'busy'} and client is the protocol instance
-        that made the call.
+        status is C{'free'}, C{'busy'} or C{'gone'} and client is the protocol
+        instance that made the call.
     """
     def __init__(self):
         self.calls = []
@@ -56,7 +57,14 @@
         self.calls.append(('busy', client))
 
 
+    def clientGone(self, client):
+        """
+        Record a C{'gone'} call for C{client}
+        """
+        self.calls.append(('gone', client))
 
+
+
 class StubConnector(object):
     """
     A stub L{IConnector} that can be used for testing.
@@ -98,21 +106,6 @@
     """
     Tests for the L{PooledMemCacheProtocol}
     """
-    def test_connectionMadeNotifiesPool(self):
-        """
-        Test that L{PooledMemCacheProtocol.connectionMade} notifies the
-        connectionPool that it is free.
-        """
-        p = PooledMemCacheProtocol()
-        p.factory = MemCacheClientFactory()
-        p.connectionPool = StubConnectionPool()
-        p.connectionMade()
-
-        self.assertEquals(p.connectionPool.calls, [('free', p)])
-
-        return p.factory.deferred
-
-
     def test_connectionMadeFiresDeferred(self):
         """
         Test that L{PooledMemCacheProtocol.connectionMade} fires the factory's
@@ -150,15 +143,6 @@
         self.protocol = self.factory.buildProtocol(None)
 
 
-    def test_buildProtocolGivesProtocolConnectionPool(self):
-        """
-        Test that L{MemCacheClientFactory.buildProtocol} gives it's
-        protocol instance it's connectionPool.
-        """
-        self.assertEquals(self.factory.connectionPool,
-                          self.protocol.connectionPool)
-
-
     def test_clientConnectionFailedNotifiesPool(self):
         """
         Test that L{MemCacheClientFactory.clientConnectionFailed} notifies
@@ -179,6 +163,19 @@
                           [('busy', self.protocol)])
 
 
+    def test_buildProtocolRemovesExistingClient(self):
+        """
+        Test that L{MemCacheClientFactory.buildProtocol} notifies
+        the connectionPool when an old protocol instance is going away.
+
+        This will happen when we get reconnected.  We'll remove the old protocol
+        and add a new one.
+        """
+        protocol = self.factory.buildProtocol(None)
+        self.assertEquals(self.factory.connectionPool.calls,
+                          [('gone', self.protocol)])
+
+
     def tearDown(self):
         """
         Make sure the L{MemCacheClientFactory} isn't trying to reconnect
@@ -251,12 +248,48 @@
         self.assertEquals(self.pool._freeClients, set([]))
 
 
+    def test_clientGoneRemovesFreeClient(self):
+        """
+        Test that a client in the free set gets removed when
+        L{MemCachePool.clientGone} is called.
+        """
+        p = MemCacheClientFactory().buildProtocol(None)
+        self.pool.clientFree(p)
+        self.assertEquals(self.pool._freeClients, set([p]))
+        self.assertEquals(self.pool._busyClients, set([]))
+
+        self.pool.clientGone(p)
+        self.assertEquals(self.pool._freeClients, set([]))
+
+
+    def test_clientGoneRemovesBusyClient(self):
+        """
+        Test that a client in the busy set gets removed when
+        L{MemCachePool.clientGone} is called.
+        """
+        p = MemCacheClientFactory().buildProtocol(None)
+        self.pool.clientBusy(p)
+        self.assertEquals(self.pool._busyClients, set([p]))
+        self.assertEquals(self.pool._freeClients, set([]))
+
+        self.pool.clientGone(p)
+        self.assertEquals(self.pool._busyClients, set([]))
+
+
     def test_performRequestCreatesConnection(self):
         """
         Test that L{MemCachePool.performRequest} on a fresh instance causes
         a new connection to be created.
         """
+        def _checkResult(result):
+            self.assertEquals(result, (0, 'bar'))
+
+
+        p = InMemoryMemcacheProtocol()
+        p.set('foo', 'bar')
+
         d = self.pool.performRequest('get', 'foo')
+        d.addCallback(_checkResult)
 
         args, kwargs = self.reactor.calls.pop()
 
@@ -264,8 +297,86 @@
         self.failUnless(isinstance(args[2], MemCacheClientFactory))
         self.assertEquals(kwargs, {})
 
-        factory = args[2]
-        protocol = factory.buildProtocol(None)
-        protocol.connectionMade()
+        args[2].deferred.callback(p)
 
         return d
+
+
+    def test_performRequestUsesFreeConnection(self):
+        """
+        Test that L{MemCachePool.performRequest} doesn't cause a new connection
+        to be created if there is a free connection.
+        """
+        def _checkResult(result):
+            self.assertEquals(result, (0, 'bar'))
+            self.assertEquals(self.reactor.calls, [])
+
+        p = InMemoryMemcacheProtocol()
+        p.set('foo', 'bar')
+
+        self.pool.clientFree(p)
+
+        d = self.pool.performRequest('get', 'foo')
+        d.addCallback(_checkResult)
+
+        return d
+
+
+    def test_performRequestMaxBusyQueuesRequest(self):
+        """
+        Test that L{MemCachePool.performRequest} queues the request if
+        all clients are busy.
+        """
+        def _checkResult(result):
+            self.assertEquals(result, (0, 'bar'))
+            self.assertEquals(self.reactor.calls, [])
+
+        p = InMemoryMemcacheProtocol()
+        p.set('foo', 'bar')
+
+        p1 = InMemoryMemcacheProtocol()
+        p1.set('foo', 'baz')
+
+        self.pool.suggestMaxClients(2)
+
+        self.pool.clientBusy(p)
+        self.pool.clientBusy(p1)
+
+        d = self.pool.performRequest('get', 'foo')
+        d.addCallback(_checkResult)
+
+        self.pool.clientFree(p)
+
+        return d
+
+
+    def test_performRequestCreatesConnectionsUntilMaxBusy(self):
+        """
+        Test that L{MemCachePool.performRequest} will create new connections
+        until it reaches the maximum number of busy clients.
+        """
+        def _checkResult(result):
+            self.assertEquals(result, (0, 'baz'))
+
+        self.pool.suggestMaxClients(2)
+
+        p = InMemoryMemcacheProtocol()
+        p.set('foo', 'bar')
+
+        p1 = InMemoryMemcacheProtocol()
+        p1.set('foo', 'baz')
+
+
+        self.pool.clientBusy(p)
+
+        d = self.pool.performRequest('get', 'foo')
+
+        args, kwargs = self.reactor.calls.pop()
+
+        self.assertEquals(args[:2], (MC_ADDRESS.host, MC_ADDRESS.port))
+        self.failUnless(isinstance(args[2], MemCacheClientFactory))
+        self.assertEquals(kwargs, {})
+
+        args[2].deferred.callback(p1)
+
+        return d

Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/test/util.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/test/util.py	2008-06-03 20:48:15 UTC (rev 2528)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/test/util.py	2008-06-03 22:25:25 UTC (rev 2529)
@@ -47,6 +47,29 @@
         self._properties[property.qname()] = property
 
 
+
+class InMemoryMemcacheProtocol(object):
+    def __init__(self):
+        self._cache = {}
+
+
+    def get(self, key):
+        if key not in self._cache:
+            return succeed((0, None))
+
+        return succeed(self._cache[key])
+
+
+    def set(self, key, value, flags=0, expireTime=0):
+        try:
+            self._cache[key] = (flags, value)
+            return succeed(True)
+
+        except Exception, err:
+            return fail(Failure())
+
+
+
 class StubCacheChangeNotifier(object):
     def __init__(self, *args, **kwargs):
         pass

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080603/2e95b5ad/attachment-0001.htm 


More information about the calendarserver-changes mailing list