[CalendarServer-changes] [2442] CalendarServer/branches/unified-cache/twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Thu May 22 16:13:52 PDT 2008


Revision: 2442
          http://trac.macosforge.org/projects/calendarserver/changeset/2442
Author:   dreid at apple.com
Date:     2008-05-22 16:13:51 -0700 (Thu, 22 May 2008)

Log Message:
-----------
Use Memcached, note this doesn't actually set up a protocol instance to talk to memcached yet.

Modified Paths:
--------------
    CalendarServer/branches/unified-cache/twistedcaldav/cache.py
    CalendarServer/branches/unified-cache/twistedcaldav/cluster.py
    CalendarServer/branches/unified-cache/twistedcaldav/root.py
    CalendarServer/branches/unified-cache/twistedcaldav/test/test_cache.py

Modified: CalendarServer/branches/unified-cache/twistedcaldav/cache.py
===================================================================
--- CalendarServer/branches/unified-cache/twistedcaldav/cache.py	2008-05-22 14:10:05 UTC (rev 2441)
+++ CalendarServer/branches/unified-cache/twistedcaldav/cache.py	2008-05-22 23:13:51 UTC (rev 2442)
@@ -17,12 +17,15 @@
 import uuid
 import time
 import os
+import hashlib
+import cPickle
 
 from zope.interface import implements
 
 from twisted.python.filepath import FilePath
-from twisted.python import log
 
+from twisted.internet.defer import succeed, fail
+
 from twisted.web2.iweb import IResource
 from twisted.web2.dav import davxml
 from twisted.web2.dav.util import allDataFromStream
@@ -59,32 +62,46 @@
         self._propertyStore.set(property)
 
 
+class BaseResponseCache(LoggingMixIn):
+    """
+    A base class which provides some common operations
+    """
+    def _principalURI(self, principal):
+        return str(principal.children[0])
 
 
-class ResponseCache(LoggingMixIn):
-    """
-    An object that caches responses to given requests.
+    def _requestKey(self, request):
+        if hasattr(request, 'cacheKey'):
+            return succeed(request.cacheKey)
 
-    @ivar CACHE_TIMEOUT: The number of seconds that a cache entry is valid,
-        (default 3600 seconds or 1 hour).
+        def _getKey(requestBody):
+            if requestBody is not None:
+                request.stream = MemoryStream(requestBody)
+                request.stream.doStartReading = None
 
-    @ivar _docroot: An L{FilePath} that points to the document root.
-    @ivar _responses: A C{dict} with (request-method, request-uri,
-         principal-uri) keys and (principal-token, uri-token, cache-time,
-         response) values.
-    """
+            request.cacheKey = (request.method,
+                                self._principalURI(request.authnUser),
+                                request.uri,
+                                request.headers.getHeader('depth'),
+                                hash(requestBody))
 
-    CACHE_SIZE = 1000
-    propertyStoreFactory = xattrPropertyStore
+            return request.cacheKey
 
-    def __init__(self, docroot, cacheSize=None):
-        self._docroot = docroot
-        self._responses = {}
+        d = allDataFromStream(request.stream)
+        d.addCallback(_getKey)
+        return d
 
-        if cacheSize is not None:
-            self.CACHE_SIZE = cacheSize
 
+    def _getTokensInThread(self, principalURI, requestURI):
+        def _getTokens():
+            pToken = self._tokenForURI(principalURI)
+            uToken = self._tokenForURI(requestURI)
 
+            return (pToken, uToken)
+
+        return deferToThread(_getTokens)
+
+
     def _tokenForURI(self, uri):
         """
         Get a property store for the given C{uri}.
@@ -112,10 +129,37 @@
             pass
 
 
-    def _principalURI(self, principal):
-        return str(principal.children[0])
+    def _getResponseBody(self, key, response):
+        d1 = allDataFromStream(response.stream)
+        d1.addCallback(lambda responseBody: (key, responseBody))
+        return d1
 
 
+
+class ResponseCache(BaseResponseCache):
+    """
+    An object that caches responses to given requests.
+
+    @ivar CACHE_TIMEOUT: The number of seconds that a cache entry is valid,
+        (default 3600 seconds or 1 hour).
+
+    @ivar _docroot: An L{FilePath} that points to the document root.
+    @ivar _responses: A C{dict} with (request-method, request-uri,
+         principal-uri) keys and (principal-token, uri-token, cache-time,
+         response) values.
+    """
+
+    CACHE_SIZE = 1000
+    propertyStoreFactory = xattrPropertyStore
+
+    def __init__(self, docroot, cacheSize=None):
+        self._docroot = docroot
+        self._responses = {}
+
+        if cacheSize is not None:
+            self.CACHE_SIZE = cacheSize
+
+
     def _time(self):
         """
         Return the current time in seconds since the epoch
