[CalendarServer-changes] [14464] CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore

source_changes at macosforge.org source_changes at macosforge.org
Fri Feb 20 13:21:21 PST 2015


Revision: 14464
          http://trac.calendarserver.org//changeset/14464
Author:   cdaboo at apple.com
Date:     2015-02-20 13:21:21 -0800 (Fri, 20 Feb 2015)
Log Message:
-----------
Checkpoint: migration final sync notifications.

Modified Paths:
--------------
    CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/home_sync.py
    CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/test/test_home_sync.py
    CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/store_api.py
    CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/util.py
    CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_external.py
    CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_notification.py

Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/home_sync.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/home_sync.py	2015-02-20 19:28:09 UTC (rev 14463)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/home_sync.py	2015-02-20 21:21:21 UTC (rev 14464)
@@ -23,6 +23,9 @@
 from txdav.common.datastore.podding.migration.sync_metadata import CalendarMigrationRecord, \
     CalendarObjectMigrationRecord, AttachmentMigrationRecord
 from txdav.caldav.datastore.sql import ManagedAttachment
+from txdav.common.datastore.sql_external import NotificationCollectionExternal
+from txdav.common.datastore.sql_notification import NotificationCollection
+from txdav.common.datastore.sql_tables import _HOME_STATUS_EXTERNAL
 from txdav.common.idirectoryservice import DirectoryRecordNotFoundError
 
 import uuid
@@ -186,7 +189,7 @@
         pass
 
         # TODO: notifications
-        pass
+        yield self.notificationsReconcile()
 
         # TODO: work items
         pass
