[CalendarServer-changes] [2532] CalendarServer/branches/memcache-reconnect-2

source_changes at macosforge.org source_changes at macosforge.org
Tue Jun 3 16:42:35 PDT 2008


Revision: 2532
          http://trac.macosforge.org/projects/calendarserver/changeset/2532
Author:   dreid at apple.com
Date:     2008-06-03 16:42:34 -0700 (Tue, 03 Jun 2008)

Log Message:
-----------
Merge forward.

Modified Paths:
--------------
    CalendarServer/branches/memcache-reconnect-2/conf/caldavd-test.plist
    CalendarServer/branches/memcache-reconnect-2/twistedcaldav/cache.py
    CalendarServer/branches/memcache-reconnect-2/twistedcaldav/config.py
    CalendarServer/branches/memcache-reconnect-2/twistedcaldav/root.py
    CalendarServer/branches/memcache-reconnect-2/twistedcaldav/tap.py
    CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_cache.py
    CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/util.py

Added Paths:
-----------
    CalendarServer/branches/memcache-reconnect-2/twistedcaldav/memcachepool.py
    CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_memcachepool.py

Modified: CalendarServer/branches/memcache-reconnect-2/conf/caldavd-test.plist
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/conf/caldavd-test.plist	2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/conf/caldavd-test.plist	2008-06-03 23:42:34 UTC (rev 2532)
@@ -253,6 +253,8 @@
 
   <key>LogLevels</key>
   <dict>
+    <key>twistedcaldav.memcachepool</key>
+    <string>debug</string>
   </dict>
 
   <!-- Accounting -->
@@ -396,6 +398,8 @@
     <true/>
     <key>ClientEnabled</key>
     <true/>
+    <key>MaxClients</key>
+    <integer>5</integer>
     <key>memcached</key>
     <string>../memcached-1.2.5/_root/bin/memcached</string>
     <key>Options</key>

Modified: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/cache.py
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/cache.py	2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/cache.py	2008-06-03 23:42:34 UTC (rev 2532)
@@ -36,7 +36,7 @@
 from twisted.internet.threads import deferToThread
 
 from twistedcaldav.log import LoggingMixIn
-from twistedcaldav.memcache import MemCacheProtocol
+from twistedcaldav.memcachepool import CachePoolUserMixIn
 from twistedcaldav.config import config
 
 
@@ -69,52 +69,26 @@
             self.uri)
 
 
-
-class MemcacheChangeNotifier(LoggingMixIn):
-    _memcacheProtocol = None
-
-    def __init__(self, resource):
+class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn):
+    def __init__(self, resource, cachePool=None):
         self._resource = resource
-        self._host = config.Memcached['BindAddress']
-        self._port = config.Memcached['Port']
+        self._cachePool = cachePool
 
-        from twisted.internet import reactor
-        self._reactor = reactor
 
-
     def _newCacheToken(self):
         return str(uuid.uuid4())
 
 
-    def _getMemcacheProtocol(self):
-        if MemcacheChangeNotifier._memcacheProtocol is not None:
-            return succeed(self._memcacheProtocol)
-
-        d = ClientCreator(self._reactor, MemCacheProtocol).connectTCP(
-            self._host,
-            self._port)
-
-        def _cacheProtocol(proto):
-            MemcacheChangeNotifier._memcacheProtocol = proto
-            return proto
-
-        return d.addCallback(_cacheProtocol)
-
-
     def changed(self):
         """
         Change the cache token for a resource
 
         return: A L{Deferred} that fires when the token has been changed.
         """
-        def _updateCacheToken(proto):
-            return proto.set('cacheToken:%s' % (self._resource.url(),),
-                             self._newCacheToken())
-
         self.log_debug("Changing Cache Token for %r" % (self._resource.url(),))
-        d = self._getMemcacheProtocol()
-        d.addCallback(_updateCacheToken)
-        return d
+        return self.getCachePool().set(
+            'cacheToken:%s' % (self._resource.url(),),
+            self._newCacheToken())
 
 
 
@@ -183,19 +157,12 @@
 
 
 
-class MemcacheResponseCache(BaseResponseCache):
-    def __init__(self, docroot, host, port, reactor=None):
+class MemcacheResponseCache(BaseResponseCache, CachePoolUserMixIn):
+    def __init__(self, docroot, cachePool=None):
         self._docroot = docroot
