[CalendarServer-changes] [9043] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Thu Apr 12 14:51:03 PDT 2012


Revision: 9043
          http://trac.macosforge.org/projects/calendarserver/changeset/9043
Author:   sagen at apple.com
Date:     2012-04-12 14:51:02 -0700 (Thu, 12 Apr 2012)
Log Message:
-----------
Implements staggered sending of APNS notifications.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/push/applepush.py
    CalendarServer/trunk/calendarserver/push/test/test_applepush.py
    CalendarServer/trunk/calendarserver/push/util.py
    CalendarServer/trunk/twistedcaldav/stdconfig.py

Modified: CalendarServer/trunk/calendarserver/push/applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/applepush.py	2012-04-12 21:19:44 UTC (rev 9042)
+++ CalendarServer/trunk/calendarserver/push/applepush.py	2012-04-12 21:51:02 UTC (rev 9043)
@@ -34,7 +34,7 @@
 import struct
 import time
 from txdav.common.icommondatastore import InvalidSubscriptionValues
-from calendarserver.push.util import validToken, TokenHistory
+from calendarserver.push.util import validToken, TokenHistory, PushScheduler
 
 
 
@@ -103,6 +103,8 @@
                     settings[protocol]["PrivateKeyPath"],
                     chainPath=settings[protocol]["AuthorityChainPath"],
                     passphrase=settings[protocol]["Passphrase"],
+                    staggerNotifications=settings["EnableStaggering"],
+                    staggerSeconds=settings["StaggerSeconds"],
                     testConnector=providerTestConnector,
                     reactor=reactor,
                 )
@@ -171,9 +173,12 @@
             if numSubscriptions > 0:
                 self.log_debug("Sending %d APNS notifications for %s" %
                     (numSubscriptions, key))
+                tokens = []
                 for token, uid in subscriptions:
                     if token and uid:
-                        provider.sendNotification(token, key)
+                        tokens.append(token)
+                if tokens:
+                    provider.scheduleNotifications(tokens, key)
 
 
 
