[CalendarServer-changes] [11811] CalendarServer/branches/users/cdaboo/scheduling-queue-refresh

source_changes at macosforge.org
Sun Oct 13 07:59:35 PDT 2013


Revision: 11811
          http://trac.calendarserver.org//changeset/11811
Author:   cdaboo at apple.com
Date:     2013-10-13 07:59:35 -0700 (Sun, 13 Oct 2013)
Log Message:
-----------
Work items for scheduling replies.

Modified Paths:
--------------
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twistedcaldav/stdconfig.py
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_implicit.py
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current-oracle-dialect.sql
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_25_to_26.sql
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql

Added Paths:
-----------
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twistedcaldav/stdconfig.py	2013-10-13 14:57:55 UTC (rev 11810)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/twistedcaldav/stdconfig.py	2013-10-13 14:59:35 UTC (rev 11811)
@@ -714,6 +714,8 @@
             "AttendeeRefreshBatchIntervalSeconds" : 5, # Time between attendee batch refreshes
             "AttendeeRefreshCountLimit"           : 50, # Number of attendees above which attendee refreshes are suppressed: 0 - no limit
             "AutoReplyDelaySeconds"               : 5, # Time delay for sending an auto reply iTIP message
+            "QueuedRequestDelaySeconds"           : 5, # Number of seconds delay for a queued scheduling request/cancel
+            "QueuedReplyDelaySeconds"             : 1, # Number of seconds delay for a queued scheduling reply
             "UIDLockTimeoutSeconds"               : 60, # Time for implicit UID lock timeout
             "UIDLockExpirySeconds"                : 300, # Expiration time for UID lock,
             "PrincipalHostAliases"                : [], # Host names matched in http(s) CUAs

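The two new options above feed directly into the notBefore timestamps computed when scheduling work is enqueued in the new work.py module (added below). A minimal sketch of that computation, using only names that appear in this changeset (replyNotBefore is an illustrative helper, not part of the commit):

    import datetime

    from twistedcaldav.config import config

    def replyNotBefore():
        # Queued replies become eligible to run QueuedReplyDelaySeconds
        # after they are enqueued; queued requests/cancels use the longer
        # QueuedRequestDelaySeconds in the same way.
        return datetime.datetime.utcnow() + datetime.timedelta(
            seconds=config.Scheduling.Options.QueuedReplyDelaySeconds
        )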
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py	2013-10-13 14:57:55 UTC (rev 11810)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py	2013-10-13 14:59:35 UTC (rev 11811)
@@ -34,6 +34,8 @@
 from txdav.caldav.datastore.scheduling.icaldiff import iCalDiff
 from txdav.caldav.datastore.scheduling.itip import iTipGenerator, iTIPRequestStatus
 from txdav.caldav.datastore.scheduling.utils import getCalendarObjectForRecord
+from txdav.caldav.datastore.scheduling.work import ScheduleReplyWork, \
+    ScheduleReplyCancelWork
 
 import collections
 
@@ -1306,12 +1308,22 @@
 
         self.logItems["itip.reply"] = "reply"
 
-        itipmsg = iTipGenerator.generateAttendeeReply(self.calendar, self.attendee, changedRids=changedRids)
+#        itipmsg = iTipGenerator.generateAttendeeReply(self.calendar, self.attendee, changedRids=changedRids)
+#
+#        # Send scheduling message
+#        return self.sendToOrganizer("REPLY", itipmsg)
 
-        # Send scheduling message
-        return self.sendToOrganizer("REPLY", itipmsg)
+        # Always make it look like scheduling succeeded when queuing
+        self.calendar.setParameterToValueForPropertyWithValue(
+            "SCHEDULE-STATUS",
+            iTIPRequestStatus.MESSAGE_DELIVERED_CODE,
+            "ORGANIZER",
+            self.organizer,
+        )
 
+        return ScheduleReplyWork.reply(self.txn, self.calendar_home, self.resource, changedRids, self.attendee)
 
+
     def scheduleCancelWithOrganizer(self):
 
         # First make sure we are allowed to schedule
@@ -1319,12 +1331,15 @@
 
         self.logItems["itip.reply"] = "cancel"
 
-        itipmsg = iTipGenerator.generateAttendeeReply(self.calendar, self.attendee, force_decline=True)
+#        itipmsg = iTipGenerator.generateAttendeeReply(self.calendar, self.attendee, force_decline=True)
+#
+#        # Send scheduling message
+#        return self.sendToOrganizer("CANCEL", itipmsg)
 
-        # Send scheduling message
-        return self.sendToOrganizer("CANCEL", itipmsg)
+        return ScheduleReplyCancelWork.replyCancel(self.txn, self.calendar_home, self.calendar, self.attendee)
 
 
+    @inlineCallbacks
     def sendToOrganizer(self, action, itipmsg):
 
         # Send scheduling message
@@ -1333,10 +1348,6 @@
         scheduler = self.makeScheduler()
 
         # Do the PUT processing
-        def _gotResponse(response):
-            self.handleSchedulingResponse(response, False)
-
         log.info("Implicit %s - attendee: '%s' to organizer: '%s', UID: '%s'" % (action, self.attendee, self.organizer, self.uid,))
-        d = scheduler.doSchedulingViaPUT(self.originator, (self.organizer,), itipmsg, internal_request=True)
-        d.addCallback(_gotResponse)
-        return d
+        response = (yield scheduler.doSchedulingViaPUT(self.originator, (self.organizer,), itipmsg, internal_request=True))
+        self.handleSchedulingResponse(response, False)

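sendToOrganizer is converted here from explicit Deferred callback chaining to the equivalent inlineCallbacks style used throughout the rest of this changeset. A minimal sketch of the two equivalent forms, with succeed(...) standing in for doSchedulingViaPUT(...) (oldStyle/newStyle are illustrative names):

    from twisted.internet.defer import inlineCallbacks, returnValue, succeed

    # Old style: attach a response handler to the Deferred.
    def oldStyle():
        d = succeed("response")                 # stands in for doSchedulingViaPUT(...)
        d.addCallback(lambda response: ("handled", response))
        return d

    # New style: the same flow, written sequentially.
    @inlineCallbacks
    def newStyle():
        response = (yield succeed("response"))  # stands in for doSchedulingViaPUT(...)
        returnValue(("handled", response))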
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py	2013-10-13 14:57:55 UTC (rev 11810)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py	2013-10-13 14:59:35 UTC (rev 11811)
@@ -18,10 +18,6 @@
 from pycalendar.duration import PyCalendarDuration
 from pycalendar.timezone import PyCalendarTimezone
 
-from twext.enterprise.dal.record import fromTable
-from twext.enterprise.dal.syntax import Select, Delete, Insert, Parameter
-from twext.enterprise.locking import NamedLock
-from twext.enterprise.queue import WorkItem
 from twext.python.log import Logger
 from twext.web2.dav.method.report import NumberOfMatchesWithinLimits
 from twext.web2.http import HTTPError
@@ -37,11 +33,11 @@
 from txdav.caldav.datastore.scheduling.freebusy import generateFreeBusyInfo
 from txdav.caldav.datastore.scheduling.itip import iTipProcessing, iTIPRequestStatus
 from txdav.caldav.datastore.scheduling.utils import getCalendarObjectForRecord
+from txdav.caldav.datastore.scheduling.work import ScheduleRefreshWork, \
+    ScheduleAutoReplyWork
 from txdav.caldav.icalendarstore import ComponentUpdateState, ComponentRemoveState
-from txdav.common.datastore.sql_tables import schema
 
 import collections
-import datetime
 import hashlib
 import uuid
 
@@ -278,7 +274,7 @@
         @param attendees: the list of attendees to refresh
         @type attendees: C{list}
         """
-        return self.ScheduleRefreshWork.refreshAttendees(
+        return ScheduleRefreshWork.refreshAttendees(
             self.txn,
             self.recipient_calendar_resource,
             self.recipient_calendar,
@@ -432,7 +428,7 @@
             if send_reply:
                 # Track outstanding auto-reply processing
                 log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply queued" % (self.recipient.cuaddr, self.uid,))
-                self.ScheduleAutoReplyWork.autoReply(self.txn, new_resource, partstat)
+                ScheduleAutoReplyWork.autoReply(self.txn, new_resource, partstat)
 
             # Build the schedule-changes XML element
             changes = customxml.ScheduleChanges(
@@ -472,7 +468,7 @@
                 if send_reply:
                     # Track outstanding auto-reply processing
                     log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply queued" % (self.recipient.cuaddr, self.uid,))
-                    self.ScheduleAutoReplyWork.autoReply(self.txn, new_resource, partstat)
+                    ScheduleAutoReplyWork.autoReply(self.txn, new_resource, partstat)
 
                 # Build the schedule-changes XML element
                 update_details = []
@@ -885,256 +881,3 @@
                 yield self.deleteCalendarResource(recipient_resource)
 
         returnValue(True)
-
-
-    class ScheduleRefreshWork(WorkItem, fromTable(schema.SCHEDULE_REFRESH_WORK)):
-        """
-        The associated work item table is SCHEDULE_REFRESH_WORK.
-
-        This work item is used to trigger an iTIP refresh of attendees. This happens when one attendee
-        replies to an invite, and we want to have the others attendees see that change - eventually. We
-        are going to use the SCHEDULE_REFRESH_ATTENDEES table to track the list of attendees needing
-        a refresh for each calendar object resource (identified by the organizer's resource-id for that
-        calendar object). We want to do refreshes in batches with a configurable time between each batch.
-
-        The tricky part here is handling race conditions, where two or more attendee replies happen at the
-        same time, or happen whilst a previously queued refresh has started batch processing. Here is how
-        we will handle that:
-
-        1) Each time a refresh is needed we will add all attendees to the SCHEDULE_REFRESH_ATTENDEES table.
-        This will happen even if those attendees are currently listed in that table. We ensure the table is
-        not unique with respect to attendees - this means that two simultaneous refreshes can happily insert the
-        same set of attendees without running into unique constraints and thus without having to use
-        savepoints to cope with that. This will mean duplicate attendees listed in the table, but we take
-        care of that when executing the work item, as per the next point.
-
-        2) When a work item is triggered we get the set of unique attendees needing a refresh from the
-        SCHEDULE_REFRESH_ATTENDEES table. We split out a batch of those to actually refresh - with the
-        others being left in the table as-is. We then remove the batch of attendees from the
-        SCHEDULE_REFRESH_ATTENDEES table - this will remove duplicates. The refresh is then done and a
-        new work item scheduled to do the next batch. We only stop rescheduling work items when nothing
-        is found during the initial query. Note that if any refresh is done we will always reschedule work
-        even if we know none remain. That should handle the case where a new refresh occurs whilst
-        processing the last batch from a previous refresh.
-
-        Hopefully the above methodology will deal with concurrency issues, preventing any excessive locking
-        or failed inserts etc.
-        """
-
-        group = property(lambda self: "ScheduleRefreshWork:%s" % (self.resourceID,))
-
-
-        @classmethod
-        @inlineCallbacks
-        def refreshAttendees(cls, txn, organizer_resource, organizer_calendar, attendees):
-            # See if there is already a pending refresh and merge current attendees into that list,
-            # otherwise just mark all attendees as pending
-            sra = schema.SCHEDULE_REFRESH_ATTENDEES
-            pendingAttendees = (yield Select(
-                [sra.ATTENDEE, ],
-                From=sra,
-                Where=sra.RESOURCE_ID == organizer_resource.id(),
-            ).on(txn))
-            pendingAttendees = [row[0] for row in pendingAttendees]
-            attendeesToRefresh = set(attendees) - set(pendingAttendees)
-            for attendee in attendeesToRefresh:
-                yield Insert(
-                    {
-                        sra.RESOURCE_ID: organizer_resource.id(),
-                        sra.ATTENDEE: attendee,
-                    }
-                ).on(txn)
-
-            # Always queue up new work - coalescing happens when work is executed
-            notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
-            yield txn.enqueue(
-                cls,
-                homeResourceID=organizer_resource._home.id(),
-                resourceID=organizer_resource.id(),
-                notBefore=notBefore
-            )
-
-
-        @inlineCallbacks
-        def doWork(self):
-
-            # Look for other work items for this resource and ignore this one if other later ones exist
-            srw = schema.SCHEDULE_REFRESH_WORK
-            rows = (yield Select(
-                (srw.WORK_ID,),
-                From=srw,
-                Where=(srw.HOME_RESOURCE_ID == self.homeResourceID).And(
-                       srw.RESOURCE_ID == self.resourceID),
-            ).on(self.transaction))
-            if rows:
-                log.debug("Schedule refresh for resource-id: {rid} - ignored", rid=self.resourceID)
-                returnValue(None)
-
-            log.debug("Schedule refresh for resource-id: {rid}", rid=self.resourceID)
-
-            # Get the unique list of pending attendees and split into batch to process
-            # TODO: do a DELETE ... and rownum <= N returning attendee - but have to fix Oracle to
-            # handle multi-row returning. Would be better than entire select + delete of each one,
-            # but need to make sure to use UNIQUE as there may be duplicate attendees.
-            sra = schema.SCHEDULE_REFRESH_ATTENDEES
-            pendingAttendees = (yield Select(
-                [sra.ATTENDEE, ],
-                From=sra,
-                Where=sra.RESOURCE_ID == self.resourceID,
-            ).on(self.transaction))
-            pendingAttendees = list(set([row[0] for row in pendingAttendees]))
-
-            # Nothing left so done
-            if len(pendingAttendees) == 0:
-                returnValue(None)
-
-            attendeesToProcess = pendingAttendees[:config.Scheduling.Options.AttendeeRefreshBatch]
-            pendingAttendees = pendingAttendees[config.Scheduling.Options.AttendeeRefreshBatch:]
-
-            yield Delete(
-                From=sra,
-                Where=(sra.RESOURCE_ID == self.resourceID).And(sra.ATTENDEE.In(Parameter("attendeesToProcess", len(attendeesToProcess))))
-            ).on(self.transaction, attendeesToProcess=attendeesToProcess)
-
-            # Reschedule work item if pending attendees remain.
-            if len(pendingAttendees) != 0:
-                notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchIntervalSeconds)
-                yield self.transaction.enqueue(
-                    self.__class__,
-                    homeResourceID=self.homeResourceID,
-                    resourceID=self.resourceID,
-                    notBefore=notBefore
-                )
-
-            # Do refresh
-            yield self._doDelayedRefresh(attendeesToProcess)
-
-
-        @inlineCallbacks
-        def _doDelayedRefresh(self, attendeesToProcess):
-            """
-            Do an attendee refresh that has been delayed until after processing of the request that called it. That
-            requires that we create a new transaction to work with.
-
-            @param attendeesToProcess: list of attendees to refresh.
-            @type attendeesToProcess: C{list}
-            """
-
-            organizer_home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID))
-            organizer_resource = (yield organizer_home.objectResourceWithID(self.resourceID))
-            if organizer_resource is not None:
-                try:
-                    # We need to get the UID lock for implicit processing whilst we send the auto-reply
-                    # as the Organizer processing will attempt to write out data to other attendees to
-                    # refresh them. To prevent a race we need a lock.
-                    yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(organizer_resource.uid()).hexdigest(),))
-
-                    yield self._doRefresh(organizer_resource, attendeesToProcess)
-                except Exception, e:
-                    log.debug("ImplicitProcessing - refresh exception UID: '{uid}', {exc}", uid=organizer_resource.uid(), exc=str(e))
-                    raise
-                except:
-                    log.debug("ImplicitProcessing - refresh bare exception UID: '{uid}'", uid=organizer_resource.uid())
-                    raise
-            else:
-                log.debug("ImplicitProcessing - skipping refresh of missing ID: '{rid}'", rid=self.resourceID)
-
-
-        @inlineCallbacks
-        def _doRefresh(self, organizer_resource, only_attendees):
-            """
-            Do a refresh of attendees.
-
-            @param organizer_resource: the resource for the organizer's calendar data
-            @type organizer_resource: L{DAVResource}
-            @param only_attendees: list of attendees to refresh (C{None} - refresh all)
-            @type only_attendees: C{tuple}
-            """
-            log.debug("ImplicitProcessing - refreshing UID: '{uid}', Attendees: {att}", uid=organizer_resource.uid(), att=", ".join(only_attendees) if only_attendees else "all")
-            from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler
-            scheduler = ImplicitScheduler()
-            yield scheduler.refreshAllAttendeesExceptSome(
-                self.transaction,
-                organizer_resource,
-                only_attendees=only_attendees,
-            )
-
-
-    class ScheduleAutoReplyWork(WorkItem, fromTable(schema.SCHEDULE_AUTO_REPLY_WORK)):
-        """
-        The associated work item table is SCHEDULE_AUTO_REPLY_WORK.
-
-        This work item is used to send auto-reply iTIP messages after the calendar data for the
-        auto-accept user has been written to the user calendar.
-        """
-
-        group = property(lambda self: "ScheduleAutoReplyWork:%s" % (self.resourceID,))
-
-
-        @classmethod
-        @inlineCallbacks
-        def autoReply(cls, txn, resource, partstat):
-            # Always queue up new work - coalescing happens when work is executed
-            notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AutoReplyDelaySeconds)
-            yield txn.enqueue(
-                cls,
-                homeResourceID=resource._home.id(),
-                resourceID=resource.id(),
-                partstat=partstat,
-                notBefore=notBefore,
-            )
-
-
-        @inlineCallbacks
-        def doWork(self):
-
-            log.debug("Schedule auto-reply for resource-id: {rid}", rid=self.resourceID)
-
-            # Delete all other work items for the same resource-id
-            yield Delete(From=self.table,
-                Where=self.table.RESOURCE_ID == self.resourceID
-            ).on(self.transaction)
-
-            # Do reply
-            yield self._sendAttendeeAutoReply()
-
-
-        @inlineCallbacks
-        def _sendAttendeeAutoReply(self):
-            """
-            Auto-process the calendar option to generate automatic accept/decline status and
-            send a reply if needed.
-
-            We used to have logic to suppress attendee refreshes until after all auto-replies have
-            been processed. We can't do that with the work queue (easily) so we are going to ignore
-            that for now. It may not be a big deal given that the refreshes are themselves done in the
-            queue and we only do the refresh when the last queued work item is processed.
-
-            @param resource: calendar resource to process
-            @type resource: L{CalendarObject}
-            @param partstat: new partstat value
-            @type partstat: C{str}
-            """
-
-            home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID))
-            resource = (yield home.objectResourceWithID(self.resourceID))
-            if resource is not None:
-                try:
-                    # We need to get the UID lock for implicit processing whilst we send the auto-reply
-                    # as the Organizer processing will attempt to write out data to other attendees to
-                    # refresh them. To prevent a race we need a lock.
-                    yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(resource.uid()).hexdigest(),))
-
-                    # Send out a reply
-                    log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply: %s" % (home.uid(), resource.uid(), self.partstat))
-                    from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler
-                    scheduler = ImplicitScheduler()
-                    yield scheduler.sendAttendeeReply(self.transaction, resource)
-                except Exception, e:
-                    log.debug("ImplicitProcessing - auto-reply exception UID: '%s', %s" % (resource.uid(), str(e)))
-                    raise
-                except:
-                    log.debug("ImplicitProcessing - auto-reply bare exception UID: '%s'" % (resource.uid(),))
-                    raise
-            else:
-                log.debug("ImplicitProcessing - skipping auto-reply of missing ID: '{rid}'", rid=self.resourceID)

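The ScheduleRefreshWork and ScheduleAutoReplyWork classes removed above are not deleted outright: they move, behavior unchanged, from nested classes inside processing.py to module-level classes in the new work.py (added below), so all scheduling work items live in one place. The pattern they follow is WorkItem plus fromTable(...): each row in the work table becomes an instance whose doWork() runs once its NOT_BEFORE time passes. An illustrative sketch of that pattern (ExampleWork is hypothetical and reuses the SCHEDULE_REFRESH_WORK table purely for illustration):

    from twext.enterprise.dal.record import fromTable
    from twext.enterprise.queue import WorkItem
    from twisted.internet.defer import inlineCallbacks

    from txdav.common.datastore.sql_tables import schema

    class ExampleWork(WorkItem, fromTable(schema.SCHEDULE_REFRESH_WORK)):

        @inlineCallbacks
        def doWork(self):
            # Column values (homeResourceID, resourceID, ...) are exposed
            # as attributes by fromTable(); self.transaction is the
            # transaction the queue runs this work item under.
            home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID))
            # ... perform the actual work against `home` here ...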
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_implicit.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_implicit.py	2013-10-13 14:57:55 UTC (rev 11810)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/test/test_implicit.py	2013-10-13 14:59:35 UTC (rev 11811)
@@ -29,7 +29,8 @@
 from twistedcaldav.config import config
 from twistedcaldav.ical import Component
 
-from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler
+from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler, \
+    ScheduleReplyWork
 from txdav.caldav.datastore.scheduling.scheduler import ScheduleResponseQueue
 from txdav.caldav.datastore.test.util import buildCalendarStore, \
     buildDirectoryRecord
@@ -1326,6 +1327,11 @@
 
         yield self._setCalendarData(data2, "user02")
 
+        while True:
+            work = (yield ScheduleReplyWork.hasWork(self.transactionUnderTest()))
+            if not work:
+                break
+
         list1 = (yield self._listCalendarObjects("user01", "inbox"))
         self.assertEqual(len(list1), 1)
 
@@ -1396,6 +1402,11 @@
 
         yield self._setCalendarData(data2, "user02")
 
+        while True:
+            work = (yield ScheduleReplyWork.hasWork(self.transactionUnderTest()))
+            if not work:
+                break
+
         list1 = (yield self._listCalendarObjects("user01", "inbox"))
         self.assertEqual(len(list1), 1)
 
@@ -1480,6 +1491,11 @@
 
         yield self._setCalendarData(data2, "user02")
 
+        while True:
+            work = (yield ScheduleReplyWork.hasWork(self.transactionUnderTest()))
+            if not work:
+                break
+
         list1 = (yield self._listCalendarObjects("user01", "inbox"))
         self.assertEqual(len(list1), 1)
 

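The polling loops added above rely on ScheduleReplyWork.hasWork() (defined in work.py below), which is a bare existence check on the SCHEDULE_REPLY_WORK table: the tests spin until the queued reply has executed and its row has been deleted, and only then assert on the inbox contents. The same pattern, extracted as a sketch (waitForReplies and txnFactory are illustrative names):

    from twisted.internet.defer import inlineCallbacks

    from txdav.caldav.datastore.scheduling.work import ScheduleReplyWork

    @inlineCallbacks
    def waitForReplies(txnFactory):
        # txnFactory stands in for self.transactionUnderTest in the tests.
        while True:
            work = (yield ScheduleReplyWork.hasWork(txnFactory()))
            if not work:
                break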
Added: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py	                        (rev 0)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py	2013-10-13 14:59:35 UTC (rev 11811)
@@ -0,0 +1,495 @@
+##
+# Copyright (c) 2013 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twext.enterprise.dal.record import fromTable
+from twext.enterprise.dal.syntax import Select, Insert, Delete, Parameter
+from twext.enterprise.locking import NamedLock
+from twext.enterprise.queue import WorkItem
+from twext.python.log import Logger
+
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from twistedcaldav import caldavxml
+from twistedcaldav.config import config
+from twistedcaldav.ical import Component
+
+from txdav.caldav.datastore.scheduling.itip import iTipGenerator, iTIPRequestStatus
+from txdav.caldav.icalendarstore import ComponentUpdateState
+from txdav.common.datastore.sql_tables import schema
+
+import datetime
+import hashlib
+
+__all__ = [
+    "ScheduleReplyWork",
+    "ScheduleReplyCancelWork",
+    "ScheduleRefreshWork",
+    "ScheduleAutoReplyWork",
+]
+
+log = Logger()
+
+class ScheduleWorkMixin(object):
+    """
+    Base class for common schedule work item behavior.
+    """
+
+    # Schedule work is grouped based on calendar object UID
+    group = property(lambda self: "ScheduleWork:%s" % (self.icalendarUid,))
+
+
+
+class ScheduleReplyWorkMixin(ScheduleWorkMixin):
+
+
+    def makeScheduler(self, home):
+        """
+        Convenience method which we can override in unit tests to make testing easier.
+        """
+        from txdav.caldav.datastore.scheduling.caldav.scheduler import CalDAVScheduler
+        return CalDAVScheduler(self.transaction, home.uid())
+
+
+    @inlineCallbacks
+    def sendToOrganizer(self, home, action, itipmsg, originator, recipient):
+
+        # Send scheduling message
+
+        # This is a local CALDAV scheduling operation.
+        scheduler = self.makeScheduler(home)
+
+        # Do the PUT processing
+        log.info("Implicit %s - attendee: '%s' to organizer: '%s', UID: '%s'" % (action, originator, recipient, itipmsg.resourceUID(),))
+        response = (yield scheduler.doSchedulingViaPUT(originator, (recipient,), itipmsg, internal_request=True))
+        returnValue(response)
+
+
+
+class ScheduleReplyWork(WorkItem, fromTable(schema.SCHEDULE_REPLY_WORK), ScheduleReplyWorkMixin):
+    """
+    The associated work item table is SCHEDULE_REPLY_WORK.
+
+    This work item is used to send an iTIP reply message when an attendee changes
+    their partstat in the calendar object resource.
+    """
+
+    @classmethod
+    @inlineCallbacks
+    def reply(cls, txn, home, resource, changedRids, attendee):
+        # Always queue up new work - coalescing happens when work is executed
+        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.QueuedReplyDelaySeconds)
+        proposal = (yield txn.enqueue(
+            cls,
+            notBefore=notBefore,
+            icalendarUid=resource.uid(),
+            homeResourceID=home.id(),
+            resourceID=resource.id(),
+            changedRids=",".join(changedRids) if changedRids else None,
+        ))
+        yield proposal.whenProposed()
+        log.debug("ScheduleReplyWork - enqueued for ID: {id}, UID: {uid}, attendee: {att}", id=proposal.workItem.workID, uid=resource.uid(), att=attendee)
+
+
+    @classmethod
+    @inlineCallbacks
+    def hasWork(cls, txn):
+        srw = schema.SCHEDULE_REPLY_WORK
+        rows = (yield Select(
+            (srw.WORK_ID,),
+            From=srw,
+        ).on(txn))
+        returnValue(len(rows) > 0)
+
+
+    @inlineCallbacks
+    def doWork(self):
+
+        try:
+            home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID))
+            resource = (yield home.objectResourceWithID(self.resourceID))
+            attendeePrincipal = home.directoryService().recordWithUID(home.uid())
+            attendee = attendeePrincipal.canonicalCalendarUserAddress()
+            calendar = (yield resource.componentForUser())
+            organizer = calendar.validOrganizerForScheduling()
+
+            changedRids = self.changedRids.split(",") if self.changedRids else None
+
+            log.debug("ScheduleReplyWork - running for ID: {id}, UID: {uid}, attendee: {att}", id=self.workID, uid=calendar.resourceUID(), att=attendee)
+
+            # We need to get the UID lock for implicit processing.
+            yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(calendar.resourceUID()).hexdigest(),))
+
+            itipmsg = iTipGenerator.generateAttendeeReply(calendar, attendee, changedRids=changedRids)
+
+            # Send scheduling message and process response
+            response = (yield self.sendToOrganizer(home, "REPLY", itipmsg, attendee, organizer))
+            yield self.handleSchedulingResponse(response, calendar, resource, False)
+
+        except Exception, e:
+            log.debug("ScheduleReplyWork - exception ID: {id}, UID: '{uid}', {err}", id=self.workID, uid=calendar.resourceUID(), err=str(e))
+            raise
+        except:
+            log.debug("ScheduleReplyWork - bare exception ID: {id}, UID: '{uid}'", id=self.workID, uid=calendar.resourceUID())
+            raise
+
+
+    @inlineCallbacks
+    def handleSchedulingResponse(self, response, calendar, resource, is_organizer):
+        """
+        Update a user's calendar object resource based on the results of a queued scheduling
+        message response. Note we only need to update in the case where there is an error response
+        as we will already have updated the calendar object resource to make it look like scheduling
+        worked prior to the work queue item being enqueued.
+
+        @param response: the scheduling response object
+        @type response: L{caldavxml.ScheduleResponse}
+        @param calendar: original calendar component
+        @type calendar: L{Component}
+        @param resource: calendar object resource to update
+        @type resource: L{CalendarObject}
+        @param is_organizer: whether or not iTIP message was sent by the organizer
+        @type is_organizer: C{bool}
+        """
+
+        # Map each recipient in the response to a status code
+        changed = False
+        for item in response.responses:
+            assert isinstance(item, caldavxml.Response), "Wrong element in response"
+            recipient = str(item.children[0].children[0])
+            status = str(item.children[1])
+            statusCode = status.split(";")[0]
+
+            # Now apply to each ATTENDEE/ORGANIZER in the original data only if not 1.2
+            if statusCode != iTIPRequestStatus.MESSAGE_DELIVERED_CODE:
+                calendar.setParameterToValueForPropertyWithValue(
+                    "SCHEDULE-STATUS",
+                    statusCode,
+                    "ATTENDEE" if is_organizer else "ORGANIZER",
+                    recipient,
+                )
+                changed = True
+
+        if changed:
+            yield resource._setComponentInternal(calendar, internal_state=ComponentUpdateState.ATTENDEE_ITIP_UPDATE)
+
+
+
+class ScheduleReplyCancelWork(WorkItem, fromTable(schema.SCHEDULE_REPLY_CANCEL_WORK), ScheduleReplyWorkMixin):
+    """
+    The associated work item table is SCHEDULE_REPLY_CANCEL_WORK.
+
+    This work item is used to send an iTIP reply message when an attendee deletes
+    their copy of the calendar object resource. For this to work we need to store a copy
+    of the original resource data.
+    """
+
+    # Schedule work is grouped based on calendar object UID
+    group = property(lambda self: "ScheduleWork:%s" % (self.icalendarUid,))
+
+
+    @classmethod
+    @inlineCallbacks
+    def replyCancel(cls, txn, home, calendar, attendee):
+        # Always queue up new work - coalescing happens when work is executed
+        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.QueuedReplyDelaySeconds)
+        proposal = (yield txn.enqueue(
+            cls,
+            notBefore=notBefore,
+            icalendarUid=calendar.resourceUID(),
+            homeResourceID=home.id(),
+            icalendarText=calendar.getTextWithTimezones(includeTimezones=not config.EnableTimezonesByReference),
+        ))
+        yield proposal.whenProposed()
+        log.debug("ScheduleReplyCancelWork - enqueued for ID: {id}, UID: {uid}, attendee: {att}", id=proposal.workItem.workID, uid=calendar.resourceUID(), att=attendee)
+
+
+    @inlineCallbacks
+    def doWork(self):
+
+        try:
+            home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID))
+            attendeePrincipal = home.directoryService().recordWithUID(home.uid())
+            attendee = attendeePrincipal.canonicalCalendarUserAddress()
+            calendar = Component.fromString(self.icalendarText)
+            organizer = calendar.validOrganizerForScheduling()
+
+            log.debug("ScheduleReplyCancelWork - running for ID: {id}, UID: {uid}, attendee: {att}", id=self.workID, uid=calendar.resourceUID(), att=attendee)
+
+            # We need to get the UID lock for implicit processing.
+            yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(calendar.resourceUID()).hexdigest(),))
+
+            itipmsg = iTipGenerator.generateAttendeeReply(calendar, attendee, force_decline=True)
+
+            # Send scheduling message - no need to process response as original resource is gone
+            yield self.sendToOrganizer(home, "CANCEL", itipmsg, attendee, organizer)
+
+        except Exception, e:
+            log.debug("ScheduleReplyCancelWork - exception ID: {id}, UID: '{uid}', {err}", id=self.workID, uid=calendar.resourceUID(), err=str(e))
+            raise
+        except:
+            log.debug("ScheduleReplyCancelWork - bare exception ID: {id}, UID: '{uid}'", id=self.workID, uid=calendar.resourceUID())
+            raise
+
+
+
+class ScheduleRefreshWork(WorkItem, fromTable(schema.SCHEDULE_REFRESH_WORK), ScheduleWorkMixin):
+    """
+    The associated work item table is SCHEDULE_REFRESH_WORK.
+
+    This work item is used to trigger an iTIP refresh of attendees. This happens when one attendee
+    replies to an invite, and we want to have the other attendees see that change - eventually. We
+    are going to use the SCHEDULE_REFRESH_ATTENDEES table to track the list of attendees needing
+    a refresh for each calendar object resource (identified by the organizer's resource-id for that
+    calendar object). We want to do refreshes in batches with a configurable time between each batch.
+
+    The tricky part here is handling race conditions, where two or more attendee replies happen at the
+    same time, or happen whilst a previously queued refresh has started batch processing. Here is how
+    we will handle that:
+
+    1) Each time a refresh is needed we will add all attendees to the SCHEDULE_REFRESH_ATTENDEES table.
+    This will happen even if those attendees are currently listed in that table. We ensure the table is
+    not unique with respect to attendees - this means that two simultaneous refreshes can happily insert the
+    same set of attendees without running into unique constraints and thus without having to use
+    savepoints to cope with that. This will mean duplicate attendees listed in the table, but we take
+    care of that when executing the work item, as per the next point.
+
+    2) When a work item is triggered we get the set of unique attendees needing a refresh from the
+    SCHEDULE_REFRESH_ATTENDEES table. We split out a batch of those to actually refresh - with the
+    others being left in the table as-is. We then remove the batch of attendees from the
+    SCHEDULE_REFRESH_ATTENDEES table - this will remove duplicates. The refresh is then done and a
+    new work item scheduled to do the next batch. We only stop rescheduling work items when nothing
+    is found during the initial query. Note that if any refresh is done we will always reschedule work
+    even if we know none remain. That should handle the case where a new refresh occurs whilst
+    processing the last batch from a previous refresh.
+
+    Hopefully the above methodology will deal with concurrency issues, preventing any excessive locking
+    or failed inserts etc.
+    """
+
+    @classmethod
+    @inlineCallbacks
+    def refreshAttendees(cls, txn, organizer_resource, organizer_calendar, attendees):
+        # See if there is already a pending refresh and merge current attendees into that list,
+        # otherwise just mark all attendees as pending
+        sra = schema.SCHEDULE_REFRESH_ATTENDEES
+        pendingAttendees = (yield Select(
+            [sra.ATTENDEE, ],
+            From=sra,
+            Where=sra.RESOURCE_ID == organizer_resource.id(),
+        ).on(txn))
+        pendingAttendees = [row[0] for row in pendingAttendees]
+        attendeesToRefresh = set(attendees) - set(pendingAttendees)
+        for attendee in attendeesToRefresh:
+            yield Insert(
+                {
+                    sra.RESOURCE_ID: organizer_resource.id(),
+                    sra.ATTENDEE: attendee,
+                }
+            ).on(txn)
+
+        # Always queue up new work - coalescing happens when work is executed
+        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
+        yield txn.enqueue(
+            cls,
+            icalendarUid=organizer_resource.uid(),
+            homeResourceID=organizer_resource._home.id(),
+            resourceID=organizer_resource.id(),
+            notBefore=notBefore
+        )
+
+
+    @inlineCallbacks
+    def doWork(self):
+
+        # Look for other work items for this resource and ignore this one if other later ones exist
+        srw = schema.SCHEDULE_REFRESH_WORK
+        rows = (yield Select(
+            (srw.WORK_ID,),
+            From=srw,
+            Where=(srw.HOME_RESOURCE_ID == self.homeResourceID).And(
+                   srw.RESOURCE_ID == self.resourceID),
+        ).on(self.transaction))
+        if rows:
+            log.debug("Schedule refresh for resource-id: {rid} - ignored", rid=self.resourceID)
+            returnValue(None)
+
+        log.debug("Schedule refresh for resource-id: {rid}", rid=self.resourceID)
+
+        # Get the unique list of pending attendees and split into batch to process
+        # TODO: do a DELETE ... and rownum <= N returning attendee - but have to fix Oracle to
+        # handle multi-row returning. Would be better than entire select + delete of each one,
+        # but need to make sure to use UNIQUE as there may be duplicate attendees.
+        sra = schema.SCHEDULE_REFRESH_ATTENDEES
+        pendingAttendees = (yield Select(
+            [sra.ATTENDEE, ],
+            From=sra,
+            Where=sra.RESOURCE_ID == self.resourceID,
+        ).on(self.transaction))
+        pendingAttendees = list(set([row[0] for row in pendingAttendees]))
+
+        # Nothing left so done
+        if len(pendingAttendees) == 0:
+            returnValue(None)
+
+        attendeesToProcess = pendingAttendees[:config.Scheduling.Options.AttendeeRefreshBatch]
+        pendingAttendees = pendingAttendees[config.Scheduling.Options.AttendeeRefreshBatch:]
+
+        yield Delete(
+            From=sra,
+            Where=(sra.RESOURCE_ID == self.resourceID).And(sra.ATTENDEE.In(Parameter("attendeesToProcess", len(attendeesToProcess))))
+        ).on(self.transaction, attendeesToProcess=attendeesToProcess)
+
+        # Reschedule work item if pending attendees remain.
+        if len(pendingAttendees) != 0:
+            notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchIntervalSeconds)
+            yield self.transaction.enqueue(
+                self.__class__,
+                homeResourceID=self.homeResourceID,
+                resourceID=self.resourceID,
+                notBefore=notBefore
+            )
+
+        # Do refresh
+        yield self._doDelayedRefresh(attendeesToProcess)
+
+
+    @inlineCallbacks
+    def _doDelayedRefresh(self, attendeesToProcess):
+        """
+        Do an attendee refresh that has been delayed until after processing of the request that called it. That
+        requires that we create a new transaction to work with.
+
+        @param attendeesToProcess: list of attendees to refresh.
+        @type attendeesToProcess: C{list}
+        """
+
+        organizer_home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID))
+        organizer_resource = (yield organizer_home.objectResourceWithID(self.resourceID))
+        if organizer_resource is not None:
+            try:
+                # We need to get the UID lock for implicit processing whilst we send the auto-reply
+                # as the Organizer processing will attempt to write out data to other attendees to
+                # refresh them. To prevent a race we need a lock.
+                yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(organizer_resource.uid()).hexdigest(),))
+
+                yield self._doRefresh(organizer_resource, attendeesToProcess)
+            except Exception, e:
+                log.debug("ImplicitProcessing - refresh exception UID: '{uid}', {exc}", uid=organizer_resource.uid(), exc=str(e))
+                raise
+            except:
+                log.debug("ImplicitProcessing - refresh bare exception UID: '{uid}'", uid=organizer_resource.uid())
+                raise
+        else:
+            log.debug("ImplicitProcessing - skipping refresh of missing ID: '{rid}'", rid=self.resourceID)
+
+
+    @inlineCallbacks
+    def _doRefresh(self, organizer_resource, only_attendees):
+        """
+        Do a refresh of attendees.
+
+        @param organizer_resource: the resource for the organizer's calendar data
+        @type organizer_resource: L{DAVResource}
+        @param only_attendees: list of attendees to refresh (C{None} - refresh all)
+        @type only_attendees: C{tuple}
+        """
+        log.debug("ImplicitProcessing - refreshing UID: '{uid}', Attendees: {att}", uid=organizer_resource.uid(), att=", ".join(only_attendees) if only_attendees else "all")
+        from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler
+        scheduler = ImplicitScheduler()
+        yield scheduler.refreshAllAttendeesExceptSome(
+            self.transaction,
+            organizer_resource,
+            only_attendees=only_attendees,
+        )
+
+
+
+class ScheduleAutoReplyWork(WorkItem, fromTable(schema.SCHEDULE_AUTO_REPLY_WORK), ScheduleWorkMixin):
+    """
+    The associated work item table is SCHEDULE_AUTO_REPLY_WORK.
+
+    This work item is used to send auto-reply iTIP messages after the calendar data for the
+    auto-accept user has been written to the user calendar.
+    """
+
+    @classmethod
+    @inlineCallbacks
+    def autoReply(cls, txn, resource, partstat):
+        # Always queue up new work - coalescing happens when work is executed
+        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AutoReplyDelaySeconds)
+        yield txn.enqueue(
+            cls,
+            icalendarUid=resource.uid(),
+            homeResourceID=resource._home.id(),
+            resourceID=resource.id(),
+            partstat=partstat,
+            notBefore=notBefore,
+        )
+
+
+    @inlineCallbacks
+    def doWork(self):
+
+        log.debug("Schedule auto-reply for resource-id: {rid}", rid=self.resourceID)
+
+        # Delete all other work items for the same resource-id
+        yield Delete(From=self.table,
+            Where=self.table.RESOURCE_ID == self.resourceID
+        ).on(self.transaction)
+
+        # Do reply
+        yield self._sendAttendeeAutoReply()
+
+
+    @inlineCallbacks
+    def _sendAttendeeAutoReply(self):
+        """
+        Auto-process the calendar option to generate automatic accept/decline status and
+        send a reply if needed.
+
+        We used to have logic to suppress attendee refreshes until after all auto-replies have
+        been processed. We can't do that with the work queue (easily) so we are going to ignore
+        that for now. It may not be a big deal given that the refreshes are themselves done in the
+        queue and we only do the refresh when the last queued work item is processed.
+
+        @param resource: calendar resource to process
+        @type resource: L{CalendarObject}
+        @param partstat: new partstat value
+        @type partstat: C{str}
+        """
+
+        home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID))
+        resource = (yield home.objectResourceWithID(self.resourceID))
+        if resource is not None:
+            try:
+                # We need to get the UID lock for implicit processing whilst we send the auto-reply
+                # as the Organizer processing will attempt to write out data to other attendees to
+                # refresh them. To prevent a race we need a lock.
+                yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(resource.uid()).hexdigest(),))
+
+                # Send out a reply
+                log.debug("ImplicitProcessing - recipient '%s' processing UID: '%s' - auto-reply: %s" % (home.uid(), resource.uid(), self.partstat))
+                from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler
+                scheduler = ImplicitScheduler()
+                yield scheduler.sendAttendeeReply(self.transaction, resource)
+            except Exception, e:
+                log.debug("ImplicitProcessing - auto-reply exception UID: '%s', %s" % (resource.uid(), str(e)))
+                raise
+            except:
+                log.debug("ImplicitProcessing - auto-reply bare exception UID: '%s'" % (resource.uid(),))
+                raise
+        else:
+            log.debug("ImplicitProcessing - skipping auto-reply of missing ID: '{rid}'", rid=self.resourceID)


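Callers never instantiate these classes directly: the implicit scheduler enqueues work via the classmethods inside the current transaction, and the work item later generates and sends the iTIP message under the ImplicitUIDLock. A minimal usage sketch (queueAttendeeReply is an illustrative wrapper; txn, home, resource and attendee are assumed to be bound as in ImplicitScheduler):

    from twisted.internet.defer import inlineCallbacks

    from txdav.caldav.datastore.scheduling.work import ScheduleReplyWork

    @inlineCallbacks
    def queueAttendeeReply(txn, home, resource, attendee):
        # Enqueue an iTIP REPLY covering all instances (changedRids=None).
        # The reply itself is generated later, in ScheduleReplyWork.doWork(),
        # once QueuedReplyDelaySeconds have elapsed.
        yield ScheduleReplyWork.reply(txn, home, resource, None, attendee)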
Property changes on: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py
___________________________________________________________________
Added: svn:executable
   + *

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current-oracle-dialect.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current-oracle-dialect.sql	2013-10-13 14:57:55 UTC (rev 11810)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current-oracle-dialect.sql	2013-10-13 14:59:35 UTC (rev 11811)
@@ -363,6 +363,7 @@
 create table SCHEDULE_REFRESH_WORK (
     "WORK_ID" integer primary key not null,
     "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "ICALENDAR_UID" nvarchar2(255),
     "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade,
     "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade
 );
@@ -375,11 +376,29 @@
 create table SCHEDULE_AUTO_REPLY_WORK (
     "WORK_ID" integer primary key not null,
     "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "ICALENDAR_UID" nvarchar2(255),
     "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade,
     "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade,
     "PARTSTAT" nvarchar2(255)
 );
 
+create table SCHEDULE_REPLY_WORK (
+    "WORK_ID" integer primary key not null,
+    "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "ICALENDAR_UID" nvarchar2(255),
+    "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade,
+    "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade,
+    "CHANGED_RIDS" nclob
+);
+
+create table SCHEDULE_REPLY_CANCEL_WORK (
+    "WORK_ID" integer primary key not null,
+    "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "ICALENDAR_UID" nvarchar2(255),
+    "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade,
+    "ICALENDAR_TEXT" nclob
+);
+
 create table CALENDARSERVER (
     "NAME" nvarchar2(255) primary key,
     "VALUE" nvarchar2(255)
@@ -530,3 +549,15 @@
     RESOURCE_ID
 );
 
+create index SCHEDULE_REPLY_WORK_H_745af8cf on SCHEDULE_REPLY_WORK (
+    HOME_RESOURCE_ID
+);
+
+create index SCHEDULE_REPLY_WORK_R_11bd3fbb on SCHEDULE_REPLY_WORK (
+    RESOURCE_ID
+);
+
+create index SCHEDULE_REPLY_CANCEL_dab513ef on SCHEDULE_REPLY_CANCEL_WORK (
+    HOME_RESOURCE_ID
+);
+

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql	2013-10-13 14:57:55 UTC (rev 11810)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql	2013-10-13 14:59:35 UTC (rev 11811)
@@ -693,6 +693,7 @@
 create table SCHEDULE_REFRESH_WORK (
   WORK_ID                       integer      primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
   NOT_BEFORE                    timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  ICALENDAR_UID        			varchar(255) not null,
   HOME_RESOURCE_ID              integer      not null references CALENDAR_HOME on delete cascade,
   RESOURCE_ID                   integer      not null references CALENDAR_OBJECT on delete cascade
 );
@@ -717,6 +718,7 @@
 create table SCHEDULE_AUTO_REPLY_WORK (
   WORK_ID                       integer      primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
   NOT_BEFORE                    timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  ICALENDAR_UID        			varchar(255) not null,
   HOME_RESOURCE_ID              integer      not null references CALENDAR_HOME on delete cascade,
   RESOURCE_ID                   integer      not null references CALENDAR_OBJECT on delete cascade,
   PARTSTAT						varchar(255) not null
@@ -727,6 +729,39 @@
 create index SCHEDULE_AUTO_REPLY_WORK_RESOURCE_ID on
 	SCHEDULE_AUTO_REPLY_WORK(RESOURCE_ID);
 
+-------------------------
+-- Schedule Reply Work --
+-------------------------
+
+create table SCHEDULE_REPLY_WORK (
+  WORK_ID                       integer      primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
+  NOT_BEFORE                    timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  ICALENDAR_UID        			varchar(255) not null,
+  HOME_RESOURCE_ID              integer      not null references CALENDAR_HOME on delete cascade,
+  RESOURCE_ID                   integer      not null references CALENDAR_OBJECT on delete cascade,
+  CHANGED_RIDS       			text
+);
+
+create index SCHEDULE_REPLY_WORK_HOME_RESOURCE_ID on
+	SCHEDULE_REPLY_WORK(HOME_RESOURCE_ID);
+create index SCHEDULE_REPLY_WORK_RESOURCE_ID on
+	SCHEDULE_REPLY_WORK(RESOURCE_ID);
+
+--------------------------------
+-- Schedule Reply Cancel Work --
+--------------------------------
+
+create table SCHEDULE_REPLY_CANCEL_WORK (
+  WORK_ID                       integer      primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
+  NOT_BEFORE                    timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  ICALENDAR_UID        			varchar(255) not null,
+  HOME_RESOURCE_ID              integer      not null references CALENDAR_HOME on delete cascade,
+  ICALENDAR_TEXT       			text         not null
+);
+
+create index SCHEDULE_REPLY_CANCEL_WORK_HOME_RESOURCE_ID on
+	SCHEDULE_REPLY_CANCEL_WORK(HOME_RESOURCE_ID);
+
 --------------------
 -- Schema Version --
 --------------------

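The two new tables mirror the layout of the existing work tables: a WORK_ID keyed off WORKITEM_SEQ, a NOT_BEFORE eligibility timestamp, and the new ICALENDAR_UID column (also retrofitted onto SCHEDULE_REFRESH_WORK and SCHEDULE_AUTO_REPLY_WORK) that the ScheduleWorkMixin group property uses to serialize work per calendar object UID. A sketch of how the DAL addresses the new table, mirroring hasWork() above (pendingReplyWork is an illustrative helper):

    from twext.enterprise.dal.syntax import Select

    from txdav.common.datastore.sql_tables import schema

    def pendingReplyWork():
        # Existence/inspection query against the new table; the schema
        # attributes are generated from current.sql after this upgrade.
        srw = schema.SCHEDULE_REPLY_WORK
        return Select((srw.WORK_ID, srw.ICALENDAR_UID), From=srw)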
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_25_to_26.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_25_to_26.sql	2013-10-13 14:57:55 UTC (rev 11810)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_25_to_26.sql	2013-10-13 14:59:35 UTC (rev 11811)
@@ -23,6 +23,7 @@
 create table SCHEDULE_REFRESH_WORK (
     "WORK_ID" integer primary key not null,
     "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "ICALENDAR_UID" nvarchar2(255),
     "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade,
     "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade
 );
@@ -48,6 +49,7 @@
 create table SCHEDULE_AUTO_REPLY_WORK (
     "WORK_ID" integer primary key not null,
     "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "ICALENDAR_UID" nvarchar2(255),
     "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade,
     "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade,
     "PARTSTAT" nvarchar2(255)
@@ -61,6 +63,36 @@
     RESOURCE_ID
 );
 
+create table SCHEDULE_REPLY_WORK (
+    "WORK_ID" integer primary key not null,
+    "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "ICALENDAR_UID" nvarchar2(255),
+    "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade,
+    "RESOURCE_ID" integer not null references CALENDAR_OBJECT on delete cascade,
+    "CHANGED_RIDS" nclob
+);
+
+create index SCHEDULE_REPLY_WORK_H_745af8cf on SCHEDULE_REPLY_WORK (
+    HOME_RESOURCE_ID
+);
+
+create index SCHEDULE_REPLY_WORK_R_11bd3fbb on SCHEDULE_REPLY_WORK (
+    RESOURCE_ID
+);
+
+create table SCHEDULE_REPLY_CANCEL_WORK (
+    "WORK_ID" integer primary key not null,
+    "NOT_BEFORE" timestamp default CURRENT_TIMESTAMP at time zone 'UTC',
+    "ICALENDAR_UID" nvarchar2(255),
+    "HOME_RESOURCE_ID" integer not null references CALENDAR_HOME on delete cascade,
+    "ICALENDAR_TEXT" nclob
+);
+
+create index SCHEDULE_REPLY_CANCEL_dab513ef on SCHEDULE_REPLY_CANCEL_WORK (
+    HOME_RESOURCE_ID
+);
+
+
 -- Now update the version
 -- No data upgrades
 update CALENDARSERVER set VALUE = '26' where NAME = 'VERSION';

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql	2013-10-13 14:57:55 UTC (rev 11810)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_25_to_26.sql	2013-10-13 14:59:35 UTC (rev 11811)
@@ -20,9 +20,14 @@
 
 -- New tables
 
+---------------------------
+-- Schedule Refresh Work --
+---------------------------
+
 create table SCHEDULE_REFRESH_WORK (
   WORK_ID                       integer      primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
   NOT_BEFORE                    timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  ICALENDAR_UID        			varchar(255) not null,
   HOME_RESOURCE_ID              integer      not null references CALENDAR_HOME on delete cascade,
   RESOURCE_ID                   integer      not null references CALENDAR_OBJECT on delete cascade
 );
@@ -40,10 +45,14 @@
 create index SCHEDULE_REFRESH_ATTENDEES_RESOURCE_ID_ATTENDEE on
 	SCHEDULE_REFRESH_ATTENDEES(RESOURCE_ID, ATTENDEE);
 
+------------------------------
+-- Schedule Auto Reply Work --
+------------------------------
 
-	create table SCHEDULE_AUTO_REPLY_WORK (
+create table SCHEDULE_AUTO_REPLY_WORK (
   WORK_ID                       integer      primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
   NOT_BEFORE                    timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  ICALENDAR_UID        			varchar(255) not null,
   HOME_RESOURCE_ID              integer      not null references CALENDAR_HOME on delete cascade,
   RESOURCE_ID                   integer      not null references CALENDAR_OBJECT on delete cascade,
   PARTSTAT						varchar(255) not null
@@ -54,6 +63,40 @@
 create index SCHEDULE_AUTO_REPLY_WORK_RESOURCE_ID on
 	SCHEDULE_AUTO_REPLY_WORK(RESOURCE_ID);
 
+-------------------------
+-- Schedule Reply Work --
+-------------------------
+
+create table SCHEDULE_REPLY_WORK (
+  WORK_ID                       integer      primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
+  NOT_BEFORE                    timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  ICALENDAR_UID        			varchar(255) not null,
+  HOME_RESOURCE_ID              integer      not null references CALENDAR_HOME on delete cascade,
+  RESOURCE_ID                   integer      not null references CALENDAR_OBJECT on delete cascade,
+  CHANGED_RIDS       			text
+);
+
+create index SCHEDULE_REPLY_WORK_HOME_RESOURCE_ID on
+	SCHEDULE_REPLY_WORK(HOME_RESOURCE_ID);
+create index SCHEDULE_REPLY_WORK_RESOURCE_ID on
+	SCHEDULE_REPLY_WORK(RESOURCE_ID);
+
+--------------------------------
+-- Schedule Reply Cancel Work --
+--------------------------------
+
+create table SCHEDULE_REPLY_CANCEL_WORK (
+  WORK_ID                       integer      primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
+  NOT_BEFORE                    timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  ICALENDAR_UID        			varchar(255) not null,
+  HOME_RESOURCE_ID              integer      not null references CALENDAR_HOME on delete cascade,
+  ICALENDAR_TEXT       			text         not null
+);
+
+create index SCHEDULE_REPLY_CANCEL_WORK_HOME_RESOURCE_ID on
+	SCHEDULE_REPLY_CANCEL_WORK(HOME_RESOURCE_ID);
+
+
 -- Now update the version
 -- No data upgrades
 update CALENDARSERVER set VALUE = '26' where NAME = 'VERSION';