[CalendarServer-changes] [14462] CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav
source_changes at macosforge.org
source_changes at macosforge.org
Fri Feb 20 10:41:05 PST 2015
Revision: 14462
http://trac.calendarserver.org//changeset/14462
Author: cdaboo at apple.com
Date: 2015-02-20 10:41:05 -0800 (Fri, 20 Feb 2015)
Log Message:
-----------
Refactor various classes into separate files to cut down the size of the sql.py files.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_directory.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_external.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/test/test_attachments.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/carddav/datastore/sql.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/file.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/test/test_sql.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/upgrade/sql/upgrades/calendar_upgrade_from_2_to_3.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/icommondatastore.py
Added Paths:
-----------
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_attachment.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_notification.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_sharing.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_util.py
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql.py 2015-02-20 18:06:03 UTC (rev 14461)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
+from txdav.caldav.datastore.sql_attachment import Attachment, DropBoxAttachment, \
+ AttachmentLink, ManagedAttachment
"""
@@ -32,13 +34,11 @@
from twext.enterprise.locking import NamedLock
from twext.enterprise.jobqueue import WorkItem, AggregatedWorkItem, \
WORK_PRIORITY_LOW, WORK_WEIGHT_5, WORK_WEIGHT_3
-from twext.enterprise.util import parseSQLTimestamp
from twext.python.clsprop import classproperty
-from twext.python.filepath import CachingFilePath
from twext.python.log import Logger
from twext.who.idirectory import RecordType
from twistedcaldav.ical import Component as VComponent
-from txweb2.http_headers import MimeType, generateContentType
+from txweb2.http_headers import MimeType
from txweb2.stream import readStream
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
@@ -48,7 +48,7 @@
from twistedcaldav import customxml, ical
from twistedcaldav.stdconfig import config
from twistedcaldav.datafilters.peruserdata import PerUserDataFilter
-from twistedcaldav.dateops import normalizeForIndex, datetimeMktime, \
+from twistedcaldav.dateops import normalizeForIndex, \
pyCalendarTodatetime, parseSQLDateToPyCalendar
from twistedcaldav.ical import Component, InvalidICalendarDataError, Property
from twistedcaldav.instance import InvalidOverriddenInstanceError
@@ -65,14 +65,11 @@
from txdav.caldav.datastore.scheduling.implicit import ImplicitScheduler
from txdav.caldav.datastore.scheduling.utils import uidFromCalendarUserAddress
from txdav.caldav.datastore.sql_directory import GroupAttendeeRecord
-from txdav.caldav.datastore.util import AttachmentRetrievalTransport, \
- normalizationLookup
+from txdav.caldav.datastore.util import normalizationLookup
from txdav.caldav.datastore.util import CalendarObjectBase
-from txdav.caldav.datastore.util import StorageTransportBase
from txdav.caldav.datastore.util import dropboxIDFromCalendarObject
from txdav.caldav.icalendarstore import ICalendarHome, ICalendar, ICalendarObject, \
- IAttachment, AttachmentStoreFailed, AttachmentStoreValidManagedID, \
- AttachmentMigrationFailed, AttachmentDropboxNotAllowed, \
+ AttachmentStoreFailed, AttachmentStoreValidManagedID, \
TooManyAttendeesError, InvalidComponentTypeError, InvalidCalendarAccessError, \
ResourceDeletedError, \
AttendeeAllowedError, InvalidPerUserDataMerge, ComponentUpdateState, \
@@ -80,15 +77,15 @@
InvalidDefaultCalendar, \
InvalidAttachmentOperation, DuplicatePrivateCommentsError, \
TimeRangeUpperLimit, TimeRangeLowerLimit, InvalidSplit, \
- AttachmentSizeTooLarge, UnknownTimezone, SetComponentOptions
-from txdav.caldav.icalendarstore import QuotaExceeded
+ UnknownTimezone, SetComponentOptions
from txdav.common.datastore.sql import CommonHome, CommonHomeChild, \
- CommonObjectResource, ECALENDARTYPE, SharingInvitation
+ CommonObjectResource, ECALENDARTYPE
from txdav.common.datastore.sql_tables import _ATTACHMENTS_MODE_NONE, \
_ATTACHMENTS_MODE_READ, _ATTACHMENTS_MODE_WRITE, _BIND_MODE_DIRECT, \
_BIND_MODE_GROUP, _BIND_MODE_GROUP_READ, _BIND_MODE_GROUP_WRITE, \
_BIND_MODE_OWN, _BIND_MODE_READ, _BIND_MODE_WRITE, _BIND_STATUS_ACCEPTED, \
_TRANSP_OPAQUE, _TRANSP_TRANSPARENT, schema
+from txdav.common.datastore.sql_sharing import SharingInvitation
from txdav.common.icommondatastore import IndexedSearchException, \
InternalDataStoreError, HomeChildNameAlreadyExistsError, \
HomeChildNameNotAllowedError, ObjectResourceTooBigError, \
@@ -113,8 +110,6 @@
import collections
import datetime
import itertools
-import os
-import tempfile
import urllib
import uuid
@@ -4915,1157 +4910,6 @@
-class AttachmentStorageTransport(StorageTransportBase):
-
- _TEMPORARY_UPLOADS_DIRECTORY = "Temporary"
-
- def __init__(self, attachment, contentType, dispositionName, creating=False, migrating=False):
- super(AttachmentStorageTransport, self).__init__(
- attachment, contentType, dispositionName)
-
- fileDescriptor, fileName = self._temporaryFile()
- # Wrap the file descriptor in a file object we can write to
- self._file = os.fdopen(fileDescriptor, "w")
- self._path = CachingFilePath(fileName)
- self._hash = hashlib.md5()
- self._creating = creating
- self._migrating = migrating
-
- self._txn.postAbort(self.aborted)
-
-
- def _temporaryFile(self):
- """
- Returns a (file descriptor, absolute path) tuple for a temporary file within
- the Attachments/Temporary directory (creating the Temporary subdirectory
- if it doesn't exist). It is the caller's responsibility to remove the
- file.
- """
- attachmentRoot = self._txn._store.attachmentsPath
- tempUploadsPath = attachmentRoot.child(self._TEMPORARY_UPLOADS_DIRECTORY)
- if not tempUploadsPath.exists():
- tempUploadsPath.createDirectory()
- return tempfile.mkstemp(dir=tempUploadsPath.path)
-
-
- @property
- def _txn(self):
- return self._attachment._txn
-
-
- def aborted(self):
- """
- Transaction aborted - clean up temp files.
- """
- if self._path.exists():
- self._path.remove()
-
-
- def write(self, data):
- if isinstance(data, buffer):
- data = str(data)
- self._file.write(data)
- self._hash.update(data)
-
-
- @inlineCallbacks
- def loseConnection(self):
- """
- Note that when self._migrating is set we only care about the data and don't need to
- do any quota checks/adjustments.
- """
-
- # FIXME: this should be synchronously accessible; IAttachment should
- # have a method for getting its parent just as CalendarObject/Calendar
- # do.
-
- # FIXME: If this method isn't called, the transaction should be
- # prevented from committing successfully. It's not valid to have an
- # attachment that doesn't point to a real file.
-
- home = (yield self._txn.calendarHomeWithResourceID(self._attachment._ownerHomeID))
-
- oldSize = self._attachment.size()
- newSize = self._file.tell()
- self._file.close()
-
- # Check max size for attachment
- if not self._migrating and newSize > config.MaximumAttachmentSize:
- self._path.remove()
- if self._creating:
- yield self._attachment._internalRemove()
- raise AttachmentSizeTooLarge()
-
- # Check overall user quota
- if not self._migrating:
- allowed = home.quotaAllowedBytes()
- if allowed is not None and allowed < ((yield home.quotaUsedBytes())
- + (newSize - oldSize)):
- self._path.remove()
- if self._creating:
- yield self._attachment._internalRemove()
- raise QuotaExceeded()
-
- self._path.moveTo(self._attachment._path)
-
- yield self._attachment.changed(
- self._contentType,
- self._dispositionName,
- self._hash.hexdigest(),
- newSize
- )
-
- if not self._migrating and home:
- # Adjust quota
- yield home.adjustQuotaUsedBytes(self._attachment.size() - oldSize)
-
- # Send change notification to home
- yield home.notifyChanged()
-
-
-
-def sqltime(value):
- return datetimeMktime(parseSQLTimestamp(value))
-
-
-
-class AttachmentLink(object):
- """
- A binding between an L{Attachment} and an L{CalendarObject}.
- """
-
- _attachmentSchema = schema.ATTACHMENT
- _attachmentLinkSchema = schema.ATTACHMENT_CALENDAR_OBJECT
-
- @classmethod
- def makeClass(cls, txn, linkData):
- """
- Given the various database rows, build the actual class.
-
- @param objectData: the standard set of object columns
- @type objectData: C{list}
-
- @return: the constructed child class
- @rtype: L{CommonHomeChild}
- """
-
- child = cls(txn)
- for attr, value in zip(child._rowAttributes(), linkData):
- setattr(child, attr, value)
- return child
-
-
- @classmethod
- def _allColumns(cls):
- """
- Full set of columns in the object table that need to be loaded to
- initialize the object resource state.
- """
- aco = cls._attachmentLinkSchema
- return [
- aco.ATTACHMENT_ID,
- aco.MANAGED_ID,
- aco.CALENDAR_OBJECT_RESOURCE_ID,
- ]
-
-
- @classmethod
- def _rowAttributes(cls):
- """
- Object attributes used to store the column values from L{_allColumns}. This used to create
- a mapping when serializing the object for cross-pod requests.
- """
- return (
- "_attachmentID",
- "_managedID",
- "_calendarObjectID",
- )
-
-
- @classmethod
- @inlineCallbacks
- def linksForHome(cls, home):
- """
- Load all attachment<->calendar object mappings for the specified home collection.
- """
-
- # Load from the main table first
- att = cls._attachmentSchema
- attco = cls._attachmentLinkSchema
- dataRows = yield Select(
- cls._allColumns(),
- From=attco.join(att, on=(attco.ATTACHMENT_ID == att.ATTACHMENT_ID)),
- Where=att.CALENDAR_HOME_RESOURCE_ID == home.id(),
- ).on(home._txn)
-
- # Create the actual objects
- returnValue([cls.makeClass(home._txn, row) for row in dataRows])
-
-
- def __init__(self, txn):
- self._txn = txn
- for attr in self._rowAttributes():
- setattr(self, attr, None)
-
-
- def serialize(self):
- """
- Create a dictionary mapping key attributes so this object can be sent over a cross-pod call
- and reconstituted at the other end. Note that the other end may have a different schema so
- the attributes may not match exactly and will need to be processed accordingly.
- """
- return dict([(attr[1:], getattr(self, attr, None)) for attr in self._rowAttributes()])
-
-
- @classmethod
- def deserialize(cls, txn, mapping):
- """
- Given a mapping generated by L{serialize}, convert the values into an array of database
- like items that conforms to the ordering of L{_allColumns} so it can be fed into L{makeClass}.
- Note that there may be a schema mismatch with the external data, so treat missing items as
- C{None} and ignore extra items.
- """
-
- return cls.makeClass(txn, [mapping.get(row[1:]) for row in cls._rowAttributes()])
-
-
- def insert(self):
- """
- Insert the object.
- """
-
- row = dict([(column, getattr(self, attr)) for column, attr in itertools.izip(self._allColumns(), self._rowAttributes())])
- return Insert(row).on(self._txn)
-
-
-
-class Attachment(object):
-
- implements(IAttachment)
-
- _attachmentSchema = schema.ATTACHMENT
- _attachmentLinkSchema = schema.ATTACHMENT_CALENDAR_OBJECT
-
- @classmethod
- def makeClass(cls, txn, attachmentData):
- """
- Given the various database rows, build the actual class.
-
- @param attachmentData: the standard set of attachment columns
- @type attachmentData: C{list}
-
- @return: the constructed child class
- @rtype: L{Attachment}
- """
-
- att = cls._attachmentSchema
- dropbox_id = attachmentData[cls._allColumns().index(att.DROPBOX_ID)]
- c = ManagedAttachment if dropbox_id == "." else DropBoxAttachment
- child = c(
- txn,
- attachmentData[cls._allColumns().index(att.ATTACHMENT_ID)],
- attachmentData[cls._allColumns().index(att.DROPBOX_ID)],
- attachmentData[cls._allColumns().index(att.PATH)],
- )
-
- for attr, value in zip(child._rowAttributes(), attachmentData):
- setattr(child, attr, value)
- child._contentType = MimeType.fromString(child._contentType)
-
- return child
-
-
- @classmethod
- def _allColumns(cls):
- """
- Full set of columns in the object table that need to be loaded to
- initialize the object resource state.
- """
- att = cls._attachmentSchema
- return [
- att.ATTACHMENT_ID,
- att.DROPBOX_ID,
- att.CALENDAR_HOME_RESOURCE_ID,
- att.CONTENT_TYPE,
- att.SIZE,
- att.MD5,
- att.CREATED,
- att.MODIFIED,
- att.PATH,
- ]
-
-
- @classmethod
- def _rowAttributes(cls):
- """
- Object attributes used to store the column values from L{_allColumns}. This used to create
- a mapping when serializing the object for cross-pod requests.
- """
- return (
- "_attachmentID",
- "_dropboxID",
- "_ownerHomeID",
- "_contentType",
- "_size",
- "_md5",
- "_created",
- "_modified",
- "_name",
- )
-
-
- @classmethod
- @inlineCallbacks
- def loadAllAttachments(cls, home):
- """
- Load all attachments assigned to the specified home collection. This should only be
- used when sync'ing an entire home's set of attachments.
- """
-
- results = []
-
- # Load from the main table first
- att = cls._attachmentSchema
- dataRows = yield Select(
- cls._allColumns(),
- From=att,
- Where=att.CALENDAR_HOME_RESOURCE_ID == home.id(),
- ).on(home._txn)
-
- # Create the actual objects
- for row in dataRows:
- child = cls.makeClass(home._txn, row)
- results.append(child)
-
- returnValue(results)
-
-
- @classmethod
- @inlineCallbacks
- def loadAttachmentByID(cls, home, id):
- """
- Load one attachments assigned to the specified home collection. This should only be
- used when sync'ing an entire home's set of attachments.
- """
-
- # Load from the main table first
- att = cls._attachmentSchema
- rows = yield Select(
- cls._allColumns(),
- From=att,
- Where=(att.CALENDAR_HOME_RESOURCE_ID == home.id()).And(
- att.ATTACHMENT_ID == id),
- ).on(home._txn)
-
- # Create the actual object
- returnValue(cls.makeClass(home._txn, rows[0]) if len(rows) == 1 else None)
-
-
- def serialize(self):
- """
- Create a dictionary mapping key attributes so this object can be sent over a cross-pod call
- and reconstituted at the other end. Note that the other end may have a different schema so
- the attributes may not match exactly and will need to be processed accordingly.
- """
- result = dict([(attr[1:], getattr(self, attr, None)) for attr in self._rowAttributes()])
- result["contentType"] = generateContentType(result["contentType"])
- return result
-
-
- @classmethod
- def deserialize(cls, txn, mapping):
- """
- Given a mapping generated by L{serialize}, convert the values into an array of database
- like items that conforms to the ordering of L{_allColumns} so it can be fed into L{makeClass}.
- Note that there may be a schema mismatch with the external data, so treat missing items as
- C{None} and ignore extra items.
- """
-
- return cls.makeClass(txn, [mapping.get(row[1:]) for row in cls._rowAttributes()])
-
-
- def __init__(self, txn, a_id, dropboxID, name, ownerHomeID=None, justCreated=False):
- self._txn = txn
- self._attachmentID = a_id
- self._ownerHomeID = ownerHomeID
- self._dropboxID = dropboxID
- self._contentType = None
- self._size = 0
- self._md5 = None
- self._created = None
- self._modified = None
- self._name = name
- self._justCreated = justCreated
-
-
- def __repr__(self):
- return (
- "<{self.__class__.__name__}: {self._attachmentID}>"
- .format(self=self)
- )
-
-
- def _attachmentPathRoot(self):
- return self._txn._store.attachmentsPath
-
-
- @inlineCallbacks
- def initFromStore(self):
- """
- Execute necessary SQL queries to retrieve attributes.
-
- @return: C{True} if this attachment exists, C{False} otherwise.
- """
- att = self._attachmentSchema
- if self._dropboxID and self._dropboxID != ".":
- where = (att.DROPBOX_ID == self._dropboxID).And(
- att.PATH == self._name)
- else:
- where = (att.ATTACHMENT_ID == self._attachmentID)
- rows = (yield Select(
- self._allColumns(),
- From=att,
- Where=where
- ).on(self._txn))
-
- if not rows:
- returnValue(None)
-
- for attr, value in zip(self._rowAttributes(), rows[0]):
- setattr(self, attr, value)
- self._contentType = MimeType.fromString(self._contentType)
-
- returnValue(self)
-
-
- def copyRemote(self, remote):
- """
- Copy properties from a remote (external) attachment that is being migrated.
-
- @param remote: the external attachment
- @type remote: L{Attachment}
- """
- return self.changed(remote.contentType(), remote.name(), remote.md5(), remote.size())
-
-
- def id(self):
- return self._attachmentID
-
-
- def dropboxID(self):
- return self._dropboxID
-
-
- def isManaged(self):
- return self._dropboxID == "."
-
-
- def name(self):
- return self._name
-
-
- def properties(self):
- pass # stub
-
-
- def store(self, contentType, dispositionName=None, migrating=False):
- if not self._name:
- self._name = dispositionName
- return AttachmentStorageTransport(self, contentType, dispositionName, self._justCreated, migrating=migrating)
-
-
- def retrieve(self, protocol):
- return AttachmentRetrievalTransport(self._path).start(protocol)
-
-
- def changed(self, contentType, dispositionName, md5, size):
- raise NotImplementedError
-
- _removeStatement = Delete(
- From=schema.ATTACHMENT,
- Where=(schema.ATTACHMENT.ATTACHMENT_ID == Parameter("attachmentID"))
- )
-
-
- @inlineCallbacks
- def remove(self, adjustQuota=True):
- oldSize = self._size
- self._txn.postCommit(self.removePaths)
- yield self._internalRemove()
-
- # Adjust quota
- if adjustQuota:
- home = (yield self._txn.calendarHomeWithResourceID(self._ownerHomeID))
- if home:
- yield home.adjustQuotaUsedBytes(-oldSize)
-
- # Send change notification to home
- yield home.notifyChanged()
-
-
- def removePaths(self):
- """
- Remove the actual file and up to attachment parent directory if empty.
- """
- self._path.remove()
- self.removeParentPaths()
-
-
- def removeParentPaths(self):
- """
- Remove up to attachment parent directory if empty.
- """
- parent = self._path.parent()
- toppath = self._attachmentPathRoot().path
- while parent.path != toppath:
- if len(parent.listdir()) == 0:
- parent.remove()
- parent = parent.parent()
- else:
- break
-
-
- def _internalRemove(self):
- """
- Just delete the row; don't do any accounting / bookkeeping. (This is
- for attachments that have failed to be created due to errors during
- storage.)
- """
- return self._removeStatement.on(self._txn, attachmentID=self._attachmentID)
-
-
- @classmethod
- @inlineCallbacks
- def removedHome(cls, txn, homeID):
- """
- A calendar home is being removed so all of its attachments must go too. When removing,
- we don't care about quota adjustment as there will be no quota once the home is removed.
-
- TODO: this needs to be transactional wrt the actual file deletes.
- """
- att = cls._attachmentSchema
- attco = cls._attachmentLinkSchema
-
- rows = (yield Select(
- [att.ATTACHMENT_ID, att.DROPBOX_ID, ],
- From=att,
- Where=(
- att.CALENDAR_HOME_RESOURCE_ID == homeID
- ),
- ).on(txn))
-
- for attachmentID, dropboxID in rows:
- if dropboxID != ".":
- attachment = DropBoxAttachment(txn, attachmentID, None, None)
- else:
- attachment = ManagedAttachment(txn, attachmentID, None, None)
- attachment = (yield attachment.initFromStore())
- if attachment._path.exists():
- attachment.removePaths()
-
- yield Delete(
- From=attco,
- Where=(
- attco.ATTACHMENT_ID.In(Select(
- [att.ATTACHMENT_ID, ],
- From=att,
- Where=(
- att.CALENDAR_HOME_RESOURCE_ID == homeID
- ),
- ))
- ),
- ).on(txn)
-
- yield Delete(
- From=att,
- Where=(
- att.CALENDAR_HOME_RESOURCE_ID == homeID
- ),
- ).on(txn)
-
-
- # IDataStoreObject
- def contentType(self):
- return self._contentType
-
-
- def md5(self):
- return self._md5
-
-
- def size(self):
- return self._size
-
-
- def created(self):
- return self._created
-
-
- def modified(self):
- return self._modified
-
-
-
-class DropBoxAttachment(Attachment):
-
- @classmethod
- @inlineCallbacks
- def create(cls, txn, dropboxID, name, ownerHomeID):
- """
- Create a new Attachment object.
-
- @param txn: The transaction to use
- @type txn: L{CommonStoreTransaction}
- @param dropboxID: the identifier for the attachment (dropbox id or managed id)
- @type dropboxID: C{str}
- @param name: the name of the attachment
- @type name: C{str}
- @param ownerHomeID: the resource-id of the home collection of the attachment owner
- @type ownerHomeID: C{int}
- """
-
- # If store has already migrated to managed attachments we will prevent creation of dropbox attachments
- dropbox = (yield txn.store().dropboxAllowed(txn))
- if not dropbox:
- raise AttachmentDropboxNotAllowed
-
- # Now create the DB entry
- att = cls._attachmentSchema
- rows = (yield Insert({
- att.CALENDAR_HOME_RESOURCE_ID : ownerHomeID,
- att.DROPBOX_ID : dropboxID,
- att.CONTENT_TYPE : "",
- att.SIZE : 0,
- att.MD5 : "",
- att.PATH : name,
- }, Return=(att.ATTACHMENT_ID, att.CREATED, att.MODIFIED)).on(txn))
-
- row_iter = iter(rows[0])
- a_id = row_iter.next()
- created = sqltime(row_iter.next())
- modified = sqltime(row_iter.next())
-
- attachment = cls(txn, a_id, dropboxID, name, ownerHomeID, True)
- attachment._created = created
- attachment._modified = modified
-
- # File system paths need to exist
- try:
- attachment._path.parent().makedirs()
- except:
- pass
-
- returnValue(attachment)
-
-
- @classmethod
- @inlineCallbacks
- def load(cls, txn, dropboxID, name):
- attachment = cls(txn, None, dropboxID, name)
- attachment = (yield attachment.initFromStore())
- returnValue(attachment)
-
-
- @property
- def _path(self):
- # Use directory hashing scheme based on MD5 of dropboxID
- hasheduid = hashlib.md5(self._dropboxID).hexdigest()
- attachmentRoot = self._attachmentPathRoot().child(hasheduid[0:2]).child(hasheduid[2:4]).child(hasheduid)
- return attachmentRoot.child(self.name())
-
-
- @classmethod
- @inlineCallbacks
- def resourceRemoved(cls, txn, resourceID, dropboxID):
- """
- Remove all attachments referencing the specified resource.
- """
-
- # See if any other resources still reference this dropbox ID
- co = CalendarObject._objectSchema
- rows = (yield Select(
- [co.RESOURCE_ID, ],
- From=co,
- Where=(co.DROPBOX_ID == dropboxID).And(
- co.RESOURCE_ID != resourceID)
- ).on(txn))
-
- if not rows:
- # Find each attachment with matching dropbox ID
- att = cls._attachmentSchema
- rows = (yield Select(
- [att.PATH],
- From=att,
- Where=(att.DROPBOX_ID == dropboxID)
- ).on(txn))
- for name in rows:
- name = name[0]
- attachment = yield cls.load(txn, dropboxID, name)
- yield attachment.remove()
-
-
- @inlineCallbacks
- def changed(self, contentType, dispositionName, md5, size):
- """
- Dropbox attachments never change their path - ignore dispositionName.
- """
-
- self._contentType = contentType
- self._md5 = md5
- self._size = size
-
- att = self._attachmentSchema
- self._created, self._modified = map(
- sqltime,
- (yield Update(
- {
- att.CONTENT_TYPE : generateContentType(self._contentType),
- att.SIZE : self._size,
- att.MD5 : self._md5,
- att.MODIFIED : utcNowSQL,
- },
- Where=(att.ATTACHMENT_ID == self._attachmentID),
- Return=(att.CREATED, att.MODIFIED)).on(self._txn))[0]
- )
-
-
- @inlineCallbacks
- def convertToManaged(self):
- """
- Convert this dropbox attachment into a managed attachment by updating the
- database and returning a new ManagedAttachment object that does not reference
- any calendar object. Referencing will be added later.
-
- @return: the managed attachment object
- @rtype: L{ManagedAttachment}
- """
-
- # Change the DROPBOX_ID to a single "." to indicate a managed attachment.
- att = self._attachmentSchema
- (yield Update(
- {att.DROPBOX_ID : ".", },
- Where=(att.ATTACHMENT_ID == self._attachmentID),
- ).on(self._txn))
-
- # Create an "orphaned" ManagedAttachment that points to the updated data but without
- # an actual managed-id (which only exists when there is a reference to a calendar object).
- mattach = (yield ManagedAttachment.load(self._txn, None, None, attachmentID=self._attachmentID))
- mattach._managedID = str(uuid.uuid4())
- if mattach is None:
- raise AttachmentMigrationFailed
-
- # Then move the file on disk from the old path to the new one
- try:
- mattach._path.parent().makedirs()
- except Exception:
- # OK to fail if it already exists, otherwise must raise
- if not mattach._path.parent().exists():
- raise
- oldpath = self._path
- newpath = mattach._path
- oldpath.moveTo(newpath)
- self.removeParentPaths()
-
- returnValue(mattach)
-
-
-
-class ManagedAttachment(Attachment):
- """
- Managed attachments are ones that the server is in total control of. Clients do POSTs on calendar objects
- to store the attachment data and have ATTACH properties added, updated or remove from the calendar objects.
- Each ATTACH property in a calendar object has a MANAGED-ID iCalendar parameter that is used in the POST requests
- to target a specific attachment. The MANAGED-ID values are unique to each calendar object resource, though
- multiple calendar object resources can point to the same underlying attachment as there is a separate database
- table that maps calendar objects/managed-ids to actual attachments.
- """
-
- @classmethod
- @inlineCallbacks
- def _create(cls, txn, managedID, ownerHomeID):
- """
- Create a new managed Attachment object.
-
- @param txn: The transaction to use
- @type txn: L{CommonStoreTransaction}
- @param managedID: the identifier for the attachment
- @type managedID: C{str}
- @param ownerHomeID: the resource-id of the home collection of the attachment owner
- @type ownerHomeID: C{int}
- """
-
- # Now create the DB entry
- att = cls._attachmentSchema
- rows = (yield Insert({
- att.CALENDAR_HOME_RESOURCE_ID : ownerHomeID,
- att.DROPBOX_ID : ".",
- att.CONTENT_TYPE : "",
- att.SIZE : 0,
- att.MD5 : "",
- att.PATH : "",
- }, Return=(att.ATTACHMENT_ID, att.CREATED, att.MODIFIED)).on(txn))
-
- row_iter = iter(rows[0])
- a_id = row_iter.next()
- created = sqltime(row_iter.next())
- modified = sqltime(row_iter.next())
-
- attachment = cls(txn, a_id, ".", None, ownerHomeID, True)
- attachment._managedID = managedID
- attachment._created = created
- attachment._modified = modified
-
- # File system paths need to exist
- try:
- attachment._path.parent().makedirs()
- except:
- pass
-
- returnValue(attachment)
-
-
- @classmethod
- @inlineCallbacks
- def create(cls, txn, managedID, ownerHomeID, referencedBy):
- """
- Create a new Attachment object and reference it.
-
- @param txn: The transaction to use
- @type txn: L{CommonStoreTransaction}
- @param managedID: the identifier for the attachment
- @type managedID: C{str}
- @param ownerHomeID: the resource-id of the home collection of the attachment owner
- @type ownerHomeID: C{int}
- @param referencedBy: the resource-id of the calendar object referencing the attachment
- @type referencedBy: C{int}
- """
-
- # Now create the DB entry
- attachment = (yield cls._create(txn, managedID, ownerHomeID))
- attachment._objectResourceID = referencedBy
-
- # Create the attachment<->calendar object relationship for managed attachments
- attco = cls._attachmentLinkSchema
- yield Insert({
- attco.ATTACHMENT_ID : attachment._attachmentID,
- attco.MANAGED_ID : attachment._managedID,
- attco.CALENDAR_OBJECT_RESOURCE_ID : attachment._objectResourceID,
- }).on(txn)
-
- returnValue(attachment)
-
-
- @classmethod
- @inlineCallbacks
- def update(cls, txn, oldManagedID, ownerHomeID, referencedBy, oldAttachmentID):
- """
- Update an Attachment object. This creates a new one and adjusts the reference to the old
- one to point to the new one. If the old one is no longer referenced at all, it is deleted.
-
- @param txn: The transaction to use
- @type txn: L{CommonStoreTransaction}
- @param oldManagedID: the identifier for the original attachment
- @type oldManagedID: C{str}
- @param ownerHomeID: the resource-id of the home collection of the attachment owner
- @type ownerHomeID: C{int}
- @param referencedBy: the resource-id of the calendar object referencing the attachment
- @type referencedBy: C{int}
- @param oldAttachmentID: the attachment-id of the existing attachment being updated
- @type oldAttachmentID: C{int}
- """
-
- # Now create the DB entry with a new managed-ID
- managed_id = str(uuid.uuid4())
- attachment = (yield cls._create(txn, managed_id, ownerHomeID))
- attachment._objectResourceID = referencedBy
-
- # Update the attachment<->calendar object relationship for managed attachments
- attco = cls._attachmentLinkSchema
- yield Update(
- {
- attco.ATTACHMENT_ID : attachment._attachmentID,
- attco.MANAGED_ID : attachment._managedID,
- },
- Where=(attco.MANAGED_ID == oldManagedID).And(
- attco.CALENDAR_OBJECT_RESOURCE_ID == attachment._objectResourceID
- ),
- ).on(txn)
-
- # Now check whether old attachmentID is still referenced - if not delete it
- rows = (yield Select(
- [attco.ATTACHMENT_ID, ],
- From=attco,
- Where=(attco.ATTACHMENT_ID == oldAttachmentID),
- ).on(txn))
- aids = [row[0] for row in rows] if rows is not None else ()
- if len(aids) == 0:
- oldattachment = ManagedAttachment(txn, oldAttachmentID, None, None)
- oldattachment = (yield oldattachment.initFromStore())
- yield oldattachment.remove()
-
- returnValue(attachment)
-
-
- @classmethod
- @inlineCallbacks
- def load(cls, txn, referencedID, managedID, attachmentID=None):
- """
- Load a ManagedAttachment via either its managedID or attachmentID.
- """
-
- if managedID:
- attco = cls._attachmentLinkSchema
- where = (attco.MANAGED_ID == managedID)
- if referencedID is not None:
- where = where.And(attco.CALENDAR_OBJECT_RESOURCE_ID == referencedID)
- rows = (yield Select(
- [attco.ATTACHMENT_ID, ],
- From=attco,
- Where=where,
- ).on(txn))
- if len(rows) == 0:
- returnValue(None)
- elif referencedID is not None and len(rows) != 1:
- raise AttachmentStoreValidManagedID
- attachmentID = rows[0][0]
-
- attachment = cls(txn, attachmentID, None, None)
- attachment = (yield attachment.initFromStore())
- attachment._managedID = managedID
- attachment._objectResourceID = referencedID
- returnValue(attachment)
-
-
- @classmethod
- @inlineCallbacks
- def referencesTo(cls, txn, managedID):
- """
- Find all the calendar object resourceIds referenced by this supplied managed-id.
- """
- attco = cls._attachmentLinkSchema
- rows = (yield Select(
- [attco.CALENDAR_OBJECT_RESOURCE_ID, ],
- From=attco,
- Where=(attco.MANAGED_ID == managedID),
- ).on(txn))
- cobjs = set([row[0] for row in rows]) if rows is not None else set()
- returnValue(cobjs)
-
-
- @classmethod
- @inlineCallbacks
- def usedManagedID(cls, txn, managedID):
- """
- Return the "owner" home and referencing resource is, and UID for a managed-id.
- """
- att = cls._attachmentSchema
- attco = cls._attachmentLinkSchema
- co = CalendarObject._objectSchema
- rows = (yield Select(
- [
- att.CALENDAR_HOME_RESOURCE_ID,
- attco.CALENDAR_OBJECT_RESOURCE_ID,
- co.ICALENDAR_UID,
- ],
- From=att.join(
- attco, att.ATTACHMENT_ID == attco.ATTACHMENT_ID, "left outer"
- ).join(co, co.RESOURCE_ID == attco.CALENDAR_OBJECT_RESOURCE_ID),
- Where=(attco.MANAGED_ID == managedID),
- ).on(txn))
- returnValue(rows)
-
-
- @classmethod
- @inlineCallbacks
- def resourceRemoved(cls, txn, resourceID):
- """
- Remove all attachments referencing the specified resource.
- """
-
- # Find all reference attachment-ids and dereference
- attco = cls._attachmentLinkSchema
- rows = (yield Select(
- [attco.MANAGED_ID, ],
- From=attco,
- Where=(attco.CALENDAR_OBJECT_RESOURCE_ID == resourceID),
- ).on(txn))
- mids = set([row[0] for row in rows]) if rows is not None else set()
- for managedID in mids:
- attachment = (yield ManagedAttachment.load(txn, resourceID, managedID))
- (yield attachment.removeFromResource(resourceID))
-
-
- @classmethod
- @inlineCallbacks
- def copyManagedID(cls, txn, managedID, referencedBy):
- """
- Associate an existing attachment with the new resource.
- """
-
- # Find the associated attachment-id and insert new reference
- attco = cls._attachmentLinkSchema
- aid = (yield Select(
- [attco.ATTACHMENT_ID, ],
- From=attco,
- Where=(attco.MANAGED_ID == managedID),
- ).on(txn))[0][0]
-
- yield Insert({
- attco.ATTACHMENT_ID : aid,
- attco.MANAGED_ID : managedID,
- attco.CALENDAR_OBJECT_RESOURCE_ID : referencedBy,
- }).on(txn)
-
-
- def managedID(self):
- return self._managedID
-
-
- @inlineCallbacks
- def objectResource(self):
- """
- Return the calendar object resource associated with this attachment.
- """
-
- home = (yield self._txn.calendarHomeWithResourceID(self._ownerHomeID))
- obj = (yield home.objectResourceWithID(self._objectResourceID))
- returnValue(obj)
-
-
- @property
- def _path(self):
- # Use directory hashing scheme based on MD5 of attachmentID
- hasheduid = hashlib.md5(str(self._attachmentID)).hexdigest()
- return self._attachmentPathRoot().child(hasheduid[0:2]).child(hasheduid[2:4]).child(hasheduid)
-
-
- @inlineCallbacks
- def location(self):
- """
- Return the URI location of the attachment.
- """
- if not hasattr(self, "_ownerName"):
- home = (yield self._txn.calendarHomeWithResourceID(self._ownerHomeID))
- self._ownerName = home.name()
- if not hasattr(self, "_objectDropboxID"):
- if not hasattr(self, "_objectResource"):
- self._objectResource = (yield self.objectResource())
- self._objectDropboxID = self._objectResource._dropboxID
-
- fname = self.lastSegmentOfUriPath(self._managedID, self._name)
- location = self._txn._store.attachmentsURIPattern % {
- "home": self._ownerName,
- "dropbox_id": urllib.quote(self._objectDropboxID),
- "name": urllib.quote(fname),
- }
- returnValue(location)
-
-
- @classmethod
- def lastSegmentOfUriPath(cls, managed_id, name):
- splits = name.rsplit(".", 1)
- fname = splits[0]
- suffix = splits[1] if len(splits) == 2 else "unknown"
- return "{0}-{1}.{2}".format(fname, managed_id[:8], suffix)
-
-
- @inlineCallbacks
- def changed(self, contentType, dispositionName, md5, size):
- """
- Always update name to current disposition name.
- """
-
- self._contentType = contentType
- self._name = dispositionName
- self._md5 = md5
- self._size = size
- att = self._attachmentSchema
- self._created, self._modified = map(
- sqltime,
- (yield Update(
- {
- att.CONTENT_TYPE : generateContentType(self._contentType),
- att.SIZE : self._size,
- att.MD5 : self._md5,
- att.MODIFIED : utcNowSQL,
- att.PATH : self._name,
- },
- Where=(att.ATTACHMENT_ID == self._attachmentID),
- Return=(att.CREATED, att.MODIFIED)).on(self._txn))[0]
- )
-
-
- @inlineCallbacks
- def newReference(self, resourceID):
- """
- Create a new reference of this attachment to the supplied calendar object resource id, and
- return a ManagedAttachment for the new reference.
-
- @param resourceID: the resource id to reference
- @type resourceID: C{int}
-
- @return: the new managed attachment
- @rtype: L{ManagedAttachment}
- """
-
- attco = self._attachmentLinkSchema
- yield Insert({
- attco.ATTACHMENT_ID : self._attachmentID,
- attco.MANAGED_ID : self._managedID,
- attco.CALENDAR_OBJECT_RESOURCE_ID : resourceID,
- }).on(self._txn)
-
- mattach = (yield ManagedAttachment.load(self._txn, resourceID, self._managedID))
- returnValue(mattach)
-
-
- @inlineCallbacks
- def removeFromResource(self, resourceID):
-
- # Delete the reference
- attco = self._attachmentLinkSchema
- yield Delete(
- From=attco,
- Where=(attco.ATTACHMENT_ID == self._attachmentID).And(
- attco.CALENDAR_OBJECT_RESOURCE_ID == resourceID),
- ).on(self._txn)
-
- # References still exist - if not remove actual attachment
- rows = (yield Select(
- [attco.CALENDAR_OBJECT_RESOURCE_ID, ],
- From=attco,
- Where=(attco.ATTACHMENT_ID == self._attachmentID),
- ).on(self._txn))
- if len(rows) == 0:
- yield self.remove()
-
-
- @inlineCallbacks
- def attachProperty(self):
- """
- Return an iCalendar ATTACH property for this attachment.
- """
- attach = Property("ATTACH", "", valuetype=Value.VALUETYPE_URI)
- location = (yield self.updateProperty(attach))
- returnValue((attach, location,))
-
-
- @inlineCallbacks
- def updateProperty(self, attach):
- """
- Update an iCalendar ATTACH property for this attachment.
- """
-
- location = (yield self.location())
-
- attach.setParameter("MANAGED-ID", self.managedID())
- attach.setParameter("FMTTYPE", "{0}/{1}".format(self.contentType().mediaType, self.contentType().mediaSubtype))
- attach.setParameter("FILENAME", self.name())
- attach.setParameter("SIZE", str(self.size()))
- attach.setValue(location)
-
- returnValue(location)
-
# Hook-up class relationships at the end after they have all been defined
from txdav.caldav.datastore.sql_external import CalendarHomeExternal, CalendarExternal, CalendarObjectExternal
CalendarHome._externalClass = CalendarHomeExternal
Added: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_attachment.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_attachment.py (rev 0)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_attachment.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -0,0 +1,1205 @@
+# -*- test-case-name: twext.enterprise.dal.test.test_record -*-
+##
+# Copyright (c) 2015 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from pycalendar.value import Value
+
+from twext.enterprise.dal.syntax import Select, Insert, Delete, Parameter, \
+ Update, utcNowSQL
+from twext.enterprise.util import parseSQLTimestamp
+from twext.python.filepath import CachingFilePath
+
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from twistedcaldav.config import config
+from twistedcaldav.dateops import datetimeMktime
+from twistedcaldav.ical import Property
+
+from txdav.caldav.datastore.util import StorageTransportBase, \
+ AttachmentRetrievalTransport
+from txdav.caldav.icalendarstore import AttachmentSizeTooLarge, QuotaExceeded, \
+ IAttachment, AttachmentDropboxNotAllowed, AttachmentMigrationFailed, \
+ AttachmentStoreValidManagedID
+from txdav.common.datastore.sql_tables import schema
+
+from txweb2.http_headers import MimeType, generateContentType
+
+from zope.interface.declarations import implements
+
+import hashlib
+import itertools
+import os
+import tempfile
+import urllib
+import uuid
+
+"""
+Classes and methods that relate to CalDAV attachments in the SQL store.
+"""
+
+
class AttachmentStorageTransport(StorageTransportBase):
    """
    A write transport used to stream attachment data into the store.

    Data is first written to a temporary file under the attachments root;
    when the client finishes (L{loseConnection}) the data is checked against
    the size limit and the owner's quota and then moved into the attachment's
    final path.  If the enclosing transaction aborts, the temporary file is
    removed via the C{postAbort} hook.
    """

    # Sub-directory of the attachments root used for in-progress uploads.
    _TEMPORARY_UPLOADS_DIRECTORY = "Temporary"

    def __init__(self, attachment, contentType, dispositionName, creating=False, migrating=False):
        """
        @param attachment: the attachment object this transport writes to
        @param contentType: MIME type of the data being stored
        @param dispositionName: disposition filename for the attachment
        @param creating: C{True} if the attachment row was just created, so a
            failed store must also remove the row
        @param migrating: C{True} when migrating data between pods - skips
            size and quota enforcement
        """
        super(AttachmentStorageTransport, self).__init__(
            attachment, contentType, dispositionName)

        fileDescriptor, fileName = self._temporaryFile()
        # Wrap the file descriptor in a file object we can write to
        self._file = os.fdopen(fileDescriptor, "w")
        self._path = CachingFilePath(fileName)
        self._hash = hashlib.md5()
        self._creating = creating
        self._migrating = migrating

        # Ensure the temp file is cleaned up if the transaction aborts.
        self._txn.postAbort(self.aborted)


    def _temporaryFile(self):
        """
        Returns a (file descriptor, absolute path) tuple for a temporary file within
        the Attachments/Temporary directory (creating the Temporary subdirectory
        if it doesn't exist). It is the caller's responsibility to remove the
        file.
        """
        attachmentRoot = self._txn._store.attachmentsPath
        tempUploadsPath = attachmentRoot.child(self._TEMPORARY_UPLOADS_DIRECTORY)
        if not tempUploadsPath.exists():
            tempUploadsPath.createDirectory()
        return tempfile.mkstemp(dir=tempUploadsPath.path)


    @property
    def _txn(self):
        # The transport shares the transaction of the attachment it stores.
        return self._attachment._txn


    def aborted(self):
        """
        Transaction aborted - clean up temp files.
        """
        if self._path.exists():
            self._path.remove()


    def write(self, data):
        # C{buffer} objects (Python 2) cannot be fed to the hash - copy to str.
        if isinstance(data, buffer):
            data = str(data)
        self._file.write(data)
        self._hash.update(data)


    @inlineCallbacks
    def loseConnection(self):
        """
        Note that when self._migrating is set we only care about the data and don't need to
        do any quota checks/adjustments.

        @raise AttachmentSizeTooLarge: if the stored data exceeds the
            configured maximum attachment size (non-migrating only)
        @raise QuotaExceeded: if the store would push the owner home over
            its quota (non-migrating only)
        """

        # FIXME: this should be synchronously accessible; IAttachment should
        # have a method for getting its parent just as CalendarObject/Calendar
        # do.

        # FIXME: If this method isn't called, the transaction should be
        # prevented from committing successfully. It's not valid to have an
        # attachment that doesn't point to a real file.

        home = (yield self._txn.calendarHomeWithResourceID(self._attachment._ownerHomeID))

        oldSize = self._attachment.size()
        newSize = self._file.tell()
        self._file.close()

        # Check max size for attachment
        if not self._migrating and newSize > config.MaximumAttachmentSize:
            self._path.remove()
            # A brand-new attachment row must be removed too, or it would
            # point at no file at all.
            if self._creating:
                yield self._attachment._internalRemove()
            raise AttachmentSizeTooLarge()

        # Check overall user quota
        if not self._migrating:
            allowed = home.quotaAllowedBytes()
            if allowed is not None and allowed < ((yield home.quotaUsedBytes())
                                                  + (newSize - oldSize)):
                self._path.remove()
                if self._creating:
                    yield self._attachment._internalRemove()
                raise QuotaExceeded()

        # Move the temp file into its final hashed-directory location.
        self._path.moveTo(self._attachment._path)

        yield self._attachment.changed(
            self._contentType,
            self._dispositionName,
            self._hash.hexdigest(),
            newSize
        )

        if not self._migrating and home:
            # Adjust quota
            yield home.adjustQuotaUsedBytes(self._attachment.size() - oldSize)

            # Send change notification to home
            yield home.notifyChanged()
+
+
+
def sqltime(value):
    """
    Convert a SQL timestamp string into a POSIX epoch value.
    """
    parsed = parseSQLTimestamp(value)
    return datetimeMktime(parsed)
+
+
+
class AttachmentLink(object):
    """
    A binding between an L{Attachment} and an L{CalendarObject}.
    """

    _attachmentSchema = schema.ATTACHMENT
    _attachmentLinkSchema = schema.ATTACHMENT_CALENDAR_OBJECT

    @classmethod
    def makeClass(cls, txn, linkData):
        """
        Given the various database rows, build the actual class.

        @param linkData: the standard set of link table columns, ordered as
            per L{_allColumns}
        @type linkData: C{list}

        @return: the constructed link object
        @rtype: L{AttachmentLink}
        """

        child = cls(txn)
        for attr, value in zip(child._rowAttributes(), linkData):
            setattr(child, attr, value)
        return child


    @classmethod
    def _allColumns(cls):
        """
        Full set of columns in the object table that need to be loaded to
        initialize the object resource state.
        """
        aco = cls._attachmentLinkSchema
        return [
            aco.ATTACHMENT_ID,
            aco.MANAGED_ID,
            aco.CALENDAR_OBJECT_RESOURCE_ID,
        ]


    @classmethod
    def _rowAttributes(cls):
        """
        Object attributes used to store the column values from L{_allColumns}. This used to create
        a mapping when serializing the object for cross-pod requests.
        """
        return (
            "_attachmentID",
            "_managedID",
            "_calendarObjectID",
        )


    @classmethod
    @inlineCallbacks
    def linksForHome(cls, home):
        """
        Load all attachment<->calendar object mappings for the specified home collection.

        @return: a list of L{AttachmentLink} objects for the home
        """

        # Load from the main table first
        att = cls._attachmentSchema
        attco = cls._attachmentLinkSchema
        dataRows = yield Select(
            cls._allColumns(),
            From=attco.join(att, on=(attco.ATTACHMENT_ID == att.ATTACHMENT_ID)),
            Where=att.CALENDAR_HOME_RESOURCE_ID == home.id(),
        ).on(home._txn)

        # Create the actual objects
        returnValue([cls.makeClass(home._txn, row) for row in dataRows])


    def __init__(self, txn):
        # All row attributes start as None until filled in by makeClass.
        self._txn = txn
        for attr in self._rowAttributes():
            setattr(self, attr, None)


    def serialize(self):
        """
        Create a dictionary mapping key attributes so this object can be sent over a cross-pod call
        and reconstituted at the other end. Note that the other end may have a different schema so
        the attributes may not match exactly and will need to be processed accordingly.
        """
        # attr[1:] strips the leading underscore from the attribute name.
        return dict([(attr[1:], getattr(self, attr, None)) for attr in self._rowAttributes()])


    @classmethod
    def deserialize(cls, txn, mapping):
        """
        Given a mapping generated by L{serialize}, convert the values into an array of database
        like items that conforms to the ordering of L{_allColumns} so it can be fed into L{makeClass}.
        Note that there may be a schema mismatch with the external data, so treat missing items as
        C{None} and ignore extra items.
        """

        return cls.makeClass(txn, [mapping.get(row[1:]) for row in cls._rowAttributes()])


    def insert(self):
        """
        Insert the object.
        """

        # Map each column to the corresponding attribute's current value.
        row = dict([(column, getattr(self, attr)) for column, attr in itertools.izip(self._allColumns(), self._rowAttributes())])
        return Insert(row).on(self._txn)
+
+
+
class Attachment(object):
    """
    Base class for an attachment stored in the SQL store.  Concrete
    subclasses are L{DropBoxAttachment} (legacy client-named attachments)
    and L{ManagedAttachment} (server-managed attachments); the DROPBOX_ID
    column value "." distinguishes managed rows from dropbox rows.
    """

    implements(IAttachment)

    _attachmentSchema = schema.ATTACHMENT
    _attachmentLinkSchema = schema.ATTACHMENT_CALENDAR_OBJECT

    @classmethod
    def makeClass(cls, txn, attachmentData):
        """
        Given the various database rows, build the actual class.

        @param attachmentData: the standard set of attachment columns
        @type attachmentData: C{list}

        @return: the constructed child class
        @rtype: L{Attachment}
        """

        att = cls._attachmentSchema
        # A DROPBOX_ID of "." marks the row as a managed attachment.
        dropbox_id = attachmentData[cls._allColumns().index(att.DROPBOX_ID)]
        c = ManagedAttachment if dropbox_id == "." else DropBoxAttachment
        child = c(
            txn,
            attachmentData[cls._allColumns().index(att.ATTACHMENT_ID)],
            attachmentData[cls._allColumns().index(att.DROPBOX_ID)],
            attachmentData[cls._allColumns().index(att.PATH)],
        )

        for attr, value in zip(child._rowAttributes(), attachmentData):
            setattr(child, attr, value)
        # Stored as a string; expose as a parsed MimeType.
        child._contentType = MimeType.fromString(child._contentType)

        return child


    @classmethod
    def _allColumns(cls):
        """
        Full set of columns in the object table that need to be loaded to
        initialize the object resource state.
        """
        att = cls._attachmentSchema
        return [
            att.ATTACHMENT_ID,
            att.DROPBOX_ID,
            att.CALENDAR_HOME_RESOURCE_ID,
            att.CONTENT_TYPE,
            att.SIZE,
            att.MD5,
            att.CREATED,
            att.MODIFIED,
            att.PATH,
        ]


    @classmethod
    def _rowAttributes(cls):
        """
        Object attributes used to store the column values from L{_allColumns}. This used to create
        a mapping when serializing the object for cross-pod requests.
        """
        return (
            "_attachmentID",
            "_dropboxID",
            "_ownerHomeID",
            "_contentType",
            "_size",
            "_md5",
            "_created",
            "_modified",
            "_name",
        )


    @classmethod
    @inlineCallbacks
    def loadAllAttachments(cls, home):
        """
        Load all attachments assigned to the specified home collection. This should only be
        used when sync'ing an entire home's set of attachments.

        @return: a list of L{Attachment} subclasses, one per row
        """

        results = []

        # Load from the main table first
        att = cls._attachmentSchema
        dataRows = yield Select(
            cls._allColumns(),
            From=att,
            Where=att.CALENDAR_HOME_RESOURCE_ID == home.id(),
        ).on(home._txn)

        # Create the actual objects
        for row in dataRows:
            child = cls.makeClass(home._txn, row)
            results.append(child)

        returnValue(results)


    @classmethod
    @inlineCallbacks
    def loadAttachmentByID(cls, home, id):
        """
        Load one attachments assigned to the specified home collection. This should only be
        used when sync'ing an entire home's set of attachments.

        @return: the matching L{Attachment}, or C{None} if not exactly one
            row matched
        """

        # Load from the main table first
        att = cls._attachmentSchema
        rows = yield Select(
            cls._allColumns(),
            From=att,
            Where=(att.CALENDAR_HOME_RESOURCE_ID == home.id()).And(
                att.ATTACHMENT_ID == id),
        ).on(home._txn)

        # Create the actual object
        returnValue(cls.makeClass(home._txn, rows[0]) if len(rows) == 1 else None)


    def serialize(self):
        """
        Create a dictionary mapping key attributes so this object can be sent over a cross-pod call
        and reconstituted at the other end. Note that the other end may have a different schema so
        the attributes may not match exactly and will need to be processed accordingly.
        """
        result = dict([(attr[1:], getattr(self, attr, None)) for attr in self._rowAttributes()])
        # MimeType is not directly serializable - send its string form.
        result["contentType"] = generateContentType(result["contentType"])
        return result


    @classmethod
    def deserialize(cls, txn, mapping):
        """
        Given a mapping generated by L{serialize}, convert the values into an array of database
        like items that conforms to the ordering of L{_allColumns} so it can be fed into L{makeClass}.
        Note that there may be a schema mismatch with the external data, so treat missing items as
        C{None} and ignore extra items.
        """

        return cls.makeClass(txn, [mapping.get(row[1:]) for row in cls._rowAttributes()])


    def __init__(self, txn, a_id, dropboxID, name, ownerHomeID=None, justCreated=False):
        """
        @param txn: the transaction to use
        @param a_id: the ATTACHMENT_ID, or C{None} if not yet loaded
        @param dropboxID: the dropbox id ("." for managed attachments)
        @param name: the attachment (PATH column) name
        @param ownerHomeID: resource-id of the owning calendar home
        @param justCreated: C{True} when the row was created in this txn
        """
        self._txn = txn
        self._attachmentID = a_id
        self._ownerHomeID = ownerHomeID
        self._dropboxID = dropboxID
        self._contentType = None
        self._size = 0
        self._md5 = None
        self._created = None
        self._modified = None
        self._name = name
        self._justCreated = justCreated


    def __repr__(self):
        return (
            "<{self.__class__.__name__}: {self._attachmentID}>"
            .format(self=self)
        )


    def _attachmentPathRoot(self):
        # Root directory for all attachment files in this store.
        return self._txn._store.attachmentsPath


    @inlineCallbacks
    def initFromStore(self):
        """
        Execute necessary SQL queries to retrieve attributes.

        @return: C{True} if this attachment exists, C{False} otherwise.
        """
        att = self._attachmentSchema
        # Dropbox attachments are keyed by (dropbox id, name); managed
        # attachments ("." or no dropbox id) are keyed by attachment id.
        if self._dropboxID and self._dropboxID != ".":
            where = (att.DROPBOX_ID == self._dropboxID).And(
                att.PATH == self._name)
        else:
            where = (att.ATTACHMENT_ID == self._attachmentID)
        rows = (yield Select(
            self._allColumns(),
            From=att,
            Where=where
        ).on(self._txn))

        if not rows:
            returnValue(None)

        for attr, value in zip(self._rowAttributes(), rows[0]):
            setattr(self, attr, value)
        self._contentType = MimeType.fromString(self._contentType)
        # Convert SQL timestamps to POSIX epoch values.
        self._created = sqltime(self._created)
        self._modified = sqltime(self._modified)

        returnValue(self)


    def copyRemote(self, remote):
        """
        Copy properties from a remote (external) attachment that is being migrated.

        @param remote: the external attachment
        @type remote: L{Attachment}
        """
        return self.changed(remote.contentType(), remote.name(), remote.md5(), remote.size())


    def id(self):
        return self._attachmentID


    def dropboxID(self):
        return self._dropboxID


    def isManaged(self):
        # Managed rows are flagged with a "." dropbox id.
        return self._dropboxID == "."


    def name(self):
        return self._name


    def properties(self):
        pass # stub


    def store(self, contentType, dispositionName=None, migrating=False):
        """
        Return a transport that stores data for this attachment.

        @param contentType: MIME type of the data to store
        @param dispositionName: disposition filename; also used as the
            attachment name if none was set
        @param migrating: skip size/quota checks when C{True}
        """
        if not self._name:
            self._name = dispositionName
        return AttachmentStorageTransport(self, contentType, dispositionName, self._justCreated, migrating=migrating)


    def retrieve(self, protocol):
        # Stream the on-disk file to the given protocol.
        return AttachmentRetrievalTransport(self._path).start(protocol)


    def changed(self, contentType, dispositionName, md5, size):
        # Subclasses implement the appropriate metadata update.
        raise NotImplementedError

    # Prepared statement removing a single attachment row by id.
    _removeStatement = Delete(
        From=schema.ATTACHMENT,
        Where=(schema.ATTACHMENT.ATTACHMENT_ID == Parameter("attachmentID"))
    )


    @inlineCallbacks
    def remove(self, adjustQuota=True):
        """
        Remove this attachment: delete the row now, and delete the file(s)
        only after the transaction commits.

        @param adjustQuota: when C{True}, credit the owner home's quota and
            notify it of the change
        """
        oldSize = self._size
        # File deletion is deferred until commit so an aborted transaction
        # does not lose data.
        self._txn.postCommit(self.removePaths)
        yield self._internalRemove()

        # Adjust quota
        if adjustQuota:
            home = (yield self._txn.calendarHomeWithResourceID(self._ownerHomeID))
            if home:
                yield home.adjustQuotaUsedBytes(-oldSize)

                # Send change notification to home
                yield home.notifyChanged()


    def removePaths(self):
        """
        Remove the actual file and up to attachment parent directory if empty.
        """
        self._path.remove()
        self.removeParentPaths()


    def removeParentPaths(self):
        """
        Remove up to attachment parent directory if empty.
        """
        parent = self._path.parent()
        toppath = self._attachmentPathRoot().path
        # Walk up towards the root, pruning empty hash directories.
        while parent.path != toppath:
            if len(parent.listdir()) == 0:
                parent.remove()
                parent = parent.parent()
            else:
                break


    def _internalRemove(self):
        """
        Just delete the row; don't do any accounting / bookkeeping. (This is
        for attachments that have failed to be created due to errors during
        storage.)
        """
        return self._removeStatement.on(self._txn, attachmentID=self._attachmentID)


    @classmethod
    @inlineCallbacks
    def removedHome(cls, txn, homeID):
        """
        A calendar home is being removed so all of its attachments must go too. When removing,
        we don't care about quota adjustment as there will be no quota once the home is removed.

        TODO: this needs to be transactional wrt the actual file deletes.
        """
        att = cls._attachmentSchema
        attco = cls._attachmentLinkSchema

        rows = (yield Select(
            [att.ATTACHMENT_ID, att.DROPBOX_ID, ],
            From=att,
            Where=(
                att.CALENDAR_HOME_RESOURCE_ID == homeID
            ),
        ).on(txn))

        for attachmentID, dropboxID in rows:
            if dropboxID != ".":
                attachment = DropBoxAttachment(txn, attachmentID, None, None)
            else:
                attachment = ManagedAttachment(txn, attachmentID, None, None)
            attachment = (yield attachment.initFromStore())
            if attachment._path.exists():
                attachment.removePaths()

        # Remove link rows first, then the attachment rows themselves.
        yield Delete(
            From=attco,
            Where=(
                attco.ATTACHMENT_ID.In(Select(
                    [att.ATTACHMENT_ID, ],
                    From=att,
                    Where=(
                        att.CALENDAR_HOME_RESOURCE_ID == homeID
                    ),
                ))
            ),
        ).on(txn)

        yield Delete(
            From=att,
            Where=(
                att.CALENDAR_HOME_RESOURCE_ID == homeID
            ),
        ).on(txn)


    # IDataStoreObject
    def contentType(self):
        return self._contentType


    def md5(self):
        return self._md5


    def size(self):
        return self._size


    def created(self):
        return self._created


    def modified(self):
        return self._modified
+
+
+
class DropBoxAttachment(Attachment):
    """
    A legacy "dropbox" attachment: the file is stored under a directory
    hashed from the client-supplied dropbox id, keyed by (dropbox id, name).
    """

    @classmethod
    @inlineCallbacks
    def create(cls, txn, dropboxID, name, ownerHomeID):
        """
        Create a new Attachment object.

        @param txn: The transaction to use
        @type txn: L{CommonStoreTransaction}
        @param dropboxID: the identifier for the attachment (dropbox id or managed id)
        @type dropboxID: C{str}
        @param name: the name of the attachment
        @type name: C{str}
        @param ownerHomeID: the resource-id of the home collection of the attachment owner
        @type ownerHomeID: C{int}

        @raise AttachmentDropboxNotAllowed: if the store has migrated to
            managed attachments
        """

        # If store has already migrated to managed attachments we will prevent creation of dropbox attachments
        dropbox = (yield txn.store().dropboxAllowed(txn))
        if not dropbox:
            raise AttachmentDropboxNotAllowed

        # Now create the DB entry
        att = cls._attachmentSchema
        rows = (yield Insert({
            att.CALENDAR_HOME_RESOURCE_ID : ownerHomeID,
            att.DROPBOX_ID : dropboxID,
            att.CONTENT_TYPE : "",
            att.SIZE : 0,
            att.MD5 : "",
            att.PATH : name,
        }, Return=(att.ATTACHMENT_ID, att.CREATED, att.MODIFIED)).on(txn))

        # Unpack the RETURNING values directly rather than via an iterator.
        a_id, created, modified = rows[0]

        attachment = cls(txn, a_id, dropboxID, name, ownerHomeID, True)
        attachment._created = sqltime(created)
        attachment._modified = sqltime(modified)

        # File system paths need to exist; creation is best-effort since the
        # hashed directories may already exist from a previous attachment.
        try:
            attachment._path.parent().makedirs()
        except Exception:
            pass

        returnValue(attachment)


    @classmethod
    @inlineCallbacks
    def load(cls, txn, dropboxID, name):
        """
        Load an existing dropbox attachment by (dropbox id, name).

        @return: the attachment, or C{None} if no matching row exists
        """
        attachment = cls(txn, None, dropboxID, name)
        attachment = (yield attachment.initFromStore())
        returnValue(attachment)


    @property
    def _path(self):
        # Use directory hashing scheme based on MD5 of dropboxID
        hasheduid = hashlib.md5(self._dropboxID).hexdigest()
        attachmentRoot = self._attachmentPathRoot().child(hasheduid[0:2]).child(hasheduid[2:4]).child(hasheduid)
        return attachmentRoot.child(self.name())


    @classmethod
    @inlineCallbacks
    def resourceRemoved(cls, txn, resourceID, dropboxID):
        """
        Remove all attachments referencing the specified resource.

        Attachments are only removed once no other calendar object resource
        still references the same dropbox id.
        """

        # See if any other resources still reference this dropbox ID
        co = schema.CALENDAR_OBJECT
        rows = (yield Select(
            [co.RESOURCE_ID, ],
            From=co,
            Where=(co.DROPBOX_ID == dropboxID).And(
                co.RESOURCE_ID != resourceID)
        ).on(txn))

        if not rows:
            # Find each attachment with matching dropbox ID
            att = cls._attachmentSchema
            rows = (yield Select(
                [att.PATH],
                From=att,
                Where=(att.DROPBOX_ID == dropboxID)
            ).on(txn))
            for (name,) in rows:
                attachment = yield cls.load(txn, dropboxID, name)
                yield attachment.remove()


    @inlineCallbacks
    def changed(self, contentType, dispositionName, md5, size):
        """
        Dropbox attachments never change their path - ignore dispositionName.
        """

        self._contentType = contentType
        self._md5 = md5
        self._size = size

        att = self._attachmentSchema
        self._created, self._modified = map(
            sqltime,
            (yield Update(
                {
                    att.CONTENT_TYPE : generateContentType(self._contentType),
                    att.SIZE : self._size,
                    att.MD5 : self._md5,
                    att.MODIFIED : utcNowSQL,
                },
                Where=(att.ATTACHMENT_ID == self._attachmentID),
                Return=(att.CREATED, att.MODIFIED)).on(self._txn))[0]
        )


    @inlineCallbacks
    def convertToManaged(self):
        """
        Convert this dropbox attachment into a managed attachment by updating the
        database and returning a new ManagedAttachment object that does not reference
        any calendar object. Referencing will be added later.

        @return: the managed attachment object
        @rtype: L{ManagedAttachment}

        @raise AttachmentMigrationFailed: if the updated row cannot be
            re-loaded as a managed attachment
        """

        # Change the DROPBOX_ID to a single "." to indicate a managed attachment.
        att = self._attachmentSchema
        (yield Update(
            {att.DROPBOX_ID : ".", },
            Where=(att.ATTACHMENT_ID == self._attachmentID),
        ).on(self._txn))

        # Create an "orphaned" ManagedAttachment that points to the updated data but without
        # an actual managed-id (which only exists when there is a reference to a calendar object).
        mattach = (yield ManagedAttachment.load(self._txn, None, None, attachmentID=self._attachmentID))
        # BUGFIX: check for a failed load *before* touching the object -
        # previously the managed-id was assigned first, which turned a load
        # failure into an AttributeError instead of AttachmentMigrationFailed.
        if mattach is None:
            raise AttachmentMigrationFailed
        mattach._managedID = str(uuid.uuid4())

        # Then move the file on disk from the old path to the new one
        try:
            mattach._path.parent().makedirs()
        except Exception:
            # OK to fail if it already exists, otherwise must raise
            if not mattach._path.parent().exists():
                raise
        oldpath = self._path
        newpath = mattach._path
        oldpath.moveTo(newpath)
        # Prune now-empty dropbox hash directories.
        self.removeParentPaths()

        returnValue(mattach)
+
+
+
+class ManagedAttachment(Attachment):
+ """
+ Managed attachments are ones that the server is in total control of. Clients do POSTs on calendar objects
+ to store the attachment data and have ATTACH properties added, updated or remove from the calendar objects.
+ Each ATTACH property in a calendar object has a MANAGED-ID iCalendar parameter that is used in the POST requests
+ to target a specific attachment. The MANAGED-ID values are unique to each calendar object resource, though
+ multiple calendar object resources can point to the same underlying attachment as there is a separate database
+ table that maps calendar objects/managed-ids to actual attachments.
+ """
+
+ @classmethod
+ @inlineCallbacks
+ def _create(cls, txn, managedID, ownerHomeID):
+ """
+ Create a new managed Attachment object.
+
+ @param txn: The transaction to use
+ @type txn: L{CommonStoreTransaction}
+ @param managedID: the identifier for the attachment
+ @type managedID: C{str}
+ @param ownerHomeID: the resource-id of the home collection of the attachment owner
+ @type ownerHomeID: C{int}
+ """
+
+ # Now create the DB entry
+ att = cls._attachmentSchema
+ rows = (yield Insert({
+ att.CALENDAR_HOME_RESOURCE_ID : ownerHomeID,
+ att.DROPBOX_ID : ".",
+ att.CONTENT_TYPE : "",
+ att.SIZE : 0,
+ att.MD5 : "",
+ att.PATH : "",
+ }, Return=(att.ATTACHMENT_ID, att.CREATED, att.MODIFIED)).on(txn))
+
+ row_iter = iter(rows[0])
+ a_id = row_iter.next()
+ created = sqltime(row_iter.next())
+ modified = sqltime(row_iter.next())
+
+ attachment = cls(txn, a_id, ".", None, ownerHomeID, True)
+ attachment._managedID = managedID
+ attachment._created = created
+ attachment._modified = modified
+
+ # File system paths need to exist
+ try:
+ attachment._path.parent().makedirs()
+ except:
+ pass
+
+ returnValue(attachment)
+
+
+ @classmethod
+ @inlineCallbacks
+ def create(cls, txn, managedID, ownerHomeID, referencedBy):
+ """
+ Create a new Attachment object and reference it.
+
+ @param txn: The transaction to use
+ @type txn: L{CommonStoreTransaction}
+ @param managedID: the identifier for the attachment
+ @type managedID: C{str}
+ @param ownerHomeID: the resource-id of the home collection of the attachment owner
+ @type ownerHomeID: C{int}
+ @param referencedBy: the resource-id of the calendar object referencing the attachment
+ @type referencedBy: C{int}
+ """
+
+ # Now create the DB entry
+ attachment = (yield cls._create(txn, managedID, ownerHomeID))
+ attachment._objectResourceID = referencedBy
+
+ # Create the attachment<->calendar object relationship for managed attachments
+ attco = cls._attachmentLinkSchema
+ yield Insert({
+ attco.ATTACHMENT_ID : attachment._attachmentID,
+ attco.MANAGED_ID : attachment._managedID,
+ attco.CALENDAR_OBJECT_RESOURCE_ID : attachment._objectResourceID,
+ }).on(txn)
+
+ returnValue(attachment)
+
+
    @classmethod
    @inlineCallbacks
    def update(cls, txn, oldManagedID, ownerHomeID, referencedBy, oldAttachmentID):
        """
        Update an Attachment object. This creates a new one and adjusts the reference to the old
        one to point to the new one. If the old one is no longer referenced at all, it is deleted.

        @param txn: The transaction to use
        @type txn: L{CommonStoreTransaction}
        @param oldManagedID: the identifier for the original attachment
        @type oldManagedID: C{str}
        @param ownerHomeID: the resource-id of the home collection of the attachment owner
        @type ownerHomeID: C{int}
        @param referencedBy: the resource-id of the calendar object referencing the attachment
        @type referencedBy: C{int}
        @param oldAttachmentID: the attachment-id of the existing attachment being updated
        @type oldAttachmentID: C{int}

        @return: the new attachment (with a freshly generated managed-id)
        @rtype: L{ManagedAttachment}
        """

        # Now create the DB entry with a new managed-ID
        managed_id = str(uuid.uuid4())
        attachment = (yield cls._create(txn, managed_id, ownerHomeID))
        attachment._objectResourceID = referencedBy

        # Update the attachment<->calendar object relationship for managed attachments
        # (re-points this resource's existing link row at the new attachment)
        attco = cls._attachmentLinkSchema
        yield Update(
            {
                attco.ATTACHMENT_ID : attachment._attachmentID,
                attco.MANAGED_ID : attachment._managedID,
            },
            Where=(attco.MANAGED_ID == oldManagedID).And(
                attco.CALENDAR_OBJECT_RESOURCE_ID == attachment._objectResourceID
            ),
        ).on(txn)

        # Now check whether old attachmentID is still referenced - if not delete it
        rows = (yield Select(
            [attco.ATTACHMENT_ID, ],
            From=attco,
            Where=(attco.ATTACHMENT_ID == oldAttachmentID),
        ).on(txn))
        aids = [row[0] for row in rows] if rows is not None else ()
        if len(aids) == 0:
            # No link rows remain - remove the old row and its file/quota.
            oldattachment = ManagedAttachment(txn, oldAttachmentID, None, None)
            oldattachment = (yield oldattachment.initFromStore())
            yield oldattachment.remove()

        returnValue(attachment)
+
+
    @classmethod
    @inlineCallbacks
    def load(cls, txn, referencedID, managedID, attachmentID=None):
        """
        Load a ManagedAttachment via either its managedID or attachmentID.

        @param referencedID: calendar object resource-id the managed-id must
            be linked to, or C{None} to match any link
        @param managedID: the managed-id to look up, or a false value to look
            up directly by C{attachmentID}
        @param attachmentID: the attachment-id to load when no managed-id is
            given

        @return: the attachment, or C{None} if the managed-id has no link row

        @raise AttachmentStoreValidManagedID: if a referenced lookup matches
            more than one link row
        """

        if managedID:
            attco = cls._attachmentLinkSchema
            where = (attco.MANAGED_ID == managedID)
            if referencedID is not None:
                where = where.And(attco.CALENDAR_OBJECT_RESOURCE_ID == referencedID)
            rows = (yield Select(
                [attco.ATTACHMENT_ID, ],
                From=attco,
                Where=where,
            ).on(txn))
            if len(rows) == 0:
                returnValue(None)
            elif referencedID is not None and len(rows) != 1:
                raise AttachmentStoreValidManagedID
            attachmentID = rows[0][0]

        attachment = cls(txn, attachmentID, None, None)
        attachment = (yield attachment.initFromStore())
        attachment._managedID = managedID
        attachment._objectResourceID = referencedID
        returnValue(attachment)
+
+
+ @classmethod
+ @inlineCallbacks
+ def referencesTo(cls, txn, managedID):
+ """
+ Find all the calendar object resourceIds referenced by this supplied managed-id.
+ """
+ attco = cls._attachmentLinkSchema
+ rows = (yield Select(
+ [attco.CALENDAR_OBJECT_RESOURCE_ID, ],
+ From=attco,
+ Where=(attco.MANAGED_ID == managedID),
+ ).on(txn))
+ cobjs = set([row[0] for row in rows]) if rows is not None else set()
+ returnValue(cobjs)
+
+
+ @classmethod
+ @inlineCallbacks
+ def usedManagedID(cls, txn, managedID):
+ """
+ Return the "owner" home and referencing resource id, and UID for a managed-id.
+ """
+ att = cls._attachmentSchema
+ attco = cls._attachmentLinkSchema
+ co = schema.CALENDAR_OBJECT
+ rows = (yield Select(
+ [
+ att.CALENDAR_HOME_RESOURCE_ID,
+ attco.CALENDAR_OBJECT_RESOURCE_ID,
+ co.ICALENDAR_UID,
+ ],
+ From=att.join(
+ attco, att.ATTACHMENT_ID == attco.ATTACHMENT_ID, "left outer"
+ ).join(co, co.RESOURCE_ID == attco.CALENDAR_OBJECT_RESOURCE_ID),
+ Where=(attco.MANAGED_ID == managedID),
+ ).on(txn))
+ returnValue(rows)
+
+
+ @classmethod
+ @inlineCallbacks
+ def resourceRemoved(cls, txn, resourceID):
+ """
+ Remove all attachments referencing the specified resource.
+ """
+
+ # Find all referenced managed-ids and dereference each one
+ attco = cls._attachmentLinkSchema
+ rows = (yield Select(
+ [attco.MANAGED_ID, ],
+ From=attco,
+ Where=(attco.CALENDAR_OBJECT_RESOURCE_ID == resourceID),
+ ).on(txn))
+ mids = set([row[0] for row in rows]) if rows is not None else set()
+ for managedID in mids:
+ attachment = (yield ManagedAttachment.load(txn, resourceID, managedID))
+ (yield attachment.removeFromResource(resourceID))
+
+
+ @classmethod
+ @inlineCallbacks
+ def copyManagedID(cls, txn, managedID, referencedBy):
+ """
+ Associate an existing attachment with the new resource.
+ """
+
+ # Find the associated attachment-id and insert new reference
+ attco = cls._attachmentLinkSchema
+ aid = (yield Select(
+ [attco.ATTACHMENT_ID, ],
+ From=attco,
+ Where=(attco.MANAGED_ID == managedID),
+ ).on(txn))[0][0]
+
+ yield Insert({
+ attco.ATTACHMENT_ID : aid,
+ attco.MANAGED_ID : managedID,
+ attco.CALENDAR_OBJECT_RESOURCE_ID : referencedBy,
+ }).on(txn)
+
+
+ def managedID(self):
+ return self._managedID
+
+
+ @inlineCallbacks
+ def objectResource(self):
+ """
+ Return the calendar object resource associated with this attachment.
+ """
+
+ home = (yield self._txn.calendarHomeWithResourceID(self._ownerHomeID))
+ obj = (yield home.objectResourceWithID(self._objectResourceID))
+ returnValue(obj)
+
+
+ @property
+ def _path(self):
+ # Use directory hashing scheme based on MD5 of attachmentID
+ hasheduid = hashlib.md5(str(self._attachmentID)).hexdigest()
+ return self._attachmentPathRoot().child(hasheduid[0:2]).child(hasheduid[2:4]).child(hasheduid)
+
+
+ @inlineCallbacks
+ def location(self):
+ """
+ Return the URI location of the attachment.
+ """
+ if not hasattr(self, "_ownerName"):
+ home = (yield self._txn.calendarHomeWithResourceID(self._ownerHomeID))
+ self._ownerName = home.name()
+ if not hasattr(self, "_objectDropboxID"):
+ if not hasattr(self, "_objectResource"):
+ self._objectResource = (yield self.objectResource())
+ self._objectDropboxID = self._objectResource._dropboxID
+
+ fname = self.lastSegmentOfUriPath(self._managedID, self._name)
+ location = self._txn._store.attachmentsURIPattern % {
+ "home": self._ownerName,
+ "dropbox_id": urllib.quote(self._objectDropboxID),
+ "name": urllib.quote(fname),
+ }
+ returnValue(location)
+
+
+ @classmethod
+ def lastSegmentOfUriPath(cls, managed_id, name):
+ splits = name.rsplit(".", 1)
+ fname = splits[0]
+ suffix = splits[1] if len(splits) == 2 else "unknown"
+ return "{0}-{1}.{2}".format(fname, managed_id[:8], suffix)
+
+
+ @inlineCallbacks
+ def changed(self, contentType, dispositionName, md5, size):
+ """
+ Always update name to current disposition name.
+ """
+
+ self._contentType = contentType
+ self._name = dispositionName
+ self._md5 = md5
+ self._size = size
+ att = self._attachmentSchema
+ self._created, self._modified = map(
+ sqltime,
+ (yield Update(
+ {
+ att.CONTENT_TYPE : generateContentType(self._contentType),
+ att.SIZE : self._size,
+ att.MD5 : self._md5,
+ att.MODIFIED : utcNowSQL,
+ att.PATH : self._name,
+ },
+ Where=(att.ATTACHMENT_ID == self._attachmentID),
+ Return=(att.CREATED, att.MODIFIED)).on(self._txn))[0]
+ )
+
+
+ @inlineCallbacks
+ def newReference(self, resourceID):
+ """
+ Create a new reference of this attachment to the supplied calendar object resource id, and
+ return a ManagedAttachment for the new reference.
+
+ @param resourceID: the resource id to reference
+ @type resourceID: C{int}
+
+ @return: the new managed attachment
+ @rtype: L{ManagedAttachment}
+ """
+
+ attco = self._attachmentLinkSchema
+ yield Insert({
+ attco.ATTACHMENT_ID : self._attachmentID,
+ attco.MANAGED_ID : self._managedID,
+ attco.CALENDAR_OBJECT_RESOURCE_ID : resourceID,
+ }).on(self._txn)
+
+ mattach = (yield ManagedAttachment.load(self._txn, resourceID, self._managedID))
+ returnValue(mattach)
+
+
+ @inlineCallbacks
+ def removeFromResource(self, resourceID):
+
+ # Delete the reference
+ attco = self._attachmentLinkSchema
+ yield Delete(
+ From=attco,
+ Where=(attco.ATTACHMENT_ID == self._attachmentID).And(
+ attco.CALENDAR_OBJECT_RESOURCE_ID == resourceID),
+ ).on(self._txn)
+
+ # Check whether references still exist - if not, remove the actual attachment
+ rows = (yield Select(
+ [attco.CALENDAR_OBJECT_RESOURCE_ID, ],
+ From=attco,
+ Where=(attco.ATTACHMENT_ID == self._attachmentID),
+ ).on(self._txn))
+ if len(rows) == 0:
+ yield self.remove()
+
+
+ @inlineCallbacks
+ def attachProperty(self):
+ """
+ Return an iCalendar ATTACH property for this attachment.
+ """
+ attach = Property("ATTACH", "", valuetype=Value.VALUETYPE_URI)
+ location = (yield self.updateProperty(attach))
+ returnValue((attach, location,))
+
+
+ @inlineCallbacks
+ def updateProperty(self, attach):
+ """
+ Update an iCalendar ATTACH property for this attachment.
+ """
+
+ location = (yield self.location())
+
+ attach.setParameter("MANAGED-ID", self.managedID())
+ attach.setParameter("FMTTYPE", "{0}/{1}".format(self.contentType().mediaType, self.contentType().mediaSubtype))
+ attach.setParameter("FILENAME", self.name())
+ attach.setParameter("SIZE", str(self.size()))
+ attach.setValue(location)
+
+ returnValue(location)
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_directory.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_directory.py 2015-02-20 18:06:03 UTC (rev 14461)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_directory.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -17,13 +17,10 @@
from twext.enterprise.dal.record import SerializableRecord, fromTable
from twext.enterprise.dal.syntax import Select, Parameter
-from twext.python.log import Logger
from twisted.internet.defer import inlineCallbacks, returnValue
from txdav.common.datastore.sql_tables import schema
from txdav.common.datastore.sql_directory import GroupsRecord
-log = Logger()
-
"""
Classes and methods that relate to directory objects in the SQL store. e.g.,
delegates, groups etc
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_external.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_external.py 2015-02-20 18:06:03 UTC (rev 14461)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_external.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -16,6 +16,7 @@
##
from txdav.common.datastore.sql_directory import GroupsRecord
from txdav.caldav.datastore.sql_directory import GroupAttendeeRecord
+from txdav.caldav.datastore.sql_attachment import Attachment, AttachmentLink
"""
SQL backend for CalDAV storage when resources are external.
"""
@@ -24,8 +25,7 @@
from twext.python.log import Logger
-from txdav.caldav.datastore.sql import CalendarHome, Calendar, CalendarObject, \
- Attachment, AttachmentLink
+from txdav.caldav.datastore.sql import CalendarHome, Calendar, CalendarObject
from txdav.caldav.icalendarstore import ComponentUpdateState, ComponentRemoveState
from txdav.common.datastore.sql_external import CommonHomeExternal, CommonHomeChildExternal, \
CommonObjectResourceExternal
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/test/test_attachments.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/test/test_attachments.py 2015-02-20 18:06:03 UTC (rev 14461)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/test/test_attachments.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -29,8 +29,7 @@
from twistedcaldav.config import config
from twistedcaldav.ical import Property, Component
-from txdav.caldav.datastore.sql import CalendarStoreFeatures, DropBoxAttachment, \
- ManagedAttachment
+from txdav.caldav.datastore.sql import CalendarStoreFeatures
from txdav.caldav.datastore.test.common import CaptureProtocol
from txdav.caldav.icalendarstore import IAttachmentStorageTransport, IAttachment, \
QuotaExceeded, AttachmentSizeTooLarge
@@ -40,6 +39,8 @@
import hashlib
import os
+from txdav.caldav.datastore.sql_attachment import DropBoxAttachment, \
+ ManagedAttachment
"""
Tests for txdav.caldav.datastore.sql attachment handling.
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/carddav/datastore/sql.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/carddav/datastore/sql.py 2015-02-20 18:06:03 UTC (rev 14461)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/carddav/datastore/sql.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -53,11 +53,12 @@
KindChangeNotAllowedError
from txdav.common.datastore.query.generator import SQLQueryGenerator
from txdav.common.datastore.sql import CommonHome, CommonHomeChild, \
- CommonObjectResource, EADDRESSBOOKTYPE, SharingMixIn, SharingInvitation
+ CommonObjectResource, EADDRESSBOOKTYPE, SharingMixIn
from txdav.common.datastore.sql_tables import _ABO_KIND_PERSON, \
_ABO_KIND_GROUP, _ABO_KIND_RESOURCE, _ABO_KIND_LOCATION, schema, \
_BIND_MODE_OWN, _BIND_MODE_WRITE, _BIND_STATUS_ACCEPTED, \
_BIND_STATUS_INVITED, _BIND_MODE_INDIRECT, _BIND_STATUS_DECLINED
+from txdav.common.datastore.sql_sharing import SharingInvitation
from txdav.common.icommondatastore import InternalDataStoreError, \
InvalidUIDError, UIDExistsError, ObjectResourceTooBigError, \
InvalidObjectResourceError, InvalidComponentForStoreError, \
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/file.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/file.py 2015-02-20 18:06:03 UTC (rev 14461)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/file.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -44,7 +44,8 @@
from txdav.common.icommondatastore import HomeChildNameNotAllowedError, \
HomeChildNameAlreadyExistsError, NoSuchHomeChildError, \
InternalDataStoreError, ObjectResourceNameNotAllowedError, \
- ObjectResourceNameAlreadyExistsError, NoSuchObjectResourceError
+ ObjectResourceNameAlreadyExistsError, NoSuchObjectResourceError, \
+ ECALENDARTYPE, EADDRESSBOOKTYPE
from txdav.common.idirectoryservice import IStoreDirectoryService
from txdav.common.inotifications import INotificationCollection, \
INotificationObject
@@ -64,16 +65,6 @@
from twistedcaldav.sql import AbstractSQLDatabase, db_prefix
import os
-ECALENDARTYPE = 0
-EADDRESSBOOKTYPE = 1
-
-# Labels used to identify the class of resource being modified, so that
-# notification systems can target the correct application
-NotifierPrefixes = {
- ECALENDARTYPE : "CalDAV",
- EADDRESSBOOKTYPE : "CardDAV",
-}
-
TOPPATHS = (
"calendars",
"addressbooks"
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql.py 2015-02-20 18:06:03 UTC (rev 14461)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -14,6 +14,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
+from txdav.common.datastore.sql_notification import NotificationCollection
+from txdav.common.datastore.sql_util import _EmptyCacher, _SharedSyncLogic
+from txdav.common.datastore.sql_sharing import SharingHomeMixIn, SharingMixIn
"""
SQL data store.
@@ -31,7 +34,7 @@
from twext.enterprise.dal.syntax import (
Delete, utcNowSQL, Union, Insert, Len, Max, Parameter, SavepointAction,
- Select, Update, ColumnSyntax, TableSyntax, Upper, Count, ALL_COLUMNS, Sum,
+ Select, Update, Count, ALL_COLUMNS, Sum,
DatabaseLock, DatabaseUnlock)
from twext.enterprise.ienterprise import AlreadyFinishedError
from twext.enterprise.jobqueue import LocalQueuer
@@ -39,12 +42,10 @@
from twext.internet.decorate import memoizedKey, Memoizable
from twext.python.clsprop import classproperty
from twext.python.log import Logger
-from txweb2.http_headers import MimeType
from twisted.application.service import Service
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
-from twisted.python import hashlib
from twisted.python.failure import Failure
from twisted.python.modules import getModule
from twisted.python.util import FancyEqMixin
@@ -53,8 +54,6 @@
from twistedcaldav.dateops import datetimeMktime, pyCalendarTodatetime
from txdav.base.datastore.util import QueryCacher
-from txdav.base.datastore.util import normalizeUUIDOrNot
-from txdav.base.propertystore.base import PropertyName
from txdav.base.propertystore.none import PropertyStore as NonePropertyStore
from txdav.base.propertystore.sql import PropertyStore
from txdav.caldav.icalendarstore import ICalendarTransaction, ICalendarStore
@@ -65,14 +64,12 @@
from txdav.common.datastore.sql_directory import DelegatesAPIMixin, \
GroupsAPIMixin, GroupCacherAPIMixin
from txdav.common.datastore.sql_imip import imipAPIMixin
-from txdav.common.datastore.sql_tables import _BIND_MODE_DIRECT, \
- _BIND_MODE_INDIRECT, _BIND_MODE_OWN, _BIND_STATUS_ACCEPTED, \
- _BIND_STATUS_DECLINED, _BIND_STATUS_DELETED, _BIND_STATUS_INVALID, \
- _BIND_STATUS_INVITED, _HOME_STATUS_EXTERNAL, _HOME_STATUS_NORMAL, \
+from txdav.common.datastore.sql_tables import _BIND_MODE_OWN, _BIND_STATUS_ACCEPTED, \
+ _HOME_STATUS_EXTERNAL, _HOME_STATUS_NORMAL, \
_HOME_STATUS_PURGING, schema, splitSQLString, _HOME_STATUS_MIGRATING
from txdav.common.icommondatastore import ConcurrentModification, \
- RecordNotAllowedError, ExternalShareFailed, ShareNotAllowed, \
- IndexedSearchException
+ RecordNotAllowedError, ShareNotAllowed, \
+ IndexedSearchException, EADDRESSBOOKTYPE, ECALENDARTYPE
from txdav.common.icommondatastore import HomeChildNameNotAllowedError, \
HomeChildNameAlreadyExistsError, NoSuchHomeChildError, \
ObjectResourceNameNotAllowedError, ObjectResourceNameAlreadyExistsError, \
@@ -80,19 +77,12 @@
TooManyObjectResourcesError, SyncTokenValidException
from txdav.common.idirectoryservice import IStoreDirectoryService, \
DirectoryRecordNotFoundError
-from txdav.common.inotifications import INotificationCollection, \
- INotificationObject
from txdav.idav import ChangeCategory
-from txdav.xml import element
-from uuid import uuid4, UUID
-
from zope.interface import implements, directlyProvides
-from collections import namedtuple
import inspect
import itertools
-import json
import sys
import time
@@ -100,18 +90,6 @@
log = Logger()
-ECALENDARTYPE = 0
-EADDRESSBOOKTYPE = 1
-ENOTIFICATIONTYPE = 2
-
-# Labels used to identify the class of resource being modified, so that
-# notification systems can target the correct application
-NotifierPrefixes = {
- ECALENDARTYPE: "CalDAV",
- EADDRESSBOOKTYPE: "CardDAV",
-}
-
-
class CommonDataStore(Service, object):
"""
Shared logic for SQL-based data stores, between calendar and addressbook
@@ -1532,180 +1510,6 @@
-class _EmptyCacher(object):
-
- def set(self, key, value):
- return succeed(True)
-
-
- def get(self, key, withIdentifier=False):
- return succeed(None)
-
-
- def delete(self, key):
- return succeed(True)
-
-
-
-class SharingHomeMixIn(object):
- """
- Common class for CommonHome to implement sharing operations
- """
-
- @inlineCallbacks
- def acceptShare(self, shareUID, summary=None):
- """
- This share is being accepted.
- """
-
- shareeView = yield self.anyObjectWithShareUID(shareUID)
- if shareeView is not None:
- yield shareeView.acceptShare(summary)
-
- returnValue(shareeView)
-
-
- @inlineCallbacks
- def declineShare(self, shareUID):
- """
- This share is being declined.
- """
-
- shareeView = yield self.anyObjectWithShareUID(shareUID)
- if shareeView is not None:
- yield shareeView.declineShare()
-
- returnValue(shareeView is not None)
-
-
- #
- # External (cross-pod) sharing - entry point is the sharee's home collection.
- #
- @inlineCallbacks
- def processExternalInvite(
- self, ownerUID, ownerRID, ownerName, shareUID, bindMode, summary,
- copy_invite_properties, supported_components=None
- ):
- """
- External invite received.
- """
-
- # Get the owner home - create external one if not present
- ownerHome = yield self._txn.homeWithUID(
- self._homeType, ownerUID, create=True
- )
- if ownerHome is None or not ownerHome.external():
- raise ExternalShareFailed("Invalid owner UID: {}".format(ownerUID))
-
- # Try to find owner calendar via its external id
- ownerView = yield ownerHome.childWithExternalID(ownerRID)
- if ownerView is None:
- try:
- ownerView = yield ownerHome.createChildWithName(
- ownerName, externalID=ownerRID
- )
- except HomeChildNameAlreadyExistsError:
- # This is odd - it means we possibly have a left over sharer
- # collection which the sharer likely removed and re-created
- # with the same name but now it has a different externalID and
- # is not found by the initial query. What we do is check to see
- # whether any shares still reference the old ID - if they do we
- # are hosed. If not, we can remove the old item and create a new one.
- oldOwnerView = yield ownerHome.childWithName(ownerName)
- invites = yield oldOwnerView.sharingInvites()
- if len(invites) != 0:
- log.error(
- "External invite collection name is present with a "
- "different externalID and still has shares"
- )
- raise
- log.error(
- "External invite collection name is present with a "
- "different externalID - trying to fix"
- )
- yield ownerHome.removeExternalChild(oldOwnerView)
- ownerView = yield ownerHome.createChildWithName(
- ownerName, externalID=ownerRID
- )
-
- if (
- supported_components is not None and
- hasattr(ownerView, "setSupportedComponents")
- ):
- yield ownerView.setSupportedComponents(supported_components)
-
- # Now carry out the share operation
- if bindMode == _BIND_MODE_DIRECT:
- shareeView = yield ownerView.directShareWithUser(
- self.uid(), shareName=shareUID
- )
- else:
- shareeView = yield ownerView.inviteUIDToShare(
- self.uid(), bindMode, summary, shareName=shareUID
- )
-
- shareeView.setInviteCopyProperties(copy_invite_properties)
-
-
- @inlineCallbacks
- def processExternalUninvite(self, ownerUID, ownerRID, shareUID):
- """
- External invite received.
- """
-
- # Get the owner home
- ownerHome = yield self._txn.homeWithUID(self._homeType, ownerUID)
- if ownerHome is None or not ownerHome.external():
- raise ExternalShareFailed("Invalid owner UID: {}".format(ownerUID))
-
- # Try to find owner calendar via its external id
- ownerView = yield ownerHome.childWithExternalID(ownerRID)
- if ownerView is None:
- raise ExternalShareFailed("Invalid share ID: {}".format(shareUID))
-
- # Now carry out the share operation
- yield ownerView.uninviteUIDFromShare(self.uid())
-
- # See if there are any references to the external share. If not,
- # remove it
- invites = yield ownerView.sharingInvites()
- if len(invites) == 0:
- yield ownerHome.removeExternalChild(ownerView)
-
-
- @inlineCallbacks
- def processExternalReply(
- self, ownerUID, shareeUID, shareUID, bindStatus, summary=None
- ):
- """
- External invite received.
- """
-
- # Make sure the shareeUID and shareUID match
-
- # Get the owner home - create external one if not present
- shareeHome = yield self._txn.homeWithUID(self._homeType, shareeUID)
- if shareeHome is None or not shareeHome.external():
- raise ExternalShareFailed(
- "Invalid sharee UID: {}".format(shareeUID)
- )
-
- # Try to find owner calendar via its external id
- shareeView = yield shareeHome.anyObjectWithShareUID(shareUID)
- if shareeView is None:
- raise ExternalShareFailed("Invalid share UID: {}".format(shareUID))
-
- # Now carry out the share operation
- if bindStatus == _BIND_STATUS_ACCEPTED:
- yield shareeHome.acceptShare(shareUID, summary)
- elif bindStatus == _BIND_STATUS_DECLINED:
- if shareeView.direct():
- yield shareeView.deleteShare()
- else:
- yield shareeHome.declineShare(shareUID)
-
-
-
class CommonHome(SharingHomeMixIn):
log = Logger()
@@ -2908,1461 +2712,6 @@
-class _SharedSyncLogic(object):
- """
- Logic for maintaining sync-token shared between notification collections and
- shared collections.
- """
-
- @classproperty
- def _childSyncTokenQuery(cls):
- """
- DAL query for retrieving the sync token of a L{CommonHomeChild} based on
- its resource ID.
- """
- rev = cls._revisionsSchema
- return Select([Max(rev.REVISION)], From=rev,
- Where=rev.RESOURCE_ID == Parameter("resourceID"))
-
-
- def revisionFromToken(self, token):
- if token is None:
- return 0
- elif isinstance(token, str) or isinstance(token, unicode):
- _ignore_uuid, revision = token.split("_", 1)
- return int(revision)
- else:
- return token
-
-
- @inlineCallbacks
- def syncToken(self):
- if self._syncTokenRevision is None:
- self._syncTokenRevision = yield self.syncTokenRevision()
- returnValue(("%s_%s" % (self._resourceID, self._syncTokenRevision,)))
-
-
- @inlineCallbacks
- def syncTokenRevision(self):
- revision = (yield self._childSyncTokenQuery.on(self._txn, resourceID=self._resourceID))[0][0]
- if revision is None:
- revision = int((yield self._txn.calendarserverValue("MIN-VALID-REVISION")))
- returnValue(revision)
-
-
- def objectResourcesSinceToken(self, token):
- raise NotImplementedError()
-
-
- @classmethod
- def _objectNamesSinceRevisionQuery(cls, deleted=True):
- """
- DAL query for (resource, deleted-flag)
- """
- rev = cls._revisionsSchema
- where = (rev.REVISION > Parameter("revision")).And(rev.RESOURCE_ID == Parameter("resourceID"))
- if not deleted:
- where = where.And(rev.DELETED == False)
- return Select(
- [rev.RESOURCE_NAME, rev.DELETED],
- From=rev,
- Where=where,
- )
-
-
- def resourceNamesSinceToken(self, token):
- """
- Return the changed and deleted resources since a particular sync-token. This simply extracts
- the revision from from the token then calls L{resourceNamesSinceRevision}.
-
- @param revision: the revision to determine changes since
- @type revision: C{int}
- """
-
- return self.resourceNamesSinceRevision(self.revisionFromToken(token))
-
-
- @inlineCallbacks
- def resourceNamesSinceRevision(self, revision):
- """
- Return the changed and deleted resources since a particular revision.
-
- @param revision: the revision to determine changes since
- @type revision: C{int}
- """
- changed = []
- deleted = []
- invalid = []
- if revision:
- minValidRevision = yield self._txn.calendarserverValue("MIN-VALID-REVISION")
- if revision < int(minValidRevision):
- raise SyncTokenValidException
-
- results = [
- (name if name else "", removed) for name, removed in (
- yield self._objectNamesSinceRevisionQuery().on(
- self._txn, revision=revision, resourceID=self._resourceID)
- )
- ]
- results.sort(key=lambda x: x[1])
-
- for name, wasdeleted in results:
- if name:
- if wasdeleted:
- deleted.append(name)
- else:
- changed.append(name)
- else:
- changed = yield self.listObjectResources()
-
- returnValue((changed, deleted, invalid))
-
-
- @classproperty
- def _removeDeletedRevision(cls):
- rev = cls._revisionsSchema
- return Delete(From=rev,
- Where=(rev.HOME_RESOURCE_ID == Parameter("homeID")).And(
- rev.COLLECTION_NAME == Parameter("collectionName")))
-
-
- @classproperty
- def _addNewRevision(cls):
- rev = cls._revisionsSchema
- return Insert(
- {
- rev.HOME_RESOURCE_ID: Parameter("homeID"),
- rev.RESOURCE_ID: Parameter("resourceID"),
- rev.COLLECTION_NAME: Parameter("collectionName"),
- rev.RESOURCE_NAME: None,
- # Always starts false; may be updated to be a tombstone
- # later.
- rev.DELETED: False
- },
- Return=[rev.REVISION]
- )
-
-
- @inlineCallbacks
- def _initSyncToken(self):
- yield self._removeDeletedRevision.on(
- self._txn, homeID=self._home._resourceID, collectionName=self._name
- )
- self._syncTokenRevision = (yield (
- self._addNewRevision.on(self._txn, homeID=self._home._resourceID,
- resourceID=self._resourceID,
- collectionName=self._name)))[0][0]
- self._txn.bumpRevisionForObject(self)
-
-
- @classproperty
- def _renameSyncTokenQuery(cls):
- """
- DAL query to change sync token for a rename (increment and adjust
- resource name).
- """
- rev = cls._revisionsSchema
- return Update(
- {
- rev.REVISION: schema.REVISION_SEQ,
- rev.COLLECTION_NAME: Parameter("name")
- },
- Where=(rev.RESOURCE_ID == Parameter("resourceID")).And
- (rev.RESOURCE_NAME == None),
- Return=rev.REVISION
- )
-
-
- @inlineCallbacks
- def _renameSyncToken(self):
- self._syncTokenRevision = (yield self._renameSyncTokenQuery.on(
- self._txn, name=self._name, resourceID=self._resourceID))[0][0]
- self._txn.bumpRevisionForObject(self)
-
-
- @classproperty
- def _bumpSyncTokenQuery(cls):
- """
- DAL query to change collection sync token. Note this can impact multiple rows if the
- collection is shared.
- """
- rev = cls._revisionsSchema
- return Update(
- {rev.REVISION: schema.REVISION_SEQ, },
- Where=(rev.RESOURCE_ID == Parameter("resourceID")).And
- (rev.RESOURCE_NAME == None)
- )
-
-
- @inlineCallbacks
- def _bumpSyncToken(self):
-
- if not self._txn.isRevisionBumpedAlready(self):
- self._txn.bumpRevisionForObject(self)
- yield self._bumpSyncTokenQuery.on(
- self._txn,
- resourceID=self._resourceID,
- )
- self._syncTokenRevision = None
-
-
- @classproperty
- def _deleteSyncTokenQuery(cls):
- """
- DAL query to remove all child revision information. The revision for the collection
- itself is not touched.
- """
- rev = cls._revisionsSchema
- return Delete(
- From=rev,
- Where=(rev.HOME_RESOURCE_ID == Parameter("homeID")).And
- (rev.RESOURCE_ID == Parameter("resourceID")).And
- (rev.COLLECTION_NAME == None)
- )
-
-
- @classproperty
- def _sharedRemovalQuery(cls):
- """
- DAL query to indicate a shared collection has been deleted.
- """
- rev = cls._revisionsSchema
- return Update(
- {
- rev.RESOURCE_ID: None,
- rev.REVISION: schema.REVISION_SEQ,
- rev.DELETED: True
- },
- Where=(rev.HOME_RESOURCE_ID == Parameter("homeID")).And(
- rev.RESOURCE_ID == Parameter("resourceID")).And(
- rev.RESOURCE_NAME == None)
- )
-
-
- @classproperty
- def _unsharedRemovalQuery(cls):
- """
- DAL query to indicate an owned collection has been deleted.
- """
- rev = cls._revisionsSchema
- return Update(
- {
- rev.RESOURCE_ID: None,
- rev.REVISION: schema.REVISION_SEQ,
- rev.DELETED: True
- },
- Where=(rev.RESOURCE_ID == Parameter("resourceID")).And(
- rev.RESOURCE_NAME == None),
- )
-
-
- @inlineCallbacks
- def _deletedSyncToken(self, sharedRemoval=False):
- """
- When a collection is deleted we remove all the revision information for its child resources.
- We update the collection's sync token to indicate it has been deleted - that way a sync on
- the home collection can report the deletion of the collection.
-
- @param sharedRemoval: indicates whether the collection being removed is shared
- @type sharedRemoval: L{bool}
- """
- # Remove all child entries
- yield self._deleteSyncTokenQuery.on(self._txn,
- homeID=self._home._resourceID,
- resourceID=self._resourceID)
-
- # If this is a share being removed then we only mark this one specific
- # home/resource-id as being deleted. On the other hand, if it is a
- # non-shared collection, then we need to mark all collections
- # with the resource-id as being deleted to account for direct shares.
- if sharedRemoval:
- yield self._sharedRemovalQuery.on(self._txn,
- homeID=self._home._resourceID,
- resourceID=self._resourceID)
- else:
- yield self._unsharedRemovalQuery.on(self._txn,
- resourceID=self._resourceID)
- self._syncTokenRevision = None
-
-
- def _insertRevision(self, name):
- return self._changeRevision("insert", name)
-
-
- def _updateRevision(self, name):
- return self._changeRevision("update", name)
-
-
- def _deleteRevision(self, name):
- return self._changeRevision("delete", name)
-
-
- @classproperty
- def _deleteBumpTokenQuery(cls):
- rev = cls._revisionsSchema
- return Update(
- {rev.REVISION: schema.REVISION_SEQ, rev.DELETED: True},
- Where=(rev.RESOURCE_ID == Parameter("resourceID")).And(
- rev.RESOURCE_NAME == Parameter("name")),
- Return=rev.REVISION
- )
-
-
- @classproperty
- def _updateBumpTokenQuery(cls):
- rev = cls._revisionsSchema
- return Update(
- {rev.REVISION: schema.REVISION_SEQ},
- Where=(rev.RESOURCE_ID == Parameter("resourceID")).And(
- rev.RESOURCE_NAME == Parameter("name")),
- Return=rev.REVISION
- )
-
-
- @classproperty
- def _insertFindPreviouslyNamedQuery(cls):
- rev = cls._revisionsSchema
- return Select(
- [rev.RESOURCE_ID],
- From=rev,
- Where=(rev.RESOURCE_ID == Parameter("resourceID")).And(
- rev.RESOURCE_NAME == Parameter("name"))
- )
-
-
- @classproperty
- def _updatePreviouslyNamedQuery(cls):
- rev = cls._revisionsSchema
- return Update(
- {rev.REVISION: schema.REVISION_SEQ, rev.DELETED: False},
- Where=(rev.RESOURCE_ID == Parameter("resourceID")).And(
- rev.RESOURCE_NAME == Parameter("name")),
- Return=rev.REVISION
- )
-
-
- @classproperty
- def _completelyNewRevisionQuery(cls):
- rev = cls._revisionsSchema
- return Insert(
- {
- rev.HOME_RESOURCE_ID: Parameter("homeID"),
- rev.RESOURCE_ID: Parameter("resourceID"),
- rev.RESOURCE_NAME: Parameter("name"),
- rev.REVISION: schema.REVISION_SEQ,
- rev.DELETED: False
- },
- Return=rev.REVISION
- )
-
-
- @inlineCallbacks
- def _changeRevision(self, action, name):
-
- # Need to handle the case where for some reason the revision entry is
- # actually missing. For a "delete" we don't care, for an "update" we
- # will turn it into an "insert".
- if action == "delete":
- rows = (
- yield self._deleteBumpTokenQuery.on(
- self._txn, resourceID=self._resourceID, name=name))
- if rows:
- self._syncTokenRevision = rows[0][0]
- elif action == "update":
- rows = (
- yield self._updateBumpTokenQuery.on(
- self._txn, resourceID=self._resourceID, name=name))
- if rows:
- self._syncTokenRevision = rows[0][0]
- else:
- action = "insert"
-
- if action == "insert":
- # Note that an "insert" may happen for a resource that previously
- # existed and then was deleted. In that case an entry in the
- # REVISIONS table still exists so we have to detect that and do db
- # INSERT or UPDATE as appropriate
-
- found = bool((
- yield self._insertFindPreviouslyNamedQuery.on(
- self._txn, resourceID=self._resourceID, name=name)))
- if found:
- self._syncTokenRevision = (
- yield self._updatePreviouslyNamedQuery.on(
- self._txn, resourceID=self._resourceID, name=name)
- )[0][0]
- else:
- self._syncTokenRevision = (
- yield self._completelyNewRevisionQuery.on(
- self._txn, homeID=self.ownerHome()._resourceID,
- resourceID=self._resourceID, name=name)
- )[0][0]
- yield self._maybeNotify()
- returnValue(self._syncTokenRevision)
-
-
- def _maybeNotify(self):
- """
- Maybe notify changed. (Overridden in NotificationCollection.)
- """
- return succeed(None)
-
-
-
-SharingInvitation = namedtuple(
- "SharingInvitation",
- ["uid", "ownerUID", "ownerHomeID", "shareeUID", "shareeHomeID", "mode", "status", "summary"]
-)
-
-
-
-class SharingMixIn(object):
- """
- Common class for CommonHomeChild and AddressBookObject
- """
-
- @classproperty
- def _bindInsertQuery(cls, **kw):
- """
- DAL statement to create a bind entry that connects a collection to its
- home.
- """
- bind = cls._bindSchema
- return Insert({
- bind.HOME_RESOURCE_ID: Parameter("homeID"),
- bind.RESOURCE_ID: Parameter("resourceID"),
- bind.EXTERNAL_ID: Parameter("externalID"),
- bind.RESOURCE_NAME: Parameter("name"),
- bind.BIND_MODE: Parameter("mode"),
- bind.BIND_STATUS: Parameter("bindStatus"),
- bind.MESSAGE: Parameter("message"),
- })
-
-
- @classmethod
- def _updateBindColumnsQuery(cls, columnMap):
- bind = cls._bindSchema
- return Update(
- columnMap,
- Where=(bind.RESOURCE_ID == Parameter("resourceID")).And(
- bind.HOME_RESOURCE_ID == Parameter("homeID")),
- )
-
-
- @classproperty
- def _deleteBindForResourceIDAndHomeID(cls):
- bind = cls._bindSchema
- return Delete(
- From=bind,
- Where=(bind.RESOURCE_ID == Parameter("resourceID")).And(
- bind.HOME_RESOURCE_ID == Parameter("homeID")),
- )
-
-
- @classmethod
- def _bindFor(cls, condition):
- bind = cls._bindSchema
- columns = cls.bindColumns() + cls.additionalBindColumns()
- return Select(
- columns,
- From=bind,
- Where=condition
- )
-
-
- @classmethod
- def _bindInviteFor(cls, condition):
- home = cls._homeSchema
- bind = cls._bindSchema
- return Select(
- [
- home.OWNER_UID,
- bind.HOME_RESOURCE_ID,
- bind.RESOURCE_ID,
- bind.RESOURCE_NAME,
- bind.BIND_MODE,
- bind.BIND_STATUS,
- bind.MESSAGE,
- ],
- From=bind.join(home, on=(bind.HOME_RESOURCE_ID == home.RESOURCE_ID)),
- Where=condition
- )
-
-
- @classproperty
- def _sharedInvitationBindForResourceID(cls):
- bind = cls._bindSchema
- return cls._bindInviteFor(
- (bind.RESOURCE_ID == Parameter("resourceID")).And
- (bind.BIND_MODE != _BIND_MODE_OWN)
- )
-
-
- @classproperty
- def _acceptedBindForHomeID(cls):
- bind = cls._bindSchema
- return cls._bindFor((bind.HOME_RESOURCE_ID == Parameter("homeID"))
- .And(bind.BIND_STATUS == _BIND_STATUS_ACCEPTED))
-
-
- @classproperty
- def _bindForResourceIDAndHomeID(cls):
- """
- DAL query that looks up home bind rows by home child
- resource ID and home resource ID.
- """
- bind = cls._bindSchema
- return cls._bindFor((bind.RESOURCE_ID == Parameter("resourceID"))
- .And(bind.HOME_RESOURCE_ID == Parameter("homeID")))
-
-
- @classproperty
- def _bindForExternalIDAndHomeID(cls):
- """
- DAL query that looks up home bind rows by home child
- resource ID and home resource ID.
- """
- bind = cls._bindSchema
- return cls._bindFor((bind.EXTERNAL_ID == Parameter("externalID"))
- .And(bind.HOME_RESOURCE_ID == Parameter("homeID")))
-
-
- @classproperty
- def _bindForNameAndHomeID(cls):
- """
- DAL query that looks up any bind rows by home child
- resource ID and home resource ID.
- """
- bind = cls._bindSchema
- return cls._bindFor((bind.RESOURCE_NAME == Parameter("name"))
- .And(bind.HOME_RESOURCE_ID == Parameter("homeID")))
-
-
- #
- # Higher level API
- #
- @inlineCallbacks
- def inviteUIDToShare(self, shareeUID, mode, summary=None, shareName=None):
- """
- Invite a user to share this collection - either create the share if it does not exist, or
- update the existing share with new values. Make sure a notification is sent as well.
-
- @param shareeUID: UID of the sharee
- @type shareeUID: C{str}
- @param mode: access mode
- @type mode: C{int}
- @param summary: share message
- @type summary: C{str}
- """
-
- # Look for existing invite and update its fields or create new one
- shareeView = yield self.shareeView(shareeUID)
- if shareeView is not None:
- status = _BIND_STATUS_INVITED if shareeView.shareStatus() in (_BIND_STATUS_DECLINED, _BIND_STATUS_INVALID) else None
- yield self.updateShare(shareeView, mode=mode, status=status, summary=summary)
- else:
- shareeView = yield self.createShare(shareeUID=shareeUID, mode=mode, summary=summary, shareName=shareName)
-
- # Check for external
- if shareeView.viewerHome().external():
- yield self._sendExternalInvite(shareeView)
- else:
- # Send invite notification
- yield self._sendInviteNotification(shareeView)
- returnValue(shareeView)
-
-
- @inlineCallbacks
- def directShareWithUser(self, shareeUID, shareName=None):
- """
- Create a direct share with the specified user. Note it is currently up to the app layer
- to enforce access control - this is not ideal as we really should have control of that in
- the store. Once we do, this api will need to verify that access is allowed for a direct share.
-
- NB no invitations are used with direct sharing.
-
- @param shareeUID: UID of the sharee
- @type shareeUID: C{str}
- """
-
- # Ignore if it already exists
- shareeView = yield self.shareeView(shareeUID)
- if shareeView is None:
- shareeView = yield self.createShare(shareeUID=shareeUID, mode=_BIND_MODE_DIRECT, shareName=shareName)
- yield shareeView.newShare()
-
- # Check for external
- if shareeView.viewerHome().external():
- yield self._sendExternalInvite(shareeView)
-
- returnValue(shareeView)
-
-
- @inlineCallbacks
- def uninviteUIDFromShare(self, shareeUID):
- """
- Remove a user from a share. Make sure a notification is sent as well.
-
- @param shareeUID: UID of the sharee
- @type shareeUID: C{str}
- """
- # Cancel invites - we'll just use whatever userid we are given
-
- shareeView = yield self.shareeView(shareeUID)
- if shareeView is not None:
- if shareeView.viewerHome().external():
- yield self._sendExternalUninvite(shareeView)
- else:
- # If current user state is accepted then we send an invite with the new state, otherwise
- # we cancel any existing invites for the user
- if not shareeView.direct():
- if shareeView.shareStatus() != _BIND_STATUS_ACCEPTED:
- yield self._removeInviteNotification(shareeView)
- else:
- yield self._sendInviteNotification(shareeView, notificationState=_BIND_STATUS_DELETED)
-
- # Remove the bind
- yield self.removeShare(shareeView)
-
-
- @inlineCallbacks
- def acceptShare(self, summary=None):
- """
- This share is being accepted.
- """
-
- if not self.direct() and self.shareStatus() != _BIND_STATUS_ACCEPTED:
- if self.external():
- yield self._replyExternalInvite(_BIND_STATUS_ACCEPTED, summary)
- ownerView = yield self.ownerView()
- yield ownerView.updateShare(self, status=_BIND_STATUS_ACCEPTED)
- yield self.newShare(displayname=summary)
- if not ownerView.external():
- yield self._sendReplyNotification(ownerView, summary)
-
-
- @inlineCallbacks
- def declineShare(self):
- """
- This share is being declined.
- """
-
- if not self.direct() and self.shareStatus() != _BIND_STATUS_DECLINED:
- if self.external():
- yield self._replyExternalInvite(_BIND_STATUS_DECLINED)
- ownerView = yield self.ownerView()
- yield ownerView.updateShare(self, status=_BIND_STATUS_DECLINED)
- if not ownerView.external():
- yield self._sendReplyNotification(ownerView)
-
-
- @inlineCallbacks
- def deleteShare(self):
- """
- This share is being deleted (by the sharee) - either decline or remove (for direct shares).
- """
-
- ownerView = yield self.ownerView()
- if self.direct():
- yield ownerView.removeShare(self)
- if ownerView.external():
- yield self._replyExternalInvite(_BIND_STATUS_DECLINED)
- else:
- yield self.declineShare()
-
-
- @inlineCallbacks
- def ownerDeleteShare(self):
- """
- This share is being deleted (by the owner) - either decline or remove (for direct shares).
- """
-
- # Change status on store object
- yield self.setShared(False)
-
- # Remove all sharees (direct and invited)
- for invitation in (yield self.sharingInvites()):
- yield self.uninviteUIDFromShare(invitation.shareeUID)
-
-
- def newShare(self, displayname=None):
- """
- Override in derived classes to do any specific operations needed when a share
- is first accepted.
- """
- return succeed(None)
-
-
- @inlineCallbacks
- def allInvitations(self):
- """
- Get list of all invitations (non-direct) to this object.
- """
- invitations = yield self.sharingInvites()
-
- # remove direct shares as those are not "real" invitations
- invitations = filter(lambda x: x.mode != _BIND_MODE_DIRECT, invitations)
- invitations.sort(key=lambda invitation: invitation.shareeUID)
- returnValue(invitations)
-
-
- @inlineCallbacks
- def _sendInviteNotification(self, shareeView, notificationState=None):
- """
- Called on the owner's resource.
- """
- # When deleting the message is the sharee's display name
- displayname = shareeView.shareMessage()
- if notificationState == _BIND_STATUS_DELETED:
- displayname = str(shareeView.properties().get(PropertyName.fromElement(element.DisplayName), displayname))
-
- notificationtype = {
- "notification-type": "invite-notification",
- "shared-type": shareeView.sharedResourceType(),
- }
- notificationdata = {
- "notification-type": "invite-notification",
- "shared-type": shareeView.sharedResourceType(),
- "dtstamp": DateTime.getNowUTC().getText(),
- "owner": shareeView.ownerHome().uid(),
- "sharee": shareeView.viewerHome().uid(),
- "uid": shareeView.shareUID(),
- "status": shareeView.shareStatus() if notificationState is None else notificationState,
- "access": (yield shareeView.effectiveShareMode()),
- "ownerName": self.shareName(),
- "summary": displayname,
- }
- if hasattr(self, "getSupportedComponents"):
- notificationdata["supported-components"] = self.getSupportedComponents()
-
- # Add to sharee's collection
- notifications = yield self._txn.notificationsWithUID(shareeView.viewerHome().uid())
- yield notifications.writeNotificationObject(shareeView.shareUID(), notificationtype, notificationdata)
-
-
- @inlineCallbacks
- def _sendReplyNotification(self, ownerView, summary=None):
- """
- Create a reply notification based on the current state of this shared resource.
- """
-
- # Generate invite XML
- notificationUID = "%s-reply" % (self.shareUID(),)
-
- notificationtype = {
- "notification-type": "invite-reply",
- "shared-type": self.sharedResourceType(),
- }
-
- notificationdata = {
- "notification-type": "invite-reply",
- "shared-type": self.sharedResourceType(),
- "dtstamp": DateTime.getNowUTC().getText(),
- "owner": self.ownerHome().uid(),
- "sharee": self.viewerHome().uid(),
- "status": self.shareStatus(),
- "ownerName": ownerView.shareName(),
- "in-reply-to": self.shareUID(),
- "summary": summary,
- }
-
- # Add to owner notification collection
- notifications = yield self._txn.notificationsWithUID(self.ownerHome().uid())
- yield notifications.writeNotificationObject(notificationUID, notificationtype, notificationdata)
-
-
- @inlineCallbacks
- def _removeInviteNotification(self, shareeView):
- """
- Called on the owner's resource.
- """
-
- # Remove from sharee's collection
- notifications = yield self._txn.notificationsWithUID(shareeView.viewerHome().uid())
- yield notifications.removeNotificationObjectWithUID(shareeView.shareUID())
-
-
- #
- # External/cross-pod API
- #
- @inlineCallbacks
- def _sendExternalInvite(self, shareeView):
-
- yield self._txn.store().conduit.send_shareinvite(
- self._txn,
- shareeView.ownerHome()._homeType,
- shareeView.ownerHome().uid(),
- self.id(),
- self.shareName(),
- shareeView.viewerHome().uid(),
- shareeView.shareUID(),
- shareeView.shareMode(),
- shareeView.shareMessage(),
- self.getInviteCopyProperties(),
- supported_components=self.getSupportedComponents() if hasattr(self, "getSupportedComponents") else None,
- )
-
-
- @inlineCallbacks
- def _sendExternalUninvite(self, shareeView):
-
- yield self._txn.store().conduit.send_shareuninvite(
- self._txn,
- shareeView.ownerHome()._homeType,
- shareeView.ownerHome().uid(),
- self.id(),
- shareeView.viewerHome().uid(),
- shareeView.shareUID(),
- )
-
-
- @inlineCallbacks
- def _replyExternalInvite(self, status, summary=None):
-
- yield self._txn.store().conduit.send_sharereply(
- self._txn,
- self.viewerHome()._homeType,
- self.ownerHome().uid(),
- self.viewerHome().uid(),
- self.shareUID(),
- status,
- summary,
- )
-
-
- #
- # Lower level API
- #
- @inlineCallbacks
- def ownerView(self):
- """
- Return the owner resource counterpart of this shared resource.
-
- Note we have to play a trick with the property store to coerce it to match
- the per-user properties for the owner.
- """
- # Get the child of the owner home that has the same resource id as the owned one
- ownerView = yield self.ownerHome().childWithID(self.id())
- returnValue(ownerView)
-
-
- @inlineCallbacks
- def shareeView(self, shareeUID):
- """
- Return the shared resource counterpart of this owned resource for the specified sharee.
-
- Note we have to play a trick with the property store to coerce it to match
- the per-user properties for the sharee.
- """
-
- # Never return the owner's own resource
- if self._home.uid() == shareeUID:
- returnValue(None)
-
- # Get the child of the sharee home that has the same resource id as the owned one
- shareeHome = yield self._txn.homeWithUID(self._home._homeType, shareeUID, authzUID=shareeUID)
- shareeView = (yield shareeHome.allChildWithID(self.id())) if shareeHome is not None else None
- returnValue(shareeView)
-
-
- @inlineCallbacks
- def shareWithUID(self, shareeUID, mode, status=None, summary=None, shareName=None):
- """
- Share this (owned) L{CommonHomeChild} with another principal.
-
- @param shareeUID: The UID of the sharee.
- @type: L{str}
-
- @param mode: The sharing mode; L{_BIND_MODE_READ} or
- L{_BIND_MODE_WRITE} or L{_BIND_MODE_DIRECT}
- @type mode: L{str}
-
- @param status: The sharing status; L{_BIND_STATUS_INVITED} or
- L{_BIND_STATUS_ACCEPTED}
- @type: L{str}
-
- @param summary: The proposed message to go along with the share, which
- will be used as the default display name.
- @type: L{str}
-
- @return: the name of the shared calendar in the new calendar home.
- @rtype: L{str}
- """
- shareeHome = yield self._txn.calendarHomeWithUID(shareeUID, create=True)
- returnValue(
- (yield self.shareWith(shareeHome, mode, status, summary, shareName))
- )
-
-
- @inlineCallbacks
- def shareWith(self, shareeHome, mode, status=None, summary=None, shareName=None):
- """
- Share this (owned) L{CommonHomeChild} with another home.
-
- @param shareeHome: The home of the sharee.
- @type: L{CommonHome}
-
- @param mode: The sharing mode; L{_BIND_MODE_READ} or
- L{_BIND_MODE_WRITE} or L{_BIND_MODE_DIRECT}
- @type: L{str}
-
- @param status: The sharing status; L{_BIND_STATUS_INVITED} or
- L{_BIND_STATUS_ACCEPTED}
- @type: L{str}
-
- @param summary: The proposed message to go along with the share, which
- will be used as the default display name.
- @type: L{str}
-
- @param shareName: The proposed name of the new share.
- @type: L{str}
-
- @return: the name of the shared calendar in the new calendar home.
- @rtype: L{str}
- """
-
- if status is None:
- status = _BIND_STATUS_ACCEPTED
-
- @inlineCallbacks
- def doInsert(subt):
- newName = shareName if shareName is not None else self.newShareName()
- yield self._bindInsertQuery.on(
- subt,
- homeID=shareeHome._resourceID,
- resourceID=self._resourceID,
- externalID=self._externalID,
- name=newName,
- mode=mode,
- bindStatus=status,
- message=summary
- )
- returnValue(newName)
- try:
- bindName = yield self._txn.subtransaction(doInsert)
- except AllRetriesFailed:
- # FIXME: catch more specific exception
- child = yield shareeHome.allChildWithID(self._resourceID)
- yield self.updateShare(
- child, mode=mode, status=status,
- summary=summary
- )
- bindName = child._name
- else:
- if status == _BIND_STATUS_ACCEPTED:
- shareeView = yield shareeHome.anyObjectWithShareUID(bindName)
- yield shareeView._initSyncToken()
- yield shareeView._initBindRevision()
-
- # Mark this as shared
- yield self.setShared(True)
-
- # Must send notification to ensure cache invalidation occurs
- yield self.notifyPropertyChanged()
- yield shareeHome.notifyChanged()
-
- returnValue(bindName)
-
-
- @inlineCallbacks
- def createShare(self, shareeUID, mode, summary=None, shareName=None):
- """
- Create a new shared resource. If the mode is direct, the share is created in accepted state,
- otherwise the share is created in invited state.
- """
- shareeHome = yield self._txn.homeWithUID(self.ownerHome()._homeType, shareeUID, create=True)
-
- yield self.shareWith(
- shareeHome,
- mode=mode,
- status=_BIND_STATUS_INVITED if mode != _BIND_MODE_DIRECT else _BIND_STATUS_ACCEPTED,
- summary=summary,
- shareName=shareName,
- )
- shareeView = yield self.shareeView(shareeUID)
- returnValue(shareeView)
-
-
- @inlineCallbacks
- def updateShare(self, shareeView, mode=None, status=None, summary=None):
- """
- Update share mode, status, and message for a home child shared with
- this (owned) L{CommonHomeChild}.
-
- @param shareeView: The sharee home child that shares this.
- @type shareeView: L{CommonHomeChild}
-
- @param mode: The sharing mode; L{_BIND_MODE_READ} or
- L{_BIND_MODE_WRITE} or None to not update
- @type mode: L{str}
-
- @param status: The sharing status; L{_BIND_STATUS_INVITED} or
- L{_BIND_STATUS_ACCEPTED} or L{_BIND_STATUS_DECLINED} or
- L{_BIND_STATUS_INVALID} or None to not update
- @type status: L{str}
-
- @param summary: The proposed message to go along with the share, which
- will be used as the default display name, or None to not update
- @type summary: L{str}
- """
- # TODO: raise a nice exception if shareeView is not, in fact, a shared
- # version of this same L{CommonHomeChild}
-
- # remove None parameters, and substitute None for empty string
- bind = self._bindSchema
- columnMap = {}
- if mode != None and mode != shareeView._bindMode:
- columnMap[bind.BIND_MODE] = mode
- if status != None and status != shareeView._bindStatus:
- columnMap[bind.BIND_STATUS] = status
- if summary != None and summary != shareeView._bindMessage:
- columnMap[bind.MESSAGE] = summary
-
- if columnMap:
-
- # Count accepted
- if bind.BIND_STATUS in columnMap:
- previouslyAcceptedCount = yield shareeView._previousAcceptCount()
-
- yield self._updateBindColumnsQuery(columnMap).on(
- self._txn,
- resourceID=self._resourceID, homeID=shareeView._home._resourceID
- )
-
- # Update affected attributes
- if bind.BIND_MODE in columnMap:
- shareeView._bindMode = columnMap[bind.BIND_MODE]
-
- if bind.BIND_STATUS in columnMap:
- shareeView._bindStatus = columnMap[bind.BIND_STATUS]
- yield shareeView._changedStatus(previouslyAcceptedCount)
-
- if bind.MESSAGE in columnMap:
- shareeView._bindMessage = columnMap[bind.MESSAGE]
-
- yield shareeView.invalidateQueryCache()
-
- # Must send notification to ensure cache invalidation occurs
- yield self.notifyPropertyChanged()
- yield shareeView.viewerHome().notifyChanged()
-
-
- def _previousAcceptCount(self):
- return succeed(1)
-
-
- @inlineCallbacks
- def _changedStatus(self, previouslyAcceptedCount):
- if self._bindStatus == _BIND_STATUS_ACCEPTED:
- yield self._initSyncToken()
- yield self._initBindRevision()
- self._home._children[self._name] = self
- self._home._children[self._resourceID] = self
- elif self._bindStatus in (_BIND_STATUS_INVITED, _BIND_STATUS_DECLINED):
- yield self._deletedSyncToken(sharedRemoval=True)
- self._home._children.pop(self._name, None)
- self._home._children.pop(self._resourceID, None)
-
-
- @inlineCallbacks
- def removeShare(self, shareeView):
- """
- Remove the shared version of this (owned) L{CommonHomeChild} from the
- referenced L{CommonHome}.
-
- @see: L{CommonHomeChild.shareWith}
-
- @param shareeView: The shared resource being removed.
-
- @return: a L{Deferred} which will fire with the previous shareUID
- """
-
- # remove sync tokens
- shareeHome = shareeView.viewerHome()
- yield shareeView._deletedSyncToken(sharedRemoval=True)
- shareeHome._children.pop(shareeView._name, None)
- shareeHome._children.pop(shareeView._resourceID, None)
-
- # Must send notification to ensure cache invalidation occurs
- yield self.notifyPropertyChanged()
- yield shareeHome.notifyChanged()
-
- # delete binds including invites
- yield self._deleteBindForResourceIDAndHomeID.on(
- self._txn,
- resourceID=self._resourceID,
- homeID=shareeHome._resourceID,
- )
-
- yield shareeView.invalidateQueryCache()
-
-
- @inlineCallbacks
- def unshare(self):
- """
- Unshares a collection, regardless of which "direction" it was shared.
- """
- if self.owned():
- # This collection may be shared to others
- invites = yield self.sharingInvites()
- for invite in invites:
- shareeView = yield self.shareeView(invite.shareeUID)
- yield self.removeShare(shareeView)
- else:
- # This collection is shared to me
- ownerView = yield self.ownerView()
- yield ownerView.removeShare(self)
-
-
- @inlineCallbacks
- def sharingInvites(self):
- """
- Retrieve the list of all L{SharingInvitation}'s for this L{CommonHomeChild}, irrespective of mode.
-
- @return: L{SharingInvitation} objects
- @rtype: a L{Deferred} which fires with a L{list} of L{SharingInvitation}s.
- """
- if not self.owned():
- returnValue([])
-
- # get all accepted binds
- invitedRows = yield self._sharedInvitationBindForResourceID.on(
- self._txn, resourceID=self._resourceID, homeID=self._home._resourceID
- )
-
- result = []
- for homeUID, homeRID, _ignore_resourceID, resourceName, bindMode, bindStatus, bindMessage in invitedRows:
- invite = SharingInvitation(
- resourceName,
- self.ownerHome().name(),
- self.ownerHome().id(),
- homeUID,
- homeRID,
- bindMode,
- bindStatus,
- bindMessage,
- )
- result.append(invite)
- returnValue(result)
-
-
- @inlineCallbacks
- def _initBindRevision(self):
- yield self.syncToken() # init self._syncTokenRevision if None
- self._bindRevision = self._syncTokenRevision
-
- bind = self._bindSchema
- yield self._updateBindColumnsQuery(
- {bind.BIND_REVISION : Parameter("revision"), }
- ).on(
- self._txn,
- revision=self._bindRevision,
- resourceID=self._resourceID,
- homeID=self.viewerHome()._resourceID,
- )
- yield self.invalidateQueryCache()
-
-
- def sharedResourceType(self):
- """
- The sharing resource type. Needs to be overridden by each type of resource that can be shared.
-
- @return: an identifier for the type of the share.
- @rtype: C{str}
- """
- return ""
-
-
- def newShareName(self):
- """
- Name used when creating a new share. By default this is a UUID.
- """
- return str(uuid4())
-
-
- def owned(self):
- """
- @see: L{ICalendar.owned}
- """
- return self._bindMode == _BIND_MODE_OWN
-
-
- def isShared(self):
- """
- For an owned collection indicate whether it is shared.
-
- @return: C{True} if shared, C{False} otherwise
- @rtype: C{bool}
- """
- return self.owned() and self._bindMessage == "shared"
-
-
- @inlineCallbacks
- def setShared(self, shared):
- """
- Set an owned collection to shared or unshared state. Technically this is not useful as "shared"
- really means it has invitees, but the current sharing spec supports a notion of a shared collection
- that has not yet had invitees added. For the time being we will support that option by using a new
- MESSAGE value to indicate an owned collection that is "shared".
-
- @param shared: whether or not the owned collection is "shared"
- @type shared: C{bool}
- """
- assert self.owned(), "Cannot change share mode on a shared collection"
-
- # Only if change is needed
- newMessage = "shared" if shared else None
- if self._bindMessage == newMessage:
- returnValue(None)
-
- self._bindMessage = newMessage
-
- bind = self._bindSchema
- yield Update(
- {bind.MESSAGE: self._bindMessage},
- Where=(bind.RESOURCE_ID == Parameter("resourceID")).And(
- bind.HOME_RESOURCE_ID == Parameter("homeID")),
- ).on(self._txn, resourceID=self._resourceID, homeID=self.viewerHome()._resourceID)
-
- yield self.invalidateQueryCache()
- yield self.notifyPropertyChanged()
-
-
- def direct(self):
- """
- Is this a "direct" share?
-
- @return: a boolean indicating whether it's direct.
- """
- return self._bindMode == _BIND_MODE_DIRECT
-
-
- def indirect(self):
- """
- Is this an "indirect" share?
-
- @return: a boolean indicating whether it's indirect.
- """
- return self._bindMode == _BIND_MODE_INDIRECT
-
-
- def shareUID(self):
- """
- @see: L{ICalendar.shareUID}
- """
- return self.name()
-
-
- def shareMode(self):
- """
- @see: L{ICalendar.shareMode}
- """
- return self._bindMode
-
-
- def _effectiveShareMode(self, bindMode, viewerUID, txn):
- """
- Get the effective share mode without a calendar object
- """
- return bindMode
-
-
- def effectiveShareMode(self):
- """
- @see: L{ICalendar.shareMode}
- """
- return self._bindMode
-
-
- def shareName(self):
- """
- This is a path like name for the resource within the home being shared. For object resource
- shares this will be a combination of the L{CommonHomeChild} name and the L{CommonObjecrResource}
- name. Otherwise it is just the L{CommonHomeChild} name. This is needed to expose a value to the
- app-layer such that it can construct a URI for the actual WebDAV resource being shared.
- """
- name = self.name()
- if self.sharedResourceType() == "group":
- name = self.parentCollection().name() + "/" + name
- return name
-
-
- def shareStatus(self):
- """
- @see: L{ICalendar.shareStatus}
- """
- return self._bindStatus
-
-
- def accepted(self):
- """
- @see: L{ICalendar.shareStatus}
- """
- return self._bindStatus == _BIND_STATUS_ACCEPTED
-
-
- def shareMessage(self):
- """
- @see: L{ICalendar.shareMessage}
- """
- return self._bindMessage
-
-
- def getInviteCopyProperties(self):
- """
- Get a dictionary of property name/values (as strings) for properties that are shadowable and
- need to be copied to a sharee's collection when an external (cross-pod) share is created.
- Sub-classes should override to expose the properties they care about.
- """
- return {}
-
-
- def setInviteCopyProperties(self, props):
- """
- Copy a set of shadowable properties (as name/value strings) onto this shared resource when
- a cross-pod invite is processed. Sub-classes should override to expose the properties they
- care about.
- """
- pass
-
-
- @classmethod
- def metadataColumns(cls):
- """
- Return a list of column name for retrieval of metadata. This allows
- different child classes to have their own type specific data, but still make use of the
- common base logic.
- """
-
- # Common behavior is to have created and modified
-
- return (
- cls._homeChildMetaDataSchema.CREATED,
- cls._homeChildMetaDataSchema.MODIFIED,
- )
-
-
- @classmethod
- def metadataAttributes(cls):
- """
- Return a list of attribute names for retrieval of metadata. This allows
- different child classes to have their own type specific data, but still make use of the
- common base logic.
- """
-
- # Common behavior is to have created and modified
-
- return (
- "_created",
- "_modified",
- )
-
-
- @classmethod
- def bindColumns(cls):
- """
- Return a list of column names for retrieval during creation. This allows
- different child classes to have their own type specific data, but still make use of the
- common base logic.
- """
-
- return (
- cls._bindSchema.BIND_MODE,
- cls._bindSchema.HOME_RESOURCE_ID,
- cls._bindSchema.RESOURCE_ID,
- cls._bindSchema.EXTERNAL_ID,
- cls._bindSchema.RESOURCE_NAME,
- cls._bindSchema.BIND_STATUS,
- cls._bindSchema.BIND_REVISION,
- cls._bindSchema.MESSAGE
- )
-
-
- @classmethod
- def bindAttributes(cls):
- """
- Return a list of column names for retrieval during creation. This allows
- different child classes to have their own type specific data, but still make use of the
- common base logic.
- """
-
- return (
- "_bindMode",
- "_homeResourceID",
- "_resourceID",
- "_externalID",
- "_name",
- "_bindStatus",
- "_bindRevision",
- "_bindMessage",
- )
-
- bindColumnCount = 8
-
- @classmethod
- def additionalBindColumns(cls):
- """
- Return a list of column names for retrieval during creation. This allows
- different child classes to have their own type specific data, but still make use of the
- common base logic.
- """
-
- return ()
-
-
- @classmethod
- def additionalBindAttributes(cls):
- """
- Return a list of attribute names for retrieval of during creation. This allows
- different child classes to have their own type specific data, but still make use of the
- common base logic.
- """
-
- return ()
-
-
- @classproperty
- def _childrenAndMetadataForHomeID(cls):
- bind = cls._bindSchema
- child = cls._homeChildSchema
- childMetaData = cls._homeChildMetaDataSchema
-
- columns = cls.bindColumns() + cls.additionalBindColumns() + cls.metadataColumns()
- return Select(
- columns,
- From=child.join(
- bind, child.RESOURCE_ID == bind.RESOURCE_ID,
- 'left outer').join(
- childMetaData, childMetaData.RESOURCE_ID == bind.RESOURCE_ID,
- 'left outer'),
- Where=(bind.HOME_RESOURCE_ID == Parameter("homeID")).And(
- bind.BIND_STATUS == _BIND_STATUS_ACCEPTED)
- )
-
-
- @classmethod
- def _revisionsForResourceIDs(cls, resourceIDs):
- rev = cls._revisionsSchema
- return Select(
- [rev.RESOURCE_ID, Max(rev.REVISION)],
- From=rev,
- Where=rev.RESOURCE_ID.In(Parameter("resourceIDs", len(resourceIDs))).And(
- (rev.RESOURCE_NAME != None).Or(rev.DELETED == False)),
- GroupBy=rev.RESOURCE_ID
- )
-
-
- @inlineCallbacks
- def invalidateQueryCache(self):
- queryCacher = self._txn._queryCacher
- if queryCacher is not None:
- yield queryCacher.invalidateAfterCommit(self._txn, queryCacher.keyForHomeChildMetaData(self._resourceID))
- yield queryCacher.invalidateAfterCommit(self._txn, queryCacher.keyForObjectWithName(self._home._resourceID, self._name))
- yield queryCacher.invalidateAfterCommit(self._txn, queryCacher.keyForObjectWithResourceID(self._home._resourceID, self._resourceID))
- yield queryCacher.invalidateAfterCommit(self._txn, queryCacher.keyForObjectWithExternalID(self._home._resourceID, self._externalID))
-
-
-
class CommonHomeChild(FancyEqMixin, Memoizable, _SharedSyncLogic, HomeChildBase, SharingMixIn):
"""
Common ancestor class of AddressBooks and Calendars.
@@ -6306,1048 +4655,3 @@
raise ConcurrentModification()
else:
returnValue(self._textData)
-
-
-
-class NotificationCollection(FancyEqMixin, _SharedSyncLogic):
- log = Logger()
-
- implements(INotificationCollection)
-
- compareAttributes = (
- "_uid",
- "_resourceID",
- )
-
- _revisionsSchema = schema.NOTIFICATION_OBJECT_REVISIONS
- _homeSchema = schema.NOTIFICATION_HOME
-
-
- def __init__(self, txn, uid, resourceID):
-
- self._txn = txn
- self._uid = uid
- self._resourceID = resourceID
- self._dataVersion = None
- self._notifications = {}
- self._notificationNames = None
- self._syncTokenRevision = None
-
- # Make sure we have push notifications setup to push on this collection
- # as well as the home it is in
- self._notifiers = dict([(factory_name, factory.newNotifier(self),) for factory_name, factory in txn._notifierFactories.items()])
-
- _resourceIDFromUIDQuery = Select(
- [_homeSchema.RESOURCE_ID], From=_homeSchema,
- Where=_homeSchema.OWNER_UID == Parameter("uid"))
-
- _UIDFromResourceIDQuery = Select(
- [_homeSchema.OWNER_UID], From=_homeSchema,
- Where=_homeSchema.RESOURCE_ID == Parameter("rid"))
-
- _provisionNewNotificationsQuery = Insert(
- {_homeSchema.OWNER_UID: Parameter("uid")},
- Return=_homeSchema.RESOURCE_ID
- )
-
-
- @property
- def _home(self):
- """
- L{NotificationCollection} serves as its own C{_home} for the purposes of
- working with L{_SharedSyncLogic}.
- """
- return self
-
-
- @classmethod
- @inlineCallbacks
- def notificationsWithUID(cls, txn, uid, create):
- """
- @param uid: I'm going to assume uid is utf-8 encoded bytes
- """
- rows = yield cls._resourceIDFromUIDQuery.on(txn, uid=uid)
-
- if rows:
- resourceID = rows[0][0]
- created = False
- elif create:
- # Determine if the user is local or external
- record = yield txn.directoryService().recordWithUID(uid.decode("utf-8"))
- if record is None:
- raise DirectoryRecordNotFoundError("Cannot create home for UID since no directory record exists: {}".format(uid))
-
- state = _HOME_STATUS_NORMAL if record.thisServer() else _HOME_STATUS_EXTERNAL
- if state == _HOME_STATUS_EXTERNAL:
- raise RecordNotAllowedError("Cannot store notifications for external user: {}".format(uid))
-
- # Use savepoint so we can do a partial rollback if there is a race
- # condition where this row has already been inserted
- savepoint = SavepointAction("notificationsWithUID")
- yield savepoint.acquire(txn)
-
- try:
- resourceID = str((
- yield cls._provisionNewNotificationsQuery.on(txn, uid=uid)
- )[0][0])
- except Exception:
- # FIXME: Really want to trap the pg.DatabaseError but in a non-
- # DB specific manner
- yield savepoint.rollback(txn)
-
- # Retry the query - row may exist now, if not re-raise
- rows = yield cls._resourceIDFromUIDQuery.on(txn, uid=uid)
- if rows:
- resourceID = rows[0][0]
- created = False
- else:
- raise
- else:
- created = True
- yield savepoint.release(txn)
- else:
- returnValue(None)
- collection = cls(txn, uid, resourceID)
- yield collection._loadPropertyStore()
- if created:
- yield collection._initSyncToken()
- yield collection.notifyChanged()
- returnValue(collection)
-
-
- @classmethod
- @inlineCallbacks
- def notificationsWithResourceID(cls, txn, rid):
- rows = yield cls._UIDFromResourceIDQuery.on(txn, rid=rid)
-
- if rows:
- uid = rows[0][0]
- result = (yield cls.notificationsWithUID(txn, uid, create=False))
- returnValue(result)
- else:
- returnValue(None)
-
-
- @inlineCallbacks
- def _loadPropertyStore(self):
- self._propertyStore = yield PropertyStore.load(
- self._uid,
- self._uid,
- None,
- self._txn,
- self._resourceID,
- notifyCallback=self.notifyChanged
- )
-
-
- def __repr__(self):
- return "<%s: %s>" % (self.__class__.__name__, self._resourceID)
-
-
- def id(self):
- """
- Retrieve the store identifier for this collection.
-
- @return: store identifier.
- @rtype: C{int}
- """
- return self._resourceID
-
-
- @classproperty
- def _dataVersionQuery(cls):
- nh = cls._homeSchema
- return Select(
- [nh.DATAVERSION], From=nh,
- Where=nh.RESOURCE_ID == Parameter("resourceID")
- )
-
-
- @inlineCallbacks
- def dataVersion(self):
- if self._dataVersion is None:
- self._dataVersion = (yield self._dataVersionQuery.on(
- self._txn, resourceID=self._resourceID))[0][0]
- returnValue(self._dataVersion)
-
-
- def name(self):
- return "notification"
-
-
- def uid(self):
- return self._uid
-
-
- def owned(self):
- return True
-
-
- def ownerHome(self):
- return self._home
-
-
- def viewerHome(self):
- return self._home
-
-
- @inlineCallbacks
- def notificationObjects(self):
- results = (yield NotificationObject.loadAllObjects(self))
- for result in results:
- self._notifications[result.uid()] = result
- self._notificationNames = sorted([result.name() for result in results])
- returnValue(results)
-
- _notificationUIDsForHomeQuery = Select(
- [schema.NOTIFICATION.NOTIFICATION_UID], From=schema.NOTIFICATION,
- Where=schema.NOTIFICATION.NOTIFICATION_HOME_RESOURCE_ID ==
- Parameter("resourceID"))
-
-
- @inlineCallbacks
- def listNotificationObjects(self):
- if self._notificationNames is None:
- rows = yield self._notificationUIDsForHomeQuery.on(
- self._txn, resourceID=self._resourceID)
- self._notificationNames = sorted([row[0] for row in rows])
- returnValue(self._notificationNames)
-
-
- # used by _SharedSyncLogic.resourceNamesSinceRevision()
- def listObjectResources(self):
- return self.listNotificationObjects()
-
-
- def _nameToUID(self, name):
- """
- Based on the file-backed implementation, the 'name' is just uid +
- ".xml".
- """
- return name.rsplit(".", 1)[0]
-
-
- def notificationObjectWithName(self, name):
- return self.notificationObjectWithUID(self._nameToUID(name))
-
-
- @memoizedKey("uid", "_notifications")
- @inlineCallbacks
- def notificationObjectWithUID(self, uid):
- """
- Create an empty notification object first then have it initialize itself
- from the store.
- """
- no = NotificationObject(self, uid)
- no = (yield no.initFromStore())
- returnValue(no)
-
-
- @inlineCallbacks
- def writeNotificationObject(self, uid, notificationtype, notificationdata):
-
- inserting = False
- notificationObject = yield self.notificationObjectWithUID(uid)
- if notificationObject is None:
- notificationObject = NotificationObject(self, uid)
- inserting = True
- yield notificationObject.setData(uid, notificationtype, notificationdata, inserting=inserting)
- if inserting:
- yield self._insertRevision("%s.xml" % (uid,))
- if self._notificationNames is not None:
- self._notificationNames.append(notificationObject.uid())
- else:
- yield self._updateRevision("%s.xml" % (uid,))
- yield self.notifyChanged()
-
-
- def removeNotificationObjectWithName(self, name):
- if self._notificationNames is not None:
- self._notificationNames.remove(self._nameToUID(name))
- return self.removeNotificationObjectWithUID(self._nameToUID(name))
-
- _removeByUIDQuery = Delete(
- From=schema.NOTIFICATION,
- Where=(schema.NOTIFICATION.NOTIFICATION_UID == Parameter("uid")).And(
- schema.NOTIFICATION.NOTIFICATION_HOME_RESOURCE_ID
- == Parameter("resourceID")))
-
-
- @inlineCallbacks
- def removeNotificationObjectWithUID(self, uid):
- yield self._removeByUIDQuery.on(
- self._txn, uid=uid, resourceID=self._resourceID)
- self._notifications.pop(uid, None)
- yield self._deleteRevision("%s.xml" % (uid,))
- yield self.notifyChanged()
-
- _initSyncTokenQuery = Insert(
- {
- _revisionsSchema.HOME_RESOURCE_ID : Parameter("resourceID"),
- _revisionsSchema.RESOURCE_NAME : None,
- _revisionsSchema.REVISION : schema.REVISION_SEQ,
- _revisionsSchema.DELETED : False
- }, Return=_revisionsSchema.REVISION
- )
-
-
- @inlineCallbacks
- def _initSyncToken(self):
- self._syncTokenRevision = (yield self._initSyncTokenQuery.on(
- self._txn, resourceID=self._resourceID))[0][0]
-
- _syncTokenQuery = Select(
- [Max(_revisionsSchema.REVISION)], From=_revisionsSchema,
- Where=_revisionsSchema.HOME_RESOURCE_ID == Parameter("resourceID")
- )
-
-
- @inlineCallbacks
- def syncToken(self):
- if self._syncTokenRevision is None:
- self._syncTokenRevision = yield self.syncTokenRevision()
- returnValue("%s_%s" % (self._resourceID, self._syncTokenRevision))
-
-
- @inlineCallbacks
- def syncTokenRevision(self):
- revision = (yield self._syncTokenQuery.on(self._txn, resourceID=self._resourceID))[0][0]
- if revision is None:
- revision = int((yield self._txn.calendarserverValue("MIN-VALID-REVISION")))
- returnValue(revision)
-
-
- def properties(self):
- return self._propertyStore
-
-
- def addNotifier(self, factory_name, notifier):
- if self._notifiers is None:
- self._notifiers = {}
- self._notifiers[factory_name] = notifier
-
-
- def getNotifier(self, factory_name):
- return self._notifiers.get(factory_name)
-
-
- def notifierID(self):
- return (self._txn._homeClass[self._txn._primaryHomeType]._notifierPrefix, "%s/notification" % (self.ownerHome().uid(),),)
-
-
- def parentNotifierID(self):
- return (self._txn._homeClass[self._txn._primaryHomeType]._notifierPrefix, "%s" % (self.ownerHome().uid(),),)
-
-
- @inlineCallbacks
- def notifyChanged(self, category=ChangeCategory.default):
- """
- Send notifications, change sync token and bump last modified because
- the resource has changed. We ensure we only do this once per object
- per transaction.
- """
- if self._txn.isNotifiedAlready(self):
- returnValue(None)
- self._txn.notificationAddedForObject(self)
-
- # Send notifications
- if self._notifiers:
- # cache notifiers run in post commit
- notifier = self._notifiers.get("cache", None)
- if notifier:
- self._txn.postCommit(notifier.notify)
- # push notifiers add their work items immediately
- notifier = self._notifiers.get("push", None)
- if notifier:
- yield notifier.notify(self._txn, priority=category.value)
-
- returnValue(None)
-
-
- @classproperty
- def _completelyNewRevisionQuery(cls):
- rev = cls._revisionsSchema
- return Insert({rev.HOME_RESOURCE_ID: Parameter("homeID"),
- # rev.RESOURCE_ID: Parameter("resourceID"),
- rev.RESOURCE_NAME: Parameter("name"),
- rev.REVISION: schema.REVISION_SEQ,
- rev.DELETED: False},
- Return=rev.REVISION)
-
-
- def _maybeNotify(self):
- """
- Emit a push notification after C{_changeRevision}.
- """
- return self.notifyChanged()
-
-
- @inlineCallbacks
- def remove(self):
- """
- Remove DB rows corresponding to this notification home.
- """
- # Delete NOTIFICATION rows
- no = schema.NOTIFICATION
- kwds = {"ResourceID": self._resourceID}
- yield Delete(
- From=no,
- Where=(
- no.NOTIFICATION_HOME_RESOURCE_ID == Parameter("ResourceID")
- ),
- ).on(self._txn, **kwds)
-
- # Delete NOTIFICATION_HOME (will cascade to NOTIFICATION_OBJECT_REVISIONS)
- nh = schema.NOTIFICATION_HOME
- yield Delete(
- From=nh,
- Where=(
- nh.RESOURCE_ID == Parameter("ResourceID")
- ),
- ).on(self._txn, **kwds)
-
-
-
-class NotificationObject(FancyEqMixin, object):
- """
- This used to store XML data and an XML element for the type. But we are now switching it
- to use JSON internally. The app layer will convert that to XML and fill in the "blanks" as
- needed for the app.
- """
- log = Logger()
-
- implements(INotificationObject)
-
- compareAttributes = (
- "_resourceID",
- "_home",
- )
-
- _objectSchema = schema.NOTIFICATION
-
- def __init__(self, home, uid):
- self._home = home
- self._resourceID = None
- self._uid = uid
- self._md5 = None
- self._size = None
- self._created = None
- self._modified = None
- self._notificationType = None
- self._notificationData = None
-
-
- def __repr__(self):
- return "<%s: %s>" % (self.__class__.__name__, self._resourceID)
-
-
- @classproperty
- def _allColumnsByHomeIDQuery(cls):
- """
- DAL query to load all columns by home ID.
- """
- obj = cls._objectSchema
- return Select(
- [obj.RESOURCE_ID, obj.NOTIFICATION_UID, obj.MD5,
- Len(obj.NOTIFICATION_DATA), obj.NOTIFICATION_TYPE, obj.CREATED, obj.MODIFIED],
- From=obj,
- Where=(obj.NOTIFICATION_HOME_RESOURCE_ID == Parameter("homeID"))
- )
-
-
- @classmethod
- @inlineCallbacks
- def loadAllObjects(cls, parent):
- """
- Load all child objects and return a list of them. This must create the
- child classes and initialize them using "batched" SQL operations to keep
- this constant wrt the number of children. This is an optimization for
- Depth:1 operations on the collection.
- """
-
- results = []
-
- # Load from the main table first
- dataRows = (
- yield cls._allColumnsByHomeIDQuery.on(parent._txn,
- homeID=parent._resourceID))
-
- if dataRows:
- # Get property stores for all these child resources (if any found)
- propertyStores = (yield PropertyStore.forMultipleResources(
- parent.uid(),
- None,
- None,
- parent._txn,
- schema.NOTIFICATION.RESOURCE_ID,
- schema.NOTIFICATION.NOTIFICATION_HOME_RESOURCE_ID,
- parent._resourceID,
- ))
-
- # Create the actual objects merging in properties
- for row in dataRows:
- child = cls(parent, None)
- (child._resourceID,
- child._uid,
- child._md5,
- child._size,
- child._notificationType,
- child._created,
- child._modified,) = tuple(row)
- try:
- child._notificationType = json.loads(child._notificationType)
- except ValueError:
- pass
- if isinstance(child._notificationType, unicode):
- child._notificationType = child._notificationType.encode("utf-8")
- child._loadPropertyStore(
- props=propertyStores.get(child._resourceID, None)
- )
- results.append(child)
-
- returnValue(results)
-
-
- @classproperty
- def _oneNotificationQuery(cls):
- no = cls._objectSchema
- return Select(
- [
- no.RESOURCE_ID,
- no.MD5,
- Len(no.NOTIFICATION_DATA),
- no.NOTIFICATION_TYPE,
- no.CREATED,
- no.MODIFIED
- ],
- From=no,
- Where=(no.NOTIFICATION_UID ==
- Parameter("uid")).And(no.NOTIFICATION_HOME_RESOURCE_ID ==
- Parameter("homeID")))
-
-
- @inlineCallbacks
- def initFromStore(self):
- """
- Initialise this object from the store, based on its UID and home
- resource ID. We read in and cache all the extra metadata from the DB to
- avoid having to do DB queries for those individually later.
-
- @return: L{self} if object exists in the DB, else C{None}
- """
- rows = (yield self._oneNotificationQuery.on(
- self._txn, uid=self._uid, homeID=self._home._resourceID))
- if rows:
- (self._resourceID,
- self._md5,
- self._size,
- self._notificationType,
- self._created,
- self._modified,) = tuple(rows[0])
- try:
- self._notificationType = json.loads(self._notificationType)
- except ValueError:
- pass
- if isinstance(self._notificationType, unicode):
- self._notificationType = self._notificationType.encode("utf-8")
- self._loadPropertyStore()
- returnValue(self)
- else:
- returnValue(None)
-
-
- def _loadPropertyStore(self, props=None, created=False):
- if props is None:
- props = NonePropertyStore(self._home.uid())
- self._propertyStore = props
-
-
- def properties(self):
- return self._propertyStore
-
-
- def id(self):
- """
- Retrieve the store identifier for this object.
-
- @return: store identifier.
- @rtype: C{int}
- """
- return self._resourceID
-
-
- @property
- def _txn(self):
- return self._home._txn
-
-
- def notificationCollection(self):
- return self._home
-
-
- def uid(self):
- return self._uid
-
-
- def name(self):
- return self.uid() + ".xml"
-
-
- @classproperty
- def _newNotificationQuery(cls):
- no = cls._objectSchema
- return Insert(
- {
- no.NOTIFICATION_HOME_RESOURCE_ID: Parameter("homeID"),
- no.NOTIFICATION_UID: Parameter("uid"),
- no.NOTIFICATION_TYPE: Parameter("notificationType"),
- no.NOTIFICATION_DATA: Parameter("notificationData"),
- no.MD5: Parameter("md5"),
- },
- Return=[no.RESOURCE_ID, no.CREATED, no.MODIFIED]
- )
-
-
- @classproperty
- def _updateNotificationQuery(cls):
- no = cls._objectSchema
- return Update(
- {
- no.NOTIFICATION_TYPE: Parameter("notificationType"),
- no.NOTIFICATION_DATA: Parameter("notificationData"),
- no.MD5: Parameter("md5"),
- },
- Where=(no.NOTIFICATION_HOME_RESOURCE_ID == Parameter("homeID")).And(
- no.NOTIFICATION_UID == Parameter("uid")),
- Return=no.MODIFIED
- )
-
-
- @inlineCallbacks
- def setData(self, uid, notificationtype, notificationdata, inserting=False):
- """
- Set the object resource data and update and cached metadata.
- """
-
- notificationtext = json.dumps(notificationdata)
- self._notificationType = notificationtype
- self._md5 = hashlib.md5(notificationtext).hexdigest()
- self._size = len(notificationtext)
- if inserting:
- rows = yield self._newNotificationQuery.on(
- self._txn, homeID=self._home._resourceID, uid=uid,
- notificationType=json.dumps(self._notificationType),
- notificationData=notificationtext, md5=self._md5
- )
- self._resourceID, self._created, self._modified = rows[0]
- self._loadPropertyStore()
- else:
- rows = yield self._updateNotificationQuery.on(
- self._txn, homeID=self._home._resourceID, uid=uid,
- notificationType=json.dumps(self._notificationType),
- notificationData=notificationtext, md5=self._md5
- )
- self._modified = rows[0][0]
- self._notificationData = notificationdata
-
- _notificationDataFromID = Select(
- [_objectSchema.NOTIFICATION_DATA], From=_objectSchema,
- Where=_objectSchema.RESOURCE_ID == Parameter("resourceID"))
-
-
- @inlineCallbacks
- def notificationData(self):
- if self._notificationData is None:
- self._notificationData = (yield self._notificationDataFromID.on(self._txn, resourceID=self._resourceID))[0][0]
- try:
- self._notificationData = json.loads(self._notificationData)
- except ValueError:
- pass
- if isinstance(self._notificationData, unicode):
- self._notificationData = self._notificationData.encode("utf-8")
- returnValue(self._notificationData)
-
-
- def contentType(self):
- """
- The content type of NotificationObjects is text/xml.
- """
- return MimeType.fromString("text/xml")
-
-
- def md5(self):
- return self._md5
-
-
- def size(self):
- return self._size
-
-
- def notificationType(self):
- return self._notificationType
-
-
- def created(self):
- return datetimeMktime(parseSQLTimestamp(self._created))
-
-
- def modified(self):
- return datetimeMktime(parseSQLTimestamp(self._modified))
-
-
-
-def determineNewest(uid, homeType):
- """
- Construct a query to determine the modification time of the newest object
- in a given home.
-
- @param uid: the UID of the home to scan.
- @type uid: C{str}
-
- @param homeType: The type of home to scan; C{ECALENDARTYPE},
- C{ENOTIFICATIONTYPE}, or C{EADDRESSBOOKTYPE}.
- @type homeType: C{int}
-
- @return: A select query that will return a single row containing a single
- column which is the maximum value.
- @rtype: L{Select}
- """
- if homeType == ENOTIFICATIONTYPE:
- return Select(
- [Max(schema.NOTIFICATION.MODIFIED)],
- From=schema.NOTIFICATION_HOME.join(
- schema.NOTIFICATION,
- on=schema.NOTIFICATION_HOME.RESOURCE_ID ==
- schema.NOTIFICATION.NOTIFICATION_HOME_RESOURCE_ID),
- Where=schema.NOTIFICATION_HOME.OWNER_UID == uid
- )
- homeTypeName = {ECALENDARTYPE: "CALENDAR",
- EADDRESSBOOKTYPE: "ADDRESSBOOK"}[homeType]
- home = getattr(schema, homeTypeName + "_HOME")
- bind = getattr(schema, homeTypeName + "_BIND")
- child = getattr(schema, homeTypeName)
- obj = getattr(schema, homeTypeName + "_OBJECT")
- return Select(
- [Max(obj.MODIFIED)],
- From=home.join(bind, on=bind.HOME_RESOURCE_ID == home.RESOURCE_ID).join(
- child, on=child.RESOURCE_ID == bind.RESOURCE_ID).join(
- obj, on=obj.PARENT_RESOURCE_ID == child.RESOURCE_ID),
- Where=(bind.BIND_MODE == 0).And(home.OWNER_UID == uid)
- )
-
-
-
-@inlineCallbacks
-def mergeHomes(sqlTxn, one, other, homeType):
- """
- Merge two homes together. This determines which of C{one} or C{two} is
- newer - that is, has been modified more recently - and pulls all the data
- from the older into the newer home. Then, it changes the UID of the old
- home to its UID, normalized and prefixed with "old.", and then re-names the
- new home to its name, normalized.
-
- Because the UIDs of both homes have changed, B{both one and two will be
- invalid to all other callers from the start of the invocation of this
- function}.
-
- @param sqlTxn: the transaction to use
- @type sqlTxn: A L{CommonTransaction}
-
- @param one: A calendar home.
- @type one: L{ICalendarHome}
-
- @param two: Another, different calendar home.
- @type two: L{ICalendarHome}
-
- @param homeType: The type of home to scan; L{ECALENDARTYPE} or
- L{EADDRESSBOOKTYPE}.
- @type homeType: C{int}
-
- @return: a L{Deferred} which fires with with the newer of C{one} or C{two},
- into which the data from the other home has been merged, when the merge
- is complete.
- """
- from txdav.caldav.datastore.util import migrateHome as migrateCalendarHome
- from txdav.carddav.datastore.util import migrateHome as migrateABHome
- migrateHome = {EADDRESSBOOKTYPE: migrateABHome,
- ECALENDARTYPE: migrateCalendarHome,
- ENOTIFICATIONTYPE: _dontBotherWithNotifications}[homeType]
- homeTable = {EADDRESSBOOKTYPE: schema.ADDRESSBOOK_HOME,
- ECALENDARTYPE: schema.CALENDAR_HOME,
- ENOTIFICATIONTYPE: schema.NOTIFICATION_HOME}[homeType]
- both = []
- both.append([one,
- (yield determineNewest(one.uid(), homeType).on(sqlTxn))])
- both.append([other,
- (yield determineNewest(other.uid(), homeType).on(sqlTxn))])
- both.sort(key=lambda x: x[1])
-
- older = both[0][0]
- newer = both[1][0]
- yield migrateHome(older, newer, merge=True)
- # Rename the old one to 'old.<correct-guid>'
- newNormalized = normalizeUUIDOrNot(newer.uid())
- oldNormalized = normalizeUUIDOrNot(older.uid())
- yield _renameHome(sqlTxn, homeTable, older.uid(), "old." + oldNormalized)
- # Rename the new one to '<correct-guid>'
- if newer.uid() != newNormalized:
- yield _renameHome(sqlTxn, homeTable, newer.uid(), newNormalized)
- yield returnValue(newer)
-
-
-
-def _renameHome(txn, table, oldUID, newUID):
- """
- Rename a calendar, addressbook, or notification home. Note that this
- function is only safe in transactions that have had caching disabled, and
- more specifically should only ever be used during upgrades. Running this
- in a normal transaction will have unpredictable consequences, especially
- with respect to memcache.
-
- @param txn: an SQL transaction to use for this update
- @type txn: L{twext.enterprise.ienterprise.IAsyncTransaction}
-
- @param table: the storage table of the desired home type
- @type table: L{TableSyntax}
-
- @param oldUID: the old UID, the existing home's UID
- @type oldUID: L{str}
-
- @param newUID: the new UID, to change the UID to
- @type newUID: L{str}
-
- @return: a L{Deferred} which fires when the home is renamed.
- """
- return Update({table.OWNER_UID: newUID},
- Where=table.OWNER_UID == oldUID).on(txn)
-
-
-
-def _dontBotherWithNotifications(older, newer, merge):
- """
- Notifications are more transient and can be easily worked around; don't
- bother to migrate all of them when there is a UUID case mismatch.
- """
- pass
-
-
-
-@inlineCallbacks
-def _normalizeHomeUUIDsIn(t, homeType):
- """
- Normalize the UUIDs in the given L{txdav.common.datastore.CommonStore}.
-
- This changes the case of the UUIDs in the calendar home.
-
- @param t: the transaction to normalize all the UUIDs in.
- @type t: L{CommonStoreTransaction}
-
- @param homeType: The type of home to scan, L{ECALENDARTYPE},
- L{EADDRESSBOOKTYPE}, or L{ENOTIFICATIONTYPE}.
- @type homeType: C{int}
-
- @return: a L{Deferred} which fires with C{None} when the UUID normalization
- is complete.
- """
- from txdav.caldav.datastore.util import fixOneCalendarHome
- homeTable = {EADDRESSBOOKTYPE: schema.ADDRESSBOOK_HOME,
- ECALENDARTYPE: schema.CALENDAR_HOME,
- ENOTIFICATIONTYPE: schema.NOTIFICATION_HOME}[homeType]
- homeTypeName = homeTable.model.name.split("_")[0]
-
- allUIDs = yield Select([homeTable.OWNER_UID],
- From=homeTable,
- OrderBy=homeTable.OWNER_UID).on(t)
- total = len(allUIDs)
- allElapsed = []
- for n, [UID] in enumerate(allUIDs):
- start = time.time()
- if allElapsed:
- estimate = "%0.3d" % ((sum(allElapsed) / len(allElapsed)) *
- total - n)
- else:
- estimate = "unknown"
- log.info(
- "Scanning UID {uid} [{homeType}] "
- "({pct!0.2d}%, {estimate} seconds remaining)...",
- uid=UID, pct=(n / float(total)) * 100, estimate=estimate,
- homeType=homeTypeName
- )
- other = None
- this = yield _getHome(t, homeType, UID)
- if homeType == ECALENDARTYPE:
- fixedThisHome = yield fixOneCalendarHome(this)
- else:
- fixedThisHome = 0
- fixedOtherHome = 0
- if this is None:
- log.info(
- "{uid!r} appears to be missing, already processed", uid=UID
- )
- try:
- uuidobj = UUID(UID)
- except ValueError:
- pass
- else:
- newname = str(uuidobj).upper()
- if UID != newname:
- log.info(
- "Detected case variance: {uid} {newuid}[{homeType}]",
- uid=UID, newuid=newname, homeType=homeTypeName
- )
- other = yield _getHome(t, homeType, newname)
- if other is None:
- # No duplicate: just fix the name.
- yield _renameHome(t, homeTable, UID, newname)
- else:
- if homeType == ECALENDARTYPE:
- fixedOtherHome = yield fixOneCalendarHome(other)
- this = yield mergeHomes(t, this, other, homeType)
- # NOTE: WE MUST NOT TOUCH EITHER HOME OBJECT AFTER THIS POINT.
- # THE UIDS HAVE CHANGED AND ALL OPERATIONS WILL FAIL.
-
- end = time.time()
- elapsed = end - start
- allElapsed.append(elapsed)
- log.info(
- "Scanned UID {uid}; {elapsed} seconds elapsed,"
- " {fixes} properties fixed ({duplicate} fixes in duplicate).",
- uid=UID, elapsed=elapsed, fixes=fixedThisHome,
- duplicate=fixedOtherHome
- )
- returnValue(None)
-
-
-
-def _getHome(txn, homeType, uid):
- """
- Like L{CommonHome.homeWithUID} but also honoring ENOTIFICATIONTYPE which
- isn't I{really} a type of home.
-
- @param txn: the transaction to retrieve the home from
- @type txn: L{CommonStoreTransaction}
-
- @param homeType: L{ENOTIFICATIONTYPE}, L{ECALENDARTYPE}, or
- L{EADDRESSBOOKTYPE}.
-
- @param uid: the UID of the home to retrieve.
- @type uid: L{str}
-
- @return: a L{Deferred} that fires with the L{CommonHome} or
- L{NotificationHome} when it has been retrieved.
- """
- if homeType == ENOTIFICATIONTYPE:
- return txn.notificationsWithUID(uid, create=False)
- else:
- return txn.homeWithUID(homeType, uid)
-
-
-
-@inlineCallbacks
-def _normalizeColumnUUIDs(txn, column):
- """
- Upper-case the UUIDs in the given SQL DAL column.
-
- @param txn: The transaction.
- @type txn: L{CommonStoreTransaction}
-
- @param column: the column, which may contain UIDs, to normalize.
- @type column: L{ColumnSyntax}
-
- @return: A L{Deferred} that will fire when the UUID normalization of the
- given column has completed.
- """
- tableModel = column.model.table
- # Get a primary key made of column syntax objects for querying and
- # comparison later.
- pkey = [ColumnSyntax(columnModel)
- for columnModel in tableModel.primaryKey]
- for row in (yield Select([column] + pkey,
- From=TableSyntax(tableModel)).on(txn)):
- before = row[0]
- pkeyparts = row[1:]
- after = normalizeUUIDOrNot(before)
- if after != before:
- where = _AndNothing
- # Build a where clause out of the primary key and the parts of the
- # primary key that were found.
- for pkeycol, pkeypart in zip(pkeyparts, pkey):
- where = where.And(pkeycol == pkeypart)
- yield Update({column: after}, Where=where).on(txn)
-
-
-
-class _AndNothing(object):
- """
- Simple placeholder for iteratively generating a 'Where' clause; the 'And'
- just returns its argument, so it can be used at the start of the loop.
- """
- @staticmethod
- def And(self):
- """
- Return the argument.
- """
- return self
-
-
-
-@inlineCallbacks
-def _needsNormalizationUpgrade(txn):
- """
- Determine whether a given store requires a UUID normalization data upgrade.
-
- @param txn: the transaction to use
- @type txn: L{CommonStoreTransaction}
-
- @return: a L{Deferred} that fires with C{True} or C{False} depending on
- whether we need the normalization upgrade or not.
- """
- for x in [schema.CALENDAR_HOME, schema.ADDRESSBOOK_HOME,
- schema.NOTIFICATION_HOME]:
- slct = Select([x.OWNER_UID], From=x,
- Where=x.OWNER_UID != Upper(x.OWNER_UID))
- rows = yield slct.on(txn)
- if rows:
- for [uid] in rows:
- if normalizeUUIDOrNot(uid) != uid:
- returnValue(True)
- returnValue(False)
-
-
-
-@inlineCallbacks
-def fixUUIDNormalization(store):
- """
- Fix all UUIDs in the given SQL store to be in a canonical form;
- 00000000-0000-0000-0000-000000000000 format and upper-case.
- """
- t = store.newTransaction(disableCache=True)
-
- # First, let's see if there are any calendar, addressbook, or notification
- # homes that have a de-normalized OWNER_UID. If there are none, then we can
- # early-out and avoid the tedious and potentially expensive inspection of
- # oodles of calendar data.
- if not (yield _needsNormalizationUpgrade(t)):
- log.info("No potentially denormalized UUIDs detected, "
- "skipping normalization upgrade.")
- yield t.abort()
- returnValue(None)
- try:
- yield _normalizeHomeUUIDsIn(t, ECALENDARTYPE)
- yield _normalizeHomeUUIDsIn(t, EADDRESSBOOKTYPE)
- yield _normalizeHomeUUIDsIn(t, ENOTIFICATIONTYPE)
- yield _normalizeColumnUUIDs(t, schema.RESOURCE_PROPERTY.VIEWER_UID)
- yield _normalizeColumnUUIDs(t, schema.APN_SUBSCRIPTIONS.SUBSCRIBER_GUID)
- except:
- log.failure("Unable to normalize UUIDs")
- yield t.abort()
- # There's a lot of possible problems here which are very hard to test
- # for individually; unexpected data that might cause constraint
- # violations under one of the manipulations done by
- # normalizeHomeUUIDsIn. Since this upgrade does not come along with a
- # schema version bump and may be re- attempted at any time, just raise
- # the exception and log it so that we can try again later, and the
- # service will survive for everyone _not_ affected by this somewhat
- # obscure bug.
- else:
- yield t.commit()
Added: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_notification.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_notification.py (rev 0)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_notification.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -0,0 +1,728 @@
+# -*- test-case-name: twext.enterprise.dal.test.test_record -*-
+##
+# Copyright (c) 2015 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twext.enterprise.dal.syntax import Select, Parameter, Insert, \
+ SavepointAction, Delete, Max, Len, Update
+from twext.enterprise.util import parseSQLTimestamp
+from twext.internet.decorate import memoizedKey
+from twext.python.clsprop import classproperty
+from twext.python.log import Logger
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python.util import FancyEqMixin
+from twistedcaldav.dateops import datetimeMktime
+from txdav.base.propertystore.sql import PropertyStore
+from txdav.common.datastore.sql_tables import schema, _HOME_STATUS_NORMAL, \
+ _HOME_STATUS_EXTERNAL
+from txdav.common.datastore.sql_util import _SharedSyncLogic
+from txdav.common.icommondatastore import RecordNotAllowedError
+from txdav.common.idirectoryservice import DirectoryRecordNotFoundError
+from txdav.common.inotifications import INotificationCollection, \
+ INotificationObject
+from txdav.idav import ChangeCategory
+from txweb2.dav.noneprops import NonePropertyStore
+from txweb2.http_headers import MimeType
+from zope.interface.declarations import implements
+import hashlib
+import json
+
+"""
+Classes and methods that relate to the Notification collection in the SQL store.
+"""
class NotificationCollection(FancyEqMixin, _SharedSyncLogic):
    """
    A per-user collection of notification resources, backed by the
    NOTIFICATION_HOME and NOTIFICATION tables.  The collection acts as its
    own "home" for the purposes of the shared sync-token logic (see
    L{_home} below).
    """
    log = Logger()

    implements(INotificationCollection)

    # Attributes compared by FancyEqMixin for equality testing.
    compareAttributes = (
        "_uid",
        "_resourceID",
    )

    # Schema tables used by _SharedSyncLogic and by the DAL queries below.
    _revisionsSchema = schema.NOTIFICATION_OBJECT_REVISIONS
    _homeSchema = schema.NOTIFICATION_HOME


    def __init__(self, txn, uid, resourceID):
        """
        @param txn: the transaction this collection is bound to.
        @param uid: owner UID of this notification home.
        @param resourceID: NOTIFICATION_HOME.RESOURCE_ID for this collection.
        """
        self._txn = txn
        self._uid = uid
        self._resourceID = resourceID
        self._dataVersion = None  # lazily loaded by dataVersion()
        self._notifications = {}  # cache: uid -> NotificationObject
        self._notificationNames = None  # sorted child UIDs, lazily loaded
        self._syncTokenRevision = None

        # Make sure we have push notifications setup to push on this collection
        # as well as the home it is in
        self._notifiers = dict([(factory_name, factory.newNotifier(self),) for factory_name, factory in txn._notifierFactories.items()])

    # Look up the home row's resource id by owner UID.
    _resourceIDFromUIDQuery = Select(
        [_homeSchema.RESOURCE_ID], From=_homeSchema,
        Where=_homeSchema.OWNER_UID == Parameter("uid"))

    # Reverse lookup: owner UID from the home's resource id.
    _UIDFromResourceIDQuery = Select(
        [_homeSchema.OWNER_UID], From=_homeSchema,
        Where=_homeSchema.RESOURCE_ID == Parameter("rid"))

    # Provision a new home row, returning the generated resource id.
    _provisionNewNotificationsQuery = Insert(
        {_homeSchema.OWNER_UID: Parameter("uid")},
        Return=_homeSchema.RESOURCE_ID
    )


    @property
    def _home(self):
        """
        L{NotificationCollection} serves as its own C{_home} for the purposes of
        working with L{_SharedSyncLogic}.
        """
        return self


    @classmethod
    @inlineCallbacks
    def notificationsWithUID(cls, txn, uid, create):
        """
        Look up (and optionally create) the notification collection for a user.

        @param uid: I'm going to assume uid is utf-8 encoded bytes
        @param create: if true, provision a new home when none exists.
        @return: the collection, or C{None} when absent and C{create} is false.
        """
        rows = yield cls._resourceIDFromUIDQuery.on(txn, uid=uid)

        if rows:
            resourceID = rows[0][0]
            created = False
        elif create:
            # Determine if the user is local or external
            record = yield txn.directoryService().recordWithUID(uid.decode("utf-8"))
            if record is None:
                raise DirectoryRecordNotFoundError("Cannot create home for UID since no directory record exists: {}".format(uid))

            state = _HOME_STATUS_NORMAL if record.thisServer() else _HOME_STATUS_EXTERNAL
            if state == _HOME_STATUS_EXTERNAL:
                raise RecordNotAllowedError("Cannot store notifications for external user: {}".format(uid))

            # Use savepoint so we can do a partial rollback if there is a race
            # condition where this row has already been inserted
            savepoint = SavepointAction("notificationsWithUID")
            yield savepoint.acquire(txn)

            try:
                # NOTE(review): this path produces a str resource id while the
                # lookup path above keeps the DB-native value - confirm that
                # nothing compares the two representations.
                resourceID = str((
                    yield cls._provisionNewNotificationsQuery.on(txn, uid=uid)
                )[0][0])
            except Exception:
                # FIXME: Really want to trap the pg.DatabaseError but in a non-
                # DB specific manner
                yield savepoint.rollback(txn)

                # Retry the query - row may exist now, if not re-raise
                rows = yield cls._resourceIDFromUIDQuery.on(txn, uid=uid)
                if rows:
                    resourceID = rows[0][0]
                    created = False
                else:
                    raise
            else:
                created = True
                yield savepoint.release(txn)
        else:
            returnValue(None)
        collection = cls(txn, uid, resourceID)
        yield collection._loadPropertyStore()
        if created:
            # Newly provisioned homes need an initial sync-token row and a
            # change notification.
            yield collection._initSyncToken()
            yield collection.notifyChanged()
        returnValue(collection)


    @classmethod
    @inlineCallbacks
    def notificationsWithResourceID(cls, txn, rid):
        """
        Look up an existing notification collection by its resource id.

        @return: the collection, or C{None} if no home row matches C{rid}.
        """
        rows = yield cls._UIDFromResourceIDQuery.on(txn, rid=rid)

        if rows:
            uid = rows[0][0]
            result = (yield cls.notificationsWithUID(txn, uid, create=False))
            returnValue(result)
        else:
            returnValue(None)


    @inlineCallbacks
    def _loadPropertyStore(self):
        """
        Load the WebDAV property store for this collection.
        """
        self._propertyStore = yield PropertyStore.load(
            self._uid,
            self._uid,
            None,
            self._txn,
            self._resourceID,
            notifyCallback=self.notifyChanged
        )


    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self._resourceID)


    def id(self):
        """
        Retrieve the store identifier for this collection.

        @return: store identifier.
        @rtype: C{int}
        """
        return self._resourceID


    @classproperty
    def _dataVersionQuery(cls):
        """
        DAL query reading the home's DATAVERSION column.
        """
        nh = cls._homeSchema
        return Select(
            [nh.DATAVERSION], From=nh,
            Where=nh.RESOURCE_ID == Parameter("resourceID")
        )


    @inlineCallbacks
    def dataVersion(self):
        """
        Lazily read (and cache) this home's data version from the DB.
        """
        if self._dataVersion is None:
            self._dataVersion = (yield self._dataVersionQuery.on(
                self._txn, resourceID=self._resourceID))[0][0]
        returnValue(self._dataVersion)


    def name(self):
        # Fixed collection name used in resource URLs.
        return "notification"


    def uid(self):
        return self._uid


    def owned(self):
        # Notification collections are never shared; always owned.
        return True


    def ownerHome(self):
        # This collection is its own home (see _home).
        return self._home


    def viewerHome(self):
        return self._home


    @inlineCallbacks
    def notificationObjects(self):
        """
        Load all child notification objects, priming the per-uid cache and the
        sorted name list as a side effect.
        """
        results = (yield NotificationObject.loadAllObjects(self))
        for result in results:
            self._notifications[result.uid()] = result
        self._notificationNames = sorted([result.name() for result in results])
        returnValue(results)

    # All child notification UIDs for one home.
    _notificationUIDsForHomeQuery = Select(
        [schema.NOTIFICATION.NOTIFICATION_UID], From=schema.NOTIFICATION,
        Where=schema.NOTIFICATION.NOTIFICATION_HOME_RESOURCE_ID ==
        Parameter("resourceID"))


    @inlineCallbacks
    def listNotificationObjects(self):
        """
        Return (and cache) the sorted list of child notification UIDs.
        """
        if self._notificationNames is None:
            rows = yield self._notificationUIDsForHomeQuery.on(
                self._txn, resourceID=self._resourceID)
            self._notificationNames = sorted([row[0] for row in rows])
        returnValue(self._notificationNames)


    # used by _SharedSyncLogic.resourceNamesSinceRevision()
    def listObjectResources(self):
        return self.listNotificationObjects()


    def _nameToUID(self, name):
        """
        Based on the file-backed implementation, the 'name' is just uid +
        ".xml".
        """
        return name.rsplit(".", 1)[0]


    def notificationObjectWithName(self, name):
        # Resource names are "<uid>.xml"; strip the extension and look up.
        return self.notificationObjectWithUID(self._nameToUID(name))


    @memoizedKey("uid", "_notifications")
    @inlineCallbacks
    def notificationObjectWithUID(self, uid):
        """
        Create an empty notification object first then have it initialize itself
        from the store.
        """
        no = NotificationObject(self, uid)
        no = (yield no.initFromStore())
        returnValue(no)


    @inlineCallbacks
    def writeNotificationObject(self, uid, notificationtype, notificationdata):
        """
        Create or update the notification with the given UID, bump the
        appropriate revision, and send change notifications.
        """
        inserting = False
        notificationObject = yield self.notificationObjectWithUID(uid)
        if notificationObject is None:
            notificationObject = NotificationObject(self, uid)
            inserting = True
        yield notificationObject.setData(uid, notificationtype, notificationdata, inserting=inserting)
        if inserting:
            yield self._insertRevision("%s.xml" % (uid,))
            if self._notificationNames is not None:
                self._notificationNames.append(notificationObject.uid())
        else:
            yield self._updateRevision("%s.xml" % (uid,))
        yield self.notifyChanged()


    def removeNotificationObjectWithName(self, name):
        """
        Remove the notification whose resource name (uid + ".xml") is C{name}.
        """
        if self._notificationNames is not None:
            self._notificationNames.remove(self._nameToUID(name))
        return self.removeNotificationObjectWithUID(self._nameToUID(name))

    # Delete one notification row by uid within a home.
    _removeByUIDQuery = Delete(
        From=schema.NOTIFICATION,
        Where=(schema.NOTIFICATION.NOTIFICATION_UID == Parameter("uid")).And(
            schema.NOTIFICATION.NOTIFICATION_HOME_RESOURCE_ID
            == Parameter("resourceID")))


    @inlineCallbacks
    def removeNotificationObjectWithUID(self, uid):
        """
        Remove the notification with the given UID, drop it from the cache,
        record a deletion revision and send change notifications.
        """
        yield self._removeByUIDQuery.on(
            self._txn, uid=uid, resourceID=self._resourceID)
        self._notifications.pop(uid, None)
        yield self._deleteRevision("%s.xml" % (uid,))
        yield self.notifyChanged()

    # Create the initial (NULL-named) revision row for a new home.
    _initSyncTokenQuery = Insert(
        {
            _revisionsSchema.HOME_RESOURCE_ID : Parameter("resourceID"),
            _revisionsSchema.RESOURCE_NAME : None,
            _revisionsSchema.REVISION : schema.REVISION_SEQ,
            _revisionsSchema.DELETED : False
        }, Return=_revisionsSchema.REVISION
    )


    @inlineCallbacks
    def _initSyncToken(self):
        """
        Insert the initial revision row and cache the resulting revision.
        """
        self._syncTokenRevision = (yield self._initSyncTokenQuery.on(
            self._txn, resourceID=self._resourceID))[0][0]

    # Highest revision currently recorded for this home.
    _syncTokenQuery = Select(
        [Max(_revisionsSchema.REVISION)], From=_revisionsSchema,
        Where=_revisionsSchema.HOME_RESOURCE_ID == Parameter("resourceID")
    )


    @inlineCallbacks
    def syncToken(self):
        """
        Return the WebDAV sync token as "<resourceID>_<revision>".
        """
        if self._syncTokenRevision is None:
            self._syncTokenRevision = yield self.syncTokenRevision()
        returnValue("%s_%s" % (self._resourceID, self._syncTokenRevision))


    @inlineCallbacks
    def syncTokenRevision(self):
        """
        Read the current max revision; fall back to the server-wide minimum
        valid revision when no rows exist yet.
        """
        revision = (yield self._syncTokenQuery.on(self._txn, resourceID=self._resourceID))[0][0]
        if revision is None:
            revision = int((yield self._txn.calendarserverValue("MIN-VALID-REVISION")))
        returnValue(revision)


    def properties(self):
        return self._propertyStore


    def addNotifier(self, factory_name, notifier):
        """
        Register an additional notifier under C{factory_name}.
        """
        if self._notifiers is None:
            self._notifiers = {}
        self._notifiers[factory_name] = notifier


    def getNotifier(self, factory_name):
        return self._notifiers.get(factory_name)


    def notifierID(self):
        # Push id for this collection: "<home uid>/notification".
        return (self._txn._homeClass[self._txn._primaryHomeType]._notifierPrefix, "%s/notification" % (self.ownerHome().uid(),),)


    def parentNotifierID(self):
        # Push id for the enclosing home.
        return (self._txn._homeClass[self._txn._primaryHomeType]._notifierPrefix, "%s" % (self.ownerHome().uid(),),)


    @inlineCallbacks
    def notifyChanged(self, category=ChangeCategory.default):
        """
        Send notifications, change sync token and bump last modified because
        the resource has changed. We ensure we only do this once per object
        per transaction.
        """
        if self._txn.isNotifiedAlready(self):
            returnValue(None)
        self._txn.notificationAddedForObject(self)

        # Send notifications
        if self._notifiers:
            # cache notifiers run in post commit
            notifier = self._notifiers.get("cache", None)
            if notifier:
                self._txn.postCommit(notifier.notify)
            # push notifiers add their work items immediately
            notifier = self._notifiers.get("push", None)
            if notifier:
                yield notifier.notify(self._txn, priority=category.value)

        returnValue(None)


    @classproperty
    def _completelyNewRevisionQuery(cls):
        """
        DAL statement creating a brand-new revision row for a named resource.
        """
        rev = cls._revisionsSchema
        return Insert({rev.HOME_RESOURCE_ID: Parameter("homeID"),
                       # rev.RESOURCE_ID: Parameter("resourceID"),
                       rev.RESOURCE_NAME: Parameter("name"),
                       rev.REVISION: schema.REVISION_SEQ,
                       rev.DELETED: False},
                      Return=rev.REVISION)


    def _maybeNotify(self):
        """
        Emit a push notification after C{_changeRevision}.
        """
        return self.notifyChanged()


    @inlineCallbacks
    def remove(self):
        """
        Remove DB rows corresponding to this notification home.
        """
        # Delete NOTIFICATION rows
        no = schema.NOTIFICATION
        kwds = {"ResourceID": self._resourceID}
        yield Delete(
            From=no,
            Where=(
                no.NOTIFICATION_HOME_RESOURCE_ID == Parameter("ResourceID")
            ),
        ).on(self._txn, **kwds)

        # Delete NOTIFICATION_HOME (will cascade to NOTIFICATION_OBJECT_REVISIONS)
        nh = schema.NOTIFICATION_HOME
        yield Delete(
            From=nh,
            Where=(
                nh.RESOURCE_ID == Parameter("ResourceID")
            ),
        ).on(self._txn, **kwds)
