[CalendarServer-changes] [8296] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Wed Nov 16 11:31:38 PST 2011
Revision: 8296
http://trac.macosforge.org/projects/calendarserver/changeset/8296
Author: cdaboo at apple.com
Date: 2011-11-16 11:31:38 -0800 (Wed, 16 Nov 2011)
Log Message:
-----------
Batched attendee refreshes.
Modified Paths:
--------------
CalendarServer/trunk/conf/caldavd-test.plist
CalendarServer/trunk/twistedcaldav/ical.py
CalendarServer/trunk/twistedcaldav/scheduling/implicit.py
CalendarServer/trunk/twistedcaldav/scheduling/processing.py
CalendarServer/trunk/twistedcaldav/scheduling/test/test_implicit.py
CalendarServer/trunk/twistedcaldav/stdconfig.py
Modified: CalendarServer/trunk/conf/caldavd-test.plist
===================================================================
--- CalendarServer/trunk/conf/caldavd-test.plist 2011-11-16 18:58:15 UTC (rev 8295)
+++ CalendarServer/trunk/conf/caldavd-test.plist 2011-11-16 19:31:38 UTC (rev 8296)
@@ -811,7 +811,7 @@
<false/>
<key>AllowResourceAsOrganizer</key>
<false/>
- <key>AttendeeRefreshIntervalSeconds</key>
+ <key>AttendeeRefreshBatch</key>
<integer>0</integer>
</dict>
Modified: CalendarServer/trunk/twistedcaldav/ical.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/ical.py 2011-11-16 18:58:15 UTC (rev 8295)
+++ CalendarServer/trunk/twistedcaldav/ical.py 2011-11-16 19:31:38 UTC (rev 8296)
@@ -1729,10 +1729,13 @@
yield attendee
- def countAllUniqueAttendees(self, onlyScheduleAgentServer=True):
+ def getAllUniqueAttendees(self, onlyScheduleAgentServer=True):
attendeesByInstance = self.getAttendeesByInstance(True, onlyScheduleAgentServer=onlyScheduleAgentServer)
- return len(set([attendee for attendee, _ignore in attendeesByInstance]))
-
+ attendees = set()
+ for attendee, _ignore in attendeesByInstance:
+ attendees.add(attendee)
+ return attendees
+
def getMaskUID(self):
"""
Get the X-CALENDARSERVER-MASK-UID value. Works on either a VCALENDAR or on a component.
Modified: CalendarServer/trunk/twistedcaldav/scheduling/implicit.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/scheduling/implicit.py 2011-11-16 18:58:15 UTC (rev 8295)
+++ CalendarServer/trunk/twistedcaldav/scheduling/implicit.py 2011-11-16 19:31:38 UTC (rev 8296)
@@ -247,6 +247,7 @@
# Setup some parameters
self.do_smart_merge = do_smart_merge
self.except_attendees = ()
+ self.only_refresh_attendees = None
# Determine what type of scheduling this is: Organizer triggered or Attendee triggered
if self.state == "organizer":
@@ -264,7 +265,7 @@
returnValue(self.return_calendar if hasattr(self, "return_calendar") else self.calendar)
@inlineCallbacks
- def refreshAllAttendeesExceptSome(self, request, resource, attendees):
+ def refreshAllAttendeesExceptSome(self, request, resource, except_attendees=(), only_attendees=None):
"""
Refresh the iCalendar data for all attendees except the one specified in attendees.
"""
@@ -277,7 +278,8 @@
self.calendar_owner = None
self.internal_request = True
- self.except_attendees = attendees
+ self.except_attendees = except_attendees
+ self.only_refresh_attendees = only_attendees
self.changed_rids = None
self.reinvites = None
@@ -457,6 +459,12 @@
outbox = (yield self.request.locateResource(outboxURL))
yield outbox.authorize(self.request, (caldavxml.ScheduleSend(),))
+ def makeScheduler(self):
+ """
+ Convenience method which we can override in unit tests to make testing easier.
+ """
+ return CalDAVScheduler(self.request, self.resource)
+
@inlineCallbacks
def doImplicitOrganizer(self):
@@ -829,7 +837,7 @@
# Send scheduling message
# This is a local CALDAV scheduling operation.
- scheduler = CalDAVScheduler(self.request, self.resource)
+ scheduler = self.makeScheduler()
# Do the PUT processing
log.info("Implicit CANCEL - organizer: '%s' to attendee: '%s', UID: '%s', RIDs: '%s'" % (self.organizer, attendee, self.uid, rids))
@@ -858,6 +866,10 @@
if attendee in self.except_attendees:
continue
+ # Only send to specified attendees
+ if self.only_refresh_attendees is not None and attendee not in self.only_refresh_attendees:
+ continue
+
# If SCHEDULE-FORCE-SEND only change, only send message to those Attendees
if self.reinvites and attendee in self.reinvites:
continue
@@ -867,7 +879,7 @@
# Send scheduling message
if itipmsg is not None:
# This is a local CALDAV scheduling operation.
- scheduler = CalDAVScheduler(self.request, self.resource)
+ scheduler = self.makeScheduler()
# Do the PUT processing
log.info("Implicit REQUEST - organizer: '%s' to attendee: '%s', UID: '%s'" % (self.organizer, attendee, self.uid,))
@@ -1117,7 +1129,7 @@
# Send scheduling message
# This is a local CALDAV scheduling operation.
- scheduler = CalDAVScheduler(self.request, self.resource)
+ scheduler = self.makeScheduler()
# Do the PUT processing
def _gotResponse(response):
Modified: CalendarServer/trunk/twistedcaldav/scheduling/processing.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/scheduling/processing.py 2011-11-16 18:58:15 UTC (rev 8295)
+++ CalendarServer/trunk/twistedcaldav/scheduling/processing.py 2011-11-16 19:31:38 UTC (rev 8296)
@@ -35,6 +35,7 @@
from twistedcaldav.scheduling.itip import iTipProcessing, iTIPRequestStatus
from twistedcaldav.scheduling.utils import getCalendarObjectForPrincipals
from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
+from twistedcaldav.memcacher import Memcacher
from pycalendar.duration import PyCalendarDuration
from pycalendar.datetime import PyCalendarDateTime
from pycalendar.timezone import PyCalendarTimezone
@@ -173,7 +174,7 @@
# Update the organizer's copy of the event
log.debug("ImplicitProcessing - originator '%s' to recipient '%s' processing METHOD:REPLY, UID: '%s' - updating event" % (self.originator.cuaddr, self.recipient.cuaddr, self.uid))
- recipient_calendar_resource = (yield self.writeCalendarResource(self.recipient_calendar_collection_uri, self.recipient_calendar_collection, self.recipient_calendar_name, self.recipient_calendar))
+ self.organizer_calendar_resource = (yield self.writeCalendarResource(self.recipient_calendar_collection_uri, self.recipient_calendar_collection, self.recipient_calendar_name, self.recipient_calendar))
# Build the schedule-changes XML element
attendeeReplying, rids = processed
@@ -204,7 +205,7 @@
# Only update other attendees when the partstat was changed by the reply
if partstatChanged:
- yield self.queueAttendeeUpdate(recipient_calendar_resource, (attendeeReplying,))
+ yield self.queueAttendeeUpdate((attendeeReplying,))
result = (True, False, True, changes,)
@@ -215,9 +216,12 @@
returnValue(result)
@inlineCallbacks
- def queueAttendeeUpdate(self, resource, attendees):
+ def queueAttendeeUpdate(self, exclude_attendees):
"""
Queue up an update to attendees and use a memcache lock to ensure we don't update too frequently.
+
+ @param exclude_attendees: list of attendees who should not be refreshed (e.g., the one that triggered the refresh)
+ @type exclude_attendees: C{list}
"""
# When doing auto-processing of replies, only refresh attendees when the last auto-accept is done.
@@ -227,72 +231,159 @@
self.request.auto_reply_suppressed = True
returnValue(None)
if hasattr(self.request, "auto_reply_suppressed"):
- attendees = ()
+ exclude_attendees = ()
- # Use a memcachelock to ensure others don't refresh whilst we have an enqueued call
self.uid = self.recipient_calendar.resourceUID()
- if config.Scheduling.Options.AttendeeRefreshIntervalSeconds and self.recipient_calendar.countAllUniqueAttendees() > config.Scheduling.Options.AttendeeRefreshThreshold:
- attendees = ()
- lock = MemcacheLock("RefreshUIDLock", self.uid, timeout=0.0, expire_time=config.Scheduling.Options.AttendeeRefreshIntervalSeconds)
+
+ # Check for batched refreshes
+ if config.Scheduling.Options.AttendeeRefreshBatch:
- # Try lock, but fail immediately if already taken
+ # Need to lock whilst manipulating the batch list
+ lock = MemcacheLock("BatchRefreshUIDLock", self.uid, timeout=60.0, expire_time=60.0)
try:
yield lock.acquire()
except MemcacheLockTimeoutError:
+ # If we could not lock then just fail the refresh - not sure what else to do
returnValue(None)
+
+ try:
+ # Get all attendees to refresh
+ allAttendees = sorted(list(self.recipient_calendar.getAllUniqueAttendees()))
+
+ # Always need to refresh every attendee
+ exclude_attendees = ()
+
+ # See if there is already a pending refresh and merge current attendees into that list,
+ # otherwise just mark all attendees as pending
+ cache = Memcacher("BatchRefreshAttendees", pickle=True)
+ pendingAttendees = yield cache.get(self.uid)
+ firstTime = False
+ if pendingAttendees:
+ for attendee in allAttendees:
+ if attendee not in pendingAttendees:
+ pendingAttendees.append(attendee)
+ else:
+ firstTime = True
+ pendingAttendees = allAttendees
+ yield cache.set(self.uid, pendingAttendees)
+
+ # Now start the first batch off
+ if firstTime:
+ reactor.callLater(config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds, self._doBatchRefresh)
+ finally:
+ yield lock.clean()
+
else:
- lock = None
+ yield self._doRefresh(self.organizer_calendar_resource, exclude_attendees)
- @inlineCallbacks
- def _doRefresh(organizer_resource):
- log.debug("ImplicitProcessing - refreshing UID: '%s'" % (self.uid,))
- from twistedcaldav.scheduling.implicit import ImplicitScheduler
- scheduler = ImplicitScheduler()
- yield scheduler.refreshAllAttendeesExceptSome(self.request, organizer_resource, attendees)
+ @inlineCallbacks
+ def _doRefresh(self, organizer_resource, exclude_attendees=(), only_attendees=None):
+ """
+ Do a refresh of attendees.
- @inlineCallbacks
- def _doDelayedRefresh():
+ @param organizer_resource: the resource for the organizer's calendar data
+ @type organizer_resource: L{DAVResource}
+ @param exclude_attendees: list of attendees to not refresh
+ @type exclude_attendees: C{tuple}
+ @param only_attendees: list of attendees to refresh (C{None} - refresh all)
+ @type only_attendees: C{tuple}
+ """
+ log.debug("ImplicitProcessing - refreshing UID: '%s', Attendees: %s" % (self.uid, ", ".join(only_attendees) if only_attendees else "all"))
+ from twistedcaldav.scheduling.implicit import ImplicitScheduler
+ scheduler = ImplicitScheduler()
+ yield scheduler.refreshAllAttendeesExceptSome(
+ self.request,
+ organizer_resource,
+ exclude_attendees,
+ only_attendees=only_attendees,
+ )
+
+ @inlineCallbacks
+ def _doDelayedRefresh(self, attendeesToProcess):
+ """
+ Do an attendee refresh that has been delayed until after processing of the request that called it. That
+ requires that we create a new transaction to work with.
- # We need to get the UID lock for implicit processing whilst we send the auto-reply
- # as the Organizer processing will attempt to write out data to other attendees to
- # refresh them. To prevent a race we need a lock.
- uidlock = MemcacheLock("ImplicitUIDLock", self.uid, timeout=60.0, expire_time=5*60)
-
+ @param attendeesToProcess: list of attendees to refresh.
+ @type attendeesToProcess: C{list}
+ """
+
+ # We need to get the UID lock for implicit processing whilst we send the auto-reply
+ # as the Organizer processing will attempt to write out data to other attendees to
+ # refresh them. To prevent a race we need a lock.
+ uidlock = MemcacheLock("ImplicitUIDLock", self.uid, timeout=60.0, expire_time=5*60)
+
+ try:
+ yield uidlock.acquire()
+ except MemcacheLockTimeoutError:
+ # Just try again to get the lock
+ reactor.callLater(2.0, self._doDelayedRefresh, attendeesToProcess)
+ else:
+
+ # inNewTransaction wipes out the remembered resource<-> URL mappings in the
+ # request object but we need to be able to map the actual reply resource to its
+ # URL when doing auto-processing, so we have to sneak that mapping back in here.
+ txn = yield self.organizer_calendar_resource.inNewTransaction(self.request)
+ organizer_resource = (yield self.request.locateResource(self.organizer_calendar_resource._url))
+
try:
- yield uidlock.acquire()
- except MemcacheLockTimeoutError:
- # Just try again to get the lock
- reactor.callLater(2.0, _doDelayedRefresh)
+ if organizer_resource.exists():
+ yield self._doRefresh(organizer_resource, only_attendees=attendeesToProcess)
+ else:
+ log.debug("ImplicitProcessing - skipping refresh of missing UID: '%s'" % (self.uid,))
+ except Exception, e:
+ log.debug("ImplicitProcessing - refresh exception UID: '%s', %s" % (self.uid, str(e)))
+ yield txn.abort()
else:
+ yield txn.commit()
+ finally:
+ yield uidlock.clean()
+
+ @inlineCallbacks
+ def _doBatchRefresh(self):
+ """
+ Do refresh of attendees in batches until the batch list is empty.
+ """
+
+ # Need to lock whilst manipulating the batch list
+ log.debug("ImplicitProcessing - batch refresh for UID: '%s'" % (self.uid,))
+ lock = MemcacheLock("BatchRefreshUIDLock", self.uid, timeout=60.0, expire_time=60.0)
+ try:
+ yield lock.acquire()
+ except MemcacheLockTimeoutError:
+ # If we could not lock then just fail the refresh - not sure what else to do
+ returnValue(None)
+
+ try:
+ # Get the batch list
+ cache = Memcacher("BatchRefreshAttendees", pickle=True)
+ pendingAttendees = yield cache.get(self.uid)
+ if pendingAttendees:
- # Release lock before sending refresh
+ # Get the next batch of attendees to process and update the cache value or remove it if
+ # no more processing is needed
+ attendeesToProcess = pendingAttendees[:config.Scheduling.Options.AttendeeRefreshBatch]
+ pendingAttendees = pendingAttendees[config.Scheduling.Options.AttendeeRefreshBatch:]
+ if pendingAttendees:
+ yield cache.set(self.uid, pendingAttendees)
+ else:
+ yield cache.delete(self.uid)
+
+ # Make sure we release this here to avoid potential deadlock when grabbing the ImplicitUIDLock in the next call
yield lock.release()
-
- # inNewTransaction wipes out the remembered resource<-> URL mappings in the
- # request object but we need to be able to map the actual reply resource to its
- # URL when doing auto-processing, so we have to sneak that mapping back in here.
- txn = yield resource.inNewTransaction(self.request)
- organizer_resource = (yield self.request.locateResource(resource._url))
-
- try:
- if organizer_resource.exists():
- yield _doRefresh(organizer_resource)
- else:
- log.debug("ImplicitProcessing - skipping refresh of missing UID: '%s'" % (self.uid,))
- except Exception, e:
- log.debug("ImplicitProcessing - refresh exception UID: '%s', %s" % (self.uid, str(e)))
- yield txn.abort()
- else:
- yield txn.commit()
- finally:
- # This correctly gets called only after commit or abort is done
- yield uidlock.clean()
-
- if lock:
- reactor.callLater(config.Scheduling.Options.AttendeeRefreshIntervalSeconds, _doDelayedRefresh)
- else:
- yield _doRefresh(resource)
-
+
+ # Now do the batch refresh
+ yield self._doDelayedRefresh(attendeesToProcess)
+
+ # Queue the next refresh if needed
+ if pendingAttendees:
+ reactor.callLater(config.Scheduling.Options.AttendeeRefreshBatchIntervalSeconds, self._doBatchRefresh)
+ else:
+ yield cache.delete(self.uid)
+ yield lock.release()
+ finally:
+ yield lock.clean()
+
@inlineCallbacks
def doImplicitAttendee(self):
Modified: CalendarServer/trunk/twistedcaldav/scheduling/test/test_implicit.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/scheduling/test/test_implicit.py 2011-11-16 18:58:15 UTC (rev 8295)
+++ CalendarServer/trunk/twistedcaldav/scheduling/test/test_implicit.py 2011-11-16 19:31:38 UTC (rev 8296)
@@ -14,12 +14,35 @@
# limitations under the License.
##
+from pycalendar.datetime import PyCalendarDateTime
+from pycalendar.timezone import PyCalendarTimezone
+from twext.web2 import responsecode
+from twisted.internet.defer import succeed, inlineCallbacks
from twistedcaldav.ical import Component
+from twistedcaldav.scheduling.implicit import ImplicitScheduler
+from twistedcaldav.scheduling.scheduler import ScheduleResponseQueue
import twistedcaldav.test.util
-from twistedcaldav.scheduling.implicit import ImplicitScheduler
-from pycalendar.datetime import PyCalendarDateTime
-from pycalendar.timezone import PyCalendarTimezone
+class FakeScheduler(object):
+ """
+ A fake CalDAVScheduler that does nothing except track who messages were sent to.
+ """
+
+ def __init__(self, recipients):
+ self.recipients = recipients
+
+ def doSchedulingViaPUT(self, originator, recipients, calendar, internal_request=False):
+ self.recipients.extend(recipients)
+ return succeed(ScheduleResponseQueue("FAKE", responsecode.OK))
+
+class FakePrincipal(object):
+
+ def __init__(self, cuaddr):
+ self.cuaddr = cuaddr
+
+ def calendarUserAddresses(self):
+ return (self.cuaddr,)
+
class Implicit (twistedcaldav.test.util.TestCase):
"""
iCalendar support tests
@@ -757,3 +780,60 @@
scheduler.findRemovedAttendees()
self.assertEqual(scheduler.cancelledAttendees, set(result), msg=description)
+
+ @inlineCallbacks
+ def test_process_request_excludes_includes(self):
+ """
+ Test that processRequests correctly excludes or includes the specified attendees.
+ """
+
+ data = (
+ ((), None, 3, ("mailto:user2 at example.com", "mailto:user3 at example.com", "mailto:user4 at example.com",),),
+ (("mailto:user2 at example.com",), None, 2, ("mailto:user3 at example.com", "mailto:user4 at example.com",),),
+ ((), ("mailto:user2 at example.com", "mailto:user4 at example.com",) , 2, ("mailto:user2 at example.com", "mailto:user4 at example.com",),),
+ )
+
+ calendar = """BEGIN:VCALENDAR
+VERSION:2.0
+PRODID:-//CALENDARSERVER.ORG//NONSGML Version 1//EN
+BEGIN:VEVENT
+UID:12345-67890
+DTSTART:20080601T120000Z
+DTEND:20080601T130000Z
+ORGANIZER;CN="User 01":mailto:user1 at example.com
+ATTENDEE:mailto:user1 at example.com
+ATTENDEE:mailto:user2 at example.com
+ATTENDEE:mailto:user3 at example.com
+ATTENDEE:mailto:user4 at example.com
+END:VEVENT
+END:VCALENDAR
+"""
+
+ for excludes, includes, result_count, result_set in data:
+ scheduler = ImplicitScheduler()
+ scheduler.resource = None
+ scheduler.request = None
+ scheduler.calendar = Component.fromString(calendar)
+ scheduler.state = "organizer"
+ scheduler.action = "modify"
+ scheduler.calendar_owner = None
+ scheduler.internal_request = True
+ scheduler.except_attendees = excludes
+ scheduler.only_refresh_attendees = includes
+ scheduler.changed_rids = None
+ scheduler.reinvites = None
+
+ # Get some useful information from the calendar
+ yield scheduler.extractCalendarData()
+ scheduler.organizerPrincipal = FakePrincipal(scheduler.organizer)
+
+ recipients = []
+
+ def makeFakeScheduler():
+ return FakeScheduler(recipients)
+ scheduler.makeScheduler = makeFakeScheduler
+
+ count = (yield scheduler.processRequests())
+ self.assertEqual(count, result_count)
+ self.assertEqual(len(recipients), result_count)
+ self.assertEqual(set(recipients), set(result_set))
Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py 2011-11-16 18:58:15 UTC (rev 8295)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py 2011-11-16 19:31:38 UTC (rev 8296)
@@ -602,13 +602,14 @@
},
"Options" : {
- "AllowGroupAsOrganizer" : False, # Allow groups to be Organizers
- "AllowLocationAsOrganizer" : False, # Allow locations to be Organizers
- "AllowResourceAsOrganizer" : False, # Allow resources to be Organizers
- "AllowUserAutoAccept" : False, # Allow auto-accept for users
- "LimitFreeBusyAttendees" : 30, # Maximum number of attendees to request freebusy for
- "AttendeeRefreshIntervalSeconds" : 60, # Time after an iTIP REPLY at which attendee refresh will trigger
- "AttendeeRefreshThreshold" : 20, # Number of attendees above which refresh delays are used
+ "AllowGroupAsOrganizer" : False, # Allow groups to be Organizers
+ "AllowLocationAsOrganizer" : False, # Allow locations to be Organizers
+ "AllowResourceAsOrganizer" : False, # Allow resources to be Organizers
+ "AllowUserAutoAccept" : False, # Allow auto-accept for users
+ "LimitFreeBusyAttendees" : 30, # Maximum number of attendees to request freebusy for
+ "AttendeeRefreshBatch" : 5, # Number of attendees to do batched refreshes: 0 - no batching
+ "AttendeeRefreshBatchDelaySeconds" : 5, # Time after an iTIP REPLY for first batched attendee refresh
+ "AttendeeRefreshBatchIntervalSeconds" : 5, # Time between attendee batch refreshes
}
},
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111116/ce65ec28/attachment-0001.html>
More information about the calendarserver-changes
mailing list