[CalendarServer-changes] [8147] CalendarServer/branches/users/sagen/applepush/twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Wed Oct 5 15:02:04 PDT 2011


Revision: 8147
          http://trac.macosforge.org/projects/calendarserver/changeset/8147
Author:   sagen at apple.com
Date:     2011-10-05 15:02:03 -0700 (Wed, 05 Oct 2011)
Log Message:
-----------
Implements APN feedback channel

Modified Paths:
--------------
    CalendarServer/branches/users/sagen/applepush/twistedcaldav/applepush.py
    CalendarServer/branches/users/sagen/applepush/twistedcaldav/stdconfig.py
    CalendarServer/branches/users/sagen/applepush/twistedcaldav/test/test_applepush.py

Modified: CalendarServer/branches/users/sagen/applepush/twistedcaldav/applepush.py
===================================================================
--- CalendarServer/branches/users/sagen/applepush/twistedcaldav/applepush.py	2011-10-05 17:38:43 UTC (rev 8146)
+++ CalendarServer/branches/users/sagen/applepush/twistedcaldav/applepush.py	2011-10-05 22:02:03 UTC (rev 8147)
@@ -48,7 +48,9 @@
 class ApplePushNotifierService(service.MultiService, LoggingMixIn):
 
     @classmethod
-    def makeService(cls, settings, store):
+    def makeService(cls, settings, store, testConnectorClass=None,
+        reactor=None):
+
         service = cls()
 
         service.store = store
@@ -58,11 +60,19 @@
 
         for protocol in ("CalDAV", "CardDAV"):
 
+            providerTestConnector = None
+            feedbackTestConnector = None
+            if testConnectorClass is not None:
+                providerTestConnector = testConnectorClass()
+                feedbackTestConnector = testConnectorClass()
+
             provider = APNProviderService(
                 settings["ProviderHost"],
                 settings["ProviderPort"],
                 settings[protocol]["CertificatePath"],
                 settings[protocol]["PrivateKeyPath"],
+                testConnector=providerTestConnector,
+                reactor=reactor,
             )
             provider.setServiceParent(service)
             service.providers[protocol] = provider
@@ -70,15 +80,18 @@
                 (protocol, settings[protocol]["Topic"]))
 
             feedback = APNFeedbackService(
+                service.store,
+                settings["FeedbackUpdateSeconds"],
                 settings["FeedbackHost"],
                 settings["FeedbackPort"],
                 settings[protocol]["CertificatePath"],
                 settings[protocol]["PrivateKeyPath"],
+                testConnector=feedbackTestConnector,
+                reactor=reactor,
             )
             feedback.setServiceParent(service)
             service.feedbacks[protocol] = feedback
 
-
         return service
 
 
@@ -99,7 +112,7 @@
             # Look up subscriptions for this key
             txn = self.store.newTransaction()
             subscriptions = (yield txn.apnSubscriptionsByKey(key))
-            yield txn.commit()
+            yield txn.commit() # TODO: Glyph, needed?
 
             for token, guid in subscriptions:
                 self.log_debug("Sending APNS: token='%s' key='%s' guid='%s'" %
@@ -136,7 +149,7 @@
 
     def makeConnection(self, transport):
         self.identifier = 0
-        self.log_debug("ProviderProtocol makeConnection")
+        # self.log_debug("ProviderProtocol makeConnection")
         protocol.Protocol.makeConnection(self, transport)
 
     def connectionMade(self):
@@ -148,7 +161,7 @@
         # self.sendNotification(TOKEN, "xyzzy")
 
     def connectionLost(self, reason=None):
-        self.log_error("ProviderProtocol connectionLost: %s" % (reason,))
+        # self.log_debug("ProviderProtocol connectionLost: %s" % (reason,))
         # TODO: glyph review
         # Clear the reference to us from the factory
         self.factory.connection = None
@@ -202,7 +215,7 @@
         return p
 
     def clientConnectionLost(self, connector, reason):
-        self.log_error("Connection to APN server lost: %s" % (reason,))
+        # self.log_info("Connection to APN server lost: %s" % (reason,))
         ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
 
     def clientConnectionFailed(self, connector, reason):
@@ -215,7 +228,7 @@
 class APNConnectionService(service.Service, LoggingMixIn):
 
     def __init__(self, host, port, certPath, keyPath, chainPath="",
-        sslMethod="TLSv1_METHOD", testConnector=None):
+        sslMethod="TLSv1_METHOD", testConnector=None, reactor=None):
 
         self.host = host
         self.port = port