+
+
+
class NotificationObject(FancyEqMixin, object):
    """
    This used to store XML data and an XML element for the type. But we are now switching it
    to use JSON internally. The app layer will convert that to XML and fill in the "blanks" as
    needed for the app.
    """
    log = Logger()

    implements(INotificationObject)

    # Attributes compared by FancyEqMixin for equality testing.
    compareAttributes = (
        "_resourceID",
        "_home",
    )

    _objectSchema = schema.NOTIFICATION

    def __init__(self, home, uid):
        """
        @param home: the L{NotificationCollection} containing this object.
        @param uid: the notification UID (resource name minus ".xml").
        """
        self._home = home
        self._resourceID = None  # filled in by initFromStore()/setData()
        self._uid = uid
        self._md5 = None
        self._size = None
        self._created = None
        self._modified = None
        self._notificationType = None
        self._notificationData = None  # lazily loaded by notificationData()


    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self._resourceID)


    @classproperty
    def _allColumnsByHomeIDQuery(cls):
        """
        DAL query to load all columns by home ID.
        """
        obj = cls._objectSchema
        return Select(
            [obj.RESOURCE_ID, obj.NOTIFICATION_UID, obj.MD5,
             Len(obj.NOTIFICATION_DATA), obj.NOTIFICATION_TYPE, obj.CREATED, obj.MODIFIED],
            From=obj,
            Where=(obj.NOTIFICATION_HOME_RESOURCE_ID == Parameter("homeID"))
        )


    @classmethod
    @inlineCallbacks
    def loadAllObjects(cls, parent):
        """
        Load all child objects and return a list of them. This must create the
        child classes and initialize them using "batched" SQL operations to keep
        this constant wrt the number of children. This is an optimization for
        Depth:1 operations on the collection.
        """

        results = []

        # Load from the main table first
        dataRows = (
            yield cls._allColumnsByHomeIDQuery.on(parent._txn,
                                                  homeID=parent._resourceID))

        if dataRows:
            # Get property stores for all these child resources (if any found)
            propertyStores = (yield PropertyStore.forMultipleResources(
                parent.uid(),
                None,
                None,
                parent._txn,
                schema.NOTIFICATION.RESOURCE_ID,
                schema.NOTIFICATION.NOTIFICATION_HOME_RESOURCE_ID,
                parent._resourceID,
            ))

            # Create the actual objects merging in properties
            for row in dataRows:
                child = cls(parent, None)
                (child._resourceID,
                 child._uid,
                 child._md5,
                 child._size,
                 child._notificationType,
                 child._created,
                 child._modified,) = tuple(row)
                # Stored type may be JSON text or a legacy plain string;
                # leave it alone when it does not parse.
                try:
                    child._notificationType = json.loads(child._notificationType)
                except ValueError:
                    pass
                if isinstance(child._notificationType, unicode):
                    child._notificationType = child._notificationType.encode("utf-8")
                child._loadPropertyStore(
                    props=propertyStores.get(child._resourceID, None)
                )
                results.append(child)

        returnValue(results)


    @classproperty
    def _oneNotificationQuery(cls):
        """
        DAL query loading one notification's metadata (data length only, not
        the data itself) by uid within a home.
        """
        no = cls._objectSchema
        return Select(
            [
                no.RESOURCE_ID,
                no.MD5,
                Len(no.NOTIFICATION_DATA),
                no.NOTIFICATION_TYPE,
                no.CREATED,
                no.MODIFIED
            ],
            From=no,
            Where=(no.NOTIFICATION_UID ==
                   Parameter("uid")).And(no.NOTIFICATION_HOME_RESOURCE_ID ==
                                         Parameter("homeID")))


    @inlineCallbacks
    def initFromStore(self):
        """
        Initialise this object from the store, based on its UID and home
        resource ID. We read in and cache all the extra metadata from the DB to
        avoid having to do DB queries for those individually later.

        @return: L{self} if object exists in the DB, else C{None}
        """
        rows = (yield self._oneNotificationQuery.on(
            self._txn, uid=self._uid, homeID=self._home._resourceID))
        if rows:
            (self._resourceID,
             self._md5,
             self._size,
             self._notificationType,
             self._created,
             self._modified,) = tuple(rows[0])
            # Stored type may be JSON text or a legacy plain string; leave it
            # alone when it does not parse.
            try:
                self._notificationType = json.loads(self._notificationType)
            except ValueError:
                pass
            if isinstance(self._notificationType, unicode):
                self._notificationType = self._notificationType.encode("utf-8")
            self._loadPropertyStore()
            returnValue(self)
        else:
            returnValue(None)


    def _loadPropertyStore(self, props=None, created=False):
        """
        Attach a property store; notification objects have no real properties
        so default to an empty one.
        (NOTE(review): the C{created} parameter is unused here - confirm.)
        """
        if props is None:
            props = NonePropertyStore(self._home.uid())
        self._propertyStore = props


    def properties(self):
        return self._propertyStore


    def id(self):
        """
        Retrieve the store identifier for this object.

        @return: store identifier.
        @rtype: C{int}
        """
        return self._resourceID


    @property
    def _txn(self):
        # Delegate to the parent collection's transaction.
        return self._home._txn


    def notificationCollection(self):
        return self._home


    def uid(self):
        return self._uid


    def name(self):
        # Resource name is the uid plus a fixed ".xml" extension.
        return self.uid() + ".xml"


    @classproperty
    def _newNotificationQuery(cls):
        """
        DAL statement inserting a new notification row.
        """
        no = cls._objectSchema
        return Insert(
            {
                no.NOTIFICATION_HOME_RESOURCE_ID: Parameter("homeID"),
                no.NOTIFICATION_UID: Parameter("uid"),
                no.NOTIFICATION_TYPE: Parameter("notificationType"),
                no.NOTIFICATION_DATA: Parameter("notificationData"),
                no.MD5: Parameter("md5"),
            },
            Return=[no.RESOURCE_ID, no.CREATED, no.MODIFIED]
        )


    @classproperty
    def _updateNotificationQuery(cls):
        """
        DAL statement updating an existing notification row in place.
        """
        no = cls._objectSchema
        return Update(
            {
                no.NOTIFICATION_TYPE: Parameter("notificationType"),
                no.NOTIFICATION_DATA: Parameter("notificationData"),
                no.MD5: Parameter("md5"),
            },
            Where=(no.NOTIFICATION_HOME_RESOURCE_ID == Parameter("homeID")).And(
                no.NOTIFICATION_UID == Parameter("uid")),
            Return=no.MODIFIED
        )


    @inlineCallbacks
    def setData(self, uid, notificationtype, notificationdata, inserting=False):
        """
        Set the object resource data and update and cached metadata.

        @param uid: the notification UID.
        @param notificationtype: type value, stored as JSON text.
        @param notificationdata: payload, stored as JSON text.
        @param inserting: if true, insert a new row; otherwise update.
        """

        notificationtext = json.dumps(notificationdata)
        self._notificationType = notificationtype
        self._md5 = hashlib.md5(notificationtext).hexdigest()
        self._size = len(notificationtext)
        if inserting:
            rows = yield self._newNotificationQuery.on(
                self._txn, homeID=self._home._resourceID, uid=uid,
                notificationType=json.dumps(self._notificationType),
                notificationData=notificationtext, md5=self._md5
            )
            self._resourceID, self._created, self._modified = rows[0]
            self._loadPropertyStore()
        else:
            rows = yield self._updateNotificationQuery.on(
                self._txn, homeID=self._home._resourceID, uid=uid,
                notificationType=json.dumps(self._notificationType),
                notificationData=notificationtext, md5=self._md5
            )
            self._modified = rows[0][0]
        self._notificationData = notificationdata

    # Fetch just the payload column for one row (deferred until needed).
    _notificationDataFromID = Select(
        [_objectSchema.NOTIFICATION_DATA], From=_objectSchema,
        Where=_objectSchema.RESOURCE_ID == Parameter("resourceID"))


    @inlineCallbacks
    def notificationData(self):
        """
        Lazily load (and cache) the notification payload, decoding JSON when
        possible.
        """
        if self._notificationData is None:
            self._notificationData = (yield self._notificationDataFromID.on(self._txn, resourceID=self._resourceID))[0][0]
            try:
                self._notificationData = json.loads(self._notificationData)
            except ValueError:
                pass
            if isinstance(self._notificationData, unicode):
                self._notificationData = self._notificationData.encode("utf-8")
        returnValue(self._notificationData)


    def contentType(self):
        """
        The content type of NotificationObjects is text/xml.
        """
        return MimeType.fromString("text/xml")


    def md5(self):
        return self._md5


    def size(self):
        return self._size


    def notificationType(self):
        return self._notificationType


    def created(self):
        # Stored as an SQL timestamp string; convert to epoch seconds.
        return datetimeMktime(parseSQLTimestamp(self._created))


    def modified(self):
        return datetimeMktime(parseSQLTimestamp(self._modified))
