[CalendarServer-changes] [9108] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Fri Apr 13 15:45:50 PDT 2012


Revision: 9108
          http://trac.macosforge.org/projects/calendarserver/changeset/9108
Author:   sagen at apple.com
Date:     2012-04-13 15:45:48 -0700 (Fri, 13 Apr 2012)
Log Message:
-----------
Adds AMP-based notifications, a commandline utility for monitoring them, and updates the loadtest sim to use them.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/push/applepush.py
    CalendarServer/trunk/calendarserver/push/test/test_applepush.py
    CalendarServer/trunk/calendarserver/push/util.py
    CalendarServer/trunk/conf/caldavd-test.plist
    CalendarServer/trunk/contrib/performance/loadtest/config.plist
    CalendarServer/trunk/contrib/performance/loadtest/ical.py
    CalendarServer/trunk/twistedcaldav/notify.py
    CalendarServer/trunk/twistedcaldav/resource.py
    CalendarServer/trunk/twistedcaldav/stdconfig.py

Added Paths:
-----------
    CalendarServer/trunk/bin/calendarserver_monitor_amp_notifications
    CalendarServer/trunk/calendarserver/push/amppush.py
    CalendarServer/trunk/calendarserver/push/test/test_amppush.py
    CalendarServer/trunk/calendarserver/tools/ampnotifications.py

Added: CalendarServer/trunk/bin/calendarserver_monitor_amp_notifications
===================================================================
--- CalendarServer/trunk/bin/calendarserver_monitor_amp_notifications	                        (rev 0)
+++ CalendarServer/trunk/bin/calendarserver_monitor_amp_notifications	2012-04-13 22:45:48 UTC (rev 9108)
@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+
+##
+# Copyright (c) 2006-2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+import sys
+
+#PYTHONPATH
+
+if __name__ == "__main__":
+    if "PYTHONPATH" in globals():
+        sys.path.insert(0, PYTHONPATH)
+    else:
+        try:
+            import _calendarserver_preamble
+        except ImportError:
+            sys.exc_clear()
+
+    from calendarserver.tools.ampnotifications import main
+    main()


Property changes on: CalendarServer/trunk/bin/calendarserver_monitor_amp_notifications
___________________________________________________________________
Added: svn:executable
   + *