-        self._host = host
-        self._port = port
-        if reactor is None:
-            from twisted.internet import reactor
+        self._cachePool = cachePool
 
-        self._reactor = reactor
 
-        self._memcacheProtocol = None
-
-
     def _tokenForURI(self, uri):
         """
         Get a property store for the given C{uri}.
@@ -204,8 +171,7 @@
         @return: A C{str} representing the token for the URI.
         """
 
-        return self._getMemcacheProtocol().addCallback(
-            lambda p: p.get('cacheToken:%s' % (uri,)))
+        return self.getCachePool().get('cacheToken:%s' % (uri,))
 
 
     def _getTokens(self, request):
@@ -223,21 +189,6 @@
         return d
 
 
-    def _getMemcacheProtocol(self):
-        if self._memcacheProtocol is not None:
-            return succeed(self._memcacheProtocol)
-
-        d = ClientCreator(self._reactor, MemCacheProtocol).connectTCP(
-            self._host,
-            self._port)
-
-        def _cacheProtocol(proto):
-            self._memcacheProtocol = proto
-            return proto
-
-        return d.addCallback(_cacheProtocol)
-
-
     def _hashedRequestKey(self, request):
         def _hashKey(key):
             oldkey = key
@@ -292,31 +243,23 @@
 
             return d2
 
-        def _getCache(proto, key):
+        def _getCached(key):
             self.log_debug("Checking cache for: %r" % (key,))
-            d1 = proto.get(key)
+            d1 = self.getCachePool().get(key)
             return d1.addCallback(_unpickleResponse, key)
 
-        def _getProtocol(key):
-            return self._getMemcacheProtocol().addCallback(_getCache, key)
-
         def _handleExceptions(f):
             f.trap(URINotFoundException)
             self.log_warn("Could not locate URI: %r" % f.value)
             return None
 
         d = self._hashedRequestKey(request)
-        d.addCallback(_getProtocol)
+        d.addCallback(_getCached)
         d.addErrback(_handleExceptions)
         return d
 
 
     def cacheResponseForRequest(self, request, response):
-        def _setCacheEntry(proto, key, cacheEntry):
-            self.log_debug("Adding to cache: %r = %r" % (key, cacheEntry))
-            return proto.set(key, cacheEntry).addCallback(
-                lambda _: response)
-
         def _makeCacheEntry((pToken, uToken), (key, responseBody)):
             cacheEntry = cPickle.dumps(
                 (pToken,
@@ -325,9 +268,9 @@
                   dict(list(response.headers.getAllRawHeaders())),
                   responseBody)))
 
-            d2 = self._getMemcacheProtocol()
-            d2.addCallback(_setCacheEntry, key, cacheEntry)
-            return d2
+            self.log_debug("Adding to cache: %r = %r" % (key, cacheEntry))
+            return self.getCachePool().set(key, cacheEntry).addCallback(
+                lambda _: response)
 
         def _cacheResponse((key, responseBody)):
             principalURI = self._principalURI(request.authnUser)

