[CalendarServer-changes] [11798] CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav

source_changes at macosforge.org source_changes at macosforge.org
Wed Oct 9 07:06:58 PDT 2013


Revision: 11798
          http://trac.calendarserver.org//changeset/11798
Author:   cdaboo at apple.com
Date:     2013-10-09 07:06:58 -0700 (Wed, 09 Oct 2013)
Log Message:
-----------
Fix some concurrency issues with attendee refresh work queue. Refactor a little for tests.

Modified Paths:
--------------
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_pocessing.py
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py	2013-10-08 23:02:09 UTC (rev 11797)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py	2013-10-09 14:06:58 UTC (rev 11798)
@@ -274,17 +274,31 @@
 
         # Check for batched refreshes
         if config.Scheduling.Options.AttendeeRefreshBatch:
-            yield self.ScheduleRefreshWork.refreshAttendees(
-                self.txn,
-                self.recipient_calendar_resource,
-                self.recipient_calendar,
-                exclude_attendees,
-            )
-
+            # Batch refresh those attendees that need it.
+            allAttendees = sorted(list(self.recipient_calendar.getAllUniqueAttendees()))
+            allAttendees = filter(lambda x: x not in exclude_attendees, allAttendees)
+            if allAttendees:
+                yield self._enqueueBatchRefresh(allAttendees)
         else:
             yield self._doRefresh(self.organizer_calendar_resource, exclude_attendees)
 
 
+    def _enqueueBatchRefresh(self, attendees):
+        """
+        Create a batch refresh work item. Do this in a separate method to allow for easy
+        unit testing.
+
+        @param attendees: the list of attendees to refresh
+        @type attendees: C{list}
+        """
+        return self.ScheduleRefreshWork.refreshAttendees(
+            self.txn,
+            self.recipient_calendar_resource,
+            self.recipient_calendar,
+            attendees,
+        )
+
+
     @inlineCallbacks
     def _doRefresh(self, organizer_resource, exclude_attendees=(), only_attendees=None):
         """
@@ -949,61 +963,101 @@
 
 
     class ScheduleRefreshWork(WorkItem, fromTable(schema.SCHEDULE_REFRESH_WORK)):
+        """
+        The associated work item table is SCHEDULE_REFRESH_WORK.
 
