[CalendarServer-changes] [11814] CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav

source_changes at macosforge.org source_changes at macosforge.org
Mon Oct 14 14:08:10 PDT 2013


Revision: 11814
          http://trac.calendarserver.org//changeset/11814
Author:   cdaboo at apple.com
Date:     2013-10-14 14:08:10 -0700 (Mon, 14 Oct 2013)
Log Message:
-----------
Implement organizer work item.

Modified Paths:
--------------
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/itip.py
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_tables.py

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py	2013-10-14 20:34:03 UTC (rev 11813)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/implicit.py	2013-10-14 21:08:10 UTC (rev 11814)
@@ -35,7 +35,7 @@
 from txdav.caldav.datastore.scheduling.itip import iTipGenerator, iTIPRequestStatus
 from txdav.caldav.datastore.scheduling.utils import getCalendarObjectForRecord
 from txdav.caldav.datastore.scheduling.work import ScheduleReplyWork, \
-    ScheduleReplyCancelWork
+    ScheduleReplyCancelWork, ScheduleOrganizerWork
 
 import collections
 
@@ -45,6 +45,12 @@
 
 log = Logger()
 
+
+class ImplicitSchedulingWorkError(Exception):
+    pass
+
+
+
 # TODO:
 #
 # Handle the case where a PUT removes the ORGANIZER property. That should be equivalent to cancelling the entire meeting.
@@ -390,6 +396,75 @@
 
 
     @inlineCallbacks
+    def queuedOrganizerProcessing(self, txn, action, home, resource, uid, calendar, smart_merge):
+        """
+        Process an organizer scheduling work queue item. The basic goal here is to setup the ImplicitScheduler as if
+        this operation were the equivalent of the PUT that enqueued the work, and then do the actual work.
+        """
+
+        self.txn = txn
+        self.action = action
+        self.state = "organizer"
+        self.calendar_home = home
+        self.resource = resource
+        self.do_smart_merge = smart_merge
+
+        # Handle different action scenarios
+        if action == "create":
+            # resource is None, calendar is None
+            # Find the newly created resource
+            resources = (yield self.calendar_home.objectResourcesWithUID(uid, ignore_children=["inbox"], allowShared=False))
+            if len(resources) != 1:
+                # Ughh - what has happened? It is possible the resource was created then deleted before we could start work processing,
+                # so simply ignore this
+                log.debug("ImplicitScheduler - queuedOrganizerProcessing 'create' cannot find organizer resource for UID: {uid}", uid=calendar.resourceUID())
+                returnValue(None)
+            self.resource = resources[0]
+
+            # The calendar data to use is the current calendar data, not what was stored in the work item, since it might have been
+            # updated a few times after the create, but those modifications are effectively coalesced into the create
+            self.calendar = (yield self.resource.componentForUser())
+
+        elif action == "modify":
+            # Check that the resource still exists - it may have been deleted after this work item was queued, in which
+            # case we have to ignore this (on the assumption that the "remove" action will have queued some work that will
+            # execute soon).
+            if self.resource is None:
+                log.debug("ImplicitScheduler - queuedOrganizerProcessing 'modify' cannot find organizer resource for UID: {uid}", uid=calendar.resourceUID())
+                returnValue(None)
+
+            # The new calendar data is what is currently stored - other modifications may have caused coalescing.
+            # Old calendar data is what was stored in the work item
+            self.calendar = (yield self.resource.componentForUser())
+            self.oldcalendar = calendar
+
+        elif action == "remove":
+            # Check whether the resource still exists - it cannot be in existence as once it is deleted, its resource-id
+            # should never be used again.
+            if self.resource is not None:
+                log.debug("ImplicitScheduler - queuedOrganizerProcessing 'remove' found an organizer resource for UID: {uid}", uid=calendar.resourceUID())
+                raise ImplicitSchedulingWorkError("Resource exists for queued 'remove' scheduling work")
+
+            # The "new" calendar data is in fact the calendar data at the time of the remove - which is the data stored
+            # in the work item.
+            self.calendar = calendar
+
+        yield self.extractCalendarData()
+        self.organizerPrincipal = self.calendar_home.directoryService().recordWithCalendarUserAddress(self.organizer)
+        self.organizerAddress = (yield addressmapping.mapper.getCalendarUser(self.organizer, self.organizerPrincipal))
+
+        # Originator is the organizer in this case
+        self.originatorPrincipal = self.organizerPrincipal
+        self.originator = self.organizer
+
+        self.except_attendees = ()
+        self.only_refresh_attendees = None
+        self.split_details = None
+
+        yield self.doImplicitOrganizer(queued=True)
+
+
+    @inlineCallbacks
     def sendAttendeeReply(self, txn, resource):
 
         self.txn = txn