@@ -225,6 +238,10 @@
         self.sslMethod = sslMethod
         self.testConnector = testConnector
 
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+
     def connect(self, factory):
         if self.testConnector is not None:
             # For testing purposes
@@ -242,11 +259,11 @@
 class APNProviderService(APNConnectionService):
 
     def __init__(self, host, port, certPath, keyPath, chainPath="",
-        sslMethod="TLSv1_METHOD", testConnector=None):
+        sslMethod="TLSv1_METHOD", testConnector=None, reactor=None):
 
         APNConnectionService.__init__(self, host, port, certPath, keyPath,
             chainPath="", sslMethod=sslMethod,
-            testConnector=testConnector)
+            testConnector=testConnector, reactor=reactor)
 
     def startService(self):
         self.log_debug("APNProviderService startService")
@@ -273,34 +290,47 @@
     Implements the Feedback portion of APNS
     """
 
+    def connectionMade(self):
+        self.log_debug("FeedbackProtocol connectionMade")
+
     def dataReceived(self, data):
         self.log_debug("FeedbackProtocol dataReceived %d bytes" % (len(data),))
         timestamp, tokenLength, binaryToken = struct.unpack("!IH32s", data)
         token = binaryToken.encode("hex")
         self.processFeedback(timestamp, token)
 
+    @inlineCallbacks
     def processFeedback(self, timestamp, token):
         self.log_debug("FeedbackProtocol processFeedback time=%d token=%s" %
             (timestamp, token))
         # TODO: actually see if we need to remove the token from subscriptions
+        txn = self.store.newTransaction()
+        subscriptions = (yield txn.apnSubscriptionsByToken(token))
+        yield txn.commit() # TODO: Glyph, needed?
 
+        for key, modified, guid in subscriptions:
+            if timestamp > modified:
+                self.log_debug("FeedbackProtocol removing subscription: %s %s" %
+                    (token, key))
+                yield txn.removeAPNSubscription(token, key)
 
+
 class APNFeedbackFactory(ClientFactory, LoggingMixIn):
 
     protocol = APNFeedbackProtocol
 
+    def __init__(self, store):
+        self.store = store
+
     def buildProtocol(self, addr):
         p = self.protocol()
         # TODO: glyph review
         # Give protocol a back-reference to factory so it can set/clear
         # the "connection" reference on the factory
         p.factory = self
+        p.store = self.store
         return p
 
-    def clientConnectionLost(self, connector, reason):
-        self.log_error("Connection to APN feedback server lost: %s" % (reason,))
-        ClientFactory.clientConnectionLost(self, connector, reason)
-
     def clientConnectionFailed(self, connector, reason):
         self.log_error("Unable to connect to APN feedback server: %s" %
             (reason,))
@@ -310,22 +340,33 @@
 
 class APNFeedbackService(APNConnectionService):
 
-    def __init__(self, host, port, certPath, keyPath, chainPath="",
-        sslMethod="TLSv1_METHOD", testConnector=None):
+    def __init__(self, store, updateSeconds, host, port, certPath, keyPath,
+        chainPath="", sslMethod="TLSv1_METHOD", testConnector=None,
+        reactor=None):
 
         APNConnectionService.__init__(self, host, port, certPath, keyPath,
             chainPath="", sslMethod=sslMethod,
-            testConnector=testConnector)
+            testConnector=testConnector, reactor=reactor)
 
+        self.store = store
+        self.updateSeconds = updateSeconds
+
     def startService(self):
         self.log_debug("APNFeedbackService startService")
-        # TODO: Set a timer to connect to feedback at an interval
-        # self.factory = APNFeedbackFactory()
-        # self.connect(self.factory)
+        self.factory = APNFeedbackFactory(self.store)
+        self.checkForFeedback()
 
     def stopService(self):
         self.log_debug("APNFeedbackService stopService")
+        if self.nextCheck is not None:
+            self.nextCheck.cancel()
 
+    def checkForFeedback(self):
+        self.nextCheck = None
+        self.log_debug("APNFeedbackService checkForFeedback")
+        self.connect(self.factory)
+        self.nextCheck = self.reactor.callLater(self.updateSeconds,
+            self.checkForFeedback)
 
 class APNSubscriptionResource(Resource):
 
@@ -336,7 +377,7 @@
 
     def __init__(self, store):
         self.store = store
-        # Hopefully we can use this store to manage subscriptions
+        # TODO: add authentication
 
     def http_GET(self, request):
         return self.processSubscription(request.args)
@@ -366,9 +407,10 @@
     def addSubscription(self, token, key):
         now = int(time.time()) # epoch seconds
         txn = self.store.newTransaction()
+        # TODO: use actual guid
         yield txn.addAPNSubscription(token, key, now, "xyzzy")
-        subscriptions = (yield txn.apnSubscriptionsByToken(token))
-        print subscriptions
+        # subscriptions = (yield txn.apnSubscriptionsByToken(token))
+        # print subscriptions
         yield txn.commit()
 
     def renderResponse(self, code, body=None):

Modified: CalendarServer/branches/users/sagen/applepush/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/branches/users/sagen/applepush/twistedcaldav/stdconfig.py	2011-10-05 17:38:43 UTC (rev 8146)
+++ CalendarServer/branches/users/sagen/applepush/twistedcaldav/stdconfig.py	2011-10-05 22:02:03 UTC (rev 8147)
@@ -638,6 +638,7 @@
                 "ProviderPort" : 2195,
                 "FeedbackHost" : "feedback.push.apple.com",
                 "FeedbackPort" : 2196,
+                "FeedbackUpdateSeconds" : 300, # 5 minutes
                 "Environment" : "PRODUCTION",
                 "CalDAV" : {
                     "CertificatePath" : "",

Modified: CalendarServer/branches/users/sagen/applepush/twistedcaldav/test/test_applepush.py
===================================================================
--- CalendarServer/branches/users/sagen/applepush/twistedcaldav/test/test_applepush.py	2011-10-05 17:38:43 UTC (rev 8146)
+++ CalendarServer/branches/users/sagen/applepush/twistedcaldav/test/test_applepush.py	2011-10-05 22:02:03 UTC (rev 8147)
@@ -15,10 +15,11 @@
 ##
 
 from twistedcaldav.applepush import (
-    ApplePushNotifierService, APNProviderService
+    ApplePushNotifierService, APNProviderService, APNProviderProtocol
 )
 from twistedcaldav.test.util import TestCase
 from twisted.internet.defer import inlineCallbacks, succeed
+from twisted.internet.task import Clock
 import struct
 import time
 
@@ -36,6 +37,7 @@
             "ProviderPort" : 2195,
             "FeedbackHost" : "feedback.push.apple.com",
             "FeedbackPort" : 2196,
+            "FeedbackUpdateSeconds" : 300,
             "CalDAV" : {
                 "CertificatePath" : "caldav.cer",
                 "PrivateKeyPath" : "caldav.pem",
@@ -49,18 +51,23 @@
         }
 
 
-        # Add a subscription
+        # Add subscriptions
         store = StubStore()
         txn = store.newTransaction()
         token = "2d0d55cd7f98bcb81c6e24abcdc35168254c7846a43e2828b1ba5a8f82e219df"
-        key = "/CalDAV/calendars.example.com/user01/calendar/"
-        now = int(time.time())
+        key1 = "/CalDAV/calendars.example.com/user01/calendar/"
+        timestamp1 = 1000
         guid = "D2256BCC-48E2-42D1-BD89-CBA1E4CCDFFB"
-        yield txn.addAPNSubscription(token, key, now, guid)
+        yield txn.addAPNSubscription(token, key1, timestamp1, guid)
 
+        key2 = "/CalDAV/calendars.example.com/user02/calendar/"
+        timestamp2 = 3000
+        yield txn.addAPNSubscription(token, key2, timestamp2, guid)
+
         # Set up the service
+        clock = Clock()
         service = (yield ApplePushNotifierService.makeService(settings, store,
-            testConnectorClass=TestConnector))
+            testConnectorClass=TestConnector, reactor=clock))
         self.assertEquals(set(service.providers.keys()), set(["CalDAV","CardDAV"]))
         self.assertEquals(set(service.feedbacks.keys()), set(["CalDAV","CardDAV"]))
         service.startService()
@@ -69,7 +76,8 @@
         service.enqueue("update", "CalDAV|user01/calendar")
 
         # Verify data sent to APN
-        rawData = service.providers["CalDAV"].testConnector.getData()
+        connector = service.providers["CalDAV"].testConnector
+        rawData = connector.transport.data
         self.assertEquals(len(rawData), 103)
         data = struct.unpack("!BIIH32sH", rawData[:45])
         self.assertEquals(data[0], 1) # command
@@ -77,19 +85,39 @@
         payloadLength = data[5]
         payload = struct.unpack("%ds" % (payloadLength,),
             rawData[45:])
-        self.assertEquals(payload[0], '{"key" : "%s"}' % (key,))
+        self.assertEquals(payload[0], '{"key" : "%s"}' % (key1,))
 
+        # Simulate an error
+        errorData = struct.pack("!BBI", APNProviderProtocol.COMMAND_ERROR, 1, 1)
+        connector.receiveData(errorData)
+        clock.advance(301)
 
+        # Prior to feedback, there are 2 subscriptions
+        self.assertEquals(len(store.subscriptions), 2)
+
+        # Simulate feedback
+        timestamp = 2000
+        connector = service.feedbacks["CalDAV"].testConnector
+        binaryToken = token.decode("hex")
+        feedbackData = struct.pack("!IH32s", timestamp, len(binaryToken),
+            binaryToken)
+        connector.receiveData(feedbackData)
+
+        # The first (older) subscription should now be gone
+        self.assertEquals(len(store.subscriptions), 1)
+
+
 class TestConnector(object):
 
     def connect(self, service, factory):
+        self.service = service
         service.protocol = factory.buildProtocol(None)
         service.connected = 1
         self.transport = StubTransport()
         service.protocol.makeConnection(self.transport)
 
-    def getData(self):
-        return self.transport.data
+    def receiveData(self, data):
+        self.service.protocol.dataReceived(data)
 
 
 class StubTransport(object):
@@ -122,11 +150,26 @@
                 matches.append((subscription.token, subscription.guid))
         return succeed(matches)
 
+    def apnSubscriptionsByToken(self, token):
+        matches = []
+        for subscription in self.store.subscriptions:
+            if subscription.token == token:
+                matches.append((subscription.key, subscription.timestamp,
+                    subscription.guid))
+        return succeed(matches)
+
     def addAPNSubscription(self, token, key, timestamp, guid):
         subscription = Subscription(token, key, timestamp, guid)
         self.store.subscriptions.append(subscription)
         return succeed(None)
 
+    def removeAPNSubscription(self, token, key):
+        matches = []
+        for subscription in list(self.store.subscriptions):
+            if subscription.token == token and subscription.key == key:
+                self.store.subscriptions.remove(subscription)
+        return succeed(None)
+
     def commit(self):
         pass
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20111005/b46574d2/attachment-0001.html>


More information about the calendarserver-changes mailing list