[CalendarServer-changes] [11802] CalendarServer/branches/users/cdaboo/scheduling-queue-refresh
source_changes at macosforge.org
source_changes at macosforge.org
Wed Oct 9 19:47:15 PDT 2013
Revision: 11802
http://trac.calendarserver.org//changeset/11802
Author: cdaboo at apple.com
Date: 2013-10-09 19:47:15 -0700 (Wed, 09 Oct 2013)
Log Message:
-----------
Work queue item for attendee auto-reply processing.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twistedcaldav/stdconfig.py
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current-oracle-dialect.sql
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_25_to_26.sql
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twistedcaldav/stdconfig.py 2013-10-09 23:16:20 UTC (rev 11801)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twistedcaldav/stdconfig.py 2013-10-10 02:47:15 UTC (rev 11802)
@@ -713,6 +713,7 @@
"AttendeeRefreshBatchDelaySeconds" : 5, # Time after an iTIP REPLY for first batched attendee refresh
"AttendeeRefreshBatchIntervalSeconds" : 5, # Time between attendee batch refreshes
"AttendeeRefreshCountLimit" : 50, # Number of attendees above which attendee refreshes are suppressed: 0 - no limit
+ "AutoReplyDelaySeconds" : 5, # Time delay for sending an auto reply iTIP message
"UIDLockTimeoutSeconds" : 60, # Time for implicit UID lock timeout
"UIDLockExpirySeconds" : 300, # Expiration time for UID lock,
"PrincipalHostAliases" : [], # Host names matched in http(s) CUAs
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py 2013-10-09 23:16:20 UTC (rev 11801)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py 2013-10-10 02:47:15 UTC (rev 11802)
@@ -388,14 +388,14 @@
@inlineCallbacks
- def sendAttendeeReply(self, txn, resource, calendar, attendee):
+ def sendAttendeeReply(self, txn, resource):
self.txn = txn
self.resource = resource
self.calendar_home = self.resource.parentCollection().ownerHome()
- self.calendar = calendar
+ self.calendar = (yield self.resource.componentForUser())
self.action = "modify"
self.state = "attendee"
@@ -405,8 +405,8 @@
# Get some useful information from the calendar
yield self.extractCalendarData()
- self.originator = self.attendee = attendee.principal.canonicalCalendarUserAddress()
- self.attendeePrincipal = attendee.principal
+ self.attendeePrincipal = self.calendar_home.directoryService().recordWithUID(self.calendar_home.uid())
+ self.originator = self.attendee = self.attendeePrincipal.canonicalCalendarUserAddress()
result = (yield self.scheduleWithOrganizer())
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py 2013-10-09 23:16:20 UTC (rev 11801)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py 2013-10-10 02:47:15 UTC (rev 11802)
@@ -26,7 +26,6 @@
from twext.web2.dav.method.report import NumberOfMatchesWithinLimits
from twext.web2.http import HTTPError
-from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue
from twistedcaldav import customxml, caldavxml
@@ -258,18 +257,6 @@
@type exclude_attendees: C{list}
"""
- # When doing auto-processing of replies, only refresh attendees when the last auto-accept is done.
- # Note that when we do this we also need to refresh the attendee that is generating the reply because they
- # are no longer up to date with changes of other auto-accept attendees. See docstr for sendAttendeeAutoReply
- # below for more details of what is going on here.
- if getattr(self.txn, "auto_reply_processing_count", 0) > 1:
- log.debug("ImplicitProcessing - refreshing UID: '%s', Suppressed: %s" % (self.uid, self.txn.auto_reply_processing_count,))
- self.txn.auto_reply_suppressed = True
- returnValue(None)
- if getattr(self.txn, "auto_reply_suppressed", False):
- log.debug("ImplicitProcessing - refreshing UID: '%s', Suppression lifted" % (self.uid,))
- exclude_attendees = ()
-
self.uid = self.recipient_calendar.resourceUID()
# Check for batched refreshes
@@ -444,9 +431,8 @@
if send_reply:
# Track outstanding auto-reply processing
- self.txn.auto_reply_processing_count = getattr(self.txn, "auto_reply_processing_count", 0) + 1
- log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply queued: %s" % (self.recipient.cuaddr, self.uid, self.txn.auto_reply_processing_count,))
- reactor.callLater(2.0, self.sendAttendeeAutoReply, *(new_calendar, new_resource, partstat))
+ log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply queued" % (self.recipient.cuaddr, self.uid,))
+ self.ScheduleAutoReplyWork.autoReply(self.txn, new_resource, partstat)
# Build the schedule-changes XML element
changes = customxml.ScheduleChanges(
@@ -485,9 +471,8 @@
if send_reply:
# Track outstanding auto-reply processing
- self.txn.auto_reply_processing_count = getattr(self.txn, "auto_reply_processing_count", 0) + 1
- log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply queued: %s" % (self.recipient.cuaddr, self.uid, self.txn.auto_reply_processing_count,))
- reactor.callLater(2.0, self.sendAttendeeAutoReply, *(new_calendar, new_resource, partstat))
+ log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply queued" % (self.recipient.cuaddr, self.uid,))
+ self.ScheduleAutoReplyWork.autoReply(self.txn, new_resource, partstat)
# Build the schedule-changes XML element
update_details = []
@@ -585,66 +570,6 @@
@inlineCallbacks
- def sendAttendeeAutoReply(self, calendar, resource, partstat):
- """
- Auto-process the calendar option to generate automatic accept/decline status and
- send a reply if needed.
-
- There is some tricky behavior here: when multiple auto-accept attendees are present in a
- calendar object, we want to suppress the processing of other attendee refreshes until all
- auto-accepts have replied, to avoid a flood of refreshes. We do that by tracking the pending
- auto-replies via a "auto_reply_processing_count" attribute on the original txn objection (even
- though that has been committed). We also use a "auto_reply_suppressed" attribute on that txn
- to indicate when suppression has occurred, to ensure that when the refresh is finally sent, we
- send it to everyone to make sure all are in sync. In order for the actual refreshes to be
- suppressed we have to "transfer" those two attributes from the original txn to the new one
- used to send the reply. Then we transfer "auto_reply_suppressed" back when done, and decrement
- "auto_reply_processing_count" (all done under a UID lock to prevent race conditions).
-
- @param calendar: calendar data to examine
- @type calendar: L{Component}
-
- @return: L{Component} for the new calendar data to write
- """
-
- # The original transaction is still around but likely committed at this point, so we need a brand new
- # transaction to do this work.
- txn = yield self.txn.store().newTransaction("Attendee (%s) auto-reply for UID: %s" % (self.recipient.cuaddr, self.uid,))
-
- aborted = False
- try:
- # We need to get the UID lock for implicit processing whilst we send the auto-reply
- # as the Organizer processing will attempt to write out data to other attendees to
- # refresh them. To prevent a race we need a lock.
- yield NamedLock.acquire(txn, "ImplicitUIDLock:%s" % (hashlib.md5(calendar.resourceUID()).hexdigest(),))
-
- # Must be done after acquiring the lock to avoid a race-condition
- txn.auto_reply_processing_count = getattr(self.txn, "auto_reply_processing_count", 0)
- txn.auto_reply_suppressed = getattr(self.txn, "auto_reply_suppressed", False)
-
- # Send out a reply
- log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply: %s" % (self.recipient.cuaddr, self.uid, partstat))
- from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler
- scheduler = ImplicitScheduler()
- yield scheduler.sendAttendeeReply(txn, resource, calendar, self.recipient)
- except Exception, e:
- log.debug("ImplicitProcessing - auto-reply exception UID: '%s', %s" % (self.uid, str(e)))
- aborted = True
- except:
- log.debug("ImplicitProcessing - auto-reply bare exception UID: '%s'" % (self.uid,))
- aborted = True
-
- # Track outstanding auto-reply processing - must be done before commit/abort which releases the lock
- self.txn.auto_reply_processing_count = getattr(self.txn, "auto_reply_processing_count", 0) - 1
- self.txn.auto_reply_suppressed = txn.auto_reply_suppressed
-
- if aborted:
- yield txn.abort()
- else:
- yield txn.commit()
-
-
- @inlineCallbacks
def checkAttendeeAutoReply(self, calendar, automode):
"""
Check whether a reply to the given iTIP message is needed and if so make the
@@ -1133,3 +1058,83 @@
organizer_resource,
only_attendees=only_attendees,
)
+
+
+ class ScheduleAutoReplyWork(WorkItem, fromTable(schema.SCHEDULE_AUTO_REPLY_WORK)):
+ """
+ The associated work item table is SCHEDULE_AUTO_REPLY_WORK.
+
+ This work item is used to send auto-reply iTIP messages after the calendar data for the
+ auto-accept user has been written to the user calendar.
+ """
+
+ group = property(lambda self: "ScheduleAutoReplyWork:%s" % (self.resourceID,))
+
+
+ @classmethod
+ @inlineCallbacks
+ def autoReply(cls, txn, resource, partstat):
+ # Always queue up new work - coalescing happens when work is executed
+ notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AutoReplyDelaySeconds)
+ yield txn.enqueue(
+ cls,
+ homeResourceID=resource._home.id(),
+ resourceID=resource.id(),
+ partstat=partstat,
+ notBefore=notBefore,
+ )
+
+
+ @inlineCallbacks
+ def doWork(self):
+
+ log.debug("Schedule auto-reply for resource-id: {rid}", rid=self.resourceID)
+
+ # Delete all other work items with the same resourceID
+ yield Delete(From=self.table,
+ Where=self.table.RESOURCE_ID == self.resourceID
+ ).on(self.transaction)
+
+ # Do reply
+ yield self._sendAttendeeAutoReply()
+
+
+ @inlineCallbacks
+ def _sendAttendeeAutoReply(self):
+ """
+ Auto-process the calendar option to generate automatic accept/decline status and
+ send a reply if needed.
+
+ We used to have logic to suppress attendee refreshes until after all auto-replies have
+ been processed. We can't do that with the work queue (easily) so we are going to ignore
+ that for now. It may not be a big deal given that the refreshes are themselves done in the
+ queue and we only do the refresh when the last queued work item is processed.
+
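+ This method takes no arguments; its inputs come from this work item's fields:
+ C{self.homeResourceID} and C{self.resourceID} identify the calendar resource
+ (a L{CalendarObject}) to process, and C{self.partstat} holds the new partstat value (a C{str}).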
+ """
+
+ home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID))
+ resource = (yield home.objectResourceWithID(self.resourceID))
+ if resource is not None:
+ try:
+ # We need to get the UID lock for implicit processing whilst we send the auto-reply
+ # as the Organizer processing will attempt to write out data to other attendees to
+ # refresh them. To prevent a race we need a lock.
+ yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(resource.uid()).hexdigest(),))
+
+ # Send out a reply
+ log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply: %s" % (home.uid(), resource.uid(), self.partstat))
+ from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler
+ scheduler = ImplicitScheduler()
+ yield scheduler.sendAttendeeReply(self.transaction, resource)
+ except Exception, e:
+ log.debug("ImplicitProcessing - auto-reply exception UID: '%s', %s" % (resource.uid(), str(e)))
+ raise
+ except:
+ log.debug("ImplicitProcessing - auto-reply bare exception UID: '%s'" % (resource.uid(),))
+ raise
+ else:
+ log.debug("ImplicitProcessing - skipping auto-reply of missing ID: '{rid}'", rid=self.resourceID)
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current-oracle-dialect.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current-oracle-dialect.sql 2013-10-09 23:16:20 UTC (rev 11801)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current-oracle-dialect.sql 2013-10-10 02:47:15 UTC (rev 11802)
@@ -363,15 +363,23 @@
create table SCHEDULE_REFRESH_WORK (
"WORK_ID" integer primary key not null,
"NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+ "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade,
"RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade
);
create table SCHEDULE_REFRESH_ATTENDEES (
"RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade,
- "ATTENDEE" nvarchar2(255),
- primary key("RESOURCE_ID", "ATTENDEE")
+ "ATTENDEE" nvarchar2(255)
);
+create table SCHEDULE_AUTO_REPLY_WORK (
+ "WORK_ID" integer primary key not null,
+ "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+ "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade,
+ "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade,
+ "PARTSTAT" nvarchar2(255)
+);
+
create table CALENDARSERVER (
"NAME" nvarchar2(255) primary key,
"VALUE" nvarchar2(255)
@@ -501,7 +509,24 @@
RESOURCE_ID
);
+create index SCHEDULE_REFRESH_WORK_26084c7b on SCHEDULE_REFRESH_WORK (
+ HOME_RESOURCE_ID
+);
+
create index SCHEDULE_REFRESH_WORK_989efe54 on SCHEDULE_REFRESH_WORK (
RESOURCE_ID
);
+create index SCHEDULE_REFRESH_ATTE_83053b91 on SCHEDULE_REFRESH_ATTENDEES (
+ RESOURCE_ID,
+ ATTENDEE
+);
+
+create index SCHEDULE_AUTO_REPLY_W_0256478d on SCHEDULE_AUTO_REPLY_WORK (
+ HOME_RESOURCE_ID
+);
+
+create index SCHEDULE_AUTO_REPLY_W_0755e754 on SCHEDULE_AUTO_REPLY_WORK (
+ RESOURCE_ID
+);
+
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql 2013-10-09 23:16:20 UTC (rev 11801)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql 2013-10-10 02:47:15 UTC (rev 11802)
@@ -714,14 +714,19 @@
-- Schedule Auto Reply Work --
------------------------------
---create table SCHEDULE_AUTO_REPLY_WORK (
--- WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
--- NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP),
--- HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade,
--- RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade,
--- PARTSTAT varchar(255) not null
---);
+create table SCHEDULE_AUTO_REPLY_WORK (
+ WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
+ NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP),
+ HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade,
+ RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade,
+ PARTSTAT varchar(255) not null
+);
+create index SCHEDULE_AUTO_REPLY_WORK_HOME_RESOURCE_ID on
+ SCHEDULE_AUTO_REPLY_WORK(HOME_RESOURCE_ID);
+create index SCHEDULE_AUTO_REPLY_WORK_RESOURCE_ID on
+ SCHEDULE_AUTO_REPLY_WORK(RESOURCE_ID);
+
--------------------
-- Schema Version --
--------------------
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_25_to_26.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_25_to_26.sql 2013-10-09 23:16:20 UTC (rev 11801)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_25_to_26.sql 2013-10-10 02:47:15 UTC (rev 11802)
@@ -23,20 +23,44 @@
create table SCHEDULE_REFRESH_WORK (
"WORK_ID" integer primary key not null,
"NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+ "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade,
"RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade
);
+create index SCHEDULE_REFRESH_WORK_26084c7b on SCHEDULE_REFRESH_WORK (
+ HOME_RESOURCE_ID
+);
+
create index SCHEDULE_REFRESH_WORK_989efe54 on SCHEDULE_REFRESH_WORK (
RESOURCE_ID
);
-
create table SCHEDULE_REFRESH_ATTENDEES (
"RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade,
- "ATTENDEE" nvarchar2(255),
- primary key("RESOURCE_ID", "ATTENDEE")
+ "ATTENDEE" nvarchar2(255)
);
+create index SCHEDULE_REFRESH_ATTE_83053b91 on SCHEDULE_REFRESH_ATTENDEES (
+ RESOURCE_ID,
+ ATTENDEE
+);
+
+create table SCHEDULE_AUTO_REPLY_WORK (
+ "WORK_ID" integer primary key not null,
+ "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+ "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade,
+ "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade,
+ "PARTSTAT" nvarchar2(255)
+);
+
+create index SCHEDULE_AUTO_REPLY_W_0256478d on SCHEDULE_AUTO_REPLY_WORK (
+ HOME_RESOURCE_ID
+);
+
+create index SCHEDULE_AUTO_REPLY_W_0755e754 on SCHEDULE_AUTO_REPLY_WORK (
+ RESOURCE_ID
+);
+
-- Now update the version
-- No data upgrades
update CALENDARSERVER set VALUE = '26' where NAME = 'VERSION';
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql 2013-10-09 23:16:20 UTC (rev 11801)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql 2013-10-10 02:47:15 UTC (rev 11802)
@@ -19,6 +19,7 @@
---------------------------------------------------
-- New tables
+
create table SCHEDULE_REFRESH_WORK (
WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP),
@@ -39,6 +40,20 @@
create index SCHEDULE_REFRESH_ATTENDEES_RESOURCE_ID_ATTENDEE on
SCHEDULE_REFRESH_ATTENDEES(RESOURCE_ID, ATTENDEE);
+
+ create table SCHEDULE_AUTO_REPLY_WORK (
+ WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
+ NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP),
+ HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade,
+ RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade,
+ PARTSTAT varchar(255) not null
+);
+
+create index SCHEDULE_AUTO_REPLY_WORK_HOME_RESOURCE_ID on
+ SCHEDULE_AUTO_REPLY_WORK(HOME_RESOURCE_ID);
+create index SCHEDULE_AUTO_REPLY_WORK_RESOURCE_ID on
+ SCHEDULE_AUTO_REPLY_WORK(RESOURCE_ID);
+
-- Now update the version
-- No data upgrades
update CALENDARSERVER set VALUE = '26' where NAME = 'VERSION';
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20131009/89209e83/attachment-0001.html>
More information about the calendarserver-changes
mailing list