[CalendarServer-changes] [8287] CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/ twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Mon Nov 14 14:18:26 PST 2011


Revision: 8287
          http://trac.macosforge.org/projects/calendarserver/changeset/8287
Author:   cdaboo at apple.com
Date:     2011-11-14 14:18:26 -0800 (Mon, 14 Nov 2011)
Log Message:
-----------
Remove memcachefifo prior to merge with trunk.

Modified Paths:
--------------
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py

Removed Paths:
-------------
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py

Deleted: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py	2011-11-14 21:30:51 UTC (rev 8286)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py	2011-11-14 22:18:26 UTC (rev 8287)
@@ -1,201 +0,0 @@
-##
-# Copyright (c) 2008-2009 Apple Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-from twistedcaldav.memcacher import Memcacher
-from twisted.internet.defer import inlineCallbacks, Deferred, returnValue,\
-    succeed
-from twisted.internet import reactor
-import time
-from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
-import cPickle
-
-class MemcacheFIFOLock(Memcacher):
-    """
-    Implements a shared lock with a queue such that lock requests are honored in the order
-    in which they are received. This differs from MemcacheLock which does not honor ordering
-    of lock requests.
-    
-    The implementation here uses two memcached entries. One represents a ticket counter - a
-    counter incremented and returned each time a lock is requested. The other is the
-    "next in line queue" - a sorted list of pending counters. To get a lock the caller
-    is given the next ticket value, then polls waiting for their number to come up as
-    "next in line". When it does they are given the lock, and upon releasing it their
-    "next in line" value is popped off the front of the queue, giving the next a chance
-    to get the lock.
-    
-    One complication here: making sure that we don't get the queue stuck waiting on a
-    "next in line" that has gone away - so we need to be careful about maintaining the
-    next in line queue appropriately.
-    
-    TODO: have a timeout for the next in line process to grab the lock. That way if that
-    process has died, at least the others will eventually get a lock.
-    
-    Note that this is really a temporary solution meant to address the need to order scheduling
-    requests in order to be "fair" to clients. Ultimately we want a persistent scheduling queue
-    implementation that would effectively manage the next in line work loads and would not depend
-    on the presence of any one working process. 
-    """
-
-    def __init__(self, namespace, locktoken, timeout=60.0, retry_interval=0.01, expire_time=0):
-        """
-        
-        @param namespace: a unique namespace for this lock's tokens
-        @type namespace: C{str}
-        @param locktoken: the name of the locktoken
-        @type locktoken: C{str}
-        @param timeout: the maximum time in seconds that the lock should block
-        @type timeout: C{float}
-        @param retry_interval: the interval to retry acquiring the lock
-        @type retry_interval: C{float}
-        @param expire_time: the time in seconds for the lock to expire. Zero: no expiration.
-        @type expire_time: C{float}
-        """
-
-        super(MemcacheFIFOLock, self).__init__(namespace)
-        self._locktoken = locktoken
-        self._ticket_token = "%s-ticket" % (self._locktoken,)
-        self._nextinline_token = "%s-next" % (self._locktoken,)
-        self._queue_token = "%s-queue" % (self._locktoken,)
-        self._timeout = timeout
-        self._retry_interval = retry_interval
-        self._expire_time = expire_time
-        self._hasLock = False
-        self._ticket = None
-
-    def _getMemcacheProtocol(self):
-        
-        result = super(MemcacheFIFOLock, self)._getMemcacheProtocol()
-
-        if isinstance(result, Memcacher.nullCacher):
-            raise AssertionError("No implementation of shared locking without memcached")
-        
-        return result
-
-    @inlineCallbacks
-    def acquire(self):
-        
-        assert not self._hasLock, "Lock already acquired."
-    
-        # First make sure the ticket and queue keys exist
-        yield self.add(self._ticket_token, "1", self._expire_time)
-        yield self.add(self._nextinline_token, "0", self._expire_time)
-        
-        # Get the next ticket
-        self._ticket = (yield self.incr(self._ticket_token)) - 1
-        
-        # Add ourselves to the pending queue
-        yield self._addToQueue()
-
-        timeout_at = time.time() + self._timeout
-        waiting = False
-        while True:
-            
-            # Check next in line value
-            result = int((yield self.get(self._nextinline_token)))
-            if result == self._ticket:
-                self._hasLock = True
-                if waiting:
-                    self.log_debug("Got lock after waiting on %s" % (self._locktoken,))
-                break
-            
-            if self._timeout and time.time() < timeout_at:
-                waiting = True
-                self.log_debug("Waiting for lock on %s" % (self._locktoken,))
-                pause = Deferred()
-                def _timedDeferred():
-                    pause.callback(True)
-                reactor.callLater(self._retry_interval, _timedDeferred)
-                yield pause
-            else:
-                # Must remove our active ticket value otherwise the next in line will stall on
-                # this lock which will never happen
-                yield self._removeFromQueue()
-
-                self.log_debug("Timed out lock after waiting on %s" % (self._locktoken,))
-                raise MemcacheLockTimeoutError()
-        
-        returnValue(True)
-
-    @inlineCallbacks
-    def release(self):
-        
-        assert self._hasLock, "Lock not acquired."
-        self._hasLock = False
-    
-        # Remove active ticket value - this will bump the next in line value
-        yield self._removeFromQueue()
-            
-
-    def clean(self):
-        
-        if self._hasLock:
-            return self.release()
-        else:
-            return succeed(True)
-
-    def locked(self):
-        """
-        Test if the lock is currently being held.
-        """
-        
-        return succeed(self._hasLock)
-
-    @inlineCallbacks
-    def _addToQueue(self):
-        """
-        Add our ticket to the queue. If it is now the first pending ticket, set the next in line
-        value to that.
-        """
-        
-        # We need a shared lock to protect access to the queue
-        lock = MemcacheLock(self._namespace, self._locktoken)
-        yield lock.acquire()
-        
-        try:
-            queued_value = (yield self.get(self._queue_token))
-            queued_items = cPickle.loads(queued_value) if queued_value is not None else []
-            queued_items.append(self._ticket)
-            queued_items.sort()
-            if len(queued_items) == 1:
-                yield self.set(self._nextinline_token, str(queued_items[0]))
-            yield self.set(self._queue_token, cPickle.dumps(queued_items))
-        finally:
-            yield lock.release()
-        
-    @inlineCallbacks
-    def _removeFromQueue(self):
-        """
-        Remove our ticket from the queue. If it was the first next in line value, then bump
-        next in line to the new head of the queue value or reset it if the queue is empty.
-        """
-        
-        # We need a shared lock to protect access to the queue
-        lock = MemcacheLock(self._namespace, self._locktoken)
-        yield lock.acquire()
-        
-        try:
-            queued_value = (yield self.get(self._queue_token))
-            queued_items = cPickle.loads(queued_value) if queued_value is not None else []
-            
-            if queued_items[0] == self._ticket:
-                del queued_items[0]
-                yield self.set(self._nextinline_token, str(queued_items[0] if queued_items else 0))
-            else:
-                queued_items.remove(self._ticket)
-            queued_items.sort()
-            yield self.set(self._queue_token, cPickle.dumps(queued_items))
-        finally:
-            yield lock.release()

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py	2011-11-14 21:30:51 UTC (rev 8286)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py	2011-11-14 22:18:26 UTC (rev 8287)
@@ -53,8 +53,7 @@
 from twistedcaldav.ical import Component, Property
 from twistedcaldav.instance import TooManyInstancesError,\
     InvalidOverriddenInstanceError
