[CalendarServer-changes] [11933] CalendarServer/branches/users/sagen/groupcacher
source_changes at macosforge.org
source_changes at macosforge.org
Wed Mar 12 11:24:14 PDT 2014
Revision: 11933
http://trac.calendarserver.org//changeset/11933
Author: gaya at apple.com
Date: 2013-11-11 16:22:37 -0800 (Mon, 11 Nov 2013)
Log Message:
-----------
implement GroupAttendeeReconciliationWork, GroupCacher.scheduleEventReconciliations(); add DirectoryRecord.attendee()
Modified Paths:
--------------
CalendarServer/branches/users/sagen/groupcacher/twext/who/groups.py
CalendarServer/branches/users/sagen/groupcacher/twistedcaldav/directory/directory.py
CalendarServer/branches/users/sagen/groupcacher/txdav/caldav/icalendardirectoryservice.py
Modified: CalendarServer/branches/users/sagen/groupcacher/twext/who/groups.py
===================================================================
--- CalendarServer/branches/users/sagen/groupcacher/twext/who/groups.py 2013-11-10 03:40:56 UTC (rev 11932)
+++ CalendarServer/branches/users/sagen/groupcacher/twext/who/groups.py 2013-11-12 00:22:37 UTC (rev 11933)
@@ -19,15 +19,16 @@
Group membership caching
"""
-import datetime
-import hashlib
from twext.enterprise.dal.record import fromTable
+from twext.enterprise.dal.syntax import Delete, Select
from twext.enterprise.queue import WorkItem, PeerConnectionPool
+from twext.who.delegates import allGroupDelegates
+from twext.who.idirectory import RecordType
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twistedcaldav.ical import ignoredComponents
from txdav.common.datastore.sql_tables import schema
-from twisted.internet.defer import inlineCallbacks, returnValue, succeed
-from twext.enterprise.dal.syntax import Delete
-from twext.who.idirectory import RecordType
-from twext.who.delegates import allGroupDelegates
+import datetime
+import hashlib
from twext.python.log import Logger
log = Logger()
@@ -37,7 +38,7 @@
fromTable(schema.GROUP_CACHER_POLLING_WORK)):
group = "group_cacher_polling"
-
+
@inlineCallbacks
def doWork(self):
@@ -67,6 +68,7 @@
notBefore=notBefore)
+
@inlineCallbacks
def scheduleNextGroupCachingUpdate(store, seconds):
txn = store.newTransaction()
@@ -77,6 +79,7 @@
returnValue(wp)
+
def schedulePolledGroupCachingUpdate(store):
"""
Schedules a group caching update work item in "the past" so PeerConnectionPool's
@@ -86,6 +89,7 @@
return scheduleNextGroupCachingUpdate(store, seconds)
+
class GroupRefreshWork(WorkItem, fromTable(schema.GROUP_REFRESH_WORK)):
group = property(lambda self: self.groupGUID)
@@ -114,10 +118,87 @@
groupGUID=self.groupGUID, notBefore=notBefore)
+
class GroupAttendeeReconciliationWork(WorkItem, fromTable(schema.GROUP_ATTENDEE_RECONCILIATION_WORK)):
-    pass
+    """
+    Work item that reconciles the ATTENDEE properties of one event with the
+    cached membership of one group.
+
+    Uses the C{groupID}, C{eventID} and C{groupGUID} attributes of the work
+    row.
+    """
+
+    # Work-item group key built from the (groupID, eventID) pair.
+    group = property(lambda self: "%s, %s" % (self.groupID, self.eventID))
+
+    @inlineCallbacks
+    def doWork(self):
+        """
+        Bring the event's attendees in line with the group's cached members.
+
+        Members that are not yet attendees are added with a MEMBER parameter
+        naming the group.  Attendees whose MEMBER parameter names this group
+        but that are no longer members lose that value, and are removed when
+        no MEMBER value is left.  Attendees that are members and carry a
+        MEMBER parameter without this group get the group added.  The event
+        is stored again only if something changed.
+        """
+
+        # Delete all other work items for this (group, event) pair
+        yield Delete(
+            From=self.table,
+            Where=(self.table.GROUP_ID == self.groupID).And(
+                self.table.RESOURCE_ID == self.eventID),
+        ).on(self.transaction)
+
+        # get group individual UIDs
+        groupMembership = schema.GROUP_MEMBERSHIP
+        rows = yield Select(
+            [groupMembership.MEMBER_GUID, ],
+            From=groupMembership,
+            Where=groupMembership.GROUP_ID == self.groupID,
+        ).on(self.transaction)
+        individualGUIDs = [row[0] for row in rows]
+
+        # get calendar Object
+        calObject = schema.CALENDAR_OBJECT
+        rows = yield Select(
+            [calObject.CALENDAR_RESOURCE_ID, ],
+            From=calObject,
+            Where=calObject.RESOURCE_ID == self.eventID,
+        ).on(self.transaction)
+        if not rows:
+            # No calendar object row for this event: nothing to reconcile.
+            return
+
+        # Use the result of the query just issued (not the leftover loop
+        # variable from the membership comprehension above).
+        calendarID = rows[0][0]
+        # NOTE(review): self.Calendar is not defined in this class as shown,
+        # and the first column of the query result is used as a home object
+        # below - confirm both against the datastore API.
+        calendarHome = (yield self.Calendar._ownerHomeWithResourceID.on(
+            self.transaction, resourceID=calendarID)
+        )[0][0]
+
+        calendar = yield calendarHome.childWithID(calendarID)
+        calendarObject = yield calendar.objectResourceWithID(self.eventID)
+        changed = False
+
+        individualUUIDs = set(["urn:uuid:" + individualGUID for individualGUID in individualGUIDs])
+        # groupGUID is a plain attribute of the work row, not a method.
+        groupUUID = "urn:uuid:" + self.groupGUID
+        vcalendar = yield calendarObject.component()
+        for component in vcalendar.subcomponents():
+            if component.name() in ignoredComponents:
+                continue
+
+            oldAttendeeProps = component.getAttendees()
+            oldAttendeeUUIDs = set([attendeeProp.value() for attendeeProp in oldAttendeeProps])
+
+            # add new member attendees
+            for individualUUID in individualUUIDs - oldAttendeeUUIDs:
+                individualGUID = individualUUID[len("urn:uuid:"):]
+                # NOTE(review): presumably recordWithUID always finds the
+                # member here - confirm it cannot return None.
+                directoryRecord = self.transaction.directoryService().recordWithUID(individualGUID)
+                newAttendeeProp = directoryRecord.attendee(params={"MEMBER": groupUUID})
+                component.addProperty(newAttendeeProp)
+                changed = True
+
+            # remove attendee or update MEMBER attribute for non-primary attendees in this group
+            for attendeeProp in oldAttendeeProps:
+                memberParam = attendeeProp.getParameter("MEMBER")
+                if memberParam:
+                    if groupUUID in memberParam.getValues():
+                        if attendeeProp.value() not in individualUUIDs:
+                            # No longer a member: drop this group, and the
+                            # attendee itself once no MEMBER value remains.
+                            valueCount = memberParam.removeValue(groupUUID)
+                            if valueCount == 0:
+                                component.removeProperty(attendeeProp)
+                            changed = True
+                    else:
+                        if attendeeProp.value() in individualUUIDs:
+                            memberParam.addValue(groupUUID)
+                            changed = True
+
+        # replace old with new
+        if changed:
+            # TODO: call calendarObject._setComponentInternal( vcalendar, mode ) instead?
+            yield calendarObject.setComponent(vcalendar)
+
+
+
@inlineCallbacks
def _expandedMembers(record, members=None, records=None):
"""
@@ -140,6 +221,7 @@
returnValue(members)
+
class GroupCacher(object):
log = Logger()
@@ -179,11 +261,11 @@
membershipHashContent = hashlib.md5()
members = (yield _expandedMembers(record))
members = list(members)
- members.sort(cmp=lambda x,y: cmp(x.guid, y.guid))
+ members.sort(cmp=lambda x, y: cmp(x.guid, y.guid))
for member in members:
membershipHashContent.update(member.guid)
membershipHash = membershipHashContent.hexdigest()
- groupID, cachedName, cachedMembershipHash = (yield
+ groupID, cachedName, cachedMembershipHash = (yield #@UnusedVariable
txn.groupByGUID(groupGUID))
if cachedMembershipHash != membershipHash:
@@ -200,7 +282,7 @@
newMemberGUIDs.add(member.guid)
yield self.synchronizeMembers(txn, groupID, newMemberGUIDs)
- yield self.scheduleEventReconciliations(txn, groupID)
+ yield self.scheduleEventReconciliations(txn, groupID, groupGUID)
@inlineCallbacks
@@ -232,18 +314,38 @@
returnValue(members)
-
-
-    # @inlineCallbacks
+    @inlineCallbacks
-    def scheduleEventReconciliations(self, txn, groupID):
+    def scheduleEventReconciliations(self, txn, groupID, groupGUID):
         """
         Find all events who have this groupID as an attendee and create
         work items for them.
