[CalendarServer-changes] [8296] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Wed Nov 16 11:31:38 PST 2011


Revision: 8296
          http://trac.macosforge.org/projects/calendarserver/changeset/8296
Author:   cdaboo at apple.com
Date:     2011-11-16 11:31:38 -0800 (Wed, 16 Nov 2011)
Log Message:
-----------
Batched attendee refreshes.

Modified Paths:
--------------
    CalendarServer/trunk/conf/caldavd-test.plist
    CalendarServer/trunk/twistedcaldav/ical.py
    CalendarServer/trunk/twistedcaldav/scheduling/implicit.py
    CalendarServer/trunk/twistedcaldav/scheduling/processing.py
    CalendarServer/trunk/twistedcaldav/scheduling/test/test_implicit.py
    CalendarServer/trunk/twistedcaldav/stdconfig.py

Modified: CalendarServer/trunk/conf/caldavd-test.plist
===================================================================
--- CalendarServer/trunk/conf/caldavd-test.plist	2011-11-16 18:58:15 UTC (rev 8295)
+++ CalendarServer/trunk/conf/caldavd-test.plist	2011-11-16 19:31:38 UTC (rev 8296)
@@ -811,7 +811,7 @@
         <false/>
         <key>AllowResourceAsOrganizer</key>
         <false/>
-        <key>AttendeeRefreshIntervalSeconds</key>
+        <key>AttendeeRefreshBatch</key>
         <integer>0</integer>
       </dict>
 

Modified: CalendarServer/trunk/twistedcaldav/ical.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/ical.py	2011-11-16 18:58:15 UTC (rev 8295)
+++ CalendarServer/trunk/twistedcaldav/ical.py	2011-11-16 19:31:38 UTC (rev 8296)
@@ -1729,10 +1729,13 @@
                 yield attendee
 
 
-    def countAllUniqueAttendees(self, onlyScheduleAgentServer=True):
+    def getAllUniqueAttendees(self, onlyScheduleAgentServer=True):
         attendeesByInstance = self.getAttendeesByInstance(True, onlyScheduleAgentServer=onlyScheduleAgentServer)
