[CalendarServer-changes] [11794] CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py

source_changes at macosforge.org
Sun Oct 6 18:45:21 PDT 2013


Revision: 11794
          http://trac.calendarserver.org//changeset/11794
Author:   cdaboo at apple.com
Date:     2013-10-06 18:45:20 -0700 (Sun, 06 Oct 2013)
Log Message:
-----------
Remove the old refresh API. Make sure we cope with possible race conditions.

Modified Paths:
--------------
    CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py

Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py	2013-10-06 16:53:22 UTC (rev 11793)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py	2013-10-07 01:45:20 UTC (rev 11794)
@@ -33,15 +33,12 @@
 from twistedcaldav.config import config
 from twistedcaldav.ical import Property
 from twistedcaldav.instance import InvalidOverriddenInstanceError
-from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
-from twistedcaldav.memcacher import Memcacher
 
 from txdav.caldav.datastore.scheduling.cuaddress import normalizeCUAddr
 from txdav.caldav.datastore.scheduling.freebusy import generateFreeBusyInfo
 from txdav.caldav.datastore.scheduling.itip import iTipProcessing, iTIPRequestStatus
 from txdav.caldav.datastore.scheduling.utils import getCalendarObjectForRecord
-from txdav.caldav.icalendarstore import ComponentUpdateState, \
-    ComponentRemoveState
+from txdav.caldav.icalendarstore import ComponentUpdateState, ComponentRemoveState
 from txdav.common.datastore.sql_tables import schema
 
 import collections
@@ -277,46 +274,6 @@
 
         # Check for batched refreshes
         if config.Scheduling.Options.AttendeeRefreshBatch:
-
-#            # Need to lock whilst manipulating the batch list
-#            lock = MemcacheLock(
-#                "BatchRefreshUIDLock",
-#                self.uid,
-#                timeout=config.Scheduling.Options.UIDLockTimeoutSeconds,
-#                expire_time=config.Scheduling.Options.UIDLockExpirySeconds,
-#            )
-#            try:
-#                yield lock.acquire()
-#            except MemcacheLockTimeoutError:
-#                # If we could not lock then just fail the refresh - not sure what else to do
-#                returnValue(None)
-#
-#            try:
-#                # Get all attendees to refresh
-#                allAttendees = sorted(list(self.recipient_calendar.getAllUniqueAttendees()))
-#                allAttendees = filter(lambda x: x not in exclude_attendees, allAttendees)
-#
-#                if allAttendees:
-#                    # See if there is already a pending refresh and merge current attendees into that list,
-#                    # otherwise just mark all attendees as pending
-#                    cache = Memcacher("BatchRefreshAttendees", pickle=True)
-#                    pendingAttendees = yield cache.get(self.uid)
-#                    firstTime = False
-#                    if pendingAttendees:
-#                        for attendee in allAttendees:
-#                            if attendee not in pendingAttendees:
-#                                pendingAttendees.append(attendee)
-#                    else:
-#                        firstTime = True
-#                        pendingAttendees = allAttendees
-#                    yield cache.set(self.uid, pendingAttendees)
-#
-#                    # Now start the first batch off
-#                    if firstTime:
-#                        self._enqueueBatchRefresh()
-#            finally:
-#                yield lock.clean()
-
             yield self.ScheduleRefreshWork.refreshAttendees(
                 self.txn,
                 self.recipient_calendar_resource,
@@ -352,100 +309,6 @@
 
 
     @inlineCallbacks
-    def _doDelayedRefresh(self, attendeesToProcess):
-        """
-        Do an attendee refresh that has been delayed until after processing of the request that called it. That
-        requires that we create a new transaction to work with.
-
-        @param attendeesToProcess: list of attendees to refresh.
-        @type attendeesToProcess: C{list}
-        """
-
-        # The original transaction is still around but likely committed at this point, so we need a brand new
-        # transaction to do this work.
-        txn = yield self.txn.store().newTransaction("Delayed attendee refresh for UID: %s" % (self.uid,))
-
-        try:
-            # We need to get the UID lock for implicit processing whilst we send the auto-reply
-            # as the Organizer processing will attempt to write out data to other attendees to
-            # refresh them. To prevent a race we need a lock.
-            yield NamedLock.acquire(txn, "ImplicitUIDLock:%s" % (hashlib.md5(self.uid).hexdigest(),))
-
-            organizer_home = (yield txn.calendarHomeWithUID(self.organizer_uid))
-            organizer_resource = (yield organizer_home.objectResourceWithID(self.organizer_calendar_resource_id))
-            if organizer_resource is not None:
-                yield self._doRefresh(organizer_resource, only_attendees=attendeesToProcess)
-            else:
-                log.debug("ImplicitProcessing - skipping refresh of missing UID: '%s'" % (self.uid,))
-        except Exception, e:
-            log.debug("ImplicitProcessing - refresh exception UID: '%s', %s" % (self.uid, str(e)))
-            yield txn.abort()
-        except:
-            log.debug("ImplicitProcessing - refresh bare exception UID: '%s'" % (self.uid,))
-            yield txn.abort()
-        else:
-            yield txn.commit()
-
-
-    def _enqueueBatchRefresh(self):
-        """
-        Mostly here to help unit test by being able to stub this out.
-        """
-        reactor.callLater(config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds, self._doBatchRefresh)
-
-
-    @inlineCallbacks
-    def _doBatchRefresh(self):
-        """
-        Do refresh of attendees in batches until the batch list is empty.
-        """
-
-        # Need to lock whilst manipulating the batch list
-        log.debug("ImplicitProcessing - batch refresh for UID: '%s'" % (self.uid,))
-        lock = MemcacheLock(
-            "BatchRefreshUIDLock",
-            self.uid,
-            timeout=config.Scheduling.Options.UIDLockTimeoutSeconds,
-            expire_time=config.Scheduling.Options.UIDLockExpirySeconds,
-        )
-        try:
-            yield lock.acquire()
-        except MemcacheLockTimeoutError:
-            # If we could not lock then just fail the refresh - not sure what else to do
-            returnValue(None)
-
-        try:
-            # Get the batch list
-            cache = Memcacher("BatchRefreshAttendees", pickle=True)
-            pendingAttendees = yield cache.get(self.uid)
-            if pendingAttendees:
-
-                # Get the next batch of attendees to process and update the cache value or remove it if
-                # no more processing is needed
-                attendeesToProcess = pendingAttendees[:config.Scheduling.Options.AttendeeRefreshBatch]
-                pendingAttendees = pendingAttendees[config.Scheduling.Options.AttendeeRefreshBatch:]
-                if pendingAttendees:
-                    yield cache.set(self.uid, pendingAttendees)
-                else:
-                    yield cache.delete(self.uid)
-
-                # Make sure we release this here to avoid potential deadlock when grabbing the ImplicitUIDLock in the next call
-                yield lock.release()
-
-                # Now do the batch refresh
-                yield self._doDelayedRefresh(attendeesToProcess)
-
-                # Queue the next refresh if needed
-                if pendingAttendees:
-                    self._enqueueBatchRefresh()
-            else:
-                yield cache.delete(self.uid)
-                yield lock.release()
-        finally:
-            yield lock.clean()
-
-
-    @inlineCallbacks
     def doImplicitAttendee(self):
 
         # Locate the attendee's copy of the event if it exists.
@@ -1142,26 +1005,29 @@
             ).on(self.transaction))
             pendingAttendees = [row[0] for row in pendingAttendees]
 
+            # Nothing left so done
+            if len(pendingAttendees) == 0:
+                returnValue(None)
+
             attendeesToProcess = pendingAttendees[:config.Scheduling.Options.AttendeeRefreshBatch]
             pendingAttendees = pendingAttendees[config.Scheduling.Options.AttendeeRefreshBatch:]
-            if pendingAttendees:
-                yield Delete(
-                    From=sra,
-                    Where=(sra.RESOURCE_ID == self.resourceID).And(sra.ATTENDEE.In(Parameter("attendeesToProcess", len(attendeesToProcess))))
-                ).on(self.transaction, attendeesToProcess=attendeesToProcess)
-            else:
-                log.debug("Schedule refresh for resource-id: {rid} missing pending attendees", rid=self.resourceID)
 
-            # If some remain to process, reschedule work item
-            if pendingAttendees:
-                notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
-                yield self.transaction.enqueue(
-                    self.__class__,
-                    homeResourceID=self.homeResourceID,
-                    resourceID=self.resourceID,
-                    notBefore=notBefore
-                )
+            yield Delete(
+                From=sra,
+                Where=(sra.RESOURCE_ID == self.resourceID).And(sra.ATTENDEE.In(Parameter("attendeesToProcess", len(attendeesToProcess))))
+            ).on(self.transaction, attendeesToProcess=attendeesToProcess)
 
+            # Always reschedule work item, even if nothing appears to be left. This should take care of
+            # any race condition where another transaction adding attendees to refresh thinks there is still
+            # a pending work item and does not reschedule one itself.
+            notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
+            yield self.transaction.enqueue(
+                self.__class__,
+                homeResourceID=self.homeResourceID,
+                resourceID=self.resourceID,
+                notBefore=notBefore
+            )
+
             # Do refresh
             yield self._doDelayedRefresh(attendeesToProcess)
 
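
The second hunk is the race-condition fix called out in the log message: after pulling a batch of attendees out of the pending table, the work item now always re-enqueues itself instead of only doing so when attendees remain. Below is a minimal, self-contained sketch of that pattern in plain Python; it is not the CalendarServer work-queue API, and the names (pending, work_queue, enqueue_refresh_work, run_refresh_work, BATCH_SIZE) are illustrative stand-ins for the pending-attendee rows, the enqueued ScheduleRefreshWork items, and config.Scheduling.Options.AttendeeRefreshBatch.

    # Minimal sketch of the "always reschedule" batched-refresh pattern.
    # Plain-Python stand-ins only; this is not the CalendarServer work-queue API.
    from __future__ import print_function

    import collections

    BATCH_SIZE = 5                    # stand-in for AttendeeRefreshBatch
    pending = []                      # stand-in for the pending-attendee rows
    work_queue = collections.deque()  # stand-in for enqueued ScheduleRefreshWork items


    def enqueue_refresh_work(resource_id):
        # Stand-in for transaction.enqueue(..., notBefore=now + batch delay).
        work_queue.append(resource_id)


    def run_refresh_work(resource_id, do_refresh):
        # One pass of batch refresh, mirroring the structure of the new hunk.
        if not pending:
            # Nothing left so done.
            return

        # Take the next batch and drop it from the pending set (the Delete above).
        batch = pending[:BATCH_SIZE]
        del pending[:BATCH_SIZE]

        # Always reschedule, even when nothing appears to be left: a concurrent
        # transaction may add attendees after this read and skip enqueueing its
        # own work item because it believes one is still outstanding. The extra
        # pass is harmless; it just hits the early return above.
        enqueue_refresh_work(resource_id)

        do_refresh(batch)


    if __name__ == "__main__":
        pending.extend("attendee%02d@example.com" % i for i in range(12))
        enqueue_refresh_work(42)
        while work_queue:
            run_refresh_work(work_queue.popleft(), lambda batch: print("refresh", batch))

The cost of the unconditional reschedule is at most one extra pass per resource that hits the "nothing left so done" early return, which is cheap compared with leaving a concurrently added batch of attendees stranded with no work item to process it.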