Modified: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/config.py	2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/config.py	2008-06-03 23:42:34 UTC (rev 2532)
@@ -206,6 +206,7 @@
     "ListenBacklog": 50,
 
     "Memcached": {
+        "MaxClients": 5,
         "ClientEnabled": False,
         "ServerEnabled": False,
         "BindAddress": "127.0.0.1",

Copied: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/memcachepool.py (from rev 2530, CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py)
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/memcachepool.py	                        (rev 0)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/memcachepool.py	2008-06-03 23:42:34 UTC (rev 2532)
@@ -0,0 +1,342 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twisted.python.failure import Failure
+from twisted.internet.defer import Deferred, fail
+from twisted.internet.protocol import ReconnectingClientFactory
+
+from twistedcaldav.log import LoggingMixIn
+from twistedcaldav.memcache import MemCacheProtocol, NoSuchCommand
+
+
+class PooledMemCacheProtocol(MemCacheProtocol):
+    """
+    A MemCacheProtocol that will notify a connectionPool that it is ready
+    to accept requests.
+
+    @ivar factory: A L{MemCacheClientFactory} instance.
+    """
+    factory = None
+
+    def connectionMade(self):
+        """
+        Notify our factory that we're ready to accept connections.
+        """
+        MemCacheProtocol.connectionMade(self)
+
+        if self.factory.deferred is not None:
+            self.factory.deferred.callback(self)
+            self.factory.deferred = None
+
+
+
+class MemCacheClientFactory(ReconnectingClientFactory, LoggingMixIn):
+    """
+    A client factory for MemCache that reconnects and notifies a pool of its
+    state.
+
+    @ivar connectionPool: A managing connection pool that we notify of events.
+    @ivar deferred: A L{Deferred} that represents the initial connection.
+    @ivar _protocolInstance: The current instance of our protocol that we pass
+        to our connectionPool.
+    """
+    protocol = PooledMemCacheProtocol
+    connectionPool = None
+    _protocolInstance = None
+
+
+    def __init__(self):
+        self.deferred = Deferred()
+
+
+    def clientConnectionLost(self, connector, reason):
+        """
+        Notify the connectionPool that we've lost our connection.
+        """
+        self.log_error("MemCache connection lost: %s" % (reason,))
+        if self._protocolInstance is not None:
+            self.connectionPool.clientBusy(self._protocolInstance)
+
+        ReconnectingClientFactory.clientConnectionLost(
+            self,
+            connector,
+            reason)
+
+
+    def clientConnectionFailed(self, connector, reason):
+        """
+        Notify the connectionPool that we're unable to connect
+        """
+        self.log_error("MemCache connection failed: %s" % (reason,))
+        if self._protocolInstance is not None:
+            self.connectionPool.clientBusy(self._protocolInstance)
+
+        ReconnectingClientFactory.clientConnectionFailed(
+            self,
+            connector,
+            reason)
+
+
+    def buildProtocol(self, addr):
+        """
+        Attach the C{self.connectionPool} to the protocol so it can tell it,
+        when we've connected.
+        """
+        if self._protocolInstance is not None:
+            self.connectionPool.clientGone(self._protocolInstance)
+
+        self._protocolInstance = self.protocol()
+        self._protocolInstance.factory = self
+        return self._protocolInstance
+
+
+
+class MemCachePool(LoggingMixIn):
+    """
+    A connection pool for MemCacheProtocol instances.
+
+    @ivar clientFactory: The L{ClientFactory} implementation that will be used
+        for each protocol.
+
+    @ivar _maxClients: A C{int} indicating the maximum number of clients.
+    @ivar _serverAddress: An L{IAddress} provider indicating the server to
+        connect to.  (Only L{IPv4Address} currently supported.)
+    @ivar _reactor: The L{IReactorTCP} provider used to initiate new
+        connections.
+
+    @ivar _busyClients: A C{set} that contains all currently busy clients.
+    @ivar _freeClients: A C{set} that contains all currently free clients.
+    """
+    clientFactory = MemCacheClientFactory
+
+    def __init__(self, serverAddress, maxClients=5, reactor=None):
+        """
+        @param serverAddress: An L{IPv4Address} indicating the server to
+            connect to.
+        @param maxClients: A C{int} indicating the maximum number of clients.
+        @param reactor: An L{IReactorTCP} provider used to initiate new
+            connections.
+        """
+        self._serverAddress = serverAddress
+        self._maxClients = maxClients
+
+        if reactor is None:
+            from twisted.internet import reactor
+
+        self._reactor = reactor
+
+        self._busyClients = set([])
+        self._freeClients = set([])
+        self._commands = []
+
+
+    def _newClientConnection(self):
+        """
+        Create a new client connection.
+
+        @return: A L{Deferred} that fires with the L{IProtocol} instance.
+        """
+        self.log_debug("Initating new client connection to: %r" % (
+                self._serverAddress,))
+        self._logClientStats()
+
+        factory = self.clientFactory()
+
+        factory.connectionPool = self
+
+        self._reactor.connectTCP(self._serverAddress.host,
+                                 self._serverAddress.port,
+                                 factory)
+        return factory.deferred
+
+
+    def _performRequestOnClient(self, client, command, *args, **kwargs):
+        """
+        Perform the given request on the given client.
+
+        @param client: A L{PooledMemCacheProtocol} that will be used to perform
+            the given request.
+
+        @param command: A C{str} representing an attribute of
+            L{MemCacheProtocol}.
+        @param args: Any positional arguments that should be
+            passed to
+            C{command}.
+        @param kwargs: Any keyword arguments that should be passed to
+            C{command}.
+
+        @return: A L{Deferred} that fires with the result of the given command.
+        """
+        def _freeClientAfterRequest(result):
+            self.clientFree(client)
+            return result
+
+        method = getattr(client, command, None)
+        if method is not None:
+            d = method(*args, **kwargs)
+        else:
+            d = fail(Failure(NoSuchCommand()))
+
+        d.addCallback(_freeClientAfterRequest)
+
+        return d
+
+
+    def performRequest(self, command, *args, **kwargs):
+        """
+        Select an available client and perform the given request on it.
+
+        @param command: A C{str} representing an attribute of
+            L{MemCacheProtocol}.
+        @param args: Any positional arguments that should be passed to
+            C{command}.
+        @param kwargs: Any keyword arguments that should be passed to
+            C{command}.
+
+        @return: A L{Deferred} that fires with the result of the given command.
+        """
+        if len(self._freeClients) > 0:
+            client = self._freeClients.pop()
+            self.clientBusy(client)
+
+            d = self._performRequestOnClient(
+                client, command, *args, **kwargs)
+
+        elif len(self._busyClients) >= self._maxClients:
+            d = Deferred()
+            self._commands.append((d, command, args, kwargs))
+
+        else:
+            d = self._newClientConnection()
+            d.addCallback(self._performRequestOnClient,
+                          command, *args, **kwargs)
+
+        return d
+
+
+    def _logClientStats(self):
+        self.log_debug("Clients #free: %d, #busy: %d" % (
+                len(self._freeClients),
+                len(self._busyClients)))
+
+
+    def clientGone(self, client):
+        """
+        Notify that the given client is to be removed from the pool completely.
+
+        @param client: An instance of L{PooledMemCacheProtocol}.
+        """
+        if client in self._busyClients:
+            self._busyClients.remove(client)
+
+        elif client in self._freeClients:
+            self._freeClients.remove(client)
+
+        self.log_debug("Removed client: %r" % (client,))
+        self._logClientStats()
+
+
+    def clientBusy(self, client):
+        """
+        Notify that the given client is being used to complete a request.
+
+        @param client: An instance of L{PooledMemCacheProtocol}.
+        """
+        if client in self._freeClients:
+            self._freeClients.remove(client)
+
+        self._busyClients.add(client)
+
+        self.log_debug("Busied client: %r" % (client,))
+        self._logClientStats()
+
+
+    def clientFree(self, client):
+        """
+        Notify that the given client is free to handle more requests.
+
+        @param client: An instance of L{PooledMemCacheProtocol}.
+        """
+        if client in self._busyClients:
+            self._busyClients.remove(client)
+
+        self._freeClients.add(client)
+
+        if len(self._commands) > 0:
+            d, command, args, kwargs = self._commands.pop(0)
+            _ign_d = self.performRequest(
+                command, *args, **kwargs)
+
+            _ign_d.addCallback(d.callback)
+
+        self.log_debug("Freed client: %r" % (client,))
+        self._logClientStats()
+
+
+    def suggestMaxClients(self, maxClients):
+        """
+        Suggest the maximum number of concurrently connected clients.
+
+        @param maxClients: A C{int} indicating how many client connections we
+            should keep open.
+        """
+        self._maxClients = maxClients
+
+
+    def get(self, *args, **kwargs):
+        return self.performRequest('get', *args, **kwargs)
+
+
+    def set(self, *args, **kwargs):
+        return self.performRequest('set', *args, **kwargs)
+
+
+    def delete(self, *args, **kwargs):
+        return self.performRequest('delete', *args, **kwargs)
+
+
+    def add(self, *args, **kwargs):
+        return self.performRequest('add', *args, **kwargs)
+
+
+
+class CachePoolUserMixIn(object):
+    """
+    A mixin that returns a saved cache pool or fetches the default cache pool.
+
+    @ivar _cachePool: A saved cachePool.
+    """
+    _cachePool = None
+
+    def getCachePool(self):
+        if self._cachePool is None:
+            return defaultCachePool()
+
+        return self._cachePool
+
+
+
+_memCachePool = None
+
+def installPool(serverAddress, maxClients=5, reactor=None):
+    global _memCachePool
+    _memCachePool = MemCachePool(serverAddress,
+                                   maxClients=5,
+                                   reactor=None)
+
+
+def defaultCachePool():
+    return _memCachePool

Modified: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/root.py
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/root.py	2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/root.py	2008-06-03 23:42:34 UTC (rev 2532)
@@ -57,10 +57,7 @@
         self.contentFilters = []
 
         if config.Memcached['ClientEnabled']:
-            self.responseCache = MemcacheResponseCache(
-                self.fp,
-                config.Memcached['BindAddress'],
-                config.Memcached['Port'])
+            self.responseCache = MemcacheResponseCache(self.fp)
 
             CalendarHomeFile.cacheNotifierFactory = MemcacheChangeNotifier
             DirectoryPrincipalResource.cacheNotifierFactory = MemcacheChangeNotifier

Modified: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/tap.py
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/tap.py	2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/tap.py	2008-06-03 23:42:34 UTC (rev 2532)
@@ -20,6 +20,7 @@
 from zope.interface import implements
 
 from twisted.internet import reactor
+from twisted.internet.address import IPv4Address
 
 from twisted.python.log import FileLogObserver
 from twisted.python.usage import Options, UsageError
@@ -56,6 +57,7 @@
 from twistedcaldav.static import TimezoneServiceFile
 from twistedcaldav.timezones import TimezoneCache
 from twistedcaldav import pdmonster
+from twistedcaldav import memcachepool
 
 log = Logger()
 
@@ -500,6 +502,17 @@
                 SudoDirectoryService.recordType_sudoers)
 
         #