Added: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_sharing.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_sharing.py (rev 0)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_sharing.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -0,0 +1,1254 @@
+# -*- test-case-name: twext.enterprise.dal.test.test_record -*-
+##
+# Copyright (c) 2015 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from collections import namedtuple
+from pycalendar.datetime import DateTime
+
+from twext.enterprise.dal.syntax import Insert, Parameter, Update, Delete, \
+ Select, Max
+from twext.python.clsprop import classproperty
+from twext.python.log import Logger
+
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
+
+from txdav.base.propertystore.base import PropertyName
+from txdav.common.datastore.sql_tables import _BIND_MODE_OWN, _BIND_MODE_DIRECT, \
+ _BIND_MODE_INDIRECT, _BIND_STATUS_ACCEPTED, _BIND_STATUS_DECLINED, \
+ _BIND_STATUS_INVITED, _BIND_STATUS_INVALID, _BIND_STATUS_DELETED
+from txdav.common.icommondatastore import ExternalShareFailed, \
+ HomeChildNameAlreadyExistsError, AllRetriesFailed
+from txdav.xml import element
+
+from uuid import uuid4
+
+
+log = Logger()
+
+"""
+Classes and methods that relate to sharing in the SQL store.
+"""
+
class SharingHomeMixIn(object):
    """
    Common class for CommonHome to implement sharing operations
    """

    @inlineCallbacks
    def acceptShare(self, shareUID, summary=None):
        """
        This share is being accepted.

        @param shareUID: UID of the share being accepted.
        @param summary: optional display name for the accepted share.
        @return: the sharee's view of the share, or C{None} if not found.
        """

        shareeView = yield self.anyObjectWithShareUID(shareUID)
        if shareeView is not None:
            yield shareeView.acceptShare(summary)

        returnValue(shareeView)


    @inlineCallbacks
    def declineShare(self, shareUID):
        """
        This share is being declined.

        @param shareUID: UID of the share being declined.
        @return: C{True} if a matching share was found and declined.
        """

        shareeView = yield self.anyObjectWithShareUID(shareUID)
        if shareeView is not None:
            yield shareeView.declineShare()

        returnValue(shareeView is not None)


    #
    # External (cross-pod) sharing - entry point is the sharee's home collection.
    #
    @inlineCallbacks
    def processExternalInvite(
        self, ownerUID, ownerRID, ownerName, shareUID, bindMode, summary,
        copy_invite_properties, supported_components=None
    ):
        """
        External invite received.

        @param ownerUID: UID of the (remote) owner.
        @param ownerRID: the owner collection's external resource id.
        @param ownerName: the owner collection's name.
        @param shareUID: UID to use for the local share.
        @param bindMode: requested bind mode (direct or invited).
        @param summary: share message.
        @param copy_invite_properties: invite properties copied from the
            remote pod onto the sharee view.
        @param supported_components: optional component restriction to apply
            to the owner view.
        """

        # Get the owner home - create external one if not present
        ownerHome = yield self._txn.homeWithUID(
            self._homeType, ownerUID, create=True
        )
        if ownerHome is None or not ownerHome.external():
            raise ExternalShareFailed("Invalid owner UID: {}".format(ownerUID))

        # Try to find owner calendar via its external id
        ownerView = yield ownerHome.childWithExternalID(ownerRID)
        if ownerView is None:
            try:
                ownerView = yield ownerHome.createChildWithName(
                    ownerName, externalID=ownerRID
                )
            except HomeChildNameAlreadyExistsError:
                # This is odd - it means we possibly have a left over sharer
                # collection which the sharer likely removed and re-created
                # with the same name but now it has a different externalID and
                # is not found by the initial query. What we do is check to see
                # whether any shares still reference the old ID - if they do we
                # are hosed. If not, we can remove the old item and create a new one.
                oldOwnerView = yield ownerHome.childWithName(ownerName)
                invites = yield oldOwnerView.sharingInvites()
                if len(invites) != 0:
                    log.error(
                        "External invite collection name is present with a "
                        "different externalID and still has shares"
                    )
                    raise
                log.error(
                    "External invite collection name is present with a "
                    "different externalID - trying to fix"
                )
                yield ownerHome.removeExternalChild(oldOwnerView)
                ownerView = yield ownerHome.createChildWithName(
                    ownerName, externalID=ownerRID
                )

        if (
            supported_components is not None and
            hasattr(ownerView, "setSupportedComponents")
        ):
            yield ownerView.setSupportedComponents(supported_components)

        # Now carry out the share operation
        if bindMode == _BIND_MODE_DIRECT:
            shareeView = yield ownerView.directShareWithUser(
                self.uid(), shareName=shareUID
            )
        else:
            shareeView = yield ownerView.inviteUIDToShare(
                self.uid(), bindMode, summary, shareName=shareUID
            )

        shareeView.setInviteCopyProperties(copy_invite_properties)


    @inlineCallbacks
    def processExternalUninvite(self, ownerUID, ownerRID, shareUID):
        """
        External uninvite received.

        @param ownerUID: UID of the (remote) owner.
        @param ownerRID: the owner collection's external resource id.
        @param shareUID: UID of the share being removed.
        """

        # Get the owner home
        ownerHome = yield self._txn.homeWithUID(self._homeType, ownerUID)
        if ownerHome is None or not ownerHome.external():
            raise ExternalShareFailed("Invalid owner UID: {}".format(ownerUID))

        # Try to find owner calendar via its external id
        ownerView = yield ownerHome.childWithExternalID(ownerRID)
        if ownerView is None:
            raise ExternalShareFailed("Invalid share ID: {}".format(shareUID))

        # Now carry out the share operation
        yield ownerView.uninviteUIDFromShare(self.uid())

        # See if there are any references to the external share. If not,
        # remove it
        invites = yield ownerView.sharingInvites()
        if len(invites) == 0:
            yield ownerHome.removeExternalChild(ownerView)


    @inlineCallbacks
    def processExternalReply(
        self, ownerUID, shareeUID, shareUID, bindStatus, summary=None
    ):
        """
        External reply to an invite received (sharee accepted or declined).

        @param ownerUID: UID of the local owner.
        @param shareeUID: UID of the (remote) sharee who replied.
        @param shareUID: UID of the share replied to.
        @param bindStatus: the new bind status (accepted or declined).
        @param summary: optional display name supplied on acceptance.
        """

        # Make sure the shareeUID and shareUID match
        # NOTE(review): no validation actually happens here - confirm whether
        # the check described by the comment above is still required.

        # Get the owner home - create external one if not present
        shareeHome = yield self._txn.homeWithUID(self._homeType, shareeUID)
        if shareeHome is None or not shareeHome.external():
            raise ExternalShareFailed(
                "Invalid sharee UID: {}".format(shareeUID)
            )

        # Try to find owner calendar via its external id
        shareeView = yield shareeHome.anyObjectWithShareUID(shareUID)
        if shareeView is None:
            raise ExternalShareFailed("Invalid share UID: {}".format(shareUID))

        # Now carry out the share operation
        if bindStatus == _BIND_STATUS_ACCEPTED:
            yield shareeHome.acceptShare(shareUID, summary)
        elif bindStatus == _BIND_STATUS_DECLINED:
            if shareeView.direct():
                yield shareeView.deleteShare()
            else:
                yield shareeHome.declineShare(shareUID)
