[CalendarServer-changes] [2974] CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav
source_changes at macosforge.org
source_changes at macosforge.org
Wed Sep 10 20:30:05 PDT 2008
Revision: 2974
http://trac.macosforge.org/projects/calendarserver/changeset/2974
Author: cdaboo at apple.com
Date: 2008-09-10 20:30:02 -0700 (Wed, 10 Sep 2008)
Log Message:
-----------
Memcache based deferred lock.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcacher.py
Added Paths:
-----------
CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcachelock.py
CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/test/test_memcachelock.py
Added: CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcachelock.py
===================================================================
--- CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcachelock.py (rev 0)
+++ CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcachelock.py 2008-09-11 03:30:02 UTC (rev 2974)
@@ -0,0 +1,117 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twistedcaldav.memcacher import Memcacher
+from twisted.internet.defer import inlineCallbacks, Deferred, returnValue,\
+ succeed
+from twisted.internet import reactor
+import time
+
+class MemcacheLock(Memcacher):
+
+ def __init__(self, namespace, locktoken, timeout=5.0, retry_interval=0.1, expire_time=0):
+ """
+
+ @param namespace: a unique namespace for this lock's tokens
+ @type namespace: C{str}
+ @param locktoken: the name of the locktoken
+ @type locktoken: C{str}
+ @param timeout: the maximum time in seconds that the lock should block
+ @type timeout: C{float}
+ @param retry_interval: the interval to retry acquiring the lock
+ @type retry_interval: C{float}
+ @param expire_time: the time in seconds for the lock to expire. Zero: no expiration.
+ @type expire_time: C{float}
+ """
+
+ super(MemcacheLock, self).__init__(namespace)
+ self._locktoken = locktoken
+ self._timeout = timeout
+ self._retry_interval = retry_interval
+ self._expire_time = expire_time
+ self._hasLock = False
+
+ def _getMemcacheProtocol(self):
+
+ result = super(MemcacheLock, self)._getMemcacheProtocol()
+
+ if isinstance(result, Memcacher.nullCacher):
+ raise AssertionError("No implementation of shared locking without memcached")
+
+ return result
+
+ @inlineCallbacks
+ def acquire(self):
+
+ assert not self._hasLock, "Lock already acquired."
+
+ start_time = time.time()
+ waiting = False
+ while time.time() < start_time + self._timeout:
+
+ result = (yield self.add(self._locktoken, "1", self._expire_time))
+ if result:
+ self._hasLock = True
+ if waiting:
+ self.log_debug("Got lock after waiting on %s" % (self._locktoken,))
+ break
+
+ waiting = True
+ self.log_debug("Waiting for lock on %s" % (self._locktoken,))
+ pause = Deferred()
+ def _timedDeferred():
+ pause.callback(True)
+ reactor.callLater(self._retry_interval, _timedDeferred)
+ yield pause
+ else:
+ self.log_debug("Timed out lock after waiting on %s" % (self._locktoken,))
+ raise MemcacheLockTimeoutError()
+
+ returnValue(True)
+
+ def release(self):
+
+ assert self._hasLock, "Lock not acquired."
+
+ def _done(result):
+ self._hasLock = False
+ return result
+
+ d = self.delete(self._locktoken)
+ d.addCallback(_done)
+ return d
+
+ def clean(self):
+
+ if self._hasLock:
+ return self.delete(self._locktoken)
+ else:
+ return succeed(True)
+
+ def locked(self):
+ """
+ Test if the lock is currently being held.
+ """
+
+ def _gotit(value):
+ return value is not None
+
+ d = self.get(self._locktoken)
+ d.addCallback(_gotit)
+ return d
+
+class MemcacheLockTimeoutError(Exception):
+ pass
Modified: CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcacher.py
===================================================================
--- CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcacher.py 2008-09-10 22:00:39 UTC (rev 2973)
+++ CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcacher.py 2008-09-11 03:30:02 UTC (rev 2974)
@@ -34,7 +34,14 @@
def __init__(self):
self._cache = {}
- def set(self, key, value):
+ def add(self, key, value, expire_time=0):
+ if key not in self._cache:
+ self._cache[key] = value
+ return succeed(True)
+ else:
+ return succeed(False)
+
+ def set(self, key, value, expire_time=0):
self._cache[key] = value
return succeed(True)
@@ -59,9 +66,12 @@
does not actually cache anything.
"""
- def set(self, key, value):
+ def add(self, key, value, expire_time=0):
return succeed(True)
+ def set(self, key, value, expire_time=0):
+ return succeed(True)
+
def get(self, key):
return succeed((0, None,))
@@ -109,12 +119,21 @@
return self._memcacheProtocol
- def set(self, key, value):
+ def add(self, key, value, expire_time=0):
my_value = value
if self._pickle:
my_value = cPickle.dumps(value)
+ self.log_debug("Adding Cache Token for %r" % (key,))
+ return self._getMemcacheProtocol().add(
+ '%s:%s' % (self._namespace, key), my_value, expireTime=expire_time)
+
+ def set(self, key, value, expire_time=0):
+ my_value = value
+ if self._pickle:
+ my_value = cPickle.dumps(value)
+ self.log_debug("Setting Cache Token for %r" % (key,))
return self._getMemcacheProtocol().set(
- '%s:%s' % (self._namespace, key), my_value)
+ '%s:%s' % (self._namespace, key), my_value, expireTime=expire_time)
def get(self, key):
def _gotit(result):
Added: CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/test/test_memcachelock.py
===================================================================
--- CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/test/test_memcachelock.py (rev 0)
+++ CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/test/test_memcachelock.py 2008-09-11 03:30:02 UTC (rev 2974)
@@ -0,0 +1,170 @@
+# Copyright (c) 2007 Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Test the memcache-based deferred lock L{MemcacheLock}.
+"""
+
+from twistedcaldav.memcache import MemCacheProtocol
+
+from twisted.trial.unittest import TestCase
+from twisted.test.proto_helpers import StringTransportWithDisconnection
+from twisted.internet.task import Clock
+from twisted.internet.defer import inlineCallbacks
+from twistedcaldav.memcachelock import MemcacheLock
+
+
+class MemCacheTestCase(TestCase):
+ """
+ Test L{MemcacheLock} driven through a faked L{MemCacheProtocol} connection.
+ """
+
+ class FakedMemcacheLock(MemcacheLock):
+
+ def __init__(self, faked, namespace, locktoken, timeout=5.0, retry_interval=0.1, expire_time=0):
+ """
+
+ @param namespace: a unique namespace for this lock's tokens
+ @type namespace: C{str}
+ @param locktoken: the name of the locktoken
+ @type locktoken: C{str}
+ @param timeout: the maximum time in seconds that the lock should block
+ @type timeout: C{float}
+ @param retry_interval: the interval to retry acquiring the lock
+ @type retry_interval: C{float}
+ @param expire_time: the time in seconds for the lock to expire. Zero: no expiration.
+ @type expire_time: C{float}
+ """
+
+ super(MemCacheTestCase.FakedMemcacheLock, self).__init__(namespace, locktoken, timeout, retry_interval, expire_time)
+ self.faked = faked
+
+ def _getMemcacheProtocol(self):
+
+ return self.faked
+
+ def setUp(self):
+ """
+ Create a memcache client, connect it to a string protocol, and make it
+ use a deterministic clock.
+ """
+ self.proto = MemCacheProtocol()
+ self.clock = Clock()
+ self.proto.callLater = self.clock.callLater
+ self.transport = StringTransportWithDisconnection()
+ self.transport.protocol = self.proto
+ self.proto.makeConnection(self.transport)
+
+
+ def _test(self, d, send, recv, result):
+ """
+ Shortcut method for classic tests.
+
+ @param d: the resulting deferred from the memcache command.
+ @type d: C{Deferred}
+
+ @param send: the expected data to be sent.
+ @type send: C{str}
+
+ @param recv: the data to simulate as reception.
+ @type recv: C{str}
+
+ @param result: the expected result.
+ @type result: C{any}
+ """
+ def cb(res):
+ self.assertEquals(res, result)
+ self.assertEquals(self.transport.value(), send)
+ self.transport.clear()
+ d.addCallback(cb)
+ self.proto.dataReceived(recv)
+ return d
+
+ def test_get(self):
+ """
+ L{MemcacheLock.get} should send a C{get} command for the namespaced key
+ and return a L{Deferred} which is called back with the stored value
+ if the server returns a successful result.
+ """
+ lock = MemCacheTestCase.FakedMemcacheLock(self.proto, "lock", "locking")
+ return self._test(
+ lock.get("foo"),
+ "get lock:foo\r\n",
+ "VALUE lock:foo 0 3\r\nbar\r\nEND\r\n",
+ "bar"
+ )
+
+ def test_set(self):
+ """
+ L{MemcacheLock.set} should send a C{set} command for the namespaced key
+ and return a L{Deferred} which is called back with C{True}
+ if the server replies C{STORED}.
+ """
+ lock = MemCacheTestCase.FakedMemcacheLock(self.proto, "lock", "locking")
+ return self._test(
+ lock.set("foo", "bar"),
+ "set lock:foo 0 0 3\r\nbar\r\n",
+ "STORED\r\n",
+ True
+ )
+
+ @inlineCallbacks
+ def test_acquire(self):
+ """
+ L{MemcacheLock.acquire} should send an C{add} command for the namespaced
+ lock token and return a L{Deferred} which is called back with C{True},
+ with the lock marked as held, if the server replies C{STORED}.
+ """
+ lock = MemCacheTestCase.FakedMemcacheLock(self.proto, "lock", "locking")
+ yield self._test(
+ lock.acquire(),
+ "add lock:locking 0 0 1\r\n1\r\n",
+ "STORED\r\n",
+ True
+ )
+ self.assertTrue(lock._hasLock)
+
+ @inlineCallbacks
+ def test_acquire_release(self):
+ """
+ After a successful L{MemcacheLock.acquire}, L{MemcacheLock.release}
+ should send a C{delete} command for the namespaced lock token and mark
+ the lock as no longer held if the server replies C{DELETED}.
+ """
+ lock = MemCacheTestCase.FakedMemcacheLock(self.proto, "lock", "locking")
+ yield self._test(
+ lock.acquire(),
+ "add lock:locking 0 0 1\r\n1\r\n",
+ "STORED\r\n",
+ True
+ )
+ self.assertTrue(lock._hasLock)
+ yield self._test(
+ lock.release(),
+ "delete lock:locking\r\n",
+ "DELETED\r\n",
+ True
+ )
+ self.assertFalse(lock._hasLock)
+
+ @inlineCallbacks
+ def test_acquire_clean(self):
+ """
+ After a successful L{MemcacheLock.acquire}, L{MemcacheLock.clean}
+ should send a C{delete} command for the namespaced lock token and return
+ a L{Deferred} called back with C{True} if the server replies C{DELETED}.
+ """
+ lock = MemCacheTestCase.FakedMemcacheLock(self.proto, "lock", "locking")
+ yield self._test(
+ lock.acquire(),
+ "add lock:locking 0 0 1\r\n1\r\n",
+ "STORED\r\n",
+ True
+ )
+ yield self._test(
+ lock.clean(),
+ "delete lock:locking\r\n",
+ "DELETED\r\n",
+ True
+ )
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080910/262504f7/attachment-0001.html
More information about the calendarserver-changes
mailing list