@@ -520,9 +595,10 @@
 
 
     @inlineCallbacks
-    def doImplicitOrganizer(self):
+    def doImplicitOrganizer(self, queued=False):
 
-        self.oldcalendar = None
+        if not queued:
+            self.oldcalendar = None
         self.changed_rids = None
         self.cancelledAttendees = ()
         self.reinvites = None
@@ -540,19 +616,21 @@
             self.cancelledAttendees = [(attendee, None) for attendee in self.attendees]
 
             # CANCEL always bumps sequence
-            self.needs_sequence_change = True
+            if not queued:
+                self.needs_sequence_change = True
 
         # Check for a new resource or an update
         elif self.action == "modify":
 
             # Read in existing data
-            self.oldcalendar = (yield self.resource.componentForUser())
+            if not queued:
+                self.oldcalendar = (yield self.resource.componentForUser())
             self.oldAttendeesByInstance = self.oldcalendar.getAttendeesByInstance(True, onlyScheduleAgentServer=True)
             self.oldInstances = set(self.oldcalendar.getComponentInstances())
             self.coerceAttendeesPartstatOnModify()
 
             # Don't allow any SEQUENCE to decrease
-            if self.oldcalendar:
+            if self.oldcalendar and not queued:
                 self.calendar.sequenceInSync(self.oldcalendar)
 
             # Significant change
@@ -599,7 +677,8 @@
 
                 # For now we always bump the sequence number on modifications because we cannot track DTSTAMP on
                 # the Attendee side. But we check the old and the new and only bump if the client did not already do it.
-                self.needs_sequence_change = self.calendar.needsiTIPSequenceChange(self.oldcalendar)
+                if not queued:
+                    self.needs_sequence_change = self.calendar.needsiTIPSequenceChange(self.oldcalendar)
 
         elif self.action == "create":
             if self.split_details is None:
@@ -617,7 +696,12 @@
         if self.needs_sequence_change:
             self.calendar.bumpiTIPInfo(oldcalendar=self.oldcalendar, doSequence=True)
 
-        yield self.scheduleWithAttendees()
+        # If processing a queue item, actually execute the scheduling operations, else queue it.
+        # Note a split is always queued, so we do not need to re-queue
+        if queued or self.split_details is not None:
+            yield self.scheduleWithAttendees()
+        else:
+            yield self.queuedScheduleWithAttendees()
 
         # Always clear SCHEDULE-FORCE-SEND from all attendees after scheduling
         for attendee in self.calendar.getAllAttendeeProperties():
@@ -915,6 +999,133 @@
 
 
     @inlineCallbacks