+
+
+
# Immutable value object describing one row of sharing-invitation state.
SharingInvitation = namedtuple(
    "SharingInvitation",
    [
        "uid",
        "ownerUID",
        "ownerHomeID",
        "shareeUID",
        "shareeHomeID",
        "mode",
        "status",
        "summary",
    ]
)
+
+
+
+class SharingMixIn(object):
+ """
+ Common class for CommonHomeChild and AddressBookObject
+ """
+
+ @classproperty
+ def _bindInsertQuery(cls, **kw):
+ """
+ DAL statement to create a bind entry that connects a collection to its
+ home.
+ """
+ bind = cls._bindSchema
+ return Insert({
+ bind.HOME_RESOURCE_ID: Parameter("homeID"),
+ bind.RESOURCE_ID: Parameter("resourceID"),
+ bind.EXTERNAL_ID: Parameter("externalID"),
+ bind.RESOURCE_NAME: Parameter("name"),
+ bind.BIND_MODE: Parameter("mode"),
+ bind.BIND_STATUS: Parameter("bindStatus"),
+ bind.MESSAGE: Parameter("message"),
+ })
+
+
+ @classmethod
+ def _updateBindColumnsQuery(cls, columnMap):
+ bind = cls._bindSchema
+ return Update(
+ columnMap,
+ Where=(bind.RESOURCE_ID == Parameter("resourceID")).And(
+ bind.HOME_RESOURCE_ID == Parameter("homeID")),
+ )
+
+
+ @classproperty
+ def _deleteBindForResourceIDAndHomeID(cls):
+ bind = cls._bindSchema
+ return Delete(
+ From=bind,
+ Where=(bind.RESOURCE_ID == Parameter("resourceID")).And(
+ bind.HOME_RESOURCE_ID == Parameter("homeID")),
+ )
+
+
+ @classmethod
+ def _bindFor(cls, condition):
+ bind = cls._bindSchema
+ columns = cls.bindColumns() + cls.additionalBindColumns()
+ return Select(
+ columns,
+ From=bind,
+ Where=condition
+ )
+
+
+ @classmethod
+ def _bindInviteFor(cls, condition):
+ home = cls._homeSchema
+ bind = cls._bindSchema
+ return Select(
+ [
+ home.OWNER_UID,
+ bind.HOME_RESOURCE_ID,
+ bind.RESOURCE_ID,
+ bind.RESOURCE_NAME,
+ bind.BIND_MODE,
+ bind.BIND_STATUS,
+ bind.MESSAGE,
+ ],
+ From=bind.join(home, on=(bind.HOME_RESOURCE_ID == home.RESOURCE_ID)),
+ Where=condition
+ )
+
+
+ @classproperty
+ def _sharedInvitationBindForResourceID(cls):
+ bind = cls._bindSchema
+ return cls._bindInviteFor(
+ (bind.RESOURCE_ID == Parameter("resourceID")).And
+ (bind.BIND_MODE != _BIND_MODE_OWN)
+ )
+
+
+ @classproperty
+ def _acceptedBindForHomeID(cls):
+ bind = cls._bindSchema
+ return cls._bindFor((bind.HOME_RESOURCE_ID == Parameter("homeID"))
+ .And(bind.BIND_STATUS == _BIND_STATUS_ACCEPTED))
+
+
+ @classproperty
+ def _bindForResourceIDAndHomeID(cls):
+ """
+ DAL query that looks up home bind rows by home child
+ resource ID and home resource ID.
+ """
+ bind = cls._bindSchema
+ return cls._bindFor((bind.RESOURCE_ID == Parameter("resourceID"))
+ .And(bind.HOME_RESOURCE_ID == Parameter("homeID")))
+
+
+ @classproperty
+ def _bindForExternalIDAndHomeID(cls):
+ """
+ DAL query that looks up home bind rows by home child
+ resource ID and home resource ID.
+ """
+ bind = cls._bindSchema
+ return cls._bindFor((bind.EXTERNAL_ID == Parameter("externalID"))
+ .And(bind.HOME_RESOURCE_ID == Parameter("homeID")))
+
+
+ @classproperty
+ def _bindForNameAndHomeID(cls):
+ """
+ DAL query that looks up any bind rows by home child
+ resource ID and home resource ID.
+ """
+ bind = cls._bindSchema
+ return cls._bindFor((bind.RESOURCE_NAME == Parameter("name"))
+ .And(bind.HOME_RESOURCE_ID == Parameter("homeID")))
+
+
+ #
+ # Higher level API
+ #
    @inlineCallbacks
    def inviteUIDToShare(self, shareeUID, mode, summary=None, shareName=None):
        """
        Invite a user to share this collection - either create the share if it does not exist, or
        update the existing share with new values. Make sure a notification is sent as well.

        @param shareeUID: UID of the sharee
        @type shareeUID: C{str}
        @param mode: access mode
        @type mode: C{int}
        @param summary: share message
        @type summary: C{str}
        @param shareName: optional fixed name for the new share (used when
            the share UID is dictated by another pod).
        @return: the sharee's view of the share.
        """

        # Look for existing invite and update its fields or create new one
        shareeView = yield self.shareeView(shareeUID)
        if shareeView is not None:
            # Re-inviting a declined/invalid share flips it back to "invited".
            status = _BIND_STATUS_INVITED if shareeView.shareStatus() in (_BIND_STATUS_DECLINED, _BIND_STATUS_INVALID) else None
            yield self.updateShare(shareeView, mode=mode, status=status, summary=summary)
        else:
            shareeView = yield self.createShare(shareeUID=shareeUID, mode=mode, summary=summary, shareName=shareName)

        # Check for external
        if shareeView.viewerHome().external():
            yield self._sendExternalInvite(shareeView)
        else:
            # Send invite notification
            yield self._sendInviteNotification(shareeView)
        returnValue(shareeView)