Added: CalendarServer/trunk/calendarserver/push/amppush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/amppush.py	                        (rev 0)
+++ CalendarServer/trunk/calendarserver/push/amppush.py	2012-04-13 22:45:48 UTC (rev 9108)
@@ -0,0 +1,249 @@
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from calendarserver.push.util import PushScheduler
+from twext.python.log import Logger, LoggingMixIn
+from twext.python.log import LoggingMixIn
+from twisted.application.internet import StreamServerEndpointService
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.endpoints import TCP4ClientEndpoint, TCP4ServerEndpoint
+from twisted.internet.protocol import Factory, ServerFactory
+from twisted.protocols import amp
+from twistedcaldav.notify import getPubSubPath
+import uuid
+
+
+log = Logger()
+
+
+# AMP Commands sent to server
+
+class SubscribeToID(amp.Command):
+    arguments = [('token', amp.String()), ('id', amp.String())]
+    response = [('status', amp.String())]
+
+
+class UnsubscribeFromID(amp.Command):
+    arguments = [('token', amp.String()), ('id', amp.String())]
+    response = [('status', amp.String())]
+
+
+# AMP Commands sent to client
+
+class NotificationForID(amp.Command):
+    arguments = [('id', amp.String())]
+    response = [('status', amp.String())]
+
+
+# Server classes
+
+class AMPPushNotifierService(StreamServerEndpointService, LoggingMixIn):
+    """
+    AMPPushNotifierService allows clients to use AMP to subscribe to,
+    and receive, change notifications.
+    """
+
+    @classmethod
+    def makeService(cls, settings, ignored, serverHostName, reactor=None):
+        return cls(settings, serverHostName, reactor=reactor)
+
+    def __init__(self, settings, serverHostName, reactor=None):
+        if reactor is None:
+            from twisted.internet import reactor
+        factory = AMPPushNotifierFactory(self)
+        endpoint = TCP4ServerEndpoint(reactor, settings["Port"])
+        super(AMPPushNotifierService, self).__init__(endpoint, factory)
+        self.subscribers = []
+
+        if settings["EnableStaggering"]:
+            self.scheduler = PushScheduler(reactor, self.sendNotification,
+                staggerSeconds=settings["StaggerSeconds"])
+        else:
+            self.scheduler = None
+
+        self.serverHostName = serverHostName
+
+    def addSubscriber(self, p):
+        self.log_debug("Added subscriber")
+        self.subscribers.append(p)
+
+    def removeSubscriber(self, p):
+        self.log_debug("Removed subscriber")
+        self.subscribers.remove(p)
+
+    def enqueue(self, op, id):
+        """
+        Sends an AMP push notification to any clients subscribing to this id.
+
+        @param op: The operation that took place, either "create" or "update"
+            (ignored in this implementation)
+        @type op: C{str}
+
+        @param id: The identifier of the resource that was updated, including
+            a prefix indicating whether this is CalDAV or CardDAV related.
+            The prefix is separated from the id with "|", e.g.:
+
+            "CalDAV|abc/def"
+
+            The id is an opaque token as far as this code is concerned, and
+            is used in conjunction with the prefix and the server hostname
+            to build the actual key value that devices subscribe to.
+        @type id: C{str}
+        """
+
+        try:
+            id.split("|", 1)
+        except ValueError:
+            # id has no protocol, so we can't do anything with it
+            self.log_error("Notification id '%s' is missing protocol" % (id,))
+            return
+
+        id = getPubSubPath(id, {"host": self.serverHostName})
+
+        tokens = []
+        for subscriber in self.subscribers:
+            token = subscriber.subscribedToID(id)
+            if token is not None:
+                tokens.append(token)
+        if tokens:
+            return self.scheduleNotifications(tokens, id)
+
+
+    @inlineCallbacks
+    def sendNotification(self, token, id):
+        for subscriber in self.subscribers:
+            if subscriber.subscribedToID(id):
+                yield subscriber.notify(token, id)
+
+
+    @inlineCallbacks
+    def scheduleNotifications(self, tokens, id):
+        if self.scheduler is not None:
+            self.scheduler.schedule(tokens, id)
+        else:
+            for token in tokens:
+                yield self.sendNotification(token, id)
+
+
+class AMPPushNotifierProtocol(amp.AMP):
+
+    def __init__(self, service):
+        super(AMPPushNotifierProtocol, self).__init__()
+        self.service = service
+        self.subscriptions = {}
+
+    def subscribe(self, token, id):
+        self.subscriptions[id] = token
+        return {"status" : "OK"}
+    SubscribeToID.responder(subscribe)
+
+    def unsubscribe(self, token, id):
+        try:
+            del self.subscriptions[id]
+        except KeyError:
+            pass
+        return {"status" : "OK"}
+    UnsubscribeFromID.responder(unsubscribe)
+
+    def notify(self, token, id):
+        if self.subscribedToID(id) == token:
+            return self.callRemote(NotificationForID, id=id)
+
+    def subscribedToID(self, id):
+        return self.subscriptions.get(id, None)
+
+    def connectionLost(self, reason=None):
+        self.service.removeSubscriber(self)
+
+
+class AMPPushNotifierFactory(ServerFactory, LoggingMixIn):
+
+    protocol = AMPPushNotifierProtocol
+
+    def __init__(self, service):
+        self.service = service
+
+    def buildProtocol(self, addr):
+        p = self.protocol(self.service)
+        self.service.addSubscriber(p)
+        p.service = self.service
+        return p
+
+
+# Client classes
+
+class AMPPushClientProtocol(amp.AMP):
+    """
+    Implements the client side of the AMP push protocol.  Whenever
+    the NotificationForID Command arrives, the registered callback
+    will be called with the id.
+    """
+
+    def __init__(self, callback):
+        super(AMPPushClientProtocol, self).__init__()
+        self.callback = callback
+
+    @inlineCallbacks
+    def notificationForID(self, id):
+        yield self.callback(id)
+        returnValue( {"status" : "OK"} )
+
+    NotificationForID.responder(notificationForID)
+
+
+class AMPPushClientFactory(Factory, LoggingMixIn):
+
+    protocol = AMPPushClientProtocol
+
+    def __init__(self, callback):
+        self.callback = callback
+
+    def buildProtocol(self, addr):
+        p = self.protocol(self.callback)
+        return p
+
+
+# Client helper methods
+
+@inlineCallbacks
+def subscribeToIDs(host, port, ids, callback, reactor=None):
+    """
+    Clients can call this helper method to register a callback which
+    will get called whenever a push notification is fired for any
+    id in the ids list.
+
+    @param host: AMP host name to connect to
+    @type host: string
+    @param port: AMP port to connect to
+    @type port: integer
+    @param ids: The push IDs to subscribe to
+    @type ids: list of strings
+    @param callback: The method to call whenever a notification is
+        received.
+    @type callback: callable which is passed an id (string)
+    """
+
+    if reactor is None:
+        from twisted.internet import reactor
+
+    token = str(uuid.uuid4())
+    endpoint = TCP4ClientEndpoint(reactor, host, port)
+    factory = AMPPushClientFactory(callback)
+    protocol = yield endpoint.connect(factory)
+    for id in ids:
+        yield protocol.callRemote(SubscribeToID, token=token, id=id)
+
+    returnValue(factory)