+    def queuedScheduleWithAttendees(self):
+
+        # First make sure we are allowed to schedule
+        self.testSchedulingAllowed()
+
+        yield ScheduleOrganizerWork.schedule(
+            self.txn,
+            self.oldcalendar.resourceUID() if self.oldcalendar else self.calendar.resourceUID(),
+            self.action,
+            self.calendar_home,
+            self.resource,
+            self.oldcalendar,
+            self.organizerPrincipal.canonicalCalendarUserAddress(),
+            self.do_smart_merge,
+        )
+
+        # First process cancelled attendees
+        total = (yield self.processQueuedCancels())
+
+        # Process regular requests next
+        if self.action in ("create", "modify",):
+            total += (yield self.processQueuedRequests())
+
+        self.logItems["itip.requests"] = total
+
+
+    @inlineCallbacks
+    def processQueuedCancels(self):
+        """
+        Set each ATTENDEE who would be scheduled to status 1.2.
+        """
+
+        # Do one per attendee
+        aggregated = {}
+        for attendee, rid in self.cancelledAttendees:
+            aggregated.setdefault(attendee, []).append(rid)
+
+        count = 0
+        for attendee, rids in aggregated.iteritems():
+
+            # Don't send message back to the ORGANIZER
+            if attendee in self.organizerPrincipal.calendarUserAddresses:
+                continue
+
+            # Handle split by not scheduling local attendees
+            if self.split_details is not None:
+                attendeePrincipal = self.calendar_home.directoryService().recordWithCalendarUserAddress(attendee)
+                attendeeAddress = (yield addressmapping.mapper.getCalendarUser(attendee, attendeePrincipal))
+                if type(attendeeAddress) is LocalCalendarUser:
+                    continue
+
+            # Test whether an iTIP CANCEL message for this attendee would be generated
+            if None in rids:
+                # One big CANCEL will do
+                itipmsg = iTipGenerator.generateCancel(self.oldcalendar, (attendee,), None, self.action == "remove", test_only=True)
+            else:
+                # Multiple CANCELs
+                itipmsg = iTipGenerator.generateCancel(self.oldcalendar, (attendee,), rids, test_only=True)
+
+            # Send scheduling message
+            if itipmsg:
+
+                # Always make it look like scheduling succeeded when queuing
+                self.calendar.setParameterToValueForPropertyWithValue(
+                    "SCHEDULE-STATUS",
+                    iTIPRequestStatus.MESSAGE_DELIVERED_CODE,
+                    "ATTENDEE",
+                    attendee,
+                )
+
+                count += 1
+
+        returnValue(count)
+
+
+    @inlineCallbacks
+    def processQueuedRequests(self):
+        """
+        Set each ATTENDEE who would be scheduled to status 1.2.
+        """
+
+        # Do one per attendee
+        count = 0
+        for attendee in self.attendees:
+
+            # Don't send message back to the ORGANIZER
+            if attendee in self.organizerPrincipal.calendarUserAddresses:
+                continue
+
+            # Don't send message to specified attendees
+            if attendee in self.except_attendees:
+                continue
+
+            # Only send to specified attendees
+            if self.only_refresh_attendees is not None and attendee not in self.only_refresh_attendees:
+                continue
+
+            # If SCHEDULE-FORCE-SEND only change, only send message to those Attendees
+            if self.reinvites and attendee not in self.reinvites:
+                continue
+
+            # Handle split by not scheduling local attendees
+            if self.split_details is not None:
+                attendeePrincipal = self.calendar_home.directoryService().recordWithCalendarUserAddress(attendee)
+                attendeeAddress = (yield addressmapping.mapper.getCalendarUser(attendee, attendeePrincipal))
+                if type(attendeeAddress) is LocalCalendarUser:
+                    continue
+
+            itipmsg = iTipGenerator.generateAttendeeRequest(self.calendar, (attendee,), self.changed_rids, test_only=True)
+
+            # Send scheduling message
+            if itipmsg is not None:
+
+                # Always make it look like scheduling succeeded when queuing
+                self.calendar.setParameterToValueForPropertyWithValue(
+                    "SCHEDULE-STATUS",
+                    iTIPRequestStatus.MESSAGE_DELIVERED_CODE,
+                    "ATTENDEE",
+                    attendee,
+                )
+
+                count += 1
+
+        returnValue(count)
+
+
+    @inlineCallbacks
     def scheduleWithAttendees(self):
 
         # First make sure we are allowed to schedule

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/itip.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/itip.py	2013-10-14 20:34:03 UTC (rev 11813)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/itip.py	2013-10-14 21:08:10 UTC (rev 11814)
@@ -729,7 +729,7 @@
     """
 
     @staticmethod
-    def generateCancel(original, attendees, instances=None, full_cancel=False):
+    def generateCancel(original, attendees, instances=None, full_cancel=False, test_only=False):
         """
         This assumes that SEQUENCE is not already at its new value in the original calendar data. This
         is because the component passed in is the one that originally contained the attendee that is
@@ -748,9 +748,6 @@
         added = False
         for instance_rid in instances:
 
-            # Create a new component matching the type of the original
-            comp = Component(original.mainType())
-
             # Use the master component when the instance is None
             if not instance_rid:
                 instance = original.masterComponent()
@@ -765,6 +762,14 @@
                 if instance is None:
                     continue
 