+        # Configure Memcached Client Pool
+        #
+        if config.Memcached["ClientEnabled"]:
+            memcachepool.installPool(
+                IPv4Address(
+                    'TCP',
+                    config.Memcached["BindAddress"],
+                    config.Memcached["Port"]),
+                config.Memcached["MaxClients"])
+
+        #
         # Setup Resource hierarchy
         #
 
@@ -781,6 +794,10 @@
                         config.ThreadPoolSize))
                 reactor.suggestThreadPoolSize(config.ThreadPoolSize)
 
+                log.info("Suggesting new max clients for memcache.")
+                memcachepool.getCachePool().suggestMaxClients(
+                        config.Memcached["MaxClients"])
+
             signal.signal(signal.SIGHUP, sighup_handler)
 
             #def sigusr1_handler(num, frame):

Modified: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_cache.py
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_cache.py	2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_cache.py	2008-06-03 23:42:34 UTC (rev 2532)
@@ -32,7 +32,7 @@
 from twistedcaldav.cache import MemcacheResponseCache
 from twistedcaldav.cache import MemcacheChangeNotifier
 
-from twistedcaldav.test.util import InMemoryPropertyStore
+from twistedcaldav.test.util import InMemoryMemcacheProtocol
 
 
 def _newCacheToken(self):
