[CalendarServer-changes] [2530] CalendarServer/branches/memcache-reconnect

source_changes at macosforge.org source_changes at macosforge.org
Tue Jun 3 16:30:09 PDT 2008


Revision: 2530
          http://trac.macosforge.org/projects/calendarserver/changeset/2530
Author:   dreid at apple.com
Date:     2008-06-03 16:30:08 -0700 (Tue, 03 Jun 2008)

Log Message:
-----------
Memcache connection pool integration.

Modified Paths:
--------------
    CalendarServer/branches/memcache-reconnect/conf/caldavd-test.plist
    CalendarServer/branches/memcache-reconnect/twistedcaldav/cache.py
    CalendarServer/branches/memcache-reconnect/twistedcaldav/config.py
    CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py
    CalendarServer/branches/memcache-reconnect/twistedcaldav/root.py
    CalendarServer/branches/memcache-reconnect/twistedcaldav/tap.py
    CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py

Modified: CalendarServer/branches/memcache-reconnect/conf/caldavd-test.plist
===================================================================
--- CalendarServer/branches/memcache-reconnect/conf/caldavd-test.plist	2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/conf/caldavd-test.plist	2008-06-03 23:30:08 UTC (rev 2530)
@@ -396,6 +396,8 @@
     <true/>
     <key>ClientEnabled</key>
     <true/>
+    <key>MaxClients</key>
+    <integer>5</integer>
     <key>memcached</key>
     <string>../memcached-1.2.5/_root/bin/memcached</string>
     <key>Options</key>

Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/cache.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/cache.py	2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/cache.py	2008-06-03 23:30:08 UTC (rev 2530)
@@ -36,7 +36,7 @@
 from twisted.internet.threads import deferToThread
 
 from twistedcaldav.log import LoggingMixIn
-from twistedcaldav.memcache import MemCacheProtocol
+from twistedcaldav.memcachepool import CachePoolUserMixIn
 from twistedcaldav.config import config
 
 
@@ -69,52 +69,26 @@
             self.uri)
 
 
-
-class MemcacheChangeNotifier(LoggingMixIn):
-    _memcacheProtocol = None
-
-    def __init__(self, resource):
+class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn):
+    def __init__(self, resource, cachePool=None):
         self._resource = resource
-        self._host = config.Memcached['BindAddress']
-        self._port = config.Memcached['Port']
+        self._cachePool = cachePool
 
-        from twisted.internet import reactor
-        self._reactor = reactor
 
-
     def _newCacheToken(self):
         return str(uuid.uuid4())
 
 
-    def _getMemcacheProtocol(self):
-        if MemcacheChangeNotifier._memcacheProtocol is not None:
-            return succeed(self._memcacheProtocol)
-
-        d = ClientCreator(self._reactor, MemCacheProtocol).connectTCP(
-            self._host,
-            self._port)
-
-        def _cacheProtocol(proto):
-            MemcacheChangeNotifier._memcacheProtocol = proto
-            return proto
-
-        return d.addCallback(_cacheProtocol)
-
-
     def changed(self):
         """
         Change the cache token for a resource
 
         return: A L{Deferred} that fires when the token has been changed.
         """
-        def _updateCacheToken(proto):
-            return proto.set('cacheToken:%s' % (self._resource.url(),),
-                             self._newCacheToken())
-
         self.log_debug("Changing Cache Token for %r" % (self._resource.url(),))
-        d = self._getMemcacheProtocol()
-        d.addCallback(_updateCacheToken)
-        return d
+        return self.getCachePool().set(
+            'cacheToken:%s' % (self._resource.url(),),
+            self._newCacheToken())
 
 
 
@@ -183,19 +157,12 @@
 
 
 
-class MemcacheResponseCache(BaseResponseCache):
-    def __init__(self, docroot, host, port, reactor=None):
+class MemcacheResponseCache(BaseResponseCache, CachePoolUserMixIn):
+    def __init__(self, docroot, cachePool=None):
         self._docroot = docroot
-        self._host = host
-        self._port = port
-        if reactor is None:
-            from twisted.internet import reactor
+        self._cachePool = cachePool
 
-        self._reactor = reactor
 
-        self._memcacheProtocol = None
-
-
     def _tokenForURI(self, uri):
         """
         Get a property store for the given C{uri}.
@@ -204,8 +171,7 @@
         @return: A C{str} representing the token for the URI.
         """
 
-        return self._getMemcacheProtocol().addCallback(
-            lambda p: p.get('cacheToken:%s' % (uri,)))
+        return self.getCachePool().get('cacheToken:%s' % (uri,))
 
 
     def _getTokens(self, request):
@@ -223,21 +189,6 @@
         return d
 
 
-    def _getMemcacheProtocol(self):
-        if self._memcacheProtocol is not None:
-            return succeed(self._memcacheProtocol)
-
-        d = ClientCreator(self._reactor, MemCacheProtocol).connectTCP(
-            self._host,
-            self._port)
-
-        def _cacheProtocol(proto):
-            self._memcacheProtocol = proto
-            return proto
-
-        return d.addCallback(_cacheProtocol)
-
-
     def _hashedRequestKey(self, request):
         def _hashKey(key):
             oldkey = key
