[CalendarServer-changes] [2974] CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Wed Sep 10 20:30:05 PDT 2008


Revision: 2974
          http://trac.macosforge.org/projects/calendarserver/changeset/2974
Author:   cdaboo at apple.com
Date:     2008-09-10 20:30:02 -0700 (Wed, 10 Sep 2008)
Log Message:
-----------
Memcache based deferred lock.

Modified Paths:
--------------
    CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcacher.py

Added Paths:
-----------
    CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcachelock.py
    CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/test/test_memcachelock.py

Added: CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcachelock.py
===================================================================
--- CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcachelock.py	                        (rev 0)
+++ CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcachelock.py	2008-09-11 03:30:02 UTC (rev 2974)
@@ -0,0 +1,117 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twistedcaldav.memcacher import Memcacher
+from twisted.internet.defer import inlineCallbacks, Deferred, returnValue,\
+    succeed
+from twisted.internet import reactor
+import time
+
+class MemcacheLock(Memcacher):
+
+    def __init__(self, namespace, locktoken, timeout=5.0, retry_interval=0.1, expire_time=0):
+        """
+        
+        @param namespace: a unique namespace for this lock's tokens
+        @type namespace: C{str}
+        @param locktoken: the name of the locktoken
+        @type locktoken: C{str}
+        @param timeout: the maximum time in seconds that the lock should block
+        @type timeout: C{float}
+        @param retry_interval: the interval to retry acquiring the lock
+        @type retry_interval: C{float}
+        @param expiryTime: the time in seconds for the lock to expire. Zero: no expiration.
+        @type expiryTime: C{float}
+        """
+
+        super(MemcacheLock, self).__init__(namespace)
+        self._locktoken = locktoken
+        self._timeout = timeout
+        self._retry_interval = retry_interval
+        self._expire_time = expire_time
+        self._hasLock = False
+
+    def _getMemcacheProtocol(self):
+        
+        result = super(MemcacheLock, self)._getMemcacheProtocol()
+
+        if isinstance(result, Memcacher.nullCacher):
+            raise AssertionError("No implementation of shared locking without memcached")
+        
+        return result
+
+    @inlineCallbacks
+    def acquire(self):
+        
+        assert not self._hasLock, "Lock already acquired."
+    
+        start_time = time.time()
+        waiting = False
+        while time.time() < start_time + self._timeout:
+            
+            result = (yield self.add(self._locktoken, "1", self._expire_time))
+            if result:
+                self._hasLock = True
+                if waiting:
+                    self.log_debug("Got lock after waiting on %s" % (self._locktoken,))
+                break
+            
+            waiting = True
+            self.log_debug("Waiting for lock on %s" % (self._locktoken,))
+            pause = Deferred()
+            def _timedDeferred():
+                pause.callback(True)
+            reactor.callLater(self._retry_interval, _timedDeferred)
+            yield pause
+        else:
+            self.log_debug("Timed out lock after waiting on %s" % (self._locktoken,))
+            raise MemcacheLockTimeoutError()
+        
+        returnValue(True)
+
+    def release(self):
+        
+        assert self._hasLock, "Lock not acquired."
+    
+        def _done(result):
+            self._hasLock = False
+            return result
+
+        d = self.delete(self._locktoken)
+        d.addCallback(_done)
+        return d
+
+    def clean(self):
+        
+        if self._hasLock:
+            return self.delete(self._locktoken)
+        else:
+            return succeed(True)
+
+    def locked(self):
+        """
+        Test if the lock is currently being held.
+        """
+        
+        def _gotit(value):
+            return value is not None
+
+        d = self.get(self._locktoken)
+        d.addCallback(_gotit)
+        return d
+
+class MemcacheLockTimeoutError(Exception):
+    pass

