[CalendarServer-changes] [11794] CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py
source_changes at macosforge.org
source_changes at macosforge.org
Sun Oct 6 18:45:21 PDT 2013
Revision: 11794
http://trac.calendarserver.org//changeset/11794
Author: cdaboo at apple.com
Date: 2013-10-06 18:45:20 -0700 (Sun, 06 Oct 2013)
Log Message:
-----------
Remove old refresh api. Make sure we cope with possible race conditions.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py
Modified: CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py 2013-10-06 16:53:22 UTC (rev 11793)
+++ CalendarServer/branches/users/cdaboo/scheduling-queue-refresh/txdav/caldav/datastore/scheduling/processing.py 2013-10-07 01:45:20 UTC (rev 11794)
@@ -33,15 +33,12 @@
from twistedcaldav.config import config
from twistedcaldav.ical import Property
from twistedcaldav.instance import InvalidOverriddenInstanceError
-from twistedcaldav.memcachelock import MemcacheLock, MemcacheLockTimeoutError
-from twistedcaldav.memcacher import Memcacher
from txdav.caldav.datastore.scheduling.cuaddress import normalizeCUAddr
from txdav.caldav.datastore.scheduling.freebusy import generateFreeBusyInfo
from txdav.caldav.datastore.scheduling.itip import iTipProcessing, iTIPRequestStatus
from txdav.caldav.datastore.scheduling.utils import getCalendarObjectForRecord
-from txdav.caldav.icalendarstore import ComponentUpdateState, \
- ComponentRemoveState
+from txdav.caldav.icalendarstore import ComponentUpdateState, ComponentRemoveState
from txdav.common.datastore.sql_tables import schema
import collections
@@ -277,46 +274,6 @@
# Check for batched refreshes
if config.Scheduling.Options.AttendeeRefreshBatch:
-
-# # Need to lock whilst manipulating the batch list
-# lock = MemcacheLock(
-# "BatchRefreshUIDLock",
-# self.uid,
-# timeout=config.Scheduling.Options.UIDLockTimeoutSeconds,
-# expire_time=config.Scheduling.Options.UIDLockExpirySeconds,
-# )
-# try:
-# yield lock.acquire()
-# except MemcacheLockTimeoutError:
-# # If we could not lock then just fail the refresh - not sure what else to do
-# returnValue(None)
-#
-# try:
-# # Get all attendees to refresh
-# allAttendees = sorted(list(self.recipient_calendar.getAllUniqueAttendees()))
-# allAttendees = filter(lambda x: x not in exclude_attendees, allAttendees)
-#
-# if allAttendees:
-# # See if there is already a pending refresh and merge current attendees into that list,
-# # otherwise just mark all attendees as pending
-# cache = Memcacher("BatchRefreshAttendees", pickle=True)
-# pendingAttendees = yield cache.get(self.uid)
-# firstTime = False
-# if pendingAttendees:
-# for attendee in allAttendees:
-# if attendee not in pendingAttendees:
-# pendingAttendees.append(attendee)
-# else:
-# firstTime = True
-# pendingAttendees = allAttendees
-# yield cache.set(self.uid, pendingAttendees)
-#
-# # Now start the first batch off
-# if firstTime:
-# self._enqueueBatchRefresh()
-# finally:
-# yield lock.clean()
-
yield self.ScheduleRefreshWork.refreshAttendees(
self.txn,
self.recipient_calendar_resource,
@@ -352,100 +309,6 @@
@inlineCallbacks
- def _doDelayedRefresh(self, attendeesToProcess):
- """
- Do an attendee refresh that has been delayed until after processing of the request that called it. That
- requires that we create a new transaction to work with.
-
- @param attendeesToProcess: list of attendees to refresh.
- @type attendeesToProcess: C{list}
- """
-
- # The original transaction is still around but likely committed at this point, so we need a brand new
- # transaction to do this work.
- txn = yield self.txn.store().newTransaction("Delayed attendee refresh for UID: %s" % (self.uid,))
-
- try:
- # We need to get the UID lock for implicit processing whilst we send the auto-reply
- # as the Organizer processing will attempt to write out data to other attendees to
- # refresh them. To prevent a race we need a lock.
- yield NamedLock.acquire(txn, "ImplicitUIDLock:%s" % (hashlib.md5(self.uid).hexdigest(),))
-
- organizer_home = (yield txn.calendarHomeWithUID(self.organizer_uid))
- organizer_resource = (yield organizer_home.objectResourceWithID(self.organizer_calendar_resource_id))
- if organizer_resource is not None:
- yield self._doRefresh(organizer_resource, only_attendees=attendeesToProcess)
- else:
- log.debug("ImplicitProcessing - skipping refresh of missing UID: '%s'" % (self.uid,))
- except Exception, e:
- log.debug("ImplicitProcessing - refresh exception UID: '%s', %s" % (self.uid, str(e)))
- yield txn.abort()
- except:
- log.debug("ImplicitProcessing - refresh bare exception UID: '%s'" % (self.uid,))
- yield txn.abort()
- else:
- yield txn.commit()
-
-
- def _enqueueBatchRefresh(self):
- """
- Mostly here to help unit test by being able to stub this out.
- """
- reactor.callLater(config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds, self._doBatchRefresh)
-
-
- @inlineCallbacks
- def _doBatchRefresh(self):
- """
- Do refresh of attendees in batches until the batch list is empty.
- """
-
- # Need to lock whilst manipulating the batch list
- log.debug("ImplicitProcessing - batch refresh for UID: '%s'" % (self.uid,))
- lock = MemcacheLock(
- "BatchRefreshUIDLock",
- self.uid,
- timeout=config.Scheduling.Options.UIDLockTimeoutSeconds,
- expire_time=config.Scheduling.Options.UIDLockExpirySeconds,
- )
- try:
- yield lock.acquire()
- except MemcacheLockTimeoutError:
- # If we could not lock then just fail the refresh - not sure what else to do
- returnValue(None)
-
- try:
- # Get the batch list
- cache = Memcacher("BatchRefreshAttendees", pickle=True)
- pendingAttendees = yield cache.get(self.uid)
- if pendingAttendees:
-
- # Get the next batch of attendees to process and update the cache value or remove it if
- # no more processing is needed
- attendeesToProcess = pendingAttendees[:config.Scheduling.Options.AttendeeRefreshBatch]
- pendingAttendees = pendingAttendees[config.Scheduling.Options.AttendeeRefreshBatch:]
- if pendingAttendees:
- yield cache.set(self.uid, pendingAttendees)
- else:
- yield cache.delete(self.uid)
-
- # Make sure we release this here to avoid potential deadlock when grabbing the ImplicitUIDLock in the next call
- yield lock.release()
-
- # Now do the batch refresh
- yield self._doDelayedRefresh(attendeesToProcess)
-
- # Queue the next refresh if needed
- if pendingAttendees:
- self._enqueueBatchRefresh()
- else:
- yield cache.delete(self.uid)
- yield lock.release()
- finally:
- yield lock.clean()
-
-
- @inlineCallbacks
def doImplicitAttendee(self):
# Locate the attendee's copy of the event if it exists.
@@ -1142,26 +1005,29 @@
).on(self.transaction))
pendingAttendees = [row[0] for row in pendingAttendees]
+ # Nothing left so done
+ if len(pendingAttendees) == 0:
+ returnValue(None)
+
attendeesToProcess = pendingAttendees[:config.Scheduling.Options.AttendeeRefreshBatch]
pendingAttendees = pendingAttendees[config.Scheduling.Options.AttendeeRefreshBatch:]
- if pendingAttendees:
- yield Delete(
- From=sra,
- Where=(sra.RESOURCE_ID == self.resourceID).And(sra.ATTENDEE.In(Parameter("attendeesToProcess", len(attendeesToProcess))))
- ).on(self.transaction, attendeesToProcess=attendeesToProcess)
- else:
- log.debug("Schedule refresh for resource-id: {rid} missing pending attendees", rid=self.resourceID)
- # If some remain to process, reschedule work item
- if pendingAttendees:
- notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
- yield self.transaction.enqueue(
- self.__class__,
- homeResourceID=self.homeResourceID,
- resourceID=self.resourceID,
- notBefore=notBefore
- )
+ yield Delete(
+ From=sra,
+ Where=(sra.RESOURCE_ID == self.resourceID).And(sra.ATTENDEE.In(Parameter("attendeesToProcess", len(attendeesToProcess))))
+ ).on(self.transaction, attendeesToProcess=attendeesToProcess)
+ # Always reschedule work item, even if nothing appears to be left. This should take care of
+ # any race condition where another transaction adding attendees to refresh thinks there is still
+ # a pending work item and does not reschedule one itself.
+ notBefore = datetime.datetime.utcnow() + datetime.timedelta(seconds=config.Scheduling.Options.AttendeeRefreshBatchDelaySeconds)
+ yield self.transaction.enqueue(
+ self.__class__,
+ homeResourceID=self.homeResourceID,
+ resourceID=self.resourceID,
+ notBefore=notBefore
+ )
+
# Do refresh
yield self._doDelayedRefresh(attendeesToProcess)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20131006/60c518b4/attachment-0001.html>
More information about the calendarserver-changes mailing list