[CalendarServer-changes] [4014] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Mon Apr 13 18:49:36 PDT 2009


Revision: 4014
          http://trac.macosforge.org/projects/calendarserver/changeset/4014
Author:   sagen at apple.com
Date:     2009-04-13 18:49:36 -0700 (Mon, 13 Apr 2009)
Log Message:
-----------
Add gets_multi to memcache client

Modified Paths:
--------------
    CalendarServer/trunk/memcache.py
    CalendarServer/trunk/twistedcaldav/directory/cachingdirectory.py
    CalendarServer/trunk/twistedcaldav/test/util.py

Modified: CalendarServer/trunk/memcache.py
===================================================================
--- CalendarServer/trunk/memcache.py	2009-04-14 00:56:22 UTC (rev 4013)
+++ CalendarServer/trunk/memcache.py	2009-04-14 01:49:36 UTC (rev 4014)
@@ -49,6 +49,8 @@
 import os
 import re
 import types
+from twistedcaldav.config import config
+
 try:
     import cPickle as pickle
 except ImportError:
@@ -90,6 +92,11 @@
     Memcache connection error
     """
 
+class NotFoundError(MemcacheError):
+    """
+    NOT_FOUND error
+    """
+
 try:
     # Only exists in Python 2.4+
     from threading import local
@@ -98,7 +105,27 @@
     class local(object):
         pass
 
+class ClientFactory(object):
 
+    # unit tests should set this to True to enable the fake test cache
+    allowTestCache = False
+
+    @classmethod
+    def getClient(cls, servers, debug=0, pickleProtocol=0,
+                 pickler=pickle.Pickler, unpickler=pickle.Unpickler,
+                 pload=None, pid=None):
+
+        if config.Memcached.ClientEnabled:
+            return Client(servers, debug=debug, pickleProtocol=pickleProtocol,
+                pickler=pickler, unpickler=unpickler, pload=pload, pid=pid)
+        elif cls.allowTestCache:
+            return TestClient(servers, debug=debug,
+                pickleProtocol=pickleProtocol, pickler=pickler,
+                unpickler=unpickler, pload=pload, pid=pid)
+        else:
+            return None
+
+
 class Client(local):
     """
     Object representing a pool of memcache servers.
@@ -681,20 +708,21 @@
         if not server:
             return 0
 
-        if token:
-            cmd = "cas"
         self._statlog(cmd)
 
         store_info = self._val_to_store_info(val, min_compress_len)
         if not store_info: return(0)
 
-        if token:
-            fullcmd = "cas %s %d %d %d %d\r\n%s" % (key, store_info[0], time, store_info[1], token, store_info[2])
+        if token is not None:
+            fullcmd = "cas %s %d %d %d %s\r\n%s" % (key, store_info[0], time, store_info[1], token, store_info[2])
         else:
             fullcmd = "%s %s %d %d %d\r\n%s" % (cmd, key, store_info[0], time, store_info[1], store_info[2])
         try:
             server.send_cmd(fullcmd)
-            return (server.expect("STORED") == "STORED")
+            result = server.expect("STORED")
+            if (result == "NOT_FOUND"):
+                raise NotFoundError(key)
+            return (result == "STORED")
 
         except socket.error, msg:
             if type(msg) is types.TupleType: msg = msg[1]
@@ -736,7 +764,7 @@
         if not server:
             raise MemcacheError("Memcache connection error")
 
-        self._statlog('gets')
+        self._statlog('get')
 
         try:
             server.send_cmd("gets %s" % key)
@@ -823,6 +851,46 @@
                 server.mark_dead(msg)
         return retvals
 