+
+
    @inlineCallbacks
    def directShareWithUser(self, shareeUID, shareName=None):
        """
        Create a direct share with the specified user. Note it is currently up to the app layer
        to enforce access control - this is not ideal as we really should have control of that in
        the store. Once we do, this api will need to verify that access is allowed for a direct share.

        NB no invitations are used with direct sharing.

        @param shareeUID: UID of the sharee
        @type shareeUID: C{str}
        @param shareName: optional fixed name for the new share.
        @return: the sharee's view of the share.
        """

        # Ignore if it already exists
        shareeView = yield self.shareeView(shareeUID)
        if shareeView is None:
            shareeView = yield self.createShare(shareeUID=shareeUID, mode=_BIND_MODE_DIRECT, shareName=shareName)
            yield shareeView.newShare()

            # Check for external
            if shareeView.viewerHome().external():
                yield self._sendExternalInvite(shareeView)

        returnValue(shareeView)
+
+
    @inlineCallbacks
    def uninviteUIDFromShare(self, shareeUID):
        """
        Remove a user from a share. Make sure a notification is sent as well.

        @param shareeUID: UID of the sharee
        @type shareeUID: C{str}
        """
        # Cancel invites - we'll just use whatever userid we are given

        shareeView = yield self.shareeView(shareeUID)
        if shareeView is not None:
            if shareeView.viewerHome().external():
                yield self._sendExternalUninvite(shareeView)
            else:
                # If current user state is accepted then we send an invite with the new state, otherwise
                # we cancel any existing invites for the user
                if not shareeView.direct():
                    if shareeView.shareStatus() != _BIND_STATUS_ACCEPTED:
                        yield self._removeInviteNotification(shareeView)
                    else:
                        yield self._sendInviteNotification(shareeView, notificationState=_BIND_STATUS_DELETED)

            # Remove the bind
            yield self.removeShare(shareeView)