Modified: CalendarServer/trunk/calendarserver/push/applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/applepush.py	2012-04-13 20:13:22 UTC (rev 9107)
+++ CalendarServer/trunk/calendarserver/push/applepush.py	2012-04-13 22:45:48 UTC (rev 9108)
@@ -58,7 +58,7 @@
     """
 
     @classmethod
-    def makeService(cls, settings, store, testConnectorClass=None,
+    def makeService(cls, settings, store, serverHostName, testConnectorClass=None,
         reactor=None):
         """
         Creates the various "subservices" that work together to implement

Added: CalendarServer/trunk/calendarserver/push/test/test_amppush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/test/test_amppush.py	                        (rev 0)
+++ CalendarServer/trunk/calendarserver/push/test/test_amppush.py	2012-04-13 22:45:48 UTC (rev 9108)
@@ -0,0 +1,101 @@
+##
+# Copyright (c) 2011-2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from calendarserver.push.amppush import AMPPushNotifierService, AMPPushNotifierProtocol
+from calendarserver.push.amppush import NotificationForID
+from twistedcaldav.test.util import TestCase
+from twisted.internet.defer import inlineCallbacks
+from twisted.internet.task import Clock
+
+class AMPPushNotifierServiceTests(TestCase):
+
+    @inlineCallbacks
+    def test_AMPPushNotifierService(self):
+
+        settings = {
+            "Service" : "calendarserver.push.amppush.AMPPushNotifierService",
+            "Enabled" : True,
+            "Port" : 62311,
+            "EnableStaggering" : True,
+            "StaggerSeconds" : 3,
+        }
+
+        # Set up the service
+        clock = Clock()
+        service = (yield AMPPushNotifierService.makeService(settings,
+            None, "localhost", reactor=clock))
+
+        self.assertEquals(service.subscribers, [])
+
+        client1 = TestProtocol(service)
+        client1.subscribe("token1", "/CalDAV/localhost/user01/")
+        client1.subscribe("token1", "/CalDAV/localhost/user02/")
+
+        client2 = TestProtocol(service)
+        client2.subscribe("token2", "/CalDAV/localhost/user01/")
+
+        service.addSubscriber(client1)
+        service.addSubscriber(client2)
+
+        self.assertEquals(len(service.subscribers), 2)
+
+        self.assertTrue(client1.subscribedToID("/CalDAV/localhost/user01/"))
+        self.assertTrue(client1.subscribedToID("/CalDAV/localhost/user02/"))
+        self.assertFalse(client1.subscribedToID("nonexistent"))
+
+        self.assertTrue(client2.subscribedToID("/CalDAV/localhost/user01/"))
+        self.assertFalse(client2.subscribedToID("/CalDAV/localhost/user02/"))
+
+        service.enqueue("update", "CalDAV|user01")
+        self.assertEquals(len(client1.history), 0)
+        clock.advance(1)
+        self.assertEquals(client1.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/'})])
+        self.assertEquals(len(client2.history), 0)
+        clock.advance(3)
+        self.assertEquals(client2.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/'})])
+
+        client1.reset()
+        client2.reset()
+        client2.unsubscribe("token2", "/CalDAV/localhost/user01/")
+        service.enqueue("update", "CalDAV|user01")
+        self.assertEquals(len(client1.history), 0)
+        clock.advance(1)
+        self.assertEquals(client1.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/'})])
+        self.assertEquals(len(client2.history), 0)
+        clock.advance(3)
+        self.assertEquals(len(client2.history), 0)
+
+        # Turn off staggering
+        service.scheduler = None
+        client1.reset()
+        client2.reset()
+        client2.subscribe("token2", "/CalDAV/localhost/user01/")
+        service.enqueue("update", "CalDAV|user01")
+        self.assertEquals(client1.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/'})])
+        self.assertEquals(client2.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/'})])
+
+
+class TestProtocol(AMPPushNotifierProtocol):
+
+    def __init__(self, service):
+        super(TestProtocol, self).__init__(service)
+        self.reset()
+
+    def callRemote(self, cls, **kwds):
+        self.history.append((cls, kwds))
+
+    def reset(self):
+        self.history = []

Modified: CalendarServer/trunk/calendarserver/push/test/test_applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/test/test_applepush.py	2012-04-13 20:13:22 UTC (rev 9107)
+++ CalendarServer/trunk/calendarserver/push/test/test_applepush.py	2012-04-13 22:45:48 UTC (rev 9108)
@@ -112,7 +112,7 @@
         # Set up the service
         clock = Clock()
         service = (yield ApplePushNotifierService.makeService(settings,
-            self.store, testConnectorClass=TestConnector, reactor=clock))
+            self.store, "localhost", testConnectorClass=TestConnector, reactor=clock))
         self.assertEquals(set(service.providers.keys()), set(["CalDAV","CardDAV"]))
         self.assertEquals(set(service.feedbacks.keys()), set(["CalDAV","CardDAV"]))
 

Modified: CalendarServer/trunk/calendarserver/push/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/util.py	2012-04-13 20:13:22 UTC (rev 9107)
+++ CalendarServer/trunk/calendarserver/push/util.py	2012-04-13 22:45:48 UTC (rev 9108)
@@ -154,7 +154,7 @@
         """
         self.log_debug("PushScheduler fired for %s %s" % (token, key))
         del self.outstanding[(token, key)]
-        self.callback(token, key)
+        return self.callback(token, key)
 
     def stop(self):
         """

Added: CalendarServer/trunk/calendarserver/tools/ampnotifications.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/ampnotifications.py	                        (rev 0)
+++ CalendarServer/trunk/calendarserver/tools/ampnotifications.py	2012-04-13 22:45:48 UTC (rev 9108)
@@ -0,0 +1,138 @@
+#!/usr/bin/env python
+##
+# Copyright (c) 2012 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from calendarserver.tools.cmdline import utilityMain
+from getopt import getopt, GetoptError
+from twext.python.log import Logger
+from twisted.application.service import Service
+from twisted.internet.defer import inlineCallbacks, succeed
+from twistedcaldav.config import ConfigurationError
+import os
+import sys
+
+from twisted.internet.defer import inlineCallbacks, succeed
+
+from calendarserver.push.amppush import subscribeToIDs
+
+log = Logger()
+
+def usage(e=None):
+
+    name = os.path.basename(sys.argv[0])
+    print "usage: %s [options] [pushkey ...]" % (name,)
+    print ""
+    print "  Monitor AMP Push Notifications"
+    print ""
+    print "options:"
+    print "  -h --help: print this help and exit"
+    print "  -f --config <path>: Specify caldavd.plist configuration path"
+    print "  -p --port <port>: AMP port to connect to"
+    print "  -s --server <hostname>: AMP server to connect to"
+    print ""
+
+    if e:
+        sys.stderr.write("%s\n" % (e,))
+        sys.exit(64)
+    else:
+        sys.exit(0)
+
+
+class WorkerService(Service):
+
+    def __init__(self, store):
+        self._store = store
+
+    @inlineCallbacks
+    def startService(self):
+        try:
+            yield self.doWork()
+        except ConfigurationError, ce:
+            sys.stderr.write("Error: %s\n" % (str(ce),))
+        except Exception, e:
+            sys.stderr.write("Error: %s\n" % (e,))
+            raise
+
+
+class MonitorAMPNotifications(WorkerService):
+
+    ids = []
+    hostname = None
+    port = None
+
+    def doWork(self):
+        return monitorAMPNotifications(self.hostname, self.port, self.ids)
+
+
+def main():
+
+    try:
+        (optargs, args) = getopt(
+            sys.argv[1:], "f:hp:s:", [
+                "config=",
+                "help",
+                "port=",
+                "server=",
+            ],
+        )
+    except GetoptError, e:
+        usage(e)
+
+    #
+    # Get configuration
+    #
+    configFileName = None
+    hostname = "localhost"
+    port = 62319
+
+    for opt, arg in optargs:
+        if opt in ("-h", "--help"):
+            usage()
+
+        elif opt in ("-f", "--config"):
+            configFileName = arg
+
+        elif opt in ("-p", "--port"):
+            port = int(arg)
+
+        elif opt in ("-s", "--server"):
+            hostname = arg
+
+        else:
+            raise NotImplementedError(opt)
+
+    if not args:
+        usage("Not enough arguments")
+
+
+    MonitorAMPNotifications.ids = args
+    MonitorAMPNotifications.hostname = hostname
+    MonitorAMPNotifications.port = port
+
+    utilityMain(
+        configFileName,
+        MonitorAMPNotifications,
+    )
+
+def notificationCallback(id):
+    print "Received notification for:", id
+    return succeed(True)
+
+@inlineCallbacks
+def monitorAMPNotifications(hostname, port, ids):
+    print "Subscribing to notifications..."
+    yield subscribeToIDs(hostname, port, ids, notificationCallback)
+    print "Waiting for notifications..."


Property changes on: CalendarServer/trunk/calendarserver/tools/ampnotifications.py
___________________________________________________________________
Added: svn:executable
   + *

Modified: CalendarServer/trunk/conf/caldavd-test.plist
===================================================================
--- CalendarServer/trunk/conf/caldavd-test.plist	2012-04-13 20:13:22 UTC (rev 9107)
+++ CalendarServer/trunk/conf/caldavd-test.plist	2012-04-13 22:45:48 UTC (rev 9108)
@@ -634,6 +634,21 @@
 
       <key>Services</key>
       <dict>
+
+        <key>AMPNotifier</key>
+        <dict>
+          <key>Service</key>
+          <string>calendarserver.push.amppush.AMPPushNotifierService</string>
+          <key>Enabled</key>
+          <true/>
+          <key>Port</key>
+          <integer>62311</integer>
+          <key>EnableStaggering</key>
+          <false/>
+          <key>StaggerSeconds</key>
+          <integer>3</integer>
+        </dict>
+
         <key>SimpleLineNotifier</key>
         <dict>
           <!-- Simple line notification service (for testing) -->

Modified: CalendarServer/trunk/contrib/performance/loadtest/config.plist
===================================================================
--- CalendarServer/trunk/contrib/performance/loadtest/config.plist	2012-04-13 20:13:22 UTC (rev 9107)
+++ CalendarServer/trunk/contrib/performance/loadtest/config.plist	2012-04-13 22:45:48 UTC (rev 9108)
@@ -122,6 +122,13 @@
 						advertised. -->
 					<key>supportPush</key>
 					<false />
+
+					<key>supportAmpPush</key>
+					<true/>
+					<key>ampPushHost</key>
+					<string>localhost</string>
+					<key>ampPushPort</key>
+					<integer>62311</integer>
 				</dict>
 
 				<!-- The profiles define certain types of user behavior on top of the 

Modified: CalendarServer/trunk/contrib/performance/loadtest/ical.py
===================================================================
--- CalendarServer/trunk/contrib/performance/loadtest/ical.py	2012-04-13 20:13:22 UTC (rev 9107)
+++ CalendarServer/trunk/contrib/performance/loadtest/ical.py	2012-04-13 22:45:48 UTC (rev 9108)
@@ -48,6 +48,7 @@
 from caldavclientlibrary.protocol.caldav.definitions import csxml
 
 from calendarserver.tools.notifications import PubSubClientFactory
+from calendarserver.push.amppush import subscribeToIDs
 
 from contrib.performance.httpclient import StringProducer, readBody
 from contrib.performance.httpauth import AuthHandlerAgent
@@ -267,7 +268,8 @@
 
     email = None
 
-    def __init__(self, reactor, root, record, auth, calendarHomePollInterval=None, supportPush=True):
+    def __init__(self, reactor, root, record, auth, calendarHomePollInterval=None, supportPush=True,
+        supportAmpPush=True, ampPushHost="localhost", ampPushPort=62311):
         
         self._client_id = str(uuid4())
 
@@ -281,7 +283,11 @@
         self.calendarHomePollInterval = calendarHomePollInterval
 
         self.supportPush = supportPush
-        
+
+        self.supportAmpPush = supportAmpPush
+        self.ampPushHost = ampPushHost
+        self.ampPushPort = ampPushPort
+
         self.supportSync = self._SYNC_REPORT
 
         # Keep track of the calendars on this account, keys are
@@ -302,6 +308,8 @@
         # values.
         self.xmpp = {}
 
+        self.ampPushKeys = {}
+
         # Keep track of push factories so we can unsubscribe at shutdown
         self._pushFactories = []
 
@@ -544,7 +552,16 @@
 
             if href == calendarHome:
                 text = results[href].getTextProperties()
+
                 try:
+                    pushkey = text[csxml.pushkey]
+                except KeyError:
+                    pass
+                else:
+                    if pushkey:
+                        self.ampPushKeys[href] = pushkey
+
+                try:
                     server = text[csxml.xmpp_server]
                     uri = text[csxml.xmpp_uri]
                     pushkey = text[csxml.pushkey]
@@ -922,7 +939,24 @@
         self._pushFactories.append(factory)
         connect(GAIEndpoint(self.reactor, host, port), factory)
 
+    def _receivedPush(self, inboundID):
+        for href, id in self.ampPushKeys.iteritems():
+            if inboundID == id:
+                self._checkCalendarsForEvents(href, push=True)
+                break
+        else:
+            # somehow we are not subscribed to this id
+            pass
 
+
+    def _monitorAmpPush(self, home, pushKeys):
+        """
+        Start monitoring for AMP-based push notifications
+        """
+        subscribeToIDs(self.ampPushHost, self.ampPushPort, pushKeys,
+            self._receivedPush, self.reactor)
+
+
     @inlineCallbacks
     def _unsubscribePubSub(self):
         for factory in self._pushFactories:
@@ -954,6 +988,11 @@
             self._monitorPubSub(calendarHome, self.xmpp[calendarHome])
             # Run indefinitely.
             yield Deferred()
+        elif self.supportAmpPush and calendarHome in self.ampPushKeys:
+            pushKeys = self.ampPushKeys.values()
+            self._monitorAmpPush(calendarHome, pushKeys)
+            # Run indefinitely.
+            yield Deferred()
         else:
             # This completes when the calendar home poll loop completes, which
             # currently it never will except due to an unexpected error.

Modified: CalendarServer/trunk/twistedcaldav/notify.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/notify.py	2012-04-13 20:13:22 UTC (rev 9107)
+++ CalendarServer/trunk/twistedcaldav/notify.py	2012-04-13 22:45:48 UTC (rev 9108)
@@ -1484,7 +1484,7 @@
         for key, settings in config.Notifications.Services.iteritems():
             if settings["Enabled"]:
                 notifier = namedClass(settings["Service"]).makeService(settings,
-                    store)
+                    store, config.ServerHostName)
                 notifier.setServiceParent(multiService)
                 notifiers.append(notifier)
 
@@ -1501,7 +1501,7 @@
 class SimpleLineNotifierService(service.Service):
 
     @classmethod
-    def makeService(cls, settings, store):
+    def makeService(cls, settings, store, serverHostName):
         return cls(settings)
 
     def __init__(self, settings):
@@ -1522,7 +1522,7 @@
 class XMPPNotifierService(service.Service):
 
     @classmethod
-    def makeService(cls, settings, store):
+    def makeService(cls, settings, store, serverHostName):
         return cls(settings)
 
     def __init__(self, settings):

Modified: CalendarServer/trunk/twistedcaldav/resource.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/resource.py	2012-04-13 20:13:22 UTC (rev 9107)
+++ CalendarServer/trunk/twistedcaldav/resource.py	2012-04-13 22:45:48 UTC (rev 9108)
@@ -2347,6 +2347,7 @@
 
         elif qname == (customxml.calendarserver_namespace, "pushkey"):
             if (config.Notifications.Services.XMPPNotifier.Enabled or
+                config.Notifications.Services.AMPNotifier.Enabled or
                 config.Notifications.Services.ApplePushNotifier.Enabled):
                 nodeName = (yield self._newStoreHome.nodeName())
                 if nodeName:

Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py	2012-04-13 20:13:22 UTC (rev 9107)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py	2012-04-13 22:45:48 UTC (rev 9108)
@@ -719,6 +719,13 @@
                     "Topic" : "",
                 },
             },
+            "AMPNotifier" : {
+                "Service" : "calendarserver.push.amppush.AMPPushNotifierService",
+                "Enabled" : True,
+                "Port" : 62311,
+                "EnableStaggering" : False,
+                "StaggerSeconds" : 3,
+            },
             "XMPPNotifier" : {
                 "Service" : "twistedcaldav.notify.XMPPNotifierService",
                 "Enabled" : False,
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20120413/b5745112/attachment-0001.html>


More information about the calendarserver-changes mailing list