[CalendarServer-changes] [2532] CalendarServer/branches/memcache-reconnect-2
source_changes at macosforge.org
source_changes at macosforge.org
Tue Jun 3 16:42:35 PDT 2008
Revision: 2532
http://trac.macosforge.org/projects/calendarserver/changeset/2532
Author: dreid at apple.com
Date: 2008-06-03 16:42:34 -0700 (Tue, 03 Jun 2008)
Log Message:
-----------
Merge forward.
Modified Paths:
--------------
CalendarServer/branches/memcache-reconnect-2/conf/caldavd-test.plist
CalendarServer/branches/memcache-reconnect-2/twistedcaldav/cache.py
CalendarServer/branches/memcache-reconnect-2/twistedcaldav/config.py
CalendarServer/branches/memcache-reconnect-2/twistedcaldav/root.py
CalendarServer/branches/memcache-reconnect-2/twistedcaldav/tap.py
CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_cache.py
CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/util.py
Added Paths:
-----------
CalendarServer/branches/memcache-reconnect-2/twistedcaldav/memcachepool.py
CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_memcachepool.py
Modified: CalendarServer/branches/memcache-reconnect-2/conf/caldavd-test.plist
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/conf/caldavd-test.plist 2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/conf/caldavd-test.plist 2008-06-03 23:42:34 UTC (rev 2532)
@@ -253,6 +253,8 @@
<key>LogLevels</key>
<dict>
+ <key>twistedcaldav.memcachepool</key>
+ <string>debug</string>
</dict>
<!-- Accounting -->
@@ -396,6 +398,8 @@
<true/>
<key>ClientEnabled</key>
<true/>
+ <key>MaxClients</key>
+ <integer>5</integer>
<key>memcached</key>
<string>../memcached-1.2.5/_root/bin/memcached</string>
<key>Options</key>
Modified: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/cache.py
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/cache.py 2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/cache.py 2008-06-03 23:42:34 UTC (rev 2532)
@@ -36,7 +36,7 @@
from twisted.internet.threads import deferToThread
from twistedcaldav.log import LoggingMixIn
-from twistedcaldav.memcache import MemCacheProtocol
+from twistedcaldav.memcachepool import CachePoolUserMixIn
from twistedcaldav.config import config
@@ -69,52 +69,26 @@
self.uri)
-
-class MemcacheChangeNotifier(LoggingMixIn):
- _memcacheProtocol = None
-
- def __init__(self, resource):
+class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn):
+ def __init__(self, resource, cachePool=None):
self._resource = resource
- self._host = config.Memcached['BindAddress']
- self._port = config.Memcached['Port']
+ self._cachePool = cachePool
- from twisted.internet import reactor
- self._reactor = reactor
-
def _newCacheToken(self):
return str(uuid.uuid4())
- def _getMemcacheProtocol(self):
- if MemcacheChangeNotifier._memcacheProtocol is not None:
- return succeed(self._memcacheProtocol)
-
- d = ClientCreator(self._reactor, MemCacheProtocol).connectTCP(
- self._host,
- self._port)
-
- def _cacheProtocol(proto):
- MemcacheChangeNotifier._memcacheProtocol = proto
- return proto
-
- return d.addCallback(_cacheProtocol)
-
-
def changed(self):
"""
Change the cache token for a resource
return: A L{Deferred} that fires when the token has been changed.
"""
- def _updateCacheToken(proto):
- return proto.set('cacheToken:%s' % (self._resource.url(),),
- self._newCacheToken())
-
self.log_debug("Changing Cache Token for %r" % (self._resource.url(),))
- d = self._getMemcacheProtocol()
- d.addCallback(_updateCacheToken)
- return d
+ return self.getCachePool().set(
+ 'cacheToken:%s' % (self._resource.url(),),
+ self._newCacheToken())
@@ -183,19 +157,12 @@
-class MemcacheResponseCache(BaseResponseCache):
- def __init__(self, docroot, host, port, reactor=None):
+class MemcacheResponseCache(BaseResponseCache, CachePoolUserMixIn):
+ def __init__(self, docroot, cachePool=None):
self._docroot = docroot
- self._host = host
- self._port = port
- if reactor is None:
- from twisted.internet import reactor
+ self._cachePool = cachePool
- self._reactor = reactor
- self._memcacheProtocol = None
-
-
def _tokenForURI(self, uri):
"""
Get a property store for the given C{uri}.
@@ -204,8 +171,7 @@
@return: A C{str} representing the token for the URI.
"""
- return self._getMemcacheProtocol().addCallback(
- lambda p: p.get('cacheToken:%s' % (uri,)))
+ return self.getCachePool().get('cacheToken:%s' % (uri,))
def _getTokens(self, request):
@@ -223,21 +189,6 @@
return d
- def _getMemcacheProtocol(self):
- if self._memcacheProtocol is not None:
- return succeed(self._memcacheProtocol)
-
- d = ClientCreator(self._reactor, MemCacheProtocol).connectTCP(
- self._host,
- self._port)
-
- def _cacheProtocol(proto):
- self._memcacheProtocol = proto
- return proto
-
- return d.addCallback(_cacheProtocol)
-
-
def _hashedRequestKey(self, request):
def _hashKey(key):
oldkey = key
@@ -292,31 +243,23 @@
return d2
- def _getCache(proto, key):
+ def _getCached(key):
self.log_debug("Checking cache for: %r" % (key,))
- d1 = proto.get(key)
+ d1 = self.getCachePool().get(key)
return d1.addCallback(_unpickleResponse, key)
- def _getProtocol(key):
- return self._getMemcacheProtocol().addCallback(_getCache, key)
-
def _handleExceptions(f):
f.trap(URINotFoundException)
self.log_warn("Could not locate URI: %r" % f.value)
return None
d = self._hashedRequestKey(request)
- d.addCallback(_getProtocol)
+ d.addCallback(_getCached)
d.addErrback(_handleExceptions)
return d
def cacheResponseForRequest(self, request, response):
- def _setCacheEntry(proto, key, cacheEntry):
- self.log_debug("Adding to cache: %r = %r" % (key, cacheEntry))
- return proto.set(key, cacheEntry).addCallback(
- lambda _: response)
-
def _makeCacheEntry((pToken, uToken), (key, responseBody)):
cacheEntry = cPickle.dumps(
(pToken,
@@ -325,9 +268,9 @@
dict(list(response.headers.getAllRawHeaders())),
responseBody)))
- d2 = self._getMemcacheProtocol()
- d2.addCallback(_setCacheEntry, key, cacheEntry)
- return d2
+ self.log_debug("Adding to cache: %r = %r" % (key, cacheEntry))
+ return self.getCachePool().set(key, cacheEntry).addCallback(
+ lambda _: response)
def _cacheResponse((key, responseBody)):
principalURI = self._principalURI(request.authnUser)
Modified: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/config.py 2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/config.py 2008-06-03 23:42:34 UTC (rev 2532)
@@ -206,6 +206,7 @@
"ListenBacklog": 50,
"Memcached": {
+ "MaxClients": 5,
"ClientEnabled": False,
"ServerEnabled": False,
"BindAddress": "127.0.0.1",
Copied: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/memcachepool.py (from rev 2530, CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py)
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/memcachepool.py (rev 0)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/memcachepool.py 2008-06-03 23:42:34 UTC (rev 2532)
@@ -0,0 +1,342 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twisted.python.failure import Failure
+from twisted.internet.defer import Deferred, fail
+from twisted.internet.protocol import ReconnectingClientFactory
+
+from twistedcaldav.log import LoggingMixIn
+from twistedcaldav.memcache import MemCacheProtocol, NoSuchCommand
+
+
+class PooledMemCacheProtocol(MemCacheProtocol):
+ """
+ A MemCacheProtocol that will notify a connectionPool that it is ready
+ to accept requests.
+
+ @ivar factory: A L{MemCacheClientFactory} instance.
+ """
+ factory = None
+
+ def connectionMade(self):
+ """
+ Notify our factory that we're ready to accept connections.
+ """
+ MemCacheProtocol.connectionMade(self)
+
+ if self.factory.deferred is not None:
+ self.factory.deferred.callback(self)
+ self.factory.deferred = None
+
+
+
+class MemCacheClientFactory(ReconnectingClientFactory, LoggingMixIn):
+ """
+ A client factory for MemCache that reconnects and notifies a pool of its
+ state.
+
+ @ivar connectionPool: A managing connection pool that we notify of events.
+ @ivar deferred: A L{Deferred} that represents the initial connection.
+ @ivar _protocolInstance: The current instance of our protocol that we pass
+ to our connectionPool.
+ """
+ protocol = PooledMemCacheProtocol
+ connectionPool = None
+ _protocolInstance = None
+
+
+ def __init__(self):
+ self.deferred = Deferred()
+
+
+ def clientConnectionLost(self, connector, reason):
+ """
+ Notify the connectionPool that we've lost our connection.
+ """
+ self.log_error("MemCache connection lost: %s" % (reason,))
+ if self._protocolInstance is not None:
+ self.connectionPool.clientBusy(self._protocolInstance)
+
+ ReconnectingClientFactory.clientConnectionLost(
+ self,
+ connector,
+ reason)
+
+
+ def clientConnectionFailed(self, connector, reason):
+ """
+ Notify the connectionPool that we're unable to connect
+ """
+ self.log_error("MemCache connection failed: %s" % (reason,))
+ if self._protocolInstance is not None:
+ self.connectionPool.clientBusy(self._protocolInstance)
+
+ ReconnectingClientFactory.clientConnectionFailed(
+ self,
+ connector,
+ reason)
+
+
+ def buildProtocol(self, addr):
+ """
+ Attach the C{self.connectionPool} to the protocol so it can tell it,
+ when we've connected.
+ """
+ if self._protocolInstance is not None:
+ self.connectionPool.clientGone(self._protocolInstance)
+
+ self._protocolInstance = self.protocol()
+ self._protocolInstance.factory = self
+ return self._protocolInstance
+
+
+
+class MemCachePool(LoggingMixIn):
+ """
+ A connection pool for MemCacheProtocol instances.
+
+ @ivar clientFactory: The L{ClientFactory} implementation that will be used
+ for each protocol.
+
+ @ivar _maxClients: A C{int} indicating the maximum number of clients.
+ @ivar _serverAddress: An L{IAddress} provider indicating the server to
+ connect to. (Only L{IPv4Address} currently supported.)
+ @ivar _reactor: The L{IReactorTCP} provider used to initiate new
+ connections.
+
+ @ivar _busyClients: A C{set} that contains all currently busy clients.
+ @ivar _freeClients: A C{set} that contains all currently free clients.
+ """
+ clientFactory = MemCacheClientFactory
+
+ def __init__(self, serverAddress, maxClients=5, reactor=None):
+ """
+ @param serverAddress: An L{IPv4Address} indicating the server to
+ connect to.
+ @param maxClients: A C{int} indicating the maximum number of clients.
+ @param reactor: An L{IReactorTCP} provider used to initiate new
+ connections.
+ """
+ self._serverAddress = serverAddress
+ self._maxClients = maxClients
+
+ if reactor is None:
+ from twisted.internet import reactor
+
+ self._reactor = reactor
+
+ self._busyClients = set([])
+ self._freeClients = set([])
+ self._commands = []
+
+
+ def _newClientConnection(self):
+ """
+ Create a new client connection.
+
+ @return: A L{Deferred} that fires with the L{IProtocol} instance.
+ """
+ self.log_debug("Initating new client connection to: %r" % (
+ self._serverAddress,))
+ self._logClientStats()
+
+ factory = self.clientFactory()
+
+ factory.connectionPool = self
+
+ self._reactor.connectTCP(self._serverAddress.host,
+ self._serverAddress.port,
+ factory)
+ return factory.deferred
+
+
+ def _performRequestOnClient(self, client, command, *args, **kwargs):
+ """
+ Perform the given request on the given client.
+
+ @param client: A L{PooledMemCacheProtocol} that will be used to perform
+ the given request.
+
+ @param command: A C{str} representing an attribute of
+ L{MemCacheProtocol}.
+ @param args: Any positional arguments that should be passed to
+ C{command}.
+ @param kwargs: Any keyword arguments that should be passed to
+ C{command}.
+
+ @return: A L{Deferred} that fires with the result of the given command.
+ """
+ def _freeClientAfterRequest(result):
+ self.clientFree(client)
+ return result
+
+ method = getattr(client, command, None)
+ if method is not None:
+ d = method(*args, **kwargs)
+ else:
+ d = fail(Failure(NoSuchCommand()))
+
+ d.addCallback(_freeClientAfterRequest)
+
+ return d
+
+
+ def performRequest(self, command, *args, **kwargs):
+ """
+ Select an available client and perform the given request on it.
+
+ @param command: A C{str} representing an attribute of
+ L{MemCacheProtocol}.
+ @param args: Any positional arguments that should be passed to
+ C{command}.
+ @param kwargs: Any keyword arguments that should be passed to
+ C{command}.
+
+ @return: A L{Deferred} that fires with the result of the given command.
+ """
+ if len(self._freeClients) > 0:
+ client = self._freeClients.pop()
+ self.clientBusy(client)
+
+ d = self._performRequestOnClient(
+ client, command, *args, **kwargs)
+
+ elif len(self._busyClients) >= self._maxClients:
+ d = Deferred()
+ self._commands.append((d, command, args, kwargs))
+
+ else:
+ d = self._newClientConnection()
+ d.addCallback(self._performRequestOnClient,
+ command, *args, **kwargs)
+
+ return d
+
+
+ def _logClientStats(self):
+ self.log_debug("Clients #free: %d, #busy: %d" % (
+ len(self._freeClients),
+ len(self._busyClients)))
+
+
+ def clientGone(self, client):
+ """
+ Notify that the given client is to be removed from the pool completely.
+
+ @param client: An instance of L{PooledMemCacheProtocol}.
+ """
+ if client in self._busyClients:
+ self._busyClients.remove(client)
+
+ elif client in self._freeClients:
+ self._freeClients.remove(client)
+
+ self.log_debug("Removed client: %r" % (client,))
+ self._logClientStats()
+
+
+ def clientBusy(self, client):
+ """
+ Notify that the given client is being used to complete a request.
+
+ @param client: An instance of C{self.clientFactory}
+ """
+ if client in self._freeClients:
+ self._freeClients.remove(client)
+
+ self._busyClients.add(client)
+
+ self.log_debug("Busied client: %r" % (client,))
+ self._logClientStats()
+
+
+ def clientFree(self, client):
+ """
+ Notify that the given client is free to handle more requests.
+
+ @param client: An instance of C{self.clientFactory}
+ """
+ if client in self._busyClients:
+ self._busyClients.remove(client)
+
+ self._freeClients.add(client)
+
+ if len(self._commands) > 0:
+ d, command, args, kwargs = self._commands.pop(0)
+ _ign_d = self.performRequest(
+ command, *args, **kwargs)
+
+ _ign_d.addCallback(d.callback)
+
+ self.log_debug("Freed client: %r" % (client,))
+ self._logClientStats()
+
+
+ def suggestMaxClients(self, maxClients):
+ """
+ Suggest the maximum number of concurrently connected clients.
+
+ @param maxClients: A C{int} indicating how many client connections we
+ should keep open.
+ """
+ self._maxClients = maxClients
+
+
+ def get(self, *args, **kwargs):
+ return self.performRequest('get', *args, **kwargs)
+
+
+ def set(self, *args, **kwargs):
+ return self.performRequest('set', *args, **kwargs)
+
+
+ def delete(self, *args, **kwargs):
+ return self.performRequest('delete', *args, **kwargs)
+
+
+ def add(self, *args, **kwargs):
+ return self.performRequest('add', *args, **kwargs)
+
+
+
+class CachePoolUserMixIn(object):
+ """
+ A mixin that returns a saved cache pool or fetches the default cache pool.
+
+ @ivar _cachePool: A saved cachePool.
+ """
+ _cachePool = None
+
+ def getCachePool(self):
+ if self._cachePool is None:
+ return defaultCachePool()
+
+ return self._cachePool
+
+
+
+_memCachePool = None
+
+def installPool(serverAddress, maxClients=5, reactor=None):
+ global _memCachePool
+ _memCachePool = MemCachePool(serverAddress,
+ maxClients=5,
+ reactor=None)
+
+
+def defaultCachePool():
+ return _memCachePool
Modified: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/root.py
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/root.py 2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/root.py 2008-06-03 23:42:34 UTC (rev 2532)
@@ -57,10 +57,7 @@
self.contentFilters = []
if config.Memcached['ClientEnabled']:
- self.responseCache = MemcacheResponseCache(
- self.fp,
- config.Memcached['BindAddress'],
- config.Memcached['Port'])
+ self.responseCache = MemcacheResponseCache(self.fp)
CalendarHomeFile.cacheNotifierFactory = MemcacheChangeNotifier
DirectoryPrincipalResource.cacheNotifierFactory = MemcacheChangeNotifier
Modified: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/tap.py
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/tap.py 2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/tap.py 2008-06-03 23:42:34 UTC (rev 2532)
@@ -20,6 +20,7 @@
from zope.interface import implements
from twisted.internet import reactor
+from twisted.internet.address import IPv4Address
from twisted.python.log import FileLogObserver
from twisted.python.usage import Options, UsageError
@@ -56,6 +57,7 @@
from twistedcaldav.static import TimezoneServiceFile
from twistedcaldav.timezones import TimezoneCache
from twistedcaldav import pdmonster
+from twistedcaldav import memcachepool
log = Logger()
@@ -500,6 +502,17 @@
SudoDirectoryService.recordType_sudoers)
#
+ # Configure Memcached Client Pool
+ #
+ if config.Memcached["ClientEnabled"]:
+ memcachepool.installPool(
+ IPv4Address(
+ 'TCP',
+ config.Memcached["BindAddress"],
+ config.Memcached["Port"]),
+ config.Memcached["MaxClients"])
+
+ #
# Setup Resource hierarchy
#
@@ -781,6 +794,10 @@
config.ThreadPoolSize))
reactor.suggestThreadPoolSize(config.ThreadPoolSize)
+ log.info("Suggesting new max clients for memcache.")
+ memcachepool.getCachePool().suggestMaxClients(
+ config.Memcached["MaxClients"])
+
signal.signal(signal.SIGHUP, sighup_handler)
#def sigusr1_handler(num, frame):
Modified: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_cache.py
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_cache.py 2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_cache.py 2008-06-03 23:42:34 UTC (rev 2532)
@@ -32,7 +32,7 @@
from twistedcaldav.cache import MemcacheResponseCache
from twistedcaldav.cache import MemcacheChangeNotifier
-from twistedcaldav.test.util import InMemoryPropertyStore
+from twistedcaldav.test.util import InMemoryMemcacheProtocol
def _newCacheToken(self):
@@ -75,28 +75,6 @@
-class InMemoryMemcacheProtocol(object):
- def __init__(self):
- self._cache = {}
-
-
- def get(self, key):
- if key not in self._cache:
- return succeed((0, None))
-
- return succeed(self._cache[key])
-
-
- def set(self, key, value, flags=0, expireTime=0):
- try:
- self._cache[key] = (flags, value)
- return succeed(True)
-
- except Exception, err:
- return fail(Failure())
-
-
-
class StubURLResource(object):
def __init__(self, url):
self._url = url
@@ -110,8 +88,10 @@
class MemCacheChangeNotifierTests(TestCase):
def setUp(self):
self.memcache = InMemoryMemcacheProtocol()
- self.ccn = MemcacheChangeNotifier(StubURLResource(':memory:'))
- MemcacheChangeNotifier._memcacheProtocol = self.memcache
+ self.ccn = MemcacheChangeNotifier(
+ StubURLResource(':memory:'),
+ cachePool=self.memcache)
+
self.ccn._newCacheToken = instancemethod(_newCacheToken,
self.ccn,
MemcacheChangeNotifier)
@@ -319,7 +299,7 @@
super(MemcacheResponseCacheTests, self).setUp()
memcacheStub = InMemoryMemcacheProtocol()
- self.rc = MemcacheResponseCache(None, None, None, None)
+ self.rc = MemcacheResponseCache(None, cachePool=memcacheStub)
self.rc.logger.setLevel('debug')
self.tokens = {}
@@ -350,7 +330,3 @@
(self.expected_response[0],
dict(list(self.expected_response[1].getAllRawHeaders())),
self.expected_response[2]))))
-
- self.rc._memcacheProtocol = memcacheStub
-
-
Copied: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_memcachepool.py (from rev 2530, CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_memcachepool.py)
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_memcachepool.py (rev 0)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/test_memcachepool.py 2008-06-03 23:42:34 UTC (rev 2532)
@@ -0,0 +1,382 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from zope.interface import implements
+
+from twisted.trial.unittest import TestCase
+
+from twisted.internet.interfaces import IConnector, IReactorTCP
+from twisted.internet.address import IPv4Address
+
+from twistedcaldav.test.util import InMemoryMemcacheProtocol
+from twistedcaldav.memcachepool import PooledMemCacheProtocol
+from twistedcaldav.memcachepool import MemCacheClientFactory
+from twistedcaldav.memcachepool import MemCachePool
+
+
+MC_ADDRESS = IPv4Address('TCP', '127.0.0.1', 11211)
+
+
+class StubConnectionPool(object):
+ """
+ A stub client connection pool that records its calls in the form of a list
+ of (status, client) tuples where status is C{'free'}, C{'busy'} or C{'gone'}
+
+ @ivar calls: A C{list} of C{tuple}s of the form C{(status, client)} where
+ status is C{'free'}, C{'busy'} or C{'gone'} and client is the protocol
+ instance that made the call.
+ """
+ def __init__(self):
+ self.calls = []
+
+
+ def clientFree(self, client):
+ """
+ Record a C{'free'} call for C{client}.
+ """
+ self.calls.append(('free', client))
+
+
+ def clientBusy(self, client):
+ """
+ Record a C{'busy'} call for C{client}.
+ """
+ self.calls.append(('busy', client))
+
+
+ def clientGone(self, client):
+ """
+ Record a C{'gone'} call for C{client}
+ """
+ self.calls.append(('gone', client))
+
+
+
+class StubConnector(object):
+ """
+ A stub L{IConnector} that can be used for testing.
+ """
+ implements(IConnector)
+
+ def connect(self):
+ """
+ A L{IConnector.connect} implementation that doesn't do anything.
+ """
+
+
+ def stopConnecting(self):
+ """
+ A L{IConnector.stopConnecting} that doesn't do anything.
+ """
+
+
+
+class StubReactor(object):
+ """
+ A stub L{IReactorTCP} that records the calls to connectTCP.
+
+ @ivar calls: A C{list} of tuples (args, kwargs) sent to connectTCP.
+ """
+ implements(IReactorTCP)
+
+ def __init__(self):
+ self.calls = []
+
+
+ def connectTCP(self, *args, **kwargs):
+ self.calls.append((args, kwargs))
+ return StubConnector()
+
+
+
+class PooledMemCacheProtocolTests(TestCase):
+ """
+ Tests for the L{PooledMemCacheProtocol}
+ """
+ def test_connectionMadeFiresDeferred(self):
+ """
+ Test that L{PooledMemCacheProtocol.connectionMade} fires the factory's
+ deferred.
+ """
+ p = PooledMemCacheProtocol()
+ p.factory = MemCacheClientFactory()
+ p.connectionPool = StubConnectionPool()
+ d = p.factory.deferred
+ d.addCallback(self.assertEquals, p)
+
+ p.connectionMade()
+ return d
+
+
+class MemCacheClientFactoryTests(TestCase):
+ """
+ Tests for the L{MemCacheClientFactory}
+
+ @ivar factory: A L{MemCacheClientFactory} instance with a
+ L{StubConnectionPool}.
+ @ivar protocol: A L{PooledMemCacheProtocol} that was built by
+ L{MemCacheClientFactory.buildProtocol}.
+ @ivar pool: The L{StubConnectionPool} attached to C{self.factory} and
+ C{self.protocol}.
+ """
+ def setUp(self):
+ """
+ Create a L{MemCacheClientFactory} instance and give it a
+ L{StubConnectionPool} instance.
+ """
+ self.pool = StubConnectionPool()
+ self.factory = MemCacheClientFactory()
+ self.factory.connectionPool = self.pool
+ self.protocol = self.factory.buildProtocol(None)
+
+
+ def test_clientConnectionFailedNotifiesPool(self):
+ """
+ Test that L{MemCacheClientFactory.clientConnectionFailed} notifies
+ its connectionPool that it is busy.
+ """
+ self.factory.clientConnectionFailed(StubConnector(), None)
+ self.assertEquals(self.factory.connectionPool.calls,
+ [('busy', self.protocol)])
+
+
+ def test_clientConnectionLostNotifiesPool(self):
+ """
+ Test that L{MemCacheClientFactory.clientConnectionLost} notifies
+ its connectionPool that it is busy.
+ """
+ self.factory.clientConnectionLost(StubConnector(), None)
+ self.assertEquals(self.factory.connectionPool.calls,
+ [('busy', self.protocol)])
+
+
+ def test_buildProtocolRemovesExistingClient(self):
+ """
+ Test that L{MemCacheClientFactory.buildProtocol} notifies
+ the connectionPool when an old protocol instance is going away.
+
+ This will happen when we get reconnected. We'll remove the old protocol
+ and add a new one.
+ """
+ protocol = self.factory.buildProtocol(None)
+ self.assertEquals(self.factory.connectionPool.calls,
+ [('gone', self.protocol)])
+
+
+ def tearDown(self):
+ """
+ Make sure the L{MemCacheClientFactory} isn't trying to reconnect
+ anymore.
+ """
+ self.factory.stopTrying()
+
+
+
+class MemCachePoolTests(TestCase):
+ """
+ Tests for L{MemCachePool}.
+
+ @ivar reactor: A L{StubReactor} instance.
+ @ivar pool: A L{MemCachePool} for testing.
+ """
+ def setUp(self):
+ """
+ Create a L{MemCachePool}.
+ """
+ self.reactor = StubReactor()
+ self.pool = MemCachePool(MC_ADDRESS,
+ maxClients=5,
+ reactor=self.reactor)
+
+
+ def test_clientFreeAddsNewClient(self):
+ """
+ Test that a client not in the busy set gets added to the free set.
+ """
+ p = MemCacheClientFactory().buildProtocol(None)
+ self.pool.clientFree(p)
+
+ self.assertEquals(self.pool._freeClients, set([p]))
+
+
+ def test_clientFreeAddsBusyClient(self):
+ """
+ Test that a client in the busy set gets moved to the free set.
+ """
+ p = MemCacheClientFactory().buildProtocol(None)
+
+ self.pool.clientBusy(p)
+ self.pool.clientFree(p)
+
+ self.assertEquals(self.pool._freeClients, set([p]))
+ self.assertEquals(self.pool._busyClients, set([]))
+
+
+ def test_clientBusyAddsNewClient(self):
+ """
+ Test that a client not in the free set gets added to the busy set.
+ """
+ p = MemCacheClientFactory().buildProtocol(None)
+ self.pool.clientBusy(p)
+
+ self.assertEquals(self.pool._busyClients, set([p]))
+
+
+ def test_clientBusyAddsFreeClient(self):
+ """
+ Test that a client in the free set gets moved to the busy set.
+ """
+ p = MemCacheClientFactory().buildProtocol(None)
+
+ self.pool.clientFree(p)
+ self.pool.clientBusy(p)
+
+ self.assertEquals(self.pool._busyClients, set([p]))
+ self.assertEquals(self.pool._freeClients, set([]))
+
+
+ def test_clientGoneRemovesFreeClient(self):
+ """
+ Test that a client in the free set gets removed when
+ L{MemCachePool.clientGone} is called.
+ """
+ p = MemCacheClientFactory().buildProtocol(None)
+ self.pool.clientFree(p)
+ self.assertEquals(self.pool._freeClients, set([p]))
+ self.assertEquals(self.pool._busyClients, set([]))
+
+ self.pool.clientGone(p)
+ self.assertEquals(self.pool._freeClients, set([]))
+
+
+ def test_clientGoneRemovesBusyClient(self):
+ """
+ Test that a client in the busy set gets removed when
+ L{MemCachePool.clientGone} is called.
+ """
+ p = MemCacheClientFactory().buildProtocol(None)
+ self.pool.clientBusy(p)
+ self.assertEquals(self.pool._busyClients, set([p]))
+ self.assertEquals(self.pool._freeClients, set([]))
+
+ self.pool.clientGone(p)
+ self.assertEquals(self.pool._busyClients, set([]))
+
+
+ def test_performRequestCreatesConnection(self):
+ """
+ Test that L{MemCachePool.performRequest} on a fresh instance causes
+ a new connection to be created.
+ """
+ def _checkResult(result):
+ self.assertEquals(result, (0, 'bar'))
+
+
+ p = InMemoryMemcacheProtocol()
+ p.set('foo', 'bar')
+
+ d = self.pool.performRequest('get', 'foo')
+ d.addCallback(_checkResult)
+
+ args, kwargs = self.reactor.calls.pop()
+
+ self.assertEquals(args[:2], (MC_ADDRESS.host, MC_ADDRESS.port))
+ self.failUnless(isinstance(args[2], MemCacheClientFactory))
+ self.assertEquals(kwargs, {})
+
+ args[2].deferred.callback(p)
+
+ return d
+
+
+ def test_performRequestUsesFreeConnection(self):
+ """
+ Test that L{MemCachePool.performRequest} doesn't cause a new connection
+ to be created if there is a free connection.
+ """
+ def _checkResult(result):
+ self.assertEquals(result, (0, 'bar'))
+ self.assertEquals(self.reactor.calls, [])
+
+ p = InMemoryMemcacheProtocol()
+ p.set('foo', 'bar')
+
+ self.pool.clientFree(p)
+
+ d = self.pool.performRequest('get', 'foo')
+ d.addCallback(_checkResult)
+
+ return d
+
+
+ def test_performRequestMaxBusyQueuesRequest(self):
+ """
+ Test that L{MemCachePool.performRequest} queues the request if
+ all clients are busy.
+ """
+ def _checkResult(result):
+ self.assertEquals(result, (0, 'bar'))
+ self.assertEquals(self.reactor.calls, [])
+
+ p = InMemoryMemcacheProtocol()
+ p.set('foo', 'bar')
+
+ p1 = InMemoryMemcacheProtocol()
+ p1.set('foo', 'baz')
+
+ self.pool.suggestMaxClients(2)
+
+ self.pool.clientBusy(p)
+ self.pool.clientBusy(p1)
+
+ d = self.pool.performRequest('get', 'foo')
+ d.addCallback(_checkResult)
+
+ self.pool.clientFree(p)
+
+ return d
+
+
+ def test_performRequestCreatesConnectionsUntilMaxBusy(self):
+ """
+ Test that L{MemCachePool.performRequest} will create new connections
+ until it reaches the maximum number of busy clients.
+ """
+ def _checkResult(result):
+ self.assertEquals(result, (0, 'baz'))
+
+ self.pool.suggestMaxClients(2)
+
+ p = InMemoryMemcacheProtocol()
+ p.set('foo', 'bar')
+
+ p1 = InMemoryMemcacheProtocol()
+ p1.set('foo', 'baz')
+
+
+ self.pool.clientBusy(p)
+
+ d = self.pool.performRequest('get', 'foo')
+
+ args, kwargs = self.reactor.calls.pop()
+
+ self.assertEquals(args[:2], (MC_ADDRESS.host, MC_ADDRESS.port))
+ self.failUnless(isinstance(args[2], MemCacheClientFactory))
+ self.assertEquals(kwargs, {})
+
+ args[2].deferred.callback(p1)
+
+ return d
Modified: CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/util.py
===================================================================
--- CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/util.py 2008-06-03 23:42:14 UTC (rev 2531)
+++ CalendarServer/branches/memcache-reconnect-2/twistedcaldav/test/util.py 2008-06-03 23:42:34 UTC (rev 2532)
@@ -47,6 +47,29 @@
self._properties[property.qname()] = property
+
+class InMemoryMemcacheProtocol(object):
+ def __init__(self):
+ self._cache = {}
+
+
+ def get(self, key):
+ if key not in self._cache:
+ return succeed((0, None))
+
+ return succeed(self._cache[key])
+
+
+ def set(self, key, value, flags=0, expireTime=0):
+ try:
+ self._cache[key] = (flags, value)
+ return succeed(True)
+
+ except Exception, err:
+ return fail(Failure())
+
+
+
class StubCacheChangeNotifier(object):
def __init__(self, *args, **kwargs):
pass
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080603/b5d41c45/attachment-0001.htm
More information about the calendarserver-changes
mailing list