@@ -133,13 +177,8 @@
 
         @return: An L{IResponse} or C{None} if the response has not been cached.
         """
-        def _getTokens(pURI, rURI):
-            pToken = self._tokenForURI(pURI)
-            uToken = self._tokenForURI(rURI)
+        principalURI = self._principalURI(request.authnUser)
 
-            return (pToken, uToken)
-
-
         def _checkTokens((newPrincipalToken, newURIToken), key):
             (principalToken,
              uriToken,
@@ -179,37 +218,20 @@
             return responseObj
 
 
-        def _returnRequest(requestBody):
-
-            if requestBody is not None:
-                request.stream = MemoryStream(requestBody)
-                request.stream.doStartReading = None
-
-            principalURI = self._principalURI(request.authnUser)
-
-            key = (request.method,
-                   request.uri,
-                   principalURI,
-                   request.headers.getHeader('depth'),
-                   hash(requestBody))
-
+        def _checkKeyInCache(key):
             self.log_debug("Checking cache for: %r" % (key,))
 
-            request.cacheKey = key
-
             if key not in self._responses:
                 self.log_debug("Not in cache: %r" % (key,))
                 return None
 
-            d1 = deferToThread(_getTokens,
-                               principalURI,
-                               request.uri)
+            d1 = self._getTokensInThread(principalURI, request.uri)
             d1.addCallback(_checkTokens, key)
 
             return d1
 
-        d = allDataFromStream(request.stream)
-        d.addCallback(_returnRequest)
+        d = self._requestKey(request)
+        d.addCallback(_checkKeyInCache)
         return d
 
 
@@ -226,27 +248,9 @@
         @return: A deferred that fires when the response has been added
             to the cache.
         """
-        def _getRequestBody(responseBody):
-            d1 = allDataFromStream(request.stream)
-            d1.addCallback(lambda requestBody: (requestBody, responseBody))
-            return d1
-
-        def _cacheResponse((requestBody, responseBody)):
-            if requestBody is not None:
-                request.stream = MemoryStream(requestBody)
-                request.stream.doStartReading = None
-
+        def _cacheResponse((key, responseBody)):
             principalURI = self._principalURI(request.authnUser)
 
-            if hasattr(request, 'cacheKey'):
-                key = request.cacheKey
-            else:
-                key = (request.method,
-                       request.uri,
-                       principalURI,
-                       request.headers.getHeader('depth'),
-                       hash(requestBody))
-
             self.log_debug("Adding to cache: %r = %r" % (key,
                                                          response))
 
@@ -282,9 +286,100 @@
             response.stream = MemoryStream(responseBody)
             return response
 
+        d = self._requestKey(request)
+        d.addCallback(self._getResponseBody, response)
+        d.addCallback(_cacheResponse)
+        return d
 
