[CalendarServer-changes] [10132] CalendarServer/trunk/calendarserver

source_changes at macosforge.org source_changes at macosforge.org
Thu Dec 6 13:56:58 PST 2012


Revision: 10132
          http://trac.calendarserver.org//changeset/10132
Author:   sagen at apple.com
Date:     2012-12-06 13:56:57 -0800 (Thu, 06 Dec 2012)
Log Message:
-----------
APNS payload now contains dataChangedTimestamp and pushRequestSubmittedTimestamp

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/push/amppush.py
    CalendarServer/trunk/calendarserver/push/applepush.py
    CalendarServer/trunk/calendarserver/push/test/test_amppush.py
    CalendarServer/trunk/calendarserver/push/test/test_applepush.py
    CalendarServer/trunk/calendarserver/push/util.py
    CalendarServer/trunk/calendarserver/tools/ampnotifications.py

Modified: CalendarServer/trunk/calendarserver/push/amppush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/amppush.py	2012-12-06 21:28:23 UTC (rev 10131)
+++ CalendarServer/trunk/calendarserver/push/amppush.py	2012-12-06 21:56:57 UTC (rev 10132)
@@ -23,6 +23,7 @@
 from twisted.internet.protocol import Factory, ServerFactory
 from twisted.protocols import amp
 from twistedcaldav.notify import getPubSubPath
+import time
 import uuid
 
 
@@ -44,7 +45,7 @@
 # AMP Commands sent to client
 
 class NotificationForID(amp.Command):
-    arguments = [('id', amp.String())]
+    arguments = [('id', amp.String()), ('dataChangedTimestamp', amp.Integer())]
     response = [('status', amp.String())]
 
 
@@ -84,14 +85,13 @@
         self.log_debug("Removed subscriber")
         self.subscribers.remove(p)
 
-    def enqueue(self, op, id):
+    def enqueue(self, op, id, dataChangedTimestamp=None):
         """
         Sends an AMP push notification to any clients subscribing to this id.
 
         @param op: The operation that took place, either "create" or "update"
             (ignored in this implementation)
         @type op: C{str}
-
         @param id: The identifier of the resource that was updated, including
             a prefix indicating whether this is CalDAV or CardDAV related.
             The prefix is separated from the id with "|", e.g.:
@@ -102,6 +102,9 @@
             is used in conjunction with the prefix and the server hostname
             to build the actual key value that devices subscribe to.
         @type id: C{str}
+        @param dataChangedTimestamp: Timestamp (epoch seconds) for the data change
+            which triggered this notification (Only used for unit tests)
+            @type key: C{int}
         """
 
         try:
@@ -113,29 +116,33 @@
 
         id = getPubSubPath(id, {"host": self.serverHostName})
 
+        # Unit tests can pass this value in; otherwise it defaults to now
+        if dataChangedTimestamp is None:
+            dataChangedTimestamp = int(time.time())
+
         tokens = []
         for subscriber in self.subscribers:
             token = subscriber.subscribedToID(id)
             if token is not None:
                 tokens.append(token)
         if tokens:
-            return self.scheduleNotifications(tokens, id)
+            return self.scheduleNotifications(tokens, id, dataChangedTimestamp)
 
 
     @inlineCallbacks
-    def sendNotification(self, token, id):
+    def sendNotification(self, token, id, dataChangedTimestamp):
         for subscriber in self.subscribers:
             if subscriber.subscribedToID(id):
-                yield subscriber.notify(token, id)
+                yield subscriber.notify(token, id, dataChangedTimestamp)
 
 
     @inlineCallbacks
-    def scheduleNotifications(self, tokens, id):
+    def scheduleNotifications(self, tokens, id, dataChangedTimestamp):
         if self.scheduler is not None:
-            self.scheduler.schedule(tokens, id)
+            self.scheduler.schedule(tokens, id, dataChangedTimestamp)
         else:
             for token in tokens:
-                yield self.sendNotification(token, id)
+                yield self.sendNotification(token, id, dataChangedTimestamp)
 
 
 class AMPPushNotifierProtocol(amp.AMP, LoggingMixIn):
@@ -162,10 +169,11 @@
         return {"status" : "OK"}
     UnsubscribeFromID.responder(unsubscribe)
 
