[CalendarServer-changes] [9043] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Thu Apr 12 14:51:03 PDT 2012
Revision: 9043
http://trac.macosforge.org/projects/calendarserver/changeset/9043
Author: sagen at apple.com
Date: 2012-04-12 14:51:02 -0700 (Thu, 12 Apr 2012)
Log Message:
-----------
Implements staggered sending of APNS notifications.
Modified Paths:
--------------
CalendarServer/trunk/calendarserver/push/applepush.py
CalendarServer/trunk/calendarserver/push/test/test_applepush.py
CalendarServer/trunk/calendarserver/push/util.py
CalendarServer/trunk/twistedcaldav/stdconfig.py
Modified: CalendarServer/trunk/calendarserver/push/applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/applepush.py 2012-04-12 21:19:44 UTC (rev 9042)
+++ CalendarServer/trunk/calendarserver/push/applepush.py 2012-04-12 21:51:02 UTC (rev 9043)
@@ -34,7 +34,7 @@
import struct
import time
from txdav.common.icommondatastore import InvalidSubscriptionValues
-from calendarserver.push.util import validToken, TokenHistory
+from calendarserver.push.util import validToken, TokenHistory, PushScheduler
@@ -103,6 +103,8 @@
settings[protocol]["PrivateKeyPath"],
chainPath=settings[protocol]["AuthorityChainPath"],
passphrase=settings[protocol]["Passphrase"],
+ staggerNotifications=settings["EnableStaggering"],
+ staggerSeconds=settings["StaggerSeconds"],
testConnector=providerTestConnector,
reactor=reactor,
)
@@ -171,9 +173,12 @@
if numSubscriptions > 0:
self.log_debug("Sending %d APNS notifications for %s" %
(numSubscriptions, key))
+ tokens = []
for token, uid in subscriptions:
if token and uid:
- provider.sendNotification(token, key)
+ tokens.append(token)
+ if tokens:
+ provider.scheduleNotifications(tokens, key)
@@ -395,8 +400,9 @@
class APNProviderService(APNConnectionService):
def __init__(self, store, host, port, certPath, keyPath, chainPath="",
- passphrase="", sslMethod="TLSv1_METHOD", testConnector=None,
- reactor=None):
+ passphrase="", sslMethod="TLSv1_METHOD",
+ staggerNotifications=False, staggerSeconds=3,
+ testConnector=None, reactor=None):
APNConnectionService.__init__(self, host, port, certPath, keyPath,
chainPath=chainPath, passphrase=passphrase, sslMethod=sslMethod,
@@ -405,6 +411,11 @@
self.store = store
self.factory = None
self.queue = []
+ if staggerNotifications:
+ self.scheduler = PushScheduler(self.reactor, self.sendNotification,
+ staggerSeconds=staggerSeconds)
+ else:
+ self.scheduler = None
def startService(self):
self.log_info("APNProviderService startService")
@@ -415,6 +426,8 @@
self.log_info("APNProviderService stopService")
if self.factory is not None:
self.factory.stopTrying()
+ if self.scheduler is not None:
+ self.scheduler.stop()
def clientConnectionMade(self):
# Service the queue
@@ -427,21 +440,74 @@
if token and key:
self.sendNotification(token, key)
+
+ def scheduleNotifications(self, tokens, key):
+ """
+ The starting point for getting notifications to the APNS server. If there is
+ a connection to the APNS server, these notifications are scheduled (or directly
+ sent if there is no scheduler). If there is no connection, the notifications
+ are saved for later.
+
+ @param tokens: The device tokens to schedule notifications for
+ @type tokens: List of strings
+ @param key: The key to use for this batch of notifications
+ @type key: String
+ """
+ # Service has reference to factory has reference to protocol instance
+ connection = getattr(self.factory, "connection", None)
+ if connection is not None:
+ if self.scheduler is not None:
+ self.scheduler.schedule(tokens, key)
+ else:
+ for token in tokens:
+ self.sendNotification(token, key)
+ else:
+ self._saveForWhenConnected(tokens, key)
+
+
+ def _saveForWhenConnected(self, tokens, key):
+ """
+ Called in order to save notifications that can't be sent now because there
+ is no connection to the APNS server. (token, key) tuples are appended to
+ the queue which is serviced during clientConnectionMade()
+
+ @param tokens: The device tokens to schedule notifications for
+ @type tokens: List of strings
+ @param key: The key to use for this batch of notifications
+ @type key: String
+ """
+ for token in tokens:
+ tokenKeyPair = (token, key)
+ if tokenKeyPair not in self.queue:
+ self.log_debug("APNProviderService has no connection; queuing: %s %s" % (token, key))
+ self.queue.append((token, key))
+ else:
+ self.log_debug("APNProviderService has no connection; skipping duplicate: %s %s" % (token, key))
+
+
+
def sendNotification(self, token, key):
+ """
+ If there is a connection the notification is sent right away, otherwise
+ the notification is saved for later.
+
+ @param token: The device token to send a notification to
+ @type token: String
+ @param key: The key to use for this notification
+ @type key: String
+ """
if not (token and key):
return
# Service has reference to factory has reference to protocol instance
connection = getattr(self.factory, "connection", None)
if connection is None:
- self.log_debug("APNProviderService has no connection; queuing: %s %s" % (token, key))
- tokenKeyPair = (token, key)
- if tokenKeyPair not in self.queue:
- self.queue.append(tokenKeyPair)
+ self._saveForWhenConnected([token], key)
else:
connection.sendNotification(token, key)
+
class APNFeedbackProtocol(protocol.Protocol, LoggingMixIn):
"""
Implements the Feedback portion of APNS
Modified: CalendarServer/trunk/calendarserver/push/test/test_applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/test/test_applepush.py 2012-04-12 21:19:44 UTC (rev 9042)
+++ CalendarServer/trunk/calendarserver/push/test/test_applepush.py 2012-04-12 21:51:02 UTC (rev 9043)
@@ -45,6 +45,8 @@
"FeedbackHost" : "feedback.push.apple.com",
"FeedbackPort" : 2196,
"FeedbackUpdateSeconds" : 300,
+ "EnableStaggering" : True,
+ "StaggerSeconds" : 3,
"CalDAV" : {
"CertificatePath" : "caldav.cer",
"PrivateKeyPath" : "caldav.pem",
@@ -75,11 +77,13 @@
except InvalidSubscriptionValues:
pass
- token = "2d0d55cd7f98bcb81c6e24abcdc35168254c7846a43e2828b1ba5a8f82e219df"
+ token = "2d0d55cd7f98bcb81c6e24abcdc35168254c7846a43e2828b1ba5a8f82e219df"
+ token2 = "3d0d55cd7f98bcb81c6e24abcdc35168254c7846a43e2828b1ba5a8f82e219df"
key1 = "/CalDAV/calendars.example.com/user01/calendar/"
timestamp1 = 1000
uid = "D2256BCC-48E2-42D1-BD89-CBA1E4CCDFFB"
yield txn.addAPNSubscription(token, key1, timestamp1, uid)
+ yield txn.addAPNSubscription(token2, key1, timestamp1, uid)
key2 = "/CalDAV/calendars.example.com/user02/calendar/"
timestamp2 = 3000
@@ -88,6 +92,7 @@
subscriptions = (yield txn.apnSubscriptionsBySubscriber(uid))
self.assertTrue([token, key1, timestamp1] in subscriptions)
self.assertTrue([token, key2, timestamp2] in subscriptions)
+ self.assertTrue([token2, key1, timestamp1] in subscriptions)
# Verify an update to a subscription with a different uid takes on
# the new uid
@@ -117,8 +122,9 @@
# Notification arrives from calendar server
yield service.enqueue("update", "CalDAV|user01/calendar")
- # The notification should be in the queue
- self.assertEquals(service.providers["CalDAV"].queue, [(token, key1)])
+ # The notifications should be in the queue
+ self.assertTrue((token, key1) in service.providers["CalDAV"].queue)
+ self.assertTrue((token2, key1) in service.providers["CalDAV"].queue)
# Start the service, making the connection which should service the
# queue
@@ -139,10 +145,26 @@
rawData[45:])
self.assertEquals(payload[0], '{"key" : "%s"}' % (key1,))
# Verify token history is updated
- self.assertEquals(providerConnector.service.protocol.history.history,
- [(1, token)]
- )
+ self.assertTrue(token in [t for (i, t) in providerConnector.service.protocol.history.history])
+ self.assertTrue(token2 in [t for (i, t) in providerConnector.service.protocol.history.history])
+
+ #
+ # Verify staggering behavior
+ #
+
+ # Reset sent data
+ providerConnector.transport.data = None
+ # Send notification while service is connected
+ yield service.enqueue("update", "CalDAV|user01/calendar")
+ clock.advance(1) # so that first push is sent
+ self.assertEquals(len(providerConnector.transport.data), 103)
+ # Reset sent data
+ providerConnector.transport.data = None
+ clock.advance(3) # so that second push is sent
+ self.assertEquals(len(providerConnector.transport.data), 103)
+
+
def errorTestFunction(status, identifier):
history.append((status, identifier))
return succeed(None)
@@ -223,12 +245,18 @@
)
# Verify processError removes associated subscriptions and history
- yield providerConnector.service.protocol.processError(8, 1)
+ # First find the id corresponding to token2
+ for (id, t) in providerConnector.service.protocol.history.history:
+ if t == token2:
+ break
+
+ yield providerConnector.service.protocol.processError(8, id)
# The token for this identifier is gone
- self.assertEquals(providerConnector.service.protocol.history.history, [])
+ self.assertTrue((id, token2) not in providerConnector.service.protocol.history.history)
+
# All subscriptions for this token should now be gone
txn = self.store.newTransaction()
- subscriptions = (yield txn.apnSubscriptionsByToken(token))
+ subscriptions = (yield txn.apnSubscriptionsByToken(token2))
yield txn.commit()
self.assertEquals(subscriptions, [])
Modified: CalendarServer/trunk/calendarserver/push/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/util.py 2012-04-12 21:19:44 UTC (rev 9042)
+++ CalendarServer/trunk/calendarserver/push/util.py 2012-04-12 21:51:02 UTC (rev 9043)
@@ -15,6 +15,7 @@
##
from OpenSSL import crypto
+from twext.python.log import LoggingMixIn
def getAPNTopicFromCertificate(certPath):
"""
@@ -99,3 +100,66 @@
del self.history[index]
return token
return None
+
+
+
+class PushScheduler(LoggingMixIn):
+ """
+ Allows staggered scheduling of push notifications
+ """
+
+ def __init__(self, reactor, callback, staggerSeconds=1):
+ """
+ @param callback: The method to call when it's time to send a push
+ @type callback: callable
+ @param staggerSeconds: The number of seconds to stagger between each
+ push
+ @type staggerSeconds: integer
+ """
+ self.outstanding = {}
+ self.reactor = reactor
+ self.callback = callback
+ self.staggerSeconds = staggerSeconds
+
+ def schedule(self, tokens, key):
+ """
+ Schedules a batch of notifications for the given tokens, staggered
+ with self.staggerSeconds between each one. Duplicates are ignored,
+ so if a token/key pair is already scheduled and not yet sent, a new
+ one will not be scheduled for that pair.
+
+ @param tokens: The device tokens to schedule notifications for
+ @type tokens: List of strings
+ @param key: The key to use for this batch of notifications
+ @type key: String
+ """
+ scheduleTime = 0.0
+ for token in tokens:
+ internalKey = (token, key)
+ if self.outstanding.has_key(internalKey):
+ self.log_debug("PushScheduler already has this scheduled: %s" %
+ (internalKey,))
+ else:
+ self.outstanding[internalKey] = self.reactor.callLater(
+ scheduleTime, self.send, token, key)
+ self.log_debug("PushScheduler scheduled: %s in %.0f sec" %
+ (internalKey, scheduleTime))
+ scheduleTime += self.staggerSeconds
+
+ def send(self, token, key):
+ """
+ This method is what actually gets scheduled. Its job is to remove
+ its corresponding entry from the outstanding dict and call the
+ callback.
+ """
+ self.log_debug("PushScheduler fired for %s %s" % (token, key))
+ del self.outstanding[(token, key)]
+ self.callback(token, key)
+
+ def stop(self):
+ """
+ Cancel all outstanding delayed calls
+ """
+ for (token, key), delayed in self.outstanding.iteritems():
+ self.log_debug("PushScheduler cancelling %s %s" % (token, key))
+ delayed.cancel()
Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py 2012-04-12 21:19:44 UTC (rev 9042)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py 2012-04-12 21:51:02 UTC (rev 9043)
@@ -702,6 +702,8 @@
"FeedbackPort" : 2196,
"FeedbackUpdateSeconds" : 28800, # 8 hours
"Environment" : "PRODUCTION",
+ "EnableStaggering" : False,
+ "StaggerSeconds" : 3,
"CalDAV" : {
"CertificatePath" : "",
"PrivateKeyPath" : "",
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120412/a1467b88/attachment-0001.html>
More information about the calendarserver-changes
mailing list