[CalendarServer-changes] [4014] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Mon Apr 13 18:49:36 PDT 2009
Revision: 4014
http://trac.macosforge.org/projects/calendarserver/changeset/4014
Author: sagen at apple.com
Date: 2009-04-13 18:49:36 -0700 (Mon, 13 Apr 2009)
Log Message:
-----------
Add gets_multi to memcache client
Modified Paths:
--------------
CalendarServer/trunk/memcache.py
CalendarServer/trunk/twistedcaldav/directory/cachingdirectory.py
CalendarServer/trunk/twistedcaldav/test/util.py
Modified: CalendarServer/trunk/memcache.py
===================================================================
--- CalendarServer/trunk/memcache.py 2009-04-14 00:56:22 UTC (rev 4013)
+++ CalendarServer/trunk/memcache.py 2009-04-14 01:49:36 UTC (rev 4014)
@@ -49,6 +49,8 @@
import os
import re
import types
+from twistedcaldav.config import config
+
try:
import cPickle as pickle
except ImportError:
@@ -90,6 +92,11 @@
Memcache connection error
"""
+class NotFoundError(MemcacheError):
+ """
+ NOT_FOUND error
+ """
+
try:
# Only exists in Python 2.4+
from threading import local
@@ -98,7 +105,27 @@
class local(object):
pass
+class ClientFactory(object):
+ # unit tests should set this to True to enable the fake test cache
+ allowTestCache = False
+
+ @classmethod
+ def getClient(cls, servers, debug=0, pickleProtocol=0,
+ pickler=pickle.Pickler, unpickler=pickle.Unpickler,
+ pload=None, pid=None):
+
+ if config.Memcached.ClientEnabled:
+ return Client(servers, debug=debug, pickleProtocol=pickleProtocol,
+ pickler=pickler, unpickler=unpickler, pload=pload, pid=pid)
+ elif cls.allowTestCache:
+ return TestClient(servers, debug=debug,
+ pickleProtocol=pickleProtocol, pickler=pickler,
+ unpickler=unpickler, pload=pload, pid=pid)
+ else:
+ return None
+
+
class Client(local):
"""
Object representing a pool of memcache servers.
@@ -681,20 +708,21 @@
if not server:
return 0
- if token:
- cmd = "cas"
self._statlog(cmd)
store_info = self._val_to_store_info(val, min_compress_len)
if not store_info: return(0)
- if token:
- fullcmd = "cas %s %d %d %d %d\r\n%s" % (key, store_info[0], time, store_info[1], token, store_info[2])
+ if token is not None:
+ fullcmd = "cas %s %d %d %d %s\r\n%s" % (key, store_info[0], time, store_info[1], token, store_info[2])
else:
fullcmd = "%s %s %d %d %d\r\n%s" % (cmd, key, store_info[0], time, store_info[1], store_info[2])
try:
server.send_cmd(fullcmd)
- return (server.expect("STORED") == "STORED")
+ result = server.expect("STORED")
+ if (result == "NOT_FOUND"):
+ raise NotFoundError(key)
+ return (result == "STORED")
except socket.error, msg:
if type(msg) is types.TupleType: msg = msg[1]
@@ -736,7 +764,7 @@
if not server:
raise MemcacheError("Memcache connection error")
- self._statlog('gets')
+ self._statlog('get')
try:
server.send_cmd("gets %s" % key)
@@ -823,6 +851,46 @@
server.mark_dead(msg)
return retvals
+ def gets_multi(self, keys, key_prefix=''):
+ '''
+ Retrieves multiple keys from the memcache doing just one query.
+ See also L{gets} and L{get_multi}.
+ '''
+
+ self._statlog('gets_multi')
+
+ server_keys, prefixed_to_orig_key = self._map_and_prefix_keys(keys, key_prefix)
+
+ # send out all requests on each server before reading anything
+ dead_servers = []
+ for server in server_keys.iterkeys():
+ try:
+ server.send_cmd("gets %s" % " ".join(server_keys[server]))
+ except socket.error, msg:
+ if type(msg) is types.TupleType: msg = msg[1]
+ server.mark_dead(msg)
+ dead_servers.append(server)
+
+ # if any servers died on the way, don't expect them to respond.
+ for server in dead_servers:
+ del server_keys[server]
+
+ retvals = {}
+ for server in server_keys.iterkeys():
+ try:
+ line = server.readline()
+ while line and line != 'END':
+ rkey, flags, rlen, cas_token = self._expectvalue_cas(server, line)
+ # Bo Yang reports that this can sometimes be None
+ if rkey is not None:
+ val = self._recv_value(server, flags, rlen)
+ retvals[prefixed_to_orig_key[rkey]] = (val, cas_token) # un-prefix returned key.
+ line = server.readline()
+ except (_Error, socket.error), msg:
+ if type(msg) is types.TupleType: msg = msg[1]
+ server.mark_dead(msg)
+ return retvals
+
def _expectvalue(self, server, line=None):
if not line:
line = server.readline()
@@ -840,10 +908,9 @@
line = server.readline()
if line[:5] == 'VALUE':
- resp, rkey, flags, len, token = line.split()
+ resp, rkey, flags, len, rtoken = line.split()
flags = int(flags)
rlen = int(len)
- rtoken = int(token)
return (rkey, flags, rlen, rtoken)
else:
return (None, None, None, None)
@@ -884,6 +951,157 @@
return val
+
+class TestClient(Client):
+ """
+ Fake memcache client for unit tests
+
+ """
+
+ def __init__(self, servers, debug=0, pickleProtocol=0,
+ pickler=pickle.Pickler, unpickler=pickle.Unpickler,
+ pload=None, pid=None):
+
+ local.__init__(self)
+
+ super(TestClient, self).__init__(servers, debug=debug,
+ pickleProtocol=pickleProtocol, pickler=pickler, unpickler=unpickler,
+ pload=pload, pid=pid)
+
+ self.data = {}
+ self.token = 0
+
+
+
+ def get_stats(self):
+ raise NotImplementedError()
+
+ def get_slabs(self):
+ raise NotImplementedError()
+
+ def flush_all(self):
+ raise NotImplementedError()
+
+ def forget_dead_hosts(self):
+ raise NotImplementedError()
+
+ def delete_multi(self, keys, time=0, key_prefix=''):
+ '''
+ Delete multiple keys in the memcache doing just one query.
+
+ >>> notset_keys = mc.set_multi({'key1' : 'val1', 'key2' : 'val2'})
+ >>> mc.get_multi(['key1', 'key2']) == {'key1' : 'val1', 'key2' : 'val2'}
+ 1
+ >>> mc.delete_multi(['key1', 'key2'])
+ 1
+ >>> mc.get_multi(['key1', 'key2']) == {}
+ 1
+ '''
+
+ self._statlog('delete_multi')
+ for key in keys:
+ key = key_prefix + key
+ del self.data[key]
+ return 1
+
+ def delete(self, key, time=0):
+ '''Deletes a key from the memcache.
+
+ @return: Nonzero on success.
+ @param time: number of seconds any subsequent set / update commands should fail. Defaults to 0 for no delay.
+ @rtype: int
+ '''
+ check_key(key)
+ del self.data[key]
+ return 1
+
+
+ def incr(self, key, delta=1):
+ raise NotImplementedError()
+
+ def decr(self, key, delta=1):
+ raise NotImplementedError()
+
+ def add(self, key, val, time = 0, min_compress_len = 0):
+ raise NotImplementedError()
+
+ def append(self, key, val, time=0, min_compress_len=0):
+ raise NotImplementedError()
+
+ def prepend(self, key, val, time=0, min_compress_len=0):
+ raise NotImplementedError()
+
+ def replace(self, key, val, time=0, min_compress_len=0):
+ raise NotImplementedError()
+
+ def set(self, key, val, time=0, min_compress_len=0, token=None):
+ self._statlog('set')
+ return self._set("set", key, val, time, min_compress_len, token=token)
+
+ def set_multi(self, mapping, time=0, key_prefix='', min_compress_len=0):
+ self._statlog('set_multi')
+ for key, val in mapping.iteritems():
+ key = key_prefix + key
+ self._set("set", key, val, time, min_compress_len)
+ return []
+
+ def _set(self, cmd, key, val, time, min_compress_len = 0, token=None):
+ check_key(key)
+ self._statlog(cmd)
+
+ serialized = pickle.dumps(val, pickle.HIGHEST_PROTOCOL)
+
+ if token is not None:
+ if self.data.has_key(key):
+ stored_val, stored_token = self.data[key]
+ if token != stored_token:
+ return False
+
+ self.data[key] = (serialized, str(self.token))
+ self.token += 1
+
+ return True
+
+ def get(self, key):
+ check_key(key)
+
+ self._statlog('get')
+ if self.data.has_key(key):
+ stored_val, stored_token = self.data[key]
+ val = pickle.loads(stored_val)
+ return val
+ return None
+
+
+ def gets(self, key):
+ check_key(key)
+ if self.data.has_key(key):
+ stored_val, stored_token = self.data[key]
+ val = pickle.loads(stored_val)
+ return (val, stored_token)
+ return (None, None)
+
+ def get_multi(self, keys, key_prefix=''):
+ self._statlog('get_multi')
+
+ results = {}
+ for key in keys:
+ key = key_prefix + key
+ val = self.get(key)
+ results[key] = val
+ return results
+
+ def gets_multi(self, keys, key_prefix=''):
+ self._statlog('gets_multi')
+ results = {}
+ for key in keys:
+ key = key_prefix + key
+ result = self.gets(key)
+ if result[1] is not None:
+ results[key] = result
+ return results
+
+
class _Host:
_DEAD_RETRY = 30 # number of seconds before retrying a dead server.
_SOCKET_TIMEOUT = 3 # number of seconds before sockets timeout.
Modified: CalendarServer/trunk/twistedcaldav/directory/cachingdirectory.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/directory/cachingdirectory.py 2009-04-14 00:56:22 UTC (rev 4013)
+++ CalendarServer/trunk/twistedcaldav/directory/cachingdirectory.py 2009-04-14 01:49:36 UTC (rev 4014)
@@ -149,14 +149,12 @@
def _getMemcacheClient(self, refresh=False):
if refresh or not hasattr(self, "memcacheClient"):
- self.memcacheClient = memcache.Client(['%s:%s' %
+ self.memcacheClient = memcache.ClientFactory.getClient(['%s:%s' %
(config.Memcached.BindAddress, config.Memcached.Port)],
debug=0, pickleProtocol=2)
return self.memcacheClient
def memcacheSet(self, key, record):
- if not config.Memcached.ClientEnabled:
- return
hideService = isinstance(record, DirectoryRecord)
@@ -179,8 +177,6 @@
record.service = self
def memcacheGet(self, key):
- if not config.Memcached.ClientEnabled:
- return None
key = base64.b64encode(key)
try:
Modified: CalendarServer/trunk/twistedcaldav/test/util.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/util.py 2009-04-14 00:56:22 UTC (rev 4013)
+++ CalendarServer/trunk/twistedcaldav/test/util.py 2009-04-14 01:49:36 UTC (rev 4014)
@@ -25,6 +25,7 @@
from twistedcaldav.config import config
from twistedcaldav.static import CalDAVFile
+import memcache
import twisted.web2.dav.test.util
@@ -40,6 +41,7 @@
config.DataRoot = dataroot
config.Memcached.ClientEnabled = False
config.Memcached.ServerEnabled = False
+ memcache.ClientFactory.allowTestCache = True
def createHierarchy(self, structure):
root = self.mktemp()
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20090413/cd6132d0/attachment-0001.html>
More information about the calendarserver-changes mailing list