-    def notify(self, token, id):
+    def notify(self, token, id, dataChangedTimestamp):
         if self.subscribedToID(id) == token:
             self.log_debug("Sending notification for %s to %s" % (id, token))
-            return self.callRemote(NotificationForID, id=id)
+            return self.callRemote(NotificationForID, id=id,
+                dataChangedTimestamp=dataChangedTimestamp)
 
     def subscribedToID(self, id):
         if self.any is not None:
@@ -204,8 +212,8 @@
         self.callback = callback
 
     @inlineCallbacks
-    def notificationForID(self, id):
-        yield self.callback(id)
+    def notificationForID(self, id, dataChangedTimestamp):
+        yield self.callback(id, dataChangedTimestamp)
         returnValue( {"status" : "OK"} )
 
     NotificationForID.responder(notificationForID)

Modified: CalendarServer/trunk/calendarserver/push/applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/applepush.py	2012-12-06 21:28:23 UTC (rev 10131)
+++ CalendarServer/trunk/calendarserver/push/applepush.py	2012-12-06 21:56:57 UTC (rev 10132)
@@ -31,6 +31,7 @@
 from twisted.internet.task import LoopingCall
 from twistedcaldav.extensions import DAVResource, DAVResourceWithoutChildrenMixin
 from twistedcaldav.resource import ReadOnlyNoCopyResourceMixIn
+import json
 import OpenSSL
 import struct
 import time
@@ -176,7 +177,7 @@
 
 
     @inlineCallbacks
-    def enqueue(self, op, id):
+    def enqueue(self, op, id, dataChangedTimestamp=None):
         """
         Sends an Apple Push Notification to any device token subscribed to
         this id.
@@ -184,7 +185,6 @@
         @param op: The operation that took place, either "create" or "update"
             (ignored in this implementation)
         @type op: C{str}
-
         @param id: The identifier of the resource that was updated, including
             a prefix indicating whether this is CalDAV or CardDAV related.
             The prefix is separated from the id with "|", e.g.:
@@ -195,6 +195,9 @@
             is used in conjunction with the prefix and the server hostname
             to build the actual key value that devices subscribe to.
         @type id: C{str}
+        @param dataChangedTimestamp: Timestamp (epoch seconds) for the data change
+            which triggered this notification (Only used for unit tests)
+        @type key: C{int}
         """
 
         try:
@@ -204,6 +207,10 @@
             self.log_error("Notification id '%s' is missing protocol" % (id,))
             return
 
+        # Unit tests can pass this value in; otherwise it defaults to now
+        if dataChangedTimestamp is None:
+            dataChangedTimestamp = int(time.time())
+
         provider = self.providers.get(protocol, None)
         if provider is not None:
             key = "/%s/%s/%s/" % (protocol, self.dataHost, id)
@@ -222,7 +229,7 @@
                     if token and uid:
                         tokens.append(token)
                 if tokens:
-                    provider.scheduleNotifications(tokens, key)
+                    provider.scheduleNotifications(tokens, key, dataChangedTimestamp)
 
 
 
@@ -332,19 +339,21 @@
                 yield txn.commit()
 
 
-    def sendNotification(self, token, key):
+    def sendNotification(self, token, key, dataChangedTimestamp):
         """
         Sends a push notification message for the key to the device associated
         with the token.
 
         @param token: The device token subscribed to the key
         @type token: C{str}
-
         @param key: The key we're sending a notification about
         @type key: C{str}
+        @param dataChangedTimestamp: Timestamp (epoch seconds) for the data change
+            which triggered this notification
+        @type key: C{int}
         """
 
-        if not (token and key):
+        if not (token and key and dataChangedTimestamp):
             return
 
         try:
@@ -354,7 +363,13 @@
             return
 
         identifier = self.history.add(token)
-        payload = '{"key" : "%s"}' % (key,)
+        payload = json.dumps(
+            {
+                "key" : key,
+                "dataChangedTimestamp" : dataChangedTimestamp,
+                "pushRequestSubmittedTimestamp" : int(time.time()),
+            }
+        )
         payloadLength = len(payload)
         self.log_debug("Sending APNS notification to %s: id=%d payload=%s" %
             (token, identifier, payload))
@@ -488,12 +503,12 @@
             # sent will be put back into the queue.
             queued = list(self.queue)
             self.queue = []