+
+
    @inlineCallbacks
    def acceptShare(self, summary=None):
        """
        This share is being accepted. Called on the sharee's view. No-op for
        direct shares or shares already in the accepted state.

        @param summary: optional display name to use for the accepted share
        @type summary: C{str} or C{None}
        """

        if not self.direct() and self.shareStatus() != _BIND_STATUS_ACCEPTED:
            if self.external():
                # Owner is on another pod - notify it of the reply first.
                yield self._replyExternalInvite(_BIND_STATUS_ACCEPTED, summary)
            ownerView = yield self.ownerView()
            yield ownerView.updateShare(self, status=_BIND_STATUS_ACCEPTED)
            yield self.newShare(displayname=summary)
            if not ownerView.external():
                # Local owner gets a reply notification in their collection.
                yield self._sendReplyNotification(ownerView, summary)
+
+
    @inlineCallbacks
    def declineShare(self):
        """
        This share is being declined. Called on the sharee's view. No-op for
        direct shares or shares already in the declined state.
        """

        if not self.direct() and self.shareStatus() != _BIND_STATUS_DECLINED:
            if self.external():
                # Owner is on another pod - notify it of the reply first.
                yield self._replyExternalInvite(_BIND_STATUS_DECLINED)
            ownerView = yield self.ownerView()
            yield ownerView.updateShare(self, status=_BIND_STATUS_DECLINED)
            if not ownerView.external():
                # Local owner gets a reply notification in their collection.
                yield self._sendReplyNotification(ownerView)