+        This work item is used to trigger an iTIP refresh of attendees. This happens when one attendee
+        replies to an invite, and we want to have the other attendees see that change - eventually. We
+        are going to use the SCHEDULE_REFRESH_ATTENDEES table to track the list of attendees needing
+        a refresh for each calendar object resource (identified by the organizer's resource-id for that
+        calendar object). We want to do refreshes in batches with a configurable time between each batch.
+
+        The tricky part here is handling race conditions, where two or more attendee replies happen at the
+        same time, or happen whilst a previously queued refresh has started batch processing. Here is how
+        we will handle that:
+
+        1) Each time a refresh is needed we will add all attendees to the SCHEDULE_REFRESH_ATTENDEES table.
+        This will happen even if those attendees are currently listed in that table. We ensure the table is
+        not unique with respect to attendees - this means that two simultaneous refreshes can happily insert the
+        same set of attendees without running into unique constraints and thus without having to use
+        savepoints to cope with that. This will mean duplicate attendees listed in the table, but we take
+        care of that when executing the work item, as per the next point.
+
+        2) When a work item is triggered we get the set of unique attendees needing a refresh from the
+        SCHEDULE_REFRESH_ATTENDEES table. We split out a batch of those to actually refresh - with the
+        others being left in the table as-is. We then remove the batch of attendees from the
+        SCHEDULE_REFRESH_ATTENDEES table - this will remove duplicates. The refresh is then done and a
+        new work item scheduled to do the next batch. We only stop rescheduling work items when nothing
+        is found during the initial query. Note that if any refresh is done we will always reschedule work
+        even if we know none remain. That should handle the case where a new refresh occurs whilst
+        processing the last batch from a previous refresh.
+
+        Hopefully the above methodology will deal with concurrency issues, preventing any excessive locking
+        or failed inserts etc.
+        """
+
         group = property(lambda self: "ScheduleRefreshWork:%s" % (self.resourceID,))
 
 
         @classmethod
         @inlineCallbacks
-        def refreshAttendees(cls, txn, organizer_resource, organizer_calendar, exclude_attendees):
-            # Get all attendees to refresh
-            allAttendees = sorted(list(organizer_calendar.getAllUniqueAttendees()))
-            allAttendees = filter(lambda x: x not in exclude_attendees, allAttendees)
+        def refreshAttendees(cls, txn, organizer_resource, organizer_calendar, attendees):
+            # See if there is already a pending refresh and merge current attendees into that list,
+            # otherwise just mark all attendees as pending
+            sra = schema.SCHEDULE_REFRESH_ATTENDEES
+            pendingAttendees = (yield Select(
+                [sra.ATTENDEE, ],
+                From=sra,
+                Where=sra.RESOURCE_ID == organizer_resource.id(),
+            ).on(txn))
+            pendingAttendees = [row[0] for row in pendingAttendees]
+            attendeesToRefresh = set(attendees) - set(pendingAttendees)
+            for attendee in attendeesToRefresh:
+                yield Insert(
+                    {
+                        sra.RESOURCE_ID: organizer_resource.id(),
+                        sra.ATTENDEE: attendee,
+                    }
+                ).on(txn)
 
-            if allAttendees:
-                # See if there is already a pending refresh and merge current attendees into that list,
-                # otherwise just mark all attendees as pending
-                sra = schema.SCHEDULE_REFRESH_ATTENDEES
-                pendingAttendees = (yield Select(
-                    [sra.ATTENDEE, ],
-                    From=sra,
-                    Where=sra.RESOURCE_ID == organizer_resource.id(),
-                ).on(txn))
-                pendingAttendees = [row[0] for row in pendingAttendees]
-                attendeesToRefresh = set(allAttendees) - set(pendingAttendees)
-                for attendee in attendeesToRefresh:
-                    yield Insert(
-                        {
-                            sra.RESOURCE_ID: organizer_resource.id(),
-                            sra.ATTENDEE: attendee,
-                        }
-                    ).on(txn)
+            # Always queue up new work - coalescing happens when work is executed
+            notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
+            yield txn.enqueue(
+                cls,
+                homeResourceID=organizer_resource._home.id(),
+                resourceID=organizer_resource.id(),
+                notBefore=notBefore
+            )
 
-                # If there were no previous attendees queued that means we need to kick off new work
-                notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
-                yield txn.enqueue(
-                    cls,
-                    homeResourceID=organizer_resource._home.id(),
-                    resourceID=organizer_resource.id(),
-                    notBefore=notBefore
-                )
 
-
         @inlineCallbacks
         def doWork(self):
 
+            # Look for other work items for this resource and ignore this one if other later ones exist
+            srw = schema.SCHEDULE_REFRESH_WORK
+            rows = (yield Select(
+                (srw.WORK_ID,),
+                From=srw,
+                Where=(srw.HOME_RESOURCE_ID == self.homeResourceID).And(
+                       srw.RESOURCE_ID == self.resourceID),
+            ).on(self.transaction))
+            if rows:
+                log.debug("Schedule refresh for resource-id: {rid} - ignored", rid=self.resourceID)
+                returnValue(None)
+
             log.debug("Schedule refresh for resource-id: {rid}", rid=self.resourceID)
 
-            # Get list of pending attendees and split into batch to process
+            # Get the unique list of pending attendees and split into batch to process
             # TODO: do a DELETE ... and rownum <= N returning attendee - but have to fix Oracle to
-            # handle multi-row returning. Would be better than entire select + delete of each one
+            # handle multi-row returning. Would be better than entire select + delete of each one,
+            # but need to make sure to use UNIQUE as there may be duplicate attendees.
             sra = schema.SCHEDULE_REFRESH_ATTENDEES
             pendingAttendees = (yield Select(
                 [sra.ATTENDEE, ],
                 From=sra,
                 Where=sra.RESOURCE_ID == self.resourceID,
             ).on(self.transaction))
-            pendingAttendees = [row[0] for row in pendingAttendees]
+            pendingAttendees = list(set([row[0] for row in pendingAttendees]))
 
             # Nothing left so done
             if len(pendingAttendees) == 0:
@@ -1017,16 +1071,15 @@
                 Where=(sra.RESOURCE_ID == self.resourceID).And(sra.ATTENDEE.In(Parameter("attendeesToProcess", len(attendeesToProcess))))
             ).on(self.transaction, attendeesToProcess=attendeesToProcess)
 
-            # Always reschedule work item, even if nothing appears to be left. This should take care of
-            # any race condition where another transaction adding attendees to refresh thinks there is still
-            # a pending work item and does not reschedule one itself.
-            notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
-            yield self.transaction.enqueue(
-                self.__class__,
-                homeResourceID=self.homeResourceID,
-                resourceID=self.resourceID,
-                notBefore=notBefore
-            )
+            # Reschedule work item if pending attendees remain.
+            if len(pendingAttendees) != 0:
+                notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchIntervalSeconds)
+                yield self.transaction.enqueue(
+                    self.__class__,
+                    homeResourceID=self.homeResourceID,
+                    resourceID=self.resourceID,
+                    notBefore=notBefore
+                )
 
             # Do refresh
             yield self._doDelayedRefresh(attendeesToProcess)

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_pocessing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_pocessing.py	2013-10-08 23:02:09 UTC (rev 11797)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_pocessing.py	2013-10-09 14:06:58 UTC (rev 11798)
@@ -34,7 +34,7 @@
         self.batches = 0
 
 
-    def _enqueueBatchRefresh(self):
+    def _enqueueBatchRefresh(self, exclude_attendees):
         self.batches += 1
 
 
@@ -69,7 +69,7 @@
 
 
     def id(self):
-        return None
+        return 1
 
 
 
@@ -107,6 +107,7 @@
         processor = FakeImplicitProcessor()
         processor.txn = ""
         processor.uid = "12345-67890"
+        processor.recipient_calendar_resource = FakeResource()
         processor.recipient_calendar = calendar
         yield processor.queueAttendeeUpdate(("urn:uuid:user02", "urn:uuid:user01",))
         self.assertEqual(processor.batches, 0)
@@ -134,6 +135,7 @@
         processor = FakeImplicitProcessor()
         processor.txn = ""
         processor.uid = "12345-67890"
+        processor.recipient_calendar_resource = FakeResource()
         processor.recipient_calendar = calendar
         yield processor.queueAttendeeUpdate(("urn:uuid:user02", "urn:uuid:user01",))
         self.assertEqual(processor.batches, 1)
@@ -219,7 +221,7 @@
             processor.txn = ""
             processor.recipient_calendar = calendar.duplicate()
             processor.uid = processor.recipient_calendar.newUID()
-            processor.recipient_calendar_resource = None
+            processor.recipient_calendar_resource = FakeResource()
             processor.message = itip.duplicate()
             processor.message.newUID(processor.uid)
             processor.originator = LocalCalendarUser(None, None)

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql	2013-10-08 23:02:09 UTC (rev 11797)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql	2013-10-09 14:06:58 UTC (rev 11798)
@@ -697,16 +697,31 @@
   RESOURCE_ID                   integer      not null references CALENDAR_OBJECT on delete cascade
 );
 
+create index SCHEDULE_REFRESH_WORK_HOME_RESOURCE_ID on
+	SCHEDULE_REFRESH_WORK(HOME_RESOURCE_ID);
 create index SCHEDULE_REFRESH_WORK_RESOURCE_ID on
 	SCHEDULE_REFRESH_WORK(RESOURCE_ID);
 
 create table SCHEDULE_REFRESH_ATTENDEES (
   RESOURCE_ID                   integer      not null references CALENDAR_OBJECT on delete cascade,
-  ATTENDEE			            varchar(255),
-
-  primary key (RESOURCE_ID, ATTENDEE)
+  ATTENDEE			            varchar(255) not null
 );
 
+create index SCHEDULE_REFRESH_ATTENDEES_RESOURCE_ID_ATTENDEE on
+	SCHEDULE_REFRESH_ATTENDEES(RESOURCE_ID, ATTENDEE);
+
+------------------------------
+-- Schedule Auto Reply Work --
+------------------------------
+
+--create table SCHEDULE_AUTO_REPLY_WORK (
+--  WORK_ID                       integer      primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
+--  NOT_BEFORE                    timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+--  HOME_RESOURCE_ID              integer      not null references CALENDAR_HOME on delete cascade,
+--  RESOURCE_ID                   integer      not null references CALENDAR_OBJECT on delete cascade,
+--  PARTSTAT						varchar(255) not null
+--);
+
 --------------------
 -- Schema Version --
 --------------------

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql	2013-10-08 23:02:09 UTC (rev 11797)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql	2013-10-09 14:06:58 UTC (rev 11798)
@@ -22,19 +22,23 @@
 create table SCHEDULE_REFRESH_WORK (
   WORK_ID                       integer      primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
   NOT_BEFORE                    timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  HOME_RESOURCE_ID              integer      not null references CALENDAR_HOME on delete cascade,
   RESOURCE_ID                   integer      not null references CALENDAR_OBJECT on delete cascade
 );
 
+create index SCHEDULE_REFRESH_WORK_HOME_RESOURCE_ID on
+	SCHEDULE_REFRESH_WORK(HOME_RESOURCE_ID);
 create index SCHEDULE_REFRESH_WORK_RESOURCE_ID on
 	SCHEDULE_REFRESH_WORK(RESOURCE_ID);
 
 create table SCHEDULE_REFRESH_ATTENDEES (
   RESOURCE_ID                   integer      not null references CALENDAR_OBJECT on delete cascade,
-  ATTENDEE			            varchar(255),
-
-  primary key (RESOURCE_ID, ATTENDEE)
+  ATTENDEE			            varchar(255) not null
 );
 
+create index SCHEDULE_REFRESH_ATTENDEES_RESOURCE_ID_ATTENDEE on
+	SCHEDULE_REFRESH_ATTENDEES(RESOURCE_ID, ATTENDEE);
+
 -- Now update the version
 -- No data upgrades
 update CALENDARSERVER set VALUE = '26' where NAME = 'VERSION';
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20131009/1e33ab2f/attachment-0001.html>


More information about the calendarserver-changes mailing list