@@ -210,7 +213,7 @@
         """
 
         # TODO: implement API on CommonHome to rename the ownerUID column and
-        # change the status column.
+        # change the status column. Also adjust NotificationCollection.
         pass
 
 
@@ -729,7 +732,7 @@
 
         # Batch setting links for the local home
         len_links = len(links)
-        while len(links):
+        while links:
             yield self.makeAttachmentLinks(links[:50], attachmentIDMap, objectIDMap)
             links = links[50:]
 
@@ -890,3 +893,53 @@
             except KeyError:
                 continue
             yield groupAttendee.insert(txn)
+
+
+    @inlineCallbacks
+    def notificationsReconcile(self):
+        """
+        Sync all the existing L{NotificationObject} resources from the remote store.
+        """
+
+        records = yield self.notificationRecords()
+
+        # Batch setting resources for the local home
+        len_records = len(records)
+        while records:
+            yield self.makeNotifications(records[:50])
+            records = records[50:]
+
+        returnValue(len_records)
+
+
+    @inTransactionWrapper
+    @inlineCallbacks
+    def notificationRecords(self, txn):
+        """
+        Get all the existing L{NotificationObjectRecord} records from the remote store.
+        """
+
+        notifications = yield NotificationCollectionExternal.notificationsWithUID(txn, self.diruid, True)
+        records = yield notifications.notificationObjectRecords()
+        for record in records:
+            # This needs to be reset when added to the local store
+            del record.resourceID
+
+            # Map the remote id to the local one.
+            record.notificationHomeResourceID = notifications.id()
+
+        returnValue(records)
+
+
+    @inTransactionWrapper
+    @inlineCallbacks
+    def makeNotifications(self, txn, records):
+        """
+        Write the given L{NotificationObjectRecord} records into the local store as notification objects.
+        """
+
+        notifications = yield NotificationCollection.notificationsWithUID(txn, self.diruid, True, _HOME_STATUS_EXTERNAL)
+        for record in records:
+            # Do this via the "write" API so that sync revisions are updated properly, rather than just
+            # inserting the records directly.
+            yield notifications.writeNotificationObject(record.notificationUID, record.notificationType, record.notificationData)

Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/test/test_home_sync.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/test/test_home_sync.py	2015-02-20 19:28:09 UTC (rev 14463)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/migration/test/test_home_sync.py	2015-02-20 21:21:21 UTC (rev 14464)
@@ -29,11 +29,14 @@
 from txdav.common.datastore.podding.test.util import MultiStoreConduitTest
 from txdav.common.datastore.sql_directory import DelegateRecord, \
     ExternalDelegateGroupsRecord, DelegateGroupsRecord
-from txdav.common.datastore.sql_tables import schema
+from txdav.common.datastore.sql_notification import NotificationCollection
+from txdav.common.datastore.sql_tables import schema, _HOME_STATUS_EXTERNAL
 from txdav.common.datastore.test.util import populateCalendarsFrom
 from txdav.who.delegates import Delegates
 from txweb2.http_headers import MimeType
 from txweb2.stream import MemoryStream
+from uuid import uuid4
+import json
 
 
 class TestCrossPodHomeSync(MultiStoreConduitTest):
@@ -926,7 +929,53 @@
         yield self.commitTransaction(1)
 
 
+    @inlineCallbacks
+    def test_notifications_reconcile(self):
+        """
+        Test that L{notificationsReconcile} copies over the full set of notifications from the remote store.
+        """
 
+        # Create remote home - and add some fake notifications
+        yield self.homeUnderTest(txn=self.theTransactionUnderTest(0), name="user01", create=True)
+        notifications = yield self.theTransactionUnderTest(0).notificationsWithUID("user01")
+        uid1 = str(uuid4())
+        obj1 = yield notifications.writeNotificationObject(uid1, "type1", "data1")
+        id1 = obj1.id()
+        uid2 = str(uuid4())
+        obj2 = yield notifications.writeNotificationObject(uid2, "type2", "data2")
+        id2 = obj2.id()
+        yield self.commitTransaction(0)
+
+        # Sync from remote side
+        syncer = CrossPodHomeSync(self.theStoreUnderTest(1), "user01")
+        yield syncer.loadRecord()
+        syncer.homeId = yield syncer.prepareCalendarHome()
+        changes = yield syncer.notificationsReconcile()
+        self.assertEqual(changes, 2)
+
+        # Now have local notifications
+        notifications = yield NotificationCollection.notificationsWithUID(
+            self.theTransactionUnderTest(1),
+            "user01",
+            True,
+            _HOME_STATUS_EXTERNAL
+        )
+        results = yield notifications.notificationObjects()
+        self.assertEqual(len(results), 2)
+        for result in results:
+            for test_uid, test_id, test_type, test_data in ((uid1, id1, "type1", "data1",), (uid2, id2, "type2", "data2",),):
+                if result.uid() == test_uid:
+                    self.assertNotEqual(result.id(), test_id)
+                    self.assertEqual(json.loads(result.notificationType()), test_type)
+                    data = yield result.notificationData()
+                    self.assertEqual(json.loads(data), test_data)
+                    break
+            else:
+                self.fail("Notification uid {} not found".format(result.uid()))
+        yield self.commitTransaction(1)
+
+
+
 class TestGroupAttendeeSync(MultiStoreConduitTest):
     """
     GroupAttendeeReconciliation tests

Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/store_api.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/store_api.py	2015-02-20 19:28:09 UTC (rev 14463)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/store_api.py	2015-02-20 21:21:21 UTC (rev 14464)
@@ -163,3 +163,6 @@
 UtilityConduitMixin._make_simple_action(StoreAPIConduitMixin, "objectresource_setcomponent", "setComponent")
 UtilityConduitMixin._make_simple_action(StoreAPIConduitMixin, "objectresource_component", "component", transform_recv_result=UtilityConduitMixin._to_string)
 UtilityConduitMixin._make_simple_action(StoreAPIConduitMixin, "objectresource_remove", "remove")
+
+# Calls on L{NotificationCollection} objects
+UtilityConduitMixin._make_simple_action(StoreAPIConduitMixin, "notification_all_records", "notificationObjectRecords", classMethod=False, transform_recv_result=UtilityConduitMixin._to_serialize_list)

Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/util.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/util.py	2015-02-20 19:28:09 UTC (rev 14463)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/podding/util.py	2015-02-20 21:21:21 UTC (rev 14464)
@@ -17,6 +17,8 @@
 from twisted.internet.defer import inlineCallbacks, returnValue
 
 from txdav.common.datastore.podding.base import FailedCrossPodRequestError
