[CalendarServer-changes] [8558] CalendarServer/trunk/calendarserver/push
source_changes at macosforge.org
source_changes at macosforge.org
Tue Jan 17 16:06:30 PST 2012
Revision: 8558
http://trac.macosforge.org/projects/calendarserver/changeset/8558
Author: sagen at apple.com
Date: 2012-01-17 16:06:29 -0800 (Tue, 17 Jan 2012)
Log Message:
-----------
Maintain a history of recently sent APN tokens, so that if an error comes back we can match it up with the right token and remove all associated subscriptions.
Modified Paths:
--------------
CalendarServer/trunk/calendarserver/push/applepush.py
CalendarServer/trunk/calendarserver/push/test/test_applepush.py
CalendarServer/trunk/calendarserver/push/util.py
Modified: CalendarServer/trunk/calendarserver/push/applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/applepush.py 2012-01-17 19:51:07 UTC (rev 8557)
+++ CalendarServer/trunk/calendarserver/push/applepush.py 2012-01-18 00:06:29 UTC (rev 8558)
@@ -34,7 +34,7 @@
import struct
import time
from txdav.common.icommondatastore import InvalidSubscriptionValues
-from calendarserver.push.util import validToken
+from calendarserver.push.util import validToken, TokenHistory
@@ -96,6 +96,7 @@
feedbackTestConnector = testConnectorClass()
provider = APNProviderService(
+ service.store,
settings["ProviderHost"],
settings["ProviderPort"],
settings[protocol]["CertificatePath"],
@@ -202,11 +203,15 @@
255 : "None (unknown)",
}
+ # If error code comes back as one of these, remove the associated device
+ # token
+ TOKEN_REMOVAL_CODES = (5, 8)
+
MESSAGE_LENGTH = 6
def makeConnection(self, transport):
- self.identifier = 0
- # self.log_debug("ProviderProtocol makeConnection")
+ self.history = TokenHistory()
+ self.log_debug("ProviderProtocol makeConnection")
protocol.Protocol.makeConnection(self, transport)
def connectionMade(self):
@@ -222,6 +227,7 @@
# Clear the reference to us from the factory
self.factory.connection = None
+ @inlineCallbacks
def dataReceived(self, data, fn=None):
"""
Buffer and divide up received data into error messages which are
@@ -241,16 +247,18 @@
try:
command, status, identifier = struct.unpack("!BBI", message)
if command == self.COMMAND_ERROR:
- fn(status, identifier)
+ yield fn(status, identifier)
except Exception, e:
self.log_warn("ProviderProtocol could not process error: %s (%s)" %
(message.encode("hex"), e))
+ @inlineCallbacks
def processError(self, status, identifier):
"""
Handles an error message we've received from on feedback channel.
- Not much to do here besides logging the error.
+ If the error code is one that indicates a bad token, remove all
+ subscriptions corresponding to that token.
@param status: The status value returned from APN Feedback server
@type status: C{int}
@@ -260,8 +268,21 @@
@type status: C{int}
"""
msg = self.STATUS_CODES.get(status, "Unknown status code")
- self.log_error("Received APN error %d on identifier %d: %s" % (status, identifier, msg))
+ self.log_warn("Received APN error %d on identifier %d: %s" % (status, identifier, msg))
+ if status in self.TOKEN_REMOVAL_CODES:
+ token = self.history.extractIdentifier(identifier)
+ if token is not None:
+ self.log_warn("Removing subscriptions for bad token: %s" %
+ (token,))
+ txn = self.factory.store.newTransaction()
+ subscriptions = (yield txn.apnSubscriptionsByToken(token))
+ for key, modified, uid in subscriptions:
+ self.log_warn("Removing subscription: %s %s" %
+ (token, key))
+ yield txn.removeAPNSubscription(token, key)
+ yield txn.commit()
+
def sendNotification(self, token, key):
"""
Sends a push notification message for the key to the device associated
@@ -283,16 +304,16 @@
self.log_error("Invalid APN token in database: %s" % (token,))
return
- self.identifier += 1
+ identifier = self.history.add(token)
payload = '{"key" : "%s"}' % (key,)
payloadLength = len(payload)
self.log_debug("Sending APNS notification to %s: id=%d payload=%s" %
- (token, self.identifier, payload))
+ (token, identifier, payload))
self.transport.write(
struct.pack("!BIIH32sH%ds" % (payloadLength,),
self.COMMAND_ENHANCED, # Command
- self.identifier, # Identifier
+ identifier, # Identifier
0, # Expiry
32, # Token Length
binaryToken, # Token
@@ -306,14 +327,16 @@
protocol = APNProviderProtocol
- def __init__(self, service):
+ def __init__(self, service, store):
self.service = service
+ self.store = store
self.noisy = True
self.maxDelay = 30 # max seconds between connection attempts
def clientConnectionMade(self):
self.log_warn("Connection to APN server made")
self.service.clientConnectionMade()
+ self.delay = 1.0
def clientConnectionLost(self, connector, reason):
self.log_warn("Connection to APN server lost: %s" % (reason,))
@@ -371,7 +394,7 @@
class APNProviderService(APNConnectionService):
- def __init__(self, host, port, certPath, keyPath, chainPath="",
+ def __init__(self, store, host, port, certPath, keyPath, chainPath="",
passphrase="", sslMethod="TLSv1_METHOD", testConnector=None,
reactor=None):
@@ -379,12 +402,13 @@
chainPath=chainPath, passphrase=passphrase, sslMethod=sslMethod,
testConnector=testConnector, reactor=reactor)
+ self.store = store
self.factory = None
self.queue = []
def startService(self):
self.log_info("APNProviderService startService")
- self.factory = APNProviderFactory(self)
+ self.factory = APNProviderFactory(self, self.store)
self.connect(self.factory)
def stopService(self):
Modified: CalendarServer/trunk/calendarserver/push/test/test_applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/test/test_applepush.py 2012-01-17 19:51:07 UTC (rev 8557)
+++ CalendarServer/trunk/calendarserver/push/test/test_applepush.py 2012-01-18 00:06:29 UTC (rev 8558)
@@ -17,7 +17,7 @@
from calendarserver.push.applepush import (
ApplePushNotifierService, APNProviderProtocol
)
-from calendarserver.push.util import validToken
+from calendarserver.push.util import validToken, TokenHistory
from twistedcaldav.test.util import TestCase
from twisted.internet.defer import inlineCallbacks, succeed
from twisted.internet.task import Clock
@@ -110,8 +110,8 @@
self.assertEquals(service.providers["CalDAV"].queue, [])
# Verify data sent to APN
- connector = service.providers["CalDAV"].testConnector
- rawData = connector.transport.data
+ providerConnector = service.providers["CalDAV"].testConnector
+ rawData = providerConnector.transport.data
self.assertEquals(len(rawData), 103)
data = struct.unpack("!BIIH32sH", rawData[:45])
self.assertEquals(data[0], 1) # command
@@ -120,6 +120,10 @@
payload = struct.unpack("%ds" % (payloadLength,),
rawData[45:])
self.assertEquals(payload[0], '{"key" : "%s"}' % (key1,))
+ # Verify token history is updated
+ self.assertEquals(providerConnector.service.protocol.history.history,
+ [(1, token)]
+ )
def errorTestFunction(status, identifier):
history.append((status, identifier))
@@ -128,7 +132,7 @@
# Simulate an error
history = []
errorData = struct.pack("!BBI", APNProviderProtocol.COMMAND_ERROR, 1, 2)
- yield connector.receiveData(errorData, fn=errorTestFunction)
+ yield providerConnector.receiveData(errorData, fn=errorTestFunction)
clock.advance(301)
# Simulate multiple errors and dataReceived called
@@ -139,17 +143,17 @@
APNProviderProtocol.COMMAND_ERROR, 3, 4,
APNProviderProtocol.COMMAND_ERROR, 5, 6,
)
- yield connector.receiveData(errorData[:4], fn=errorTestFunction)
+ yield providerConnector.receiveData(errorData[:4], fn=errorTestFunction)
# Send remaining bytes
- yield connector.receiveData(errorData[4:], fn=errorTestFunction)
+ yield providerConnector.receiveData(errorData[4:], fn=errorTestFunction)
self.assertEquals(history, [(3, 4), (5, 6)])
# Buffer is empty
- self.assertEquals(len(connector.service.protocol.buffer), 0)
+ self.assertEquals(len(providerConnector.service.protocol.buffer), 0)
# Sending 7 bytes
- yield connector.receiveData("!" * 7, fn=errorTestFunction)
+ yield providerConnector.receiveData("!" * 7, fn=errorTestFunction)
# Buffer has 1 byte remaining
- self.assertEquals(len(connector.service.protocol.buffer), 1)
+ self.assertEquals(len(providerConnector.service.protocol.buffer), 1)
# Prior to feedback, there are 2 subscriptions
@@ -160,12 +164,12 @@
# Simulate feedback with a single token
- connector = service.feedbacks["CalDAV"].testConnector
+ feedbackConnector = service.feedbacks["CalDAV"].testConnector
timestamp = 2000
binaryToken = token.decode("hex")
feedbackData = struct.pack("!IH32s", timestamp, len(binaryToken),
binaryToken)
- yield connector.receiveData(feedbackData)
+ yield feedbackConnector.receiveData(feedbackData)
# Simulate feedback with multiple tokens, and dataReceived called
# with amounts of data not fitting message boundaries
@@ -180,25 +184,37 @@
timestamp, len(binaryToken), binaryToken,
)
# Send 1st 10 bytes
- yield connector.receiveData(feedbackData[:10], fn=feedbackTestFunction)
+ yield feedbackConnector.receiveData(feedbackData[:10], fn=feedbackTestFunction)
# Send remaining bytes
- yield connector.receiveData(feedbackData[10:], fn=feedbackTestFunction)
+ yield feedbackConnector.receiveData(feedbackData[10:], fn=feedbackTestFunction)
self.assertEquals(history, [(timestamp, token), (timestamp, token)])
# Buffer is empty
- self.assertEquals(len(connector.service.protocol.buffer), 0)
+ self.assertEquals(len(feedbackConnector.service.protocol.buffer), 0)
# Sending 39 bytes
- yield connector.receiveData("!" * 39, fn=feedbackTestFunction)
+ yield feedbackConnector.receiveData("!" * 39, fn=feedbackTestFunction)
# Buffer has 1 byte remaining
- self.assertEquals(len(connector.service.protocol.buffer), 1)
+ self.assertEquals(len(feedbackConnector.service.protocol.buffer), 1)
# The second subscription should now be gone
- # Prior to feedback, there are 2 subscriptions
txn = self.store.newTransaction()
subscriptions = (yield txn.apnSubscriptionsByToken(token))
yield txn.commit()
- self.assertEquals(len(subscriptions), 1)
+ self.assertEquals(subscriptions,
+ [["/CalDAV/calendars.example.com/user02/calendar/", 3000, "D2256BCC-48E2-42D1-BD89-CBA1E4CCDFFB"]]
+ )
+ # Verify processError removes associated subscriptions and history
+ yield providerConnector.service.protocol.processError(8, 1)
+ # The token for this identifier is gone
+ self.assertEquals(providerConnector.service.protocol.history.history, [])
+ # All subscriptions for this token should now be gone
+ txn = self.store.newTransaction()
+ subscriptions = (yield txn.apnSubscriptionsByToken(token))
+ yield txn.commit()
+ self.assertEquals(subscriptions, [])
+
+
def test_validToken(self):
self.assertTrue(validToken("2d0d55cd7f98bcb81c6e24abcdc35168254c7846a43e2828b1ba5a8f82e219df"))
self.assertFalse(validToken("d0d55cd7f98bcb81c6e24abcdc35168254c7846a43e2828b1ba5a8f82e219df"))
@@ -206,6 +222,64 @@
self.assertFalse(validToken(""))
+ def test_TokenHistory(self):
+ history = TokenHistory(maxSize=5)
+
+ # Ensure returned identifiers increment
+ for id, token in enumerate(("one", "two", "three", "four", "five"),
+ start=1):
+ self.assertEquals(id, history.add(token))
+ self.assertEquals(len(history.history), 5)
+
+ # History size never exceeds maxSize
+ id = history.add("six")
+ self.assertEquals(id, 6)
+ self.assertEquals(len(history.history), 5)
+ self.assertEquals(
+ history.history,
+ [(2, "two"), (3, "three"), (4, "four"), (5, "five"), (6, "six")]
+ )
+ id = history.add("seven")
+ self.assertEquals(id, 7)
+ self.assertEquals(len(history.history), 5)
+ self.assertEquals(
+ history.history,
+ [(3, "three"), (4, "four"), (5, "five"), (6, "six"), (7, "seven")]
+ )
+
+ # Look up non-existent identifier
+ token = history.extractIdentifier(9999)
+ self.assertEquals(token, None)
+ self.assertEquals(
+ history.history,
+ [(3, "three"), (4, "four"), (5, "five"), (6, "six"), (7, "seven")]
+ )
+
+ # Look up oldest identifier in history
+ token = history.extractIdentifier(3)
+ self.assertEquals(token, "three")
+ self.assertEquals(
+ history.history,
+ [(4, "four"), (5, "five"), (6, "six"), (7, "seven")]
+ )
+
+ # Look up latest identifier in history
+ token = history.extractIdentifier(7)
+ self.assertEquals(token, "seven")
+ self.assertEquals(
+ history.history,
+ [(4, "four"), (5, "five"), (6, "six")]
+ )
+
+ # Look up an identifier in the middle
+ token = history.extractIdentifier(5)
+ self.assertEquals(token, "five")
+ self.assertEquals(
+ history.history,
+ [(4, "four"), (6, "six")]
+ )
+
+
class TestConnector(object):
def connect(self, service, factory):
Modified: CalendarServer/trunk/calendarserver/push/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/util.py 2012-01-17 19:51:07 UTC (rev 8557)
+++ CalendarServer/trunk/calendarserver/push/util.py 2012-01-18 00:06:29 UTC (rev 8558)
@@ -50,3 +50,52 @@
return True
+
+class TokenHistory(object):
+ """
+ Manages a queue of tokens and corresponding identifiers. Queue is always
+ kept below maxSize in length (older entries removed as needed).
+ """
+
+ def __init__(self, maxSize=200):
+ """
+ @param maxSize: How large the history is allowed to grow. Once this
+ size is reached, older entries are pruned as needed.
+ @type maxSize: C{int}
+ """
+ self.maxSize = maxSize
+ self.identifier = 0
+ self.history = []
+
+ def add(self, token):
+ """
+ Add a token to the history, and return the new identifier associated
+ with this token. Identifiers begin at 1 and increase each time this
+ is called. If the number of items in the history exceeds maxSize,
+ older entries are removed to get the size down to maxSize.
+
+ @param token: The token to store
+ @type token: C{str}
+ @returns: the message identifier associated with this token, C{int}
+ """
+ self.identifier += 1
+ self.history.append((self.identifier, token))
+ del self.history[:-self.maxSize]
+ return self.identifier
+
+ def extractIdentifier(self, identifier):
+ """
+ Look for the token associated with the identifier. Remove the
+ identifier-token pair from the history and return the token. Return
+ None if the identifier is not found in the history.
+
+ @param identifier: The identifier to look up
+ @type identifier: C{int}
+ @returns: the token associated with this message identifier, C{str},
+ or None if not found
+ """
+ for index, (id, token) in enumerate(self.history):
+ if id == identifier:
+ del self.history[index]
+ return token
+ return None
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120117/0c4a7210/attachment-0001.html>
More information about the calendarserver-changes
mailing list