-from twistedcaldav.memcachelock import MemcacheLockTimeoutError
-from twistedcaldav.memcachefifolock import MemcacheFIFOLock
+from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
 from twistedcaldav.scheduling.implicit import ImplicitScheduler
 
 log = Logger()
@@ -67,7 +66,7 @@
             if internal_request:
                 self.lock = None
             else:
-                self.lock = MemcacheFIFOLock("ImplicitUIDLock", uid, timeout=60.0, retry_interval=0.01, expire_time=5*60)
+                self.lock = MemcacheLock("ImplicitUIDLock", uid, timeout=60.0, expire_time=5*60)
             self.reserved = False
             self.index = index
             self.uid = uid

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py	2011-11-14 21:30:51 UTC (rev 8286)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py	2011-11-14 22:18:26 UTC (rev 8287)
@@ -38,7 +38,6 @@
 from pycalendar.duration import PyCalendarDuration
 from pycalendar.datetime import PyCalendarDateTime
 from pycalendar.timezone import PyCalendarTimezone
-from twistedcaldav.memcachefifolock import MemcacheFIFOLock
 
 __all__ = [
     "ImplicitProcessor",
@@ -257,7 +256,7 @@
             # We need to get the UID lock for implicit processing whilst we send the auto-reply
             # as the Organizer processing will attempt to write out data to other attendees to
             # refresh them. To prevent a race we need a lock.
-            uidlock = MemcacheFIFOLock("ImplicitUIDLock", self.uid, timeout=60.0, expire_time=5*60)
+            uidlock = MemcacheLock("ImplicitUIDLock", self.uid, timeout=60.0, expire_time=5*60)
     
             try:
                 yield uidlock.acquire()
@@ -515,7 +514,7 @@
         # We need to get the UID lock for implicit processing whilst we send the auto-reply
         # as the Organizer processing will attempt to write out data to other attendees to
         # refresh them. To prevent a race we need a lock.
-        lock = MemcacheFIFOLock("ImplicitUIDLock", calendar.resourceUID(), timeout=60.0, expire_time=5*60)
+        lock = MemcacheLock("ImplicitUIDLock", calendar.resourceUID(), timeout=60.0, expire_time=5*60)
 
         # Note that this lock also protects the request, as this request is
         # being re-used by potentially multiple transactions and should not be

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py	2011-11-14 21:30:51 UTC (rev 8286)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py	2011-11-14 22:18:26 UTC (rev 8287)
@@ -44,8 +44,7 @@
 from twistedcaldav.config import config
 from twistedcaldav.ical import Component as VCalendar, Property as VProperty,\
     InvalidICalendarDataError, iCalendarProductID
-from twistedcaldav.memcachelock import MemcacheLockTimeoutError
-from twistedcaldav.memcachefifolock import MemcacheFIFOLock
+from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
 from twistedcaldav.method.put_addressbook_common import StoreAddressObjectResource
 from twistedcaldav.method.put_common import StoreCalendarObjectResource
 from twistedcaldav.notifications import (
@@ -1876,7 +1875,7 @@
                 )
             )
             if do_implicit_action:
-                lock = MemcacheFIFOLock(
+                lock = MemcacheLock(
                     "ImplicitUIDLock", calendar.resourceUID(), timeout=60.0, expire_time=5*60
                 )
 

Deleted: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py	2011-11-14 21:30:51 UTC (rev 8286)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py	2011-11-14 22:18:26 UTC (rev 8287)
@@ -1,249 +0,0 @@
-##
-# Copyright (c) 2011 Apple Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-from twisted.internet.defer import inlineCallbacks, Deferred
-
-from twistedcaldav.memcachefifolock import MemcacheFIFOLock
-
-from twistedcaldav.test.util import TestCase
-import cPickle
-from twistedcaldav.memcachelock import MemcacheLockTimeoutError
-from twistedcaldav.memcacher import Memcacher
-from twisted.internet import reactor
-
-class MemCacheTestCase(TestCase):
-    """
-    Tests for the L{MemcacheFIFOLock} shared FIFO lock.
-    """
-
-    @inlineCallbacks
-    def test_one_lock(self):
-
-        lock = MemcacheFIFOLock("lock", "test")
-        lock.allowTestCache = True
-
-        yield lock.acquire()
-        self.assertTrue((yield lock.locked()))
-
-        yield lock.release()
-        self.assertFalse((yield lock.locked()))
-
-        self.assertEqual((yield lock.get(lock._ticket_token)), "2")
-        self.assertEqual((yield lock.get(lock._nextinline_token)), "0")
-        self.assertEqual(cPickle.loads((yield lock.get(lock._queue_token))), [])
-
-    @inlineCallbacks
-    def test_two_locks_with_timeout(self):
-
-        lock1 = MemcacheFIFOLock("lock", "test")
-        lock1.allowTestCache = True
-
-        lock2 = MemcacheFIFOLock("lock", "test", timeout=0.01)
-        lock2.allowTestCache = True
-        
-        lock1._memcacheProtocol = lock2._memcacheProtocol = Memcacher.memoryCacher()
-
-        yield lock1.acquire()
-        self.assertTrue((yield lock1.locked()))
-
-        try:
-            yield lock2.acquire()
-        except MemcacheLockTimeoutError:
-            pass
-        else:
-            self.fail("Did not timeout lock")
-
-        self.assertTrue((yield lock1.locked()))
-        self.assertFalse((yield lock2.locked()))
-
-        self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
-        self.assertEqual((yield lock1.get(lock1._nextinline_token)), "1")
-        self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [1,])
-
-        yield lock1.release()
-        self.assertFalse((yield lock1.locked()))
-
-        self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
-        self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
-        self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
-
-    @inlineCallbacks
-    def test_two_locks_in_sequence(self):
-
-        lock1 = MemcacheFIFOLock("lock", "test")
-        lock1.allowTestCache = True
-
-        lock2 = MemcacheFIFOLock("lock", "test")
-        lock2.allowTestCache = True
-        
-        lock1._memcacheProtocol = lock2._memcacheProtocol = Memcacher.memoryCacher()
-
-        yield lock1.acquire()
-        self.assertTrue((yield lock1.locked()))
-        self.assertFalse((yield lock2.locked()))
-
-        yield lock1.release()
-        self.assertFalse((yield lock1.locked()))
-        self.assertFalse((yield lock2.locked()))
-
-        yield lock2.acquire()
-        self.assertFalse((yield lock1.locked()))
-        self.assertTrue((yield lock2.locked()))
-
-        yield lock2.release()
-        self.assertFalse((yield lock1.locked()))
-        self.assertFalse((yield lock2.locked()))
-
-        self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
-        self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
-        self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
-
-    @inlineCallbacks
-    def test_two_locks_in_parallel(self):
-
-        lock1 = MemcacheFIFOLock("lock", "test", timeout=1.0)
-        lock1.allowTestCache = True
-
-        lock2 = MemcacheFIFOLock("lock", "test", timeout=1.0)
-        lock2.allowTestCache = True
-        
-        lock1._memcacheProtocol = lock2._memcacheProtocol = Memcacher.memoryCacher()
-
-        yield lock1.acquire()
-        self.assertTrue((yield lock1.locked()))
-        self.assertFalse((yield lock2.locked()))
-
-        @inlineCallbacks
-        def _release1():
-            self.assertTrue((yield lock1.locked()))
-            self.assertFalse((yield lock2.locked()))
-            yield lock1.release()
-            self.assertFalse((yield lock1.locked()))
-            self.assertFalse((yield lock2.locked()))
-        reactor.callLater(0.1, _release1)
-
-        yield lock2.acquire()
-        self.assertFalse((yield lock1.locked()))
-        self.assertTrue((yield lock2.locked()))
-
-        yield lock2.release()
-        self.assertFalse((yield lock1.locked()))
-        self.assertFalse((yield lock2.locked()))
-
-        self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
-        self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
-        self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
-
-
-    @inlineCallbacks
-    def test_three_in_order(self):
-        """
-        This test overlaps a lock on #1 with two attempts on #2 and #3. #3 has
-        a very much shorter polling interval than #2 so when #1 releases, #3 will
-        poll first. We want to make sure that #3 is not given the lock until after
-        #2 has polled and acquired/released it. i.e., this tests the FIFO behavior.
-        """
-
-        lock1 = MemcacheFIFOLock("lock", "test", timeout=2.0)
-        lock1.allowTestCache = True
-
-        lock2 = MemcacheFIFOLock("lock", "test", timeout=2.0, retry_interval=0.5)
-        lock2.allowTestCache = True
-        
-        lock3 = MemcacheFIFOLock("lock", "test", timeout=2.0, retry_interval=0.01) # retry a lot faster than #2
-        lock3.allowTestCache = True
-        
-        lock1._memcacheProtocol = \
-        lock2._memcacheProtocol = \
-        lock3._memcacheProtocol = Memcacher.memoryCacher()
-
-        d = Deferred()
-
-        yield lock1.acquire()
-        self.assertTrue((yield lock1.locked()))
-        self.assertFalse((yield lock2.locked()))
-        self.assertFalse((yield lock3.locked()))
-
-        @inlineCallbacks
-        def _release1():
-            self.assertTrue((yield lock1.locked()))
-            self.assertFalse((yield lock2.locked()))
-            self.assertFalse((yield lock3.locked()))
-            yield lock1.release()
-            self.assertFalse((yield lock1.locked()))
-            self.assertFalse((yield lock2.locked()))
-            self.assertFalse((yield lock3.locked()))
-    
-            self.assertEqual((yield lock1.get(lock1._ticket_token)), "4")
-            self.assertEqual((yield lock1.get(lock1._nextinline_token)), "2")
-            self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [2, 3])
-        reactor.callLater(0.1, _release1)
-
-        @inlineCallbacks
-        def _acquire2():
-            self.assertTrue((yield lock1.locked()))
-            self.assertFalse((yield lock2.locked()))
-            self.assertFalse((yield lock3.locked()))
-            
-            @inlineCallbacks
-            def _release2():
-                self.assertFalse((yield lock1.locked()))
-                self.assertTrue((yield lock2.locked()))
-                self.assertFalse((yield lock3.locked()))
-                yield lock2.release()
-                self.assertFalse((yield lock1.locked()))
-                self.assertFalse((yield lock2.locked()))
-                self.assertFalse((yield lock3.locked()))
-
-            yield lock2.acquire()
-            reactor.callLater(0.1, _release2)
-            self.assertFalse((yield lock1.locked()))
-            self.assertTrue((yield lock2.locked()))
-            self.assertFalse((yield lock3.locked()))
-
-        reactor.callLater(0.01, _acquire2)
-
-        @inlineCallbacks
-        def _acquire3():
-            self.assertTrue((yield lock1.locked()))
-            self.assertFalse((yield lock2.locked()))
-            self.assertFalse((yield lock3.locked()))
-            
-            @inlineCallbacks
-            def _release3():
-                self.assertFalse((yield lock1.locked()))
-                self.assertFalse((yield lock2.locked()))
-                self.assertTrue((yield lock3.locked()))
-                yield lock3.release()
-                self.assertFalse((yield lock1.locked()))
-                self.assertFalse((yield lock2.locked()))
-                self.assertFalse((yield lock3.locked()))
-        
-                self.assertEqual((yield lock1.get(lock1._ticket_token)), "4")
-                self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
-                self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
-                
-                d.callback(True)
-
-            yield lock3.acquire()
-            reactor.callLater(0.1, _release3)
-            self.assertFalse((yield lock1.locked()))
-            self.assertFalse((yield lock2.locked()))
-            self.assertTrue((yield lock3.locked()))
-
-        reactor.callLater(0.02, _acquire3)
-
-        yield d
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111114/6bbeaed2/attachment-0001.html>


More information about the calendarserver-changes mailing list