@@ -292,31 +243,23 @@
 
             return d2
 
-        def _getCache(proto, key):
+        def _getCached(key):
             self.log_debug("Checking cache for: %r" % (key,))
-            d1 = proto.get(key)
+            d1 = self.getCachePool().get(key)
             return d1.addCallback(_unpickleResponse, key)
 
-        def _getProtocol(key):
-            return self._getMemcacheProtocol().addCallback(_getCache, key)
-
         def _handleExceptions(f):
             f.trap(URINotFoundException)
             self.log_warn("Could not locate URI: %r" % f.value)
             return None
 
         d = self._hashedRequestKey(request)
-        d.addCallback(_getProtocol)
+        d.addCallback(_getCached)
         d.addErrback(_handleExceptions)
         return d
 
 
     def cacheResponseForRequest(self, request, response):
-        def _setCacheEntry(proto, key, cacheEntry):
-            self.log_debug("Adding to cache: %r = %r" % (key, cacheEntry))
-            return proto.set(key, cacheEntry).addCallback(
-                lambda _: response)
-
         def _makeCacheEntry((pToken, uToken), (key, responseBody)):
             cacheEntry = cPickle.dumps(
                 (pToken,
@@ -325,9 +268,9 @@
                   dict(list(response.headers.getAllRawHeaders())),
                   responseBody)))
 
-            d2 = self._getMemcacheProtocol()
-            d2.addCallback(_setCacheEntry, key, cacheEntry)
-            return d2
+            self.log_debug("Adding to cache: %r = %r" % (key, cacheEntry))
+            return self.getCachePool().set(key, cacheEntry).addCallback(
+                lambda _: response)
 
         def _cacheResponse((key, responseBody)):
             principalURI = self._principalURI(request.authnUser)

Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/config.py	2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/config.py	2008-06-03 23:30:08 UTC (rev 2530)
@@ -206,6 +206,7 @@
     "ListenBacklog": 50,
 
     "Memcached": {
+        "MaxClients": 5,
         "ClientEnabled": False,
         "ServerEnabled": False,
         "BindAddress": "127.0.0.1",

Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py	2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/memcachepool.py	2008-06-03 23:30:08 UTC (rev 2530)
@@ -18,6 +18,7 @@
 from twisted.internet.defer import Deferred, fail
 from twisted.internet.protocol import ReconnectingClientFactory
 
+from twistedcaldav.log import LoggingMixIn
 from twistedcaldav.memcache import MemCacheProtocol, NoSuchCommand
 
 
@@ -42,7 +43,7 @@
 
 
 
-class MemCacheClientFactory(ReconnectingClientFactory):
+class MemCacheClientFactory(ReconnectingClientFactory, LoggingMixIn):
     """
     A client factory for MemCache that reconnects and notifies a pool of it's
     state.
@@ -65,6 +66,7 @@
         """
         Notify the connectionPool that we've lost our connection.
         """
+        self.log_error("MemCache connection lost: %s" % (reason,))
         if self._protocolInstance is not None:
             self.connectionPool.clientBusy(self._protocolInstance)
 
@@ -78,6 +80,7 @@
         """
         Notify the connectionPool that we're unable to connect
         """
+        self.log_error("MemCache connection failed: %s" % (reason,))
         if self._protocolInstance is not None:
             self.connectionPool.clientBusy(self._protocolInstance)
 
@@ -101,7 +104,7 @@
 
 
 
-class MemCachePool(object):
+class MemCachePool(LoggingMixIn):
     """
     A connection pool for MemCacheProtocol instances.
 
@@ -146,6 +149,10 @@
 
         @return: A L{Deferred} that fires with the L{IProtocol} instance.
         """
+        self.log_debug("Initating new client connection to: %r" % (
+                self._serverAddress,))
+        self._logClientStats()
+
         factory = self.clientFactory()
 
         factory.connectionPool = self
@@ -220,6 +227,12 @@
         return d
 
 
+    def _logClientStats(self):
+        self.log_debug("Clients #free: %d, #busy: %d" % (
+                len(self._freeClients),
+                len(self._busyClients)))
+
+
     def clientGone(self, client):
         """
         Notify that the given client is to be removed from the pool completely.
@@ -232,7 +245,10 @@
         elif client in self._freeClients:
             self._freeClients.remove(client)
 
+        self.log_debug("Removed client: %r" % (client,))
+        self._logClientStats()
 
+
     def clientBusy(self, client):
         """
         Notify that the given client is being used to complete a request.
@@ -244,7 +260,10 @@
 
         self._busyClients.add(client)
 
+        self.log_debug("Busied client: %r" % (client,))
+        self._logClientStats()
 
+
     def clientFree(self, client):
         """
         Notify that the given client is free to handle more requests.
@@ -263,7 +282,10 @@
 
             _ign_d.addCallback(d.callback)
 
+        self.log_debug("Freed client: %r" % (client,))
+        self._logClientStats()
 
+
     def suggestMaxClients(self, maxClients):
         """
         Suggest the maximum number of concurrently connected clients.
@@ -272,3 +294,49 @@
             should keep open.
         """
         self._maxClients = maxClients
+
+
+    def get(self, *args, **kwargs):
+        return self.performRequest('get', *args, **kwargs)
+
+
+    def set(self, *args, **kwargs):
+        return self.performRequest('set', *args, **kwargs)
+
+
+    def delete(self, *args, **kwargs):
+        return self.performRequest('delete', *args, **kwargs)
+
+
+    def add(self, *args, **kwargs):
+        return self.performRequest('add', *args, **kwargs)
+
+
+
+class CachePoolUserMixIn(object):
+    """
+    A mixin that returns a saved cache pool or fetches the default cache pool.
+
+    @ivar _cachePool: A saved cachePool.
+    """
+    _cachePool = None
+
+    def getCachePool(self):
+        if self._cachePool is None:
+            return defaultCachePool()
+
+        return self._cachePool
+
+
+
+_memCachePool = None
+
+def installPool(serverAddress, maxClients=5, reactor=None):
+    global _memCachePool  # module-wide pool returned by defaultCachePool()
+    _memCachePool = MemCachePool(serverAddress,
+                                   maxClients=maxClients,  # forward caller's limit (was literal 5)
+                                   reactor=reactor)  # forward caller's reactor (was literal None)
+
+
+def defaultCachePool():
+    return _memCachePool

Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/root.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/root.py	2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/root.py	2008-06-03 23:30:08 UTC (rev 2530)
@@ -57,10 +57,7 @@
         self.contentFilters = []
 
         if config.Memcached['ClientEnabled']:
-            self.responseCache = MemcacheResponseCache(
-                self.fp,
-                config.Memcached['BindAddress'],
-                config.Memcached['Port'])
+            self.responseCache = MemcacheResponseCache(self.fp)
 
             CalendarHomeFile.cacheNotifierFactory = MemcacheChangeNotifier
             DirectoryPrincipalResource.cacheNotifierFactory = MemcacheChangeNotifier

Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/tap.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/tap.py	2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/tap.py	2008-06-03 23:30:08 UTC (rev 2530)
@@ -20,6 +20,7 @@
 from zope.interface import implements
 
 from twisted.internet import reactor
+from twisted.internet.address import IPv4Address
 
 from twisted.python.log import FileLogObserver
 from twisted.python.usage import Options, UsageError
@@ -56,6 +57,7 @@
 from twistedcaldav.static import TimezoneServiceFile
 from twistedcaldav.timezones import TimezoneCache
 from twistedcaldav import pdmonster
+from twistedcaldav import memcachepool
 
 log = Logger()
 
@@ -500,6 +502,17 @@
                 SudoDirectoryService.recordType_sudoers)
 
         #
