[CalendarServer-changes] [2524] CalendarServer/branches/memcache-reconnect/twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Mon Jun 2 16:45:10 PDT 2008


Revision: 2524
          http://trac.macosforge.org/projects/calendarserver/changeset/2524
Author:   dreid at apple.com
Date:     2008-06-02 16:45:09 -0700 (Mon, 02 Jun 2008)

Log Message:
-----------
Start of MemCache connection pooling.

Added Paths:
-----------
    CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py
    CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py

Added: CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py	                        (rev 0)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py	2008-06-02 23:45:09 UTC (rev 2524)
@@ -0,0 +1,195 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twisted.internet.defer import Deferred
+from twisted.internet.protocol import ReconnectingClientFactory
+
+from twistedcaldav.memcache import MemCacheProtocol
+
+
+class PooledMemCacheProtocol(MemCacheProtocol):
+    """
+    A MemCacheProtocol that will notify a connectionPool that it is ready
+    to accept requests.
+
+    @ivar factory: A L{MemCacheClientFactory} instance.
+    @ivar connectionPool: A managing connection pool that we notify of events.
+    """
+    factory = None
+    connectionPool = None
+
+    def connectionMade(self):
+        """
+        Notify our connectionPool that we're ready to accept connections.
+        """
+        MemCacheProtocol.connectionMade(self)
+        self.connectionPool.clientFree(self)
+
+        if self.factory.deferred is not None:
+            self.factory.deferred.callback(self)
+            self.factory.deferred = None
+
+
+
+class MemCacheClientFactory(ReconnectingClientFactory):
+    """
+    A client factory for MemCache that reconnects and notifies a pool of it's
+    state.
+
+    @ivar connectionPool: A managing connection pool that we notify of events.
+    @ivar deferred: A L{Deferred} that represents the initial connection.
+    @ivar _protocolInstance: The current instance of our protocol that we pass
+        to our connectionPool.
+    """
+    protocol = PooledMemCacheProtocol
+    connectionPool = None
+    _protocolInstance = None
+
+
+    def __init__(self):
+        self.deferred = Deferred()
+
+
+    def clientConnectionLost(self, connector, reason):
+        """
+        Notify the connectionPool that we've lost our connection.
+        """
+        if self._protocolInstance is not None:
+            self.connectionPool.clientBusy(self._protocolInstance)
+
+        ReconnectingClientFactory.clientConnectionLost(
+            self,
+            connector,
+            reason)
+
+
+    def clientConnectionFailed(self, connector, reason):
+        """
+        Notify the connectionPool that we're unable to connect
+        """
+        if self._protocolInstance is not None:
+            self.connectionPool.clientBusy(self._protocolInstance)
+
+        ReconnectingClientFactory.clientConnectionFailed(
+            self,
+            connector,
+            reason)
+
+
+    def buildProtocol(self, addr):
+        """
+        Attach the C{self.connectionPool} to the protocol so it can tell it,
+        when we've connected.
+        """
+        self._protocolInstance = self.protocol()
+        self._protocolInstance.connectionPool = self.connectionPool
+        self._protocolInstance.factory = self
+        return self._protocolInstance
+
+
+
+class MemCachePool(object):
+    """
+    A connection pool for MemCacheProtocol instances.
+
+    @ivar clientFactory: The L{ClientFactory} implementation that will be used
+        for each protocol.
+
+    @ivar _maxClients: A C{int} indicating the maximum number of clients.
+    @ivar _serverAddress: An L{IAddress} provider indicating the server to
+        connect to.  (Only L{IPv4Address} currently supported.)
+    @ivar _reactor: The L{IReactorTCP} provider used to initiate new
+        connections.
+
+    @ivar _busyClients: A C{set} that contains all currently busy clients.
+    @ivar _freeClients: A C{set} that contains all currently free clients.
+    """
+    clientFactory = MemCacheClientFactory
+
+    def __init__(self, serverAddress, maxClients=5, reactor=None):
+        """
+        @param serverAddress: An L{IPv4Address} indicating the server to
+            connect to.
+        @param maxClients: A C{int} indicating the maximum number of clients.
+        @param reactor: An L{IReactorTCP{ provider used to initiate new
+            connections.
+        """
+        self._serverAddress = serverAddress
+        self._maxClients = maxClients
+
+        if reactor is None:
+            from twisted.internet import reactor
+
+        self._reactor = reactor
+
+        self._busyClients = set([])
+        self._freeClients = set([])
+
+
+    def _newClientConnection(self):
+        """
+        Create a new client connection.
+
+        @return: A L{Deferred} that fires with the L{IProtocol} instance.
+        """
+        factory = self.clientFactory()
+
+        factory.connectionPool = self
+
+        self._reactor.connectTCP(self._serverAddress.host,
+                                 self._serverAddress.port,
+                                 factory)
+        return factory.deferred
+
+
+    def performRequest(self, command, *args, **kwargs):
+        """
+        Select an available client and perform the given request on it.
+
+        @param command: A C{str} representing an attribute of
+            L{MemCacheProtocol}.
+        @parma args: Any positional arguments that should be passed to
+            C{command}.
+        @param kwargs: Any keyword arguments that should be passed to
+            C{command}.
+
+        @return: A L{Deferred} that fires with the result of the given command.
+        """
+        d = self._newClientConnection()
+        return d
+
+    def clientBusy(self, client):
+        """
+        Notify that the given client is being used to complete a request.
+
+        @param client: An instance of C{self.clientFactory}
+        """
+        if client in self._freeClients:
+            self._freeClients.remove(client)
+
+        self._busyClients.add(client)
+
+
+    def clientFree(self, client):
+        """
+        Notify that the given client is free to handle more requests.
+
+        @param client: An instance of C{self.clientFactory}
+        """
+        if client in self._busyClients:
+            self._busyClients.remove(client)
+
+        self._freeClients.add(client)