+            # If testing, skip the rest
+            if test_only:
+                added = True
+                continue
+
+            # Create a new component matching the type of the original
+            comp = Component(original.mainType())
+
             # Add some required properties extracted from the original
             comp.addProperty(Property("DTSTAMP", instance.propertyValue("DTSTAMP")))
             comp.addProperty(Property("UID", instance.propertyValue("UID")))
@@ -802,6 +807,11 @@
             itip.addComponent(comp)
             added = True
 
+        # When testing only need to return whether an itip would have been created or not
+        if test_only:
+            return added
+
+        # Handle actual iTIP message
         if added:
             # Now include any referenced tzids
             for comp in original.subcomponents():
@@ -819,7 +829,7 @@
 
 
     @staticmethod
-    def generateAttendeeRequest(original, attendees, filter_rids):
+    def generateAttendeeRequest(original, attendees, filter_rids, test_only=False):
         """
         This assumes that SEQUENCE is already at its new value in the original calendar data.
         """
@@ -835,7 +845,8 @@
         # Now filter out components except the ones specified
         if itip.filterComponents(filter_rids):
             # Strip out unwanted bits
-            iTipGenerator.prepareSchedulingMessage(itip)
+            if not test_only:
+                iTipGenerator.prepareSchedulingMessage(itip)
             return itip
 
         else:

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py	2013-10-14 20:34:03 UTC (rev 11813)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/work.py	2013-10-14 21:08:10 UTC (rev 11814)
@@ -28,12 +28,14 @@
 
 from txdav.caldav.datastore.scheduling.itip import iTipGenerator, iTIPRequestStatus
 from txdav.caldav.icalendarstore import ComponentUpdateState