-            for token, key in queued:
-                if token and key:
-                    self.sendNotification(token, key)
+            for (token, key), dataChangedTimestamp in queued:
+                if token and key and dataChangedTimestamp:
+                    self.sendNotification(token, key, dataChangedTimestamp)
 
 
-    def scheduleNotifications(self, tokens, key):
+    def scheduleNotifications(self, tokens, key, dataChangedTimestamp):
         """
         The starting point for getting notifications to the APNS server.  If there is
         a connection to the APNS server, these notifications are scheduled (or directly
@@ -504,59 +519,70 @@
         @type tokens: List of strings
         @param key: The key to use for this batch of notifications
         @type key: String
+        @param dataChangedTimestamp: Timestamp (epoch seconds) for the data change
+            which triggered this notification
+        @type key: C{int}
         """
         # Service has reference to factory has reference to protocol instance
         connection = getattr(self.factory, "connection", None)
         if connection is not None:
             if self.scheduler is not None:
-                self.scheduler.schedule(tokens, key)
+                self.scheduler.schedule(tokens, key, dataChangedTimestamp)
             else:
                 for token in tokens:
-                    self.sendNotification(token, key)
+                    self.sendNotification(token, key, dataChangedTimestamp)
         else:
-            self._saveForWhenConnected(tokens, key)
+            self._saveForWhenConnected(tokens, key, dataChangedTimestamp)
 
 
-    def _saveForWhenConnected(self, tokens, key):
+    def _saveForWhenConnected(self, tokens, key, dataChangedTimestamp):
         """
         Called in order to save notifications that can't be sent now because there
         is no connection to the APNS server.  (token, key) tuples are appended to
         the queue which is serviced during clientConnectionMade()
 
         @param tokens: The device tokens to schedule notifications for
-        @type tokens: List of strings
+        @type tokens: List of C{str}
         @param key: The key to use for this batch of notifications
-        @type key: String
+        @type key: C{str}
+        @param dataChangedTimestamp: Timestamp (epoch seconds) for the data change
+            which triggered this notification
+        @type key: C{int}
         """
         for token in tokens:
             tokenKeyPair = (token, key)
-            if tokenKeyPair not in self.queue:
+            for existingPair, ignored in self.queue:
+                if tokenKeyPair == existingPair:
+                    self.log_debug("APNProviderService has no connection; skipping duplicate: %s %s" % (token, key))
+                    break # Already scheduled
+            else:
                 self.log_debug("APNProviderService has no connection; queuing: %s %s" % (token, key))
-                self.queue.append((token, key))
-            else:
-                self.log_debug("APNProviderService has no connection; skipping duplicate: %s %s" % (token, key))
+                self.queue.append(((token, key), dataChangedTimestamp))
 
 
 
-    def sendNotification(self, token, key):
+    def sendNotification(self, token, key, dataChangedTimestamp):
         """
         If there is a connection the notification is sent right away, otherwise
         the notification is saved for later.
 
         @param token: The device token to send a notifications to
-        @type token: Strings
+        @type token: C{str}
         @param key: The key to use for this notification
-        @type key: String
+        @type key: C{str}
+        @param dataChangedTimestamp: Timestamp (epoch seconds) for the data change
+            which triggered this notification
+        @type key: C{int}
         """
-        if not (token and key):
+        if not (token and key and dataChangedTimestamp):
             return
 
         # Service has reference to factory has reference to protocol instance
         connection = getattr(self.factory, "connection", None)
         if connection is None:
-            self._saveForWhenConnected([token], key)
+            self._saveForWhenConnected([token], key, dataChangedTimestamp)
         else:
-            connection.sendNotification(token, key)
+            connection.sendNotification(token, key, dataChangedTimestamp)
 
 
 

Modified: CalendarServer/trunk/calendarserver/push/test/test_amppush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/test/test_amppush.py	2012-12-06 21:28:23 UTC (rev 10131)
+++ CalendarServer/trunk/calendarserver/push/test/test_amppush.py	2012-12-06 21:56:57 UTC (rev 10132)
@@ -67,27 +67,28 @@
         self.assertTrue(client3.subscribedToID("/CalDAV/localhost/user02/"))
         self.assertTrue(client3.subscribedToID("/CalDAV/localhost/user03/"))
 
