[CalendarServer-changes] [11798] CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav
source_changes at macosforge.org
source_changes at macosforge.org
Wed Oct 9 07:06:58 PDT 2013
Revision: 11798
http://trac.calendarserver.org//changeset/11798
Author: cdaboo at apple.com
Date: 2013-10-09 07:06:58 -0700 (Wed, 09 Oct 2013)
Log Message:
-----------
Fix some concurrency issues with attendee refresh work queue. Refactor a little for tests.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_pocessing.py
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py 2013-10-08 23:02:09 UTC (rev 11797)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py 2013-10-09 14:06:58 UTC (rev 11798)
@@ -274,17 +274,31 @@
# Check for batched refreshes
if config.Scheduling.Options.AttendeeRefreshBatch:
- yield self.ScheduleRefreshWork.refreshAttendees(
- self.txn,
- self.recipient_calendar_resource,
- self.recipient_calendar,
- exclude_attendees,
- )
-
+ # Batch refresh those attendees that need it.
+ allAttendees = sorted(list(self.recipient_calendar.getAllUniqueAttendees()))
+ allAttendees = filter(lambda x: x not in exclude_attendees, allAttendees)
+ if allAttendees:
+ yield self._enqueueBatchRefresh(allAttendees)
else:
yield self._doRefresh(self.organizer_calendar_resource, exclude_attendees)
+ def _enqueueBatchRefresh(self, attendees):
+ """
+ Create a batch refresh work item. Do this in a separate method to allow for easy
+ unit testing.
+
+ @param attendees: the list of attendees to refresh
+ @type attendees: C{list}
+ """
+ return self.ScheduleRefreshWork.refreshAttendees(
+ self.txn,
+ self.recipient_calendar_resource,
+ self.recipient_calendar,
+ attendees,
+ )
+
+
@inlineCallbacks
def _doRefresh(self, organizer_resource, exclude_attendees=(), only_attendees=None):
"""
@@ -949,61 +963,101 @@
class ScheduleRefreshWork(WorkItem, fromTable(schema.SCHEDULE_REFRESH_WORK)):
+ """
+ The associated work item table is SCHEDULE_REFRESH_WORK.
+ This work item is used to trigger an iTIP refresh of attendees. This happens when one attendee
+ replies to an invite, and we want to have the other attendees see that change - eventually. We
+ are going to use the SCHEDULE_REFRESH_ATTENDEES table to track the list of attendees needing
+ a refresh for each calendar object resource (identified by the organizer's resource-id for that
+ calendar object). We want to do refreshes in batches with a configurable time between each batch.
+
+ The tricky part here is handling race conditions, where two or more attendee replies happen at the
+ same time, or happen whilst a previously queued refresh has started batch processing. Here is how
+ we will handle that:
+
+ 1) Each time a refresh is needed we add to the SCHEDULE_REFRESH_ATTENDEES table every attendee not
+ already visible in it for that resource. We deliberately do not make the table unique with respect to
+ attendees - this means that two simultaneous refreshes (which may not see each other's uncommitted
+ inserts) can happily insert the same set of attendees without running into unique constraints, and
+ thus without having to use savepoints to cope with that. This may leave duplicate attendees in the
+ table, but we take care of that when executing the work item, as per the next point.
+
+ 2) When a work item is triggered we get the set of unique attendees needing a refresh from the
+ SCHEDULE_REFRESH_ATTENDEES table. We split out a batch of those to actually refresh - with the
+ others being left in the table as-is. We then remove the batch of attendees from the
+ SCHEDULE_REFRESH_ATTENDEES table - this will remove duplicates. The refresh is then done and a
+ new work item is scheduled to do the next batch - but only if attendees remain after this batch.
+ The race where a new refresh occurs whilst the last batch of a previous refresh is being processed
+ is handled by always enqueuing a new work item whenever a refresh is requested; when a work item
+ runs and finds other work items queued for the same resource, it does nothing and defers to them.
+
+ Hopefully the above methodology will deal with concurrency issues, preventing any excessive locking
+ or failed inserts etc.
+ """
+
group = property(lambda self: "ScheduleRefreshWork:%s" % (self.resourceID,))
@classmethod
@inlineCallbacks
- def refreshAttendees(cls, txn, organizer_resource, organizer_calendar, exclude_attendees):
- # Get all attendees to refresh
- allAttendees = sorted(list(organizer_calendar.getAllUniqueAttendees()))
- allAttendees = filter(lambda x: x not in exclude_attendees, allAttendees)
+ def refreshAttendees(cls, txn, organizer_resource, organizer_calendar, attendees):
+ # See if there is already a pending refresh and merge current attendees into that list,
+ # otherwise just mark all attendees as pending
+ sra = schema.SCHEDULE_REFRESH_ATTENDEES
+ pendingAttendees = (yield Select(
+ [sra.ATTENDEE, ],
+ From=sra,
+ Where=sra.RESOURCE_ID == organizer_resource.id(),
+ ).on(txn))
+ pendingAttendees = [row[0] for row in pendingAttendees]
+ attendeesToRefresh = set(attendees) - set(pendingAttendees)
+ for attendee in attendeesToRefresh:
+ yield Insert(
+ {
+ sra.RESOURCE_ID: organizer_resource.id(),
+ sra.ATTENDEE: attendee,
+ }
+ ).on(txn)
- if allAttendees:
- # See if there is already a pending refresh and merge current attendees into that list,
- # otherwise just mark all attendees as pending
- sra = schema.SCHEDULE_REFRESH_ATTENDEES
- pendingAttendees = (yield Select(
- [sra.ATTENDEE, ],
- From=sra,
- Where=sra.RESOURCE_ID == organizer_resource.id(),
- ).on(txn))
- pendingAttendees = [row[0] for row in pendingAttendees]
- attendeesToRefresh = set(allAttendees) - set(pendingAttendees)
- for attendee in attendeesToRefresh:
- yield Insert(
- {
- sra.RESOURCE_ID: organizer_resource.id(),
- sra.ATTENDEE: attendee,
- }
- ).on(txn)
+ # Always queue up new work - coalescing happens when work is executed
+ notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
+ yield txn.enqueue(
+ cls,
+ homeResourceID=organizer_resource._home.id(),
+ resourceID=organizer_resource.id(),
+ notBefore=notBefore
+ )
- # If there were no previous attendees queued that means we need to kick off new work
- notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
- yield txn.enqueue(
- cls,
- homeResourceID=organizer_resource._home.id(),
- resourceID=organizer_resource.id(),
- notBefore=notBefore
- )
-
@inlineCallbacks
def doWork(self):
+ # Look for other work items for this resource and ignore this one if other later ones exist
+ srw = schema.SCHEDULE_REFRESH_WORK
+ rows = (yield Select(
+ (srw.WORK_ID,),
+ From=srw,
+ Where=(srw.HOME_RESOURCE_ID == self.homeResourceID).And(
+ srw.RESOURCE_ID == self.resourceID),
+ ).on(self.transaction))
+ if rows:
+ log.debug("Schedule refresh for resource-id: {rid} - ignored", rid=self.resourceID)
+ returnValue(None)
+
log.debug("Schedule refresh for resource-id: {rid}", rid=self.resourceID)
- # Get list of pending attendees and split into batch to process
+ # Get the unique list of pending attendees and split into batch to process
# TODO: do a DELETE ... and rownum <= N returning attendee - but have to fix Oracle to
- # handle multi-row returning. Would be better than entire select + delete of each one
+ # handle multi-row returning. Would be better than entire select + delete of each one,
+ # but need to make sure to use UNIQUE as there may be duplicate attendees.
sra = schema.SCHEDULE_REFRESH_ATTENDEES
pendingAttendees = (yield Select(
[sra.ATTENDEE, ],
From=sra,
Where=sra.RESOURCE_ID == self.resourceID,
).on(self.transaction))
- pendingAttendees = [row[0] for row in pendingAttendees]
+ pendingAttendees = list(set([row[0] for row in pendingAttendees]))
# Nothing left so done
if len(pendingAttendees) == 0:
@@ -1017,16 +1071,15 @@
Where=(sra.RESOURCE_ID == self.resourceID).And(sra.ATTENDEE.In(Parameter("attendeesToProcess", len(attendeesToProcess))))
).on(self.transaction, attendeesToProcess=attendeesToProcess)
- # Always reschedule work item, even if nothing appears to be left. This should take care of
- # any race condition where another transaction adding attendees to refresh thinks there is still
- # a pending work item and does not reschedule one itself.
- notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
- yield self.transaction.enqueue(
- self.__class__,
- homeResourceID=self.homeResourceID,
- resourceID=self.resourceID,
- notBefore=notBefore
- )
+ # Reschedule work item if pending attendees remain.
+ if len(pendingAttendees) != 0:
+ notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchIntervalSeconds)
+ yield self.transaction.enqueue(
+ self.__class__,
+ homeResourceID=self.homeResourceID,
+ resourceID=self.resourceID,
+ notBefore=notBefore
+ )
# Do refresh
yield self._doDelayedRefresh(attendeesToProcess)
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_pocessing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_pocessing.py 2013-10-08 23:02:09 UTC (rev 11797)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_pocessing.py 2013-10-09 14:06:58 UTC (rev 11798)
@@ -34,7 +34,7 @@
self.batches = 0
- def _enqueueBatchRefresh(self):
+ def _enqueueBatchRefresh(self, exclude_attendees):
self.batches += 1
@@ -69,7 +69,7 @@
def id(self):
- return None
+ return 1
@@ -107,6 +107,7 @@
processor = FakeImplicitProcessor()
processor.txn = ""
processor.uid = "12345-67890"
+ processor.recipient_calendar_resource = FakeResource()
processor.recipient_calendar = calendar
yield processor.queueAttendeeUpdate(("urn:uuid:user02", "urn:uuid:user01",))
self.assertEqual(processor.batches, 0)
@@ -134,6 +135,7 @@
processor = FakeImplicitProcessor()
processor.txn = ""
processor.uid = "12345-67890"
+ processor.recipient_calendar_resource = FakeResource()
processor.recipient_calendar = calendar
yield processor.queueAttendeeUpdate(("urn:uuid:user02", "urn:uuid:user01",))
self.assertEqual(processor.batches, 1)
@@ -219,7 +221,7 @@
processor.txn = ""
processor.recipient_calendar = calendar.duplicate()
processor.uid = processor.recipient_calendar.newUID()
- processor.recipient_calendar_resource = None
+ processor.recipient_calendar_resource = FakeResource()
processor.message = itip.duplicate()
processor.message.newUID(processor.uid)
processor.originator = LocalCalendarUser(None, None)
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql 2013-10-08 23:02:09 UTC (rev 11797)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql 2013-10-09 14:06:58 UTC (rev 11798)
@@ -697,16 +697,31 @@
RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade
);
+create index SCHEDULE_REFRESH_WORK_HOME_RESOURCE_ID on
+ SCHEDULE_REFRESH_WORK(HOME_RESOURCE_ID);
create index SCHEDULE_REFRESH_WORK_RESOURCE_ID on
SCHEDULE_REFRESH_WORK(RESOURCE_ID);
create table SCHEDULE_REFRESH_ATTENDEES (
RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade,
- ATTENDEE varchar(255),
-
- primary key (RESOURCE_ID, ATTENDEE)
+ ATTENDEE varchar(255) not null
);
+create index SCHEDULE_REFRESH_ATTENDEES_RESOURCE_ID_ATTENDEE on
+ SCHEDULE_REFRESH_ATTENDEES(RESOURCE_ID, ATTENDEE);
+
+------------------------------
+-- Schedule Auto Reply Work --
+------------------------------
+
+--create table SCHEDULE_AUTO_REPLY_WORK (
+-- WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
+-- NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP),
+-- HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade,
+-- RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade,
+-- PARTSTAT varchar(255) not null
+--);
+
--------------------
-- Schema Version --
--------------------
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql 2013-10-08 23:02:09 UTC (rev 11797)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql 2013-10-09 14:06:58 UTC (rev 11798)
@@ -22,19 +22,23 @@
create table SCHEDULE_REFRESH_WORK (
WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP),
+ HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade,
RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade
);
+create index SCHEDULE_REFRESH_WORK_HOME_RESOURCE_ID on
+ SCHEDULE_REFRESH_WORK(HOME_RESOURCE_ID);
create index SCHEDULE_REFRESH_WORK_RESOURCE_ID on
SCHEDULE_REFRESH_WORK(RESOURCE_ID);
create table SCHEDULE_REFRESH_ATTENDEES (
RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade,
- ATTENDEE varchar(255),
-
- primary key (RESOURCE_ID, ATTENDEE)
+ ATTENDEE varchar(255) not null
);
+create index SCHEDULE_REFRESH_ATTENDEES_RESOURCE_ID_ATTENDEE on
+ SCHEDULE_REFRESH_ATTENDEES(RESOURCE_ID, ATTENDEE);
+
-- Now update the version
-- No data upgrades
update CALENDARSERVER set VALUE = '26' where NAME = 'VERSION';
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20131009/1e33ab2f/attachment-0001.html>
More information about the calendarserver-changes
mailing list