-from txdav.common.datastore.sql_tables import schema
+from txdav.common.datastore.sql_tables import schema, \
+    scheduleActionToSQL, scheduleActionFromSQL
 
 import datetime
 import hashlib
 
 __all__ = [
+    "ScheduleOrganizerWork",
     "ScheduleReplyWork",
     "ScheduleReplyCancelWork",
     "ScheduleRefreshWork",
@@ -51,7 +53,125 @@
     group = property(lambda self: "ScheduleWork:%s" % (self.icalendarUid,))
 
 
+    @inlineCallbacks
+    def handleSchedulingResponse(self, response, calendar, resource, is_organizer):
+        """
+        Update a user's calendar object resource based on the results of a queued scheduling
+        message response. Note we only need to update in the case where there is an error response
+        as we will already have updated the calendar object resource to make it look like scheduling
+        worked prior to the work queue item being enqueued.
 
+        @param response: the scheduling response object
+        @type response: L{caldavxml.ScheduleResponse}
+        @param calendar: original calendar component
+        @type calendar: L{Component}
+        @param resource: calendar object resource to update
+        @type resource: L{CalendarObject}
+        @param is_organizer: whether or not iTIP message was sent by the organizer
+        @type is_organizer: C{bool}
+        """
+
+        # Map each recipient in the response to a status code
+        changed = False
+        for item in response.responses:
+            assert isinstance(item, caldavxml.Response), "Wrong element in response"
+            recipient = str(item.children[0].children[0])
+            status = str(item.children[1])
+            statusCode = status.split(";")[0]
+
+            # Now apply to each ATTENDEE/ORGANIZER in the original data only if not 1.2
+            if statusCode != iTIPRequestStatus.MESSAGE_DELIVERED_CODE:
+                calendar.setParameterToValueForPropertyWithValue(
+                    "SCHEDULE-STATUS",
+                    statusCode,
+                    "ATTENDEE" if is_organizer else "ORGANIZER",
+                    recipient,
+                )
+                changed = True
+
+        if changed:
+            yield resource._setComponentInternal(calendar, internal_state=ComponentUpdateState.ATTENDEE_ITIP_UPDATE)
+
+
+
+class ScheduleOrganizerWork(WorkItem, fromTable(schema.SCHEDULE_ORGANIZER_WORK), ScheduleWorkMixin):
+    """
+    The associated work item table is SCHEDULE_ORGANIZER_WORK.
+
+    This work item is used to send a iTIP request and cancel messages when an organizer changes
+    their calendar object resource.
+    """
+
+    @classmethod
+    @inlineCallbacks
+    def schedule(cls, txn, uid, action, home, resource, calendar, organizer, smart_merge):
+        """
+        The actual arguments depend on the action:
+
+        1) If action is "create", resource is None, calendar is None
+        2) If action is "modify", resource is existing resource, calendar is the old calendar data
+        3) If action is "remove", resource is the existing resource, calendar is the old calendar data
+
+        Note that for (1), when the work executes the resource will be in existence so we need to load it.
+        Note that for (3), when work executes the resource will have been removed.
+        """
+        # Always queue up new work - coalescing happens when work is executed
+        notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.QueuedRequestDelaySeconds)
+        proposal = (yield txn.enqueue(
+            cls,
+            notBefore=notBefore,
+            icalendarUid=uid,
+            scheduleAction=scheduleActionToSQL[action],
+            homeResourceID=home.id(),
+            resourceID=resource.id() if resource else None,
+            icalendarText=calendar.getTextWithTimezones(includeTimezones=not config.EnableTimezonesByReference) if calendar else None,
+            smartMerge=smart_merge
+        ))
+        yield proposal.whenProposed()
+        log.debug("ScheduleOrganizerWork - enqueued for ID: {id}, UID: {uid}, organizer: {org}", id=proposal.workItem.workID, uid=uid, org=organizer)
+
+
+    @classmethod
+    @inlineCallbacks
+    def hasWork(cls, txn):
+        srw = schema.SCHEDULE_ORGANIZER_WORK
+        rows = (yield Select(
+            (srw.WORK_ID,),
+            From=srw,
+        ).on(txn))
+        returnValue(len(rows) > 0)
+
+
+    @inlineCallbacks
+    def doWork(self):
+
+        try:
+            home = (yield self.transaction.calendarHomeWithResourceID(self.homeResourceID))
+            resource = (yield home.objectResourceWithID(self.resourceID))
+            organizerPrincipal = home.directoryService().recordWithUID(home.uid())
+            organizer = organizerPrincipal.canonicalCalendarUserAddress()
+            calendar = Component.fromString(self.icalendarText) if self.icalendarText else None
+
+            log.debug("ScheduleOrganizerWork - running for ID: {id}, UID: {uid}, organizer: {org}", id=self.workID, uid=self.icalendarUid, org=organizer)
+
+            # We need to get the UID lock for implicit processing.
+            yield NamedLock.acquire(self.transaction, "ImplicitUIDLock:%s" % (hashlib.md5(self.icalendarUid).hexdigest(),))
+
+            from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler
+            scheduler = ImplicitScheduler()
+            yield scheduler.queuedOrganizerProcessing(self.transaction, scheduleActionFromSQL[self.scheduleAction], home, resource, self.icalendarUid, calendar, self.smartMerge)
+
+        except Exception, e:
+            log.debug("ScheduleOrganizerWork - exception ID: {id}, UID: '{uid}', {err}", id=self.workID, uid=self.icalendarUid, err=str(e))
+            raise
+        except:
+            log.debug("ScheduleOrganizerWork - bare exception ID: {id}, UID: '{uid}'", id=self.workID, uid=self.icalendarUid)
+            raise
+
+        log.debug("ScheduleOrganizerWork - done for ID: {id}, UID: {uid}, organizer: {org}", id=self.workID, uid=self.icalendarUid, org=organizer)
+
+
+
 class ScheduleReplyWorkMixin(ScheduleWorkMixin):
 
 
@@ -145,48 +265,10 @@
             log.debug("ScheduleReplyWork - bare exception ID: {id}, UID: '{uid}'", id=self.workID, uid=calendar.resourceUID())
             raise
 
+        log.debug("ScheduleReplyWork - done for ID: {id}, UID: {uid}, attendee: {att}", id=self.workID, uid=calendar.resourceUID(), att=attendee)
 
-    @inlineCallbacks
-    def handleSchedulingResponse(self, response, calendar, resource, is_organizer):
-        """
-        Update a user's calendar object resource based on the results of a queued scheduling
-        message response. Note we only need to update in the case where there is an error response
-        as we will already have updated the calendar object resource to make it look like scheduling
-        worked prior to the work queue item being enqueued.
 
-        @param response: the scheduling response object
-        @type response: L{caldavxml.ScheduleResponse}
-        @param calendar: original calendar component
-        @type calendar: L{Component}
-        @param resource: calendar object resource to update
-        @type resource: L{CalendarObject}
-        @param is_organizer: whether or not iTIP message was sent by the organizer
-        @type is_organizer: C{bool}
-        """
 