+from txdav.common.datastore.sql_notification import NotificationCollection, \
+    NotificationObject
 
 
 class UtilityConduitMixin(object):
@@ -56,6 +58,7 @@
         viewer_home = None
         home_child = None
         object_resource = None
+        notification = None
         if isinstance(storeObject, CommonObjectResource):
             owner_home = storeObject.ownerHome()
             viewer_home = storeObject.viewerHome()
@@ -71,22 +74,31 @@
             viewer_home = storeObject
             txn = storeObject._txn
             result["classMethod"] = classMethod
+        elif isinstance(storeObject, NotificationCollection):
+            notification = storeObject
+            txn = storeObject._txn
+            result["classMethod"] = classMethod
 
         # Add store object identities to JSON request
-        result["homeType"] = viewer_home._homeType
-        result["homeUID"] = viewer_home.uid()
-        if home_child:
-            if home_child.owned():
-                result["homeChildID"] = home_child.id()
-            else:
-                result["homeChildSharedID"] = home_child.name()
-        if object_resource:
-            result["objectResourceID"] = object_resource.id()
+        if viewer_home:
+            result["homeType"] = viewer_home._homeType
+            result["homeUID"] = viewer_home.uid()
+            if home_child:
+                if home_child.owned():
+                    result["homeChildID"] = home_child.id()
+                else:
+                    result["homeChildSharedID"] = home_child.name()
+            if object_resource:
+                result["objectResourceID"] = object_resource.id()
 
-        # Note that the owner_home is always the ownerHome() because in the sharing case
-        # a viewer is accessing the owner's data on another pod.
-        recipient = yield self.store.directoryService().recordWithUID(owner_home.uid())
+            # Note that the owner_home is always the ownerHome() because in the sharing case
+            # a viewer is accessing the owner's data on another pod.
+            recipient = yield self.store.directoryService().recordWithUID(owner_home.uid())
 
+        elif notification:
+            result["notificationUID"] = notification.uid()
+            recipient = yield self.store.directoryService().recordWithUID(notification.uid())
+
         returnValue((txn, result, recipient.server(),))
 
 
@@ -129,6 +141,15 @@
                 raise FailedCrossPodRequestError("Invalid object resource specified")
             returnObject = objectResource
 
+        if "notificationUID" in request:
+            notification = yield txn.notificationsWithUID(request["notificationUID"])
+            if notification is None:
+                raise FailedCrossPodRequestError("Invalid notification UID specified")
+            notification._internalRequest = False
+            returnObject = notification
+            if request.get("classMethod", False):
+                classObject = NotificationObject
+
         returnValue((returnObject, classObject,))
 
 

Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_external.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_external.py	2015-02-20 19:28:09 UTC (rev 14463)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_external.py	2015-02-20 21:21:21 UTC (rev 14464)
@@ -26,6 +26,8 @@
 from txdav.base.propertystore.sql import PropertyStore
 from txdav.common.datastore.sql import CommonHome, CommonHomeChild, \
     CommonObjectResource
+from txdav.common.datastore.sql_notification import NotificationCollection, \
+    NotificationObjectRecord
 from txdav.common.datastore.sql_tables import _HOME_STATUS_EXTERNAL
 from txdav.common.icommondatastore import NonExistentExternalShare, \
     ExternalShareFailed
@@ -199,7 +201,6 @@
     """
 
     @classmethod
-    @inlineCallbacks
     def listObjects(cls, home):
         """
         Retrieve the names of the children that exist in the given home.
