[CalendarServer-changes] [8124] CalendarServer/branches/users/cdaboo/queued-attendee-refreshes
source_changes at macosforge.org
source_changes at macosforge.org
Tue Sep 27 12:00:26 PDT 2011
Revision: 8124
http://trac.macosforge.org/projects/calendarserver/changeset/8124
Author: cdaboo at apple.com
Date: 2011-09-27 12:00:26 -0700 (Tue, 27 Sep 2011)
Log Message:
-----------
Current snapshot - implemented a memcachefifolock that ensures requests for the same event get processed
in the order in which they come in.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/conf/caldavd-test.plist
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/loadtest/sim.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/stats.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/ical.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachepool.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcacher.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/implicit.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/stdconfig.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py
Added Paths:
-----------
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/__init__.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py
CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/conf/caldavd-test.plist
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/conf/caldavd-test.plist 2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/conf/caldavd-test.plist 2011-09-27 19:00:26 UTC (rev 8124)
@@ -782,7 +782,7 @@
<false/>
<key>AllowResourceAsOrganizer</key>
<false/>
- <key>AttendeeRefreshInterval</key>
+ <key>AttendeeRefreshIntervalSeconds</key>
<integer>0</integer>
</dict>
Added: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/__init__.py
===================================================================
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/loadtest/sim.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/loadtest/sim.py 2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/loadtest/sim.py 2011-09-27 19:00:26 UTC (rev 8124)
@@ -278,3 +278,7 @@
obs.report()
main = LoadSimulator.main
+
+if __name__ == '__main__':
+ main()
+
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/stats.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/stats.py 2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/contrib/performance/stats.py 2011-09-27 19:00:26 UTC (rev 8124)
@@ -16,7 +16,7 @@
import random, time, datetime
-import pytz
+#import pytz
from zope.interface import Interface, implements
@@ -317,7 +317,7 @@
self._daysOfWeek = [self._weekdayNames.index(day) for day in daysOfWeek]
self._beginHour = beginHour
self._endHour = endHour
- self._tzinfo = pytz.timezone(tzname)
+ self._tzinfo = None #pytz.timezone(tzname)
self._helperDistribution = NormalDistribution(
# Mean 6 workdays in the future
60 * 60 * 8 * 6,
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/ical.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/ical.py 2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/ical.py 2011-09-27 19:00:26 UTC (rev 8124)
@@ -1719,6 +1719,10 @@
yield attendee
+ def countAllUniqueAttendees(self, onlyScheduleAgentServer=True):
+ attendeesByInstance = self.getAttendeesByInstance(True, onlyScheduleAgentServer=onlyScheduleAgentServer)
+ return len(set([attendee for attendee, _ignore in attendeesByInstance]))
+
def getMaskUID(self):
"""
Get the X-CALENDARSEREVR-MASK-UID value. Works on either a VCALENDAR or on a component.
Added: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py (rev 0)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachefifolock.py 2011-09-27 19:00:26 UTC (rev 8124)
@@ -0,0 +1,201 @@
+##
+# Copyright (c) 2008-2009 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twistedcaldav.memcacher import Memcacher
+from twisted.internet.defer import inlineCallbacks, Deferred, returnValue,\
+ succeed
+from twisted.internet import reactor
+import time
+from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
+import cPickle
+
+class MemcacheFIFOLock(Memcacher):
+ """
+ Implements a shared lock with a queue such that lock requests are honored in the order
+ in which they are received. This differs from MemcacheLock which does not honor ordering
+ of lock requests.
+
+ The implementation here uses two memcached entries. One represent a ticket counter - a
+ counter incremented and returned each time a lock is requested. The other is the
+ "next in line queue" - a sorted list of pending counters. To get a lock the caller
+ is given the next ticket value, then polls waiting for their number to come up as
+ "next in line". When it does they are given the lock, and upon releasing it their
+ "next in line" value is popped off the front of the queue, giving the next a chance
+ to get the lock.
+
+ One complication here: making sure that we don't get the queue stuck waiting on a
+ "next in line" that has gone away - so we need to be careful about maintaining the
+ next in line queue appropriately.
+
+ TODO: have a timeout for the next in line process to grab the lock. That way if that
+ process has dies, at least the others will eventually get a lock.
+
+ Note that this is really a temporary solution meant to address the need to order scheduling
+ requests in order to be "fair" to clients. Ultimately we want a persistent scheduling queue
+ implementation that would effectively manage the next in line work loads and would not depend
+ on the presence of any one working process.
+ """
+
+ def __init__(self, namespace, locktoken, timeout=60.0, retry_interval=0.01, expire_time=0):
+ """
+
+ @param namespace: a unique namespace for this lock's tokens
+ @type namespace: C{str}
+ @param locktoken: the name of the locktoken
+ @type locktoken: C{str}
+ @param timeout: the maximum time in seconds that the lock should block
+ @type timeout: C{float}
+ @param retry_interval: the interval to retry acquiring the lock
+ @type retry_interval: C{float}
+ @param expiryTime: the time in seconds for the lock to expire. Zero: no expiration.
+ @type expiryTime: C{float}
+ """
+
+ super(MemcacheFIFOLock, self).__init__(namespace)
+ self._locktoken = locktoken
+ self._ticket_token = "%s-ticket" % (self._locktoken,)
+ self._nextinline_token = "%s-next" % (self._locktoken,)
+ self._queue_token = "%s-queue" % (self._locktoken,)
+ self._timeout = timeout
+ self._retry_interval = retry_interval
+ self._expire_time = expire_time
+ self._hasLock = False
+ self._ticket = None
+
+ def _getMemcacheProtocol(self):
+
+ result = super(MemcacheFIFOLock, self)._getMemcacheProtocol()
+
+ if isinstance(result, Memcacher.nullCacher):
+ raise AssertionError("No implementation of shared locking without memcached")
+
+ return result
+
+ @inlineCallbacks
+ def acquire(self):
+
+ assert not self._hasLock, "Lock already acquired."
+
+ # First make sure the ticket and queue keys exist
+ yield self.add(self._ticket_token, "1", self._expire_time)
+ yield self.add(self._nextinline_token, "0", self._expire_time)
+
+ # Get the next ticket
+ self._ticket = (yield self.incr(self._ticket_token)) - 1
+
+ # Add ourselves to the pending queue
+ yield self._addToQueue()
+
+ timeout_at = time.time() + self._timeout
+ waiting = False
+ while True:
+
+ # Check next in line value
+ result = int((yield self.get(self._nextinline_token)))
+ if result == self._ticket:
+ self._hasLock = True
+ if waiting:
+ self.log_debug("Got lock after waiting on %s" % (self._locktoken,))
+ break
+
+ if self._timeout and time.time() < timeout_at:
+ waiting = True
+ self.log_debug("Waiting for lock on %s" % (self._locktoken,))
+ pause = Deferred()
+ def _timedDeferred():
+ pause.callback(True)
+ reactor.callLater(self._retry_interval, _timedDeferred)
+ yield pause
+ else:
+ # Must remove our active ticket value otherwise the next in line will stall on
+ # this lock which will never happen
+ yield self._removeFromQueue()
+
+ self.log_debug("Timed out lock after waiting on %s" % (self._locktoken,))
+ raise MemcacheLockTimeoutError()
+
+ returnValue(True)
+
+ @inlineCallbacks
+ def release(self):
+
+ assert self._hasLock, "Lock not acquired."
+ self._hasLock = False
+
+ # Remove active ticket value - this will bump the next in line value
+ yield self._removeFromQueue()
+
+
+ def clean(self):
+
+ if self._hasLock:
+ return self.release()
+ else:
+ return succeed(True)
+
+ def locked(self):
+ """
+ Test if the lock is currently being held.
+ """
+
+ return succeed(self._hasLock)
+
+ @inlineCallbacks
+ def _addToQueue(self):
+ """
+ Add our ticket to the queue. If it is now the first pending ticket, set the next in line
+ value to that.
+ """
+
+ # We need a shared lock to protect access to the queue
+ lock = MemcacheLock(self._namespace, self._locktoken)
+ yield lock.acquire()
+
+ try:
+ queued_value = (yield self.get(self._queue_token))
+ queued_items = cPickle.loads(queued_value) if queued_value is not None else []
+ queued_items.append(self._ticket)
+ queued_items.sort()
+ if len(queued_items) == 1:
+ yield self.set(self._nextinline_token, str(queued_items[0]))
+ yield self.set(self._queue_token, cPickle.dumps(queued_items))
+ finally:
+ yield lock.release()
+
+ @inlineCallbacks
+ def _removeFromQueue(self):
+ """
+ Remove our ticket from the queue. If it was the first next in line value, then bump
+ next in line to the new head of the queue value or reset it if the queue is empty.
+ """
+
+ # We need a shared lock to protect access to the queue
+ lock = MemcacheLock(self._namespace, self._locktoken)
+ yield lock.acquire()
+
+ try:
+ queued_value = (yield self.get(self._queue_token))
+ queued_items = cPickle.loads(queued_value) if queued_value is not None else []
+
+ if queued_items[0] == self._ticket:
+ del queued_items[0]
+ yield self.set(self._nextinline_token, str(queued_items[0] if queued_items else 0))
+ else:
+ queued_items.remove(self._ticket)
+ queued_items.sort()
+ yield self.set(self._queue_token, cPickle.dumps(queued_items))
+ finally:
+ yield lock.release()
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachepool.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachepool.py 2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcachepool.py 2011-09-27 19:00:26 UTC (rev 8124)
@@ -360,6 +360,12 @@
def add(self, *args, **kwargs):
return self.performRequest('add', *args, **kwargs)
+ def incr(self, *args, **kwargs):
+ return self.performRequest('increment', *args, **kwargs)
+
+ def decr(self, *args, **kwargs):
+ return self.performRequest('decrement', *args, **kwargs)
+
def flush_all(self, *args, **kwargs):
return self.performRequest('flush_all', *args, **kwargs)
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcacher.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcacher.py 2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/memcacher.py 2011-09-27 19:00:26 UTC (rev 8124)
@@ -70,6 +70,32 @@
except KeyError:
return succeed(False)
+ def incr(self, key, delta=1):
+ value = self._cache.get(key, None)
+ if value is not None:
+ try:
+ value = int(value)
+ except ValueError:
+ value = None
+ else:
+ value += delta
+ self._cache[key] = str(value)
+ return succeed(value)
+
+ def decr(self, key, delta=1):
+ value = self._cache.get(key, None)
+ if value is not None:
+ try:
+ value = int(value)
+ except ValueError:
+ value = None
+ else:
+ value -= delta
+ if value < 0:
+ value = 0
+ self._cache[key] = str(value)
+ return succeed(value)
+
def flush_all(self):
self._cache = {}
return succeed(True)
@@ -97,6 +123,12 @@
def delete(self, key):
return succeed(True)
+ def incr(self, key, delta=1):
+ return succeed(None)
+
+ def decr(self, key, delta=1):
+ return succeed(None)
+
def flush_all(self):
return succeed(True)
@@ -195,6 +227,14 @@
self.log_debug("Deleting Cache Token for %r" % (key,))
return self._getMemcacheProtocol().delete('%s:%s' % (self._namespace, self._normalizeKey(key)))
+ def incr(self, key, delta=1):
+ self.log_debug("Incrementing Cache Token for %r" % (key,))
+ return self._getMemcacheProtocol().incr('%s:%s' % (self._namespace, self._normalizeKey(key)), delta)
+
+ def decr(self, key, delta=1):
+ self.log_debug("Decrementing Cache Token for %r" % (key,))
+ return self._getMemcacheProtocol().incr('%s:%s' % (self._namespace, self._normalizeKey(key)), delta)
+
def flush_all(self):
self.log_debug("Flushing All Cache Tokens")
return self._getMemcacheProtocol().flush_all()
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py 2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/method/put_common.py 2011-09-27 19:00:26 UTC (rev 8124)
@@ -56,7 +56,8 @@
from twistedcaldav.ical import Component, Property
from twistedcaldav.instance import TooManyInstancesError,\
InvalidOverriddenInstanceError
-from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
+from twistedcaldav.memcachelock import MemcacheLockTimeoutError
+from twistedcaldav.memcachefifolock import MemcacheFIFOLock
from twistedcaldav.scheduling.implicit import ImplicitScheduler
log = Logger()
@@ -69,7 +70,7 @@
if internal_request:
self.lock = None
else:
- self.lock = MemcacheLock("ImplicitUIDLock", uid, timeout=60.0)
+ self.lock = MemcacheFIFOLock("ImplicitUIDLock", uid, timeout=60.0, retry_interval=0.01)
self.reserved = False
self.index = index
self.uid = uid
@@ -88,7 +89,7 @@
# Lets use a deferred for this and loop a few times if we cannot reserve so that we give
# time to whoever has the reservation to finish and release it.
failure_count = 0
- while(failure_count < 10):
+ while(failure_count < 100):
try:
yield self.index.reserveUID(self.uid)
self.reserved = True
@@ -100,7 +101,7 @@
pause = Deferred()
def _timedDeferred():
pause.callback(True)
- reactor.callLater(0.5, _timedDeferred)
+ reactor.callLater(0.05, _timedDeferred)
yield pause
if self.uri and not self.reserved:
@@ -938,7 +939,7 @@
# All auto-processed updates for an Organizer leave the tag unchanged
change_scheduletag = False
elif self.processing_organizer == False:
- # Auto-processed updates that are the result of an organizer "refresh' due
+ # Auto-processed updates that are the result of an organizer "refresh" due
# to another Attendee's REPLY should leave the tag unchanged
change_scheduletag = not hasattr(self.request, "doing_attendee_refresh")
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/implicit.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/implicit.py 2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/implicit.py 2011-09-27 19:00:26 UTC (rev 8124)
@@ -242,20 +242,14 @@
returnValue(self.return_calendar if hasattr(self, "return_calendar") else self.calendar)
@inlineCallbacks
- def refreshAllAttendeesExceptSome(self, request, resource, calendar, attendees):
+ def refreshAllAttendeesExceptSome(self, request, resource, attendees):
"""
-
- @param request:
- @type request:
- @param attendee:
- @type attendee:
- @param calendar:
- @type calendar:
+ Refresh the iCalendar data for all attendees except the one specified in attendees.
"""
self.request = request
self.resource = resource
- self.calendar = calendar
+ self.calendar = (yield self.resource.iCalendarForUser(self.request))
self.state = "organizer"
self.action = "modify"
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py 2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/scheduling/processing.py 2011-09-27 19:00:26 UTC (rev 8124)
@@ -38,6 +38,7 @@
from pycalendar.duration import PyCalendarDuration
from pycalendar.datetime import PyCalendarDateTime
from pycalendar.timezone import PyCalendarTimezone
+from twistedcaldav.memcachefifolock import MemcacheFIFOLock
__all__ = [
"ImplicitProcessor",
@@ -231,9 +232,9 @@
# Use a memcachelock to ensure others don't refresh whilst we have an enqueued call
self.uid = self.recipient_calendar.resourceUID()
- if config.Scheduling.Options.AttendeeRefreshInterval:
+ if config.Scheduling.Options.AttendeeRefreshIntervalSeconds and self.recipient_calendar.countAllUniqueAttendees() > config.Scheduling.Options.AttendeeRefreshThreshold:
attendees = ()
- lock = MemcacheLock("RefreshUIDLock", self.uid, timeout=0.0, expire_time=config.Scheduling.Options.AttendeeRefreshInterval)
+ lock = MemcacheLock("RefreshUIDLock", self.uid, timeout=0.0, expire_time=config.Scheduling.Options.AttendeeRefreshIntervalSeconds)
# Try lock, but fail immediately if already taken
try:
@@ -248,7 +249,7 @@
log.debug("ImplicitProcessing - refreshing UID: '%s'" % (self.uid,))
from twistedcaldav.scheduling.implicit import ImplicitScheduler
scheduler = ImplicitScheduler()
- yield scheduler.refreshAllAttendeesExceptSome(self.request, organizer_resource, self.recipient_calendar, attendees)
+ yield scheduler.refreshAllAttendeesExceptSome(self.request, organizer_resource, attendees)
@inlineCallbacks
def _doDelayedRefresh():
@@ -256,7 +257,7 @@
# We need to get the UID lock for implicit processing whilst we send the auto-reply
# as the Organizer processing will attempt to write out data to other attendees to
# refresh them. To prevent a race we need a lock.
- uidlock = MemcacheLock("ImplicitUIDLock", self.uid, timeout=60.0)
+ uidlock = MemcacheFIFOLock("ImplicitUIDLock", self.uid, timeout=60.0)
try:
yield uidlock.acquire()
@@ -288,7 +289,7 @@
yield uidlock.clean()
if lock:
- reactor.callLater(config.Scheduling.Options.AttendeeRefreshInterval, _doDelayedRefresh)
+ reactor.callLater(config.Scheduling.Options.AttendeeRefreshIntervalSeconds, _doDelayedRefresh)
else:
yield _doRefresh(resource)
@@ -507,7 +508,7 @@
# We need to get the UID lock for implicit processing whilst we send the auto-reply
# as the Organizer processing will attempt to write out data to other attendees to
# refresh them. To prevent a race we need a lock.
- lock = MemcacheLock("ImplicitUIDLock", calendar.resourceUID(), timeout=60.0)
+ lock = MemcacheFIFOLock("ImplicitUIDLock", calendar.resourceUID(), timeout=60.0)
# Note that this lock also protects the request, as this request is
# being re-used by potentially multiple transactions and should not be
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/stdconfig.py 2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/stdconfig.py 2011-09-27 19:00:26 UTC (rev 8124)
@@ -563,11 +563,12 @@
},
"Options" : {
- "AllowGroupAsOrganizer" : False, # Allow groups to be Organizers
- "AllowLocationAsOrganizer" : False, # Allow locations to be Organizers
- "AllowResourceAsOrganizer" : False, # Allow resources to be Organizers
- "LimitFreeBusyAttendees" : 30, # Maximum number of attendees to request freebusy for
- "AttendeeRefreshInterval" : 60, # Time after an iTIP REPLY at which attendee refresh will trigger
+ "AllowGroupAsOrganizer" : False, # Allow groups to be Organizers
+ "AllowLocationAsOrganizer" : False, # Allow locations to be Organizers
+ "AllowResourceAsOrganizer" : False, # Allow resources to be Organizers
+ "LimitFreeBusyAttendees" : 30, # Maximum number of attendees to request freebusy for
+ "AttendeeRefreshIntervalSeconds" : 60, # Time after an iTIP REPLY at which attendee refresh will trigger
+ "AttendeeRefreshThreshold" : 20, # Number of attendees above which refresh delays are used
}
},
Modified: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py 2011-09-27 05:54:33 UTC (rev 8123)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/storebridge.py 2011-09-27 19:00:26 UTC (rev 8124)
@@ -44,7 +44,8 @@
from twistedcaldav.config import config
from twistedcaldav.ical import Component as VCalendar, Property as VProperty,\
InvalidICalendarDataError, iCalendarProductID
-from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
+from twistedcaldav.memcachelock import MemcacheLockTimeoutError
+from twistedcaldav.memcachefifolock import MemcacheFIFOLock
from twistedcaldav.method.put_addressbook_common import StoreAddressObjectResource
from twistedcaldav.method.put_common import StoreCalendarObjectResource
from twistedcaldav.notifications import (
@@ -1874,7 +1875,7 @@
)
)
if do_implicit_action:
- lock = MemcacheLock(
+ lock = MemcacheFIFOLock(
"ImplicitUIDLock", calendar.resourceUID(), timeout=60.0
)
Added: CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py
===================================================================
--- CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py (rev 0)
+++ CalendarServer/branches/users/cdaboo/queued-attendee-refreshes/twistedcaldav/test/test_memcachefifolock.py 2011-09-27 19:00:26 UTC (rev 8124)
@@ -0,0 +1,249 @@
+##
+# Copyright (c) 2011 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twisted.internet.defer import inlineCallbacks, Deferred
+
+from twistedcaldav.memcachefifolock import MemcacheFIFOLock
+
+from twistedcaldav.test.util import TestCase
+import cPickle
+from twistedcaldav.memcachelock import MemcacheLockTimeoutError
+from twistedcaldav.memcacher import Memcacher
+from twisted.internet import reactor
+
+class MemCacheTestCase(TestCase):
+ """
+ Test client protocol class L{MemCacheProtocol}.
+ """
+
+ @inlineCallbacks
+ def test_one_lock(self):
+
+ lock = MemcacheFIFOLock("lock", "test")
+ lock.allowTestCache = True
+
+ yield lock.acquire()
+ self.assertTrue((yield lock.locked()))
+
+ yield lock.release()
+ self.assertFalse((yield lock.locked()))
+
+ self.assertEqual((yield lock.get(lock._ticket_token)), "2")
+ self.assertEqual((yield lock.get(lock._nextinline_token)), "0")
+ self.assertEqual(cPickle.loads((yield lock.get(lock._queue_token))), [])
+
+ @inlineCallbacks
+ def test_two_locks_with_timeout(self):
+
+ lock1 = MemcacheFIFOLock("lock", "test")
+ lock1.allowTestCache = True
+
+ lock2 = MemcacheFIFOLock("lock", "test", timeout=0.01)
+ lock2.allowTestCache = True
+
+ lock1._memcacheProtocol = lock2._memcacheProtocol = Memcacher.memoryCacher()
+
+ yield lock1.acquire()
+ self.assertTrue((yield lock1.locked()))
+
+ try:
+ yield lock2.acquire()
+ except MemcacheLockTimeoutError:
+ pass
+ else:
+ self.fail("Did not timeout lock")
+
+ self.assertTrue((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+
+ self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
+ self.assertEqual((yield lock1.get(lock1._nextinline_token)), "1")
+ self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [1,])
+
+ yield lock1.release()
+ self.assertFalse((yield lock1.locked()))
+
+ self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
+ self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
+ self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
+
+ @inlineCallbacks
+ def test_two_locks_in_sequence(self):
+
+ lock1 = MemcacheFIFOLock("lock", "test")
+ lock1.allowTestCache = True
+
+ lock2 = MemcacheFIFOLock("lock", "test")
+ lock2.allowTestCache = True
+
+ lock1._memcacheProtocol = lock2._memcacheProtocol = Memcacher.memoryCacher()
+
+ yield lock1.acquire()
+ self.assertTrue((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+
+ yield lock1.release()
+ self.assertFalse((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+
+ yield lock2.acquire()
+ self.assertFalse((yield lock1.locked()))
+ self.assertTrue((yield lock2.locked()))
+
+ yield lock2.release()
+ self.assertFalse((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+
+ self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
+ self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
+ self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
+
+ @inlineCallbacks
+ def test_two_locks_in_parallel(self):
+
+ lock1 = MemcacheFIFOLock("lock", "test", timeout=1.0)
+ lock1.allowTestCache = True
+
+ lock2 = MemcacheFIFOLock("lock", "test", timeout=1.0)
+ lock2.allowTestCache = True
+
+ lock1._memcacheProtocol = lock2._memcacheProtocol = Memcacher.memoryCacher()
+
+ yield lock1.acquire()
+ self.assertTrue((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+
+ @inlineCallbacks
+ def _release1():
+ self.assertTrue((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+ yield lock1.release()
+ self.assertFalse((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+ reactor.callLater(0.1, _release1)
+
+ yield lock2.acquire()
+ self.assertFalse((yield lock1.locked()))
+ self.assertTrue((yield lock2.locked()))
+
+ yield lock2.release()
+ self.assertFalse((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+
+ self.assertEqual((yield lock1.get(lock1._ticket_token)), "3")
+ self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
+ self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
+
+
+ @inlineCallbacks
+ def test_three_in_order(self):
+ """
+ This tests overlaps a lock on #1 with two attempts on #2 and #3. #3 has
+ a very much shorter polling interval than #2 so when #1 releases, #3 will
+ poll first. We want to make sure that #3 is not given the lock until after
+ #2 has polled and acquired/releases it. i.e., this tests the FIFO behavior.
+ """
+
+ lock1 = MemcacheFIFOLock("lock", "test", timeout=2.0)
+ lock1.allowTestCache = True
+
+ lock2 = MemcacheFIFOLock("lock", "test", timeout=2.0, retry_interval=0.5)
+ lock2.allowTestCache = True
+
+ lock3 = MemcacheFIFOLock("lock", "test", timeout=2.0, retry_interval=0.01) # retry a lot faster than #2
+ lock3.allowTestCache = True
+
+ lock1._memcacheProtocol = \
+ lock2._memcacheProtocol = \
+ lock3._memcacheProtocol = Memcacher.memoryCacher()
+
+ d = Deferred()
+
+ yield lock1.acquire()
+ self.assertTrue((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+ self.assertFalse((yield lock3.locked()))
+
+ @inlineCallbacks
+ def _release1():
+ self.assertTrue((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+ self.assertFalse((yield lock3.locked()))
+ yield lock1.release()
+ self.assertFalse((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+ self.assertFalse((yield lock3.locked()))
+
+ self.assertEqual((yield lock1.get(lock1._ticket_token)), "4")
+ self.assertEqual((yield lock1.get(lock1._nextinline_token)), "2")
+ self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [2, 3])
+ reactor.callLater(0.1, _release1)
+
+ @inlineCallbacks
+ def _acquire2():
+ self.assertTrue((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+ self.assertFalse((yield lock3.locked()))
+
+ @inlineCallbacks
+ def _release2():
+ self.assertFalse((yield lock1.locked()))
+ self.assertTrue((yield lock2.locked()))
+ self.assertFalse((yield lock3.locked()))
+ yield lock2.release()
+ self.assertFalse((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+ self.assertFalse((yield lock3.locked()))
+
+ yield lock2.acquire()
+ reactor.callLater(0.1, _release2)
+ self.assertFalse((yield lock1.locked()))
+ self.assertTrue((yield lock2.locked()))
+ self.assertFalse((yield lock3.locked()))
+
+ reactor.callLater(0.01, _acquire2)
+
+ @inlineCallbacks
+ def _acquire3():
+ self.assertTrue((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+ self.assertFalse((yield lock3.locked()))
+
+ @inlineCallbacks
+ def _release3():
+ self.assertFalse((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+ self.assertTrue((yield lock3.locked()))
+ yield lock3.release()
+ self.assertFalse((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+ self.assertFalse((yield lock3.locked()))
+
+ self.assertEqual((yield lock1.get(lock1._ticket_token)), "4")
+ self.assertEqual((yield lock1.get(lock1._nextinline_token)), "0")
+ self.assertEqual(cPickle.loads((yield lock1.get(lock1._queue_token))), [])
+
+ d.callback(True)
+
+ yield lock3.acquire()
+ reactor.callLater(0.1, _release3)
+ self.assertFalse((yield lock1.locked()))
+ self.assertFalse((yield lock2.locked()))
+ self.assertTrue((yield lock3.locked()))
+
+ reactor.callLater(0.02, _acquire3)
+
+ yield d
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20110927/5d0a3302/attachment-0001.html>
More information about the calendarserver-changes
mailing list