-        # Map each recipient in the response to a status code
-        changed = False
-        for item in response.responses:
-            assert isinstance(item, caldavxml.Response), "Wrong element in response"
-            recipient = str(item.children[0].children[0])
-            status = str(item.children[1])
-            statusCode = status.split(";")[0]
-
-            # Now apply to each ATTENDEE/ORGANIZER in the original data only if not 1.2
-            if statusCode != iTIPRequestStatus.MESSAGE_DELIVERED_CODE:
-                calendar.setParameterToValueForPropertyWithValue(
-                    "SCHEDULE-STATUS",
-                    statusCode,
-                    "ATTENDEE" if is_organizer else "ORGANIZER",
-                    recipient,
-                )
-                changed = True
-
-        if changed:
-            yield resource._setComponentInternal(calendar, internal_state=ComponentUpdateState.ATTENDEE_ITIP_UPDATE)
-
-
-
 class ScheduleReplyCancelWork(WorkItem, fromTable(schema.SCHEDULE_REPLY_CANCEL_WORK), ScheduleReplyWorkMixin):
     """
     The associated work item table is SCHEDULE_REPLY_CANCEL_WORK.
@@ -243,8 +325,10 @@
             log.debug("ScheduleReplyCancelWork - bare exception ID: {id}, UID: '{uid}'", id=self.workID, uid=calendar.resourceUID())
             raise
 
+        log.debug("ScheduleReplyCancelWork - done for ID: {id}, UID: {uid}, attendee: {att}", id=self.workID, uid=calendar.resourceUID(), att=attendee)
 
 
+
 class ScheduleRefreshWork(WorkItem, fromTable(schema.SCHEDULE_REFRESH_WORK), ScheduleWorkMixin):
     """
     The associated work item table is SCHEDULE_REFRESH_WORK.
@@ -302,13 +386,15 @@
 
         # Always queue up new work - coalescing happens when work is executed
         notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
-        yield txn.enqueue(
+        proposal = (yield txn.enqueue(
             cls,
             icalendarUid=organizer_resource.uid(),
             homeResourceID=organizer_resource._home.id(),
             resourceID=organizer_resource.id(),
             notBefore=notBefore
-        )
+        ))
+        yield proposal.whenProposed()
+        log.debug("ScheduleRefreshWork - enqueued for ID: {id}, UID: {uid}, attendees: {att}", id=proposal.workItem.workID, uid=organizer_resource.uid(), att=",".join(attendeesToRefresh))
 
 
     @inlineCallbacks
@@ -326,7 +412,7 @@
             log.debug("Schedule refresh for resource-id: {rid} - ignored", rid=self.resourceID)
             returnValue(None)
 
-        log.debug("Schedule refresh for resource-id: {rid}", rid=self.resourceID)
+        log.debug("ScheduleRefreshWork - running for ID: {id}, UID: {uid}", id=self.workID, uid=self.icalendarUid)
 
         # Get the unique list of pending attendees and split into batch to process
         # TODO: do a DELETE ... and rownum <= N returning attendee - but have to fix Oracle to
@@ -365,7 +451,9 @@
         # Do refresh
         yield self._doDelayedRefresh(attendeesToProcess)
 
+        log.debug("ScheduleRefreshWork - done for ID: {id}, UID: {uid}", id=self.workID, uid=self.icalendarUid)
 