-        return len(set([attendee for attendee, _ignore in attendeesByInstance]))
-        
+        attendees = set()
+        for attendee, _ignore in attendeesByInstance:
+            attendees.add(attendee)
+        return attendees
+
     def getMaskUID(self):
         """
         Get the X-CALENDARSEREVR-MASK-UID value. Works on either a VCALENDAR or on a component.

Modified: CalendarServer/trunk/twistedcaldav/scheduling/implicit.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/scheduling/implicit.py	2011-11-16 18:58:15 UTC (rev 8295)
+++ CalendarServer/trunk/twistedcaldav/scheduling/implicit.py	2011-11-16 19:31:38 UTC (rev 8296)
@@ -247,6 +247,7 @@
         # Setup some parameters
         self.do_smart_merge = do_smart_merge
         self.except_attendees = ()
+        self.only_refresh_attendees = None
 
         # Determine what type of scheduling this is: Organizer triggered or Attendee triggered
         if self.state == "organizer":
@@ -264,7 +265,7 @@
             returnValue(self.return_calendar if hasattr(self, "return_calendar") else self.calendar)
 
     @inlineCallbacks
-    def refreshAllAttendeesExceptSome(self, request, resource, attendees):
+    def refreshAllAttendeesExceptSome(self, request, resource, except_attendees=(), only_attendees=None):
         """
         Refresh the iCalendar data for all attendees except the one specified in attendees.
         """
@@ -277,7 +278,8 @@
 
         self.calendar_owner = None
         self.internal_request = True
-        self.except_attendees = attendees
+        self.except_attendees = except_attendees
+        self.only_refresh_attendees = only_attendees
         self.changed_rids = None
         self.reinvites = None
 
@@ -457,6 +459,12 @@
         outbox = (yield self.request.locateResource(outboxURL))
         yield outbox.authorize(self.request, (caldavxml.ScheduleSend(),))
 
+    def makeScheduler(self):
+        """
+        Convenience method which we can override in unit tests to make testing easier.
+        """
+        return CalDAVScheduler(self.request, self.resource)
+
     @inlineCallbacks
     def doImplicitOrganizer(self):
         
@@ -829,7 +837,7 @@
             # Send scheduling message
             
             # This is a local CALDAV scheduling operation.
-            scheduler = CalDAVScheduler(self.request, self.resource)
+            scheduler = self.makeScheduler()
     
             # Do the PUT processing
             log.info("Implicit CANCEL - organizer: '%s' to attendee: '%s', UID: '%s', RIDs: '%s'" % (self.organizer, attendee, self.uid, rids))
@@ -858,6 +866,10 @@
             if attendee in self.except_attendees:
                 continue
 
+            # Only send to specified attendees
+            if self.only_refresh_attendees is not None and attendee not in self.only_refresh_attendees:
+                continue
+
             # If SCHEDULE-FORCE-SEND only change, only send message to those Attendees
             if self.reinvites and attendee in self.reinvites:
                 continue
@@ -867,7 +879,7 @@
             # Send scheduling message
             if itipmsg is not None:
                 # This is a local CALDAV scheduling operation.
-                scheduler = CalDAVScheduler(self.request, self.resource)
+                scheduler = self.makeScheduler()
         
                 # Do the PUT processing
                 log.info("Implicit REQUEST - organizer: '%s' to attendee: '%s', UID: '%s'" % (self.organizer, attendee, self.uid,))
@@ -1117,7 +1129,7 @@
         # Send scheduling message
 
         # This is a local CALDAV scheduling operation.
-        scheduler = CalDAVScheduler(self.request, self.resource)
+        scheduler = self.makeScheduler()
 
         # Do the PUT processing
         def _gotResponse(response):

Modified: CalendarServer/trunk/twistedcaldav/scheduling/processing.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/scheduling/processing.py	2011-11-16 18:58:15 UTC (rev 8295)
+++ CalendarServer/trunk/twistedcaldav/scheduling/processing.py	2011-11-16 19:31:38 UTC (rev 8296)
@@ -35,6 +35,7 @@
 from twistedcaldav.scheduling.itip import iTipProcessing, iTIPRequestStatus
 from twistedcaldav.scheduling.utils import getCalendarObjectForPrincipals
 from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
+from twistedcaldav.memcacher import Memcacher
 from pycalendar.duration import PyCalendarDuration
 from pycalendar.datetime import PyCalendarDateTime
 from pycalendar.timezone import PyCalendarTimezone
@@ -173,7 +174,7 @@
  
             # Update the organizer's copy of the event
             log.debug("ImplicitProcessing - originator '%s' to recipient '%s' processing METHOD:REPLY, UID: '%s' - updating event" % (self.originator.cuaddr, self.recipient.cuaddr, self.uid))
-            recipient_calendar_resource = (yield self.writeCalendarResource(self.recipient_calendar_collection_uri, self.recipient_calendar_collection, self.recipient_calendar_name, self.recipient_calendar))
+            self.organizer_calendar_resource = (yield self.writeCalendarResource(self.recipient_calendar_collection_uri, self.recipient_calendar_collection, self.recipient_calendar_name, self.recipient_calendar))
             
             # Build the schedule-changes XML element
             attendeeReplying, rids = processed
@@ -204,7 +205,7 @@
 
             # Only update other attendees when the partstat was changed by the reply
             if partstatChanged:
-                yield self.queueAttendeeUpdate(recipient_calendar_resource, (attendeeReplying,))
+                yield self.queueAttendeeUpdate((attendeeReplying,))
 
             result = (True, False, True, changes,)
 
@@ -215,9 +216,12 @@
         returnValue(result)
 
     @inlineCallbacks
-    def queueAttendeeUpdate(self, resource, attendees):
+    def queueAttendeeUpdate(self, exclude_attendees):
         """
         Queue up an update to attendees and use a memcache lock to ensure we don't update too frequently.