@@ -395,8 +400,9 @@
 class APNProviderService(APNConnectionService):
 
     def __init__(self, store, host, port, certPath, keyPath, chainPath="",
-        passphrase="", sslMethod="TLSv1_METHOD", testConnector=None,
-        reactor=None):
+        passphrase="", sslMethod="TLSv1_METHOD",
+        staggerNotifications=False, staggerSeconds=3,
+        testConnector=None, reactor=None):
 
         APNConnectionService.__init__(self, host, port, certPath, keyPath,
             chainPath=chainPath, passphrase=passphrase, sslMethod=sslMethod,
@@ -405,6 +411,11 @@
         self.store = store
         self.factory = None
         self.queue = []
+        if staggerNotifications:
+            self.scheduler = PushScheduler(self.reactor, self.sendNotification,
+                staggerSeconds=staggerSeconds)
+        else:
+            self.scheduler = None
 
     def startService(self):
         self.log_info("APNProviderService startService")
@@ -415,6 +426,8 @@
         self.log_info("APNProviderService stopService")
         if self.factory is not None:
             self.factory.stopTrying()
+        if self.scheduler is not None:
+            self.scheduler.stop()
 
     def clientConnectionMade(self):
         # Service the queue
@@ -427,21 +440,74 @@
                 if token and key:
                     self.sendNotification(token, key)
 
+
+    def scheduleNotifications(self, tokens, key):
+        """
+        The starting point for getting notifications to the APNS server.  If there is
+        a connection to the APNS server, these notifications are scheduled (or directly
+        sent if there is no scheduler).  If there is no connection, the notifications
+        are saved for later.
+
+        @param tokens: The device tokens to schedule notifications for
+        @type tokens: List of strings
+        @param key: The key to use for this batch of notifications
+        @type key: String
+        """
+        # Service has reference to factory has reference to protocol instance
+        connection = getattr(self.factory, "connection", None)
+        if connection is not None:
+            if self.scheduler is not None:
+                self.scheduler.schedule(tokens, key)
+            else:
+                for token in tokens:
+                    self.sendNotification(token, key)
+        else:
+            self._saveForWhenConnected(tokens, key)
+
+
+    def _saveForWhenConnected(self, tokens, key):
+        """
+        Called in order to save notifications that can't be sent now because there
+        is no connection to the APNS server.  (token, key) tuples are appended to
+        the queue which is serviced during clientConnectionMade()
+
+        @param tokens: The device tokens to schedule notifications for
+        @type tokens: List of strings
+        @param key: The key to use for this batch of notifications
+        @type key: String
+        """
+        for token in tokens:
+            tokenKeyPair = (token, key)
+            if tokenKeyPair not in self.queue:
+                self.log_debug("APNProviderService has no connection; queuing: %s %s" % (token, key))
+                self.queue.append((token, key))
+            else:
+                self.log_debug("APNProviderService has no connection; skipping duplicate: %s %s" % (token, key))
+
+
+
     def sendNotification(self, token, key):
+        """
+        If there is a connection the notification is sent right away, otherwise
+        the notification is saved for later.
+
+        @param token: The device token to send a notification to
+        @type token: String
+        @param key: The key to use for this notification
+        @type key: String
+        """
         if not (token and key):
             return
 
         # Service has reference to factory has reference to protocol instance
         connection = getattr(self.factory, "connection", None)
         if connection is None:
-            self.log_debug("APNProviderService has no connection; queuing: %s %s" % (token, key))
-            tokenKeyPair = (token, key)
-            if tokenKeyPair not in self.queue:
-                self.queue.append(tokenKeyPair)
+            self._saveForWhenConnected([token], key)
         else:
             connection.sendNotification(token, key)
 
 
+
 class APNFeedbackProtocol(protocol.Protocol, LoggingMixIn):
     """
     Implements the Feedback portion of APNS

Modified: CalendarServer/trunk/calendarserver/push/test/test_applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/test/test_applepush.py	2012-04-12 21:19:44 UTC (rev 9042)
+++ CalendarServer/trunk/calendarserver/push/test/test_applepush.py	2012-04-12 21:51:02 UTC (rev 9043)
@@ -45,6 +45,8 @@
             "FeedbackHost" : "feedback.push.apple.com",
             "FeedbackPort" : 2196,
             "FeedbackUpdateSeconds" : 300,
+            "EnableStaggering" : True,
+            "StaggerSeconds" : 3,
             "CalDAV" : {
                 "CertificatePath" : "caldav.cer",
                 "PrivateKeyPath" : "caldav.pem",
@@ -75,11 +77,13 @@
         except InvalidSubscriptionValues:
             pass
 
-        token = "2d0d55cd7f98bcb81c6e24abcdc35168254c7846a43e2828b1ba5a8f82e219df"
+        token  = "2d0d55cd7f98bcb81c6e24abcdc35168254c7846a43e2828b1ba5a8f82e219df"
+        token2 = "3d0d55cd7f98bcb81c6e24abcdc35168254c7846a43e2828b1ba5a8f82e219df"
         key1 = "/CalDAV/calendars.example.com/user01/calendar/"
         timestamp1 = 1000
         uid = "D2256BCC-48E2-42D1-BD89-CBA1E4CCDFFB"
         yield txn.addAPNSubscription(token, key1, timestamp1, uid)
+        yield txn.addAPNSubscription(token2, key1, timestamp1, uid)
 
         key2 = "/CalDAV/calendars.example.com/user02/calendar/"
         timestamp2 = 3000
@@ -88,6 +92,7 @@
         subscriptions = (yield txn.apnSubscriptionsBySubscriber(uid))
         self.assertTrue([token, key1, timestamp1] in subscriptions)
         self.assertTrue([token, key2, timestamp2] in subscriptions)
+        self.assertTrue([token2, key1, timestamp1] in subscriptions)
 
         # Verify an update to a subscription with a different uid takes on
         # the new uid
@@ -117,8 +122,9 @@
         # Notification arrives from calendar server
         yield service.enqueue("update", "CalDAV|user01/calendar")
 
-        # The notification should be in the queue
-        self.assertEquals(service.providers["CalDAV"].queue, [(token, key1)])
+        # The notifications should be in the queue
+        self.assertTrue((token, key1) in service.providers["CalDAV"].queue)
+        self.assertTrue((token2, key1) in service.providers["CalDAV"].queue)
 
         # Start the service, making the connection which should service the
         # queue
@@ -139,10 +145,26 @@
             rawData[45:])
         self.assertEquals(payload[0], '{"key" : "%s"}' % (key1,))
         # Verify token history is updated
-        self.assertEquals(providerConnector.service.protocol.history.history,
-            [(1, token)]
-        )
+        self.assertTrue(token in [t for (i, t) in providerConnector.service.protocol.history.history])
+        self.assertTrue(token2 in [t for (i, t) in providerConnector.service.protocol.history.history])
 
+
+        #
+        # Verify staggering behavior
+        #
+
+        # Reset sent data
+        providerConnector.transport.data = None
+        # Send notification while service is connected
+        yield service.enqueue("update", "CalDAV|user01/calendar")
+        clock.advance(1) # so that first push is sent
+        self.assertEquals(len(providerConnector.transport.data), 103)
+        # Reset sent data
+        providerConnector.transport.data = None
+        clock.advance(3) # so that second push is sent
+        self.assertEquals(len(providerConnector.transport.data), 103)
+
+
         def errorTestFunction(status, identifier):
             history.append((status, identifier))
             return succeed(None)
@@ -223,12 +245,18 @@
         )
 
         # Verify processError removes associated subscriptions and history
-        yield providerConnector.service.protocol.processError(8, 1)
+        # First find the id corresponding to token2
+        for (id, t) in providerConnector.service.protocol.history.history:
+            if t == token2:
+                break
+
+        yield providerConnector.service.protocol.processError(8, id)
         # The token for this identifier is gone
-        self.assertEquals(providerConnector.service.protocol.history.history, [])
+        self.assertTrue((id, token2) not in providerConnector.service.protocol.history.history)
+
         # All subscriptions for this token should now be gone
         txn = self.store.newTransaction()
-        subscriptions = (yield txn.apnSubscriptionsByToken(token))
+        subscriptions = (yield txn.apnSubscriptionsByToken(token2))
         yield txn.commit()
         self.assertEquals(subscriptions, [])
 

Modified: CalendarServer/trunk/calendarserver/push/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/util.py	2012-04-12 21:19:44 UTC (rev 9042)
+++ CalendarServer/trunk/calendarserver/push/util.py	2012-04-12 21:51:02 UTC (rev 9043)
@@ -15,6 +15,7 @@
 ##
 
 from OpenSSL import crypto
+from twext.python.log import LoggingMixIn
 
 def getAPNTopicFromCertificate(certPath):
     """
@@ -99,3 +100,66 @@
                 del self.history[index]
                 return token
         return None
+
+
+
+class PushScheduler(LoggingMixIn):
+    """
+    Allows staggered scheduling of push notifications
+    """
+
+    def __init__(self, reactor, callback, staggerSeconds=1):
+        """
+        @param callback: The method to call when it's time to send a push
+        @type callback: callable
+        @param staggerSeconds: The number of seconds to stagger between each
+            push
+        @type staggerSeconds: integer
+        """
+        self.outstanding = {}
+        self.reactor = reactor
+        self.callback = callback
+        self.staggerSeconds = staggerSeconds
+
+    def schedule(self, tokens, key):
+        """
+        Schedules a batch of notifications for the given tokens, staggered
+        with self.staggerSeconds between each one.  Duplicates are ignored,
+        so if a token/key pair is already scheduled and not yet sent, a new
+        one will not be scheduled for that pair.
+
+        @param tokens: The device tokens to schedule notifications for
+        @type tokens: List of strings
+        @param key: The key to use for this batch of notifications
+        @type key: String
+        """
+        scheduleTime = 0.0
+        for token in tokens:
+            internalKey = (token, key)
+            if self.outstanding.has_key(internalKey):
+                self.log_debug("PushScheduler already has this scheduled: %s" %
+                    (internalKey,))
+            else:
+                self.outstanding[internalKey] = self.reactor.callLater(
+                    scheduleTime, self.send, token, key)
+                self.log_debug("PushScheduler scheduled: %s in %.0f sec" %
+                    (internalKey, scheduleTime))
+                scheduleTime += self.staggerSeconds
+
+    def send(self, token, key):
+        """
+        This method is what actually gets scheduled.  Its job is to remove
+        its corresponding entry from the outstanding dict and call the
+        callback.
+        """
+        self.log_debug("PushScheduler fired for %s %s" % (token, key))
+        del self.outstanding[(token, key)]
+        self.callback(token, key)
+
+    def stop(self):
+        """
+        Cancel all outstanding delayed calls
+        """
+        for (token, key), delayed in self.outstanding.iteritems():
+            self.log_debug("PushScheduler cancelling %s %s" % (token, key))
+            delayed.cancel()

Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py	2012-04-12 21:19:44 UTC (rev 9042)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py	2012-04-12 21:51:02 UTC (rev 9043)
@@ -702,6 +702,8 @@
                 "FeedbackPort" : 2196,
                 "FeedbackUpdateSeconds" : 28800, # 8 hours
                 "Environment" : "PRODUCTION",
+                "EnableStaggering" : False,
+                "StaggerSeconds" : 3,
                 "CalDAV" : {
                     "CertificatePath" : "",
                     "PrivateKeyPath" : "",
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120412/a1467b88/attachment-0001.html>


More information about the calendarserver-changes mailing list