[CalendarServer-changes] [8558] CalendarServer/trunk/calendarserver/push

source_changes at macosforge.org source_changes at macosforge.org
Tue Jan 17 16:06:30 PST 2012


Revision: 8558
          http://trac.macosforge.org/projects/calendarserver/changeset/8558
Author:   sagen at apple.com
Date:     2012-01-17 16:06:29 -0800 (Tue, 17 Jan 2012)
Log Message:
-----------
Maintain a history of recently sent APN tokens, so that if an error comes back we can match it up with the right token and remove all associated subscriptions.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/push/applepush.py
    CalendarServer/trunk/calendarserver/push/test/test_applepush.py
    CalendarServer/trunk/calendarserver/push/util.py

Modified: CalendarServer/trunk/calendarserver/push/applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/applepush.py	2012-01-17 19:51:07 UTC (rev 8557)
+++ CalendarServer/trunk/calendarserver/push/applepush.py	2012-01-18 00:06:29 UTC (rev 8558)
@@ -34,7 +34,7 @@
 import struct
 import time
 from txdav.common.icommondatastore import InvalidSubscriptionValues
-from calendarserver.push.util import validToken
+from calendarserver.push.util import validToken, TokenHistory
 
 
 
@@ -96,6 +96,7 @@
                     feedbackTestConnector = testConnectorClass()
 
                 provider = APNProviderService(
+                    service.store,
                     settings["ProviderHost"],
                     settings["ProviderPort"],
                     settings[protocol]["CertificatePath"],
@@ -202,11 +203,15 @@
         255 : "None (unknown)",
     }
 
+    # If error code comes back as one of these, remove the associated device
+    # token
+    TOKEN_REMOVAL_CODES = (5, 8)
+
     MESSAGE_LENGTH = 6
 
     def makeConnection(self, transport):
-        self.identifier = 0
-        # self.log_debug("ProviderProtocol makeConnection")
+        self.history = TokenHistory()
+        self.log_debug("ProviderProtocol makeConnection")
         protocol.Protocol.makeConnection(self, transport)
 
     def connectionMade(self):
@@ -222,6 +227,7 @@
         # Clear the reference to us from the factory
         self.factory.connection = None
 
+    @inlineCallbacks
     def dataReceived(self, data, fn=None):
         """
         Buffer and divide up received data into error messages which are
@@ -241,16 +247,18 @@
             try:
                 command, status, identifier = struct.unpack("!BBI", message)
                 if command == self.COMMAND_ERROR:
-                    fn(status, identifier)
+                    yield fn(status, identifier)
             except Exception, e:
                 self.log_warn("ProviderProtocol could not process error: %s (%s)" %
                     (message.encode("hex"), e))
 
 
+    @inlineCallbacks
     def processError(self, status, identifier):
         """
         Handles an error message we've received from on feedback channel.
-        Not much to do here besides logging the error.
+        If the error code is one that indicates a bad token, remove all
+        subscriptions corresponding to that token.
 
         @param status: The status value returned from APN Feedback server
         @type status: C{int}
@@ -260,8 +268,21 @@
         @type status: C{int}
         """
         msg = self.STATUS_CODES.get(status, "Unknown status code")
-        self.log_error("Received APN error %d on identifier %d: %s" % (status, identifier, msg))
+        self.log_warn("Received APN error %d on identifier %d: %s" % (status, identifier, msg))
+        if status in self.TOKEN_REMOVAL_CODES:
+            token = self.history.extractIdentifier(identifier)
+            if token is not None:
+                self.log_warn("Removing subscriptions for bad token: %s" %
+                    (token,))
+                txn = self.factory.store.newTransaction()
+                subscriptions = (yield txn.apnSubscriptionsByToken(token))
+                for key, modified, uid in subscriptions:
+                    self.log_warn("Removing subscription: %s %s" %
+                        (token, key))
+                    yield txn.removeAPNSubscription(token, key)
+                yield txn.commit()
 
+
     def sendNotification(self, token, key):
         """
         Sends a push notification message for the key to the device associated
@@ -283,16 +304,16 @@
             self.log_error("Invalid APN token in database: %s" % (token,))
             return
 