+        
+        @param exclude_attendees: list of attendees who should not be refreshed (e.g., the one that triggeed the refresh)
+        @type exclude_attendees: C{list}
         """
         
         # When doing auto-processing of replies, only refresh attendees when the last auto-accept is done.
@@ -227,72 +231,159 @@
             self.request.auto_reply_suppressed = True
             returnValue(None)
         if hasattr(self.request, "auto_reply_suppressed"):
-            attendees = ()
+            exclude_attendees = ()
 
-        # Use a memcachelock to ensure others don't refresh whilst we have an enqueued call
         self.uid = self.recipient_calendar.resourceUID()
-        if config.Scheduling.Options.AttendeeRefreshIntervalSeconds and self.recipient_calendar.countAllUniqueAttendees() > config.Scheduling.Options.AttendeeRefreshThreshold:
-            attendees = ()
-            lock = MemcacheLock("RefreshUIDLock", self.uid, timeout=0.0, expire_time=config.Scheduling.Options.AttendeeRefreshIntervalSeconds)
+
+        # Check for batched refreshes
+        if config.Scheduling.Options.AttendeeRefreshBatch:
             
-            # Try lock, but fail immediately if already taken
+            # Need to lock whilst manipulating the batch list
+            lock = MemcacheLock("BatchRefreshUIDLock", self.uid, timeout=60.0, expire_time=60.0)
             try:
                 yield lock.acquire()
             except MemcacheLockTimeoutError:
+                # If we could not lock then just fail the refresh - not sure what else to do
                 returnValue(None)
+            
+            try:
+                # Get all attendees to refresh
+                allAttendees = sorted(list(self.recipient_calendar.getAllUniqueAttendees()))
+    
+                # Always need to refresh every attendee
+                exclude_attendees = ()
+                
+                # See if there is already a pending refresh and merge current attendees into that list,
+                # otherwise just mark all attendees as pending
+                cache = Memcacher("BatchRefreshAttendees", pickle=True)
+                pendingAttendees = yield cache.get(self.uid)
+                firstTime = False
+                if pendingAttendees:
+                    for attendee in allAttendees:
+                        if attendee not in pendingAttendees:
+                            pendingAttendees.append(attendee)
+                else:
+                    firstTime = True
+                    pendingAttendees = allAttendees
+                yield cache.set(self.uid, pendingAttendees)
+    
+                # Now start the first batch off
+                if firstTime:
+                    reactor.callLater(config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds, self._doBatchRefresh)
+            finally:
+                yield lock.clean()
+        
         else:
-            lock = None
+            yield self._doRefresh(self.organizer_calendar_resource, exclude_attendees)
 
-        @inlineCallbacks
-        def _doRefresh(organizer_resource):
-            log.debug("ImplicitProcessing - refreshing UID: '%s'" % (self.uid,))
-            from twistedcaldav.scheduling.implicit import ImplicitScheduler
-            scheduler = ImplicitScheduler()
-            yield scheduler.refreshAllAttendeesExceptSome(self.request, organizer_resource, attendees)
+    @inlineCallbacks
+    def _doRefresh(self, organizer_resource, exclude_attendees=(), only_attendees=None):
+        """
+        Do a refresh of attendees.
 
-        @inlineCallbacks
-        def _doDelayedRefresh():
+        @param organizer_resource: the resource for the organizer's calendar data
+        @type organizer_resource: L{DAVResource}
+        @param exclude_attendees: list of attendees to not refresh
+        @type exclude_attendees: C{tuple}
+        @param only_attendees: list of attendees to refresh (C{None} - refresh all) 
+        @type only_attendees: C{tuple}
+        """
+        log.debug("ImplicitProcessing - refreshing UID: '%s', Attendees: %s" % (self.uid, ", ".join(only_attendees) if only_attendees else "all"))
+        from twistedcaldav.scheduling.implicit import ImplicitScheduler
+        scheduler = ImplicitScheduler()
+        yield scheduler.refreshAllAttendeesExceptSome(
+            self.request,
+            organizer_resource,
+            exclude_attendees,
+            only_attendees=only_attendees,
+        )
+        
+    @inlineCallbacks
+    def _doDelayedRefresh(self, attendeesToProcess):
+        """
+        Do an attendee refresh that has been delayed until after processing of the request that called it. That
+        requires that we create a new transaction to work with.
 