Modified: CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcacher.py
===================================================================
--- CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcacher.py	2008-09-10 22:00:39 UTC (rev 2973)
+++ CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/memcacher.py	2008-09-11 03:30:02 UTC (rev 2974)
@@ -34,7 +34,14 @@
         def __init__(self):
             self._cache = {}
 
-        def set(self, key, value):
+        def add(self, key, value, expire_time=0):
+            if key not in self._cache:
+                self._cache[key] = value
+                return succeed(True)
+            else:
+                return succeed(False)
+
+        def set(self, key, value, expire_time=0):
             self._cache[key] = value
             return succeed(True)
 
@@ -59,9 +66,12 @@
         does not actually cache anything.
         """
 
-        def set(self, key, value):
+        def add(self, key, value, expire_time=0):
             return succeed(True)
 
+        def set(self, key, value, expire_time=0):
+            return succeed(True)
+
         def get(self, key):
             return succeed((0, None,))
 
@@ -109,12 +119,21 @@
             return self._memcacheProtocol
 
 
-    def set(self, key, value):
+    def add(self, key, value, expire_time=0):
         my_value = value
         if self._pickle:
             my_value = cPickle.dumps(value)
+        self.log_debug("Adding Cache Token for %r" % (key,))
+        return self._getMemcacheProtocol().add(
+            '%s:%s' % (self._namespace, key), my_value, expireTime=expire_time)
+
+    def set(self, key, value, expire_time=0):
+        my_value = value
+        if self._pickle:
+            my_value = cPickle.dumps(value)
+        self.log_debug("Setting Cache Token for %r" % (key,))
         return self._getMemcacheProtocol().set(
-            '%s:%s' % (self._namespace, key), my_value)
+            '%s:%s' % (self._namespace, key), my_value, expireTime=expire_time)
 
     def get(self, key):
         def _gotit(result):

Added: CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/test/test_memcachelock.py
===================================================================
--- CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/test/test_memcachelock.py	                        (rev 0)
+++ CalendarServer/branches/users/cdaboo/implicitauto-2947/twistedcaldav/test/test_memcachelock.py	2008-09-11 03:30:02 UTC (rev 2974)
@@ -0,0 +1,170 @@
+# Copyright (c) 2007 Twisted Matrix Laboratories.
+# See LICENSE for details.
+
+"""
+Test the memcache client protocol.
+"""
+
+from twistedcaldav.memcache import MemCacheProtocol
+
+from twisted.trial.unittest import TestCase
+from twisted.test.proto_helpers import StringTransportWithDisconnection
+from twisted.internet.task import Clock
+from twisted.internet.defer import inlineCallbacks
+from twistedcaldav.memcachelock import MemcacheLock
+
+
+class MemCacheTestCase(TestCase):
+    """
+    Test client protocol class L{MemCacheProtocol}.
+    """
+
+    class FakedMemcacheLock(MemcacheLock):
+        
+        def __init__(self, faked, namespace, locktoken, timeout=5.0, retry_interval=0.1, expire_time=0):
+            """
+            
+            @param namespace: a unique namespace for this lock's tokens
+            @type namespace: C{str}
+            @param locktoken: the name of the locktoken
+            @type locktoken: C{str}
+            @param timeout: the maximum time in seconds that the lock should block
+            @type timeout: C{float}
+            @param retry_interval: the interval to retry acquiring the lock
+            @type retry_interval: C{float}
+            @param expiryTime: the time in seconds for the lock to expire. Zero: no expiration.
+            @type expiryTime: C{float}
+            """
+    
+            super(MemCacheTestCase.FakedMemcacheLock, self).__init__(namespace, locktoken, timeout, retry_interval, expire_time)
+            self.faked = faked
+
+        def _getMemcacheProtocol(self):
+            
+            return self.faked
+        
+    def setUp(self):
+        """
+        Create a memcache client, connect it to a string protocol, and make it
+        use a deterministic clock.
+        """
+        self.proto = MemCacheProtocol()
+        self.clock = Clock()
+        self.proto.callLater = self.clock.callLater
+        self.transport = StringTransportWithDisconnection()
+        self.transport.protocol = self.proto
+        self.proto.makeConnection(self.transport)
+
+
+    def _test(self, d, send, recv, result):
+        """
+        Shortcut method for classic tests.
+
+        @param d: the resulting deferred from the memcache command.
+        @type d: C{Deferred}
+
+        @param send: the expected data to be sent.
+        @type send: C{str}
+
+        @param recv: the data to simulate as reception.
+        @type recv: C{str}
+
+        @param result: the expected result.
+        @type result: C{any}
+        """
+        def cb(res):
+            self.assertEquals(res, result)
+        self.assertEquals(self.transport.value(), send)
+        self.transport.clear()
+        d.addCallback(cb)
+        self.proto.dataReceived(recv)
+        return d
+
+    def test_get(self):
+        """
+        L{MemCacheProtocol.get} should return a L{Deferred} which is
+        called back with the value and the flag associated with the given key
+        if the server returns a successful result.
+        """
+        lock = MemCacheTestCase.FakedMemcacheLock(self.proto, "lock", "locking")
+        return self._test(
+            lock.get("foo"),
+            "get lock:foo\r\n",
+            "VALUE lock:foo 0 3\r\nbar\r\nEND\r\n",
+            "bar"
+        )
+
+    def test_set(self):
+        """
+        L{MemCacheProtocol.get} should return a L{Deferred} which is
+        called back with the value and the flag associated with the given key
+        if the server returns a successful result.
+        """
+        lock = MemCacheTestCase.FakedMemcacheLock(self.proto, "lock", "locking")
+        return self._test(
+            lock.set("foo", "bar"),
+            "set lock:foo 0 0 3\r\nbar\r\n",
+            "STORED\r\n",
+            True
+        )
+
+    @inlineCallbacks
+    def test_acquire(self):
+        """
+        L{MemCacheProtocol.get} should return a L{Deferred} which is
+        called back with the value and the flag associated with the given key
+        if the server returns a successful result.
+        """
+        lock = MemCacheTestCase.FakedMemcacheLock(self.proto, "lock", "locking")
+        yield self._test(
+            lock.acquire(),
+            "add lock:locking 0 0 1\r\n1\r\n",
+            "STORED\r\n",
+            True
+        )
+        self.assertTrue(lock._hasLock)
+
+    @inlineCallbacks
+    def test_acquire_release(self):
+        """
+        L{MemCacheProtocol.get} should return a L{Deferred} which is
+        called back with the value and the flag associated with the given key
+        if the server returns a successful result.
+        """
+        lock = MemCacheTestCase.FakedMemcacheLock(self.proto, "lock", "locking")
+        yield self._test(
+            lock.acquire(),
+            "add lock:locking 0 0 1\r\n1\r\n",
+            "STORED\r\n",
+            True
+        )
+        self.assertTrue(lock._hasLock)
+        yield self._test(
+            lock.release(),
+            "delete lock:locking\r\n",
+            "DELETED\r\n",
+            True
+        )
+        self.assertFalse(lock._hasLock)
+
+    @inlineCallbacks
+    def test_acquire_clean(self):
+        """
+        L{MemCacheProtocol.get} should return a L{Deferred} which is
+        called back with the value and the flag associated with the given key
+        if the server returns a successful result.
+        """
+        lock = MemCacheTestCase.FakedMemcacheLock(self.proto, "lock", "locking")
+        yield self._test(
+            lock.acquire(),
+            "add lock:locking 0 0 1\r\n1\r\n",
+            "STORED\r\n",
+            True
+        )
+        yield self._test(
+            lock.clean(),
+            "delete lock:locking\r\n",
+            "DELETED\r\n",
+            True
+        )
+
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080910/262504f7/attachment-0001.html 


More information about the calendarserver-changes mailing list