[CalendarServer-changes] [8124] CalendarServer/branches/users/cdaboo/queued-attendee-refreshes

source_changes at macosforge.org
Tue Sep 27 12:00:26 PDT 2011


Revision: 8124
          http://trac.macosforge.org/projects/calendarserver/changeset/8124
Author:   cdaboo at apple.com
Date:     2011-09-27 12:00:26 -0700 (Tue, 27 Sep 2011)
Log Message:
-----------
Current snapshot - implemented a memcachefifolock that ensures requests for the same event get processed
in the order in which they come in.
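
For illustration, a minimal usage sketch (not part of this changeset) of taking the new
lock around work for a single event UID, using the same constructor arguments that
put_common.py adopts below; processOneEvent and doWork are hypothetical names:

    # Sketch only, assuming twistedcaldav is importable and memcached is configured.
    from twisted.internet.defer import inlineCallbacks
    from twistedcaldav.memcachefifolock import MemcacheFIFOLock
    from twistedcaldav.memcachelock import MemcacheLockTimeoutError

    @inlineCallbacks
    def processOneEvent(uid, doWork):
        # Requests for the same UID queue up and are granted in arrival order.
        lock = MemcacheFIFOLock("ImplicitUIDLock", uid, timeout=60.0, retry_interval=0.01)
        try:
            yield lock.acquire()
        except MemcacheLockTimeoutError:
            # Could not get the lock within the timeout; let the caller decide what to do.
            raise
        try:
            yield doWork()
        finally:
            yield lock.release()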

Modified Paths:
--------------
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/conf/caldavd-test.plist
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/loadtest/sim.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/stats.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/ical.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachepool.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcacher.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/implicit.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/stdconfig.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py

Added Paths:
-----------
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/__init__.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py
    CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/conf/caldavd-test.plist
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/conf/caldavd-test.plist	2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/conf/caldavd-test.plist	2011-09-27 19:00:26 UTC (rev 8124)
@@ -782,7 +782,7 @@
         <false/>
         <key>AllowResourceAsOrganizer</key>
         <false/>
-        <key>AttendeeRefreshInterval</key>
+        <key>AttendeeRefreshIntervalSeconds</key>
         <integer>0</integer>
       </dict>
 

Added: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/__init__.py
===================================================================
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/loadtest/sim.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/loadtest/sim.py	2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/loadtest/sim.py	2011-09-27 19:00:26 UTC (rev 8124)
@@ -278,3 +278,7 @@
             obs.report()
 
 main = LoadSimulator.main
+
+if __name__ == '__main__':
+    main()
+

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/stats.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/stats.py	2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/stats.py	2011-09-27 19:00:26 UTC (rev 8124)
@@ -16,7 +16,7 @@
 
 import random, time, datetime
 
-import pytz
+#import pytz
 
 from zope.interface import Interface, implements
 
@@ -317,7 +317,7 @@
         self._daysOfWeek = [self._weekdayNames.index(day) for day in daysOfWeek]
         self._beginHour = beginHour
         self._endHour = endHour
-        self._tzinfo = pytz.timezone(tzname)
+        self._tzinfo = None #pytz.timezone(tzname)
         self._helperDistribution = NormalDistribution(
             # Mean 6 workdays in the future
             60 * 60 * 8 * 6,

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/ical.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/ical.py	2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/ical.py	2011-09-27 19:00:26 UTC (rev 8124)
@@ -1719,6 +1719,10 @@
                 yield attendee
 
 
+    def countAllUniqueAttendees(self, onlyScheduleAgentServer=True):
+        attendeesByInstance = self.getAttendeesByInstance(True, onlyScheduleAgentServer=onlyScheduleAgentServer)
+        return len(set([attendee for attendee, _ignore in attendeesByInstance]))
+        
     def getMaskUID(self):
         """
         Get the X-CALENDARSERVER-MASK-UID value. Works on either a VCALENDAR or on a component.
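
The new countAllUniqueAttendees() helper counts each attendee exactly once, no matter how
many overridden instances it appears in; a small standalone illustration of that
de-duplication (the sample pairs are hypothetical):

    # Sketch only: getAttendeesByInstance() yields (attendee, instance) pairs across
    # all instances of the event; the helper reduces them to a unique attendee count.
    pairs = [
        ("mailto:user01@example.com", None),                 # master instance
        ("mailto:user02@example.com", None),
        ("mailto:user02@example.com", "20111001T120000Z"),   # also in an override
    ]
    print len(set([attendee for attendee, _ignore in pairs]))   # 2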

Added: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py	                        (rev 0)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py	2011-09-27 19:00:26 UTC (rev 8124)
@@ -0,0 +1,201 @@
+##
+# Copyright (c) 2008-2009 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twistedcaldav.memcacher import Memcacher
+from twisted.internet.defer import inlineCallbacks, Deferred, returnValue,\
+    succeed
+from twisted.internet import reactor
+import time
+from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
+import cPickle
+
+class MemcacheFIFOLock(Memcacher):
+    """
+    Implements a shared lock with a queue such that lock requests are honored in the order
+    in which they are received. This differs from MemcacheLock which does not honor ordering
+    of lock requests.
+    
+    The implementation here uses three memcached entries. One is a ticket counter - a
+    counter that is incremented and returned each time a lock is requested. The second is
+    the "next in line" value, and the third is a queue - a sorted list of pending tickets.
+    To get the lock, a caller takes the next ticket value, then polls, waiting for its
+    number to come up as "next in line". When it does, the caller is given the lock; on
+    release, its ticket is popped off the front of the queue and the new head of the queue
+    becomes "next in line", giving the next waiter a chance to get the lock.
+    
+    One complication: we must make sure the queue does not get stuck waiting on a
+    "next in line" ticket whose owner has gone away - so the queue has to be maintained
+    carefully whenever an acquire times out or the lock is released.
+    
+    TODO: add a timeout within which the next in line process must grab the lock, so that
+    if that process has died, the others will still eventually get the lock.
+    
+    Note that this is really a temporary solution, meant to order scheduling requests so as
+    to be "fair" to clients. Ultimately we want a persistent scheduling queue implementation
+    that would manage the next-in-line workload itself and would not depend on the presence
+    of any one worker process.
+    """
+
+    def __init__(self, namespace, locktoken, timeout=60.0, retry_interval=0.01, expire_time=0):
+        """
+        
+        @param namespace: a unique namespace for this lock's tokens
+        @type namespace: C{str}
+        @param locktoken: the name of the locktoken
+        @type locktoken: C{str}
+        @param timeout: the maximum time in seconds that the lock should block
+        @type timeout: C{float}
+        @param retry_interval: the interval to retry acquiring the lock
+        @type retry_interval: C{float}
+        @param expire_time: the time in seconds after which the lock expires; zero means no expiration
+        @type expire_time: C{float}
+        """
+
+        super(MemcacheFIFOLock, self).__init__(namespace)
+        self._locktoken = locktoken
+        self._ticket_token = "%s-ticket" % (self._locktoken,)
+        self._nextinline_token = "%s-next" % (self._locktoken,)
+        self._queue_token = "%s-queue" % (self._locktoken,)
+        self._timeout = timeout
+        self._retry_interval = retry_interval
+        self._expire_time = expire_time
+        self._hasLock = False
+        self._ticket = None
+
+    def _getMemcacheProtocol(self):
+        
+        result = super(MemcacheFIFOLock, self)._getMemcacheProtocol()
+
+        if isinstance(result, Memcacher.nullCacher):
+            raise AssertionError("No implementation of shared locking without memcached")
+        
+        return result
+
+    @inlineCallbacks
+    def acquire(self):
+        
+        assert not self._hasLock, "Lock already acquired."
+    
+        # First make sure the ticket and queue keys exist
+        yield self.add(self._ticket_token, "1", self._expire_time)
+        yield self.add(self._nextinline_token, "0", self._expire_time)
+        
+        # Get the next ticket
+        self._ticket = (yield self.incr(self._ticket_token)) - 1
+        
+        # Add ourselves to the pending queue
+        yield self._addToQueue()
+
+        timeout_at = time.time() + self._timeout
+        waiting = False
+        while True:
+            
+            # Check next in line value
+            result = int((yield self.get(self._nextinline_token)))
+            if result == self._ticket:
+                self._hasLock = True
+                if waiting:
+                    self.log_debug("Got lock after waiting on %s" % (self._locktoken,))
+                break
+            
+            if self._timeout and time.time() < timeout_at:
+                waiting = True
+                self.log_debug("Waiting for lock on %s" % (self._locktoken,))
+                pause = Deferred()
+                def _timedDeferred():
+                    pause.callback(True)
+                reactor.callLater(self._retry_interval, _timedDeferred)
+                yield pause
+            else:
+                # Must remove our ticket from the queue, otherwise later waiters would stall
+                # behind a ticket that will never take or release the lock
+                yield self._removeFromQueue()
+
+                self.log_debug("Timed out lock after waiting on %s" % (self._locktoken,))
+                raise MemcacheLockTimeoutError()
+        
+        returnValue(True)
+
+    @inlineCallbacks
+    def release(self):
+        
+        assert self._hasLock, "Lock not acquired."
+        self._hasLock = False
+    
+        # Remove active ticket value - this will bump the next in line value
+        yield self._removeFromQueue()
+            
+
+    def clean(self):
+        
+        if self._hasLock:
+            return self.release()
+        else:
+            return succeed(True)
+
+    def locked(self):
+        """
+        Test if the lock is currently being held.
+        """
+        
+        return succeed(self._hasLock)
+
+    @inlineCallbacks
+    def _addToQueue(self):
+        """
+        Add our ticket to the queue. If it is now the first pending ticket, set the next in line
+        value to that.
+        """
+        
+        # We need a shared lock to protect access to the queue
+        lock = MemcacheLock(self._namespace, self._locktoken)
+        yield lock.acquire()
+        
+        try:
+            queued_value = (yield self.get(self._queue_token))
+            queued_items = cPickle.loads(queued_value) if queued_value is not None else []
+            queued_items.append(self._ticket)
+            queued_items.sort()
+            if len(queued_items) == 1:
+                yield self.set(self._nextinline_token, str(queued_items[0]))
+            yield self.set(self._queue_token, cPickle.dumps(queued_items))
+        finally:
+            yield lock.release()
+        
+    @inlineCallbacks
+    def _removeFromQueue(self):
+        """
+        Remove our ticket from the queue. If it was the "next in line" value, then bump
+        next in line to the new head of the queue, or reset it if the queue is empty.
+        """
+        
+        # We need a shared lock to protect access to the queue
+        lock = MemcacheLock(self._namespace, self._locktoken)
+        yield lock.acquire()
+        
+        try:
+            queued_value = (yield self.get(self._queue_token))
+            queued_items = cPickle.loads(queued_value) if queued_value is not None else []
+            
+            if queued_items[0] == self._ticket:
+                del queued_items[0]
+                yield self.set(self._nextinline_token, str(queued_items[0] if queued_items else 0))
+            else:
+                queued_items.remove(self._ticket)
+            queued_items.sort()
+            yield self.set(self._queue_token, cPickle.dumps(queued_items))
+        finally:
+            yield lock.release()
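
To make the bookkeeping above concrete, here is a hedged, memcache-free model of the same
ticket/queue scheme in plain Python; the real class keeps the counter, the "next in line"
value and the pickled queue in memcached (the "-ticket", "-next" and "-queue" keys) and
guards queue updates with a MemcacheLock:

    # Sketch only: in-process model of the FIFO algorithm implemented above.
    ticket_counter = 1    # the "<locktoken>-ticket" key
    next_in_line = 0      # the "<locktoken>-next" key
    queue = []            # the "<locktoken>-queue" key (a pickled, sorted list)

    def take_ticket():
        # acquire(): take the next ticket and join the queue.
        global ticket_counter, next_in_line
        ticket = ticket_counter
        ticket_counter += 1
        queue.append(ticket)
        queue.sort()
        if len(queue) == 1:
            next_in_line = ticket
        # ...the caller then polls until next_in_line == ticket...
        return ticket

    def drop_ticket(ticket):
        # release(), and the timeout path of acquire(): remove our ticket and
        # promote the new head of the queue (or reset to 0 when it is empty).
        global next_in_line
        queue.remove(ticket)
        next_in_line = queue[0] if queue else 0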

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachepool.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachepool.py	2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachepool.py	2011-09-27 19:00:26 UTC (rev 8124)
@@ -360,6 +360,12 @@
     def add(self, *args, **kwargs):
         return self.performRequest('add', *args, **kwargs)
 
+    def incr(self, *args, **kwargs):
+        return self.performRequest('increment', *args, **kwargs)
+
+    def decr(self, *args, **kwargs):
+        return self.performRequest('decrement', *args, **kwargs)
+
     def flush_all(self, *args, **kwargs):
         return self.performRequest('flush_all', *args, **kwargs)
 

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcacher.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcacher.py	2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcacher.py	2011-09-27 19:00:26 UTC (rev 8124)
@@ -70,6 +70,32 @@
             except KeyError:
                 return succeed(False)
 
+        def incr(self, key, delta=1):
+            value = self._cache.get(key, None)
+            if value is not None:
+                try:
+                    value = int(value)
+                except ValueError:
+                    value = None
+                else:
+                    value += delta
+                    self._cache[key] = str(value)
+            return succeed(value)
+
+        def decr(self, key, delta=1):
+            value = self._cache.get(key, None)
+            if value is not None:
+                try:
+                    value = int(value)
+                except ValueError:
+                    value = None
+                else:
+                    value -= delta
+                    if value < 0:
+                        value = 0
+                    self._cache[key] = str(value)
+            return succeed(value)
+
         def flush_all(self):
             self._cache = {}
             return succeed(True)
@@ -97,6 +123,12 @@
         def delete(self, key):
             return succeed(True)
 
+        def incr(self, key, delta=1):
+            return succeed(None)
+
+        def decr(self, key, delta=1):
+            return succeed(None)
+
         def flush_all(self):
             return succeed(True)
 
@@ -195,6 +227,14 @@
         self.log_debug("Deleting Cache Token for %r" % (key,))
         return self._getMemcacheProtocol().delete('%s:%s' % (self._namespace, self._normalizeKey(key)))
 
+    def incr(self, key, delta=1):
+        self.log_debug("Incrementing Cache Token for %r" % (key,))
+        return self._getMemcacheProtocol().incr('%s:%s' % (self._namespace, self._normalizeKey(key)), delta)
+
+    def decr(self, key, delta=1):
+        self.log_debug("Decrementing Cache Token for %r" % (key,))
+        return self._getMemcacheProtocol().decr('%s:%s' % (self._namespace, self._normalizeKey(key)), delta)
+
     def flush_all(self):
         self.log_debug("Flushing All Cache Tokens")
         return self._getMemcacheProtocol().flush_all()
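
A hedged usage sketch of the new incr/decr support against the in-memory test cacher
(values are stored as strings, a missing key yields None, and decr floors at zero); the
allowTestCache / _memcacheProtocol setup mirrors what the new unit tests below do:

    # Sketch only: exercises the incr/decr helpers added above.
    from twisted.internet.defer import inlineCallbacks
    from twistedcaldav.memcacher import Memcacher

    @inlineCallbacks
    def demo():
        cacher = Memcacher("demo")
        cacher.allowTestCache = True
        cacher._memcacheProtocol = Memcacher.memoryCacher()
        yield cacher.set("counter", "1")
        print (yield cacher.incr("counter"))       # 2
        print (yield cacher.decr("counter", 5))    # 0 - floored, never negative
        print (yield cacher.incr("missing"))       # None - key does not exist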

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py	2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py	2011-09-27 19:00:26 UTC (rev 8124)
@@ -56,7 +56,8 @@
 from twistedcaldav.ical import Component, Property
 from twistedcaldav.instance import TooManyInstancesError,\
     InvalidOverriddenInstanceError
-from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
+from twistedcaldav.memcachelock import MemcacheLockTimeoutError
+from twistedcaldav.memcachefifolock import MemcacheFIFOLock
 from twistedcaldav.scheduling.implicit import ImplicitScheduler
 
 log = Logger()
@@ -69,7 +70,7 @@
             if internal_request:
                 self.lock = None
             else:
-                self.lock = MemcacheLock("ImplicitUIDLock", uid, timeout=60.0)
+                self.lock = MemcacheFIFOLock("ImplicitUIDLock", uid, timeout=60.0, retry_interval=0.01)
             self.reserved = False
             self.index = index
             self.uid = uid
@@ -88,7 +89,7 @@
             # Lets use a deferred for this and loop a few times if we cannot reserve so that we give
             # time to whoever has the reservation to finish and release it.
             failure_count = 0
-            while(failure_count < 10):
+            while(failure_count < 100):
                 try:
                     yield self.index.reserveUID(self.uid)
                     self.reserved = True
@@ -100,7 +101,7 @@
                 pause = Deferred()
                 def _timedDeferred():
                     pause.callback(True)
-                reactor.callLater(0.5, _timedDeferred)
+                reactor.callLater(0.05, _timedDeferred)
                 yield pause
             
             if self.uri and not self.reserved:
@@ -938,7 +939,7 @@
                     # All auto-processed updates for an Organizer leave the tag unchanged
                     change_scheduletag = False
                 elif self.processing_organizer == False:
-                    # Auto-processed updates that are the result of an organizer "refresh' due
+                    # Auto-processed updates that are the result of an organizer "refresh" due
                     # to another Attendee's REPLY should leave the tag unchanged
                     change_scheduletag = not hasattr(self.request, "doing_attendee_refresh")
 

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/implicit.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/implicit.py	2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/implicit.py	2011-09-27 19:00:26 UTC (rev 8124)
@@ -242,20 +242,14 @@
             returnValue(self.return_calendar if hasattr(self, "return_calendar") else self.calendar)
 
     @inlineCallbacks
-    def refreshAllAttendeesExceptSome(self, request, resource, calendar, attendees):
+    def refreshAllAttendeesExceptSome(self, request, resource, attendees):
         """
-        
-        @param request:
-        @type request:
-        @param attendee:
-        @type attendee:
-        @param calendar:
-        @type calendar:
+        Refresh the iCalendar data for all attendees except the one specified in attendees.
         """
 
         self.request = request
         self.resource = resource
-        self.calendar = calendar
+        self.calendar = (yield self.resource.iCalendarForUser(self.request))
         self.state = "organizer"
         self.action = "modify"
 

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py	2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py	2011-09-27 19:00:26 UTC (rev 8124)
@@ -38,6 +38,7 @@
 from pycalendar.duration import PyCalendarDuration
 from pycalendar.datetime import PyCalendarDateTime
 from pycalendar.timezone import PyCalendarTimezone
+from twistedcaldav.memcachefifolock import MemcacheFIFOLock
 
 __all__ = [
     "ImplicitProcessor",
@@ -231,9 +232,9 @@
 
         # Use a memcachelock to ensure others don't refresh whilst we have an enqueued call
         self.uid = self.recipient_calendar.resourceUID()
-        if config.Scheduling.Options.AttendeeRefreshInterval:
+        if config.Scheduling.Options.AttendeeRefreshIntervalSeconds and self.recipient_calendar.countAllUniqueAttendees() > config.Scheduling.Options.AttendeeRefreshThreshold:
             attendees = ()
-            lock = MemcacheLock("RefreshUIDLock", self.uid, timeout=0.0, expire_time=config.Scheduling.Options.AttendeeRefreshInterval)
+            lock = MemcacheLock("RefreshUIDLock", self.uid, timeout=0.0, expire_time=config.Scheduling.Options.AttendeeRefreshIntervalSeconds)
             
             # Try lock, but fail immediately if already taken
             try:
@@ -248,7 +249,7 @@
             log.debug("ImplicitProcessing - refreshing UID: '%s'" % (self.uid,))
             from twistedcaldav.scheduling.implicit import ImplicitScheduler
             scheduler = ImplicitScheduler()
-            yield scheduler.refreshAllAttendeesExceptSome(self.request, organizer_resource, self.recipient_calendar, attendees)
+            yield scheduler.refreshAllAttendeesExceptSome(self.request, organizer_resource, attendees)
 
         @inlineCallbacks
         def _doDelayedRefresh():
@@ -256,7 +257,7 @@
             # We need to get the UID lock for implicit processing whilst we send the auto-reply
             # as the Organizer processing will attempt to write out data to other attendees to
             # refresh them. To prevent a race we need a lock.
-            uidlock = MemcacheLock("ImplicitUIDLock", self.uid, timeout=60.0)
+            uidlock = MemcacheFIFOLock("ImplicitUIDLock", self.uid, timeout=60.0)
     
             try:
                 yield uidlock.acquire()
@@ -288,7 +289,7 @@
                 yield uidlock.clean()
 
         if lock:
-            reactor.callLater(config.Scheduling.Options.AttendeeRefreshInterval, _doDelayedRefresh)
+            reactor.callLater(config.Scheduling.Options.AttendeeRefreshIntervalSeconds, _doDelayedRefresh)
         else:
             yield _doRefresh(resource)
 
@@ -507,7 +508,7 @@
         # We need to get the UID lock for implicit processing whilst we send the auto-reply
         # as the Organizer processing will attempt to write out data to other attendees to
         # refresh them. To prevent a race we need a lock.
-        lock = MemcacheLock("ImplicitUIDLock", calendar.resourceUID(), timeout=60.0)
+        lock = MemcacheFIFOLock("ImplicitUIDLock", calendar.resourceUID(), timeout=60.0)
 
         # Note that this lock also protects the request, as this request is
         # being re-used by potentially multiple transactions and should not be
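
With the new threshold, the organizer-side refresh above is only deferred for large
events: when AttendeeRefreshIntervalSeconds is non-zero and the unique attendee count
exceeds AttendeeRefreshThreshold, the RefreshUIDLock (expiring after the interval)
coalesces refreshes and _doDelayedRefresh is scheduled via callLater; otherwise the
refresh runs immediately. A hedged sketch of that decision, factored as a standalone
helper (the helper name is hypothetical):

    # Sketch only: the batching rule used above, with the option names from this change.
    def shouldDelayAttendeeRefresh(calendar, options):
        # True when the refresh should be coalesced via the RefreshUIDLock /
        # reactor.callLater path, False when it should happen at once.
        return bool(
            options.AttendeeRefreshIntervalSeconds and
            calendar.countAllUniqueAttendees() > options.AttendeeRefreshThreshold
        )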

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/stdconfig.py	2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/stdconfig.py	2011-09-27 19:00:26 UTC (rev 8124)
@@ -563,11 +563,12 @@
         },
 
         "Options" : {
-            "AllowGroupAsOrganizer"      : False, # Allow groups to be Organizers
-            "AllowLocationAsOrganizer"   : False, # Allow locations to be Organizers
-            "AllowResourceAsOrganizer"   : False, # Allow resources to be Organizers
-            "LimitFreeBusyAttendees"     : 30,    # Maximum number of attendees to request freebusy for
-            "AttendeeRefreshInterval"    : 60,    # Time after an iTIP REPLY at which attendee refresh will trigger 
+            "AllowGroupAsOrganizer"          : False, # Allow groups to be Organizers
+            "AllowLocationAsOrganizer"       : False, # Allow locations to be Organizers
+            "AllowResourceAsOrganizer"       : False, # Allow resources to be Organizers
+            "LimitFreeBusyAttendees"         : 30,    # Maximum number of attendees to request freebusy for
+            "AttendeeRefreshIntervalSeconds" : 60,    # Time after an iTIP REPLY at which attendee refresh will trigger 
+            "AttendeeRefreshThreshold"       : 20,    # Number of attendees above which refresh delays are used 
         }
     },
 

Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py	2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py	2011-09-27 19:00:26 UTC (rev 8124)
@@ -44,7 +44,8 @@
 from twistedcaldav.config import config
 from twistedcaldav.ical import Component as VCalendar, Property as VProperty,\
     InvalidICalendarDataError, iCalendarProductID
-from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
+from twistedcaldav.memcachelock import MemcacheLockTimeoutError
+from twistedcaldav.memcachefifolock import MemcacheFIFOLock
 from twistedcaldav.method.put_addressbook_common import StoreAddressObjectResource
 from twistedcaldav.method.put_common import StoreCalendarObjectResource
 from twistedcaldav.notifications import (
@@ -1874,7 +1875,7 @@
                 )
             )
             if do_implicit_action:
-                lock = MemcacheLock(
+                lock = MemcacheFIFOLock(
                     "ImplicitUIDLock", calendar.resourceUID(), timeout=60.0
                 )
 

Added: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py	                        (rev 0)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py	2011-09-27 19:00:26 UTC (rev 8124)
@@ -0,0 +1,249 @@
+##
+# Copyright (c) 2011 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twisted.internet.defer import inlineCallbacks, Deferred
+
+from twistedcaldav.memcachefifolock import MemcacheFIFOLock
+
+from twistedcaldav.test.util import TestCase
+import cPickle
+from twistedcaldav.memcachelock import MemcacheLockTimeoutError
+from twistedcaldav.memcacher import Memcacher
+from twisted.internet import reactor
+
+class MemCacheTestCase(TestCase):
+    """
+    Tests for the L{MemcacheFIFOLock} shared FIFO lock.
+    """
+
+    @inlineCallbacks
+    def test_one_lock(self):
+
+        lock = MemcacheFIFOLock("lock", "test")
+        lock.allowTestCache = True
+
+        yield lock.acquire()
+        self.assertTrue((yield lock.locked()))
+
+        yield lock.release()
+        self.assertFalse((yield lock.locked()))
+
+        self.assertEqual((yield lock.get(lock._ticket_token)), "2")
+        self.assertEqual((yield lock.get(lock._nextinline_token)), "0")
+        self.assertEqual(cPickle.loads((yield lock.get(lock._queue_token))), [])
+
+    @inlineCallbacks
+    def test_two_locks_with_timeout(self):
+
+        lock1 = MemcacheFIFOLock("lock", "test")
+        lock1.allowTestCache = True
+
+        lock2 = MemcacheFIFOLock("lock", "test", timeout=0.01)
+        lock2.allowTestCache = True
+        
+        lock1._memcacheProtocol = lock2._memcacheProtocol = Memcacher.memoryCacher()
+
+        yield lock1.acquire()
+        self.assertTrue((yield lock1.locked()))
+
+        try:
+            yield lock2.acquire()
+        except MemcacheLockTimeoutError:
+            pass
+        else:
+            self.fail("Did not timeout lock")
+
+        self.assertTrue((yield lock1.locked()))
+        self.assertFalse((yield lock2.locked()))
+
+        self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
+        self.assertEqual((yield lock1.get(lock1._nextinline_token)), "1")
+        self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [1,])
+
+        yield lock1.release()
+        self.assertFalse((yield lock1.locked()))
+
+        self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
+        self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
+        self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
+
+    @inlineCallbacks
+    def test_two_locks_in_sequence(self):
+
+        lock1 = MemcacheFIFOLock("lock", "test")
+        lock1.allowTestCache = True
+
+        lock2 = MemcacheFIFOLock("lock", "test")
+        lock2.allowTestCache = True
+        
+        lock1._memcacheProtocol = lock2._memcacheProtocol = Memcacher.memoryCacher()
+
+        yield lock1.acquire()
+        self.assertTrue((yield lock1.locked()))
+        self.assertFalse((yield lock2.locked()))
+
+        yield lock1.release()
+        self.assertFalse((yield lock1.locked()))
+        self.assertFalse((yield lock2.locked()))
+
+        yield lock2.acquire()
+        self.assertFalse((yield lock1.locked()))
+        self.assertTrue((yield lock2.locked()))
+
+        yield lock2.release()
+        self.assertFalse((yield lock1.locked()))
+        self.assertFalse((yield lock2.locked()))
+
+        self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
+        self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
+        self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
+
+    @inlineCallbacks
+    def test_two_locks_in_parallel(self):
+
+        lock1 = MemcacheFIFOLock("lock", "test", timeout=1.0)
+        lock1.allowTestCache = True
+
+        lock2 = MemcacheFIFOLock("lock", "test", timeout=1.0)
+        lock2.allowTestCache = True
+        
+        lock1._memcacheProtocol = lock2._memcacheProtocol = Memcacher.memoryCacher()
+
+        yield lock1.acquire()
+        self.assertTrue((yield lock1.locked()))
+        self.assertFalse((yield lock2.locked()))
+
+        @inlineCallbacks
+        def _release1():
+            self.assertTrue((yield lock1.locked()))
+            self.assertFalse((yield lock2.locked()))
+            yield lock1.release()
+            self.assertFalse((yield lock1.locked()))
+            self.assertFalse((yield lock2.locked()))
+        reactor.callLater(0.1, _release1)
+
+        yield lock2.acquire()
+        self.assertFalse((yield lock1.locked()))
+        self.assertTrue((yield lock2.locked()))
+
+        yield lock2.release()
+        self.assertFalse((yield lock1.locked()))
+        self.assertFalse((yield lock2.locked()))
+
+        self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
+        self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
+        self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
+
+
+    @inlineCallbacks
+    def test_three_in_order(self):
+        """
+        This test overlaps a lock held by #1 with two pending acquires, #2 and #3. #3
+        has a much shorter polling interval than #2, so when #1 releases, #3 will
+        poll first. We want to make sure that #3 is not given the lock until after
+        #2 has polled, acquired and released it - i.e., this tests the FIFO behavior.
+        """
+
+        lock1 = MemcacheFIFOLock("lock", "test", timeout=2.0)
+        lock1.allowTestCache = True
+
+        lock2 = MemcacheFIFOLock("lock", "test", timeout=2.0, retry_interval=0.5)
+        lock2.allowTestCache = True
+        
+        lock3 = MemcacheFIFOLock("lock", "test", timeout=2.0, retry_interval=0.01) # retry a lot faster than #2
+        lock3.allowTestCache = True
+        
+        lock1._memcacheProtocol = \
+        lock2._memcacheProtocol = \
+        lock3._memcacheProtocol = Memcacher.memoryCacher()
+
+        d = Deferred()
+
+        yield lock1.acquire()
+        self.assertTrue((yield lock1.locked()))
+        self.assertFalse((yield lock2.locked()))
+        self.assertFalse((yield lock3.locked()))
+
+        @inlineCallbacks
+        def _release1():
+            self.assertTrue((yield lock1.locked()))
+            self.assertFalse((yield lock2.locked()))
+            self.assertFalse((yield lock3.locked()))
+            yield lock1.release()
+            self.assertFalse((yield lock1.locked()))
+            self.assertFalse((yield lock2.locked()))
+            self.assertFalse((yield lock3.locked()))
+    
+            self.assertEqual((yield lock1.get(lock1._ticket_token)), "4")
+            self.assertEqual((yield lock1.get(lock1._nextinline_token)), "2")
+            self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [2, 3])
+        reactor.callLater(0.1, _release1)
+
+        @inlineCallbacks
+        def _acquire2():
+            self.assertTrue((yield lock1.locked()))
+            self.assertFalse((yield lock2.locked()))
+            self.assertFalse((yield lock3.locked()))
+            
+            @inlineCallbacks
+            def _release2():
+                self.assertFalse((yield lock1.locked()))
+                self.assertTrue((yield lock2.locked()))
+                self.assertFalse((yield lock3.locked()))
+                yield lock2.release()
+                self.assertFalse((yield lock1.locked()))
+                self.assertFalse((yield lock2.locked()))
+                self.assertFalse((yield lock3.locked()))
+
+            yield lock2.acquire()
+            reactor.callLater(0.1, _release2)
+            self.assertFalse((yield lock1.locked()))
+            self.assertTrue((yield lock2.locked()))
+            self.assertFalse((yield lock3.locked()))
+
+        reactor.callLater(0.01, _acquire2)
+
+        @inlineCallbacks
+        def _acquire3():
+            self.assertTrue((yield lock1.locked()))
+            self.assertFalse((yield lock2.locked()))
+            self.assertFalse((yield lock3.locked()))
+            
+            @inlineCallbacks
+            def _release3():
+                self.assertFalse((yield lock1.locked()))
+                self.assertFalse((yield lock2.locked()))
+                self.assertTrue((yield lock3.locked()))
+                yield lock3.release()
+                self.assertFalse((yield lock1.locked()))
+                self.assertFalse((yield lock2.locked()))
+                self.assertFalse((yield lock3.locked()))
+        
+                self.assertEqual((yield lock1.get(lock1._ticket_token)), "4")
+                self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
+                self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
+                
+                d.callback(True)
+
+            yield lock3.acquire()
+            reactor.callLater(0.1, _release3)
+            self.assertFalse((yield lock1.locked()))
+            self.assertFalse((yield lock2.locked()))
+            self.assertTrue((yield lock3.locked()))
+
+        reactor.callLater(0.02, _acquire3)
+
+        yield d

