[CalendarServer-changes] [2530] CalendarServer/branches/memcache-reconnect
source_changes at macosforge.org
source_changes at macosforge.org
Tue Jun 3 16:30:09 PDT 2008
Revision: 2530
http://trac.macosforge.org/projects/calendarserver/changeset/2530
Author: dreid at apple.com
Date: 2008-06-03 16:30:08 -0700 (Tue, 03 Jun 2008)
Log Message:
-----------
Memcache connection pool integration.
Modified Paths:
--------------
CalendarServer/branches/memcache-reconnect/conf/caldavd-test.plist
CalendarServer/branches/memcache-reconnect/twistedcaldav/cache.py
CalendarServer/branches/memcache-reconnect/twistedcaldav/config.py
CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py
CalendarServer/branches/memcache-reconnect/twistedcaldav/root.py
CalendarServer/branches/memcache-reconnect/twistedcaldav/tap.py
CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py
Modified: CalendarServer/branches/memcache-reconnect/conf/caldavd-test.plist
===================================================================
--- CalendarServer/branches/memcache-reconnect/conf/caldavd-test.plist 2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/conf/caldavd-test.plist 2008-06-03 23:30:08 UTC (rev 2530)
@@ -396,6 +396,8 @@
<true/>
<key>ClientEnabled</key>
<true/>
+ <key>MaxClients</key>
+ <integer>5</integer>
<key>memcached</key>
<string>../memcached-1.2.5/_root/bin/memcached</string>
<key>Options</key>
Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/cache.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/cache.py 2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/cache.py 2008-06-03 23:30:08 UTC (rev 2530)
@@ -36,7 +36,7 @@
from twisted.internet.threads import deferToThread
from twistedcaldav.log import LoggingMixIn
-from twistedcaldav.memcache import MemCacheProtocol
+from twistedcaldav.memcachepool import CachePoolUserMixIn
from twistedcaldav.config import config
@@ -69,52 +69,26 @@
self.uri)
-
-class MemcacheChangeNotifier(LoggingMixIn):
- _memcacheProtocol = None
-
- def __init__(self, resource):
+class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn):
+ def __init__(self, resource, cachePool=None):
self._resource = resource
- self._host = config.Memcached['BindAddress']
- self._port = config.Memcached['Port']
+ self._cachePool = cachePool
- from twisted.internet import reactor
- self._reactor = reactor
-
def _newCacheToken(self):
return str(uuid.uuid4())
- def _getMemcacheProtocol(self):
- if MemcacheChangeNotifier._memcacheProtocol is not None:
- return succeed(self._memcacheProtocol)
-
- d = ClientCreator(self._reactor, MemCacheProtocol).connectTCP(
- self._host,
- self._port)
-
- def _cacheProtocol(proto):
- MemcacheChangeNotifier._memcacheProtocol = proto
- return proto
-
- return d.addCallback(_cacheProtocol)
-
-
def changed(self):
"""
Change the cache token for a resource
return: A L{Deferred} that fires when the token has been changed.
"""
- def _updateCacheToken(proto):
- return proto.set('cacheToken:%s' % (self._resource.url(),),
- self._newCacheToken())
-
self.log_debug("Changing Cache Token for %r" % (self._resource.url(),))
- d = self._getMemcacheProtocol()
- d.addCallback(_updateCacheToken)
- return d
+ return self.getCachePool().set(
+ 'cacheToken:%s' % (self._resource.url(),),
+ self._newCacheToken())
@@ -183,19 +157,12 @@
-class MemcacheResponseCache(BaseResponseCache):
- def __init__(self, docroot, host, port, reactor=None):
+class MemcacheResponseCache(BaseResponseCache, CachePoolUserMixIn):
+ def __init__(self, docroot, cachePool=None):
self._docroot = docroot
- self._host = host
- self._port = port
- if reactor is None:
- from twisted.internet import reactor
+ self._cachePool = cachePool
- self._reactor = reactor
- self._memcacheProtocol = None
-
-
def _tokenForURI(self, uri):
"""
Get a property store for the given C{uri}.
@@ -204,8 +171,7 @@
@return: A C{str} representing the token for the URI.
"""
- return self._getMemcacheProtocol().addCallback(
- lambda p: p.get('cacheToken:%s' % (uri,)))
+ return self.getCachePool().get('cacheToken:%s' % (uri,))
def _getTokens(self, request):
@@ -223,21 +189,6 @@
return d
- def _getMemcacheProtocol(self):
- if self._memcacheProtocol is not None:
- return succeed(self._memcacheProtocol)
-
- d = ClientCreator(self._reactor, MemCacheProtocol).connectTCP(
- self._host,
- self._port)
-
- def _cacheProtocol(proto):
- self._memcacheProtocol = proto
- return proto
-
- return d.addCallback(_cacheProtocol)
-
-
def _hashedRequestKey(self, request):
def _hashKey(key):
oldkey = key
@@ -292,31 +243,23 @@
return d2
- def _getCache(proto, key):
+ def _getCached(key):
self.log_debug("Checking cache for: %r" % (key,))
- d1 = proto.get(key)
+ d1 = self.getCachePool().get(key)
return d1.addCallback(_unpickleResponse, key)
- def _getProtocol(key):
- return self._getMemcacheProtocol().addCallback(_getCache, key)
-
def _handleExceptions(f):
f.trap(URINotFoundException)
self.log_warn("Could not locate URI: %r" % f.value)
return None
d = self._hashedRequestKey(request)
- d.addCallback(_getProtocol)
+ d.addCallback(_getCached)
d.addErrback(_handleExceptions)
return d
def cacheResponseForRequest(self, request, response):
- def _setCacheEntry(proto, key, cacheEntry):
- self.log_debug("Adding to cache: %r = %r" % (key, cacheEntry))
- return proto.set(key, cacheEntry).addCallback(
- lambda _: response)
-
def _makeCacheEntry((pToken, uToken), (key, responseBody)):
cacheEntry = cPickle.dumps(
(pToken,
@@ -325,9 +268,9 @@
dict(list(response.headers.getAllRawHeaders())),
responseBody)))
- d2 = self._getMemcacheProtocol()
- d2.addCallback(_setCacheEntry, key, cacheEntry)
- return d2
+ self.log_debug("Adding to cache: %r = %r" % (key, cacheEntry))
+ return self.getCachePool().set(key, cacheEntry).addCallback(
+ lambda _: response)
def _cacheResponse((key, responseBody)):
principalURI = self._principalURI(request.authnUser)
Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/config.py 2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/config.py 2008-06-03 23:30:08 UTC (rev 2530)
@@ -206,6 +206,7 @@
"ListenBacklog": 50,
"Memcached": {
+ "MaxClients": 5,
"ClientEnabled": False,
"ServerEnabled": False,
"BindAddress": "127.0.0.1",
Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py 2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py 2008-06-03 23:30:08 UTC (rev 2530)
@@ -18,6 +18,7 @@
from twisted.internet.defer import Deferred, fail
from twisted.internet.protocol import ReconnectingClientFactory
+from twistedcaldav.log import LoggingMixIn
from twistedcaldav.memcache import MemCacheProtocol, NoSuchCommand
@@ -42,7 +43,7 @@
-class MemCacheClientFactory(ReconnectingClientFactory):
+class MemCacheClientFactory(ReconnectingClientFactory, LoggingMixIn):
"""
A client factory for MemCache that reconnects and notifies a pool of it's
state.
@@ -65,6 +66,7 @@
"""
Notify the connectionPool that we've lost our connection.
"""
+ self.log_error("MemCache connection lost: %s" % (reason,))
if self._protocolInstance is not None:
self.connectionPool.clientBusy(self._protocolInstance)
@@ -78,6 +80,7 @@
"""
Notify the connectionPool that we're unable to connect
"""
+ self.log_error("MemCache connection failed: %s" % (reason,))
if self._protocolInstance is not None:
self.connectionPool.clientBusy(self._protocolInstance)
@@ -101,7 +104,7 @@
-class MemCachePool(object):
+class MemCachePool(LoggingMixIn):
"""
A connection pool for MemCacheProtocol instances.
@@ -146,6 +149,10 @@
@return: A L{Deferred} that fires with the L{IProtocol} instance.
"""
+ self.log_debug("Initating new client connection to: %r" % (
+ self._serverAddress,))
+ self._logClientStats()
+
factory = self.clientFactory()
factory.connectionPool = self
@@ -220,6 +227,12 @@
return d
+ def _logClientStats(self):
+ self.log_debug("Clients #free: %d, #busy: %d" % (
+ len(self._freeClients),
+ len(self._busyClients)))
+
+
def clientGone(self, client):
"""
Notify that the given client is to be removed from the pool completely.
@@ -232,7 +245,10 @@
elif client in self._freeClients:
self._freeClients.remove(client)
+ self.log_debug("Removed client: %r" % (client,))
+ self._logClientStats()
+
def clientBusy(self, client):
"""
Notify that the given client is being used to complete a request.
@@ -244,7 +260,10 @@
self._busyClients.add(client)
+ self.log_debug("Busied client: %r" % (client,))
+ self._logClientStats()
+
def clientFree(self, client):
"""
Notify that the given client is free to handle more requests.
@@ -263,7 +282,10 @@
_ign_d.addCallback(d.callback)
+ self.log_debug("Freed client: %r" % (client,))
+ self._logClientStats()
+
def suggestMaxClients(self, maxClients):
"""
Suggest the maximum number of concurrently connected clients.
@@ -272,3 +294,49 @@
should keep open.
"""
self._maxClients = maxClients
+
+
+ def get(self, *args, **kwargs):
+ return self.performRequest('get', *args, **kwargs)
+
+
+ def set(self, *args, **kwargs):
+ return self.performRequest('set', *args, **kwargs)
+
+
+ def delete(self, *args, **kwargs):
+ return self.performRequest('delete', *args, **kwargs)
+
+
+ def add(self, *args, **kwargs):
+ return self.performRequest('add', *args, **kwargs)
+
+
+
+class CachePoolUserMixIn(object):
+ """
+ A mixin that returns a saved cache pool or fetches the default cache pool.
+
+ @ivar _cachePool: A saved cachePool.
+ """
+ _cachePool = None
+
+ def getCachePool(self):
+ if self._cachePool is None:
+ return defaultCachePool()
+
+ return self._cachePool
+
+
+
+_memCachePool = None
+
+def installPool(serverAddress, maxClients=5, reactor=None):
+ global _memCachePool
+ _memCachePool = MemCachePool(serverAddress,
+ maxClients=5,
+ reactor=None)
+
+
+def defaultCachePool():
+ return _memCachePool
Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/root.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/root.py 2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/root.py 2008-06-03 23:30:08 UTC (rev 2530)
@@ -57,10 +57,7 @@
self.contentFilters = []
if config.Memcached['ClientEnabled']:
- self.responseCache = MemcacheResponseCache(
- self.fp,
- config.Memcached['BindAddress'],
- config.Memcached['Port'])
+ self.responseCache = MemcacheResponseCache(self.fp)
CalendarHomeFile.cacheNotifierFactory = MemcacheChangeNotifier
DirectoryPrincipalResource.cacheNotifierFactory = MemcacheChangeNotifier
Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/tap.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/tap.py 2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/tap.py 2008-06-03 23:30:08 UTC (rev 2530)
@@ -20,6 +20,7 @@
from zope.interface import implements
from twisted.internet import reactor
+from twisted.internet.address import IPv4Address
from twisted.python.log import FileLogObserver
from twisted.python.usage import Options, UsageError
@@ -56,6 +57,7 @@
from twistedcaldav.static import TimezoneServiceFile
from twistedcaldav.timezones import TimezoneCache
from twistedcaldav import pdmonster
+from twistedcaldav import memcachepool
log = Logger()
@@ -500,6 +502,17 @@
SudoDirectoryService.recordType_sudoers)
#
+ # Configure Memcached Client Pool
+ #
+ if config.Memcached["ClientEnabled"]:
+ memcachepool.installPool(
+ IPv4Address(
+ 'TCP',
+ config.Memcached["BindAddress"],
+ config.Memcached["Port"]),
+ config.Memcached["MaxClients"])
+
+ #
# Setup Resource hierarchy
#
@@ -781,6 +794,10 @@
config.ThreadPoolSize))
reactor.suggestThreadPoolSize(config.ThreadPoolSize)
+ log.info("Suggesting new max clients for memcache.")
+ memcachepool.getCachePool().suggestMaxClients(
+ config.Memcached["MaxClients"])
+
signal.signal(signal.SIGHUP, sighup_handler)
#def sigusr1_handler(num, frame):
Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py 2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py 2008-06-03 23:30:08 UTC (rev 2530)
@@ -88,8 +88,10 @@
class MemCacheChangeNotifierTests(TestCase):
def setUp(self):
self.memcache = InMemoryMemcacheProtocol()
- self.ccn = MemcacheChangeNotifier(StubURLResource(':memory:'))
- MemcacheChangeNotifier._memcacheProtocol = self.memcache
+ self.ccn = MemcacheChangeNotifier(
+ StubURLResource(':memory:'),
+ cachePool=self.memcache)
+
self.ccn._newCacheToken = instancemethod(_newCacheToken,
self.ccn,
MemcacheChangeNotifier)
@@ -297,7 +299,7 @@
super(MemcacheResponseCacheTests, self).setUp()
memcacheStub = InMemoryMemcacheProtocol()
- self.rc = MemcacheResponseCache(None, None, None, None)
+ self.rc = MemcacheResponseCache(None, cachePool=memcacheStub)
self.rc.logger.setLevel('debug')
self.tokens = {}
@@ -328,7 +330,3 @@
(self.expected_response[0],
dict(list(self.expected_response[1].getAllRawHeaders())),
self.expected_response[2]))))
-
- self.rc._memcacheProtocol = memcacheStub
-
-
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080603/5ed7776e/attachment-0001.htm
More information about the calendarserver-changes
mailing list