@@ -75,28 +75,6 @@
 
 
 
-class InMemoryMemcacheProtocol(object):
-    def __init__(self):
-        self._cache = {}
-
-
-    def get(self, key):
-        if key not in self._cache:
-            return succeed((0, None))
-
-        return succeed(self._cache[key])
-
-
-    def set(self, key, value, flags=0, expireTime=0):
-        try:
-            self._cache[key] = (flags, value)
-            return succeed(True)
-
-        except Exception, err:
-            return fail(Failure())
-
-
-
 class StubURLResource(object):
     def __init__(self, url):
         self._url = url
@@ -110,8 +88,10 @@
 class MemCacheChangeNotifierTests(TestCase):
     def setUp(self):
         self.memcache = InMemoryMemcacheProtocol()
-        self.ccn = MemcacheChangeNotifier(StubURLResource(':memory:'))
-        MemcacheChangeNotifier._memcacheProtocol = self.memcache
+        self.ccn = MemcacheChangeNotifier(
+            StubURLResource(':memory:'),
+            cachePool=self.memcache)
+
         self.ccn._newCacheToken = instancemethod(_newCacheToken,
                                                  self.ccn,
                                                  MemcacheChangeNotifier)
@@ -319,7 +299,7 @@
         super(MemcacheResponseCacheTests, self).setUp()
 
         memcacheStub = InMemoryMemcacheProtocol()
-        self.rc = MemcacheResponseCache(None, None, None, None)
+        self.rc = MemcacheResponseCache(None, cachePool=memcacheStub)
         self.rc.logger.setLevel('debug')
         self.tokens = {}
 
@@ -350,7 +330,3 @@
             (self.expected_response[0],
              dict(list(self.expected_response[1].getAllRawHeaders())),
              self.expected_response[2]))))