-        d = allDataFromStream(response.stream)
-        d.addCallback(_getRequestBody)
+
+
+class MemcacheResponseCache(BaseResponseCache):
+    def __init__(self, docroot, host, port, reactor):
+        self._docroot = docroot
+        self._host = host
+        self._port = port
+        if reactor is None:
+            from twisted.internet import reactor
+
+        self._reactor = reactor
+
+        self._memcacheProtocol = None
+
+
+    def _getMemcacheProtocol(self):
+        if self._memcacheProtocol is not None:
+            return succeed(self._memcacheProtocol)
+
+    def getResponseForRequest(self, request):
+        def _checkTokens(curTokens, expectedTokens, (code, headers, body)):
+            if curTokens != expectedTokens:
+                return None
+
+            return Response(code, headers=headers,
+                            stream=MemoryStream(body))
+
+        def _unpickleResponse((flags, value), key):
+            if value is None:
+                self.log_debug("Not in cache: %r" % (key,))
+                return None
+
+            self.log_debug("Found in cache: %r = %r" % (key, value))
+
+            (principalToken, uriToken,
+             resp) = cPickle.loads(value)
+
+            d2 = self._getTokensInThread(self._principalURI(request.authnUser),
+                                         request.uri)
+
+            d2.addCallback(_checkTokens, (principalToken, uriToken), resp)
+
+            return d2
+
+        def _getCache(proto, key):
+            self.log_debug("Checking cache for: %r" % (key,))
+            d1 = proto.get(key)
+            return d1.addCallback(_unpickleResponse, key)
+
+        def _getProtocol(key):
+            request.cacheKey = key = hashlib.md5(':'.join(
+                    [str(t) for t in key])).hexdigest()
+
+            return self._getMemcacheProtocol().addCallback(_getCache, key)
+
+        d = self._requestKey(request)
+        d.addCallback(_getProtocol)
+        return d
+
+
+    def cacheResponseForRequest(self, request, response):
+        def _setCacheEntry(proto, key, cacheEntry):
+            self.log_debug("Adding to cache: %r = %r" % (key, cacheEntry))
+            return proto.set(key, cacheEntry).addCallback(
+                lambda _: response)
+
+        def _cacheResponse((key, responseBody)):
+            key = hashlib.md5(':'.join([str(t) for t in key])).hexdigest()
+
+            principalURI = self._principalURI(request.authnUser)
+
+            response.headers.removeHeader('date')
+            response.stream = MemoryStream(responseBody)
+
+            cacheEntry = cPickle.dumps(
+                (self._tokenForURI(principalURI),
+                 self._tokenForURI(request.uri),
+                 (response.code,
+                  response.headers,
+                  responseBody)))
+
+            d1 = self._getMemcacheProtocol()
+            d1.addCallback(_setCacheEntry, key, cacheEntry)
+
+            return d1
+
+
+        d = self._requestKey(request)
+        d.addCallback(self._getResponseBody, response)
         d.addCallback(_cacheResponse)
         return d
 
@@ -303,6 +398,7 @@
         return self, []
 
 
+
 class PropfindCacheMixin(object):
     def http_PROPFIND(self, request):
         def _cacheResponse(responseCache, response):

Modified: CalendarServer/branches/unified-cache/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/branches/unified-cache/twistedcaldav/cluster.py	2008-05-22 14:10:05 UTC (rev 2441)
+++ CalendarServer/branches/unified-cache/twistedcaldav/cluster.py	2008-05-22 23:13:51 UTC (rev 2442)
@@ -293,6 +293,8 @@
 
 
     if config.Memcached["ServerEnabled"]:
+        log.msg("Adding memcached service")
+
         memcachedArgv = [
                 config.Memcached["memcached"],
                 '-p', str(config.Memcached["Port"]),

Modified: CalendarServer/branches/unified-cache/twistedcaldav/root.py
===================================================================
--- CalendarServer/branches/unified-cache/twistedcaldav/root.py	2008-05-22 14:10:05 UTC (rev 2441)
+++ CalendarServer/branches/unified-cache/twistedcaldav/root.py	2008-05-22 23:13:51 UTC (rev 2442)
@@ -52,8 +52,16 @@
 
         self.contentFilters = []
 
-        self.responseCache = ResponseCache(self.fp, config.ResponseCacheSize)
+        if config.Memcached['ClientEnabled']:
+            self.responseCache = MemcacheResponseCache(
+                self.fp,
+                config.Memcached['BindAddresses'],
+                config.Memcached['Port'])
 
+        else:
+            self.responseCache = ResponseCache(self.fp,
+                                               config.ResponseCacheSize)
+
         if config.ResponseCompression:
             from twisted.web2.filter import gzip
             self.contentFilters.append((gzip.gzipfilter, True))

Modified: CalendarServer/branches/unified-cache/twistedcaldav/test/test_cache.py
===================================================================
--- CalendarServer/branches/unified-cache/twistedcaldav/test/test_cache.py	2008-05-22 14:10:05 UTC (rev 2441)
+++ CalendarServer/branches/unified-cache/twistedcaldav/test/test_cache.py	2008-05-22 23:13:51 UTC (rev 2442)
@@ -15,8 +15,12 @@
 ##
 
 from new import instancemethod
+import hashlib
+import cPickle
 
 from twisted.trial.unittest import TestCase
+from twisted.internet.defer import succeed, fail
+from twisted.python.failure import Failure
 
 from twisted.python.filepath import FilePath
 
@@ -28,6 +32,7 @@
 from twistedcaldav.cache import CacheChangeNotifier
 from twistedcaldav.cache import CacheTokensProperty
 from twistedcaldav.cache import ResponseCache
+from twistedcaldav.cache import MemcacheResponseCache
 
 from twistedcaldav.test.util import InMemoryPropertyStore
 
@@ -42,7 +47,7 @@
 
 
 class StubRequest(object):
-    def __init__(self, method, uri, authnUser, depth=1, body=None):
+    def __init__(self, method, uri, authnUser, depth='1', body=None):
         self.method = method
         self.uri = uri
         self.authnUser = davxml.Principal(davxml.HRef.fromString(authnUser))
@@ -65,6 +70,28 @@
 
 
 
+class InMemoryMemcacheProtocol(object):
+    def __init__(self):
+        self._cache = {}
+
+
+    def get(self, key):
+        if key not in self._cache:
+            return succeed((0, None))
+
+        return succeed(self._cache[key])
+
+
+    def set(self, key, value, flags=0, expireTime=0):
+        try:
+            self._cache[key] = (flags, value)
+            return succeed(True)
+
+        except Exception, err:
+            return fail(Failure())
+
+
+
 class CacheChangeNotifierTests(TestCase):
     def setUp(self):
         self.props = InMemoryPropertyStore()
@@ -90,37 +117,11 @@
 
 
 
-class ResponseCacheTests(TestCase):
-    def setUp(self):
-        self.tokens = {
-                '/calendars/users/cdaboo/': 'uriToken0',
-                '/principals/users/cdaboo/': 'principalToken0',
-                '/principals/users/dreid/': 'principalTokenX'}
-
-        self.rc = ResponseCache(None)
-        self.rc._tokenForURI = self.tokens.get
-
-        self.rc._time = (lambda:0)
-
-        self.expected_response = (200, Headers({}), "Foo")
-
-        expected_key = (
-                'PROPFIND',
-                '/calendars/users/cdaboo/',
-                '/principals/users/cdaboo/',
-                1,
-                hash('foobar'),
-                )
-
-        self.rc._responses[expected_key] = (
-            'principalToken0', 'uriToken0', 0, self.expected_response)
-
-        self.rc._accessList = [expected_key]
-
-
+class BaseCacheTestMixin(object):
     def assertResponse(self, response, expected):
         self.assertEquals(response.code, expected[0])
-        self.assertEquals(response.headers, expected[1])
+        self.assertEquals(set(response.headers.getAllRawHeaders()),
+                          set(expected[1].getAllRawHeaders()))
 
         d = allDataFromStream(response.stream)
         d.addCallback(self.assertEquals, expected[2])
@@ -176,7 +177,7 @@
                 'PROPFIND',
                 '/calendars/users/cdaboo/',
                 '/principals/users/cdaboo/',
-                depth=0))
+                depth='0'))
 
         d.addCallback(self.assertEquals, None)
         return d
@@ -219,6 +220,37 @@
         return d
 
 
+
+class ResponseCacheTests(BaseCacheTestMixin, TestCase):
+    def setUp(self):
+        self.tokens = {
+                '/calendars/users/cdaboo/': 'uriToken0',
+                '/principals/users/cdaboo/': 'principalToken0',
+                '/principals/users/dreid/': 'principalTokenX'}
+
+        self.rc = ResponseCache(None)
+        self.rc._tokenForURI = self.tokens.get
+
+        self.rc._time = (lambda:0)
+
+        self.expected_response = (200, Headers({}), "Foo")
+
+        expected_key = (
+                'PROPFIND',
+                '/principals/users/cdaboo/',
+                '/calendars/users/cdaboo/',
+                '1',
+                hash('foobar'),
+                )
+
+        self.rc._responses[expected_key] = (
+            'principalToken0', 'uriToken0', '0', self.expected_response)
+
+        self.rc._accessList = [expected_key]
+
+
+
+
     def test__tokenForURI(self):
         docroot = FilePath(self.mktemp())
         principal = docroot.child('principals').child('users').child('wsanchez')