+        # Configure Memcached Client Pool
+        #
+        if config.Memcached["ClientEnabled"]:
+            memcachepool.installPool(
+                IPv4Address(
+                    'TCP',
+                    config.Memcached["BindAddress"],
+                    config.Memcached["Port"]),
+                config.Memcached["MaxClients"])
+
+        #
         # Setup Resource hierarchy
         #
 
@@ -781,6 +794,10 @@
                         config.ThreadPoolSize))
                 reactor.suggestThreadPoolSize(config.ThreadPoolSize)
 
+                log.info("Suggesting new max clients for memcache.")
+                memcachepool.getCachePool().suggestMaxClients(
+                        config.Memcached["MaxClients"])
+
             signal.signal(signal.SIGHUP, sighup_handler)
 
             #def sigusr1_handler(num, frame):

Modified: CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py
===================================================================
--- CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py	2008-06-03 22:25:25 UTC (rev 2529)
+++ CalendarServer/branches/memcache-reconnect/twistedcaldav/test/test_cache.py	2008-06-03 23:30:08 UTC (rev 2530)
@@ -88,8 +88,10 @@
 class MemCacheChangeNotifierTests(TestCase):
     def setUp(self):
         self.memcache = InMemoryMemcacheProtocol()
-        self.ccn = MemcacheChangeNotifier(StubURLResource(':memory:'))
-        MemcacheChangeNotifier._memcacheProtocol = self.memcache
+        self.ccn = MemcacheChangeNotifier(
+            StubURLResource(':memory:'),
+            cachePool=self.memcache)
+
         self.ccn._newCacheToken = instancemethod(_newCacheToken,
                                                  self.ccn,
                                                  MemcacheChangeNotifier)
@@ -297,7 +299,7 @@
         super(MemcacheResponseCacheTests, self).setUp()
 
         memcacheStub = InMemoryMemcacheProtocol()
-        self.rc = MemcacheResponseCache(None, None, None, None)
+        self.rc = MemcacheResponseCache(None, cachePool=memcacheStub)
         self.rc.logger.setLevel('debug')
         self.tokens = {}
 
@@ -328,7 +330,3 @@
             (self.expected_response[0],
              dict(list(self.expected_response[1].getAllRawHeaders())),
              self.expected_response[2]))))
-
-        self.rc._memcacheProtocol = memcacheStub
-
-

-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080603/5ed7776e/attachment-0001.htm 


More information about the calendarserver-changes mailing list