Revision: 11811 http://trac.calendarserver.org//changeset/11811 Author: cdaboo@apple.com Date: 2013-10-13 07:59:35 -0700 (Sun, 13 Oct 2013) Log Message: ----------- Work items for scheduling replies. Modified Paths: -------------- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twistedcaldav/stdconfig.py CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_implicit.py CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current-oracle-dialect.sql CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_25_to_26.sql CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql Added Paths: ----------- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twistedcaldav/stdconfig.py =================================================================== --- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twistedcaldav/stdconfig.py 2013-10-13 14:57:55 UTC (rev 11810) +++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twistedcaldav/stdconfig.py 2013-10-13 14:59:35 UTC (rev 11811) @@ -714,6 +714,8 @@ "AttendeeRefreshBatchIntervalSeconds" : 5, # Time between attendee batch refreshes "AttendeeRefreshCountLimit" : 50, # Number of attendees above which attendee refreshes are suppressed: 0 - no limit "AutoReplyDelaySeconds" : 5, # Time delay for 
sending an auto reply iTIP message + "QueuedRequestDelaySeconds" : 5, # Number of seconds delay for a queued scheduling request/cancel + "QueuedReplyDelaySeconds" : 1, # Number of seconds delay for a queued scheduling reply "UIDLockTimeoutSeconds" : 60, # Time for implicit UID lock timeout "UIDLockExpirySeconds" : 300, # Expiration time for UID lock, "PrincipalHostAliases" : [], # Host names matched in http(s) CUAs Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py =================================================================== --- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py 2013-10-13 14:57:55 UTC (rev 11810) +++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py 2013-10-13 14:59:35 UTC (rev 11811) @@ -34,6 +34,8 @@ from txdav.caldav.datastore.scheduling.icaldiff import iCalDiff from txdav.caldav.datastore.scheduling.itip import iTipGenerator, iTIPRequestStatus from txdav.caldav.datastore.scheduling.utils import getCalendarObjectForRecord +from txdav.caldav.datastore.scheduling.work import ScheduleReplyWork, \ + ScheduleReplyCancelWork import collections @@ -1306,12 +1308,22 @@ self.logItems["itip.reply"] = "reply" - itipmsg = iTipGenerator.generateAttendeeReply(self.calendar, self.attendee, changedRids=changedRids) +# itipmsg = iTipGenerator.generateAttendeeReply(self.calendar, self.attendee, changedRids=changedRids) +# +# # Send scheduling message +# return self.sendToOrganizer("REPLY", itipmsg) - # Send scheduling message - return self.sendToOrganizer("REPLY", itipmsg) + # Always make it look like scheduling succeeded when queuing + self.calendar.setParameterToValueForPropertyWithValue( + "SCHEDULE-STATUS", + iTIPRequestStatus.MESSAGE_DELIVERED_CODE, + "ORGANIZER", + self.organizer, + ) + return ScheduleReplyWork.reply(self.txn, self.calendar_home, self.resource, 
changedRids, self.attendee) + def scheduleCancelWithOrganizer(self): # First make sure we are allowed to schedule @@ -1319,12 +1331,15 @@ self.logItems["itip.reply"] = "cancel" - itipmsg = iTipGenerator.generateAttendeeReply(self.calendar, self.attendee, force_decline=True) +# itipmsg = iTipGenerator.generateAttendeeReply(self.calendar, self.attendee, force_decline=True) +# +# # Send scheduling message +# return self.sendToOrganizer("CANCEL", itipmsg) - # Send scheduling message - return self.sendToOrganizer("CANCEL", itipmsg) + return ScheduleReplyCancelWork.replyCancel(self.txn, self.calendar_home, self.calendar, self.attendee) + @inlineCallbacks def sendToOrganizer(self, action, itipmsg): # Send scheduling message @@ -1333,10 +1348,6 @@ scheduler = self.makeScheduler() # Do the PUT processing - def _gotResponse(response): - self.handleSchedulingResponse(response, False) - log.info("Implicit %s - attendee: '%s' to organizer: '%s', UID: '%s'" % (action, self.attendee, self.organizer, self.uid,)) - d = scheduler.doSchedulingViaPUT(self.originator, (self.organizer,), itipmsg, internal_request=True) - d.addCallback(_gotResponse) - return d + response = (yield scheduler.doSchedulingViaPUT(self.originator, (self.organizer,), itipmsg, internal_request=True)) + self.handleSchedulingResponse(response, False) Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py =================================================================== --- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py 2013-10-13 14:57:55 UTC (rev 11810) +++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py 2013-10-13 14:59:35 UTC (rev 11811) @@ -18,10 +18,6 @@ from pycalendar.duration import PyCalendarDuration from pycalendar.timezone import PyCalendarTimezone -from twext.enterprise.dal.record import fromTable -from 
twext.enterprise.dal.syntax import Select, Delete, Insert, Parameter -from twext.enterprise.locking import NamedLock -from twext.enterprise.queue import WorkItem from twext.python.log import Logger from twext.web2.dav.method.report import NumberOfMatchesWithinLimits from twext.web2.http import HTTPError @@ -37,11 +33,11 @@ from txdav.caldav.datastore.scheduling.freebusy import generateFreeBusyInfo from txdav.caldav.datastore.scheduling.itip import iTipProcessing, iTIPRequestStatus from txdav.caldav.datastore.scheduling.utils import getCalendarObjectForRecord +from txdav.caldav.datastore.scheduling.work import ScheduleRefreshWork, \ + ScheduleAutoReplyWork from txdav.caldav.icalendarstore import ComponentUpdateState, ComponentRemoveState -from txdav.common.datastore.sql_tables import schema import collections -import datetime import hashlib import uuid @@ -278,7 +274,7 @@ @param attendees: the list of attendees to refresh @type attendees: C{list} """ - return self.ScheduleRefreshWork.refreshAttendees( + return ScheduleRefreshWork.refreshAttendees( self.txn, self.recipient_calendar_resource, self.recipient_calendar, @@ -432,7 +428,7 @@ if send_reply: # Track outstanding auto-reply processing log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply queued" % (self.recipient.cuaddr, self.uid,)) - self.ScheduleAutoReplyWork.autoReply(self.txn, new_resource, partstat) + ScheduleAutoReplyWork.autoReply(self.txn, new_resource, partstat) # Build the schedule-changes XML element changes = customxml.ScheduleChanges( @@ -472,7 +468,7 @@ if send_reply: # Track outstanding auto-reply processing log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply queued" % (self.recipient.cuaddr, self.uid,)) - self.ScheduleAutoReplyWork.autoReply(self.txn, new_resource, partstat) + ScheduleAutoReplyWork.autoReply(self.txn, new_resource, partstat) # Build the schedule-changes XML element update_details = [] @@ -885,256 +881,3 @@ yield 
self.deleteCalendarResource(recipient_resource) returnValue(True) - - - class ScheduleRefreshWork(WorkItem, fromTable(schema.SCHEDULE_REFRESH_WORK)): - """ - The associated work item table is SCHEDULE_REFRESH_WORK. - - This work item is used to trigger an iTIP refresh of attendees. This happens when one attendee - replies to an invite, and we want to have the others attendees see that change - eventually. We - are going to use the SCHEDULE_REFRESH_ATTENDEES table to track the list of attendees needing - a refresh for each calendar object resource (identified by the organizer's resource-id for that - calendar object). We want to do refreshes in batches with a configurable time between each batch. - - The tricky part here is handling race conditions, where two or more attendee replies happen at the - same time, or happen whilst a previously queued refresh has started batch processing. Here is how - we will handle that: - - 1) Each time a refresh is needed we will add all attendees to the SCHEDULE_REFRESH_ATTENDEES table. - This will happen even if those attendees are currently listed in that table. We ensure the table is - not unique wrt to attendees - this means that two simultaneous refreshes can happily insert the - same set of attendees without running into unique constraints and thus without having to use - savepoints to cope with that. This will mean duplicate attendees listed in the table, but we take - care of that when executing the work item, as per the next point. - - 2) When a work item is triggered we get the set of unique attendees needing a refresh from the - SCHEDULE_REFRESH_ATTENDEES table. We split out a batch of those to actually refresh - with the - others being left in the table as-is. We then remove the batch of attendees from the - SCHEDULE_REFRESH_ATTENDEES table - this will remove duplicates. The refresh is then done and a - new work item scheduled to do the next batch. 
We only stop rescheduling work items when nothing - is found during the initial query. Note that if any refresh is done we will always reschedule work - even if we know none remain. That should handle the case where a new refresh occurs whilst - processing the last batch from a previous refresh. - - Hopefully the above methodology will deal with concurrency issues, preventing any excessive locking - or failed inserts etc. - """ - - group = property(lambda self: "ScheduleRefreshWork:%s" % (self.resourceID,)) - - - @classmethod - @inlineCallbacks - def refreshAttendees(cls, txn, organizer_resource, organizer_calendar, attendees): - # See if there is already a pending refresh and merge current attendees into that list, - # otherwise just mark all attendees as pending - sra = schema.SCHEDULE_REFRESH_ATTENDEES - pendingAttendees = (yield Select( - [sra.ATTENDEE, ], - From=sra, - Where=sra.RESOURCE_ID == organizer_resource.id(), - ).on(txn)) - pendingAttendees = [row[0] for row in pendingAttendees] - attendeesToRefresh = set(attendees) - set(pendingAttendees) - for attendee in attendeesToRefresh: - yield Insert( - { - sra.RESOURCE_ID: organizer_resource.id(), - sra.ATTENDEE: attendee, - } - ).on(txn) - - # Always queue up new work - coalescing happens when work is executed - notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds) - yield txn.enqueue( - cls, - homeResourceID=organizer_resource._home.id(), - resourceID=organizer_resource.id(), - notBefore=notBefore - ) - - - @inlineCallbacks - def doWork(self): - - # Look for other work items for this resource and ignore this one if other later ones exist - srw = schema.SCHEDULE_REFRESH_WORK - rows = (yield Select( - (srw.WORK_ID,), - From=srw, - Where=(srw.HOME_RESOURCE_ID == self.homeResourceID).And( - srw.RESOURCE_ID == self.resourceID), - ).on(self.transaction)) - if rows: - log.debug("Schedule refresh for resource-id: {rid} - ignored", 
rid=self.resourceID) - returnValue(None) - - log.debug("Schedule refresh for resource-id: {rid}", rid=self.resourceID) - - # Get the unique list of pending attendees and split into batch to process - # TODO: do a DELETE ... and rownum <= N returning attendee - but have to fix Oracle to - # handle multi-row returning. Would be better than entire select + delete of each one, - # but need to make sure to use UNIQUE as there may be duplicate attendees. - sra = schema.SCHEDULE_REFRESH_ATTENDEES - pendingAttendees = (yield Select( - [sra.ATTENDEE, ], - From=sra, - Where=sra.RESOURCE_ID == self.resourceID, - ).on(self.transaction)) - pendingAttendees = list(set([row[0] for row in pendingAttendees])) - - # Nothing left so done - if len(pendingAttendees) == 0: - returnValue(None) - - attendeesToProcess = pendingAttendees[:config.Scheduling.Options.AttendeeRefreshBatch] - pendingAttendees = pendingAttendees[config.Scheduling.Options.AttendeeRefreshBatch:] - - yield Delete( - From=sra, - Where=(sra.RESOURCE_ID == self.resourceID).And(sra.ATTENDEE.In(Parameter("attendeesToProcess", len(attendeesToProcess)))) - ).on(self.transaction, attendeesToProcess=attendeesToProcess) - - # Reschedule work item if pending attendees remain. - if len(pendingAttendees) != 0: - notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchIntervalSeconds) - yield self.transaction.enqueue( - self.__class__, - homeResourceID=self.homeResourceID, - resourceID=self.resourceID, - notBefore=notBefore - ) - - # Do refresh - yield self._doDelayedRefresh(attendeesToProcess) - - - @inlineCallbacks - def _doDelayedRefresh(self, attendeesToProcess): - """ - Do an attendee refresh that has been delayed until after processing of the request that called it. That - requires that we create a new transaction to work with. - - @param attendeesToProcess: list of attendees to refresh. 
- @type attendeesToProcess: C{list} - """ - - organizer_home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID)) - organizer_resource = (yield organizer_home.objectResourceWithID(self.resourceID)) - if organizer_resource is not None: - try: - # We need to get the UID lock for implicit processing whilst we send the auto-reply - # as the Organizer processing will attempt to write out data to other attendees to - # refresh them. To prevent a race we need a lock. - yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(organizer_resource.uid()).hexdigest(),)) - - yield self._doRefresh(organizer_resource, attendeesToProcess) - except Exception, e: - log.debug("ImplicitProcessing - refresh exception UID: '{uid}', {exc}", uid=organizer_resource.uid(), exc=str(e)) - raise - except: - log.debug("ImplicitProcessing - refresh bare exception UID: '{uid}'", uid=organizer_resource.uid()) - raise - else: - log.debug("ImplicitProcessing - skipping refresh of missing ID: '{rid}'", rid=self.resourceID) - - - @inlineCallbacks - def _doRefresh(self, organizer_resource, only_attendees): - """ - Do a refresh of attendees. - - @param organizer_resource: the resource for the organizer's calendar data - @type organizer_resource: L{DAVResource} - @param only_attendees: list of attendees to refresh (C{None} - refresh all) - @type only_attendees: C{tuple} - """ - log.debug("ImplicitProcessing - refreshing UID: '{uid}', Attendees: {att}", uid=organizer_resource.uid(), att=", ".join(only_attendees) if only_attendees else "all") - from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler - scheduler = ImplicitScheduler() - yield scheduler.refreshAllAttendeesExceptSome( - self.transaction, - organizer_resource, - only_attendees=only_attendees, - ) - - - class ScheduleAutoReplyWork(WorkItem, fromTable(schema.SCHEDULE_AUTO_REPLY_WORK)): - """ - The associated work item table is SCHEDULE_AUTO_REPLY_WORK. 
- - This work item is used to send auto-reply iTIP messages after the calendar data for the - auto-accept user has been written to the user calendar. - """ - - group = property(lambda self: "ScheduleAutoReplyWork:%s" % (self.resourceID,)) - - - @classmethod - @inlineCallbacks - def autoReply(cls, txn, resource, partstat): - # Always queue up new work - coalescing happens when work is executed - notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AutoReplyDelaySeconds) - yield txn.enqueue( - cls, - homeResourceID=resource._home.id(), - resourceID=resource.id(), - partstat=partstat, - notBefore=notBefore, - ) - - - @inlineCallbacks - def doWork(self): - - log.debug("Schedule auto-reply for resource-id: {rid}", rid=self.resourceID) - - # Delete all other work items with the same pushID - yield Delete(From=self.table, - Where=self.table.RESOURCE_ID == self.resourceID - ).on(self.transaction) - - # Do reply - yield self._sendAttendeeAutoReply() - - - @inlineCallbacks - def _sendAttendeeAutoReply(self): - """ - Auto-process the calendar option to generate automatic accept/decline status and - send a reply if needed. - - We used to have logic to suppress attendee refreshes until after all auto-replies have - been processed. We can't do that with the work queue (easily) so we are going to ignore - that for now. It may not be a big deal given that the refreshes are themselves done in the - queue and we only do the refresh when the last queued work item is processed. 
- - @param resource: calendar resource to process - @type resource: L{CalendarObject} - @param partstat: new partstat value - @type partstat: C{str} - """ - - home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID)) - resource = (yield home.objectResourceWithID(self.resourceID)) - if resource is not None: - try: - # We need to get the UID lock for implicit processing whilst we send the auto-reply - # as the Organizer processing will attempt to write out data to other attendees to - # refresh them. To prevent a race we need a lock. - yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(resource.uid()).hexdigest(),)) - - # Send out a reply - log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply: %s" % (home.uid(), resource.uid(), self.partstat)) - from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler - scheduler = ImplicitScheduler() - yield scheduler.sendAttendeeReply(self.transaction, resource) - except Exception, e: - log.debug("ImplicitProcessing - auto-reply exception UID: '%s', %s" % (resource.uid(), str(e))) - raise - except: - log.debug("ImplicitProcessing - auto-reply bare exception UID: '%s'" % (resource.uid(),)) - raise - else: - log.debug("ImplicitProcessing - skipping auto-reply of missing ID: '{rid}'", rid=self.resourceID) Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_implicit.py =================================================================== --- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_implicit.py 2013-10-13 14:57:55 UTC (rev 11810) +++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_implicit.py 2013-10-13 14:59:35 UTC (rev 11811) @@ -29,7 +29,8 @@ from twistedcaldav.config import config from twistedcaldav.ical import Component -from 
txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler +from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler, \ + ScheduleReplyWork from txdav.caldav.datastore.scheduling.scheduler import ScheduleResponseQueue from txdav.caldav.datastore.test.util import buildCalendarStore, \ buildDirectoryRecord @@ -1326,6 +1327,11 @@ yield self._setCalendarData(data2, "user02") + while True: + work = (yield ScheduleReplyWork.hasWork(self.transactionUnderTest())) + if not work: + break + list1 = (yield self._listCalendarObjects("user01", "inbox")) self.assertEqual(len(list1), 1) @@ -1396,6 +1402,11 @@ yield self._setCalendarData(data2, "user02") + while True: + work = (yield ScheduleReplyWork.hasWork(self.transactionUnderTest())) + if not work: + break + list1 = (yield self._listCalendarObjects("user01", "inbox")) self.assertEqual(len(list1), 1) @@ -1480,6 +1491,11 @@ yield self._setCalendarData(data2, "user02") + while True: + work = (yield ScheduleReplyWork.hasWork(self.transactionUnderTest())) + if not work: + break + list1 = (yield self._listCalendarObjects("user01", "inbox")) self.assertEqual(len(list1), 1) Added: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py =================================================================== --- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py (rev 0) +++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py 2013-10-13 14:59:35 UTC (rev 11811) @@ -0,0 +1,495 @@ +# +# Copyright (c) 2013 Apple Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +from twext.enterprise.dal.record import fromTable +from twext.enterprise.dal.syntax import Select, Insert, Delete, Parameter +from twext.enterprise.locking import NamedLock +from twext.enterprise.queue import WorkItem +from twext.python.log import Logger + +from twisted.internet.defer import inlineCallbacks, returnValue + +from twistedcaldav import caldavxml +from twistedcaldav.config import config +from twistedcaldav.ical import Component + +from txdav.caldav.datastore.scheduling.itip import iTipGenerator, iTIPRequestStatus +from txdav.caldav.icalendarstore import ComponentUpdateState +from txdav.common.datastore.sql_tables import schema + +import datetime +import hashlib + +__all__ = [ + "ScheduleReplyWork", + "ScheduleReplyCancelWork", + "ScheduleRefreshWork", + "ScheduleAutoReplyWork", +] + +log = Logger() + +class ScheduleWorkMixin(object): + """ + Base class for common schedule work item behavior. + """ + + # Schedule work is grouped based on calendar object UID + group = property(lambda self: "ScheduleWork:%s" % (self.icalendarUid,)) + + + +class ScheduleReplyWorkMixin(ScheduleWorkMixin): + + + def makeScheduler(self, home): + """ + Convenience method which we can override in unit tests to make testing easier. + """ + from txdav.caldav.datastore.scheduling.caldav.scheduler import CalDAVScheduler + return CalDAVScheduler(self.transaction, home.uid()) + + + @inlineCallbacks + def sendToOrganizer(self, home, action, itipmsg, originator, recipient): + + # Send scheduling message + + # This is a local CALDAV scheduling operation. 
+ scheduler = self.makeScheduler(home) + + # Do the PUT processing + log.info("Implicit %s - attendee: '%s' to organizer: '%s', UID: '%s'" % (action, originator, recipient, itipmsg.resourceUID(),)) + response = (yield scheduler.doSchedulingViaPUT(originator, (recipient,), itipmsg, internal_request=True)) + returnValue(response) + + + +class ScheduleReplyWork(WorkItem, fromTable(schema.SCHEDULE_REPLY_WORK), ScheduleReplyWorkMixin): + """ + The associated work item table is SCHEDULE_REPLY_WORK. + + This work item is used to send an iTIP reply message when an attendee changes + their partstat in the calendar object resource. + """ + + @classmethod + @inlineCallbacks + def reply(cls, txn, home, resource, changedRids, attendee): + # Always queue up new work - coalescing happens when work is executed + notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.QueuedReplyDelaySeconds) + proposal = (yield txn.enqueue( + cls, + notBefore=notBefore, + icalendarUid=resource.uid(), + homeResourceID=home.id(), + resourceID=resource.id(), + changedRids=",".join(changedRids) if changedRids else None, + )) + yield proposal.whenProposed() + log.debug("ScheduleReplyWork - enqueued for ID: {id}, UID: {uid}, attendee: {att}", id=proposal.workItem.workID, uid=resource.uid(), att=attendee) + + + @classmethod + @inlineCallbacks + def hasWork(cls, txn): + srw = schema.SCHEDULE_REPLY_WORK + rows = (yield Select( + (srw.WORK_ID,), + From=srw, + ).on(txn)) + returnValue(len(rows) > 0) + + + @inlineCallbacks + def doWork(self): + + try: + home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID)) + resource = (yield home.objectResourceWithID(self.resourceID)) + attendeePrincipal = home.directoryService().recordWithUID(home.uid()) + attendee = attendeePrincipal.canonicalCalendarUserAddress() + calendar = (yield resource.componentForUser()) + organizer = calendar.validOrganizerForScheduling() + + changedRids = self.changedRids.split(",") 
if self.changedRids else None + + log.debug("ScheduleReplyWork - running for ID: {id}, UID: {uid}, attendee: {att}", id=self.workID, uid=calendar.resourceUID(), att=attendee) + + # We need to get the UID lock for implicit processing. + yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(calendar.resourceUID()).hexdigest(),)) + + itipmsg = iTipGenerator.generateAttendeeReply(calendar, attendee, changedRids=changedRids) + + # Send scheduling message and process response + response = (yield self.sendToOrganizer(home, "REPLY", itipmsg, attendee, organizer)) + yield self.handleSchedulingResponse(response, calendar, resource, False) + + except Exception, e: + log.debug("ScheduleReplyWork - exception ID: {id}, UID: '{uid}', {err}", id=self.workID, uid=calendar.resourceUID(), err=str(e)) + raise + except: + log.debug("ScheduleReplyWork - bare exception ID: {id}, UID: '{uid}'", id=self.workID, uid=calendar.resourceUID()) + raise + + + @inlineCallbacks + def handleSchedulingResponse(self, response, calendar, resource, is_organizer): + """ + Update a user's calendar object resource based on the results of a queued scheduling + message response. Note we only need to update in the case where there is an error response + as we will already have updated the calendar object resource to make it look like scheduling + worked prior to the work queue item being enqueued. 
+ + @param response: the scheduling response object + @type response: L{caldavxml.ScheduleResponse} + @param calendar: original calendar component + @type calendar: L{Component} + @param resource: calendar object resource to update + @type resource: L{CalendarObject} + @param is_organizer: whether or not iTIP message was sent by the organizer + @type is_organizer: C{bool} + """ + + # Map each recipient in the response to a status code + changed = False + for item in response.responses: + assert isinstance(item, caldavxml.Response), "Wrong element in response" + recipient = str(item.children[0].children[0]) + status = str(item.children[1]) + statusCode = status.split(";")[0] + + # Now apply to each ATTENDEE/ORGANIZER in the original data only if not 1.2 + if statusCode != iTIPRequestStatus.MESSAGE_DELIVERED_CODE: + calendar.setParameterToValueForPropertyWithValue( + "SCHEDULE-STATUS", + statusCode, + "ATTENDEE" if is_organizer else "ORGANIZER", + recipient, + ) + changed = True + + if changed: + yield resource._setComponentInternal(calendar, internal_state=ComponentUpdateState.ATTENDEE_ITIP_UPDATE) + + + +class ScheduleReplyCancelWork(WorkItem, fromTable(schema.SCHEDULE_REPLY_CANCEL_WORK), ScheduleReplyWorkMixin): + """ + The associated work item table is SCHEDULE_REPLY_CANCEL_WORK. + + This work item is used to send an iTIP reply message when an attendee deletes + their copy of the calendar object resource. For this to work we need to store a copy + of the original resource data. 
+ """ + + # Schedule work is grouped based on calendar object UID + group = property(lambda self: "ScheduleWork:%s" % (self.icalendarUid,)) + + + @classmethod + @inlineCallbacks + def replyCancel(cls, txn, home, calendar, attendee): + # Always queue up new work - coalescing happens when work is executed + notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.QueuedReplyDelaySeconds) + proposal = (yield txn.enqueue( + cls, + notBefore=notBefore, + icalendarUid=calendar.resourceUID(), + homeResourceID=home.id(), + icalendarText=calendar.getTextWithTimezones(includeTimezones=not config.EnableTimezonesByReference), + )) + yield proposal.whenProposed() + log.debug("ScheduleReplyCancelWork - enqueued for ID: {id}, UID: {uid}, attendee: {att}", id=proposal.workItem.workID, uid=calendar.resourceUID(), att=attendee) + + + @inlineCallbacks + def doWork(self): + + try: + home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID)) + attendeePrincipal = home.directoryService().recordWithUID(home.uid()) + attendee = attendeePrincipal.canonicalCalendarUserAddress() + calendar = Component.fromString(self.icalendarText) + organizer = calendar.validOrganizerForScheduling() + + log.debug("ScheduleReplyCancelWork - running for ID: {id}, UID: {uid}, attendee: {att}", id=self.workID, uid=calendar.resourceUID(), att=attendee) + + # We need to get the UID lock for implicit processing. 
+            yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(calendar.resourceUID()).hexdigest(),))
+
+            itipmsg = iTipGenerator.generateAttendeeReply(calendar, attendee, force_decline=True)
+
+            # Send scheduling message - no need to process response as original resource is gone
+            yield self.sendToOrganizer(home, "CANCEL", itipmsg, attendee, organizer)
+
+        except Exception, e:
+            log.debug("ScheduleReplyCancelWork - exception ID: {id}, UID: '{uid}', {err}", id=self.workID, uid=calendar.resourceUID(), err=str(e))
+            raise
+        except:
+            log.debug("ScheduleReplyCancelWork - bare exception ID: {id}, UID: '{uid}'", id=self.workID, uid=calendar.resourceUID())
+            raise
+
+
+
+class ScheduleRefreshWork(WorkItem, fromTable(schema.SCHEDULE_REFRESH_WORK), ScheduleWorkMixin):
+    """
+    The associated work item table is SCHEDULE_REFRESH_WORK.
+
+    This work item is used to trigger an iTIP refresh of attendees. This happens when one attendee
+    replies to an invite, and we want to have the other attendees see that change - eventually. We
+    are going to use the SCHEDULE_REFRESH_ATTENDEES table to track the list of attendees needing
+    a refresh for each calendar object resource (identified by the organizer's resource-id for that
+    calendar object). We want to do refreshes in batches with a configurable time between each batch.
+
+    The tricky part here is handling race conditions, where two or more attendee replies happen at the
+    same time, or happen whilst a previously queued refresh has started batch processing. Here is how
+    we will handle that:
+
+    1) Each time a refresh is needed we will add all attendees to the SCHEDULE_REFRESH_ATTENDEES table.
+    This will happen even if those attendees are currently listed in that table. We ensure the table is
+    not unique with respect to attendees - this means that two simultaneous refreshes can happily insert
+    the same set of attendees without running into unique constraints and thus without having to use
+    savepoints to cope with that. This will mean duplicate attendees listed in the table, but we take
+    care of that when executing the work item, as per the next point.
+
+    2) When a work item is triggered we get the set of unique attendees needing a refresh from the
+    SCHEDULE_REFRESH_ATTENDEES table. We split out a batch of those to actually refresh - with the
+    others being left in the table as-is. We then remove the batch of attendees from the
+    SCHEDULE_REFRESH_ATTENDEES table - this will remove duplicates. The refresh is then done and a
+    new work item scheduled to do the next batch. We only stop rescheduling work items when nothing
+    is found during the initial query. Note that if any refresh is done we will always reschedule work
+    even if we know none remain. That should handle the case where a new refresh occurs whilst
+    processing the last batch from a previous refresh.
+
+    Hopefully the above methodology will deal with concurrency issues, preventing any excessive locking
+    or failed inserts etc.
+
+    NOTE(review): the code below differs from the description above in two places - confirm which is
+    intended. L{refreshAttendees} only inserts attendees that are not already listed for the resource
+    (it does not insert duplicates), and L{doWork} only reschedules when attendees remain after the
+    current batch has been split off (it does not always reschedule).
+    """
+
+    @classmethod
+    @inlineCallbacks
+    def refreshAttendees(cls, txn, organizer_resource, organizer_calendar, attendees):
+        """
+        Record attendees needing a refresh for an organizer's resource and queue a work item to
+        process them.
+
+        @param txn: transaction used for the queries and for enqueuing the work item
+        @param organizer_resource: the organizer's calendar object resource; its C{id()}, C{uid()}
+            and C{_home.id()} values are stored with the work item
+        @param organizer_calendar: not used by this method
+        @param attendees: the attendees to refresh
+        @type attendees: iterable of hashable attendee values
+        """
+        # See if there is already a pending refresh and merge current attendees into that list,
+        # otherwise just mark all attendees as pending
+        sra = schema.SCHEDULE_REFRESH_ATTENDEES
+        pendingAttendees = (yield Select(
+            [sra.ATTENDEE, ],
+            From=sra,
+            Where=sra.RESOURCE_ID == organizer_resource.id(),
+        ).on(txn))
+        pendingAttendees = [row[0] for row in pendingAttendees]
+        attendeesToRefresh = set(attendees) - set(pendingAttendees)
+        for attendee in attendeesToRefresh:
+            yield Insert(
+                {
+                    sra.RESOURCE_ID: organizer_resource.id(),
+                    sra.ATTENDEE: attendee,
+                }
+            ).on(txn)
+
+        # Always queue up new work - coalescing happens when work is executed
+        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
+        yield txn.enqueue(
+            cls,
+            icalendarUid=organizer_resource.uid(),
+            homeResourceID=organizer_resource._home.id(),
+            resourceID=organizer_resource.id(),
+            notBefore=notBefore
+        )
+
+
+    @inlineCallbacks
+    def doWork(self):
+        """
+        Refresh one batch of pending attendees for this work item's resource.
+
+        Does nothing if another SCHEDULE_REFRESH_WORK row exists for the same home and resource, or
+        if no attendees are pending. Otherwise up to C{config.Scheduling.Options.AttendeeRefreshBatch}
+        attendees are removed from SCHEDULE_REFRESH_ATTENDEES, a follow-up work item is queued when
+        more attendees remain, and the removed batch is refreshed.
+        """
+
+        # Look for other work items for this resource and ignore this one if other later ones exist
+        # NOTE(review): presumably this item's own row has already been removed by the queue before
+        # doWork runs, otherwise this query would always match - confirm.
+        srw = schema.SCHEDULE_REFRESH_WORK
+        rows = (yield Select(
+            (srw.WORK_ID,),
+            From=srw,
+            Where=(srw.HOME_RESOURCE_ID == self.homeResourceID).And(
+                   srw.RESOURCE_ID == self.resourceID),
+        ).on(self.transaction))
+        if rows:
+            log.debug("Schedule refresh for resource-id: {rid} - ignored", rid=self.resourceID)
+            returnValue(None)
+
+        log.debug("Schedule refresh for resource-id: {rid}", rid=self.resourceID)
+
+        # Get the unique list of pending attendees and split into batch to process
+        # TODO: do a DELETE ... and rownum <= N returning attendee - but have to fix Oracle to
+        # handle multi-row returning. Would be better than entire select + delete of each one,
+        # but need to make sure to use UNIQUE as there may be duplicate attendees.
+        sra = schema.SCHEDULE_REFRESH_ATTENDEES
+        pendingAttendees = (yield Select(
+            [sra.ATTENDEE, ],
+            From=sra,
+            Where=sra.RESOURCE_ID == self.resourceID,
+        ).on(self.transaction))
+        pendingAttendees = list(set([row[0] for row in pendingAttendees]))
+
+        # Nothing left so done
+        if len(pendingAttendees) == 0:
+            returnValue(None)
+
+        attendeesToProcess = pendingAttendees[:config.Scheduling.Options.AttendeeRefreshBatch]
+        pendingAttendees = pendingAttendees[config.Scheduling.Options.AttendeeRefreshBatch:]
+
+        # Removes every row for the batched attendees, including any duplicate rows
+        yield Delete(
+            From=sra,
+            Where=(sra.RESOURCE_ID == self.resourceID).And(sra.ATTENDEE.In(Parameter("attendeesToProcess", len(attendeesToProcess))))
+        ).on(self.transaction, attendeesToProcess=attendeesToProcess)
+
+        # Reschedule work item if pending attendees remain.
+        if len(pendingAttendees) != 0:
+            notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchIntervalSeconds)
+            # The UID has to be carried over to the follow-up item: the ICALENDAR_UID column of
+            # SCHEDULE_REFRESH_WORK is declared "not null" in the postgres schema.
+            yield self.transaction.enqueue(
+                self.__class__,
+                icalendarUid=self.icalendarUid,
+                homeResourceID=self.homeResourceID,
+                resourceID=self.resourceID,
+                notBefore=notBefore
+            )
+
+        # Do refresh
+        yield self._doDelayedRefresh(attendeesToProcess)
+
+
+    @inlineCallbacks
+    def _doDelayedRefresh(self, attendeesToProcess):
+        """
+        Do an attendee refresh that has been delayed until after processing of the request that
+        called it. The organizer's resource is looked up again using this work item's transaction;
+        if it no longer exists the refresh is skipped.
+
+        Any exception raised during the refresh is logged and re-raised.
+
+        @param attendeesToProcess: list of attendees to refresh.
+        @type attendeesToProcess: C{list}
+        """
+
+        organizer_home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID))
+        organizer_resource = (yield organizer_home.objectResourceWithID(self.resourceID))
+        if organizer_resource is not None:
+            try:
+                # We need to get the UID lock for implicit processing whilst we do the refresh
+                # as the Organizer processing will attempt to write out data to other attendees to
+                # refresh them. To prevent a race we need a lock.
+                yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(organizer_resource.uid()).hexdigest(),))
+
+                yield self._doRefresh(organizer_resource, attendeesToProcess)
+            except Exception, e:
+                log.debug("ImplicitProcessing - refresh exception UID: '{uid}', {exc}", uid=organizer_resource.uid(), exc=str(e))
+                raise
+            except:
+                log.debug("ImplicitProcessing - refresh bare exception UID: '{uid}'", uid=organizer_resource.uid())
+                raise
+        else:
+            log.debug("ImplicitProcessing - skipping refresh of missing ID: '{rid}'", rid=self.resourceID)
+
+
+    @inlineCallbacks
+    def _doRefresh(self, organizer_resource, only_attendees):
+        """
+        Do a refresh of attendees.
+
+        @param organizer_resource: the resource for the organizer's calendar data, as returned by
+            C{objectResourceWithID}
+        @param only_attendees: list of attendees to refresh (C{None} or empty - logged as "all")
+        @type only_attendees: C{list}
+        """
+        log.debug("ImplicitProcessing - refreshing UID: '{uid}', Attendees: {att}", uid=organizer_resource.uid(), att=", ".join(only_attendees) if only_attendees else "all")
+        # Imported here rather than at module level - presumably to avoid a circular import; confirm
+        from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler
+        scheduler = ImplicitScheduler()
+        yield scheduler.refreshAllAttendeesExceptSome(
+            self.transaction,
+            organizer_resource,
+            only_attendees=only_attendees,
+        )
+
+
+
+class ScheduleAutoReplyWork(WorkItem, fromTable(schema.SCHEDULE_AUTO_REPLY_WORK), ScheduleWorkMixin):
+    """
+    The associated work item table is SCHEDULE_AUTO_REPLY_WORK.
+
+    This work item is used to send auto-reply iTIP messages after the calendar data for the
+    auto-accept user has been written to the user calendar.
+    """
+
+    @classmethod
+    @inlineCallbacks
+    def autoReply(cls, txn, resource, partstat):
+        """
+        Queue an auto-reply work item for a calendar object resource, to run no earlier than
+        C{config.Scheduling.Options.AutoReplyDelaySeconds} seconds from now.
+
+        @param txn: transaction used to enqueue the work item
+        @param resource: the calendar object resource; its C{id()}, C{uid()} and C{_home.id()}
+            values are stored with the work item
+        @param partstat: partstat value stored with the work item
+        """
+        # Always queue up new work - coalescing happens when work is executed
+        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AutoReplyDelaySeconds)
+        yield txn.enqueue(
+            cls,
+            icalendarUid=resource.uid(),
+            homeResourceID=resource._home.id(),
+            resourceID=resource.id(),
+            partstat=partstat,
+            notBefore=notBefore,
+        )
+
+
+    @inlineCallbacks
+    def doWork(self):
+        """
+        Remove every queued auto-reply work item for this item's resource, then send the reply.
+        """
+
+        log.debug("Schedule auto-reply for resource-id: {rid}", rid=self.resourceID)
+
+        # Delete all other work items for the same resource ID
+        yield Delete(From=self.table,
+                     Where=self.table.RESOURCE_ID == self.resourceID
+        ).on(self.transaction)
+
+        # Do reply
+        yield self._sendAttendeeAutoReply()
+
+
+    @inlineCallbacks
+    def _sendAttendeeAutoReply(self):
+        """
+        Send the attendee's reply for the resource identified by this work item. The resource is
+        looked up from C{self.homeResourceID} and C{self.resourceID}; if it no longer exists the
+        reply is skipped. C{self.partstat} is only used for logging here.
+
+        Any exception raised whilst sending is logged and re-raised.
+
+        We used to have logic to suppress attendee refreshes until after all auto-replies have
+        been processed. We can't do that with the work queue (easily) so we are going to ignore
+        that for now. It may not be a big deal given that the refreshes are themselves done in the
+        queue and we only do the refresh when the last queued work item is processed.
+        """
+
+        home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID))
+        resource = (yield home.objectResourceWithID(self.resourceID))
+        if resource is not None:
+            try:
+                # We need to get the UID lock for implicit processing whilst we send the auto-reply
+                # as the Organizer processing will attempt to write out data to other attendees to
+                # refresh them. To prevent a race we need a lock.
+                yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(resource.uid()).hexdigest(),))
+
+                # Send out a reply
+                # Values are passed as log fields, not %-interpolated into the format string, so
+                # braces in a UID or exception text cannot be mistaken for format fields.
+                log.debug("ImplicitProcessing - recipient '{recip}' processing UID: '{uid}' - auto-reply: {partstat}", recip=home.uid(), uid=resource.uid(), partstat=self.partstat)
+                from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler
+                scheduler = ImplicitScheduler()
+                yield scheduler.sendAttendeeReply(self.transaction, resource)
+            except Exception, e:
+                log.debug("ImplicitProcessing - auto-reply exception UID: '{uid}', {exc}", uid=resource.uid(), exc=str(e))
+                raise
+            except:
+                log.debug("ImplicitProcessing - auto-reply bare exception UID: '{uid}'", uid=resource.uid())
+                raise
+        else:
+            log.debug("ImplicitProcessing - skipping auto-reply of missing ID: '{rid}'", rid=self.resourceID)

Property changes on: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py
___________________________________________________________________
Added: svn:executable
   + *

Modified:
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current-oracle-dialect.sql =================================================================== --- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current-oracle-dialect.sql 2013-10-13 14:57:55 UTC (rev 11810) +++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current-oracle-dialect.sql 2013-10-13 14:59:35 UTC (rev 11811) @@ -363,6 +363,7 @@ create table SCHEDULE_REFRESH_WORK ( "WORK_ID" integer primary key not null, "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC', + "ICALENDAR_UID" nvarchar2(255), "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade, "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade ); @@ -375,11 +376,29 @@ create table SCHEDULE_AUTO_REPLY_WORK ( "WORK_ID" integer primary key not null, "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC', + "ICALENDAR_UID" nvarchar2(255), "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade, "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade, "PARTSTAT" nvarchar2(255) ); +create table SCHEDULE_REPLY_WORK ( + "WORK_ID" integer primary key not null, + "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC', + "ICALENDAR_UID" nvarchar2(255), + "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade, + "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade, + "CHANGED_RIDS" nclob +); + +create table SCHEDULE_REPLY_CANCEL_WORK ( + "WORK_ID" integer primary key not null, + "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC', + "ICALENDAR_UID" nvarchar2(255), + "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade, + "ICALENDAR_TEXT" nclob +); + create table CALENDARSERVER ( "NAME" nvarchar2(255) 
primary key, "VALUE" nvarchar2(255) @@ -530,3 +549,15 @@ RESOURCE_ID ); +create index SCHEDULE_REPLY_WORK_H_745af8cf on SCHEDULE_REPLY_WORK ( + HOME_RESOURCE_ID +); + +create index SCHEDULE_REPLY_WORK_R_11bd3fbb on SCHEDULE_REPLY_WORK ( + RESOURCE_ID +); + +create index SCHEDULE_REPLY_CANCEL_dab513ef on SCHEDULE_REPLY_CANCEL_WORK ( + HOME_RESOURCE_ID +); + Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql =================================================================== --- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql 2013-10-13 14:57:55 UTC (rev 11810) +++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql 2013-10-13 14:59:35 UTC (rev 11811) @@ -693,6 +693,7 @@ create table SCHEDULE_REFRESH_WORK ( WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP), + ICALENDAR_UID varchar(255) not null, HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade, RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade ); @@ -717,6 +718,7 @@ create table SCHEDULE_AUTO_REPLY_WORK ( WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP), + ICALENDAR_UID varchar(255) not null, HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade, RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade, PARTSTAT varchar(255) not null @@ -727,6 +729,39 @@ create index SCHEDULE_AUTO_REPLY_WORK_RESOURCE_ID on SCHEDULE_AUTO_REPLY_WORK(RESOURCE_ID); +------------------------- +-- Schedule Reply Work -- +------------------------- + +create table SCHEDULE_REPLY_WORK ( + WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit 
index + NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP), + ICALENDAR_UID varchar(255) not null, + HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade, + RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade, + CHANGED_RIDS text +); + +create index SCHEDULE_REPLY_WORK_HOME_RESOURCE_ID on + SCHEDULE_REPLY_WORK(HOME_RESOURCE_ID); +create index SCHEDULE_REPLY_WORK_RESOURCE_ID on + SCHEDULE_REPLY_WORK(RESOURCE_ID); + +-------------------------------- +-- Schedule Reply Cancel Work -- +-------------------------------- + +create table SCHEDULE_REPLY_CANCEL_WORK ( + WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index + NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP), + ICALENDAR_UID varchar(255) not null, + HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade, + ICALENDAR_TEXT text not null +); + +create index SCHEDULE_REPLY_CANCEL_WORK_HOME_RESOURCE_ID on + SCHEDULE_REPLY_CANCEL_WORK(HOME_RESOURCE_ID); + -------------------- -- Schema Version -- -------------------- Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_25_to_26.sql =================================================================== --- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_25_to_26.sql 2013-10-13 14:57:55 UTC (rev 11810) +++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_25_to_26.sql 2013-10-13 14:59:35 UTC (rev 11811) @@ -23,6 +23,7 @@ create table SCHEDULE_REFRESH_WORK ( "WORK_ID" integer primary key not null, "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC', + "ICALENDAR_UID" nvarchar2(255), "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade, "RESOURCE_ID" 
integer not null references CALENDAR_OBJECT on delete cascade ); @@ -48,6 +49,7 @@ create table SCHEDULE_AUTO_REPLY_WORK ( "WORK_ID" integer primary key not null, "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC', + "ICALENDAR_UID" nvarchar2(255), "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade, "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade, "PARTSTAT" nvarchar2(255) @@ -61,6 +63,36 @@ RESOURCE_ID ); +create table SCHEDULE_REPLY_WORK ( + "WORK_ID" integer primary key not null, + "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC', + "ICALENDAR_UID" nvarchar2(255), + "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade, + "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade, + "CHANGED_RIDS" nclob +); + +create index SCHEDULE_REPLY_WORK_H_745af8cf on SCHEDULE_REPLY_WORK ( + HOME_RESOURCE_ID +); + +create index SCHEDULE_REPLY_WORK_R_11bd3fbb on SCHEDULE_REPLY_WORK ( + RESOURCE_ID +); + +create table SCHEDULE_REPLY_CANCEL_WORK ( + "WORK_ID" integer primary key not null, + "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC', + "ICALENDAR_UID" nvarchar2(255), + "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade, + "ICALENDAR_TEXT" nclob +); + +create index SCHEDULE_REPLY_CANCEL_dab513ef on SCHEDULE_REPLY_CANCEL_WORK ( + HOME_RESOURCE_ID +); + + -- Now update the version -- No data upgrades update CALENDARSERVER set VALUE = '26' where NAME = 'VERSION'; Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql =================================================================== --- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql 2013-10-13 14:57:55 UTC (rev 11810) +++ 
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql 2013-10-13 14:59:35 UTC (rev 11811) @@ -20,9 +20,14 @@ -- New tables +--------------------------- +-- Schedule Refresh Work -- +--------------------------- + create table SCHEDULE_REFRESH_WORK ( WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP), + ICALENDAR_UID varchar(255) not null, HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade, RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade ); @@ -40,10 +45,14 @@ create index SCHEDULE_REFRESH_ATTENDEES_RESOURCE_ID_ATTENDEE on SCHEDULE_REFRESH_ATTENDEES(RESOURCE_ID, ATTENDEE); +------------------------------ +-- Schedule Auto Reply Work -- +------------------------------ - create table SCHEDULE_AUTO_REPLY_WORK ( +create table SCHEDULE_AUTO_REPLY_WORK ( WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP), + ICALENDAR_UID varchar(255) not null, HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade, RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade, PARTSTAT varchar(255) not null @@ -54,6 +63,40 @@ create index SCHEDULE_AUTO_REPLY_WORK_RESOURCE_ID on SCHEDULE_AUTO_REPLY_WORK(RESOURCE_ID); +------------------------- +-- Schedule Reply Work -- +------------------------- + +create table SCHEDULE_REPLY_WORK ( + WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index + NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP), + ICALENDAR_UID varchar(255) not null, + HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade, + RESOURCE_ID integer not null references CALENDAR_OBJECT on delete cascade, + CHANGED_RIDS text +); + +create 
index SCHEDULE_REPLY_WORK_HOME_RESOURCE_ID on + SCHEDULE_REPLY_WORK(HOME_RESOURCE_ID); +create index SCHEDULE_REPLY_WORK_RESOURCE_ID on + SCHEDULE_REPLY_WORK(RESOURCE_ID); + +-------------------------------- +-- Schedule Reply Cancel Work -- +-------------------------------- + +create table SCHEDULE_REPLY_CANCEL_WORK ( + WORK_ID integer primary key default nextval('WORKITEM_SEQ') not null, -- implicit index + NOT_BEFORE timestamp default timezone('UTC', CURRENT_TIMESTAMP), + ICALENDAR_UID varchar(255) not null, + HOME_RESOURCE_ID integer not null references CALENDAR_HOME on delete cascade, + ICALENDAR_TEXT text not null +); + +create index SCHEDULE_REPLY_CANCEL_WORK_HOME_RESOURCE_ID on + SCHEDULE_REPLY_CANCEL_WORK(HOME_RESOURCE_ID); + + -- Now update the version -- No data upgrades update CALENDARSERVER set VALUE = '26' where NAME = 'VERSION';
participants (1)
-
source_changes@macosforge.org