+    def gets_multi(self, keys, key_prefix=''):
+        '''
+        Retrieves multiple keys from the memcache doing just one query.
+        See also L{gets} and L{get_multi}.
+        '''
+
+        self._statlog('gets_multi')
+
+        server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(keys, key_prefix)
+
+        # send out all requests on each server before reading anything
+        dead_servers = []
+        for server in server_keys.iterkeys():
+            try:
+                server.send_cmd("gets %s" % " ".join(server_keys[server]))
+            except socket.error, msg:
+                if type(msg) is types.TupleType: msg = msg[1]
+                server.mark_dead(msg)
+                dead_servers.append(server)
+
+        # if any servers died on the way, don't expect them to respond.
+        for server in dead_servers:
+            del server_keys[server]
+
+        retvals = {}
+        for server in server_keys.iterkeys():
+            try:
+                line = server.readline()
+                while line and line != 'END':
+                    rkey, flags, rlen, cas_token = self._expectvalue_cas(server, line)
+                    #  Bo Yang reports that this can sometimes be None
+                    if rkey is not None:
+                        val = self._recv_value(server, flags, rlen)
+                        retvals[prefixed_to_orig_key[rkey]] = (val, cas_token)   # un-prefix returned key.
+                    line = server.readline()
+            except (_Error, socket.error), msg:
+                if type(msg) is types.TupleType: msg = msg[1]
+                server.mark_dead(msg)
+        return retvals
+
     def _expectvalue(self, server, line=None):
         if not line:
             line = server.readline()
@@ -840,10 +908,9 @@
             line = server.readline()
 
         if line[:5] == 'VALUE':
-            resp, rkey, flags, len, token = line.split()
+            resp, rkey, flags, len, rtoken = line.split()
             flags = int(flags)
             rlen = int(len)
-            rtoken = int(token)
             return (rkey, flags, rlen, rtoken)
         else:
             return (None, None, None, None)
@@ -884,6 +951,157 @@
         return val
 
 
+
+class TestClient(Client):
+    """
+    Fake memcache client for unit tests
+
+    """
+
+    def __init__(self, servers, debug=0, pickleProtocol=0,
+                 pickler=pickle.Pickler, unpickler=pickle.Unpickler,
+                 pload=None, pid=None):
+
+        local.__init__(self)
+
+        super(TestClient, self).__init__(servers, debug=debug,
+            pickleProtocol=pickleProtocol, pickler=pickler, unpickler=unpickler,
+            pload=pload, pid=pid)
+
+        self.data = {}
+        self.token = 0
+
+
+
+    def get_stats(self):
+        raise NotImplementedError()
+
+    def get_slabs(self):
+        raise NotImplementedError()
+
+    def flush_all(self):
+        raise NotImplementedError()
+
+    def forget_dead_hosts(self):
+        raise NotImplementedError()
+
+    def delete_multi(self, keys, time=0, key_prefix=''):
+        '''
+        Delete multiple keys in the memcache doing just one query.
+
+        >>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'})
+        >>> mc.get_multi(['key1', 'key2']) == {'key1' : 'val1', 'key2' : 'val2'}
+        1
+        >>> mc.delete_multi(['key1', 'key2'])
+        1
+        >>> mc.get_multi(['key1', 'key2']) == {}
+        1
+        '''
+
+        self._statlog('delete_multi')
+        for key in keys:
+            key = key_prefix + key
+            del self.data[key]
+        return 1
+
+    def delete(self, key, time=0):
+        '''Deletes a key from the memcache.
+
+        @return: Nonzero on success.
+        @param time: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay.
+        @rtype: int
+        '''
+        check_key(key)
+        del self.data[key]
+        return 1
+
+
+    def incr(self, key, delta=1):
+        raise NotImplementedError()
+
+    def decr(self, key, delta=1):
+        raise NotImplementedError()
+
+    def add(self, key, val, time = 0, min_compress_len = 0):
+        raise NotImplementedError()
+
+    def append(self, key, val, time=0, min_compress_len=0):
+        raise NotImplementedError()
+
+    def prepend(self, key, val, time=0, min_compress_len=0):
+        raise NotImplementedError()
+
+    def replace(self, key, val, time=0, min_compress_len=0):
+        raise NotImplementedError()
+
+    def set(self, key, val, time=0, min_compress_len=0, token=None):
+        self._statlog('set')
+        return self._set("set", key, val, time, min_compress_len, token=token)
+
+    def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0):
+        self._statlog('set_multi')
+        for key, val in mapping.iteritems():
+            key = key_prefix + key
+            self._set("set", key, val, time, min_compress_len)
+        return []
+
+    def _set(self, cmd, key, val, time, min_compress_len = 0, token=None):
+        check_key(key)
+        self._statlog(cmd)
+
+        serialized = pickle.dumps(val, pickle.HIGHEST_PROTOCOL)
+
+        if token is not None:
+            if self.data.has_key(key):
+                stored_val, stored_token = self.data[key]
+                if token != stored_token:
+                    return False
+
+        self.data[key] = (serialized, str(self.token))
+        self.token += 1
+
+        return True
+
+    def get(self, key):
+        check_key(key)
+
+        self._statlog('get')
+        if self.data.has_key(key):
+            stored_val, stored_token = self.data[key]
+            val = pickle.loads(stored_val)
+            return val
+        return None
+
+
+    def gets(self, key):
+        check_key(key)
+        if self.data.has_key(key):
+            stored_val, stored_token = self.data[key]
+            val = pickle.loads(stored_val)
+            return (val, stored_token)
+        return (None, None)
+
+    def get_multi(self, keys, key_prefix=''):
+        self._statlog('get_multi')
+
+        results = {}
+        for key in keys:
+            key = key_prefix + key
+            val = self.get(key)
+            results[key] = val
+        return results
+
+    def gets_multi(self, keys, key_prefix=''):
+        self._statlog('gets_multi')
+        results = {}
+        for key in keys:
+            key = key_prefix + key
+            result = self.gets(key)
+            if result[1] is not None:
+                results[key] = result
+        return results
+
+
 class _Host:
     _DEAD_RETRY = 30  # number of seconds before retrying a dead server.
     _SOCKET_TIMEOUT = 3  #  number of seconds before sockets timeout.