-        service.enqueue("update", "CalDAV|user01")
+        dataChangedTimestamp = 1354815999
+        service.enqueue("update", "CalDAV|user01", dataChangedTimestamp=dataChangedTimestamp)
         self.assertEquals(len(client1.history), 0)
         self.assertEquals(len(client2.history), 0)
         self.assertEquals(len(client3.history), 0)
         clock.advance(1)
-        self.assertEquals(client1.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/'})])
+        self.assertEquals(client1.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/', 'dataChangedTimestamp': 1354815999})])
         self.assertEquals(len(client2.history), 0)
         self.assertEquals(len(client3.history), 0)
         clock.advance(3)
-        self.assertEquals(client2.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/'})])
+        self.assertEquals(client2.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/', 'dataChangedTimestamp': 1354815999})])
         self.assertEquals(len(client3.history), 0)
         clock.advance(3)
-        self.assertEquals(client3.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/'})])
+        self.assertEquals(client3.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/', 'dataChangedTimestamp': 1354815999})])
 
         client1.reset()
         client2.reset()
         client2.unsubscribe("token2", "/CalDAV/localhost/user01/")
-        service.enqueue("update", "CalDAV|user01")
+        service.enqueue("update", "CalDAV|user01", dataChangedTimestamp=dataChangedTimestamp)
         self.assertEquals(len(client1.history), 0)
         clock.advance(1)
-        self.assertEquals(client1.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/'})])
+        self.assertEquals(client1.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/', 'dataChangedTimestamp' : 1354815999})])
         self.assertEquals(len(client2.history), 0)
         clock.advance(3)
         self.assertEquals(len(client2.history), 0)
@@ -97,9 +98,9 @@
         client1.reset()
         client2.reset()
         client2.subscribe("token2", "/CalDAV/localhost/user01/")
-        service.enqueue("update", "CalDAV|user01")
-        self.assertEquals(client1.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/'})])
-        self.assertEquals(client2.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/'})])
+        service.enqueue("update", "CalDAV|user01", dataChangedTimestamp=dataChangedTimestamp)
+        self.assertEquals(client1.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/', 'dataChangedTimestamp' : 1354815999})])
+        self.assertEquals(client2.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/', 'dataChangedTimestamp' : 1354815999})])
 
 
 class TestProtocol(AMPPushNotifierProtocol):

Modified: CalendarServer/trunk/calendarserver/push/test/test_applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/test/test_applepush.py	2012-12-06 21:28:23 UTC (rev 10131)
+++ CalendarServer/trunk/calendarserver/push/test/test_applepush.py	2012-12-06 21:56:57 UTC (rev 10132)
@@ -14,6 +14,7 @@
 # limitations under the License.
 ##
 
+import json
 import struct
 import time
 from calendarserver.push.applepush import (
@@ -125,11 +126,13 @@
         # case by doing it prior to startService()
 
         # Notification arrives from calendar server
-        yield service.enqueue("update", "CalDAV|user01/calendar")
+        dataChangedTimestamp = 1354815999
+        yield service.enqueue("update", "CalDAV|user01/calendar",
+            dataChangedTimestamp=dataChangedTimestamp)
 
         # The notifications should be in the queue
-        self.assertTrue((token, key1) in service.providers["CalDAV"].queue)
-        self.assertTrue((token2, key1) in service.providers["CalDAV"].queue)
+        self.assertTrue(((token, key1), dataChangedTimestamp) in service.providers["CalDAV"].queue)
+        self.assertTrue(((token2, key1), dataChangedTimestamp) in service.providers["CalDAV"].queue)
 
         # Start the service, making the connection which should service the
         # queue
@@ -141,14 +144,17 @@
         # Verify data sent to APN
         providerConnector = service.providers["CalDAV"].testConnector
         rawData = providerConnector.transport.data
-        self.assertEquals(len(rawData), 103)
+        self.assertEquals(len(rawData), 183)
         data = struct.unpack("!BIIH32sH", rawData[:45])
         self.assertEquals(data[0], 1) # command
         self.assertEquals(data[4].encode("hex"), token.replace(" ", "")) # token
         payloadLength = data[5]
         payload = struct.unpack("%ds" % (payloadLength,),
             rawData[45:])
-        self.assertEquals(payload[0], '{"key" : "%s"}' % (key1,))
+        payload = json.loads(payload[0])
+        self.assertEquals(payload["key"], u"/CalDAV/calendars.example.com/user01/calendar/")
+        self.assertEquals(payload["dataChangedTimestamp"], dataChangedTimestamp)
+        self.assertTrue(payload.has_key("pushRequestSubmittedTimestamp"))
         # Verify token history is updated
         self.assertTrue(token in [t for (i, t) in providerConnector.service.protocol.history.history])
         self.assertTrue(token2 in [t for (i, t) in providerConnector.service.protocol.history.history])
@@ -163,11 +169,11 @@
         # Send notification while service is connected
         yield service.enqueue("update", "CalDAV|user01/calendar")
         clock.advance(1) # so that first push is sent
-        self.assertEquals(len(providerConnector.transport.data), 103)
+        self.assertEquals(len(providerConnector.transport.data), 183)
         # Reset sent data
         providerConnector.transport.data = None
         clock.advance(3) # so that second push is sent
-        self.assertEquals(len(providerConnector.transport.data), 103)
+        self.assertEquals(len(providerConnector.transport.data), 183)
 
 
         def errorTestFunction(status, identifier):

Modified: CalendarServer/trunk/calendarserver/push/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/util.py	2012-12-06 21:28:23 UTC (rev 10131)
+++ CalendarServer/trunk/calendarserver/push/util.py	2012-12-06 21:56:57 UTC (rev 10132)
@@ -121,7 +121,7 @@
         self.callback = callback
         self.staggerSeconds = staggerSeconds
 
-    def schedule(self, tokens, key):
+    def schedule(self, tokens, key, dataChangedTimestamp):
         """
         Schedules a batch of notifications for the given tokens, staggered
         with self.staggerSeconds between each one.  Duplicates are ignored,
@@ -129,9 +129,12 @@
         one will not be scheduled for that pair.
 
         @param tokens: The device tokens to schedule notifications for
-        @type tokens: List of strings
+        @type tokens: List of C{str}
         @param key: The key to use for this batch of notifications
-        @type key: String
+        @type key: C{str}
+        @param dataChangedTimestamp: Timestamp (epoch seconds) for the data change
+            which triggered this notification
+        @type key: C{int}
         """
         scheduleTime = 0.0
         for token in tokens:
@@ -141,20 +144,28 @@
                     (internalKey,))
             else:
                 self.outstanding[internalKey] = self.reactor.callLater(
-                    scheduleTime, self.send, token, key)
+                    scheduleTime, self.send, token, key, dataChangedTimestamp)
                 self.log_debug("PushScheduler scheduled: %s in %.0f sec" %
                     (internalKey, scheduleTime))
                 scheduleTime += self.staggerSeconds
 
-    def send(self, token, key):
+    def send(self, token, key, dataChangedTimestamp):
         """
         This method is what actually gets scheduled.  Its job is to remove
         its corresponding entry from the outstanding dict and call the
         callback.
+
+        @param token: The device token to send a notification to
+        @type token: C{str}
+        @param key: The notification key
+        @type key: C{str}
+        @param dataChangedTimestamp: Timestamp (epoch seconds) for the data change
+            which triggered this notification
+        @type key: C{int}
         """
-        self.log_debug("PushScheduler fired for %s %s" % (token, key))
+        self.log_debug("PushScheduler fired for %s %s %d" % (token, key, dataChangedTimestamp))
         del self.outstanding[(token, key)]
-        return self.callback(token, key)
+        return self.callback(token, key, dataChangedTimestamp)
 
     def stop(self):
         """

Modified: CalendarServer/trunk/calendarserver/tools/ampnotifications.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/ampnotifications.py	2012-12-06 21:28:23 UTC (rev 10131)
+++ CalendarServer/trunk/calendarserver/tools/ampnotifications.py	2012-12-06 21:56:57 UTC (rev 10132)
@@ -127,7 +127,7 @@
         MonitorAMPNotifications,
     )
 
-def notificationCallback(id):
+def notificationCallback(id, dataChangedTimestamp):
     print "Received notification for:", id
     return succeed(True)
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20121206/3ca483d7/attachment-0001.html>


More information about the calendarserver-changes mailing list