+
+        The body yields Deferreds, so the method must be decorated with
+        C{inlineCallbacks}; undecorated, calling it would only build a
+        generator and nothing would be enqueued.
+
+        @param txn: the transaction used for the query and for enqueuing
+        @param groupID: GROUP_ID value to look up in GROUP_ATTENDEE
+        @param groupGUID: GUID of the same group, passed on to each work item
+
+        @return: a C{Deferred} firing once all work items are enqueued
         """
-        return succeed(None)
+        groupAttendee = schema.GROUP_ATTENDEE
+        rows = yield Select(
+            [groupAttendee.RESOURCE_ID, ],
+            From=groupAttendee,
+            Where=groupAttendee.GROUP_ID == groupID,
+        ).on(txn)
+        eventIDs = [row[0] for row in rows]
+
+        for eventID in eventIDs:
+            # Each reconciliation is scheduled 10 seconds from now.
+            notBefore = (datetime.datetime.utcnow() +
+                datetime.timedelta(seconds=10))
+            log.debug("scheduling group reconciliation for ({eventID}, {groupID}, {groupGUID}): {when}",
+                eventID=eventID,
+                groupID=groupID,
+                groupGUID=groupGUID,
+                when=notBefore)
+
+            yield txn.enqueue(GroupAttendeeReconciliationWork,
+                eventID=eventID,
+                groupID=groupID,
+                groupGUID=groupGUID,
+                notBefore=notBefore
+            )
+
+
@inlineCallbacks
def groupsToRefresh(self, txn):
delegatedGUIDs = set((yield allGroupDelegates(txn)))
Modified: CalendarServer/branches/users/sagen/groupcacher/twistedcaldav/directory/directory.py
===================================================================
--- CalendarServer/branches/users/sagen/groupcacher/twistedcaldav/directory/directory.py 2013-11-10 03:40:56 UTC (rev 11932)
+++ CalendarServer/branches/users/sagen/groupcacher/twistedcaldav/directory/directory.py 2013-11-12 00:22:37 UTC (rev 11933)
@@ -43,6 +43,7 @@
from twistedcaldav.config import config
from twistedcaldav.directory.idirectory import IDirectoryService, IDirectoryRecord
from twistedcaldav.directory.util import uuidFromName, normalizeUUID
+from twistedcaldav.ical import Property
from twistedcaldav.memcacher import Memcacher
from txdav.caldav.datastore.scheduling.cuaddress import normalizeCUAddr
from txdav.caldav.datastore.scheduling.ischedule.localservers import Servers
@@ -85,8 +86,8 @@
searchContext_location = "location"
searchContext_resource = "resource"
- searchContext_user = "user"
- searchContext_group = "group"
+ searchContext_user = "user"
+ searchContext_group = "group"
searchContext_attendee = "attendee"
aggregateService = None
@@ -240,7 +241,7 @@
return record if record and record.enabledForCalendaring else None
- def recordWithCachedGroupsAlias(self, recordType, alias):
+ def recordWithCachedGroupsAlias(self, recordType, alias): #@UnusedVariable
"""
@param recordType: the type of the record to look up.
@param alias: the cached-groups alias of the record to look up.
@@ -441,7 +442,7 @@
return succeed(yieldMatches(recordType))
- def getGroups(self, guids):
+ def getGroups(self, guids): #@UnusedVariable
"""
This implementation returns all groups, not just the ones specified
by guids
@@ -624,6 +625,7 @@
self.expireSeconds = expireSeconds
self.lockSeconds = lockSeconds
+
def setGroupsFor(self, guid, memberships):
self.log.debug("set groups-for %s : %s" % (guid, memberships))
return self.set("groups-for:%s" %
@@ -671,7 +673,6 @@
return self.add("group-cacher-lock", "1", expireTime=self.lockSeconds)
-
def extendLock(self):
"""
Update the expiration time of the memcached lock
@@ -690,6 +691,7 @@
return self.delete("group-cacher-lock")
+
class GroupMembershipCacheUpdater(object):
"""
Responsible for updating memcached with group memberships. This will run
@@ -1034,6 +1036,7 @@
returnValue((fast, len(members), len(changedMembers)))
+
def diffAssignments(old, new):
"""
Compare two proxy assignment lists and return their differences in the form of
@@ -1087,7 +1090,7 @@
self, service, recordType, guid=None,
shortNames=(), authIDs=set(), fullName=None,
firstName=None, lastName=None, emailAddresses=set(),
- calendarUserAddresses=set(),
+ calendarUserAddresses=set(), #@UnusedVariable
autoSchedule=False, autoScheduleMode=None,
autoAcceptGroup="",
enabledForCalendaring=None,
@@ -1325,7 +1328,7 @@
return set()
- def verifyCredentials(self, credentials):
+ def verifyCredentials(self, credentials): #@UnusedVariable
return False
@@ -1513,7 +1516,35 @@
return self.service.isProxyFor(self, other)
+    def attendee(self, params=None):
+        """
+        Returns a pycalendar ATTENDEE property for this record.
+
+        @param params: parameters for the returned property, for example
+            C{{"MEMBER": groupUUID}}.  PARTSTAT, CN, EMAIL and CUTYPE are
+            filled in from this record when the caller did not supply them.
+            The caller's dictionary is not modified.
+        @type params: C{dict} or C{None}
+
+        @return: the attendee property
+        @rtype: C{Property}
+        """
+        # Work on a copy so the defaults added below do not leak back into
+        # the dictionary the caller passed in.
+        params = {} if params is None else dict(params)
+
+        if "PARTSTAT" not in params:
+            params["PARTSTAT"] = "NEEDS-ACTION"
+        # NOTE(review): the constructor takes fullName/emailAddresses as
+        # plain values; confirm that fullName(), emailAddresses() and uid()
+        # are really callable on this class.
+        if "CN" not in params:
+            if self.fullName():
+                params["CN"] = self.fullName()
+        if "EMAIL" not in params:
+            if self.emailAddresses():
+                params["EMAIL"] = self.emailAddresses()[0]
+        if "CUTYPE" not in params:
+            cuType = self.getCUType()
+            # Compare by value: identity ("is not") against a string literal
+            # depends on interning and is not a reliable equality test.
+            if cuType != "INDIVIDUAL":
+                params["CUTYPE"] = cuType
+
+        return Property("ATTENDEE", "urn:uuid:" + self.uid(), params=params)
+
+
+
class DirectoryError(RuntimeError):
"""
Generic directory error.
Modified: CalendarServer/branches/users/sagen/groupcacher/txdav/caldav/icalendardirectoryservice.py
===================================================================
--- CalendarServer/branches/users/sagen/groupcacher/txdav/caldav/icalendardirectoryservice.py 2013-11-10 03:40:56 UTC (rev 11932)
+++ CalendarServer/branches/users/sagen/groupcacher/txdav/caldav/icalendardirectoryservice.py 2013-11-12 00:22:37 UTC (rev 11933)
@@ -133,3 +133,14 @@
@return: C{True} if it is a proxy.
@rtype: C{bool}
"""
+
+    def attendee(params=None): #@NoSelf
+        """
+        Returns a pycalendar ATTENDEE property for this record.
+
+        @param params: parameters for the returned property (for example a
+            MEMBER entry naming the group the attendee was expanded from)
+        @type params: C{dict}
+
+        @return: the attendee property
+        @rtype: C{Property}
+        """
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140312/e326f844/attachment.html>
More information about the calendarserver-changes
mailing list