-        self.identifier += 1
+        identifier = self.history.add(token)
         payload = '{"key" : "%s"}' % (key,)
         payloadLength = len(payload)
         self.log_debug("Sending APNS notification to %s: id=%d payload=%s" %
-            (token, self.identifier, payload))
+            (token, identifier, payload))
 
         self.transport.write(
             struct.pack("!BIIH32sH%ds" % (payloadLength,),
                 self.COMMAND_ENHANCED,  # Command
-                self.identifier,        # Identifier
+                identifier,             # Identifier
                 0,                      # Expiry
                 32,                     # Token Length
                 binaryToken,            # Token
@@ -306,14 +327,16 @@
 
     protocol = APNProviderProtocol
 
-    def __init__(self, service):
+    def __init__(self, service, store):
         self.service = service
+        self.store = store
         self.noisy = True
         self.maxDelay = 30 # max seconds between connection attempts
 
     def clientConnectionMade(self):
         self.log_warn("Connection to APN server made")
         self.service.clientConnectionMade()
+        self.delay = 1.0
 
     def clientConnectionLost(self, connector, reason):
         self.log_warn("Connection to APN server lost: %s" % (reason,))
@@ -371,7 +394,7 @@
 
 class APNProviderService(APNConnectionService):
 
-    def __init__(self, host, port, certPath, keyPath, chainPath="",
+    def __init__(self, store, host, port, certPath, keyPath, chainPath="",
         passphrase="", sslMethod="TLSv1_METHOD", testConnector=None,
         reactor=None):
 
@@ -379,12 +402,13 @@
             chainPath=chainPath, passphrase=passphrase, sslMethod=sslMethod,
             testConnector=testConnector, reactor=reactor)
 
+        self.store = store
         self.factory = None
         self.queue = []
 
     def startService(self):
         self.log_info("APNProviderService startService")
-        self.factory = APNProviderFactory(self)
+        self.factory = APNProviderFactory(self, self.store)
         self.connect(self.factory)
 
     def stopService(self):

Modified: CalendarServer/trunk/calendarserver/push/test/test_applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/test/test_applepush.py	2012-01-17 19:51:07 UTC (rev 8557)
+++ CalendarServer/trunk/calendarserver/push/test/test_applepush.py	2012-01-18 00:06:29 UTC (rev 8558)
@@ -17,7 +17,7 @@
 from calendarserver.push.applepush import (
     ApplePushNotifierService, APNProviderProtocol
 )
-from calendarserver.push.util import validToken
+from calendarserver.push.util import validToken, TokenHistory
 from twistedcaldav.test.util import TestCase
 from twisted.internet.defer import inlineCallbacks, succeed
 from twisted.internet.task import Clock
@@ -110,8 +110,8 @@
         self.assertEquals(service.providers["CalDAV"].queue, [])
 
         # Verify data sent to APN
-        connector = service.providers["CalDAV"].testConnector
-        rawData = connector.transport.data
+        providerConnector = service.providers["CalDAV"].testConnector
+        rawData = providerConnector.transport.data
         self.assertEquals(len(rawData), 103)
         data = struct.unpack("!BIIH32sH", rawData[:45])
         self.assertEquals(data[0], 1) # command
@@ -120,6 +120,10 @@
         payload = struct.unpack("%ds" % (payloadLength,),
             rawData[45:])
         self.assertEquals(payload[0], '{"key" : "%s"}' % (key1,))
+        # Verify token history is updated
+        self.assertEquals(providerConnector.service.protocol.history.history,
+            [(1, token)]
+        )
 
         def errorTestFunction(status, identifier):
             history.append((status, identifier))
@@ -128,7 +132,7 @@
         # Simulate an error
         history = []
         errorData = struct.pack("!BBI", APNProviderProtocol.COMMAND_ERROR, 1, 2)
-        yield connector.receiveData(errorData, fn=errorTestFunction)
+        yield providerConnector.receiveData(errorData, fn=errorTestFunction)
         clock.advance(301)
 
         # Simulate multiple errors and dataReceived called
@@ -139,17 +143,17 @@
             APNProviderProtocol.COMMAND_ERROR, 3, 4,
             APNProviderProtocol.COMMAND_ERROR, 5, 6,
         )
-        yield connector.receiveData(errorData[:4], fn=errorTestFunction)
+        yield providerConnector.receiveData(errorData[:4], fn=errorTestFunction)
         # Send remaining bytes
-        yield connector.receiveData(errorData[4:], fn=errorTestFunction)
+        yield providerConnector.receiveData(errorData[4:], fn=errorTestFunction)
         self.assertEquals(history, [(3, 4), (5, 6)])
         # Buffer is empty
-        self.assertEquals(len(connector.service.protocol.buffer), 0)
+        self.assertEquals(len(providerConnector.service.protocol.buffer), 0)
 
         # Sending 7 bytes
-        yield connector.receiveData("!" * 7, fn=errorTestFunction)
+        yield providerConnector.receiveData("!" * 7, fn=errorTestFunction)
         # Buffer has 1 byte remaining
-        self.assertEquals(len(connector.service.protocol.buffer), 1)
+        self.assertEquals(len(providerConnector.service.protocol.buffer), 1)
 
 
         # Prior to feedback, there are 2 subscriptions
@@ -160,12 +164,12 @@
 
 
         # Simulate feedback with a single token
-        connector = service.feedbacks["CalDAV"].testConnector
+        feedbackConnector = service.feedbacks["CalDAV"].testConnector
         timestamp = 2000
         binaryToken = token.decode("hex")
         feedbackData = struct.pack("!IH32s", timestamp, len(binaryToken),
             binaryToken)
-        yield connector.receiveData(feedbackData)
+        yield feedbackConnector.receiveData(feedbackData)
 
         # Simulate feedback with multiple tokens, and dataReceived called
         # with amounts of data not fitting message boundaries
@@ -180,25 +184,37 @@
             timestamp, len(binaryToken), binaryToken,
             )
         # Send 1st 10 bytes
