[CalendarServer-changes] [14403] CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav
source_changes at macosforge.org
source_changes at macosforge.org
Wed Feb 11 12:34:55 PST 2015
Revision: 14403
http://trac.calendarserver.org//changeset/14403
Author: cdaboo at apple.com
Date: 2015-02-11 12:34:55 -0800 (Wed, 11 Feb 2015)
Log Message:
-----------
Checkpoint: attachment sync'ing during migration.
Modified Paths:
--------------
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_external.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/attachments.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/conduit.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/home_sync.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/test/test_home_sync.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/request.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/resource.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/test/test_conduit.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/test/util.py
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/current-oracle-dialect.sql
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/current.sql
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_51_to_52.sql
CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_51_to_52.sql
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql.py 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql.py 2015-02-11 20:34:55 UTC (rev 14403)
@@ -638,6 +638,24 @@
@inlineCallbacks
+ def getAllAttachments(self):
+ """
+ Return all the L{Attachment} objects associated with this calendar home.
+ Needed during migration.
+ """
+ attachments = yield Attachment.loadAllAttachments(self)
+ returnValue(attachments)
+
+
+ def getAttachmentByID(self, id):
+ """
+ Return a specific attachment associated with this calendar home.
+ Needed during migration only.
+ """
+ return Attachment.loadAttachmentByID(self, id)
+
+
+ @inlineCallbacks
def getAllDropboxIDs(self):
co = schema.CALENDAR_OBJECT
cb = schema.CALENDAR_BIND
@@ -4646,8 +4664,10 @@
@inlineCallbacks
def attachments(self):
if self._dropboxID:
- rows = yield self._attachmentsQuery.on(self._txn,
- dropboxID=self._dropboxID)
+ rows = yield self._attachmentsQuery.on(
+ self._txn,
+ dropboxID=self._dropboxID,
+ )
result = []
for row in rows:
result.append((yield self.attachmentWithName(row[0])))
@@ -4920,7 +4940,7 @@
_TEMPORARY_UPLOADS_DIRECTORY = "Temporary"
- def __init__(self, attachment, contentType, dispositionName, creating=False):
+ def __init__(self, attachment, contentType, dispositionName, creating=False, migrating=False):
super(AttachmentStorageTransport, self).__init__(
attachment, contentType, dispositionName)
@@ -4930,6 +4950,7 @@
self._path = CachingFilePath(fileName)
self._hash = hashlib.md5()
self._creating = creating
+ self._migrating = migrating
self._txn.postAbort(self.aborted)
@@ -4970,6 +4991,10 @@
@inlineCallbacks
def loseConnection(self):
+ """
+ Note that when self._migrating is set we only care about the data and don't need to
+ do any quota checks/adjustments.
+ """
# FIXME: this should be synchronously accessible; IAttachment should
# have a method for getting its parent just as CalendarObject/Calendar
@@ -4986,20 +5011,21 @@
self._file.close()
# Check max size for attachment
- if newSize > config.MaximumAttachmentSize:
+ if not self._migrating and newSize > config.MaximumAttachmentSize:
self._path.remove()
if self._creating:
yield self._attachment._internalRemove()
raise AttachmentSizeTooLarge()
# Check overall user quota
- allowed = home.quotaAllowedBytes()
- if allowed is not None and allowed < ((yield home.quotaUsedBytes())
- + (newSize - oldSize)):
- self._path.remove()
- if self._creating:
- yield self._attachment._internalRemove()
- raise QuotaExceeded()
+ if not self._migrating:
+ allowed = home.quotaAllowedBytes()
+ if allowed is not None and allowed < ((yield home.quotaUsedBytes())
+ + (newSize - oldSize)):
+ self._path.remove()
+ if self._creating:
+ yield self._attachment._internalRemove()
+ raise QuotaExceeded()
self._path.moveTo(self._attachment._path)
@@ -5010,7 +5036,7 @@
newSize
)
- if home:
+ if not self._migrating and home:
# Adjust quota
yield home.adjustQuotaUsedBytes(self._attachment.size() - oldSize)
@@ -5028,6 +5054,153 @@
implements(IAttachment)
+ _attachmentSchema = schema.ATTACHMENT
+
+ @classmethod
+ def makeClass(cls, txn, attachmentData):
+ """
+ Given the various database rows, build the actual class.
+
+ @param txn: the transaction to create the attachment object with
+ @type txn: L{CommonStoreTransaction}
+
+ @param attachmentData: the attachment column values, in the order
+ given by L{_allColumns}
+ @type attachmentData: C{list}
+
+ @return: the constructed attachment, with the attributes named by
+ L{_rowAttributes} set from C{attachmentData}; this is a
+ L{ManagedAttachment} when the dropbox id is C{"."}, otherwise a
+ L{DropBoxAttachment}
+ @rtype: L{Attachment}
+ """
+
+ att = schema.ATTACHMENT
+ dropbox_id = attachmentData[cls._allColumns().index(att.DROPBOX_ID)]
+ c = ManagedAttachment if dropbox_id == "." else DropBoxAttachment
+ child = c(
+ txn,
+ attachmentData[cls._allColumns().index(att.ATTACHMENT_ID)],
+ attachmentData[cls._allColumns().index(att.DROPBOX_ID)],
+ attachmentData[cls._allColumns().index(att.PATH)],
+ )
+
+ for attr, value in zip(child._rowAttributes(), attachmentData):
+ setattr(child, attr, value)
+ child._contentType = MimeType.fromString(child._contentType)
+
+ return child
+
+
+ @classmethod
+ def _allColumns(cls):
+ """
+ Full set of columns in the object table that need to be loaded to
+ initialize the object resource state.
+ """
+ att = cls._attachmentSchema
+ return [
+ att.ATTACHMENT_ID,
+ att.DROPBOX_ID,
+ att.CALENDAR_HOME_RESOURCE_ID,
+ att.CONTENT_TYPE,
+ att.SIZE,
+ att.MD5,
+ att.CREATED,
+ att.MODIFIED,
+ att.PATH,
+ ]
+
+
+ @classmethod
+ def _rowAttributes(cls):
+ """
+ Object attributes used to store the column values from L{_allColumns}. This is used to create
+ a mapping when serializing the object for cross-pod requests.
+ """
+ return (
+ "_attachmentID",
+ "_dropboxID",
+ "_ownerHomeID",
+ "_contentType",
+ "_size",
+ "_md5",
+ "_created",
+ "_modified",
+ "_name",
+ )
+
+
+ @classmethod
+ @inlineCallbacks
+ def loadAllAttachments(cls, home):
+ """
+ Load all attachments assigned to the specified home collection. This should only be
+ used when sync'ing an entire home's set of attachments.
+ """
+
+ results = []
+
+ # Load from the main table first
+ att = cls._attachmentSchema
+ dataRows = yield Select(
+ cls._allColumns(),
+ From=att,
+ Where=att.CALENDAR_HOME_RESOURCE_ID == home.id(),
+ ).on(home._txn)
+
+ # Create the actual objects
+ for row in dataRows:
+ child = cls.makeClass(home._txn, row)
+ results.append(child)
+
+ returnValue(results)
+
+
+ @classmethod
+ @inlineCallbacks
+ def loadAttachmentByID(cls, home, id):
+ """
+ Load one attachment assigned to the specified home collection. This should only be
+ used when sync'ing an entire home's set of attachments.
+ """
+
+ # Load from the main table first
+ att = cls._attachmentSchema
+ rows = yield Select(
+ cls._allColumns(),
+ From=att,
+ Where=(att.CALENDAR_HOME_RESOURCE_ID == home.id()).And(
+ att.ATTACHMENT_ID == id),
+ ).on(home._txn)
+
+ # Create the actual object
+ returnValue(cls.makeClass(home._txn, rows[0]) if len(rows) == 1 else None)
+
+
+ def externalize(self):
+ """
+ Create a dictionary mapping key attributes so this object can be sent over a cross-pod call
+ and reconstituted at the other end. Note that the other end may have a different schema so
+ the attributes may not match exactly and will need to be processed accordingly.
+ """
+ result = dict([(attr[1:], getattr(self, attr, None)) for attr in self._rowAttributes()])
+ result["contentType"] = generateContentType(result["contentType"])
+ return result
+
+
+ @classmethod
+ def internalize(cls, txn, mapping):
+ """
+ Given a mapping generated by L{externalize}, convert the values into an array of
+ database-like items that conforms to the ordering of L{_allColumns} so it can be fed into L{makeClass}.
+ Note that there may be a schema mismatch with the external data, so treat missing items as
+ C{None} and ignore extra items.
+ """
+
+ return cls.makeClass(txn, [mapping.get(row[1:]) for row in cls._rowAttributes()])
+
+
def __init__(self, txn, a_id, dropboxID, name, ownerHomeID=None, justCreated=False):
self._txn = txn
self._attachmentID = a_id
@@ -5061,23 +5234,13 @@
@return: C{True} if this attachment exists, C{False} otherwise.
"""
att = schema.ATTACHMENT
- if self._dropboxID:
+ if self._dropboxID and self._dropboxID != ".":
where = (att.DROPBOX_ID == self._dropboxID).And(
att.PATH == self._name)
else:
where = (att.ATTACHMENT_ID == self._attachmentID)
rows = (yield Select(
- [
- att.ATTACHMENT_ID,
- att.DROPBOX_ID,
- att.CALENDAR_HOME_RESOURCE_ID,
- att.CONTENT_TYPE,
- att.SIZE,
- att.MD5,
- att.CREATED,
- att.MODIFIED,
- att.PATH,
- ],
+ self._allColumns(),
From=att,
Where=where
).on(self._txn))
@@ -5085,20 +5248,28 @@
if not rows:
returnValue(None)
- row_iter = iter(rows[0])
- self._attachmentID = row_iter.next()
- self._dropboxID = row_iter.next()
- self._ownerHomeID = row_iter.next()
- self._contentType = MimeType.fromString(row_iter.next())
- self._size = row_iter.next()
- self._md5 = row_iter.next()
- self._created = sqltime(row_iter.next())
- self._modified = sqltime(row_iter.next())
- self._name = row_iter.next()
+ for attr, value in zip(self._rowAttributes(), rows[0]):
+ setattr(self, attr, value)
+ self._contentType = MimeType.fromString(self._contentType)
returnValue(self)
+ @inlineCallbacks
+ def copyRemote(self, remote):
+ """
+ Copy properties from a remote (external) attachment that is being migrated.
+
+ @param remote: the external attachment
+ @type remote: L{Attachment}
+ """
+ yield self.changed(remote.contentType(), remote.name(), remote.md5(), remote.size())
+
+
+ def id(self):
+ return self._attachmentID
+
+
def dropboxID(self):
return self._dropboxID
@@ -5115,10 +5286,10 @@
pass # stub
- def store(self, contentType, dispositionName=None):
+ def store(self, contentType, dispositionName=None, migrating=False):
if not self._name:
self._name = dispositionName
- return AttachmentStorageTransport(self, contentType, dispositionName, self._justCreated)
+ return AttachmentStorageTransport(self, contentType, dispositionName, self._justCreated, migrating=migrating)
def retrieve(self, protocol):
@@ -5135,17 +5306,19 @@
@inlineCallbacks
- def remove(self):
+ def remove(self, adjustQuota=True):
oldSize = self._size
self._txn.postCommit(self.removePaths)
yield self._internalRemove()
+
# Adjust quota
- home = (yield self._txn.calendarHomeWithResourceID(self._ownerHomeID))
- if home:
- yield home.adjustQuotaUsedBytes(-oldSize)
+ if adjustQuota:
+ home = (yield self._txn.calendarHomeWithResourceID(self._ownerHomeID))
+ if home:
+ yield home.adjustQuotaUsedBytes(-oldSize)
- # Send change notification to home
- yield home.notifyChanged()
+ # Send change notification to home
+ yield home.notifyChanged()
def removePaths(self):
@@ -5200,7 +5373,7 @@
).on(txn))
for attachmentID, dropboxID in rows:
- if dropboxID:
+ if dropboxID != ".":
attachment = DropBoxAttachment(txn, attachmentID, None, None)
else:
attachment = ManagedAttachment(txn, attachmentID, None, None)
@@ -5473,7 +5646,7 @@
@inlineCallbacks
def create(cls, txn, managedID, ownerHomeID, referencedBy):
"""
- Create a new Attachment object.
+ Create a new Attachment object and reference it.
@param txn: The transaction to use
@type txn: L{CommonStoreTransaction}
@@ -5504,7 +5677,8 @@
@inlineCallbacks
def update(cls, txn, oldManagedID, ownerHomeID, referencedBy, oldAttachmentID):
"""
- Create a new Attachment object.
+ Update an Attachment object. This creates a new one and adjusts the reference to the old
+ one to point to the new one. If the old one is no longer referenced at all, it is deleted.
@param txn: The transaction to use
@type txn: L{CommonStoreTransaction}
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_external.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_external.py 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/caldav/datastore/sql_external.py 2015-02-11 20:34:55 UTC (rev 14403)
@@ -22,7 +22,8 @@
from twext.python.log import Logger
-from txdav.caldav.datastore.sql import CalendarHome, Calendar, CalendarObject
+from txdav.caldav.datastore.sql import CalendarHome, Calendar, CalendarObject, \
+ Attachment
from txdav.caldav.icalendarstore import ComponentUpdateState, ComponentRemoveState
from txdav.common.datastore.sql_external import CommonHomeExternal, CommonHomeChildExternal, \
CommonObjectResourceExternal
@@ -61,6 +62,30 @@
raise AssertionError("CommonHomeExternal: not supported")
+ @inlineCallbacks
+ def getAllAttachments(self):
+ """
+ Return all the L{Attachment} objects associated with this calendar home.
+ Needed during migration.
+ """
+ raw_results = yield self._txn.store().conduit.send_get_all_attachments(self)
+
+ results = []
+ for attachment in raw_results:
+ results.append(Attachment.internalize(self._txn, attachment))
+ returnValue(results)
+
+
+ @inlineCallbacks
+ def readAttachmentData(self, remote_id, attachment):
+ """
+ Read the data for an attachment associated with this calendar home.
+ Needed during migration only.
+ """
+ stream = attachment.store(attachment.contentType(), attachment.name(), migrating=True)
+ yield self._txn.store().conduit.send_get_attachment_data(self, remote_id, stream)
+
+
def getAllDropboxIDs(self):
"""
No children.
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/attachments.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/attachments.py 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/attachments.py 2015-02-11 20:34:55 UTC (rev 14403)
@@ -15,6 +15,8 @@
##
from twisted.internet.defer import inlineCallbacks, returnValue
+from txdav.caldav.icalendarstore import InvalidAttachmentOperation
+from txweb2.http_headers import generateContentType
class AttachmentsConduitMixin(object):
@@ -150,3 +152,73 @@
request["rids"],
request["managedID"],
)
+
+
+ @inlineCallbacks
+ def send_get_all_attachments(self, home):
+ """
+ Managed attachment getAllAttachments call.
+
+ @param home: the home whose attachments are being requested
+ @type home: L{CalendarHome}
+ """
+
+ actionName = "get-all-attachments"
+ txn, request, server = yield self._getRequestForStoreObject(actionName, home, False)
+
+ response = yield self.sendRequestToServer(txn, server, request)
+ returnValue(response)
+
+
+ @inlineCallbacks
+ def recv_get_all_attachments(self, txn, request):
+ """
+ Process a getAllAttachments cross-pod request. Request arguments as per L{send_get_all_attachments}.
+
+ @param request: request arguments
+ @type request: C{dict}
+ """
+
+ home, _ignore = yield self._getStoreObjectForRequest(txn, request)
+ attachments = yield home.getAllAttachments()
+ returnValue([attachment.externalize() for attachment in attachments])
+
+
+ @inlineCallbacks
+ def send_get_attachment_data(self, home, attachment_id, stream):
+ """
+ Managed attachment readAttachmentData call. We are using streams on the sender and the receiver
+ side to avoid reading the whole attachment into memory.
+
+ @param home: the home whose attachment is being read
+ @type home: L{CalendarHome}
+ @param attachment_id: attachment-id to get
+ @type attachment_id: C{str}
+ @param stream: attachment data stream to write to
+ @type stream: L{IStream}
+ """
+
+ actionName = "get-attachment-data"
+ txn, request, server = yield self._getRequestForStoreObject(actionName, home, False)
+ request["attachmentID"] = attachment_id
+
+ response = yield self.sendRequestToServer(txn, server, request, writeStream=stream)
+ returnValue(response)
+
+
+ @inlineCallbacks
+ def recv_get_attachment_data(self, txn, request, stream):
+ """
+ Process a getAttachmentData cross-pod request. Request arguments as per L{send_get_attachment_data}.
+
+ @param request: request arguments
+ @type request: C{dict}
+ """
+
+ home, _ignore = yield self._getStoreObjectForRequest(txn, request)
+ attachment = yield home.getAttachmentByID(request["attachmentID"])
+ if attachment is None:
+ raise InvalidAttachmentOperation("Attachment is missing: {}".format(request["attachmentID"]))
+
+ attachment.retrieve(stream)
+ returnValue((generateContentType(attachment.contentType()), attachment.name(),))
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/conduit.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/conduit.py 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/conduit.py 2015-02-11 20:34:55 UTC (rev 14403)
@@ -72,6 +72,7 @@
@param store: the L{CommonDataStore} in use.
"""
self.store = store
+ self.streamingActions = ("get-attachment-data",)
@inlineCallbacks
@@ -108,9 +109,9 @@
@inlineCallbacks
- def sendRequestToServer(self, txn, server, data, stream=None, streamType=None):
+ def sendRequestToServer(self, txn, server, data, stream=None, streamType=None, writeStream=None):
- request = self.conduitRequestClass(server, data, stream, streamType)
+ request = self.conduitRequestClass(server, data, stream, streamType, writeStream)
try:
response = (yield request.doRequest(txn))
except Exception as e:
@@ -123,6 +124,24 @@
returnValue(response.get("value"))
+ def isStreamAction(self, data):
+ """
+ Check to see if this is a request that will return a data stream rather than a JSON response.
+ e.g., this is used to retrieve attachment data on another pod.
+
+ @param data: the JSON data to process
+ @type data: C{dict}
+ """
+ # Must have a dict with an "action" key
+ try:
+ action = data["action"]
+ except (KeyError, TypeError) as e:
+ log.error("JSON data must have an object as its root with an 'action' attribute: {ex}\n{json}", ex=e, json=data)
+ return False
+
+ return action in self.streamingActions
+
+
@inlineCallbacks
def processRequest(self, data):
"""
@@ -171,3 +190,48 @@
yield txn.commit()
returnValue(result)
+
+
+ @inlineCallbacks
+ def processRequestStream(self, data, stream):
+ """
+ Process the request.
+
+ @param data: the JSON data to process
+ @type data: C{dict}
+
+ @return: a L{tuple} of content-type and name, if successful, else a L{dict} for a JSON result
+ @rtype: L{tuple} of (L{str}, L{str}), or L{dict}
+ """
+ # Must have a dict with an "action" key
+ try:
+ action = data["action"]
+ except (KeyError, TypeError) as e:
+ log.error("JSON data must have an object as its root with an 'action' attribute: {ex}\n{json}", ex=e, json=data)
+ raise FailedCrossPodRequestError("JSON data must have an object as its root with an 'action' attribute: {}\n{}".format(e, data,))
+
+ method = "recv_{}".format(action.replace("-", "_"))
+ if not hasattr(self, method):
+ log.error("Unsupported action: {action}", action=action)
+ raise FailedCrossPodRequestError("Unsupported action: {}".format(action))
+
+ # Need a transaction to work with
+ txn = self.store.newTransaction(repr("Conduit request"))
+
+ # Do the actual request processing
+ try:
+ result = (yield getattr(self, method)(txn, data, stream))
+ except Exception as e:
+ # Send the exception over to the other side
+ yield txn.abort()
+ log.error("Failed action: {action}, {ex}", action=action, ex=e)
+ result = {
+ "result": "exception",
+ "class": ".".join((e.__class__.__module__, e.__class__.__name__,)),
+ "details": str(e),
+ }
+
+ else:
+ yield txn.commit()
+
+ returnValue(result)
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/home_sync.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/home_sync.py 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/home_sync.py 2015-02-11 20:34:55 UTC (rev 14403)
@@ -23,6 +23,7 @@
from twisted.internet.defer import returnValue, inlineCallbacks
from twisted.python.failure import Failure
from txdav.caldav.icalendarstore import ComponentUpdateState
+from txdav.caldav.datastore.sql import ManagedAttachment
from txdav.common.datastore.sql_tables import schema
from txdav.common.idirectoryservice import DirectoryRecordNotFoundError
@@ -159,7 +160,7 @@
yield self.syncCalendarHomeMetaData()
# TODO: sync attachments
- pass
+ yield self.syncAttachments()
# TODO: group attendee/sharee reconcile
pass
@@ -172,6 +173,9 @@
rows, recalculate quota etc.
"""
+ # TODO: link attachments to resources: ATTACHMENT_CALENDAR_OBJECT table
+ pass
+
# TODO: Re-write attachment URIs - not sure if we need this as reverse proxy may take care of it
pass
@@ -184,7 +188,10 @@
# TODO: notifications
pass
+ # TODO: work items
+ pass
+
@inlineCallbacks
def disableRemoteHome(self):
"""
@@ -320,11 +327,11 @@
"""
Get local synchronization state for the home being migrated.
"""
- cms = schema.CALENDAR_MIGRATION_STATE
+ cm = schema.CALENDAR_MIGRATION
rows = yield Select(
- columns=(cms.REMOTE_RESOURCE_ID, cms.CALENDAR_RESOURCE_ID, cms.LAST_SYNC_TOKEN,),
- From=cms,
- Where=(cms.CALENDAR_HOME_RESOURCE_ID == self.homeId)
+ columns=(cm.REMOTE_RESOURCE_ID, cm.CALENDAR_RESOURCE_ID, cm.LAST_SYNC_TOKEN,),
+ From=cm,
+ Where=(cm.CALENDAR_HOME_RESOURCE_ID == self.homeId)
).on(txn)
returnValue(dict([(remote_id, self.CalendarSyncState(local_id, sync,)) for remote_id, local_id, sync in rows]))
@@ -335,7 +342,7 @@
"""
Get local synchronization state for the home being migrated.
"""
- cms = schema.CALENDAR_MIGRATION_STATE
+ cm = schema.CALENDAR_MIGRATION
old_details = yield self.getSyncState(txn=txn)
@@ -343,9 +350,9 @@
missing = set(old_details.keys()) - set(details.keys())
if missing:
yield Delete(
- From=cms,
- Where=(cms.CALENDAR_HOME_RESOURCE_ID == self.homeId).And(
- cms.REMOTE_RESOURCE_ID.In(Parameter("missing", len(missing)))
+ From=cm,
+ Where=(cm.CALENDAR_HOME_RESOURCE_ID == self.homeId).And(
+ cm.REMOTE_RESOURCE_ID.In(Parameter("missing", len(missing)))
)
).on(txn, missing=missing)
@@ -353,10 +360,10 @@
insert = set(details.keys()) - set(old_details.keys())
for key in insert:
yield Insert({
- cms.CALENDAR_HOME_RESOURCE_ID: self.homeId,
- cms.REMOTE_RESOURCE_ID: key,
- cms.CALENDAR_RESOURCE_ID: details[key].localID,
- cms.LAST_SYNC_TOKEN: details[key].lastSyncToken,
+ cm.CALENDAR_HOME_RESOURCE_ID: self.homeId,
+ cm.REMOTE_RESOURCE_ID: key,
+ cm.CALENDAR_RESOURCE_ID: details[key].localID,
+ cm.LAST_SYNC_TOKEN: details[key].lastSyncToken,
}).on(txn)
# Update existing ones
@@ -364,11 +371,11 @@
for key in updates:
yield Update(
{
- cms.CALENDAR_RESOURCE_ID: details[key].localID,
- cms.LAST_SYNC_TOKEN: details[key].lastSyncToken,
+ cm.CALENDAR_RESOURCE_ID: details[key].localID,
+ cm.LAST_SYNC_TOKEN: details[key].lastSyncToken,
},
- Where=(cms.CALENDAR_HOME_RESOURCE_ID == self.homeId).And(
- cms.REMOTE_RESOURCE_ID == key
+ Where=(cm.CALENDAR_HOME_RESOURCE_ID == self.homeId).And(
+ cm.REMOTE_RESOURCE_ID == key
)
).on(txn)
@@ -612,6 +619,7 @@
# matches the remote one (which should help reduce the need for a client to resync
# the data when moved from one pod to the other).
txn._migrating = True
+ com = schema.CALENDAR_OBJECT_MIGRATION
for obj_name in remote_objects.keys():
remote_object = remote_objects[obj_name]
remote_data = yield remote_object.component()
@@ -623,9 +631,126 @@
else:
local_object = yield local_calendar._createCalendarObjectWithNameInternal(obj_name, remote_data, internal_state=ComponentUpdateState.RAW)
+ # Maintain the mapping from the remote to local id. Note that this mapping never changes as the ids on both
+ # sides are immutable - though it may get deleted if the local object is removed during sync (via a cascade).
+ yield Insert(
+ {
+ com.CALENDAR_HOME_RESOURCE_ID: self.homeId,
+ com.REMOTE_RESOURCE_ID: remote_object.id(),
+ com.LOCAL_RESOURCE_ID: local_object.id()
+ }
+ ).on(txn)
+
# Sync meta-data such as schedule object, schedule tags, access mode etc
yield local_object.copyMetadata(remote_object)
# Purge the ones that remain
for local_object in local_objects.values():
yield local_object.purge()
+
+
+ @inlineCallbacks
+ def syncAttachments(self):
+ """
+ Sync attachments (both metadata and actual attachment data) for the home being migrated.
+ """
+
+ # Two steps - sync the table first in one txn, then sync each attachment's data
+ changed_ids, removed_ids = yield self.syncAttachmentTable()
+
+ for local_id in changed_ids:
+ yield self.syncAttachmentData(local_id)
+
+ returnValue((changed_ids, removed_ids,))
+
+
+ @inTransactionWrapper
+ @inlineCallbacks
+ def syncAttachmentTable(self, txn):
+ """
+ Sync the ATTACHMENT table data for the home being migrated. Return the list of local attachment ids that
+ now need their attachment data sync'd from the server.
+ """
+
+ remote_home = yield self._remoteHome(txn)
+ rattachments = yield remote_home.getAllAttachments()
+ rmap = dict([(attachment.id(), attachment) for attachment in rattachments])
+
+ local_home = yield txn.calendarHomeWithUID(self.migratingUid())
+ lattachments = yield local_home.getAllAttachments()
+ lmap = dict([(attachment.id(), attachment) for attachment in lattachments])
+
+ # Figure out the differences
+ am = schema.ATTACHMENT_MIGRATION
+ rows = yield Select(
+ [am.REMOTE_RESOURCE_ID, am.LOCAL_RESOURCE_ID],
+ From=am,
+ Where=(am.CALENDAR_HOME_RESOURCE_ID == self.homeId),
+ ).on(txn)
+ mapping = dict(rows)
+
+ # Removed - remove attachment and migration state
+ removed = set(mapping.keys()) - set(rmap.keys())
+ for remove_id in removed:
+ local_id = mapping[remove_id]
+ att = yield ManagedAttachment.load(txn, None, None, attachmentID=local_id)
+ if att:
+ yield att.remove(adjustQuota=False)
+ yield Delete(
+ From=am,
+ Where=(am.LOCAL_RESOURCE_ID == local_id),
+ ).on(txn)
+
+ # Track which ones need attachment data sync'd over
+ data_ids = set()
+
+ # Added - add new attachment and migration state
+ added = set(rmap.keys()) - set(mapping.keys())
+ for added_id in added:
+ attachment = yield ManagedAttachment._create(txn, None, self.homeId)
+ yield Insert(
+ {
+ am.CALENDAR_HOME_RESOURCE_ID: self.homeId,
+ am.REMOTE_RESOURCE_ID: added_id,
+ am.LOCAL_RESOURCE_ID: attachment.id(),
+ }
+ ).on(txn)
+ data_ids.add(attachment.id())
+
+ # Possible updates - check for md5 change and sync
+ updates = set(mapping.keys()) & set(rmap.keys())
+ for updated_id in updates:
+ local_id = mapping[updated_id]
+ if rmap[updated_id].md5() != lmap[local_id].md5():
+ yield lmap[local_id].copyRemote(rmap[updated_id])
+ data_ids.add(local_id)
+
+ returnValue((data_ids, removed,))
+
+
+ @inTransactionWrapper
+ @inlineCallbacks
+ def syncAttachmentData(self, txn, local_id):
+ """
+ Sync the attachment data for the home being migrated.
+ """
+
+ remote_home = yield self._remoteHome(txn)
+ local_home = yield txn.calendarHomeWithUID(self.migratingUid())
+ attachment = yield local_home.getAttachmentByID(local_id)
+ if attachment is None:
+ returnValue(None)
+
+ am = schema.ATTACHMENT_MIGRATION
+ rows = yield Select(
+ [am.LOCAL_RESOURCE_ID, am.REMOTE_RESOURCE_ID],
+ From=am,
+ Where=(am.CALENDAR_HOME_RESOURCE_ID == self.homeId),
+ ).on(txn)
+ mapping = dict(rows)
+ remote_id = mapping.get(local_id)
+ if remote_id is None:
+ returnValue(None)
+
+ # Read the data from the conduit
+ yield remote_home.readAttachmentData(remote_id, attachment)
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/test/test_home_sync.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/test/test_home_sync.py 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/test/test_home_sync.py 2015-02-11 20:34:55 UTC (rev 14403)
@@ -14,11 +14,15 @@
# limitations under the License.
##
+from pycalendar.datetime import DateTime
+from twext.enterprise.dal.syntax import Select
from twisted.internet.defer import inlineCallbacks
+from twistedcaldav.ical import Component, normalize_iCalStr
+from txdav.common.datastore.podding.migration.home_sync import CrossPodHomeSync
from txdav.common.datastore.podding.test.util import MultiStoreConduitTest
-from txdav.common.datastore.podding.migration.home_sync import CrossPodHomeSync
-from pycalendar.datetime import DateTime
-from twistedcaldav.ical import Component, normalize_iCalStr
+from txdav.common.datastore.sql_tables import schema
+from txweb2.http_headers import MimeType
+from txweb2.stream import MemoryStream
class TestConduitAPI(MultiStoreConduitTest):
@@ -246,10 +250,11 @@
home0 = yield self.homeUnderTest(txn=self.theTransactionUnderTest(0), name="user01", create=True)
calendar0 = yield home0.childWithName("calendar")
- yield calendar0.createCalendarObjectWithName("1.ics", Component.fromString(self.caldata1))
- yield calendar0.createCalendarObjectWithName("2.ics", Component.fromString(self.caldata2))
- yield calendar0.createCalendarObjectWithName("3.ics", Component.fromString(self.caldata3))
+ o1 = yield calendar0.createCalendarObjectWithName("1.ics", Component.fromString(self.caldata1))
+ o2 = yield calendar0.createCalendarObjectWithName("2.ics", Component.fromString(self.caldata2))
+ o3 = yield calendar0.createCalendarObjectWithName("3.ics", Component.fromString(self.caldata3))
remote_id = calendar0.id()
+ mapping0 = dict([(o.name(), o.id()) for o in (o1, o2, o3)])
yield self.commitTransaction(0)
syncer = CrossPodHomeSync(self.theStoreUnderTest(1), "user01")
@@ -273,12 +278,26 @@
self.assertEqual(len(local_sync_state), 1)
self.assertEqual(local_sync_state[remote_id].lastSyncToken, remote_sync_state[remote_id].lastSyncToken)
+ @inlineCallbacks
+ def _checkCalendarObjectMigrationState(home, mapping1):
+ com = schema.CALENDAR_OBJECT_MIGRATION
+ mappings = yield Select(
+ columns=[com.REMOTE_RESOURCE_ID, com.LOCAL_RESOURCE_ID],
+ From=com,
+ Where=(com.CALENDAR_HOME_RESOURCE_ID == home.id())
+ ).on(self.theTransactionUnderTest(1))
+ expected_mappings = dict([(mapping0[name], mapping1[name]) for name in mapping0.keys()])
+ self.assertEqual(dict(mappings), expected_mappings)
+
+
# Local calendar exists
home1 = yield self.homeUnderTest(txn=self.theTransactionUnderTest(1), name=syncer.migratingUid())
calendar1 = yield home1.childWithName("calendar")
self.assertTrue(calendar1 is not None)
- children = yield calendar1.listObjectResources()
- self.assertEqual(set(children), set(("1.ics", "2.ics", "3.ics",)))
+ children = yield calendar1.objectResources()
+ self.assertEqual(set([child.name() for child in children]), set(("1.ics", "2.ics", "3.ics",)))
+ mapping1 = dict([(o.name(), o.id()) for o in children])
+ yield _checkCalendarObjectMigrationState(home1, mapping1)
yield self.commitTransaction(1)
# Change one resource
@@ -307,6 +326,7 @@
txn=self.theTransactionUnderTest(0), home="user01", calendar_name="calendar", name="2.ics"
)
yield object0.remove()
+ del mapping0["2.ics"]
yield self.commitTransaction(0)
remote_sync_state = yield syncer.getCalendarSyncList()
@@ -317,13 +337,16 @@
)
calendar1 = yield self.calendarUnderTest(txn=self.theTransactionUnderTest(1), home=syncer.migratingUid(), name="calendar")
- children = yield calendar1.listObjectResources()
- self.assertEqual(set(children), set(("1.ics", "3.ics",)))
+ children = yield calendar1.objectResources()
+ self.assertEqual(set([child.name() for child in children]), set(("1.ics", "3.ics",)))
+ mapping1 = dict([(o.name(), o.id()) for o in children])
+ yield _checkCalendarObjectMigrationState(home1, mapping1)
yield self.commitTransaction(1)
# Add one resource
calendar0 = yield self.calendarUnderTest(txn=self.theTransactionUnderTest(0), home="user01", name="calendar")
- yield calendar0.createCalendarObjectWithName("4.ics", Component.fromString(self.caldata4))
+ o4 = yield calendar0.createCalendarObjectWithName("4.ics", Component.fromString(self.caldata4))
+ mapping0[o4.name()] = o4.id()
yield self.commitTransaction(0)
remote_sync_state = yield syncer.getCalendarSyncList()
@@ -334,8 +357,10 @@
)
calendar1 = yield self.calendarUnderTest(txn=self.theTransactionUnderTest(1), home=syncer.migratingUid(), name="calendar")
- children = yield calendar1.listObjectResources()
- self.assertEqual(set(children), set(("1.ics", "3.ics", "4.ics",)))
+ children = yield calendar1.objectResources()
+ self.assertEqual(set([child.name() for child in children]), set(("1.ics", "3.ics", "4.ics")))
+ mapping1 = dict([(o.name(), o.id()) for o in children])
+ yield _checkCalendarObjectMigrationState(home1, mapping1)
yield self.commitTransaction(1)
@@ -400,3 +425,166 @@
self.assertTrue("new-calendar" not in details1.values())
self.assertEqual(set(details1.values()), set(details0.values()))
yield self.commitTransaction(1)
+
+
+ @inlineCallbacks
+ def test_sync_attachments_add_remove(self):
+ """
+ Test that L{syncAttachments} syncs attachment data, then an update to the data,
+ and finally a removal of the data.
+ """
+
+
+ home0 = yield self.homeUnderTest(txn=self.theTransactionUnderTest(0), name="user01", create=True)
+ calendar0 = yield home0.childWithName("calendar")
+ yield calendar0.createCalendarObjectWithName("1.ics", Component.fromString(self.caldata1))
+ yield calendar0.createCalendarObjectWithName("2.ics", Component.fromString(self.caldata2))
+ yield calendar0.createCalendarObjectWithName("3.ics", Component.fromString(self.caldata3))
+ remote_id = calendar0.id()
+ mapping0 = dict()
+ yield self.commitTransaction(0)
+
+ syncer = CrossPodHomeSync(self.theStoreUnderTest(1), "user01")
+ yield syncer.loadRecord()
+ syncer.homeId = yield syncer.prepareCalendarHome()
+
+ # Trigger sync of the one calendar
+ local_sync_state = {}
+ remote_sync_state = yield syncer.getCalendarSyncList()
+ yield syncer.syncCalendar(
+ remote_id,
+ local_sync_state,
+ remote_sync_state,
+ )
+ self.assertEqual(len(local_sync_state), 1)
+ self.assertEqual(local_sync_state[remote_id].lastSyncToken, remote_sync_state[remote_id].lastSyncToken)
+
+ # Sync attachments
+ changed, removed = yield syncer.syncAttachments()
+ self.assertEqual(changed, set())
+ self.assertEqual(removed, set())
+
+ @inlineCallbacks
+ def _checkAttachmentObjectMigrationState(home, mapping1):
+ am = schema.ATTACHMENT_MIGRATION
+ mappings = yield Select(
+ columns=[am.REMOTE_RESOURCE_ID, am.LOCAL_RESOURCE_ID],
+ From=am,
+ Where=(am.CALENDAR_HOME_RESOURCE_ID == home.id())
+ ).on(self.theTransactionUnderTest(1))
+ expected_mappings = dict([(mapping0[name], mapping1[name]) for name in mapping0.keys()])
+ self.assertEqual(dict(mappings), expected_mappings)
+
+
+ # Local calendar exists
+ home1 = yield self.homeUnderTest(txn=self.theTransactionUnderTest(1), name=syncer.migratingUid())
+ calendar1 = yield home1.childWithName("calendar")
+ self.assertTrue(calendar1 is not None)
+ children = yield calendar1.objectResources()
+ self.assertEqual(set([child.name() for child in children]), set(("1.ics", "2.ics", "3.ics",)))
+
+ attachments = yield home1.getAllAttachments()
+ mapping1 = dict([(o.md5(), o.id()) for o in attachments])
+ yield _checkAttachmentObjectMigrationState(home1, mapping1)
+ yield self.commitTransaction(1)
+
+ # Add one attachment
+ object1 = yield self.calendarObjectUnderTest(txn=self.theTransactionUnderTest(0), home="user01", calendar_name="calendar", name="1.ics")
+ attachment, _ignore_location = yield object1.addAttachment(None, MimeType.fromString("text/plain"), "test.txt", MemoryStream("Here is some text #1."))
+ id0_1 = attachment.id()
+ md50_1 = attachment.md5()
+ managedid0_1 = attachment.managedID()
+ mapping0[md50_1] = id0_1
+ yield self.commitTransaction(0)
+
+ # Sync attachments
+ changed, removed = yield syncer.syncAttachments()
+ self.assertEqual(changed, set((id0_1,)))
+ self.assertEqual(removed, set())
+
+ # Validate changes
+ home1 = yield self.homeUnderTest(txn=self.theTransactionUnderTest(1), name=syncer.migratingUid())
+ attachments = yield home1.getAllAttachments()
+ mapping1 = dict([(o.md5(), o.id()) for o in attachments])
+ yield _checkAttachmentObjectMigrationState(home1, mapping1)
+
+ # Add another attachment
+ object1 = yield self.calendarObjectUnderTest(txn=self.theTransactionUnderTest(0), home="user01", calendar_name="calendar", name="2.ics")
+ attachment, _ignore_location = yield object1.addAttachment(None, MimeType.fromString("text/plain"), "test2.txt", MemoryStream("Here is some text #2."))
+ id0_2 = attachment.id()
+ md50_2 = attachment.md5()
+ mapping0[md50_2] = id0_2
+ yield self.commitTransaction(0)
+
+ # Sync attachments
+ changed, removed = yield syncer.syncAttachments()
+ self.assertEqual(changed, set((id0_2,)))
+ self.assertEqual(removed, set())
+
+ # Validate changes
+ home1 = yield self.homeUnderTest(txn=self.theTransactionUnderTest(1), name=syncer.migratingUid())
+ attachments = yield home1.getAllAttachments()
+ mapping1 = dict([(o.md5(), o.id()) for o in attachments])
+ yield _checkAttachmentObjectMigrationState(home1, mapping1)
+
+ # Change original attachment (this is actually a remove and a create all in one)
+ object1 = yield self.calendarObjectUnderTest(txn=self.theTransactionUnderTest(0), home="user01", calendar_name="calendar", name="1.ics")
+ attachment, _ignore_location = yield object1.updateAttachment(managedid0_1, MimeType.fromString("text/plain"), "test.txt", MemoryStream("Here is some text #1 - changed."))
+ del mapping0[md50_1]
+ id0_1_changed = attachment.id()
+ md50_1_changed = attachment.md5()
+ managedid0_1_changed = attachment.managedID()
+ mapping0[md50_1_changed] = id0_1_changed
+ yield self.commitTransaction(0)
+
+ # Sync attachments
+ changed, removed = yield syncer.syncAttachments()
+ self.assertEqual(changed, set((id0_1_changed,)))
+ self.assertEqual(removed, set((id0_1,)))
+
+ # Validate changes
+ home1 = yield self.homeUnderTest(txn=self.theTransactionUnderTest(1), name=syncer.migratingUid())
+ attachments = yield home1.getAllAttachments()
+ mapping1 = dict([(o.md5(), o.id()) for o in attachments])
+ yield _checkAttachmentObjectMigrationState(home1, mapping1)
+
+ # Add original to a different resource
+ object1 = yield self.calendarObjectUnderTest(txn=self.theTransactionUnderTest(0), home="user01", calendar_name="calendar", name="1.ics")
+ component = yield object1.componentForUser()
+ attach = component.mainComponent().getProperty("ATTACH")
+
+ object1 = yield self.calendarObjectUnderTest(txn=self.theTransactionUnderTest(0), home="user01", calendar_name="calendar", name="3.ics")
+ component = yield object1.componentForUser()
+ attach = component.mainComponent().addProperty(attach)
+ yield object1.setComponent(component)
+ yield self.commitTransaction(0)
+
+ # Sync attachments
+ changed, removed = yield syncer.syncAttachments()
+ self.assertEqual(changed, set())
+ self.assertEqual(removed, set())
+
+ # Validate changes
+ home1 = yield self.homeUnderTest(txn=self.theTransactionUnderTest(1), name=syncer.migratingUid())
+ attachments = yield home1.getAllAttachments()
+ mapping1 = dict([(o.md5(), o.id()) for o in attachments])
+ yield _checkAttachmentObjectMigrationState(home1, mapping1)
+
+ # Change original attachment in original resource (this creates a new one and does not remove the old)
+ object1 = yield self.calendarObjectUnderTest(txn=self.theTransactionUnderTest(0), home="user01", calendar_name="calendar", name="1.ics")
+ attachment, _ignore_location = yield object1.updateAttachment(managedid0_1_changed, MimeType.fromString("text/plain"), "test.txt", MemoryStream("Here is some text #1 - changed again."))
+ id0_1_changed_again = attachment.id()
+ md50_1_changed_again = attachment.md5()
+ mapping0[md50_1_changed_again] = id0_1_changed_again
+ yield self.commitTransaction(0)
+
+ # Sync attachments
+ changed, removed = yield syncer.syncAttachments()
+ self.assertEqual(changed, set((id0_1_changed_again,)))
+ self.assertEqual(removed, set())
+
+ # Validate changes
+ home1 = yield self.homeUnderTest(txn=self.theTransactionUnderTest(1), name=syncer.migratingUid())
+ attachments = yield home1.getAllAttachments()
+ mapping1 = dict([(o.md5(), o.id()) for o in attachments])
+ yield _checkAttachmentObjectMigrationState(home1, mapping1)
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/request.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/request.py 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/request.py 2015-02-11 20:34:55 UTC (rev 14403)
@@ -23,7 +23,7 @@
from txweb2.client.http import HTTPClientProtocol, ClientRequest
from txweb2.dav.util import allDataFromStream
from txweb2.http_headers import Headers, MimeType
-from txweb2.stream import MemoryStream
+from txweb2.stream import MemoryStream, readStream
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet.protocol import Factory
@@ -50,11 +50,12 @@
case the JSON data is sent in an HTTP header.
"""
- def __init__(self, server, data, stream=None, stream_type=None):
+ def __init__(self, server, data, stream=None, stream_type=None, writeStream=None):
self.server = server
self.data = json.dumps(data)
self.stream = stream
self.streamType = stream_type
+ self.writeStream = writeStream
@inlineCallbacks
@@ -72,7 +73,27 @@
self.loggedResponse = yield self.logResponse(response)
emitAccounting("xPod", "", self.loggedRequest + "\n" + self.loggedResponse, "POST")
- if response.code in (responsecode.OK, responsecode.BAD_REQUEST,):
+ if response.code == responsecode.OK:
+ if self.writeStream is None:
+ data = (yield allDataFromStream(response.stream))
+ data = json.loads(data)
+ else:
+ yield readStream(response.stream, self.writeStream.write)
+ yield self.writeStream.loseConnection()
+ content_type = response.headers.getHeader("content-type")
+ if content_type is None:
+ content_type = MimeType("application", "octet-stream")
+ content_disposition = response.headers.getHeader("content-disposition")
+ if content_disposition is None or "filename" not in content_disposition.params:
+ filename = ""
+ else:
+ filename = content_disposition.params["filename"]
+ response = {
+ "result": "ok",
+ "content-type": content_type,
+ "name": filename,
+ }
+ elif response.code == responsecode.BAD_REQUEST:
data = (yield allDataFromStream(response.stream))
data = json.loads(data)
else:
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/resource.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/resource.py 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/resource.py 2015-02-11 20:34:55 UTC (rev 14403)
@@ -18,9 +18,11 @@
from txweb2.dav.noneprops import NonePropertyStore
from txweb2.dav.util import allDataFromStream
from txweb2.http import Response, HTTPError, StatusResponse, JSONResponse
-from txweb2.http_headers import MimeType
+from txweb2.http_headers import MimeType, MimeDisposition
+from txweb2.stream import ProducerStream
from twisted.internet.defer import succeed, returnValue, inlineCallbacks
+from twisted.internet.protocol import Protocol
from twistedcaldav.extensions import DAVResource, \
DAVResourceWithoutChildrenMixin
@@ -154,19 +156,54 @@
request.extendedLogItems = {}
request.extendedLogItems["xpod"] = j["action"] if "action" in j else "unknown"
- # Get the conduit to process the data
- try:
- result = yield self.store.conduit.processRequest(j)
- code = responsecode.OK if result["result"] == "ok" else responsecode.BAD_REQUEST
- except Exception as e:
- # Send the exception over to the other side
- result = {
- "result": "exception",
- "class": ".".join((e.__class__.__module__, e.__class__.__name__,)),
- "request": str(e),
- }
- code = responsecode.BAD_REQUEST
+ # Look for a streaming action which needs special handling
+ if self.store.conduit.isStreamAction(j):
+ # Get the conduit to process the data stream
+ try:
+ stream = ProducerStream()
+ class StreamProtocol(Protocol):
+ def connectionMade(self):
+ stream.registerProducer(self.transport, False)
+ def dataReceived(self, data):
+ stream.write(data)
+ def connectionLost(self, reason):
+ stream.finish()
+
+ result = yield self.store.conduit.processRequestStream(j, StreamProtocol())
+
+ try:
+ ct, name = result
+ except ValueError:
+ code = responsecode.BAD_REQUEST
+ else:
+ headers = {"content-type": ct}
+ headers["content-disposition"] = MimeDisposition("attachment", params={"filename": name})
+ returnValue(Response(responsecode.OK, headers, stream))
+
+ except Exception as e:
+ # Send the exception over to the other side
+ result = {
+ "result": "exception",
+ "class": ".".join((e.__class__.__module__, e.__class__.__name__,)),
+ "details": str(e),
+ }
+ code = responsecode.BAD_REQUEST
+
+ else:
+ # Get the conduit to process the data
+ try:
+ result = yield self.store.conduit.processRequest(j)
+ code = responsecode.OK if result["result"] == "ok" else responsecode.BAD_REQUEST
+ except Exception as e:
+ # Send the exception over to the other side
+ result = {
+ "result": "exception",
+ "class": ".".join((e.__class__.__module__, e.__class__.__name__,)),
+ "details": str(e),
+ }
+ code = responsecode.BAD_REQUEST
+
response = JSONResponse(code, result)
returnValue(response)
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/test/test_conduit.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/test/test_conduit.py 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/test/test_conduit.py 2015-02-11 20:34:55 UTC (rev 14403)
@@ -1056,3 +1056,54 @@
attachment = yield ManagedAttachment.load(self.theTransactionUnderTest(0), resourceID, managedID)
self.assertTrue(attachment is None)
yield self.commitTransaction(0)
+
+
+ @inlineCallbacks
+ def test_get_all_attachments(self):
+ """
+ Test that action=get-all-attachments works.
+ """
+
+ yield self.createShare("user01", "puser01")
+
+ calendar1 = yield self.calendarUnderTest(txn=self.theTransactionUnderTest(0), home="user01", name="calendar")
+ yield calendar1.createCalendarObjectWithName("1.ics", Component.fromString(self.caldata1))
+ yield self.commitTransaction(0)
+
+ object1 = yield self.calendarObjectUnderTest(txn=self.theTransactionUnderTest(0), home="user01", calendar_name="calendar", name="1.ics")
+ yield object1.addAttachment(None, MimeType.fromString("text/plain"), "test.txt", MemoryStream("Here is some text."))
+ yield self.commitTransaction(0)
+
+ shared_object = yield self.calendarObjectUnderTest(txn=self.theTransactionUnderTest(1), home="puser01", calendar_name="shared-calendar", name="1.ics")
+ attachments = yield shared_object.ownerHome().getAllAttachments()
+ self.assertEqual(len(attachments), 1)
+ self.assertTrue(isinstance(attachments[0], ManagedAttachment))
+ self.assertEqual(attachments[0].contentType(), MimeType.fromString("text/plain"))
+ self.assertEqual(attachments[0].name(), "test.txt")
+ yield self.commitTransaction(1)
+
+
+ @inlineCallbacks
+ def test_get_attachment_data(self):
+ """
+ Test that action=get-attachment-data works.
+ """
+
+ yield self.createShare("user01", "puser01")
+
+ calendar1 = yield self.calendarUnderTest(txn=self.theTransactionUnderTest(0), home="user01", name="calendar")
+ yield calendar1.createCalendarObjectWithName("1.ics", Component.fromString(self.caldata1))
+ yield self.commitTransaction(0)
+
+ object1 = yield self.calendarObjectUnderTest(txn=self.theTransactionUnderTest(0), home="user01", calendar_name="calendar", name="1.ics")
+ attachment, _ignore_location = yield object1.addAttachment(None, MimeType.fromString("text/plain"), "test.txt", MemoryStream("Here is some text."))
+ remote_id = attachment.id()
+ yield self.commitTransaction(0)
+
+ home1 = yield self.homeUnderTest(txn=self.theTransactionUnderTest(1), name="puser01")
+ shared_object = yield self.calendarObjectUnderTest(txn=self.theTransactionUnderTest(1), home="puser01", calendar_name="shared-calendar", name="1.ics")
+ attachment = yield ManagedAttachment._create(self.theTransactionUnderTest(1), None, home1.id())
+ attachment._contentType = MimeType.fromString("text/plain")
+ attachment._name = "test.txt"
+ yield shared_object.ownerHome().readAttachmentData(remote_id, attachment)
+ yield self.commitTransaction(1)
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/test/util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/test/util.py 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/test/util.py 2015-02-11 20:34:55 UTC (rev 14403)
@@ -15,6 +15,7 @@
##
from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.protocol import Protocol
from txdav.caldav.datastore.scheduling.ischedule.localservers import (
Server, ServersDB
@@ -26,6 +27,7 @@
)
import txweb2.dav.test.util
+from txweb2.stream import ProducerStream, readStream
from twext.enterprise.ienterprise import AlreadyFinishedError
@@ -54,12 +56,13 @@
cls.storeMap[server.details()] = store
- def __init__(self, server, data, stream=None, stream_type=None):
+ def __init__(self, server, data, stream=None, stream_type=None, writeStream=None):
self.server = server
self.data = json.dumps(data)
self.stream = stream
self.streamType = stream_type
+ self.writeStream = writeStream
@inlineCallbacks
@@ -68,7 +71,20 @@
# Generate an HTTP client request
try:
response = (yield self._processRequest())
- response = json.loads(response)
+ if self.writeStream is None:
+ response = json.loads(response)
+ else:
+ try:
+ ct, name, stream = response
+ response = {
+ "result": "ok",
+ "content-type": ct,
+ "name": name,
+ }
+ yield readStream(stream, self.writeStream.write)
+ yield self.writeStream.loseConnection()
+ except ValueError:
+ pass
except Exception as e:
raise ValueError("Failed cross-pod request: {}".format(e))
@@ -90,13 +106,32 @@
j["stream"] = self.stream
j["streamType"] = self.streamType
try:
- result = yield store.conduit.processRequest(j)
+ if store.conduit.isStreamAction(j):
+ stream = ProducerStream()
+ class StreamProtocol(Protocol):
+ def connectionMade(self):
+ stream.registerProducer(self.transport, False)
+ def dataReceived(self, data):
+ stream.write(data)
+ def connectionLost(self, reason):
+ stream.finish()
+
+ result = yield store.conduit.processRequestStream(j, StreamProtocol())
+
+ try:
+ ct, name = result
+ except ValueError:
+ pass
+ else:
+ returnValue((ct, name, stream,))
+ else:
+ result = yield store.conduit.processRequest(j)
except Exception as e:
# Send the exception over to the other side
result = {
"result": "exception",
"class": ".".join((e.__class__.__module__, e.__class__.__name__,)),
- "request": str(e),
+ "details": str(e),
}
result = json.dumps(result)
returnValue(result)
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/current-oracle-dialect.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/current-oracle-dialect.sql 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/current-oracle-dialect.sql 2015-02-11 20:34:55 UTC (rev 14403)
@@ -69,7 +69,7 @@
"MODIFIED" timestamp default CURRENT_TIMESTAMP at time zone 'UTC'
);
-create table CALENDAR_MIGRATION_STATE (
+create table CALENDAR_MIGRATION (
"CALENDAR_HOME_RESOURCE_ID" integer references CALENDAR_HOME on delete cascade,
"REMOTE_RESOURCE_ID" integer not null,
"CALENDAR_RESOURCE_ID" integer references CALENDAR on delete cascade,
@@ -217,6 +217,13 @@
primary key ("TIME_RANGE_INSTANCE_ID", "USER_ID")
);
+create table CALENDAR_OBJECT_MIGRATION (
+ "CALENDAR_HOME_RESOURCE_ID" integer references CALENDAR_HOME on delete cascade,
+ "REMOTE_RESOURCE_ID" integer not null,
+ "LOCAL_RESOURCE_ID" integer references CALENDAR_OBJECT on delete cascade,
+ primary key ("CALENDAR_HOME_RESOURCE_ID", "REMOTE_RESOURCE_ID")
+);
+
create table ATTACHMENT (
"ATTACHMENT_ID" integer primary key,
"CALENDAR_HOME_RESOURCE_ID" integer not null references CALENDAR_HOME,
@@ -237,6 +244,13 @@
unique ("MANAGED_ID", "CALENDAR_OBJECT_RESOURCE_ID")
);
+create table ATTACHMENT_MIGRATION (
+ "CALENDAR_HOME_RESOURCE_ID" integer references CALENDAR_HOME on delete cascade,
+ "REMOTE_RESOURCE_ID" integer not null,
+ "LOCAL_RESOURCE_ID" integer references ATTACHMENT on delete cascade,
+ primary key ("CALENDAR_HOME_RESOURCE_ID", "REMOTE_RESOURCE_ID")
+);
+
create table RESOURCE_PROPERTY (
"RESOURCE_ID" integer not null,
"NAME" nvarchar2(255),
@@ -633,7 +647,7 @@
DEFAULT_POLLS
);
-create index CALENDAR_MIGRATION_ST_57f40e9a on CALENDAR_MIGRATION_STATE (
+create index CALENDAR_MIGRATION_CA_cc68f4ec on CALENDAR_MIGRATION (
CALENDAR_RESOURCE_ID
);
@@ -672,6 +686,15 @@
CALENDAR_OBJECT_RESOURCE_ID
);
+create index CALENDAR_OBJECT_MIGRA_0502cbef on CALENDAR_OBJECT_MIGRATION (
+ CALENDAR_HOME_RESOURCE_ID,
+ LOCAL_RESOURCE_ID
+);
+
+create index CALENDAR_OBJECT_MIGRA_3577efd9 on CALENDAR_OBJECT_MIGRATION (
+ LOCAL_RESOURCE_ID
+);
+
create index ATTACHMENT_CALENDAR_H_0078845c on ATTACHMENT (
CALENDAR_HOME_RESOURCE_ID
);
@@ -684,6 +707,15 @@
CALENDAR_OBJECT_RESOURCE_ID
);
+create index ATTACHMENT_MIGRATION__804bf85e on ATTACHMENT_MIGRATION (
+ CALENDAR_HOME_RESOURCE_ID,
+ LOCAL_RESOURCE_ID
+);
+
+create index ATTACHMENT_MIGRATION__816947fe on ATTACHMENT_MIGRATION (
+ LOCAL_RESOURCE_ID
+);
+
create index SHARED_ADDRESSBOOK_BI_e9a2e6d4 on SHARED_ADDRESSBOOK_BIND (
OWNER_HOME_RESOURCE_ID
);
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/current.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/current.sql 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/current.sql 2015-02-11 20:34:55 UTC (rev 14403)
@@ -140,7 +140,7 @@
-- Calendar Migration --
------------------------
-create table CALENDAR_MIGRATION_STATE (
+create table CALENDAR_MIGRATION (
CALENDAR_HOME_RESOURCE_ID integer references CALENDAR_HOME on delete cascade,
REMOTE_RESOURCE_ID integer not null,
CALENDAR_RESOURCE_ID integer references CALENDAR on delete cascade,
@@ -149,8 +149,8 @@
primary key (CALENDAR_HOME_RESOURCE_ID, REMOTE_RESOURCE_ID) -- implicit index
);
-create index CALENDAR_MIGRATION_STATE_CALENDAR_RESOURCE_ID on
- CALENDAR_MIGRATION_STATE(CALENDAR_RESOURCE_ID);
+create index CALENDAR_MIGRATION_CALENDAR_RESOURCE_ID on
+ CALENDAR_MIGRATION(CALENDAR_RESOURCE_ID);
---------------------------
@@ -381,6 +381,24 @@
);
+-------------------------------
+-- Calendar Object Migration --
+-------------------------------
+
+create table CALENDAR_OBJECT_MIGRATION (
+ CALENDAR_HOME_RESOURCE_ID integer references CALENDAR_HOME on delete cascade,
+ REMOTE_RESOURCE_ID integer not null,
+ LOCAL_RESOURCE_ID integer references CALENDAR_OBJECT on delete cascade,
+
+ primary key (CALENDAR_HOME_RESOURCE_ID, REMOTE_RESOURCE_ID) -- implicit index
+);
+
+create index CALENDAR_OBJECT_MIGRATION_HOME_LOCAL on
+ CALENDAR_OBJECT_MIGRATION(CALENDAR_HOME_RESOURCE_ID, LOCAL_RESOURCE_ID);
+create index CALENDAR_OBJECT_MIGRATION_LOCAL_RESOURCE_ID on
+ CALENDAR_OBJECT_MIGRATION(LOCAL_RESOURCE_ID);
+
+
----------------
-- Attachment --
----------------
@@ -418,6 +436,24 @@
create index ATTACHMENT_CALENDAR_OBJECT_CALENDAR_OBJECT_RESOURCE_ID on
ATTACHMENT_CALENDAR_OBJECT(CALENDAR_OBJECT_RESOURCE_ID);
+-----------------------------------
+-- Calendar Attachment Migration --
+-----------------------------------
+
+create table ATTACHMENT_MIGRATION (
+ CALENDAR_HOME_RESOURCE_ID integer references CALENDAR_HOME on delete cascade,
+ REMOTE_RESOURCE_ID integer not null,
+ LOCAL_RESOURCE_ID integer references ATTACHMENT on delete cascade,
+
+ primary key (CALENDAR_HOME_RESOURCE_ID, REMOTE_RESOURCE_ID) -- implicit index
+);
+
+create index ATTACHMENT_MIGRATION_HOME_LOCAL on
+ ATTACHMENT_MIGRATION(CALENDAR_HOME_RESOURCE_ID, LOCAL_RESOURCE_ID);
+create index ATTACHMENT_MIGRATION_LOCAL_RESOURCE_ID on
+ ATTACHMENT_MIGRATION(LOCAL_RESOURCE_ID);
+
+
-----------------------
-- Resource Property --
-----------------------
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_51_to_52.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_51_to_52.sql 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/upgrades/oracle-dialect/upgrade_from_51_to_52.sql 2015-02-11 20:34:55 UTC (rev 14403)
@@ -22,7 +22,7 @@
insert into HOME_STATUS (DESCRIPTION, ID) values ('migrating', 3);
-- New table
-create table CALENDAR_MIGRATION_STATE (
+create table CALENDAR_MIGRATION (
"CALENDAR_HOME_RESOURCE_ID" integer references CALENDAR_HOME on delete cascade,
"REMOTE_RESOURCE_ID" integer not null,
"CALENDAR_RESOURCE_ID" integer references CALENDAR on delete cascade,
@@ -30,10 +30,42 @@
primary key ("CALENDAR_HOME_RESOURCE_ID", "REMOTE_RESOURCE_ID")
);
-create index CALENDAR_MIGRATION_ST_57f40e9a on CALENDAR_MIGRATION_STATE (
+create index CALENDAR_MIGRATION_CA_cc68f4ec on CALENDAR_MIGRATION (
CALENDAR_RESOURCE_ID
);
+-- New table
+create table CALENDAR_OBJECT_MIGRATION (
+ "CALENDAR_HOME_RESOURCE_ID" integer references CALENDAR_HOME on delete cascade,
+ "REMOTE_RESOURCE_ID" integer not null,
+ "LOCAL_RESOURCE_ID" integer references CALENDAR_OBJECT on delete cascade,
+ primary key ("CALENDAR_HOME_RESOURCE_ID", "REMOTE_RESOURCE_ID")
+);
+create index CALENDAR_OBJECT_MIGRA_0502cbef on CALENDAR_OBJECT_MIGRATION (
+ CALENDAR_HOME_RESOURCE_ID,
+ LOCAL_RESOURCE_ID
+);
+create index CALENDAR_OBJECT_MIGRA_3577efd9 on CALENDAR_OBJECT_MIGRATION (
+ LOCAL_RESOURCE_ID
+);
+
+-- New table
+create table ATTACHMENT_MIGRATION (
+ "CALENDAR_HOME_RESOURCE_ID" integer references CALENDAR_HOME on delete cascade,
+ "REMOTE_RESOURCE_ID" integer not null,
+ "LOCAL_RESOURCE_ID" integer references ATTACHMENT on delete cascade,
+ primary key ("CALENDAR_HOME_RESOURCE_ID", "REMOTE_RESOURCE_ID")
+);
+
+create index ATTACHMENT_MIGRATION__804bf85e on ATTACHMENT_MIGRATION (
+ CALENDAR_HOME_RESOURCE_ID,
+ LOCAL_RESOURCE_ID
+);
+create index ATTACHMENT_MIGRATION__816947fe on ATTACHMENT_MIGRATION (
+ LOCAL_RESOURCE_ID
+);
+
+
-- update the version
update CALENDARSERVER set VALUE = '52' where NAME = 'VERSION';
Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_51_to_52.sql
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_51_to_52.sql 2015-02-11 19:55:49 UTC (rev 14402)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_schema/upgrades/postgres-dialect/upgrade_from_51_to_52.sql 2015-02-11 20:34:55 UTC (rev 14403)
@@ -22,7 +22,7 @@
insert into HOME_STATUS values (3, 'migrating');
-- New table
-create table CALENDAR_MIGRATION_STATE (
+create table CALENDAR_MIGRATION (
CALENDAR_HOME_RESOURCE_ID integer references CALENDAR_HOME on delete cascade,
REMOTE_RESOURCE_ID integer not null,
CALENDAR_RESOURCE_ID integer references CALENDAR on delete cascade,
@@ -31,9 +31,39 @@
primary key (CALENDAR_HOME_RESOURCE_ID, REMOTE_RESOURCE_ID) -- implicit index
);
-create index CALENDAR_MIGRATION_STATE_CALENDAR_RESOURCE_ID on
- CALENDAR_MIGRATION_STATE(CALENDAR_RESOURCE_ID);
+create index CALENDAR_MIGRATION_CALENDAR_RESOURCE_ID on
+ CALENDAR_MIGRATION(CALENDAR_RESOURCE_ID);
+
+-- New table
+create table CALENDAR_OBJECT_MIGRATION (
+ CALENDAR_HOME_RESOURCE_ID integer references CALENDAR_HOME on delete cascade,
+ REMOTE_RESOURCE_ID integer not null,
+ LOCAL_RESOURCE_ID integer references CALENDAR_OBJECT on delete cascade,
+
+ primary key (CALENDAR_HOME_RESOURCE_ID, REMOTE_RESOURCE_ID) -- implicit index
+);
+create index CALENDAR_OBJECT_MIGRATION_HOME_LOCAL on
+ CALENDAR_OBJECT_MIGRATION(CALENDAR_HOME_RESOURCE_ID, LOCAL_RESOURCE_ID);
+create index CALENDAR_OBJECT_MIGRATION_LOCAL_RESOURCE_ID on
+ CALENDAR_OBJECT_MIGRATION(LOCAL_RESOURCE_ID);
+
+
+-- New table
+create table ATTACHMENT_MIGRATION (
+ CALENDAR_HOME_RESOURCE_ID integer references CALENDAR_HOME on delete cascade,
+ REMOTE_RESOURCE_ID integer not null,
+ LOCAL_RESOURCE_ID integer references ATTACHMENT on delete cascade,
+
+ primary key (CALENDAR_HOME_RESOURCE_ID, REMOTE_RESOURCE_ID) -- implicit index
+);
+
+create index ATTACHMENT_MIGRATION_HOME_LOCAL on
+ ATTACHMENT_MIGRATION(CALENDAR_HOME_RESOURCE_ID, LOCAL_RESOURCE_ID);
+create index ATTACHMENT_MIGRATION_LOCAL_RESOURCE_ID on
+ ATTACHMENT_MIGRATION(LOCAL_RESOURCE_ID);
+
+
-- update the version
update CALENDARSERVER set VALUE = '52' where NAME = 'VERSION';
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150211/4a2d6920/attachment-0001.html>
More information about the calendarserver-changes
mailing list