-            # We need to get the UID lock for implicit processing whilst we send the auto-reply
-            # as the Organizer processing will attempt to write out data to other attendees to
-            # refresh them. To prevent a race we need a lock.
-            uidlock = MemcacheLock("ImplicitUIDLock", self.uid, timeout=60.0, expire_time=5*60)
-    
+        @param attendeesToProcess: list of attendees to refresh.
+        @type attendeesToProcess: C{list}
+        """
+
+        # We need to get the UID lock for implicit processing whilst we send the auto-reply
+        # as the Organizer processing will attempt to write out data to other attendees to
+        # refresh them. To prevent a race we need a lock.
+        uidlock = MemcacheLock("ImplicitUIDLock", self.uid, timeout=60.0, expire_time=5*60)
+
+        try:
+            yield uidlock.acquire()
+        except MemcacheLockTimeoutError:
+            # Just try again to get the lock
+            reactor.callLater(2.0, self._doDelayedRefresh, attendeesToProcess)
+        else:
+
+            # inNewTransaction wipes out the remembered resource<-> URL mappings in the
+            # request object but we need to be able to map the actual reply resource to its
+            # URL when doing auto-processing, so we have to sneak that mapping back in here.
+            txn = yield self.organizer_calendar_resource.inNewTransaction(self.request)
+            organizer_resource = (yield self.request.locateResource(self.organizer_calendar_resource._url))
+
             try:
-                yield uidlock.acquire()
-            except MemcacheLockTimeoutError:
-                # Just try again to get the lock
-                reactor.callLater(2.0, _doDelayedRefresh)
+                if organizer_resource.exists():
+                    yield self._doRefresh(organizer_resource, only_attendees=attendeesToProcess)
+                else:
+                    log.debug("ImplicitProcessing - skipping refresh of missing UID: '%s'" % (self.uid,))
+            except Exception, e:
+                log.debug("ImplicitProcessing - refresh exception UID: '%s', %s" % (self.uid, str(e)))
+                yield txn.abort()
             else:
+                yield txn.commit()
+        finally:
+            yield uidlock.clean()
+
+    @inlineCallbacks
+    def _doBatchRefresh(self):
+        """
+        Do refresh of attendees in batches until the batch list is empty.
+        """
+
+        # Need to lock whilst manipulating the batch list
+        log.debug("ImplicitProcessing - batch refresh for UID: '%s'" % (self.uid,))
+        lock = MemcacheLock("BatchRefreshUIDLock", self.uid, timeout=60.0, expire_time=60.0)
+        try:
+            yield lock.acquire()
+        except MemcacheLockTimeoutError:
+            # If we could not lock then just fail the refresh - not sure what else to do
+            returnValue(None)
+
+        try:
+            # Get the batch list
+            cache = Memcacher("BatchRefreshAttendees", pickle=True)
+            pendingAttendees = yield cache.get(self.uid)
+            if pendingAttendees:
                 
-                # Release lock before sending refresh
+                # Get the next batch of attendees to process and update the cache value or remove it if
+                # no more processing is needed
+                attendeesToProcess = pendingAttendees[:config.Scheduling.Options.AttendeeRefreshBatch]
+                pendingAttendees = pendingAttendees[config.Scheduling.Options.AttendeeRefreshBatch:]
+                if pendingAttendees:
+                    yield cache.set(self.uid, pendingAttendees)
+                else:
+                    yield cache.delete(self.uid)
+                    
+                # Make sure we release this here to avoid potential deadlock when grabbing the ImplicitUIDLock in the next call
                 yield lock.release()
-    
-                # inNewTransaction wipes out the remembered resource<-> URL mappings in the
-                # request object but we need to be able to map the actual reply resource to its
-                # URL when doing auto-processing, so we have to sneak that mapping back in here.
-                txn = yield resource.inNewTransaction(self.request)
-                organizer_resource = (yield self.request.locateResource(resource._url))
-    
-                try:
-                    if organizer_resource.exists():
-                        yield _doRefresh(organizer_resource)
-                    else:
-                        log.debug("ImplicitProcessing - skipping refresh of missing UID: '%s'" % (self.uid,))
-                except Exception, e:
-                    log.debug("ImplicitProcessing - refresh exception UID: '%s', %s" % (self.uid, str(e)))
-                    yield txn.abort()
-                else:
-                    yield txn.commit()
-            finally:
-                # This correctly gets called only after commit or abort is done
-                yield uidlock.clean()
-
-        if lock:
-            reactor.callLater(config.Scheduling.Options.AttendeeRefreshIntervalSeconds, _doDelayedRefresh)
-        else:
-            yield _doRefresh(resource)
-
+                
+                # Now do the batch refresh
+                yield self._doDelayedRefresh(attendeesToProcess)
+                
+                # Queue the next refresh if needed
+                if pendingAttendees:
+                    reactor.callLater(config.Scheduling.Options.AttendeeRefreshBatchIntervalSeconds, self._doBatchRefresh)
+            else:
+                yield cache.delete(self.uid)
+                yield lock.release()
+        finally:
+            yield lock.clean()
+            
     @inlineCallbacks
     def doImplicitAttendee(self):
 

Modified: CalendarServer/trunk/twistedcaldav/scheduling/test/test_implicit.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/scheduling/test/test_implicit.py	2011-11-16 18:58:15 UTC (rev 8295)
+++ CalendarServer/trunk/twistedcaldav/scheduling/test/test_implicit.py	2011-11-16 19:31:38 UTC (rev 8296)
@@ -14,12 +14,35 @@
 # limitations under the License.
 ##
 
+from pycalendar.datetime import PyCalendarDateTime
+from pycalendar.timezone import PyCalendarTimezone
+from twext.web2 import responsecode
+from twisted.internet.defer import succeed, inlineCallbacks
 from twistedcaldav.ical import Component
+from twistedcaldav.scheduling.implicit import ImplicitScheduler
+from twistedcaldav.scheduling.scheduler import ScheduleResponseQueue
 import twistedcaldav.test.util
-from twistedcaldav.scheduling.implicit import ImplicitScheduler
-from pycalendar.datetime import PyCalendarDateTime
-from pycalendar.timezone import PyCalendarTimezone
 
+class FakeScheduler(object):
+    """
+    A fake CalDAVScheduler that does nothing except track who messages were sent to.
+    """
+    
+    def __init__(self, recipients):
+        self.recipients = recipients
+
+    def doSchedulingViaPUT(self, originator, recipients, calendar, internal_request=False):
+        self.recipients.extend(recipients)
+        return succeed(ScheduleResponseQueue("FAKE", responsecode.OK))
+
+class FakePrincipal(object):
+    
+    def __init__(self, cuaddr):
+        self.cuaddr = cuaddr
+        
+    def calendarUserAddresses(self):
+        return (self.cuaddr,)
+
 class Implicit (twistedcaldav.test.util.TestCase):
     """
     iCalendar support tests