-        yield connector.receiveData(feedbackData[:10], fn=feedbackTestFunction)
+        yield feedbackConnector.receiveData(feedbackData[:10], fn=feedbackTestFunction)
         # Send remaining bytes
-        yield connector.receiveData(feedbackData[10:], fn=feedbackTestFunction)
+        yield feedbackConnector.receiveData(feedbackData[10:], fn=feedbackTestFunction)
         self.assertEquals(history, [(timestamp, token), (timestamp, token)])
         # Buffer is empty
-        self.assertEquals(len(connector.service.protocol.buffer), 0)
+        self.assertEquals(len(feedbackConnector.service.protocol.buffer), 0)
 
         # Sending 39 bytes
-        yield connector.receiveData("!" * 39, fn=feedbackTestFunction)
+        yield feedbackConnector.receiveData("!" * 39, fn=feedbackTestFunction)
         # Buffer has 1 byte remaining
-        self.assertEquals(len(connector.service.protocol.buffer), 1)
+        self.assertEquals(len(feedbackConnector.service.protocol.buffer), 1)
 
         # The second subscription should now be gone
-        # Prior to feedback, there are 2 subscriptions
         txn = self.store.newTransaction()
         subscriptions = (yield txn.apnSubscriptionsByToken(token))
         yield txn.commit()
-        self.assertEquals(len(subscriptions), 1)
+        self.assertEquals(subscriptions,
+            [["/CalDAV/calendars.example.com/user02/calendar/", 3000, "D2256BCC-48E2-42D1-BD89-CBA1E4CCDFFB"]]
+        )
 
+        # Verify processError removes associated subscriptions and history
+        yield providerConnector.service.protocol.processError(8, 1)
+        # The token for this identifier is gone
+        self.assertEquals(providerConnector.service.protocol.history.history, [])
+        # All subscriptions for this token should now be gone
+        txn = self.store.newTransaction()
+        subscriptions = (yield txn.apnSubscriptionsByToken(token))
+        yield txn.commit()
+        self.assertEquals(subscriptions, [])
+
+
     def test_validToken(self):
         self.assertTrue(validToken("2d0d55cd7f98bcb81c6e24abcdc35168254c7846a43e2828b1ba5a8f82e219df"))
         self.assertFalse(validToken("d0d55cd7f98bcb81c6e24abcdc35168254c7846a43e2828b1ba5a8f82e219df"))