@@ -260,37 +292,65 @@
         return d
 
 
-    def test_cacheExpirationBenchmark(self):
-        self.rc.CACHE_SIZE = 70000
-        import time
+#     def test_cacheExpirationBenchmark(self):
+#         self.rc.CACHE_SIZE = 70000
+#         import time
 
-        self.rc._responses = {}
-        self.rc._accessList = []
+#         self.rc._responses = {}
+#         self.rc._accessList = []
 
-        for x in xrange(0, self.rc.CACHE_SIZE):
-            req = StubRequest('PROPFIND',
-                              '/principals/users/user%d' % (x,),
-                              '/principals/users/user%d' % (x,))
-            self.rc._responses[req] = (
-                'pTokenUser%d' % (x,), 'rTokenUser%d' % (x,), 0,
-                (200, {}, 'foobar'))
+#         for x in xrange(0, self.rc.CACHE_SIZE):
+#             req = StubRequest('PROPFIND',
+#                               '/principals/users/user%d' % (x,),
+#                               '/principals/users/user%d' % (x,))
+#             self.rc._responses[req] = (
+#                 'pTokenUser%d' % (x,), 'rTokenUser%d' % (x,), 0,
+#                 (200, {}, 'foobar'))
 
-            self.rc._accessList.append(req)
+#             self.rc._accessList.append(req)
 
-        def assertTime(result, startTime):
-            duration = time.time() - startTime
+#         def assertTime(result, startTime):
+#             duration = time.time() - startTime
 
-            self.failUnless(
-                duration < 0.01,
-                "Took to long to add to the cache: %r" % (duration,))
+#             self.failUnless(
+#                 duration < 0.01,
+#                 "Took to long to add to the cache: %r" % (duration,))
 
-        startTime = time.time()
+#         startTime = time.time()
 
-        d = self.rc.cacheResponseForRequest(
-            StubRequest('PROPFIND',
-                        '/principals/users/dreid/',
-                        '/principals/users/dreid/'),
-            StubResponse(200, {}, 'Foobar'))
+#         d = self.rc.cacheResponseForRequest(
+#             StubRequest('PROPFIND',
+#                         '/principals/users/dreid/',
+#                         '/principals/users/dreid/'),
+#             StubResponse(200, {}, 'Foobar'))
 
-        d.addCallback(assertTime, startTime)
-        return d
+#         d.addCallback(assertTime, startTime)
+#         return d
+
+
+class MemcacheResponseCacheTests(BaseCacheTestMixin, TestCase):
+    def setUp(self):
+        memcacheStub = InMemoryMemcacheProtocol()
+        self.rc = MemcacheResponseCache(None, None, None, None)
+        self.tokens = {}
+
+        self.tokens['/calendars/users/cdaboo/'] = 'uriToken0'
+        self.tokens['/principals/users/cdaboo/'] = 'principalToken0'
+        self.tokens['/principals/users/dreid/'] = 'principalTokenX'
+
+        self.rc._tokenForURI = self.tokens.get
+
+        self.expected_response = (200, Headers({}), "Foo")
+
+        expected_key = hashlib.md5(':'.join([str(t) for t in (
+                'PROPFIND',
+                '/principals/users/cdaboo/',
+                '/calendars/users/cdaboo/',
+                '1',
+                hash('foobar'),
+                )])).hexdigest()
+
+        memcacheStub._cache[expected_key] = (0, cPickle.dumps((
+            'principalToken0', 'uriToken0', self.expected_response)))
+
+        self.rc._memcacheProtocol = memcacheStub

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080522/32e65d2b/attachment-0001.htm 


More information about the calendarserver-changes mailing list