+
+
    @inlineCallbacks
    def deleteShare(self):
        """
        This share is being deleted (by the sharee) - either decline or remove (for direct shares).
        """

        ownerView = yield self.ownerView()
        if self.direct():
            # Direct shares have no invite state to decline - just remove the bind.
            yield ownerView.removeShare(self)
            if ownerView.external():
                yield self._replyExternalInvite(_BIND_STATUS_DECLINED)
        else:
            # Invited shares are declined so the owner still sees the invitee.
            yield self.declineShare()
+
+
    @inlineCallbacks
    def ownerDeleteShare(self):
        """
        This share is being deleted (by the owner) - either decline or remove (for direct shares).
        """

        # Change status on store object - mark the owned collection unshared.
        yield self.setShared(False)

        # Remove all sharees (direct and invited)
        for invitation in (yield self.sharingInvites()):
            yield self.uninviteUIDFromShare(invitation.shareeUID)
+
+
    def newShare(self, displayname=None):
        """
        Override in derived classes to do any specific operations needed when a share
        is first accepted.

        @param displayname: optional display name supplied by the sharee
        @type displayname: C{str} or C{None}

        @return: a L{Deferred} firing C{None}; base implementation does nothing.
        """
        return succeed(None)
+
+
+ @inlineCallbacks
+ def allInvitations(self):
+ """
+ Get list of all invitations (non-direct) to this object.
+ """
+ invitations = yield self.sharingInvites()
+
+ # remove direct shares as those are not "real" invitations
+ invitations = filter(lambda x: x.mode != _BIND_MODE_DIRECT, invitations)
+ invitations.sort(key=lambda invitation: invitation.shareeUID)
+ returnValue(invitations)
+
+
    @inlineCallbacks
    def _sendInviteNotification(self, shareeView, notificationState=None):
        """
        Called on the owner's resource. Writes an "invite-notification" object
        into the sharee's notification collection describing the current (or
        supplied) state of the share.

        @param shareeView: the sharee's view of the shared resource
        @param notificationState: bind status to report instead of the sharee
            view's actual status (e.g. L{_BIND_STATUS_DELETED} on uninvite)
        """
        # When deleting the message is the sharee's display name
        displayname = shareeView.shareMessage()
        if notificationState == _BIND_STATUS_DELETED:
            displayname = str(shareeView.properties().get(PropertyName.fromElement(element.DisplayName), displayname))

        notificationtype = {
            "notification-type": "invite-notification",
            "shared-type": shareeView.sharedResourceType(),
        }
        notificationdata = {
            "notification-type": "invite-notification",
            "shared-type": shareeView.sharedResourceType(),
            "dtstamp": DateTime.getNowUTC().getText(),
            "owner": shareeView.ownerHome().uid(),
            "sharee": shareeView.viewerHome().uid(),
            "uid": shareeView.shareUID(),
            "status": shareeView.shareStatus() if notificationState is None else notificationState,
            "access": (yield shareeView.effectiveShareMode()),
            "ownerName": self.shareName(),
            "summary": displayname,
        }
        # Calendar-like resources expose their supported component set too.
        if hasattr(self, "getSupportedComponents"):
            notificationdata["supported-components"] = self.getSupportedComponents()

        # Add to sharee's collection
        notifications = yield self._txn.notificationsWithUID(shareeView.viewerHome().uid())
        yield notifications.writeNotificationObject(shareeView.shareUID(), notificationtype, notificationdata)
+
+
    @inlineCallbacks
    def _sendReplyNotification(self, ownerView, summary=None):
        """
        Create a reply notification based on the current state of this shared resource.
        Called on the sharee's view; writes an "invite-reply" object into the
        owner's notification collection.

        @param ownerView: the owner's view of the shared resource
        @param summary: optional message to include in the reply
        """

        # Generate invite XML - reply UID is derived from the share UID.
        notificationUID = "%s-reply" % (self.shareUID(),)

        notificationtype = {
            "notification-type": "invite-reply",
            "shared-type": self.sharedResourceType(),
        }

        notificationdata = {
            "notification-type": "invite-reply",
            "shared-type": self.sharedResourceType(),
            "dtstamp": DateTime.getNowUTC().getText(),
            "owner": self.ownerHome().uid(),
            "sharee": self.viewerHome().uid(),
            "status": self.shareStatus(),
            "ownerName": ownerView.shareName(),
            "in-reply-to": self.shareUID(),
            "summary": summary,
        }

        # Add to owner notification collection
        notifications = yield self._txn.notificationsWithUID(self.ownerHome().uid())
        yield notifications.writeNotificationObject(notificationUID, notificationtype, notificationdata)
+
+
    @inlineCallbacks
    def _removeInviteNotification(self, shareeView):
        """
        Called on the owner's resource. Deletes the pending invite notification
        from the sharee's notification collection.

        @param shareeView: the sharee's view of the shared resource
        """

        # Remove from sharee's collection
        notifications = yield self._txn.notificationsWithUID(shareeView.viewerHome().uid())
        yield notifications.removeNotificationObjectWithUID(shareeView.shareUID())
+
+
+ #
+ # External/cross-pod API
+ #
+ @inlineCallbacks
+ def _sendExternalInvite(self, shareeView):
+
+ yield self._txn.store().conduit.send_shareinvite(
+ self._txn,
+ shareeView.ownerHome()._homeType,
+ shareeView.ownerHome().uid(),
+ self.id(),
+ self.shareName(),
+ shareeView.viewerHome().uid(),
+ shareeView.shareUID(),
+ shareeView.shareMode(),
+ shareeView.shareMessage(),
+ self.getInviteCopyProperties(),
+ supported_components=self.getSupportedComponents() if hasattr(self, "getSupportedComponents") else None,
+ )
+
+
    @inlineCallbacks
    def _sendExternalUninvite(self, shareeView):
        """
        Send a cross-pod share uninvite message, via the store's conduit, to
        the pod hosting the sharee's home.
        """

        yield self._txn.store().conduit.send_shareuninvite(
            self._txn,
            shareeView.ownerHome()._homeType,
            shareeView.ownerHome().uid(),
            self.id(),
            shareeView.viewerHome().uid(),
            shareeView.shareUID(),
        )
+
+
    @inlineCallbacks
    def _replyExternalInvite(self, status, summary=None):
        """
        Send a cross-pod share reply (accept/decline), via the store's conduit,
        to the pod hosting the owner's home. Called on the sharee's view.

        @param status: the reply bind status (accepted or declined)
        @param summary: optional message to include in the reply
        """

        yield self._txn.store().conduit.send_sharereply(
            self._txn,
            self.viewerHome()._homeType,
            self.ownerHome().uid(),
            self.viewerHome().uid(),
            self.shareUID(),
            status,
            summary,
        )
+
+
+ #
+ # Lower level API
+ #
+ @inlineCallbacks
+ def ownerView(self):
+ """
+ Return the owner resource counterpart of this shared resource.
+
+ Note we have to play a trick with the property store to coerce it to match
+ the per-user properties for the owner.
+ """
+ # Get the child of the owner home that has the same resource id as the owned one
+ ownerView = yield self.ownerHome().childWithID(self.id())
+ returnValue(ownerView)
+
+
    @inlineCallbacks
    def shareeView(self, shareeUID):
        """
        Return the shared resource counterpart of this owned resource for the specified sharee,
        or C{None} if the sharee has no bind to this resource (or is the owner).

        Note we have to play a trick with the property store to coerce it to match
        the per-user properties for the sharee.

        @param shareeUID: UID of the sharee
        @type shareeUID: C{str}
        """

        # Never return the owner's own resource
        if self._home.uid() == shareeUID:
            returnValue(None)

        # Get the child of the sharee home that has the same resource id as the owned one
        shareeHome = yield self._txn.homeWithUID(self._home._homeType, shareeUID, authzUID=shareeUID)
        shareeView = (yield shareeHome.allChildWithID(self.id())) if shareeHome is not None else None
        returnValue(shareeView)
+
+
    @inlineCallbacks
    def shareWithUID(self, shareeUID, mode, status=None, summary=None, shareName=None):
        """
        Share this (owned) L{CommonHomeChild} with another principal. Creates
        the sharee's calendar home if it does not already exist, then delegates
        to L{shareWith}.

        @param shareeUID: The UID of the sharee.
        @type: L{str}

        @param mode: The sharing mode; L{_BIND_MODE_READ} or
            L{_BIND_MODE_WRITE} or L{_BIND_MODE_DIRECT}
        @type mode: L{str}

        @param status: The sharing status; L{_BIND_STATUS_INVITED} or
            L{_BIND_STATUS_ACCEPTED}
        @type: L{str}

        @param summary: The proposed message to go along with the share, which
            will be used as the default display name.
        @type: L{str}

        @param shareName: The proposed name of the new share.
        @type: L{str}

        @return: the name of the shared calendar in the new calendar home.
        @rtype: L{str}
        """
        shareeHome = yield self._txn.calendarHomeWithUID(shareeUID, create=True)
        returnValue(
            (yield self.shareWith(shareeHome, mode, status, summary, shareName))
        )
+
+
    @inlineCallbacks
    def shareWith(self, shareeHome, mode, status=None, summary=None, shareName=None):
        """
        Share this (owned) L{CommonHomeChild} with another home.

        @param shareeHome: The home of the sharee.
        @type: L{CommonHome}

        @param mode: The sharing mode; L{_BIND_MODE_READ} or
            L{_BIND_MODE_WRITE} or L{_BIND_MODE_DIRECT}
        @type: L{str}

        @param status: The sharing status; L{_BIND_STATUS_INVITED} or
            L{_BIND_STATUS_ACCEPTED}; defaults to accepted when C{None}
        @type: L{str}

        @param summary: The proposed message to go along with the share, which
            will be used as the default display name.
        @type: L{str}

        @param shareName: The proposed name of the new share.
        @type: L{str}

        @return: the name of the shared calendar in the new calendar home.
        @rtype: L{str}
        """

        if status is None:
            status = _BIND_STATUS_ACCEPTED

        @inlineCallbacks
        def doInsert(subt):
            # Runs in a subtransaction so a bind-row uniqueness violation can
            # be retried/recovered without aborting the outer transaction.
            newName = shareName if shareName is not None else self.newShareName()
            yield self._bindInsertQuery.on(
                subt,
                homeID=shareeHome._resourceID,
                resourceID=self._resourceID,
                externalID=self._externalID,
                name=newName,
                mode=mode,
                bindStatus=status,
                message=summary
            )
            returnValue(newName)
        try:
            bindName = yield self._txn.subtransaction(doInsert)
        except AllRetriesFailed:
            # FIXME: catch more specific exception
            # Insert failed - assume a bind already exists and update it instead.
            child = yield shareeHome.allChildWithID(self._resourceID)
            yield self.updateShare(
                child, mode=mode, status=status,
                summary=summary
            )
            bindName = child._name
        else:
            if status == _BIND_STATUS_ACCEPTED:
                # A newly accepted share needs its own sync token and bind revision.
                shareeView = yield shareeHome.anyObjectWithShareUID(bindName)
                yield shareeView._initSyncToken()
                yield shareeView._initBindRevision()

        # Mark this as shared
        yield self.setShared(True)

        # Must send notification to ensure cache invalidation occurs
        yield self.notifyPropertyChanged()
        yield shareeHome.notifyChanged()

        returnValue(bindName)
+
+
    @inlineCallbacks
    def createShare(self, shareeUID, mode, summary=None, shareName=None):
        """
        Create a new shared resource. If the mode is direct, the share is created in accepted state,
        otherwise the share is created in invited state.

        @param shareeUID: UID of the sharee
        @param mode: sharing mode (read/write/direct)
        @param summary: optional invite message / default display name
        @param shareName: optional name for the new share

        @return: a L{Deferred} firing with the sharee's view of this resource.
        """
        # Create the sharee's home if it does not exist yet.
        shareeHome = yield self._txn.homeWithUID(self.ownerHome()._homeType, shareeUID, create=True)

        yield self.shareWith(
            shareeHome,
            mode=mode,
            status=_BIND_STATUS_INVITED if mode != _BIND_MODE_DIRECT else _BIND_STATUS_ACCEPTED,
            summary=summary,
            shareName=shareName,
        )
        shareeView = yield self.shareeView(shareeUID)
        returnValue(shareeView)
+
+
+ @inlineCallbacks
+ def updateShare(self, shareeView, mode=None, status=None, summary=None):
+ """
+ Update share mode, status, and message for a home child shared with
+ this (owned) L{CommonHomeChild}.
+
+ @param shareeView: The sharee home child that shares this.
+ @type shareeView: L{CommonHomeChild}
+
+ @param mode: The sharing mode; L{_BIND_MODE_READ} or
+ L{_BIND_MODE_WRITE} or None to not update
+ @type mode: L{str}
+
+ @param status: The sharing status; L{_BIND_STATUS_INVITED} or
+ L{_BIND_STATUS_ACCEPTED} or L{_BIND_STATUS_DECLINED} or
+ L{_BIND_STATUS_INVALID} or None to not update
+ @type status: L{str}
+
+ @param summary: The proposed message to go along with the share, which
+ will be used as the default display name, or None to not update
+ @type summary: L{str}
+ """
+ # TODO: raise a nice exception if shareeView is not, in fact, a shared
+ # version of this same L{CommonHomeChild}
+
+ # remove None parameters, and substitute None for empty string
+ bind = self._bindSchema
+ columnMap = {}
+ if mode != None and mode != shareeView._bindMode:
+ columnMap[bind.BIND_MODE] = mode
+ if status != None and status != shareeView._bindStatus:
+ columnMap[bind.BIND_STATUS] = status
+ if summary != None and summary != shareeView._bindMessage:
+ columnMap[bind.MESSAGE] = summary
+
+ if columnMap:
+
+ # Count accepted
+ if bind.BIND_STATUS in columnMap:
+ previouslyAcceptedCount = yield shareeView._previousAcceptCount()
+
+ yield self._updateBindColumnsQuery(columnMap).on(
+ self._txn,
+ resourceID=self._resourceID, homeID=shareeView._home._resourceID
+ )
+
+ # Update affected attributes
+ if bind.BIND_MODE in columnMap:
+ shareeView._bindMode = columnMap[bind.BIND_MODE]
+
+ if bind.BIND_STATUS in columnMap:
+ shareeView._bindStatus = columnMap[bind.BIND_STATUS]
+ yield shareeView._changedStatus(previouslyAcceptedCount)
+
+ if bind.MESSAGE in columnMap:
+ shareeView._bindMessage = columnMap[bind.MESSAGE]
+
+ yield shareeView.invalidateQueryCache()
+
+ # Must send notification to ensure cache invalidation occurs
+ yield self.notifyPropertyChanged()
+ yield shareeView.viewerHome().notifyChanged()
+
+
    def _previousAcceptCount(self):
        """
        Number of accepted binds prior to a status change; subclasses with
        group-sharing semantics override this. Base implementation assumes one.

        @return: a L{Deferred} firing with C{int}
        """
        return succeed(1)
+
+
    @inlineCallbacks
    def _changedStatus(self, previouslyAcceptedCount):
        """
        React to a bind status change on this (sharee's) view: set up sync
        state and register in the home's child cache on accept; tear down sync
        state and de-register on invite/decline.

        @param previouslyAcceptedCount: accepted count before the change
            (unused here; used by group-sharing subclasses)
        """
        if self._bindStatus == _BIND_STATUS_ACCEPTED:
            yield self._initSyncToken()
            yield self._initBindRevision()
            # Cache under both name and resource id, matching home child lookup keys.
            self._home._children[self._name] = self
            self._home._children[self._resourceID] = self
        elif self._bindStatus in (_BIND_STATUS_INVITED, _BIND_STATUS_DECLINED):
            yield self._deletedSyncToken(sharedRemoval=True)
            self._home._children.pop(self._name, None)
            self._home._children.pop(self._resourceID, None)
+
+
    @inlineCallbacks
    def removeShare(self, shareeView):
        """
        Remove the shared version of this (owned) L{CommonHomeChild} from the
        referenced L{CommonHome}.

        @see: L{CommonHomeChild.shareWith}

        @param shareeView: The shared resource being removed.

        @return: a L{Deferred} which will fire with the previous shareUID
        """

        # remove sync tokens and evict the sharee view from the home's child cache
        shareeHome = shareeView.viewerHome()
        yield shareeView._deletedSyncToken(sharedRemoval=True)
        shareeHome._children.pop(shareeView._name, None)
        shareeHome._children.pop(shareeView._resourceID, None)

        # Must send notification to ensure cache invalidation occurs
        yield self.notifyPropertyChanged()
        yield shareeHome.notifyChanged()

        # delete binds including invites
        yield self._deleteBindForResourceIDAndHomeID.on(
            self._txn,
            resourceID=self._resourceID,
            homeID=shareeHome._resourceID,
        )

        yield shareeView.invalidateQueryCache()
+
+
    @inlineCallbacks
    def unshare(self):
        """
        Unshares a collection, regardless of which "direction" it was shared.
        """
        if self.owned():
            # This collection may be shared to others - remove every sharee's bind.
            invites = yield self.sharingInvites()
            for invite in invites:
                shareeView = yield self.shareeView(invite.shareeUID)
                yield self.removeShare(shareeView)
        else:
            # This collection is shared to me - remove only my bind.
            ownerView = yield self.ownerView()
            yield ownerView.removeShare(self)
+
+
    @inlineCallbacks
    def sharingInvites(self):
        """
        Retrieve the list of all L{SharingInvitation}'s for this L{CommonHomeChild}, irrespective of mode.
        Only meaningful on the owner's view; returns an empty list otherwise.

        @return: L{SharingInvitation} objects
        @rtype: a L{Deferred} which fires with a L{list} of L{SharingInvitation}s.
        """
        if not self.owned():
            returnValue([])

        # get all accepted binds
        invitedRows = yield self._sharedInvitationBindForResourceID.on(
            self._txn, resourceID=self._resourceID, homeID=self._home._resourceID
        )

        # Map each bind row into a SharingInvitation value object.
        result = []
        for homeUID, homeRID, _ignore_resourceID, resourceName, bindMode, bindStatus, bindMessage in invitedRows:
            invite = SharingInvitation(
                resourceName,
                self.ownerHome().name(),
                self.ownerHome().id(),
                homeUID,
                homeRID,
                bindMode,
                bindStatus,
                bindMessage,
            )
            result.append(invite)
        returnValue(result)
+
+
    @inlineCallbacks
    def _initBindRevision(self):
        """
        Record the current sync revision as this bind's starting revision, so
        home-level sync reports can tell what changed since the share was accepted.
        """
        yield self.syncToken() # init self._syncTokenRevision if None
        self._bindRevision = self._syncTokenRevision

        bind = self._bindSchema
        yield self._updateBindColumnsQuery(
            {bind.BIND_REVISION : Parameter("revision"), }
        ).on(
            self._txn,
            revision=self._bindRevision,
            resourceID=self._resourceID,
            homeID=self.viewerHome()._resourceID,
        )
        yield self.invalidateQueryCache()
+
+
    def sharedResourceType(self):
        """
        The sharing resource type. Needs to be overridden by each type of resource that can be shared.
        Base implementation has no specific type.

        @return: an identifier for the type of the share.
        @rtype: C{str}
        """
        return ""
+
+
+ def newShareName(self):
+ """
+ Name used when creating a new share. By default this is a UUID.
+ """
+ return str(uuid4())
+
+
    def owned(self):
        """
        @see: L{ICalendar.owned}

        @return: C{True} if this view's bind mode is "own", C{False} otherwise
        @rtype: C{bool}
        """
        return self._bindMode == _BIND_MODE_OWN
+
+
    def isShared(self):
        """
        For an owned collection indicate whether it is shared.

        @return: C{True} if shared, C{False} otherwise
        @rtype: C{bool}
        """
        # The "shared" marker is stored in the bind MESSAGE column - see setShared().
        return self.owned() and self._bindMessage == "shared"
+
+
    @inlineCallbacks
    def setShared(self, shared):
        """
        Set an owned collection to shared or unshared state. Technically this is not useful as "shared"
        really means it has invitees, but the current sharing spec supports a notion of a shared collection
        that has not yet had invitees added. For the time being we will support that option by using a new
        MESSAGE value to indicate an owned collection that is "shared".

        @param shared: whether or not the owned collection is "shared"
        @type shared: C{bool}
        """
        assert self.owned(), "Cannot change share mode on a shared collection"

        # Only if change is needed
        newMessage = "shared" if shared else None
        if self._bindMessage == newMessage:
            returnValue(None)

        self._bindMessage = newMessage

        # Persist the marker in the owner's bind row MESSAGE column.
        bind = self._bindSchema
        yield Update(
            {bind.MESSAGE: self._bindMessage},
            Where=(bind.RESOURCE_ID == Parameter("resourceID")).And(
                bind.HOME_RESOURCE_ID == Parameter("homeID")),
        ).on(self._txn, resourceID=self._resourceID, homeID=self.viewerHome()._resourceID)

        yield self.invalidateQueryCache()
        yield self.notifyPropertyChanged()
+
+
    def direct(self):
        """
        Is this a "direct" share?

        @return: a boolean indicating whether it's direct.
        @rtype: C{bool}
        """
        return self._bindMode == _BIND_MODE_DIRECT
+
+
    def indirect(self):
        """
        Is this an "indirect" share?

        @return: a boolean indicating whether it's indirect.
        @rtype: C{bool}
        """
        return self._bindMode == _BIND_MODE_INDIRECT
+
+
    def shareUID(self):
        """
        @see: L{ICalendar.shareUID}

        The share UID is simply this resource's name.
        """
        return self.name()
+
+
    def shareMode(self):
        """
        @see: L{ICalendar.shareMode}

        @return: this view's raw bind mode.
        """
        return self._bindMode
+
+
    def _effectiveShareMode(self, bindMode, viewerUID, txn):
        """
        Get the effective share mode without a calendar object. Base behavior
        is the bind mode unchanged; subclasses may adjust per viewer.

        @param bindMode: the stored bind mode
        @param viewerUID: UID of the user viewing the resource (unused here)
        @param txn: transaction to use (unused here)
        """
        return bindMode
+
+
    def effectiveShareMode(self):
        """
        @see: L{ICalendar.shareMode}

        Base behavior: same as the raw bind mode.
        """
        return self._bindMode
+
+
+ def shareName(self):
+ """
+ This is a path like name for the resource within the home being shared. For object resource
+ shares this will be a combination of the L{CommonHomeChild} name and the L{CommonObjecrResource}
+ name. Otherwise it is just the L{CommonHomeChild} name. This is needed to expose a value to the
+ app-layer such that it can construct a URI for the actual WebDAV resource being shared.
+ """
+ name = self.name()
+ if self.sharedResourceType() == "group":
+ name = self.parentCollection().name() + "/" + name
+ return name
+
+
    def shareStatus(self):
        """
        @see: L{ICalendar.shareStatus}

        @return: this view's raw bind status.
        """
        return self._bindStatus
+
+
    def accepted(self):
        """
        @see: L{ICalendar.shareStatus}

        @return: C{True} if this share is in the accepted state.
        @rtype: C{bool}
        """
        return self._bindStatus == _BIND_STATUS_ACCEPTED
+
+
    def shareMessage(self):
        """
        @see: L{ICalendar.shareMessage}

        @return: the bind MESSAGE column value (invite message / display name).
        """
        return self._bindMessage
+
+
    def getInviteCopyProperties(self):
        """
        Get a dictionary of property name/values (as strings) for properties that are shadowable and
        need to be copied to a sharee's collection when an external (cross-pod) share is created.
        Sub-classes should override to expose the properties they care about.

        @return: property name to value mapping; empty in the base implementation.
        @rtype: C{dict}
        """
        return {}
+
+
    def setInviteCopyProperties(self, props):
        """
        Copy a set of shadowable properties (as name/value strings) onto this shared resource when
        a cross-pod invite is processed. Sub-classes should override to expose the properties they
        care about.

        @param props: property name to value mapping to apply
        @type props: C{dict}
        """
        pass
+
+
    @classmethod
    def metadataColumns(cls):
        """
        Return a list of column name for retrieval of metadata. This allows
        different child classes to have their own type specific data, but still make use of the
        common base logic. Must stay in the same order as L{metadataAttributes}.
        """

        # Common behavior is to have created and modified

        return (
            cls._homeChildMetaDataSchema.CREATED,
            cls._homeChildMetaDataSchema.MODIFIED,
        )
+
+
    @classmethod
    def metadataAttributes(cls):
        """
        Return a list of attribute names for retrieval of metadata. This allows
        different child classes to have their own type specific data, but still make use of the
        common base logic. Must stay in the same order as L{metadataColumns}.
        """

        # Common behavior is to have created and modified

        return (
            "_created",
            "_modified",
        )
+
+
    @classmethod
    def bindColumns(cls):
        """
        Return a list of column names for retrieval during creation. This allows
        different child classes to have their own type specific data, but still make use of the
        common base logic. Must stay in the same order as L{bindAttributes} and
        match L{bindColumnCount}.
        """

        return (
            cls._bindSchema.BIND_MODE,
            cls._bindSchema.HOME_RESOURCE_ID,
            cls._bindSchema.RESOURCE_ID,
            cls._bindSchema.EXTERNAL_ID,
            cls._bindSchema.RESOURCE_NAME,
            cls._bindSchema.BIND_STATUS,
            cls._bindSchema.BIND_REVISION,
            cls._bindSchema.MESSAGE
        )
+
+
    @classmethod
    def bindAttributes(cls):
        """
        Return a list of attribute names matching, in order, the columns from
        L{bindColumns}. This allows different child classes to have their own
        type specific data, but still make use of the common base logic.
        """

        return (
            "_bindMode",
            "_homeResourceID",
            "_resourceID",
            "_externalID",
            "_name",
            "_bindStatus",
            "_bindRevision",
            "_bindMessage",
        )

    # Number of entries returned by bindColumns()/bindAttributes(); used when
    # slicing combined query result rows.
    bindColumnCount = 8
+
    @classmethod
    def additionalBindColumns(cls):
        """
        Return a list of additional bind columns for retrieval during creation. This allows
        different child classes to have their own type specific data, but still make use of the
        common base logic. Base implementation has none.
        """

        return ()
+
+
    @classmethod
    def additionalBindAttributes(cls):
        """
        Return a list of attribute names matching L{additionalBindColumns}. This allows
        different child classes to have their own type specific data, but still make use of the
        common base logic. Base implementation has none.
        """

        return ()
+
+
    @classproperty
    def _childrenAndMetadataForHomeID(cls):
        """
        DAL query joining child, bind and metadata tables to load all accepted
        children of a home in one round trip. Result columns follow
        L{bindColumns} + L{additionalBindColumns} + L{metadataColumns}.
        """
        bind = cls._bindSchema
        child = cls._homeChildSchema
        childMetaData = cls._homeChildMetaDataSchema

        columns = cls.bindColumns() + cls.additionalBindColumns() + cls.metadataColumns()
        return Select(
            columns,
            From=child.join(
                bind, child.RESOURCE_ID == bind.RESOURCE_ID,
                'left outer').join(
                childMetaData, childMetaData.RESOURCE_ID == bind.RESOURCE_ID,
                'left outer'),
            Where=(bind.HOME_RESOURCE_ID == Parameter("homeID")).And(
                bind.BIND_STATUS == _BIND_STATUS_ACCEPTED)
        )
+
+
    @classmethod
    def _revisionsForResourceIDs(cls, resourceIDs):
        """
        DAL query returning (resource id, max revision) pairs for the given set
        of resource ids, ignoring rows that are pure collection tombstones.

        @param resourceIDs: ids to query; only its length is used (to size the
            IN-clause parameter list)
        """
        rev = cls._revisionsSchema
        return Select(
            [rev.RESOURCE_ID, Max(rev.REVISION)],
            From=rev,
            Where=rev.RESOURCE_ID.In(Parameter("resourceIDs", len(resourceIDs))).And(
                (rev.RESOURCE_NAME != None).Or(rev.DELETED == False)),
            GroupBy=rev.RESOURCE_ID
        )