@@ -206,6 +222,64 @@
         self.assertFalse(validToken(""))
 
 
+    def test_TokenHistory(self):
+        history = TokenHistory(maxSize=5)
+
+        # Ensure returned identifiers increment
+        for id, token in enumerate(("one", "two", "three", "four", "five"),
+            start=1):
+            self.assertEquals(id, history.add(token))
+        self.assertEquals(len(history.history), 5)
+
+        # History size never exceeds maxSize
+        id = history.add("six")
+        self.assertEquals(id, 6)
+        self.assertEquals(len(history.history), 5)
+        self.assertEquals(
+            history.history,
+            [(2, "two"), (3, "three"), (4, "four"), (5, "five"), (6, "six")]
+        )
+        id = history.add("seven")
+        self.assertEquals(id, 7)
+        self.assertEquals(len(history.history), 5)
+        self.assertEquals(
+            history.history,
+            [(3, "three"), (4, "four"), (5, "five"), (6, "six"), (7, "seven")]
+        )
+
+        # Look up non-existent identifier
+        token = history.extractIdentifier(9999)
+        self.assertEquals(token, None)
+        self.assertEquals(
+            history.history,
+            [(3, "three"), (4, "four"), (5, "five"), (6, "six"), (7, "seven")]
+        )
+
+        # Look up oldest identifier in history
+        token = history.extractIdentifier(3)
+        self.assertEquals(token, "three")
+        self.assertEquals(
+            history.history,
+            [(4, "four"), (5, "five"), (6, "six"), (7, "seven")]
+        )
+
+        # Look up latest identifier in history
+        token = history.extractIdentifier(7)
+        self.assertEquals(token, "seven")
+        self.assertEquals(
+            history.history,
+            [(4, "four"), (5, "five"), (6, "six")]
+        )
+
+        # Look up an identifier in the middle
+        token = history.extractIdentifier(5)
+        self.assertEquals(token, "five")
+        self.assertEquals(
+            history.history,
+            [(4, "four"), (6, "six")]
+        )
+
+
 class TestConnector(object):
 
     def connect(self, service, factory):

Modified: CalendarServer/trunk/calendarserver/push/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/util.py	2012-01-17 19:51:07 UTC (rev 8557)
+++ CalendarServer/trunk/calendarserver/push/util.py	2012-01-18 00:06:29 UTC (rev 8558)
@@ -50,3 +50,52 @@
 
     return True
 
+
+class TokenHistory(object):
+    """
+    Manages a queue of tokens and corresponding identifiers.  The queue never
+    exceeds maxSize in length (older entries are removed as needed).
+    """
+
+    def __init__(self, maxSize=200):
+        """
+        @param maxSize: How large the history is allowed to grow.  Once this
+            size is reached, older entries are pruned as needed.
+        @type maxSize: C{int}
+        """
+        self.maxSize = maxSize
+        self.identifier = 0
+        self.history = []
+
+    def add(self, token):
+        """
+        Add a token to the history, and return the new identifier associated
+        with this token.  Identifiers begin at 1 and increase each time this
+        is called.  If the number of items in the history exceeds maxSize,
+        older entries are removed to get the size down to maxSize.
+
+        @param token: The token to store
+        @type token: C{str}
+        @returns: the message identifier associated with this token, C{int}
+        """
+        self.identifier += 1
+        self.history.append((self.identifier, token))
+        del self.history[:-self.maxSize]
+        return self.identifier
+
+    def extractIdentifier(self, identifier):
+        """
+        Look for the token associated with the identifier.  Remove the
+        identifier-token pair from the history and return the token.  Return
+        None if the identifier is not found in the history.
+
+        @param identifier: The identifier to look up
+        @type identifier: C{int}
+        @returns: the token associated with this message identifier, C{str},
+            or None if not found
+        """
+        for index, (id, token) in enumerate(self.history):
+            if id == identifier:
+                del self.history[index]
+                return token
+        return None
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120117/0c4a7210/attachment-0001.html>


More information about the calendarserver-changes mailing list