Modified: CalendarServer/trunk/twistedcaldav/directory/cachingdirectory.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/directory/cachingdirectory.py	2009-04-14 00:56:22 UTC (rev 4013)
+++ CalendarServer/trunk/twistedcaldav/directory/cachingdirectory.py	2009-04-14 01:49:36 UTC (rev 4014)
@@ -149,14 +149,12 @@
 
     def _getMemcacheClient(self, refresh=False):
         if refresh or not hasattr(self, "memcacheClient"):
-            self.memcacheClient = memcache.Client(['%s:%s' %
+            self.memcacheClient = memcache.ClientFactory.getClient(['%s:%s' %
                 (config.Memcached.BindAddress, config.Memcached.Port)],
                 debug=0, pickleProtocol=2)
         return self.memcacheClient
 
     def memcacheSet(self, key, record):
-        if not config.Memcached.ClientEnabled:
-            return
 
         hideService = isinstance(record, DirectoryRecord)
 
@@ -179,8 +177,6 @@
                 record.service = self
 
     def memcacheGet(self, key):
-        if not config.Memcached.ClientEnabled:
-            return None
 
         key = base64.b64encode(key)
         try:

Modified: CalendarServer/trunk/twistedcaldav/test/util.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/util.py	2009-04-14 00:56:22 UTC (rev 4013)
+++ CalendarServer/trunk/twistedcaldav/test/util.py	2009-04-14 01:49:36 UTC (rev 4014)
@@ -25,6 +25,7 @@
 
 from twistedcaldav.config import config
 from twistedcaldav.static import CalDAVFile
+import memcache
 
 import twisted.web2.dav.test.util
 
@@ -40,6 +41,7 @@
         config.DataRoot = dataroot
         config.Memcached.ClientEnabled = False
         config.Memcached.ServerEnabled = False
+        memcache.ClientFactory.allowTestCache = True
 
     def createHierarchy(self, structure):
         root = self.mktemp()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20090413/cd6132d0/attachment-0001.html>


More information about the calendarserver-changes mailing list