+
+
    @inlineCallbacks
    def invalidateQueryCache(self):
        """
        Invalidate (after commit) every cached query result keyed by this
        child's metadata, name, resource id and external id. No-op when no
        query cacher is configured on the transaction.
        """
        queryCacher = self._txn._queryCacher
        if queryCacher is not None:
            yield queryCacher.invalidateAfterCommit(self._txn, queryCacher.keyForHomeChildMetaData(self._resourceID))
            yield queryCacher.invalidateAfterCommit(self._txn, queryCacher.keyForObjectWithName(self._home._resourceID, self._name))
            yield queryCacher.invalidateAfterCommit(self._txn, queryCacher.keyForObjectWithResourceID(self._home._resourceID, self._resourceID))
            yield queryCacher.invalidateAfterCommit(self._txn, queryCacher.keyForObjectWithExternalID(self._home._resourceID, self._externalID))
Added: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_util.py (rev 0)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_util.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -0,0 +1,807 @@
+# -*- test-case-name: twext.enterprise.dal.test.test_record -*-
+##
+# Copyright (c) 2015 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twext.enterprise.dal.syntax import Max, Select, Parameter, Delete, Insert, \
+ Update, ColumnSyntax, TableSyntax, Upper
+from twext.python.clsprop import classproperty
+from twext.python.log import Logger
+from twisted.internet.defer import succeed, inlineCallbacks, returnValue
+from txdav.base.datastore.util import normalizeUUIDOrNot
+from txdav.common.datastore.sql_tables import schema
+from txdav.common.icommondatastore import SyncTokenValidException, \
+ ENOTIFICATIONTYPE, ECALENDARTYPE, EADDRESSBOOKTYPE
+import time
+from uuid import UUID
+
+log = Logger()
+
+
+"""
+Classes and methods for the SQL store.
+"""
+
class _EmptyCacher(object):
    """
    A no-op stand-in for a memcache-style cacher: every operation "succeeds"
    immediately and nothing is ever stored.
    """

    def set(self, key, value):
        # Pretend the value was stored.
        return succeed(True)


    def get(self, key, withIdentifier=False):
        # Nothing is ever cached, so every lookup misses.
        return succeed(None)


    def delete(self, key):
        # Nothing to delete; report success anyway.
        return succeed(True)
+
+
+
+class _SharedSyncLogic(object):
+ """
+ Logic for maintaining sync-token shared between notification collections and
+ shared collections.
+ """
+
    @classproperty
    def _childSyncTokenQuery(cls):
        """
        DAL query for retrieving the sync token of a L{CommonHomeChild} based on
        its resource ID.
        """
        rev = cls._revisionsSchema
        return Select([Max(rev.REVISION)], From=rev,
                      Where=rev.RESOURCE_ID == Parameter("resourceID"))
+
+
+ def revisionFromToken(self, token):
+ if token is None:
+ return 0
+ elif isinstance(token, str) or isinstance(token, unicode):
+ _ignore_uuid, revision = token.split("_", 1)
+ return int(revision)
+ else:
+ return token
+
+
    @inlineCallbacks
    def syncToken(self):
        """
        Return this collection's sync token, "<resource id>_<revision>",
        lazily loading and caching the revision on first use.
        """
        if self._syncTokenRevision is None:
            self._syncTokenRevision = yield self.syncTokenRevision()
        returnValue(("%s_%s" % (self._resourceID, self._syncTokenRevision,)))
+
+
    @inlineCallbacks
    def syncTokenRevision(self):
        """
        Look up this collection's current max revision, falling back to the
        server-wide MIN-VALID-REVISION when no revision rows exist yet.
        """
        revision = (yield self._childSyncTokenQuery.on(self._txn, resourceID=self._resourceID))[0][0]
        if revision is None:
            revision = int((yield self._txn.calendarserverValue("MIN-VALID-REVISION")))
        returnValue(revision)
+
+
    def objectResourcesSinceToken(self, token):
        """
        Return object resources changed since the given sync token; concrete
        collections must override.

        @raise NotImplementedError: always, in this base class.
        """
        raise NotImplementedError()
+
+
    @classmethod
    def _objectNamesSinceRevisionQuery(cls, deleted=True):
        """
        DAL query for (resource, deleted-flag)

        @param deleted: whether rows flagged as deleted are included
        @type deleted: C{bool}
        """
        rev = cls._revisionsSchema
        where = (rev.REVISION > Parameter("revision")).And(rev.RESOURCE_ID == Parameter("resourceID"))
        if not deleted:
            where = where.And(rev.DELETED == False)
        return Select(
            [rev.RESOURCE_NAME, rev.DELETED],
            From=rev,
            Where=where,
        )
+
+
    def resourceNamesSinceToken(self, token):
        """
        Return the changed and deleted resources since a particular sync-token. This simply extracts
        the revision from from the token then calls L{resourceNamesSinceRevision}.

        @param token: the sync token to determine changes since (C{None}, a
            "<uuid>_<revision>" string, or a raw revision number)
        """

        return self.resourceNamesSinceRevision(self.revisionFromToken(token))
+
+
    @inlineCallbacks
    def resourceNamesSinceRevision(self, revision):
        """
        Return the changed and deleted resources since a particular revision.

        @param revision: the revision to determine changes since; a falsy
            revision (initial sync) returns all current resources as changed
        @type revision: C{int}

        @return: a L{Deferred} firing with (changed, deleted, invalid) name lists
        @raise SyncTokenValidException: if the revision is older than the
            server's MIN-VALID-REVISION and therefore unreliable
        """
        changed = []
        deleted = []
        invalid = []
        if revision:
            # Reject tokens older than the oldest revision we still track.
            minValidRevision = yield self._txn.calendarserverValue("MIN-VALID-REVISION")
            if revision < int(minValidRevision):
                raise SyncTokenValidException

            results = [
                (name if name else "", removed) for name, removed in (
                    yield self._objectNamesSinceRevisionQuery().on(
                        self._txn, revision=revision, resourceID=self._resourceID)
                )
            ]
            results.sort(key=lambda x: x[1])

            for name, wasdeleted in results:
                if name:
                    if wasdeleted:
                        deleted.append(name)
                    else:
                        changed.append(name)
        else:
            # Initial sync: everything currently present counts as changed.
            changed = yield self.listObjectResources()

        returnValue((changed, deleted, invalid))
+
+
    @classproperty
    def _removeDeletedRevision(cls):
        """
        DAL query to purge revision rows for a (deleted) collection of the
        given name in a home, prior to re-creating its sync state.
        """
        rev = cls._revisionsSchema
        return Delete(From=rev,
                      Where=(rev.HOME_RESOURCE_ID == Parameter("homeID")).And(
                          rev.COLLECTION_NAME == Parameter("collectionName")))
+
+
    @classproperty
    def _addNewRevision(cls):
        """
        DAL query to create the collection-level revision row for a new
        collection, returning the assigned revision number.
        """
        rev = cls._revisionsSchema
        return Insert(
            {
                rev.HOME_RESOURCE_ID: Parameter("homeID"),
                rev.RESOURCE_ID: Parameter("resourceID"),
                rev.COLLECTION_NAME: Parameter("collectionName"),
                rev.RESOURCE_NAME: None,
                # Always starts false; may be updated to be a tombstone
                # later.
                rev.DELETED: False
            },
            Return=[rev.REVISION]
        )
+
+
    @inlineCallbacks
    def _initSyncToken(self):
        """
        Initialize sync state for this collection: clear any stale revision
        rows left by a previously deleted collection of the same name, insert
        a fresh collection revision row, and bump the home's revision.
        """
        yield self._removeDeletedRevision.on(
            self._txn, homeID=self._home._resourceID, collectionName=self._name
        )
        self._syncTokenRevision = (yield (
            self._addNewRevision.on(self._txn, homeID=self._home._resourceID,
                                    resourceID=self._resourceID,
                                    collectionName=self._name)))[0][0]
        self._txn.bumpRevisionForObject(self)
+
+
    @classproperty
    def _renameSyncTokenQuery(cls):
        """
        DAL query to change sync token for a rename (increment revision and
        store the new collection name on the collection-level row).
        """
        rev = cls._revisionsSchema
        return Update(
            {
                rev.REVISION: schema.REVISION_SEQ,
                rev.COLLECTION_NAME: Parameter("name")
            },
            Where=(rev.RESOURCE_ID == Parameter("resourceID")).And
                  (rev.RESOURCE_NAME == None),
            Return=rev.REVISION
        )
+
+
    @inlineCallbacks
    def _renameSyncToken(self):
        """
        Record a rename of this collection in its sync state (new revision and
        name), and bump the home's revision.
        """
        self._syncTokenRevision = (yield self._renameSyncTokenQuery.on(
            self._txn, name=self._name, resourceID=self._resourceID))[0][0]
        self._txn.bumpRevisionForObject(self)
+
+
    @classproperty
    def _bumpSyncTokenQuery(cls):
        """
        DAL query to change collection sync token. Note this can impact multiple rows if the
        collection is shared.
        """
        rev = cls._revisionsSchema
        return Update(
            {rev.REVISION: schema.REVISION_SEQ, },
            Where=(rev.RESOURCE_ID == Parameter("resourceID")).And
                  (rev.RESOURCE_NAME == None)
        )
+
+
    @inlineCallbacks
    def _bumpSyncToken(self):
        """
        Advance this collection's sync revision, at most once per transaction
        (the transaction tracks which objects were already bumped). The cached
        revision is reset so the next syncToken() call re-reads it.
        """

        if not self._txn.isRevisionBumpedAlready(self):
            self._txn.bumpRevisionForObject(self)
            yield self._bumpSyncTokenQuery.on(
                self._txn,
                resourceID=self._resourceID,
            )
            self._syncTokenRevision = None
+
+
    @classproperty
    def _deleteSyncTokenQuery(cls):
        """
        DAL query to remove all child revision information. The revision for the collection
        itself is not touched.
        """
        rev = cls._revisionsSchema
        return Delete(
            From=rev,
            Where=(rev.HOME_RESOURCE_ID == Parameter("homeID")).And
                  (rev.RESOURCE_ID == Parameter("resourceID")).And
                  (rev.COLLECTION_NAME == None)
        )
+
+
    @classproperty
    def _sharedRemovalQuery(cls):
        """
        DAL query to indicate a shared collection has been deleted: turns the
        collection-level row in one specific home into a tombstone.
        """
        rev = cls._revisionsSchema
        return Update(
            {
                rev.RESOURCE_ID: None,
                rev.REVISION: schema.REVISION_SEQ,
                rev.DELETED: True
            },
            Where=(rev.HOME_RESOURCE_ID == Parameter("homeID")).And(
                rev.RESOURCE_ID == Parameter("resourceID")).And(
                rev.RESOURCE_NAME == None)
        )
+
+
    @classproperty
    def _unsharedRemovalQuery(cls):
        """
        DAL query to indicate an owned collection has been deleted: turns the
        collection-level rows in every home (owner plus any direct sharees)
        into tombstones.
        """
        rev = cls._revisionsSchema
        return Update(
            {
                rev.RESOURCE_ID: None,
                rev.REVISION: schema.REVISION_SEQ,
                rev.DELETED: True
            },
            Where=(rev.RESOURCE_ID == Parameter("resourceID")).And(
                rev.RESOURCE_NAME == None),
        )
+
+
    @inlineCallbacks
    def _deletedSyncToken(self, sharedRemoval=False):
        """
        When a collection is deleted we remove all the revision information for
        its child resources.  We update the collection's sync token to indicate
        it has been deleted - that way a sync on the home collection can report
        the deletion of the collection.

        @param sharedRemoval: indicates whether the collection being removed is
            shared
        @type sharedRemoval: L{bool}
        """
        # Remove all child entries
        yield self._deleteSyncTokenQuery.on(self._txn,
                                            homeID=self._home._resourceID,
                                            resourceID=self._resourceID)

        # If this is a share being removed then we only mark this one specific
        # home/resource-id as being deleted. On the other hand, if it is a
        # non-shared collection, then we need to mark all collections
        # with the resource-id as being deleted to account for direct shares.
        if sharedRemoval:
            yield self._sharedRemovalQuery.on(self._txn,
                                              homeID=self._home._resourceID,
                                              resourceID=self._resourceID)
        else:
            yield self._unsharedRemovalQuery.on(self._txn,
                                                resourceID=self._resourceID)
        # Clear the cached revision; it will be looked up again if needed.
        self._syncTokenRevision = None
+
+
    def _insertRevision(self, name):
        """
        Record the creation of the child resource named C{name}.

        @param name: the name of the child resource
        @type name: L{str}

        @return: a L{Deferred} firing with the new revision number.
        """
        return self._changeRevision("insert", name)
+
+
    def _updateRevision(self, name):
        """
        Record a modification of the child resource named C{name}.

        @param name: the name of the child resource
        @type name: L{str}

        @return: a L{Deferred} firing with the new revision number.
        """
        return self._changeRevision("update", name)
+
+
    def _deleteRevision(self, name):
        """
        Record the deletion of the child resource named C{name}.

        @param name: the name of the child resource
        @type name: L{str}

        @return: a L{Deferred} firing with the new revision number.
        """
        return self._changeRevision("delete", name)
+
+
+ @classproperty
+ def _deleteBumpTokenQuery(cls):
+ rev = cls._revisionsSchema
+ return Update(
+ {rev.REVISION: schema.REVISION_SEQ, rev.DELETED: True},
+ Where=(rev.RESOURCE_ID == Parameter("resourceID")).And(
+ rev.RESOURCE_NAME == Parameter("name")),
+ Return=rev.REVISION
+ )
+
+
+ @classproperty
+ def _updateBumpTokenQuery(cls):
+ rev = cls._revisionsSchema
+ return Update(
+ {rev.REVISION: schema.REVISION_SEQ},
+ Where=(rev.RESOURCE_ID == Parameter("resourceID")).And(
+ rev.RESOURCE_NAME == Parameter("name")),
+ Return=rev.REVISION
+ )
+
+
+ @classproperty
+ def _insertFindPreviouslyNamedQuery(cls):
+ rev = cls._revisionsSchema
+ return Select(
+ [rev.RESOURCE_ID],
+ From=rev,
+ Where=(rev.RESOURCE_ID == Parameter("resourceID")).And(
+ rev.RESOURCE_NAME == Parameter("name"))
+ )
+
+
+ @classproperty
+ def _updatePreviouslyNamedQuery(cls):
+ rev = cls._revisionsSchema
+ return Update(
+ {rev.REVISION: schema.REVISION_SEQ, rev.DELETED: False},
+ Where=(rev.RESOURCE_ID == Parameter("resourceID")).And(
+ rev.RESOURCE_NAME == Parameter("name")),
+ Return=rev.REVISION
+ )
+
+
+ @classproperty
+ def _completelyNewRevisionQuery(cls):
+ rev = cls._revisionsSchema
+ return Insert(
+ {
+ rev.HOME_RESOURCE_ID: Parameter("homeID"),
+ rev.RESOURCE_ID: Parameter("resourceID"),
+ rev.RESOURCE_NAME: Parameter("name"),
+ rev.REVISION: schema.REVISION_SEQ,
+ rev.DELETED: False
+ },
+ Return=rev.REVISION
+ )
+
+
    @inlineCallbacks
    def _changeRevision(self, action, name):
        """
        Apply a revision-table change for the child resource C{name} and
        update the cached sync token revision.

        @param action: one of "insert", "update", or "delete"
        @type action: L{str}

        @param name: the name of the child resource affected
        @type name: L{str}

        @return: a L{Deferred} firing with the new revision number.
        """
        # Need to handle the case where for some reason the revision entry is
        # actually missing. For a "delete" we don't care, for an "update" we
        # will turn it into an "insert".
        if action == "delete":
            rows = (
                yield self._deleteBumpTokenQuery.on(
                    self._txn, resourceID=self._resourceID, name=name))
            if rows:
                self._syncTokenRevision = rows[0][0]
        elif action == "update":
            rows = (
                yield self._updateBumpTokenQuery.on(
                    self._txn, resourceID=self._resourceID, name=name))
            if rows:
                self._syncTokenRevision = rows[0][0]
            else:
                # No existing row for this child: fall through to the insert
                # handling below.
                action = "insert"

        if action == "insert":
            # Note that an "insert" may happen for a resource that previously
            # existed and then was deleted. In that case an entry in the
            # REVISIONS table still exists so we have to detect that and do db
            # INSERT or UPDATE as appropriate
            found = bool((
                yield self._insertFindPreviouslyNamedQuery.on(
                    self._txn, resourceID=self._resourceID, name=name)))
            if found:
                self._syncTokenRevision = (
                    yield self._updatePreviouslyNamedQuery.on(
                        self._txn, resourceID=self._resourceID, name=name)
                )[0][0]
            else:
                self._syncTokenRevision = (
                    yield self._completelyNewRevisionQuery.on(
                        self._txn, homeID=self.ownerHome()._resourceID,
                        resourceID=self._resourceID, name=name)
                )[0][0]
        yield self._maybeNotify()
        returnValue(self._syncTokenRevision)
+
+
    def _maybeNotify(self):
        """
        Maybe notify changed. (Overridden in NotificationCollection.)

        This base implementation does nothing and returns an
        already-fired L{Deferred}.
        """
        return succeed(None)
+
+
+
def determineNewest(uid, homeType):
    """
    Build a query giving the modification time of the newest object in a
    given home.

    @param uid: the UID of the home to scan.
    @type uid: C{str}

    @param homeType: The type of home to scan; C{ECALENDARTYPE},
        C{ENOTIFICATIONTYPE}, or C{EADDRESSBOOKTYPE}.
    @type homeType: C{int}

    @return: A select query that will return a single row containing a single
        column which is the maximum value.
    @rtype: L{Select}
    """
    # Notification homes have a flat structure, so they get a simpler query.
    if homeType == ENOTIFICATIONTYPE:
        notificationHome = schema.NOTIFICATION_HOME
        notification = schema.NOTIFICATION
        return Select(
            [Max(notification.MODIFIED)],
            From=notificationHome.join(
                notification,
                on=notificationHome.RESOURCE_ID ==
                notification.NOTIFICATION_HOME_RESOURCE_ID),
            Where=notificationHome.OWNER_UID == uid
        )

    prefix = {
        ECALENDARTYPE: "CALENDAR",
        EADDRESSBOOKTYPE: "ADDRESSBOOK",
    }[homeType]
    homeTable = getattr(schema, prefix + "_HOME")
    bindTable = getattr(schema, prefix + "_BIND")
    childTable = getattr(schema, prefix)
    objectTable = getattr(schema, prefix + "_OBJECT")

    joined = homeTable.join(
        bindTable, on=bindTable.HOME_RESOURCE_ID == homeTable.RESOURCE_ID
    ).join(
        childTable, on=childTable.RESOURCE_ID == bindTable.RESOURCE_ID
    ).join(
        objectTable, on=objectTable.PARENT_RESOURCE_ID == childTable.RESOURCE_ID
    )
    # BIND_MODE == 0 -- presumably the "owned" bind mode, so shared-in
    # collections are excluded; confirm against the schema constants.
    return Select(
        [Max(objectTable.MODIFIED)],
        From=joined,
        Where=(bindTable.BIND_MODE == 0).And(homeTable.OWNER_UID == uid)
    )
+
+
+
@inlineCallbacks
def mergeHomes(sqlTxn, one, other, homeType):
    """
    Merge two homes together.  This determines which of C{one} or C{other} is
    newer - that is, has been modified more recently - and pulls all the data
    from the older into the newer home.  Then, it changes the UID of the old
    home to its UID, normalized and prefixed with "old.", and then re-names
    the new home to its name, normalized.

    Because the UIDs of both homes have changed, B{both C{one} and C{other}
    will be invalid to all other callers from the start of the invocation of
    this function}.

    @param sqlTxn: the transaction to use
    @type sqlTxn: A L{CommonTransaction}

    @param one: A calendar home.
    @type one: L{ICalendarHome}

    @param other: Another, different calendar home.
    @type other: L{ICalendarHome}

    @param homeType: The type of home to scan; L{ECALENDARTYPE} or
        L{EADDRESSBOOKTYPE}.
    @type homeType: C{int}

    @return: a L{Deferred} which fires with the newer of C{one} or C{other},
        into which the data from the other home has been merged, when the
        merge is complete.
    """
    from txdav.caldav.datastore.util import migrateHome as migrateCalendarHome
    from txdav.carddav.datastore.util import migrateHome as migrateABHome
    migrateHome = {EADDRESSBOOKTYPE: migrateABHome,
                   ECALENDARTYPE: migrateCalendarHome,
                   ENOTIFICATIONTYPE: _dontBotherWithNotifications}[homeType]
    homeTable = {EADDRESSBOOKTYPE: schema.ADDRESSBOOK_HOME,
                 ECALENDARTYPE: schema.CALENDAR_HOME,
                 ENOTIFICATIONTYPE: schema.NOTIFICATION_HOME}[homeType]
    # Pair each home with the modification time of its newest object, then
    # sort so the least-recently-modified home comes first.
    both = []
    both.append([one,
                 (yield determineNewest(one.uid(), homeType).on(sqlTxn))])
    both.append([other,
                 (yield determineNewest(other.uid(), homeType).on(sqlTxn))])
    both.sort(key=lambda x: x[1])

    older = both[0][0]
    newer = both[1][0]
    yield migrateHome(older, newer, merge=True)
    # Rename the old one to 'old.<correct-guid>'
    newNormalized = normalizeUUIDOrNot(newer.uid())
    oldNormalized = normalizeUUIDOrNot(older.uid())
    yield _renameHome(sqlTxn, homeTable, older.uid(), "old." + oldNormalized)
    # Rename the new one to '<correct-guid>' only if it actually differs.
    if newer.uid() != newNormalized:
        yield _renameHome(sqlTxn, homeTable, newer.uid(), newNormalized)
    # returnValue() raises to terminate the generator, so the previous
    # "yield returnValue(...)" form never reached its yield; the spurious
    # yield has been dropped.
    returnValue(newer)
+
+
+
def _renameHome(txn, table, oldUID, newUID):
    """
    Rename a calendar, addressbook, or notification home.  Note that this
    function is only safe in transactions that have had caching disabled, and
    more specifically should only ever be used during upgrades.  Running this
    in a normal transaction will have unpredictable consequences, especially
    with respect to memcache.

    @param txn: an SQL transaction to use for this update
    @type txn: L{twext.enterprise.ienterprise.IAsyncTransaction}

    @param table: the storage table of the desired home type
    @type table: L{TableSyntax}

    @param oldUID: the old UID, the existing home's UID
    @type oldUID: L{str}

    @param newUID: the new UID, to change the UID to
    @type newUID: L{str}

    @return: a L{Deferred} which fires when the home is renamed.
    """
    renameQuery = Update(
        {table.OWNER_UID: newUID},
        Where=table.OWNER_UID == oldUID,
    )
    return renameQuery.on(txn)
+
+
+
+def _dontBotherWithNotifications(older, newer, merge):
+ """
+ Notifications are more transient and can be easily worked around; don't
+ bother to migrate all of them when there is a UUID case mismatch.
+ """
+ pass
+
+
+
@inlineCallbacks
def _normalizeHomeUUIDsIn(t, homeType):
    """
    Normalize the UUIDs in the given L{txdav.common.datastore.CommonStore}.

    This changes the case of the UUIDs in the calendar home.

    @param t: the transaction to normalize all the UUIDs in.
    @type t: L{CommonStoreTransaction}

    @param homeType: The type of home to scan, L{ECALENDARTYPE},
        L{EADDRESSBOOKTYPE}, or L{ENOTIFICATIONTYPE}.
    @type homeType: C{int}

    @return: a L{Deferred} which fires with C{None} when the UUID
        normalization is complete.
    """
    from txdav.caldav.datastore.util import fixOneCalendarHome
    homeTable = {EADDRESSBOOKTYPE: schema.ADDRESSBOOK_HOME,
                 ECALENDARTYPE: schema.CALENDAR_HOME,
                 ENOTIFICATIONTYPE: schema.NOTIFICATION_HOME}[homeType]
    homeTypeName = homeTable.model.name.split("_")[0]

    allUIDs = yield Select([homeTable.OWNER_UID],
                           From=homeTable,
                           OrderBy=homeTable.OWNER_UID).on(t)
    total = len(allUIDs)
    allElapsed = []
    for n, [UID] in enumerate(allUIDs):
        start = time.time()
        if allElapsed:
            # Average per-home time so far, multiplied by the number of homes
            # still to scan.  (The previous expression computed
            # "average * total - n", subtracting the loop index from the
            # product instead of computing the remaining count.)
            estimate = "%0.3d" % ((sum(allElapsed) / len(allElapsed)) *
                                  (total - n))
        else:
            estimate = "unknown"
        # "{pct:0.2f}" was previously "{pct!0.2d}", which is not a valid
        # format field ("!" introduces a conversion, not a format spec) and
        # left this log line unformattable.
        log.info(
            "Scanning UID {uid} [{homeType}] "
            "({pct:0.2f}%, {estimate} seconds remaining)...",
            uid=UID, pct=(n / float(total)) * 100, estimate=estimate,
            homeType=homeTypeName
        )
        other = None
        this = yield _getHome(t, homeType, UID)
        if homeType == ECALENDARTYPE:
            fixedThisHome = yield fixOneCalendarHome(this)
        else:
            fixedThisHome = 0
        fixedOtherHome = 0
        if this is None:
            log.info(
                "{uid!r} appears to be missing, already processed", uid=UID
            )
        try:
            uuidobj = UUID(UID)
        except ValueError:
            # Owner UID is not UUID-shaped; nothing to normalize.
            pass
        else:
            newname = str(uuidobj).upper()
            if UID != newname:
                log.info(
                    "Detected case variance: {uid} {newuid}[{homeType}]",
                    uid=UID, newuid=newname, homeType=homeTypeName
                )
                other = yield _getHome(t, homeType, newname)
                if other is None:
                    # No duplicate: just fix the name.
                    yield _renameHome(t, homeTable, UID, newname)
                else:
                    if homeType == ECALENDARTYPE:
                        fixedOtherHome = yield fixOneCalendarHome(other)
                    this = yield mergeHomes(t, this, other, homeType)
                # NOTE: WE MUST NOT TOUCH EITHER HOME OBJECT AFTER THIS POINT.
                # THE UIDS HAVE CHANGED AND ALL OPERATIONS WILL FAIL.

        end = time.time()
        elapsed = end - start
        allElapsed.append(elapsed)
        log.info(
            "Scanned UID {uid}; {elapsed} seconds elapsed,"
            " {fixes} properties fixed ({duplicate} fixes in duplicate).",
            uid=UID, elapsed=elapsed, fixes=fixedThisHome,
            duplicate=fixedOtherHome
        )
    returnValue(None)
+
+
+
def _getHome(txn, homeType, uid):
    """
    Like L{CommonHome.homeWithUID} but also honoring ENOTIFICATIONTYPE which
    isn't I{really} a type of home.

    @param txn: the transaction to retrieve the home from
    @type txn: L{CommonStoreTransaction}

    @param homeType: L{ENOTIFICATIONTYPE}, L{ECALENDARTYPE}, or
        L{EADDRESSBOOKTYPE}.

    @param uid: the UID of the home to retrieve.
    @type uid: L{str}

    @return: a L{Deferred} that fires with the L{CommonHome} or
        L{NotificationHome} when it has been retrieved.
    """
    # Notification "homes" live behind their own accessor; everything else
    # goes through the generic home lookup.
    if homeType != ENOTIFICATIONTYPE:
        return txn.homeWithUID(homeType, uid)
    return txn.notificationsWithUID(uid, create=False)
+
+
+
@inlineCallbacks
def _normalizeColumnUUIDs(txn, column):
    """
    Upper-case the UUIDs in the given SQL DAL column.

    @param txn: The transaction.
    @type txn: L{CommonStoreTransaction}

    @param column: the column, which may contain UIDs, to normalize.
    @type column: L{ColumnSyntax}

    @return: A L{Deferred} that will fire when the UUID normalization of the
        given column has completed.
    """
    tableModel = column.model.table
    # Get a primary key made of column syntax objects for querying and
    # comparison later.
    pkey = [ColumnSyntax(columnModel)
            for columnModel in tableModel.primaryKey]
    for row in (yield Select([column] + pkey,
                             From=TableSyntax(tableModel)).on(txn)):
        before = row[0]
        pkeyValues = row[1:]
        after = normalizeUUIDOrNot(before)
        if after != before:
            where = _AndNothing
            # Build a Where clause matching this row's primary key.  (The
            # previous version iterated with the names "pkeycol"/"pkeypart"
            # swapped -- the "col" name held the value and vice versa -- and
            # compared value == column, relying on reflected equality; the
            # comparison below is equivalent but direct and correctly named.)
            for pkeyColumn, pkeyValue in zip(pkey, pkeyValues):
                where = where.And(pkeyColumn == pkeyValue)
            yield Update({column: after}, Where=where).on(txn)
+
+
+
+class _AndNothing(object):
+ """
+ Simple placeholder for iteratively generating a 'Where' clause; the 'And'
+ just returns its argument, so it can be used at the start of the loop.
+ """
+ @staticmethod
+ def And(self):
+ """
+ Return the argument.
+ """
+ return self
+
+
+
@inlineCallbacks
def _needsNormalizationUpgrade(txn):
    """
    Determine whether a given store requires a UUID normalization data
    upgrade.

    @param txn: the transaction to use
    @type txn: L{CommonStoreTransaction}

    @return: a L{Deferred} that fires with C{True} or C{False} depending on
        whether we need the normalization upgrade or not.
    """
    homeTables = [
        schema.CALENDAR_HOME,
        schema.ADDRESSBOOK_HOME,
        schema.NOTIFICATION_HOME,
    ]
    for homeTable in homeTables:
        # Ask the database only for owner UIDs that are not already
        # upper-case; those are the only candidates for normalization.
        rows = yield Select(
            [homeTable.OWNER_UID],
            From=homeTable,
            Where=homeTable.OWNER_UID != Upper(homeTable.OWNER_UID),
        ).on(txn)
        for [uid] in rows:
            if normalizeUUIDOrNot(uid) != uid:
                returnValue(True)
    returnValue(False)
+
+
+
@inlineCallbacks
def fixUUIDNormalization(store):
    """
    Fix all UUIDs in the given SQL store to be in a canonical form;
    00000000-0000-0000-0000-000000000000 format and upper-case.

    @param store: the SQL store to normalize.

    @return: a L{Deferred} firing with C{None} when done.
    """
    t = store.newTransaction(disableCache=True)

    # First, let's see if there are any calendar, addressbook, or notification
    # homes that have a de-normalized OWNER_UID. If there are none, then we can
    # early-out and avoid the tedious and potentially expensive inspection of
    # oodles of calendar data.
    if not (yield _needsNormalizationUpgrade(t)):
        log.info("No potentially denormalized UUIDs detected, "
                 "skipping normalization upgrade.")
        yield t.abort()
        returnValue(None)
    try:
        yield _normalizeHomeUUIDsIn(t, ECALENDARTYPE)
        yield _normalizeHomeUUIDsIn(t, EADDRESSBOOKTYPE)
        yield _normalizeHomeUUIDsIn(t, ENOTIFICATIONTYPE)
        yield _normalizeColumnUUIDs(t, schema.RESOURCE_PROPERTY.VIEWER_UID)
        yield _normalizeColumnUUIDs(t, schema.APN_SUBSCRIPTIONS.SUBSCRIBER_GUID)
    except:
        log.failure("Unable to normalize UUIDs")
        yield t.abort()
        # There's a lot of possible problems here which are very hard to test
        # for individually; unexpected data that might cause constraint
        # violations under one of the manipulations done by
        # normalizeHomeUUIDsIn. Since this upgrade does not come along with a
        # schema version bump and may be re-attempted at any time, just raise
        # the exception and log it so that we can try again later, and the
        # service will survive for everyone _not_ affected by this somewhat
        # obscure bug.
        # NOTE(review): despite the comment above, the exception is logged and
        # the transaction aborted but NOT re-raised here -- confirm whether
        # the upgrade framework expects a raise to propagate.
    else:
        yield t.commit()
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/test/test_sql.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/test/test_sql.py 2015-02-20 18:06:03 UTC (rev 14461)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/test/test_sql.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
+from txdav.common.datastore.sql_util import _normalizeColumnUUIDs, \
+ fixUUIDNormalization
"""
Tests for L{txdav.common.datastore.sql}.
@@ -31,7 +33,6 @@
from txdav.common.datastore.sql_tables import schema
from txdav.common.datastore.test.util import CommonCommonTests
from txdav.common.icommondatastore import AllRetriesFailed
-from txdav.common.datastore.sql import fixUUIDNormalization
from txdav.xml import element as davxml
from uuid import UUID
@@ -371,7 +372,6 @@
rp.VIEWER_UID: "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"}
).on(txn)
# test
- from txdav.common.datastore.sql import _normalizeColumnUUIDs
yield _normalizeColumnUUIDs(txn, rp.VIEWER_UID)
self.assertEqual(
map(
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/upgrade/sql/upgrades/calendar_upgrade_from_2_to_3.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/upgrade/sql/upgrades/calendar_upgrade_from_2_to_3.py 2015-02-20 18:06:03 UTC (rev 14461)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/upgrade/sql/upgrades/calendar_upgrade_from_2_to_3.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -21,9 +21,9 @@
as in calendar data and properties.
"""
-from txdav.common.datastore.sql import fixUUIDNormalization
from twisted.internet.defer import inlineCallbacks
from txdav.common.datastore.upgrade.sql.upgrades.util import updateCalendarDataVersion
+from txdav.common.datastore.sql_util import fixUUIDNormalization
UPGRADE_TO_VERSION = 3
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/icommondatastore.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/icommondatastore.py 2015-02-20 18:06:03 UTC (rev 14461)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/icommondatastore.py 2015-02-20 18:41:05 UTC (rev 14462)
@@ -40,6 +40,12 @@
"InternalDataStoreError",
]
+# Constants for top-level store types
+ECALENDARTYPE = 0
+EADDRESSBOOKTYPE = 1
+ENOTIFICATIONTYPE = 2
+
+
#
# Exceptions
#
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150220/faf857d2/attachment-0001.html>
More information about the calendarserver-changes
mailing list