@@ -207,8 +208,7 @@
         @return: an iterable of C{str}s.
         """
 
-        results = yield home._txn.store().conduit.send_homechild_listobjects(home)
-        returnValue(results)
+        return home._txn.store().conduit.send_homechild_listobjects(home)
 
 
     @classmethod
@@ -385,17 +385,13 @@
 
 
     @classmethod
-    @inlineCallbacks
     def listObjects(cls, parent):
-        results = yield parent._txn.store().conduit.send_objectresource_listobjects(parent)
-        returnValue(results)
+        return parent._txn.store().conduit.send_objectresource_listobjects(parent)
 
 
     @classmethod
-    @inlineCallbacks
     def countObjects(cls, parent):
-        result = yield parent._txn.store().conduit.send_objectresource_countobjects(parent)
-        returnValue(result)
+        return parent._txn.store().conduit.send_objectresource_countobjects(parent)
 
 
     @classmethod
@@ -411,17 +407,13 @@
 
 
     @classmethod
-    @inlineCallbacks
     def resourceNameForUID(cls, parent, uid):
-        result = yield parent._txn.store().conduit.send_objectresource_resourcenameforuid(parent, uid)
-        returnValue(result)
+        return parent._txn.store().conduit.send_objectresource_resourcenameforuid(parent, uid)
 
 
     @classmethod
-    @inlineCallbacks
     def resourceUIDForName(cls, parent, name):
-        result = yield parent._txn.store().conduit.send_objectresource_resourceuidforname(parent, name)
-        returnValue(result)
+        return parent._txn.store().conduit.send_objectresource_resourceuidforname(parent, name)
 
 
     @classmethod
@@ -452,6 +444,23 @@
         returnValue(self._cachedComponent)
 
 
+    def remove(self):
+        return self._txn.store().conduit.send_objectresource_remove(self)
+
+
+
+class NotificationCollectionExternal(NotificationCollection):
+    """
+    A NotificationCollection for a resource not hosted on this system, but on another pod. This will forward
+    specific APIs to the other pod using cross-pod requests.
+    """
+
+    @classmethod
+    def notificationsWithUID(cls, txn, uid, create):
+        return super(NotificationCollectionExternal, cls).notificationsWithUID(txn, uid, create, expected_status=_HOME_STATUS_EXTERNAL)
+
+
     @inlineCallbacks
-    def remove(self):
-        yield self._txn.store().conduit.send_objectresource_remove(self)
+    def notificationObjectRecords(self):
+        results = yield self._txn.store().conduit.send_notification_all_records(self)
+        returnValue(map(NotificationObjectRecord.deserialize, results))

Modified: CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_notification.py
===================================================================
--- CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_notification.py	2015-02-20 19:28:09 UTC (rev 14463)
+++ CalendarServer/branches/users/cdaboo/pod2pod-migration/txdav/common/datastore/sql_notification.py	2015-02-20 21:21:21 UTC (rev 14464)
@@ -15,6 +15,7 @@
 # limitations under the License.
 ##
 
+from twext.enterprise.dal.record import SerializableRecord, fromTable
 from twext.enterprise.dal.syntax import Select, Parameter, Insert, \
     SavepointAction, Delete, Max, Len, Update
 from twext.enterprise.util import parseSQLTimestamp
@@ -56,11 +57,12 @@
     _homeSchema = schema.NOTIFICATION_HOME
 
 
-    def __init__(self, txn, uid, resourceID):
+    def __init__(self, txn, uid, resourceID, status):
 
         self._txn = txn
         self._uid = uid
         self._resourceID = resourceID
+        self._status = status
         self._dataVersion = None
         self._notifications = {}
         self._notificationNames = None
@@ -71,15 +73,22 @@
         self._notifiers = dict([(factory_name, factory.newNotifier(self),) for factory_name, factory in txn._notifierFactories.items()])
 
     _resourceIDFromUIDQuery = Select(
-        [_homeSchema.RESOURCE_ID], From=_homeSchema,
-        Where=_homeSchema.OWNER_UID == Parameter("uid"))
+        [_homeSchema.RESOURCE_ID, _homeSchema.STATUS],
+        From=_homeSchema,
+        Where=_homeSchema.OWNER_UID == Parameter("uid")
+    )
 
     _UIDFromResourceIDQuery = Select(
-        [_homeSchema.OWNER_UID], From=_homeSchema,
-        Where=_homeSchema.RESOURCE_ID == Parameter("rid"))
+        [_homeSchema.OWNER_UID],
+        From=_homeSchema,
+        Where=_homeSchema.RESOURCE_ID == Parameter("rid")
+    )
 
     _provisionNewNotificationsQuery = Insert(
-        {_homeSchema.OWNER_UID: Parameter("uid")},
+        {
+            _homeSchema.OWNER_UID: Parameter("uid"),
+            _homeSchema.STATUS: Parameter("status"),
+        },
         Return=_homeSchema.RESOURCE_ID
     )
 
@@ -95,7 +104,7 @@
 
     @classmethod
     @inlineCallbacks
-    def notificationsWithUID(cls, txn, uid, create):
+    def notificationsWithUID(cls, txn, uid, create, expected_status=_HOME_STATUS_NORMAL):
         """
         @param uid: I'm going to assume uid is utf-8 encoded bytes
         """
@@ -103,6 +112,9 @@
 
         if rows:
             resourceID = rows[0][0]
+            status = rows[0][1]
+            if status != expected_status:
+                raise RecordNotAllowedError("Notifications status mismatch: {} != {}".format(status, expected_status))
             created = False
         elif create:
             # Determine if the user is local or external
@@ -110,9 +122,9 @@
             if record is None:
                 raise DirectoryRecordNotFoundError("Cannot create home for UID since no directory record exists: {}".format(uid))
 
-            state = _HOME_STATUS_NORMAL if record.thisServer() else _HOME_STATUS_EXTERNAL
-            if state == _HOME_STATUS_EXTERNAL:
-                raise RecordNotAllowedError("Cannot store notifications for external user: {}".format(uid))
+            status = _HOME_STATUS_NORMAL if record.thisServer() else _HOME_STATUS_EXTERNAL
+            if status != expected_status:
+                raise RecordNotAllowedError("Notifications status mismatch: {} != {}".format(status, expected_status))
 
             # Use savepoint so we can do a partial rollback if there is a race
             # condition where this row has already been inserted
@@ -121,7 +133,7 @@
 
             try:
                 resourceID = str((
-                    yield cls._provisionNewNotificationsQuery.on(txn, uid=uid)
+                    yield cls._provisionNewNotificationsQuery.on(txn, uid=uid, status=status)
                 )[0][0])
             except Exception:
                 # FIXME: Really want to trap the pg.DatabaseError but in a non-
@@ -132,6 +144,9 @@
                 rows = yield cls._resourceIDFromUIDQuery.on(txn, uid=uid)
                 if rows:
                     resourceID = rows[0][0]
+                    status = rows[0][1]
+                    if status != expected_status:
+                        raise RecordNotAllowedError("Notifications status mismatch: {} != {}".format(status, expected_status))
                     created = False
                 else:
                     raise
@@ -140,7 +155,7 @@
                 yield savepoint.release(txn)
         else:
             returnValue(None)
-        collection = cls(txn, uid, resourceID)
+        collection = cls(txn, uid, resourceID, status)
         yield collection._loadPropertyStore()
         if created:
             yield collection._initSyncToken()
@@ -224,6 +239,10 @@
         return self._home
 
 
+    def notificationObjectRecords(self):
+        return NotificationObjectRecord.querysimple(self._txn, notificationHomeResourceID=self.id())
+
+
     @inlineCallbacks
     def notificationObjects(self):
         results = (yield NotificationObject.loadAllObjects(self))
@@ -292,6 +311,7 @@
         else:
             yield self._updateRevision("%s.xml" % (uid,))
         yield self.notifyChanged()
+        returnValue(notificationObject)
 
 
     def removeNotificationObjectWithName(self, name):
@@ -441,6 +461,15 @@
 
 
 
+class NotificationObjectRecord(SerializableRecord, fromTable(schema.NOTIFICATION)):
+    """
+    @DynamicAttrs
+    L{Record} for L{schema.NOTIFICATION}.
+    """
+    pass
+
+
+
 class NotificationObject(FancyEqMixin, object):
     """
     This used to store XML data and an XML element for the type. But we are now switching it
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150220/df452277/attachment-0001.html>


More information about the calendarserver-changes mailing list