+
     @inlineCallbacks
     def _doDelayedRefresh(self, attendeesToProcess):
         """
@@ -430,20 +518,22 @@
     def autoReply(cls, txn, resource, partstat):
         # Always queue up new work - coalescing happens when work is executed
         notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AutoReplyDelaySeconds)
-        yield txn.enqueue(
+        proposal = (yield txn.enqueue(
             cls,
             icalendarUid=resource.uid(),
             homeResourceID=resource._home.id(),
             resourceID=resource.id(),
             partstat=partstat,
             notBefore=notBefore,
-        )
+        ))
+        yield proposal.whenProposed()
+        log.debug("ScheduleAutoReplyWork - enqueued for ID: {id}, UID: {uid}", id=proposal.workItem.workID, uid=resource.uid())
 
 
     @inlineCallbacks
     def doWork(self):
 
-        log.debug("Schedule auto-reply for resource-id: {rid}", rid=self.resourceID)
+        log.debug("ScheduleAutoReplyWork - running for ID: {id}, UID: {uid}", id=self.workID, uid=self.icalendarUid)
 
         # Delete all other work items with the same pushID
         yield Delete(From=self.table,
@@ -453,7 +543,9 @@
         # Do reply
         yield self._sendAttendeeAutoReply()
 
+        log.debug("ScheduleAutoReplyWork - done for ID: {id}, UID: {uid}", id=self.workID, uid=self.icalendarUid)
 
+
     @inlineCallbacks
     def _sendAttendeeAutoReply(self):
         """

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql	2013-10-14 20:34:03 UTC (rev 11813)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_schema/current.sql	2013-10-14 21:08:10 UTC (rev 11814)
@@ -729,6 +729,37 @@
 create index SCHEDULE_AUTO_REPLY_WORK_RESOURCE_ID on
 	SCHEDULE_AUTO_REPLY_WORK(RESOURCE_ID);
 
+-----------------------------
+-- Schedule Organizer Work --
+-----------------------------
+
+create table SCHEDULE_ORGANIZER_WORK (
+  WORK_ID                       integer      primary key default nextval('WORKITEM_SEQ') not null, -- implicit index
+  NOT_BEFORE                    timestamp    default timezone('UTC', CURRENT_TIMESTAMP),
+  ICALENDAR_UID        			varchar(255) not null,
+  SCHEDULE_ACTION				integer		 not null, -- Enum SCHEDULE_ACTION
+  HOME_RESOURCE_ID              integer      not null references CALENDAR_HOME on delete cascade,
+  RESOURCE_ID                   integer,	 -- this references a possibly non-existent CALENDAR_OBJECT
+  ICALENDAR_TEXT				text,
+  SMART_MERGE					boolean
+);
+
+create index SCHEDULE_ORGANIZER_WORK_HOME_RESOURCE_ID on
+	SCHEDULE_ORGANIZER_WORK(HOME_RESOURCE_ID);
+create index SCHEDULE_ORGANIZER_WORK_RESOURCE_ID on
+	SCHEDULE_ORGANIZER_WORK(RESOURCE_ID);
+
+-- Enumeration of schedule actions
+
+create table SCHEDULE_ACTION (
+  ID          integer     primary key,
+  DESCRIPTION varchar(16) not null unique
+);
+
+insert into SCHEDULE_ACTION values (0, 'create');
+insert into SCHEDULE_ACTION values (1, 'modify');
+insert into SCHEDULE_ACTION values (2, 'remove');
+
 -------------------------
 -- Schedule Reply Work --
 -------------------------

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_tables.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_tables.py	2013-10-14 20:34:03 UTC (rev 11813)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/common/datastore/sql_tables.py	2013-10-14 21:08:10 UTC (rev 11814)
@@ -131,6 +131,22 @@
 
 
 
+def _schemaConstantsMaps(nameColumn, valueColumn):
+    """
+    Generate two dicts that map back and forth between SQL enum values and their
+    programmatic values.
+    """
+
+    toSQL = {}
+    fromSQL = {}
+    for row in nameColumn.model.table.schemaRows:
+        toSQL[row[nameColumn.model]] = row[valueColumn.model]
+        fromSQL[row[valueColumn.model]] = row[nameColumn.model]
+
+    return (toSQL, fromSQL)
+
+
+
 # Various constants
 
 _bindStatus = _schemaConstants(
@@ -185,6 +201,12 @@
 _ABO_KIND_RESOURCE = _addressBookObjectKind('resource')
 _ABO_KIND_LOCATION = _addressBookObjectKind('location')
 
+scheduleActionToSQL, scheduleActionFromSQL = _schemaConstantsMaps(
+    schema.SCHEDULE_ACTION.DESCRIPTION,
+    schema.SCHEDULE_ACTION.ID
+)
+
+
 class SchemaBroken(Exception):
     """
     The schema is broken and cannot be translated.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20131014/45ebba6b/attachment-0001.html>


More information about the calendarserver-changes mailing list