Added: CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py	                        (rev 0)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py	2008-06-02 23:45:09 UTC (rev 2524)
@@ -0,0 +1,271 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from zope.interface import implements
+
+from twisted.trial.unittest import TestCase
+
+from twisted.internet.interfaces import IConnector, IReactorTCP
+from twisted.internet.address import IPv4Address
+
+from twistedcaldav.memcachepool import PooledMemCacheProtocol
+from twistedcaldav.memcachepool import MemCacheClientFactory
+from twistedcaldav.memcachepool import MemCachePool
+
+
+MC_ADDRESS = IPv4Address('TCP', '127.0.0.1', 11211)
+
+
+class StubConnectionPool(object):
+    """
+    A stub client connection pool that records it's calls in the form of a list
+    of (status, client) tuples where status is C{'free'} or C{'busy'}
+
+    @ivar calls: A C{list} of C{tuple}s of the form C{(status, client)} where
+        status is C{'free'} or C{'busy'} and client is the protocol instance
+        that made the call.
+    """
+    def __init__(self):
+        self.calls = []
+
+
+    def clientFree(self, client):
+        """
+        Record a C{'free'} call for C{client}.
+        """
+        self.calls.append(('free', client))
+
+
+    def clientBusy(self, client):
+        """
+        Record a C{'busy'} call for C{client}.
+        """
+        self.calls.append(('busy', client))
+
+
+
+class StubConnector(object):
+    """
+    A stub L{IConnector} that can be used for testing.
+    """
+    implements(IConnector)
+
+    def connect(self):
+        """
+        A L{IConnector.connect} implementation that doesn't do anything.
+        """
+
+
+    def stopConnecting(self):
+        """
+        A L{IConnector.stopConnecting} that doesn't do anything.
+        """
+
+
+
+class StubReactor(object):
+    """
+    A stub L{IReactorTCP} that records the calls to connectTCP.
+
+    @ivar calls: A C{list} of tuples (args, kwargs) sent to connectTCP.
+    """
+    implements(IReactorTCP)
+
+    def __init__(self):
+        self.calls = []
+
+
+    def connectTCP(self, *args, **kwargs):
+        self.calls.append((args, kwargs))
+        return StubConnector()
+
+
+
+class PooledMemCacheProtocolTests(TestCase):
+    """
+    Tests for the L{PooledMemCacheProtocol}
+    """
+    def test_connectionMadeNotifiesPool(self):
+        """
+        Test that L{PooledMemCacheProtocol.connectionMade} notifies the
+        connectionPool that it is free.
+        """
+        p = PooledMemCacheProtocol()
+        p.factory = MemCacheClientFactory()
+        p.connectionPool = StubConnectionPool()
+        p.connectionMade()
+
+        self.assertEquals(p.connectionPool.calls, [('free', p)])
+
+        return p.factory.deferred
+
+
+    def test_connectionMadeFiresDeferred(self):
+        """
+        Test that L{PooledMemCacheProtocol.connectionMade} fires the factory's
+        deferred.
+        """
+        p = PooledMemCacheProtocol()
+        p.factory = MemCacheClientFactory()
+        p.connectionPool = StubConnectionPool()
+        d = p.factory.deferred
+        d.addCallback(self.assertEquals, p)
+
+        p.connectionMade()
+        return d
+
+
+class MemCacheClientFactoryTests(TestCase):
+    """
+    Tests for the L{MemCacheClientFactory}
+
+    @ivar factory: A L{MemCacheClientFactory} instance with a
+        L{StubConnectionPool}.
+    @ivar protocol: A L{PooledMemCacheProtocol} that was built by
+        L{MemCacheClientFactory.buildProtocol}.
+    @ivar pool: The L{StubConnectionPool} attached to C{self.factory} and
+        C{self.protocol}.
+    """
+    def setUp(self):
+        """
+        Create a L{MemCacheClientFactory} instance and and give it a
+        L{StubConnectionPool} instance.
+        """
+        self.pool = StubConnectionPool()
+        self.factory = MemCacheClientFactory()
+        self.factory.connectionPool = self.pool
+        self.protocol = self.factory.buildProtocol(None)
+
+
+    def test_buildProtocolGivesProtocolConnectionPool(self):
+        """
+        Test that L{MemCacheClientFactory.buildProtocol} gives it's
+        protocol instance it's connectionPool.
+        """
+        self.assertEquals(self.factory.connectionPool,
+                          self.protocol.connectionPool)
+
+
+    def test_clientConnectionFailedNotifiesPool(self):
+        """
+        Test that L{MemCacheClientFactory.clientConnectionFailed} notifies
+        the it's connectionPool that it is busy.
+        """
+        self.factory.clientConnectionFailed(StubConnector(), None)
+        self.assertEquals(self.factory.connectionPool.calls,
+                          [('busy', self.protocol)])
+
+
+    def test_clientConnectionLostNotifiesPool(self):
+        """
+        Test that L{MemCacheClientFactory.clientConnectionLost} notifies
+        the it's connectionPool that it is busy.
+        """
+        self.factory.clientConnectionLost(StubConnector(), None)
+        self.assertEquals(self.factory.connectionPool.calls,
+                          [('busy', self.protocol)])
+
+
+    def tearDown(self):
+        """
+        Make sure the L{MemCacheClientFactory} isn't trying to reconnect
+        anymore.
+        """
+        self.factory.stopTrying()
+
+
+
+class MemCachePoolTests(TestCase):
+    """
+    Tests for L{MemCachePool}.
+
+    @ivar reactor: A L{StubReactor} instance.
+    @ivar pool: A L{MemCachePool} for testing.
+    """
+    def setUp(self):
+        """
+        Create a L{MemCachePool}.
+        """
+        self.reactor = StubReactor()
+        self.pool = MemCachePool(MC_ADDRESS,
+                                 maxClients=5,
+                                 reactor=self.reactor)
+
+
+    def test_clientFreeAddsNewClient(self):
+        """
+        Test that a client not in the busy set gets added to the free set.
+        """
+        p = MemCacheClientFactory().buildProtocol(None)
+        self.pool.clientFree(p)
+
+        self.assertEquals(self.pool._freeClients, set([p]))
+
+
+    def test_clientFreeAddsBusyClient(self):
+        """
+        Test that a client in the busy set gets moved to the free set.
+        """
+        p = MemCacheClientFactory().buildProtocol(None)
+
+        self.pool.clientBusy(p)
+        self.pool.clientFree(p)
+
+        self.assertEquals(self.pool._freeClients, set([p]))
+        self.assertEquals(self.pool._busyClients, set([]))
+
+
+    def test_clientBusyAddsNewClient(self):
+        """
+        Test that a client not in the free set gets added to the busy set.
+        """
+        p = MemCacheClientFactory().buildProtocol(None)
+        self.pool.clientBusy(p)
+
+        self.assertEquals(self.pool._busyClients, set([p]))
+
+
+    def test_clientBusyAddsFreeClient(self):
+        """
+        Test that a client in the free set gets moved to the busy set.
+        """
+        p = MemCacheClientFactory().buildProtocol(None)
+
+        self.pool.clientFree(p)
+        self.pool.clientBusy(p)
+
+        self.assertEquals(self.pool._busyClients, set([p]))
+        self.assertEquals(self.pool._freeClients, set([]))
+
+
+    def test_performRequestCreatesConnection(self):
+        """
+        Test that L{MemCachePool.performRequest} on a fresh instance causes
+        a new connection to be created.
+        """
+        d = self.pool.performRequest('get', 'foo')
+
+        args, kwargs = self.reactor.calls.pop()
+
+        self.assertEquals(args[:2], (MC_ADDRESS.host, MC_ADDRESS.port))
+        self.failUnless(isinstance(args[2], MemCacheClientFactory))
+        self.assertEquals(kwargs, {})
+
+        factory = args[2]
+        protocol = factory.buildProtocol(None)
+        protocol.connectionMade()
+
+        return d

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080602/d2f166af/attachment-0001.htm 


More information about the calendarserver-changes mailing list