@@ -757,3 +780,60 @@
             scheduler.findRemovedAttendees()
             self.assertEqual(scheduler.cancelledAttendees, set(result), msg=description)
 
+
+    @inlineCallbacks   
+    def test_process_request_excludes_includes(self):
+        """
+        Test that processRequests correctly excludes or includes the specified attendees.
+        """
+
+        data = (
+            ((), None, 3, ("mailto:user2 at example.com", "mailto:user3 at example.com", "mailto:user4 at example.com",),),
+            (("mailto:user2 at example.com",), None, 2, ("mailto:user3 at example.com", "mailto:user4 at example.com",),),
+            ((), ("mailto:user2 at example.com", "mailto:user4 at example.com",) , 2, ("mailto:user2 at example.com", "mailto:user4 at example.com",),),
+        )
+
+        calendar = """BEGIN:VCALENDAR
+VERSION:2.0
+PRODID:-//CALENDARSERVER.ORG//NONSGML Version 1//EN
+BEGIN:VEVENT
+UID:12345-67890
+DTSTART:20080601T120000Z
+DTEND:20080601T130000Z
+ORGANIZER;CN="User 01":mailto:user1 at example.com
+ATTENDEE:mailto:user1 at example.com
+ATTENDEE:mailto:user2 at example.com
+ATTENDEE:mailto:user3 at example.com
+ATTENDEE:mailto:user4 at example.com
+END:VEVENT
+END:VCALENDAR
+"""
+
+        for excludes, includes, result_count, result_set in data:
+            scheduler = ImplicitScheduler()
+            scheduler.resource = None
+            scheduler.request = None
+            scheduler.calendar = Component.fromString(calendar)
+            scheduler.state = "organizer"
+            scheduler.action = "modify"
+            scheduler.calendar_owner = None
+            scheduler.internal_request = True
+            scheduler.except_attendees = excludes
+            scheduler.only_refresh_attendees = includes
+            scheduler.changed_rids = None
+            scheduler.reinvites = None
+    
+            # Get some useful information from the calendar
+            yield scheduler.extractCalendarData()
+            scheduler.organizerPrincipal = FakePrincipal(scheduler.organizer)
+    
+            recipients = []
+            
+            def makeFakeScheduler():
+                return FakeScheduler(recipients)
+            scheduler.makeScheduler = makeFakeScheduler
+            
+            count = (yield scheduler.processRequests())
+            self.assertEqual(count, result_count)
+            self.assertEqual(len(recipients), result_count)
+            self.assertEqual(set(recipients), set(result_set))

Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py	2011-11-16 18:58:15 UTC (rev 8295)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py	2011-11-16 19:31:38 UTC (rev 8296)
@@ -602,13 +602,14 @@
         },
 
         "Options" : {
-            "AllowGroupAsOrganizer"          : False, # Allow groups to be Organizers
-            "AllowLocationAsOrganizer"       : False, # Allow locations to be Organizers
-            "AllowResourceAsOrganizer"       : False, # Allow resources to be Organizers
-            "AllowUserAutoAccept"            : False, # Allow auto-accept for users
-            "LimitFreeBusyAttendees"         : 30,    # Maximum number of attendees to request freebusy for
-            "AttendeeRefreshIntervalSeconds" : 60,    # Time after an iTIP REPLY at which attendee refresh will trigger 
-            "AttendeeRefreshThreshold"       : 20,    # Number of attendees above which refresh delays are used 
+            "AllowGroupAsOrganizer"               : False, # Allow groups to be Organizers
+            "AllowLocationAsOrganizer"            : False, # Allow locations to be Organizers
+            "AllowResourceAsOrganizer"            : False, # Allow resources to be Organizers
+            "AllowUserAutoAccept"                 : False, # Allow auto-accept for users
+            "LimitFreeBusyAttendees"              : 30,    # Maximum number of attendees to request freebusy for
+            "AttendeeRefreshBatch"                :  5,    # Number of attendees to do batched refreshes: 0 - no batching
+            "AttendeeRefreshBatchDelaySeconds"    :  5,    # Time after an iTIP REPLY for first batched attendee refresh
+            "AttendeeRefreshBatchIntervalSeconds" :  5,    # Time between attendee batch refreshes 
         }
     },
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111116/ce65ec28/attachment-0001.html>


More information about the calendarserver-changes mailing list