-
-        self.rc._memcacheProtocol = memcacheStub
-
-

Copied: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_memcachepool.py (from rev 2530, CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py)
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_memcachepool.py	                        (rev 0)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_memcachepool.py	2008-06-03 23:42:34 UTC (rev 2532)
@@ -0,0 +1,382 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from zope.interface import implements
+
+from twisted.trial.unittest import TestCase
+
+from twisted.internet.interfaces import IConnector, IReactorTCP
+from twisted.internet.address import IPv4Address
+
+from twistedcaldav.test.util import InMemoryMemcacheProtocol
+from twistedcaldav.memcachepool import PooledMemCacheProtocol
+from twistedcaldav.memcachepool import MemCacheClientFactory
+from twistedcaldav.memcachepool import MemCachePool
+
+
+MC_ADDRESS = IPv4Address('TCP', '127.0.0.1', 11211)
+
+
+class StubConnectionPool(object):
+    """
+    A stub client connection pool that records its calls in the form of a list
+    of (status, client) tuples where status is C{'free'} or C{'busy'}
+
+    @ivar calls: A C{list} of C{tuple}s of the form C{(status, client)} where
+        status is C{'free'}, C{'busy'} or C{'gone'} and client is the protocol
+        instance that made the call.
+    """
+    def __init__(self):
+        self.calls = []
+
+
+    def clientFree(self, client):
+        """
+        Record a C{'free'} call for C{client}.
+        """
+        self.calls.append(('free', client))
+
+
+    def clientBusy(self, client):
+        """
+        Record a C{'busy'} call for C{client}.
+        """
+        self.calls.append(('busy', client))
+
+
+    def clientGone(self, client):
+        """
+        Record a C{'gone'} call for C{client}
+        """
+        self.calls.append(('gone', client))
+
+
+
+class StubConnector(object):
+    """
+    A stub L{IConnector} that can be used for testing.
+    """
+    implements(IConnector)
+
+    def connect(self):
+        """
+        A L{IConnector.connect} implementation that doesn't do anything.
+        """
+
+
+    def stopConnecting(self):
+        """
+        A L{IConnector.stopConnecting} that doesn't do anything.
+        """
+
+
+
+class StubReactor(object):
+    """
+    A stub L{IReactorTCP} that records the calls to connectTCP.
+
+    @ivar calls: A C{list} of tuples (args, kwargs) sent to connectTCP.
+    """
+    implements(IReactorTCP)
+
+    def __init__(self):
+        self.calls = []
+
+
+    def connectTCP(self, *args, **kwargs):
+        self.calls.append((args, kwargs))
+        return StubConnector()
+
+
+
+class PooledMemCacheProtocolTests(TestCase):
+    """
+    Tests for the L{PooledMemCacheProtocol}
+    """
+    def test_connectionMadeFiresDeferred(self):
+        """
+        Test that L{PooledMemCacheProtocol.connectionMade} fires the factory's
+        deferred.
+        """
+        p = PooledMemCacheProtocol()
+        p.factory = MemCacheClientFactory()
+        p.connectionPool = StubConnectionPool()
+        d = p.factory.deferred
+        d.addCallback(self.assertEquals, p)
+
+        p.connectionMade()
+        return d
+
+
+class MemCacheClientFactoryTests(TestCase):
+    """
+    Tests for the L{MemCacheClientFactory}
+
+    @ivar factory: A L{MemCacheClientFactory} instance with a
+        L{StubConnectionPool}.
+    @ivar protocol: A L{PooledMemCacheProtocol} that was built by
+        L{MemCacheClientFactory.buildProtocol}.
+    @ivar pool: The L{StubConnectionPool} attached to C{self.factory} and
+        C{self.protocol}.
+    """
+    def setUp(self):
+        """
+        Create a L{MemCacheClientFactory} instance and give it a
+        L{StubConnectionPool} instance.
+        """
+        self.pool = StubConnectionPool()
+        self.factory = MemCacheClientFactory()
+        self.factory.connectionPool = self.pool
+        self.protocol = self.factory.buildProtocol(None)
+
+
+    def test_clientConnectionFailedNotifiesPool(self):
+        """
+        Test that L{MemCacheClientFactory.clientConnectionFailed} notifies
+        its connectionPool that it is busy.
+        """
+        self.factory.clientConnectionFailed(StubConnector(), None)
+        self.assertEquals(self.factory.connectionPool.calls,
+                          [('busy', self.protocol)])
+
+
+    def test_clientConnectionLostNotifiesPool(self):
+        """
+        Test that L{MemCacheClientFactory.clientConnectionLost} notifies
+        its connectionPool that it is busy.
+        """
+        self.factory.clientConnectionLost(StubConnector(), None)
+        self.assertEquals(self.factory.connectionPool.calls,
+                          [('busy', self.protocol)])
+
+
+    def test_buildProtocolRemovesExistingClient(self):
+        """
+        Test that L{MemCacheClientFactory.buildProtocol} notifies
+        the connectionPool when an old protocol instance is going away.
+
+        This will happen when we get reconnected.  We'll remove the old protocol
+        and add a new one.
+        """
+        protocol = self.factory.buildProtocol(None)
+        self.assertEquals(self.factory.connectionPool.calls,
+                          [('gone', self.protocol)])
+
+
+    def tearDown(self):
+        """
+        Make sure the L{MemCacheClientFactory} isn't trying to reconnect
+        anymore.
+        """
+        self.factory.stopTrying()
+
+
+
+class MemCachePoolTests(TestCase):
+    """
+    Tests for L{MemCachePool}.
+
+    @ivar reactor: A L{StubReactor} instance.
+    @ivar pool: A L{MemCachePool} for testing.
+    """
+    def setUp(self):
+        """
+        Create a L{MemCachePool}.
+        """
+        self.reactor = StubReactor()
+        self.pool = MemCachePool(MC_ADDRESS,
+                                 maxClients=5,
+                                 reactor=self.reactor)
+
+
+    def test_clientFreeAddsNewClient(self):
+        """
+        Test that a client not in the busy set gets added to the free set.
+        """
+        p = MemCacheClientFactory().buildProtocol(None)
+        self.pool.clientFree(p)
+
+        self.assertEquals(self.pool._freeClients, set([p]))
+
+
+    def test_clientFreeAddsBusyClient(self):
+        """
+        Test that a client in the busy set gets moved to the free set.
+        """
+        p = MemCacheClientFactory().buildProtocol(None)
+
+        self.pool.clientBusy(p)
+        self.pool.clientFree(p)
+
+        self.assertEquals(self.pool._freeClients, set([p]))
+        self.assertEquals(self.pool._busyClients, set([]))
+
+
+    def test_clientBusyAddsNewClient(self):
+        """
+        Test that a client not in the free set gets added to the busy set.
+        """
+        p = MemCacheClientFactory().buildProtocol(None)
+        self.pool.clientBusy(p)
+
+        self.assertEquals(self.pool._busyClients, set([p]))
+
+
+    def test_clientBusyAddsFreeClient(self):
+        """
+        Test that a client in the free set gets moved to the busy set.
+        """
+        p = MemCacheClientFactory().buildProtocol(None)
+
+        self.pool.clientFree(p)
+        self.pool.clientBusy(p)
+
+        self.assertEquals(self.pool._busyClients, set([p]))
+        self.assertEquals(self.pool._freeClients, set([]))
+
+
+    def test_clientGoneRemovesFreeClient(self):
+        """
+        Test that a client in the free set gets removed when
+        L{MemCachePool.clientGone} is called.
+        """
+        p = MemCacheClientFactory().buildProtocol(None)
+        self.pool.clientFree(p)
+        self.assertEquals(self.pool._freeClients, set([p]))
+        self.assertEquals(self.pool._busyClients, set([]))
+
+        self.pool.clientGone(p)
+        self.assertEquals(self.pool._freeClients, set([]))
+
+
+    def test_clientGoneRemovesBusyClient(self):
+        """
+        Test that a client in the busy set gets removed when
+        L{MemCachePool.clientGone} is called.
+        """
+        p = MemCacheClientFactory().buildProtocol(None)
+        self.pool.clientBusy(p)
+        self.assertEquals(self.pool._busyClients, set([p]))
+        self.assertEquals(self.pool._freeClients, set([]))
+
+        self.pool.clientGone(p)
+        self.assertEquals(self.pool._busyClients, set([]))
+
+
+    def test_performRequestCreatesConnection(self):
+        """
+        Test that L{MemCachePool.performRequest} on a fresh instance causes
+        a new connection to be created.
+        """
+        def _checkResult(result):
+            self.assertEquals(result, (0, 'bar'))
+
+        p = InMemoryMemcacheProtocol()
+        p.set('foo', 'bar')
+
+        d = self.pool.performRequest('get', 'foo')
+        d.addCallback(_checkResult)
+
+        args, kwargs = self.reactor.calls.pop()
+
+        self.assertEquals(args[:2], (MC_ADDRESS.host, MC_ADDRESS.port))
+        self.failUnless(isinstance(args[2], MemCacheClientFactory))
+        self.assertEquals(kwargs, {})
+
+        # Fire the factory's deferred with the stand-in protocol.
+        args[2].deferred.callback(p)
+
+        return d
+
+
+    def test_performRequestUsesFreeConnection(self):
+        """
+        Test that L{MemCachePool.performRequest} doesn't cause a new connection
+        to be created if there is a free connection.
+        """
+        def _checkResult(result):
+            self.assertEquals(result, (0, 'bar'))
+            self.assertEquals(self.reactor.calls, [])
+
+        p = InMemoryMemcacheProtocol()
+        p.set('foo', 'bar')
+
+        self.pool.clientFree(p)
+
+        d = self.pool.performRequest('get', 'foo')
+        d.addCallback(_checkResult)
+
+        return d
+
+
+    def test_performRequestMaxBusyQueuesRequest(self):
+        """
+        Test that L{MemCachePool.performRequest} queues the request if
+        all clients are busy.
+        """
+        def _checkResult(result):
+            self.assertEquals(result, (0, 'bar'))
+            self.assertEquals(self.reactor.calls, [])
+
+        p = InMemoryMemcacheProtocol()
+        p.set('foo', 'bar')
+
+        p1 = InMemoryMemcacheProtocol()
+        p1.set('foo', 'baz')
+
+        self.pool.suggestMaxClients(2)
+
+        self.pool.clientBusy(p)
+        self.pool.clientBusy(p1)
+
+        d = self.pool.performRequest('get', 'foo')
+        d.addCallback(_checkResult)
+
+        self.pool.clientFree(p)  # expected to let the queued request run on p
+
+        return d
+
+
+    def test_performRequestCreatesConnectionsUntilMaxBusy(self):
+        """
+        Test that L{MemCachePool.performRequest} will create new connections
+        until it reaches the maximum number of busy clients.
+        """
+        def _checkResult(result):
+            self.assertEquals(result, (0, 'baz'))
+
+        self.pool.suggestMaxClients(2)
+
+        p = InMemoryMemcacheProtocol()
+        p.set('foo', 'bar')
+
+        p1 = InMemoryMemcacheProtocol()
+        p1.set('foo', 'baz')
+
+        self.pool.clientBusy(p)  # one of the two allowed clients is in use
+
+        d = self.pool.performRequest('get', 'foo')
+        d.addCallback(_checkResult)
+
+        args, kwargs = self.reactor.calls.pop()
+
+        self.assertEquals(args[:2], (MC_ADDRESS.host, MC_ADDRESS.port))
+        self.failUnless(isinstance(args[2], MemCacheClientFactory))
+        self.assertEquals(kwargs, {})
+
+        args[2].deferred.callback(p1)  # hand over the stand-in protocol
+
+        return d

Modified: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/util.py
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/util.py	2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/util.py	2008-06-03 23:42:34 UTC (rev 2532)
@@ -47,6 +47,29 @@
         self._properties[property.qname()] = property
 
 
+
+class InMemoryMemcacheProtocol(object):
+    """Dict-backed stand-in for a memcache client protocol."""
+    def __init__(self):
+        self._cache = {}  # maps key -> (flags, value)
+
+
+    def get(self, key):
+        """Return succeed() of the stored (flags, value), or of (0, None)."""
+        return succeed(self._cache.get(key, (0, None)))
+
+
+    def set(self, key, value, flags=0, expireTime=0):
+        """Store (flags, value) under key; expireTime is ignored."""
+        try:
+            self._cache[key] = (flags, value)
+            return succeed(True)
+        except Exception:
+            # Failure() with no arguments wraps the exception being handled.
+            return fail(Failure())
+
+
+
 class StubCacheChangeNotifier(object):
     def __init__(self, *args, **kwargs):
         pass

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080603/b5d41c45/attachment-0001.htm 


More information about the calendarserver-changes mailing list