[CalendarServer-changes] [2442] CalendarServer/branches/unified-cache/twistedcaldav
source_changes at macosforge.org
source_changes at macosforge.org
Thu May 22 16:13:52 PDT 2008
Revision: 2442
http://trac.macosforge.org/projects/calendarserver/changeset/2442
Author: dreid at apple.com
Date: 2008-05-22 16:13:51 -0700 (Thu, 22 May 2008)
Log Message:
-----------
Use Memcached; note that this doesn't actually set up a protocol instance to talk to memcached yet.
Modified Paths:
--------------
CalendarServer/branches/unified-cache/twistedcaldav/cache.py
CalendarServer/branches/unified-cache/twistedcaldav/cluster.py
CalendarServer/branches/unified-cache/twistedcaldav/root.py
CalendarServer/branches/unified-cache/twistedcaldav/test/test_cache.py
Modified: CalendarServer/branches/unified-cache/twistedcaldav/cache.py
===================================================================
--- CalendarServer/branches/unified-cache/twistedcaldav/cache.py 2008-05-22 14:10:05 UTC (rev 2441)
+++ CalendarServer/branches/unified-cache/twistedcaldav/cache.py 2008-05-22 23:13:51 UTC (rev 2442)
@@ -17,12 +17,15 @@
import uuid
import time
import os
+import hashlib
+import cPickle
from zope.interface import implements
from twisted.python.filepath import FilePath
-from twisted.python import log
+from twisted.internet.defer import succeed, fail
+
from twisted.web2.iweb import IResource
from twisted.web2.dav import davxml
from twisted.web2.dav.util import allDataFromStream
@@ -59,32 +62,46 @@
self._propertyStore.set(property)
+class BaseResponseCache(LoggingMixIn):
+ """
+ A base class which provides some common operations
+ """
+ def _principalURI(self, principal):
+ return str(principal.children[0])
-class ResponseCache(LoggingMixIn):
- """
- An object that caches responses to given requests.
+ def _requestKey(self, request):
+ if hasattr(request, 'cacheKey'):
+ return succeed(request.cacheKey)
- @ivar CACHE_TIMEOUT: The number of seconds that a cache entry is valid,
- (default 3600 seconds or 1 hour).
+ def _getKey(requestBody):
+ if requestBody is not None:
+ request.stream = MemoryStream(requestBody)
+ request.stream.doStartReading = None
- @ivar _docroot: An L{FilePath} that points to the document root.
- @ivar _responses: A C{dict} with (request-method, request-uri,
- principal-uri) keys and (principal-token, uri-token, cache-time,
- response) values.
- """
+ request.cacheKey = (request.method,
+ self._principalURI(request.authnUser),
+ request.uri,
+ request.headers.getHeader('depth'),
+ hash(requestBody))
- CACHE_SIZE = 1000
- propertyStoreFactory = xattrPropertyStore
+ return request.cacheKey
- def __init__(self, docroot, cacheSize=None):
- self._docroot = docroot
- self._responses = {}
+ d = allDataFromStream(request.stream)
+ d.addCallback(_getKey)
+ return d
- if cacheSize is not None:
- self.CACHE_SIZE = cacheSize
+ def _getTokensInThread(self, principalURI, requestURI):
+ def _getTokens():
+ pToken = self._tokenForURI(principalURI)
+ uToken = self._tokenForURI(requestURI)
+ return (pToken, uToken)
+
+ return deferToThread(_getTokens)
+
+
def _tokenForURI(self, uri):
"""
Get a property store for the given C{uri}.
@@ -112,10 +129,37 @@
pass
- def _principalURI(self, principal):
- return str(principal.children[0])
+ def _getResponseBody(self, key, response):
+ d1 = allDataFromStream(response.stream)
+ d1.addCallback(lambda responseBody: (key, responseBody))
+ return d1
+
+class ResponseCache(BaseResponseCache):
+ """
+ An object that caches responses to given requests.
+
+ @ivar CACHE_TIMEOUT: The number of seconds that a cache entry is valid,
+ (default 3600 seconds or 1 hour).
+
+ @ivar _docroot: An L{FilePath} that points to the document root.
+ @ivar _responses: A C{dict} with (request-method, request-uri,
+ principal-uri) keys and (principal-token, uri-token, cache-time,
+ response) values.
+ """
+
+ CACHE_SIZE = 1000
+ propertyStoreFactory = xattrPropertyStore
+
+ def __init__(self, docroot, cacheSize=None):
+ self._docroot = docroot
+ self._responses = {}
+
+ if cacheSize is not None:
+ self.CACHE_SIZE = cacheSize
+
+
def _time(self):
"""
Return the current time in seconds since the epoch
@@ -133,13 +177,8 @@
@return: An L{IResponse} or C{None} if the response has not been cached.
"""
- def _getTokens(pURI, rURI):
- pToken = self._tokenForURI(pURI)
- uToken = self._tokenForURI(rURI)
+ principalURI = self._principalURI(request.authnUser)
- return (pToken, uToken)
-
-
def _checkTokens((newPrincipalToken, newURIToken), key):
(principalToken,
uriToken,
@@ -179,37 +218,20 @@
return responseObj
- def _returnRequest(requestBody):
-
- if requestBody is not None:
- request.stream = MemoryStream(requestBody)
- request.stream.doStartReading = None
-
- principalURI = self._principalURI(request.authnUser)
-
- key = (request.method,
- request.uri,
- principalURI,
- request.headers.getHeader('depth'),
- hash(requestBody))
-
+ def _checkKeyInCache(key):
self.log_debug("Checking cache for: %r" % (key,))
- request.cacheKey = key
-
if key not in self._responses:
self.log_debug("Not in cache: %r" % (key,))
return None
- d1 = deferToThread(_getTokens,
- principalURI,
- request.uri)
+ d1 = self._getTokensInThread(principalURI, request.uri)
d1.addCallback(_checkTokens, key)
return d1
- d = allDataFromStream(request.stream)
- d.addCallback(_returnRequest)
+ d = self._requestKey(request)
+ d.addCallback(_checkKeyInCache)
return d
@@ -226,27 +248,9 @@
@return: A deferred that fires when the response has been added
to the cache.
"""
- def _getRequestBody(responseBody):
- d1 = allDataFromStream(request.stream)
- d1.addCallback(lambda requestBody: (requestBody, responseBody))
- return d1
-
- def _cacheResponse((requestBody, responseBody)):
- if requestBody is not None:
- request.stream = MemoryStream(requestBody)
- request.stream.doStartReading = None
-
+ def _cacheResponse((key, responseBody)):
principalURI = self._principalURI(request.authnUser)
- if hasattr(request, 'cacheKey'):
- key = request.cacheKey
- else:
- key = (request.method,
- request.uri,
- principalURI,
- request.headers.getHeader('depth'),
- hash(requestBody))
-
self.log_debug("Adding to cache: %r = %r" % (key,
response))
@@ -282,9 +286,100 @@
response.stream = MemoryStream(responseBody)
return response
+ d = self._requestKey(request)
+ d.addCallback(self._getResponseBody, response)
+ d.addCallback(_cacheResponse)
+ return d
- d = allDataFromStream(response.stream)
- d.addCallback(_getRequestBody)
+
+
+class MemcacheResponseCache(BaseResponseCache):
+ def __init__(self, docroot, host, port, reactor):
+ self._docroot = docroot
+ self._host = host
+ self._port = port
+ if reactor is None:
+ from twisted.internet import reactor
+
+ self._reactor = reactor
+
+ self._memcacheProtocol = None
+
+
+ def _getMemcacheProtocol(self):
+ if self._memcacheProtocol is not None:
+ return succeed(self._memcacheProtocol)
+
+ def getResponseForRequest(self, request):
+ def _checkTokens(curTokens, expectedTokens, (code, headers, body)):
+ if curTokens != expectedTokens:
+ return None
+
+ return Response(code, headers=headers,
+ stream=MemoryStream(body))
+
+ def _unpickleResponse((flags, value), key):
+ if value is None:
+ self.log_debug("Not in cache: %r" % (key,))
+ return None
+
+ self.log_debug("Found in cache: %r = %r" % (key, value))
+
+ (principalToken, uriToken,
+ resp) = cPickle.loads(value)
+
+ d2 = self._getTokensInThread(self._principalURI(request.authnUser),
+ request.uri)
+
+ d2.addCallback(_checkTokens, (principalToken, uriToken), resp)
+
+ return d2
+
+ def _getCache(proto, key):
+ self.log_debug("Checking cache for: %r" % (key,))
+ d1 = proto.get(key)
+ return d1.addCallback(_unpickleResponse, key)
+
+ def _getProtocol(key):
+ request.cacheKey = key = hashlib.md5(':'.join(
+ [str(t) for t in key])).hexdigest()
+
+ return self._getMemcacheProtocol().addCallback(_getCache, key)
+
+ d = self._requestKey(request)
+ d.addCallback(_getProtocol)
+ return d
+
+
+ def cacheResponseForRequest(self, request, response):
+ def _setCacheEntry(proto, key, cacheEntry):
+ self.log_debug("Adding to cache: %r = %r" % (key, cacheEntry))
+ return proto.set(key, cacheEntry).addCallback(
+ lambda _: response)
+
+ def _cacheResponse((key, responseBody)):
+ key = hashlib.md5(':'.join([str(t) for t in key])).hexdigest()
+
+ principalURI = self._principalURI(request.authnUser)
+
+ response.headers.removeHeader('date')
+ response.stream = MemoryStream(responseBody)
+
+ cacheEntry = cPickle.dumps(
+ (self._tokenForURI(principalURI),
+ self._tokenForURI(request.uri),
+ (response.code,
+ response.headers,
+ responseBody)))
+
+ d1 = self._getMemcacheProtocol()
+ d1.addCallback(_setCacheEntry, key, cacheEntry)
+
+ return d1
+
+
+ d = self._requestKey(request)
+ d.addCallback(self._getResponseBody, response)
d.addCallback(_cacheResponse)
return d
@@ -303,6 +398,7 @@
return self, []
+
class PropfindCacheMixin(object):
def http_PROPFIND(self, request):
def _cacheResponse(responseCache, response):
Modified: CalendarServer/branches/unified-cache/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/branches/unified-cache/twistedcaldav/cluster.py 2008-05-22 14:10:05 UTC (rev 2441)
+++ CalendarServer/branches/unified-cache/twistedcaldav/cluster.py 2008-05-22 23:13:51 UTC (rev 2442)
@@ -293,6 +293,8 @@
if config.Memcached["ServerEnabled"]:
+ log.msg("Adding memcached service")
+
memcachedArgv = [
config.Memcached["memcached"],
'-p', str(config.Memcached["Port"]),
Modified: CalendarServer/branches/unified-cache/twistedcaldav/root.py
===================================================================
--- CalendarServer/branches/unified-cache/twistedcaldav/root.py 2008-05-22 14:10:05 UTC (rev 2441)
+++ CalendarServer/branches/unified-cache/twistedcaldav/root.py 2008-05-22 23:13:51 UTC (rev 2442)
@@ -52,8 +52,16 @@
self.contentFilters = []
- self.responseCache = ResponseCache(self.fp, config.ResponseCacheSize)
+ if config.Memcached['ClientEnabled']:
+ self.responseCache = MemcacheResponseCache(
+ self.fp,
+ config.Memcached['BindAddresses'],
+ config.Memcached['Port'])
+ else:
+ self.responseCache = ResponseCache(self.fp,
+ config.ResponseCacheSize)
+
if config.ResponseCompression:
from twisted.web2.filter import gzip
self.contentFilters.append((gzip.gzipfilter, True))
Modified: CalendarServer/branches/unified-cache/twistedcaldav/test/test_cache.py
===================================================================
--- CalendarServer/branches/unified-cache/twistedcaldav/test/test_cache.py 2008-05-22 14:10:05 UTC (rev 2441)
+++ CalendarServer/branches/unified-cache/twistedcaldav/test/test_cache.py 2008-05-22 23:13:51 UTC (rev 2442)
@@ -15,8 +15,12 @@
##
from new import instancemethod
+import hashlib
+import cPickle
from twisted.trial.unittest import TestCase
+from twisted.internet.defer import succeed, fail
+from twisted.python.failure import Failure
from twisted.python.filepath import FilePath
@@ -28,6 +32,7 @@
from twistedcaldav.cache import CacheChangeNotifier
from twistedcaldav.cache import CacheTokensProperty
from twistedcaldav.cache import ResponseCache
+from twistedcaldav.cache import MemcacheResponseCache
from twistedcaldav.test.util import InMemoryPropertyStore
@@ -42,7 +47,7 @@
class StubRequest(object):
- def __init__(self, method, uri, authnUser, depth=1, body=None):
+ def __init__(self, method, uri, authnUser, depth='1', body=None):
self.method = method
self.uri = uri
self.authnUser = davxml.Principal(davxml.HRef.fromString(authnUser))
@@ -65,6 +70,28 @@
+class InMemoryMemcacheProtocol(object):
+ def __init__(self):
+ self._cache = {}
+
+
+ def get(self, key):
+ if key not in self._cache:
+ return succeed((0, None))
+
+ return succeed(self._cache[key])
+
+
+ def set(self, key, value, flags=0, expireTime=0):
+ try:
+ self._cache[key] = (flags, value)
+ return succeed(True)
+
+ except Exception, err:
+ return fail(Failure())
+
+
+
class CacheChangeNotifierTests(TestCase):
def setUp(self):
self.props = InMemoryPropertyStore()
@@ -90,37 +117,11 @@
-class ResponseCacheTests(TestCase):
- def setUp(self):
- self.tokens = {
- '/calendars/users/cdaboo/': 'uriToken0',
- '/principals/users/cdaboo/': 'principalToken0',
- '/principals/users/dreid/': 'principalTokenX'}
-
- self.rc = ResponseCache(None)
- self.rc._tokenForURI = self.tokens.get
-
- self.rc._time = (lambda:0)
-
- self.expected_response = (200, Headers({}), "Foo")
-
- expected_key = (
- 'PROPFIND',
- '/calendars/users/cdaboo/',
- '/principals/users/cdaboo/',
- 1,
- hash('foobar'),
- )
-
- self.rc._responses[expected_key] = (
- 'principalToken0', 'uriToken0', 0, self.expected_response)
-
- self.rc._accessList = [expected_key]
-
-
+class BaseCacheTestMixin(object):
def assertResponse(self, response, expected):
self.assertEquals(response.code, expected[0])
- self.assertEquals(response.headers, expected[1])
+ self.assertEquals(set(response.headers.getAllRawHeaders()),
+ set(expected[1].getAllRawHeaders()))
d = allDataFromStream(response.stream)
d.addCallback(self.assertEquals, expected[2])
@@ -176,7 +177,7 @@
'PROPFIND',
'/calendars/users/cdaboo/',
'/principals/users/cdaboo/',
- depth=0))
+ depth='0'))
d.addCallback(self.assertEquals, None)
return d
@@ -219,6 +220,37 @@
return d
+
+class ResponseCacheTests(BaseCacheTestMixin, TestCase):
+ def setUp(self):
+ self.tokens = {
+ '/calendars/users/cdaboo/': 'uriToken0',
+ '/principals/users/cdaboo/': 'principalToken0',
+ '/principals/users/dreid/': 'principalTokenX'}
+
+ self.rc = ResponseCache(None)
+ self.rc._tokenForURI = self.tokens.get
+
+ self.rc._time = (lambda:0)
+
+ self.expected_response = (200, Headers({}), "Foo")
+
+ expected_key = (
+ 'PROPFIND',
+ '/principals/users/cdaboo/',
+ '/calendars/users/cdaboo/',
+ '1',
+ hash('foobar'),
+ )
+
+ self.rc._responses[expected_key] = (
+ 'principalToken0', 'uriToken0', '0', self.expected_response)
+
+ self.rc._accessList = [expected_key]
+
+
+
+
def test__tokenForURI(self):
docroot = FilePath(self.mktemp())
principal = docroot.child('principals').child('users').child('wsanchez')
@@ -260,37 +292,65 @@
return d
- def test_cacheExpirationBenchmark(self):
- self.rc.CACHE_SIZE = 70000
- import time
+# def test_cacheExpirationBenchmark(self):
+# self.rc.CACHE_SIZE = 70000
+# import time
- self.rc._responses = {}
- self.rc._accessList = []
+# self.rc._responses = {}
+# self.rc._accessList = []
- for x in xrange(0, self.rc.CACHE_SIZE):
- req = StubRequest('PROPFIND',
- '/principals/users/user%d' % (x,),
- '/principals/users/user%d' % (x,))
- self.rc._responses[req] = (
- 'pTokenUser%d' % (x,), 'rTokenUser%d' % (x,), 0,
- (200, {}, 'foobar'))
+# for x in xrange(0, self.rc.CACHE_SIZE):
+# req = StubRequest('PROPFIND',
+# '/principals/users/user%d' % (x,),
+# '/principals/users/user%d' % (x,))
+# self.rc._responses[req] = (
+# 'pTokenUser%d' % (x,), 'rTokenUser%d' % (x,), 0,
+# (200, {}, 'foobar'))
- self.rc._accessList.append(req)
+# self.rc._accessList.append(req)
- def assertTime(result, startTime):
- duration = time.time() - startTime
+# def assertTime(result, startTime):
+# duration = time.time() - startTime
- self.failUnless(
- duration < 0.01,
- "Took to long to add to the cache: %r" % (duration,))
+# self.failUnless(
+# duration < 0.01,
+# "Took to long to add to the cache: %r" % (duration,))
- startTime = time.time()
+# startTime = time.time()
- d = self.rc.cacheResponseForRequest(
- StubRequest('PROPFIND',
- '/principals/users/dreid/',
- '/principals/users/dreid/'),
- StubResponse(200, {}, 'Foobar'))
+# d = self.rc.cacheResponseForRequest(
+# StubRequest('PROPFIND',
+# '/principals/users/dreid/',
+# '/principals/users/dreid/'),
+# StubResponse(200, {}, 'Foobar'))
- d.addCallback(assertTime, startTime)
- return d
+# d.addCallback(assertTime, startTime)
+# return d
+
+
+class MemcacheResponseCacheTests(BaseCacheTestMixin, TestCase):
+ def setUp(self):
+ memcacheStub = InMemoryMemcacheProtocol()
+ self.rc = MemcacheResponseCache(None, None, None, None)
+ self.tokens = {}
+
+ self.tokens['/calendars/users/cdaboo/'] = 'uriToken0'
+ self.tokens['/principals/users/cdaboo/'] = 'principalToken0'
+ self.tokens['/principals/users/dreid/'] = 'principalTokenX'
+
+ self.rc._tokenForURI = self.tokens.get
+
+ self.expected_response = (200, Headers({}), "Foo")
+
+ expected_key = hashlib.md5(':'.join([str(t) for t in (
+ 'PROPFIND',
+ '/principals/users/cdaboo/',
+ '/calendars/users/cdaboo/',
+ '1',
+ hash('foobar'),
+ )])).hexdigest()
+
+ memcacheStub._cache[expected_key] = (0, cPickle.dumps((
+ 'principalToken0', 'uriToken0', self.expected_response)))
+
+ self.rc._memcacheProtocol = memcacheStub
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080522/32e65d2b/attachment-0001.htm
More information about the calendarserver-changes
mailing list