[CalendarServer-changes] [10837] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Mon Mar 4 12:05:53 PST 2013


Revision: 10837
          http://trac.calendarserver.org//changeset/10837
Author:   sagen at apple.com
Date:     2013-03-04 12:05:53 -0800 (Mon, 04 Mar 2013)
Log Message:
-----------
AMP push notifications work again: the slaves forward notifications to the master, which now listens for subscriptions from clients.  Works in both Master/Slave (combined) and Single modes.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/push/amppush.py
    CalendarServer/trunk/calendarserver/push/applepush.py
    CalendarServer/trunk/calendarserver/push/notifier.py
    CalendarServer/trunk/calendarserver/push/test/test_amppush.py
    CalendarServer/trunk/calendarserver/push/test/test_applepush.py
    CalendarServer/trunk/calendarserver/push/test/test_notifier.py
    CalendarServer/trunk/calendarserver/tap/caldav.py
    CalendarServer/trunk/calendarserver/tap/util.py
    CalendarServer/trunk/conf/caldavd-apple.plist
    CalendarServer/trunk/conf/caldavd-test.plist
    CalendarServer/trunk/twistedcaldav/resource.py
    CalendarServer/trunk/twistedcaldav/stdconfig.py

Property Changed:
----------------
    CalendarServer/trunk/calendarserver/tools/
    CalendarServer/trunk/conf/

Modified: CalendarServer/trunk/calendarserver/push/amppush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/amppush.py	2013-03-04 19:07:05 UTC (rev 10836)
+++ CalendarServer/trunk/calendarserver/push/amppush.py	2013-03-04 20:05:53 UTC (rev 10837)
@@ -16,9 +16,8 @@
 
 from calendarserver.push.util import PushScheduler
 from twext.python.log import Logger, LoggingMixIn
-from twisted.application.internet import StreamServerEndpointService
 from twisted.internet.defer import inlineCallbacks, returnValue
-from twisted.internet.endpoints import TCP4ClientEndpoint, TCP4ServerEndpoint
+from twisted.internet.endpoints import TCP4ClientEndpoint 
 from twisted.internet.protocol import Factory, ServerFactory
 from twisted.protocols import amp
 import time
@@ -28,6 +27,10 @@
 log = Logger()
 
 
+# Control socket message-routing constants
+PUSH_ROUTE = "push"
+
+
 # AMP Commands sent to server
 
 class SubscribeToID(amp.Command):
@@ -40,7 +43,7 @@
     response = [('status', amp.String())]
 
 
-# AMP Commands sent to client
+# AMP Commands sent to client (and forwarded to Master)
 
 class NotificationForID(amp.Command):
     arguments = [('id', amp.String()), ('dataChangedTimestamp', amp.Integer())]
@@ -49,29 +52,88 @@
 
 # Server classes
 
+class AMPPushForwardingFactory(Factory, LoggingMixIn):
 
-class AMPPushNotifierService(StreamServerEndpointService, LoggingMixIn):
+    def __init__(self, forwarder):
+        self.forwarder = forwarder
+
+    def buildProtocol(self, addr):
+        protocol = amp.AMP()
+        self.forwarder.protocols.append(protocol)
+        return protocol
+
+class AMPPushForwarder(LoggingMixIn):
     """
+    Runs in the slaves, forwards notifications to the master via AMP
+    """
+    def __init__(self, controlSocket):
+        self.protocols = []
+        controlSocket.addFactory(PUSH_ROUTE, AMPPushForwardingFactory(self))
+
+    @inlineCallbacks
+    def enqueue(self, id, dataChangedTimestamp=None):
+        if dataChangedTimestamp is None:
+            dataChangedTimestamp = int(time.time())
+        for protocol in self.protocols:
+            yield protocol.callRemote(NotificationForID, id=id,
+                dataChangedTimestamp=dataChangedTimestamp)
+
+
+
+class AMPPushMasterListeningProtocol(amp.AMP, LoggingMixIn):
+    """
+    Listens for notifications coming in over AMP from the slaves
+    """
+    def __init__(self, master):
+        super(AMPPushMasterListeningProtocol, self).__init__()
+        self.master = master
+
+    @NotificationForID.responder
+    def enqueueFromWorker(self, id, dataChangedTimestamp=None):
+        if dataChangedTimestamp is None:
+            dataChangedTimestamp = int(time.time())
+        self.master.enqueue(id, dataChangedTimestamp=dataChangedTimestamp)
+        return {"status" : "OK"}
+ 
+
+class AMPPushMasterListenerFactory(Factory, LoggingMixIn):
+
+    def __init__(self, master):
+        self.master = master
+
+    def buildProtocol(self, addr):
+        protocol = AMPPushMasterListeningProtocol(self.master)
+        return protocol
+
+
+class AMPPushMaster(LoggingMixIn):
+    """
     AMPPushNotifierService allows clients to use AMP to subscribe to,
     and receive, change notifications.
     """
 
-    @classmethod
-    def makeService(cls, settings, ignored, reactor=None):
-        return cls(settings, reactor=reactor)
-
-    def __init__(self, settings, reactor=None):
+    def __init__(self, controlSocket, parentService, port, enableStaggering,
+        staggerSeconds, reactor=None):
         if reactor is None:
             from twisted.internet import reactor
-        factory = AMPPushNotifierFactory(self)
-        endpoint = TCP4ServerEndpoint(reactor, settings["Port"])
-        super(AMPPushNotifierService, self).__init__(endpoint, factory)
+        from twisted.application.strports import service as strPortsService
+
+        if port:
+            # Service which listens for client subscriptions and sends
+            # notifications to them
+            strPortsService(str(port), AMPPushNotifierFactory(self),
+                reactor=reactor).setServiceParent(parentService)
+
+        if controlSocket is not None:
+            # Set up the listener which gets notifications from the slaves
+            controlSocket.addFactory(PUSH_ROUTE,
+                AMPPushMasterListenerFactory(self))
+
         self.subscribers = []
-        self.dataHost = settings["DataHost"]
 
-        if settings["EnableStaggering"]:
+        if enableStaggering:
             self.scheduler = PushScheduler(reactor, self.sendNotification,
-                staggerSeconds=settings["StaggerSeconds"])
+                staggerSeconds=staggerSeconds)
         else:
             self.scheduler = None
 
@@ -83,45 +145,32 @@
         self.log_debug("Removed subscriber")
         self.subscribers.remove(p)
 
-    def enqueue(self, id, dataChangedTimestamp=None):
+    def enqueue(self, pushKey, dataChangedTimestamp=None):
         """
-        Sends an AMP push notification to any clients subscribing to this id.
+        Sends an AMP push notification to any clients subscribing to this pushKey.
 
-        @param id: The identifier of the resource that was updated, including
+        @param pushKey: The identifier of the resource that was updated, including
             a prefix indicating whether this is CalDAV or CardDAV related.
-            The prefix is separated from the id with "|", e.g.:
 
-            "CalDAV|abc/def"
+            "/CalDAV/abc/def/"
 
-            The id is an opaque token as far as this code is concerned, and
-            is used in conjunction with the prefix and the server hostname
-            to build the actual key value that devices subscribe to.
-        @type id: C{str}
+        @type pushKey: C{str}
         @param dataChangedTimestamp: Timestamp (epoch seconds) for the data change
             which triggered this notification (Only used for unit tests)
             @type key: C{int}
         """
 
-        try:
-            protocol, id = id.split("|", 1)
-        except ValueError:
-            # id has no protocol, so we can't do anything with it
-            self.log_error("Notification id '%s' is missing protocol" % (id,))
-            return
-
         # Unit tests can pass this value in; otherwise it defaults to now
         if dataChangedTimestamp is None:
             dataChangedTimestamp = int(time.time())
 
-        id = "/%s/%s/%s/" % (protocol, self.dataHost, id)
-
         tokens = []
         for subscriber in self.subscribers:
-            token = subscriber.subscribedToID(id)
+            token = subscriber.subscribedToID(pushKey)
             if token is not None:
                 tokens.append(token)
         if tokens:
-            return self.scheduleNotifications(tokens, id, dataChangedTimestamp)
+            return self.scheduleNotifications(tokens, pushKey, dataChangedTimestamp)
 
 
     @inlineCallbacks

Modified: CalendarServer/trunk/calendarserver/push/applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/applepush.py	2013-03-04 19:07:05 UTC (rev 10836)
+++ CalendarServer/trunk/calendarserver/push/applepush.py	2013-03-04 20:05:53 UTC (rev 10837)
@@ -88,7 +88,6 @@
         service.store = store
         service.providers = {}
         service.feedbacks = {}
-        service.dataHost = settings["DataHost"]
         service.purgeCall = None
         service.purgeIntervalSeconds = settings["SubscriptionPurgeIntervalSeconds"]
         service.purgeSeconds = settings["SubscriptionPurgeSeconds"]
@@ -177,31 +176,27 @@
 
 
     @inlineCallbacks
-    def enqueue(self, id, dataChangedTimestamp=None):
+    def enqueue(self, pushKey, dataChangedTimestamp=None):
         """
         Sends an Apple Push Notification to any device token subscribed to
-        this id.
+        this pushKey.
 
-        @param id: The identifier of the resource that was updated, including
+        @param pushKey: The identifier of the resource that was updated, including
             a prefix indicating whether this is CalDAV or CardDAV related.
-            The prefix is separated from the id with "|", e.g.:
 
-            "CalDAV|abc/def"
+            "/CalDAV/abc/def/"
 
-            The id is an opaque token as far as this code is concerned, and
-            is used in conjunction with the prefix and the server hostname
-            to build the actual key value that devices subscribe to.
-        @type id: C{str}
+        @type pushKey: C{str}
         @param dataChangedTimestamp: Timestamp (epoch seconds) for the data change
             which triggered this notification (Only used for unit tests)
         @type key: C{int}
         """
 
         try:
-            protocol, id = id.split("|", 1)
+            protocol = pushKey.split("/")[1]
         except ValueError:
-            # id has no protocol, so we can't do anything with it
-            self.log_error("Notification id '%s' is missing protocol" % (id,))
+            # pushKey has no protocol, so we can't do anything with it
+            self.log_error("Push key '%s' is missing protocol" % (pushKey,))
             return
 
         # Unit tests can pass this value in; otherwise it defaults to now
@@ -210,23 +205,22 @@
 
         provider = self.providers.get(protocol, None)
         if provider is not None:
-            key = "/%s/%s/%s/" % (protocol, self.dataHost, id)
 
             # Look up subscriptions for this key
             txn = self.store.newTransaction()
-            subscriptions = (yield txn.apnSubscriptionsByKey(key))
+            subscriptions = (yield txn.apnSubscriptionsByKey(pushKey))
             yield txn.commit()
 
             numSubscriptions = len(subscriptions)
             if numSubscriptions > 0:
                 self.log_debug("Sending %d APNS notifications for %s" %
-                    (numSubscriptions, key))
+                    (numSubscriptions, pushKey))
                 tokens = []
                 for token, uid in subscriptions:
                     if token and uid:
                         tokens.append(token)
                 if tokens:
-                    provider.scheduleNotifications(tokens, key, dataChangedTimestamp)
+                    provider.scheduleNotifications(tokens, pushKey, dataChangedTimestamp)
 
 
 

Modified: CalendarServer/trunk/calendarserver/push/notifier.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/notifier.py	2013-03-04 19:07:05 UTC (rev 10836)
+++ CalendarServer/trunk/calendarserver/push/notifier.py	2013-03-04 20:05:53 UTC (rev 10837)
@@ -24,8 +24,6 @@
 from twext.enterprise.dal.record import fromTable
 from twext.enterprise.queue import WorkItem
 from txdav.common.datastore.sql_tables import schema
-from twisted.application import service
-from twisted.python.reflect import namedClass
 
 
 log = Logger()
@@ -38,9 +36,9 @@
 
         # FIXME: Coalescing goes here?
 
-        pushService = self.transaction._pushService
-        if pushService is not None:
-            yield pushService.enqueue(self.pushID)
+        pushDistributor = self.transaction._pushDistributor
+        if pushDistributor is not None:
+            yield pushDistributor.enqueue(self.pushID)
 
 
 
@@ -130,7 +128,7 @@
     @inlineCallbacks
     def send(self, id):
         txn = self.store.newTransaction()
-        yield txn.enqueue(PushNotificationWork, pushID=id)
+        yield txn.enqueue(PushNotificationWork, pushID=self.pushKeyForId(id))
         yield txn.commit()
 
     def newNotifier(self, label="default", id=None, prefix=None):
@@ -153,22 +151,21 @@
 
 
 
-def getPubSubAPSConfiguration(id, config):
+def getPubSubAPSConfiguration(pushKey, config):
     """
-    Returns the Apple push notification settings specific to the notifier
-    ID, which includes a prefix that is either "CalDAV" or "CardDAV"
+    Returns the Apple push notification settings specific to the pushKey
     """
     try:
-        prefix, id = id.split("|", 1)
+        protocol, ignored = pushKey.split("|", 1)
     except ValueError:
-        # id has no prefix, so we can't look up APS config
+        # id has no protocol, so we can't look up APS config
         return None
 
     # If we are directly talking to apple push, advertise those settings
-    applePushSettings = config.Notifications.Services.ApplePushNotifier
+    applePushSettings = config.Notifications.Services.APNS
     if applePushSettings.Enabled:
         settings = {}
-        settings["APSBundleID"] = applePushSettings[prefix]["Topic"]
+        settings["APSBundleID"] = applePushSettings[protocol]["Topic"]
         if config.EnableSSL:
             url = "https://%s:%s/%s" % (config.ServerHostName, config.SSLPort,
                 applePushSettings.SubscriptionURL)
@@ -183,27 +180,26 @@
     return None
 
 
-class PushService(service.MultiService):
+class PushDistributor(object):
     """
-    A Service which passes along notifications to the protocol-specific subservices
+    Distributes notifications to the protocol-specific subservices
     """
 
-    @classmethod
-    def makeService(cls, settings, store):
-        multiService = cls()
-        for key, subSettings in settings.Services.iteritems():
-            if subSettings["Enabled"]:
-                subService = namedClass(subSettings["Service"]).makeService(
-                    subSettings, store)
-                subService.setServiceParent(multiService)
-                multiService.subServices.append(subService)            
-        return multiService
+    def __init__(self, observers):
+        """
+        @param observers: the list of observers to distribute pushKeys to
+        @type observers: C{list} 
+        """
+        # TODO: add an IPushObservers interface?
+        self.observers = observers 
 
-    def __init__(self):
-        service.MultiService.__init__(self)
-        self.subServices = []
-
     @inlineCallbacks
-    def enqueue(self, id):
-        for subService in self.subServices:
-            yield subService.enqueue(id)
+    def enqueue(self, pushKey):
+        """
+        Pass along enqueued pushKey to any observers
+
+        @param pushKey: the push key to distribute to the observers
+        @type pushKey: C{str}
+        """
+        for observer in self.observers:
+            yield observer.enqueue(pushKey)

Modified: CalendarServer/trunk/calendarserver/push/test/test_amppush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/test/test_amppush.py	2013-03-04 19:07:05 UTC (rev 10836)
+++ CalendarServer/trunk/calendarserver/push/test/test_amppush.py	2013-03-04 20:05:53 UTC (rev 10837)
@@ -14,30 +14,18 @@
 # limitations under the License.
 ##
 
-from calendarserver.push.amppush import AMPPushNotifierService, AMPPushNotifierProtocol
+from calendarserver.push.amppush import AMPPushMaster, AMPPushNotifierProtocol
 from calendarserver.push.amppush import NotificationForID
 from twistedcaldav.test.util import TestCase
-from twisted.internet.defer import inlineCallbacks
 from twisted.internet.task import Clock
 
-class AMPPushNotifierServiceTests(TestCase):
+class AMPPushMasterTests(TestCase):
 
-    @inlineCallbacks
-    def test_AMPPushNotifierService(self):
+    def test_AMPPushMaster(self):
 
-        settings = {
-            "Service" : "calendarserver.push.amppush.AMPPushNotifierService",
-            "Enabled" : True,
-            "Port" : 62311,
-            "EnableStaggering" : True,
-            "StaggerSeconds" : 3,
-            "DataHost" : "localhost",
-        }
-
         # Set up the service
         clock = Clock()
-        service = (yield AMPPushNotifierService.makeService(settings,
-            None, reactor=clock))
+        service = AMPPushMaster(None, None, 0, True, 3, reactor=clock)
 
         self.assertEquals(service.subscribers, [])
 
@@ -69,7 +57,7 @@
         self.assertTrue(client3.subscribedToID("/CalDAV/localhost/user03/"))
 
         dataChangedTimestamp = 1354815999
-        service.enqueue("CalDAV|user01", dataChangedTimestamp=dataChangedTimestamp)
+        service.enqueue("/CalDAV/localhost/user01/", dataChangedTimestamp=dataChangedTimestamp)
         self.assertEquals(len(client1.history), 0)
         self.assertEquals(len(client2.history), 0)
         self.assertEquals(len(client3.history), 0)
@@ -86,7 +74,7 @@
         client1.reset()
         client2.reset()
         client2.unsubscribe("token2", "/CalDAV/localhost/user01/")
-        service.enqueue("CalDAV|user01", dataChangedTimestamp=dataChangedTimestamp)
+        service.enqueue("/CalDAV/localhost/user01/", dataChangedTimestamp=dataChangedTimestamp)
         self.assertEquals(len(client1.history), 0)
         clock.advance(1)
         self.assertEquals(client1.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/', 'dataChangedTimestamp' : 1354815999})])
@@ -99,7 +87,7 @@
         client1.reset()
         client2.reset()
         client2.subscribe("token2", "/CalDAV/localhost/user01/")
-        service.enqueue("CalDAV|user01", dataChangedTimestamp=dataChangedTimestamp)
+        service.enqueue("/CalDAV/localhost/user01/", dataChangedTimestamp=dataChangedTimestamp)
         self.assertEquals(client1.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/', 'dataChangedTimestamp' : 1354815999})])
         self.assertEquals(client2.history, [(NotificationForID, {'id': '/CalDAV/localhost/user01/', 'dataChangedTimestamp' : 1354815999})])
 

Modified: CalendarServer/trunk/calendarserver/push/test/test_applepush.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/test/test_applepush.py	2013-03-04 19:07:05 UTC (rev 10836)
+++ CalendarServer/trunk/calendarserver/push/test/test_applepush.py	2013-03-04 20:05:53 UTC (rev 10837)
@@ -38,12 +38,10 @@
     def test_ApplePushNotifierService(self):
 
         settings = {
-            "Service" : "calendarserver.push.applepush.ApplePushNotifierService",
             "Enabled" : True,
             "SubscriptionURL" : "apn",
             "SubscriptionPurgeSeconds" : 24 * 60 * 60,
             "SubscriptionPurgeIntervalSeconds" : 24 * 60 * 60,
-            "DataHost" : "calendars.example.com",
             "ProviderHost" : "gateway.push.apple.com",
             "ProviderPort" : 2195,
             "FeedbackHost" : "feedback.push.apple.com",
@@ -127,7 +125,7 @@
 
         # Notification arrives from calendar server
         dataChangedTimestamp = 1354815999
-        yield service.enqueue("CalDAV|user01/calendar",
+        yield service.enqueue("/CalDAV/calendars.example.com/user01/calendar/",
             dataChangedTimestamp=dataChangedTimestamp)
 
         # The notifications should be in the queue
@@ -167,7 +165,7 @@
         # Reset sent data
         providerConnector.transport.data = None
         # Send notification while service is connected
-        yield service.enqueue("CalDAV|user01/calendar")
+        yield service.enqueue("/CalDAV/calendars.example.com/user01/calendar/")
         clock.advance(1) # so that first push is sent
         self.assertEquals(len(providerConnector.transport.data), 183)
         # Reset sent data

Modified: CalendarServer/trunk/calendarserver/push/test/test_notifier.py
===================================================================
--- CalendarServer/trunk/calendarserver/push/test/test_notifier.py	2013-03-04 19:07:05 UTC (rev 10836)
+++ CalendarServer/trunk/calendarserver/push/test/test_notifier.py	2013-03-04 20:05:53 UTC (rev 10837)
@@ -15,15 +15,12 @@
 ##
 
 from twistedcaldav.test.util import TestCase
+from calendarserver.push.notifier import PushDistributor, getPubSubAPSConfiguration
+from twisted.internet.defer import inlineCallbacks, succeed
 from twistedcaldav.config import ConfigDict
-from calendarserver.push.notifier import PushService
-from twisted.internet.defer import inlineCallbacks, succeed
-from twisted.application import service
 
-class StubService(service.Service):
-    def __init__(self, settings, store):
-        self.settings = settings
-        self.store = store
+class StubService(object):
+    def __init__(self):
         self.reset()
 
     def reset(self):
@@ -33,25 +30,42 @@
         self.history.append(id)
         return(succeed(None))
 
-    @classmethod
-    def makeService(cls, settings, store):
-        return cls(settings, store)
+class PushDistributorTests(TestCase):
 
-class PushServiceTests(TestCase):
-
     @inlineCallbacks
     def test_enqueue(self):
-        settings = ConfigDict({
-            "Services" : {
-                "Stub" : {
-                    "Service" : "calendarserver.push.test.test_notifier.StubService",
-                    "Enabled" : True,
-                    "Foo" : "Bar",
+        stub = StubService()
+        dist = PushDistributor([stub])
+        yield dist.enqueue("testing")
+        self.assertEquals(stub.history, ["testing"])
+
+    def test_getPubSubAPSConfiguration(self):
+        config = ConfigDict({
+            "EnableSSL" : True,
+            "ServerHostName" : "calendars.example.com",
+            "SSLPort" : 8443,
+            "HTTPPort" : 8008,
+            "Notifications" : {
+                "Services" : {
+                    "APNS" : {
+                        "CalDAV" : {
+                            "Topic" : "test topic",
+                        },
+                        "SubscriptionRefreshIntervalSeconds" : 42,
+                        "SubscriptionURL" : "apns",
+                        "Environment" : "prod",
+                        "Enabled" : True,
+                    },
                 },
             },
         })
-        svc = PushService.makeService(settings, None)
-        yield svc.enqueue("testing")
-        self.assertEquals(svc.subServices[0].history, ["testing"])
-
-
+        result = getPubSubAPSConfiguration("CalDAV|foo", config)
+        self.assertEquals(
+            result,
+            {
+                "SubscriptionRefreshIntervalSeconds": 42, 
+                "SubscriptionURL": "https://calendars.example.com:8443/apns", 
+                "APSBundleID": "test topic", 
+                "APSEnvironment": "prod"
+            }
+        )

Modified: CalendarServer/trunk/calendarserver/tap/caldav.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/caldav.py	2013-03-04 19:07:05 UTC (rev 10836)
+++ CalendarServer/trunk/calendarserver/tap/caldav.py	2013-03-04 20:05:53 UTC (rev 10837)
@@ -106,7 +106,9 @@
 from calendarserver.tap.util import oracleConnectorFromConfig
 from calendarserver.tap.cfgchild import ConfiguredChildSpawner
 from calendarserver.tools.util import checkDirectory
-from calendarserver.push.notifier import PushService
+from calendarserver.push.notifier import PushDistributor
+from calendarserver.push.amppush import AMPPushMaster, AMPPushForwarder
+from calendarserver.push.applepush import ApplePushNotifierService
 from twistedcaldav.scheduling.imip.inbound import MailRetriever
 
 try:
@@ -710,17 +712,56 @@
         """
         pool, txnFactory = getDBPool(config)
         store = storeFromConfig(config, txnFactory)
-        result = self.requestProcessingService(options, store)
+        logObserver = AMPCommonAccessLoggingObserver()
+        result = self.requestProcessingService(options, store, logObserver)
         directory = result.rootResource.getDirectory()
         if pool is not None:
             pool.setServiceParent(result)
 
+        if config.ControlSocket:
+            id = config.ControlSocket
+            self.log_info("Control via AF_UNIX: %s" % (id,))
+            endpointFactory = lambda reactor: UNIXClientEndpoint(
+                reactor, id)
+        else:
+            id = int(config.ControlPort)
+            self.log_info("Control via AF_INET: %d" % (id,))
+            endpointFactory = lambda reactor: TCP4ClientEndpoint(
+                reactor, "127.0.0.1", id)
+        controlSocketClient = ControlSocket()
+        class LogClient(AMP):
+            def startReceivingBoxes(self, sender):
+                super(LogClient, self).startReceivingBoxes(sender)
+                logObserver.addClient(self)
+        f = Factory()
+        f.protocol = LogClient
+        controlSocketClient.addFactory(_LOG_ROUTE, f)
+        from txdav.common.datastore.sql import CommonDataStore as SQLStore
+        if isinstance(store, SQLStore):
+            def queueMasterAvailable(connectionFromMaster):
+                store.queuer = store.queuer.transferProposalCallbacks(connectionFromMaster)
+            queueFactory = QueueWorkerFactory(store.newTransaction, schema,
+                                              queueMasterAvailable)
+            controlSocketClient.addFactory(_QUEUE_ROUTE, queueFactory)
+        controlClient = ControlSocketConnectingService(
+            endpointFactory, controlSocketClient
+        )
+        controlClient.setServiceParent(result)
+
         # Optionally set up push notifications
+        pushDistributor = None
         if config.Notifications.Enabled:
-            pushService = PushService.makeService(config.Notifications, store)
-            pushService.setServiceParent(result)
-        else:
-            pushService = None
+            observers = []
+            if config.Notifications.Services.APNS.Enabled:
+                pushSubService = ApplePushNotifierService.makeService(
+                    config.Notifications.Services.APNS, store)
+                observers.append(pushSubService)
+                pushSubService.setServiceParent(result)
+            if config.Notifications.Services.AMP.Enabled:
+                pushSubService = AMPPushForwarder(controlSocketClient)
+                observers.append(pushSubService)
+            if observers:
+                pushDistributor = PushDistributor(observers)
 
         # Optionally set up mail retrieval
         if config.Scheduling.iMIP.Enabled:
@@ -731,7 +772,7 @@
             mailRetriever = None
 
         def decorateTransaction(txn):
-            txn._pushService = pushService
+            txn._pushDistributor = pushDistributor
             txn._rootResource = result.rootResource
             txn._mailRetriever = mailRetriever
 
@@ -762,7 +803,7 @@
         return result
 
 
-    def requestProcessingService(self, options, store):
+    def requestProcessingService(self, options, store, logObserver):
         """
         Make a service that will actually process HTTP requests.
 
@@ -786,52 +827,8 @@
         #
         self.log_info("Setting up service")
 
-        bonusServices = []
-
-        if config.ProcessType == "Slave":
-            logObserver = AMPCommonAccessLoggingObserver()
-
-            if config.ControlSocket:
-                id = config.ControlSocket
-                self.log_info("Control via AF_UNIX: %s" % (id,))
-                endpointFactory = lambda reactor: UNIXClientEndpoint(
-                    reactor, id)
-            else:
-                id = int(config.ControlPort)
-                self.log_info("Control via AF_INET: %d" % (id,))
-                endpointFactory = lambda reactor: TCP4ClientEndpoint(
-                    reactor, "127.0.0.1", id)
-            controlSocketClient = ControlSocket()
-            class LogClient(AMP):
-                def startReceivingBoxes(self, sender):
-                    super(LogClient, self).startReceivingBoxes(sender)
-                    logObserver.addClient(self)
-            f = Factory()
-            f.protocol = LogClient
-            controlSocketClient.addFactory(_LOG_ROUTE, f)
-            from txdav.common.datastore.sql import CommonDataStore as SQLStore
-            if isinstance(store, SQLStore):
-                def queueMasterAvailable(connectionFromMaster):
-                    store.queuer = store.queuer.transferProposalCallbacks(connectionFromMaster)
-                queueFactory = QueueWorkerFactory(store.newTransaction, schema,
-                                                  queueMasterAvailable)
-                controlSocketClient.addFactory(_QUEUE_ROUTE, queueFactory)
-            controlClient = ControlSocketConnectingService(
-                endpointFactory, controlSocketClient
-            )
-            bonusServices.append(controlClient)
-        elif config.ProcessType == "Single":
-            # Make sure no old socket files are lying around.
-            self.deleteStaleSocketFiles()
-            logObserver = RotatingFileAccessLoggingObserver(
-                config.AccessLogFile,
-            )
-
         self.log_info("Configuring access log observer: %s" % (logObserver,))
-
         service = CalDAVService(logObserver)
-        for bonus in bonusServices:
-            bonus.setServiceParent(service)
 
         rootResource = getRootResource(config, store, additional)
         service.rootResource = rootResource
@@ -1018,13 +1015,57 @@
         Create a service to be used in a single-process, stand-alone
         configuration.
         """
-        def slaveSvcCreator(pool, store):
-            return self.requestProcessingService(options, store)
+        def slaveSvcCreator(pool, store, logObserver):
+            result = self.requestProcessingService(options, store, logObserver)
 
+            # Optionally set up push notifications
+            pushDistributor = None
+            if config.Notifications.Enabled:
+                observers = []
+                if config.Notifications.Services.APNS.Enabled:
+                    pushSubService = ApplePushNotifierService.makeService(
+                        config.Notifications.Services.APNS, store)
+                    observers.append(pushSubService)
+                    pushSubService.setServiceParent(result)
+                if config.Notifications.Services.AMP.Enabled:
+                    pushSubService = AMPPushMaster(None, result,
+                        config.Notifications.Services.AMP.Port,
+                        config.Notifications.Services.AMP.EnableStaggering,
+                        config.Notifications.Services.AMP.StaggerSeconds
+                        )
+                    observers.append(pushSubService)
+                if observers:
+                    pushDistributor = PushDistributor(observers)
+
+            # Optionally set up mail retrieval
+            if config.Scheduling.iMIP.Enabled:
+                directory = result.rootResource.getDirectory()
+                mailRetriever = MailRetriever(store, directory,
+                    config.Scheduling.iMIP.Receiving)
+                mailRetriever.setServiceParent(result)
+            else:
+                mailRetriever = None
+
+            def decorateTransaction(txn):
+                txn._pushDistributor = pushDistributor
+                txn._rootResource = result.rootResource
+                txn._mailRetriever = mailRetriever
+
+            store.callWithNewTransactions(decorateTransaction)
+
+            return result 
+
         uid, gid = getSystemIDs(config.UserName, config.GroupName)
-        return self.storageService(slaveSvcCreator, uid=uid, gid=gid)
 
+        # Make sure no old socket files are lying around.
+        self.deleteStaleSocketFiles()
+        logObserver = RotatingFileAccessLoggingObserver(
+            config.AccessLogFile,
+        )
 
+        return self.storageService(slaveSvcCreator, logObserver, uid=uid, gid=gid)
+
+
     def makeService_Utility(self, options):
         """
         Create a service to be used in a command-line utility
@@ -1033,14 +1074,14 @@
         When created, that service will have access to the storage facilities.
         """
 
-        def toolServiceCreator(pool, store):
+        def toolServiceCreator(pool, store, ignored):
             return config.UtilityServiceClass(store)
 
         uid, gid = getSystemIDs(config.UserName, config.GroupName)
-        return self.storageService(toolServiceCreator, uid=uid, gid=gid)
+        return self.storageService(toolServiceCreator, None, uid=uid, gid=gid)
 
 
-    def storageService(self, createMainService, uid=None, gid=None):
+    def storageService(self, createMainService, logObserver, uid=None, gid=None):
         """
         If necessary, create a service to be started used for storage; for
         example, starting a database backend.  This service will then start the
@@ -1079,7 +1120,7 @@
                                     maxConnections=config.MaxDBConnectionsPerPool)
                 cp.setServiceParent(ms)
                 store = storeFromConfig(config, cp.connection)
-                mainService = createMainService(cp, store)
+                mainService = createMainService(cp, store, logObserver)
                 if config.SharedConnectionPool:
                     dispenser = ConnectionDispenser(cp)
                 else:
@@ -1153,7 +1194,7 @@
                 raise UsageError("Unknown database type %r" (config.DBType,))
         else:
             store = storeFromConfig(config, None)
-            return createMainService(None, store)
+            return createMainService(None, store, logObserver)
 
 
     def makeService_Combined(self, options):
@@ -1196,6 +1237,17 @@
 
         controlSocket = ControlSocket()
         controlSocket.addFactory(_LOG_ROUTE, logger)
+
+        # Optionally set up AMPPushMaster
+        if config.Notifications.Enabled and config.Notifications.Services.AMP.Enabled:
+            ampSettings = config.Notifications.Services.AMP
+            AMPPushMaster(
+                controlSocket,
+                s,
+                ampSettings["Port"],
+                ampSettings["EnableStaggering"],
+                ampSettings["StaggerSeconds"]
+            )
         if config.ControlSocket:
             controlSocketService = GroupOwnedUNIXServer(
                 gid, config.ControlSocket, controlSocket, mode=0660
@@ -1338,7 +1390,7 @@
         # to), and second, the service which does an upgrade from the
         # filesystem to the database (if that's necessary, and there is
         # filesystem data in need of upgrading).
-        def spawnerSvcCreator(pool, store):
+        def spawnerSvcCreator(pool, store, ignored):
             from twisted.internet import reactor
             pool = PeerConnectionPool(reactor, store.newTransaction,
                                       7654, schema)

Modified: CalendarServer/trunk/calendarserver/tap/util.py
===================================================================
--- CalendarServer/trunk/calendarserver/tap/util.py	2013-03-04 19:07:05 UTC (rev 10836)
+++ CalendarServer/trunk/calendarserver/tap/util.py	2013-03-04 20:05:53 UTC (rev 10837)
@@ -654,7 +654,7 @@
     #
     # Apple Push Notification Subscriptions
     #
-    apnConfig = config.Notifications.Services["ApplePushNotifier"]
+    apnConfig = config.Notifications.Services.APNS
     if apnConfig.Enabled:
         log.info("Setting up APNS resource at /%s" %
             (apnConfig["SubscriptionURL"],))


Property changes on: CalendarServer/trunk/calendarserver/tools
___________________________________________________________________
Modified: svn:ignore
   - *.pyc

   + *.pyc
config.py.edited



Property changes on: CalendarServer/trunk/conf
___________________________________________________________________
Modified: svn:ignore
   - caldavd-dev.plist

   + caldavd-dev.plist
caldavd-apns.plist
caldavd-corpds-ldap.plist.latest
caldavd-included.plist
caldavd-ldap-separate-groupcacher-memcache.plist
caldavd-server.plist
caldavd-with-imip.plist


Modified: CalendarServer/trunk/conf/caldavd-apple.plist
===================================================================
--- CalendarServer/trunk/conf/caldavd-apple.plist	2013-03-04 19:07:05 UTC (rev 10836)
+++ CalendarServer/trunk/conf/caldavd-apple.plist	2013-03-04 20:05:53 UTC (rev 10837)
@@ -355,30 +355,6 @@
 
       <key>Services</key>
       <dict>
-        <key>XMPPNotifier</key>
-        <dict>
-          <!-- XMPP notification service -->
-          <key>Service</key>
-          <string>twistedcaldav.notify.XMPPNotifierService</string>
-          <key>Enabled</key>
-          <false/>
-
-          <!-- XMPP host and port to contact -->
-          <key>Host</key>
-          <string>xmpp.host.name</string>
-          <key>Port</key>
-          <integer>5222</integer>
-
-          <!-- Jabber ID and password for the server -->
-          <key>JID</key>
-          <string>jid at xmpp.host.name/resource</string>
-          <key>Password</key>
-          <string>password_goes_here</string>
-
-          <!-- PubSub service address -->
-          <key>ServiceAddress</key>
-          <string>pubsub.xmpp.host.name</string>
-        </dict>
       </dict>
     </dict>
 

Modified: CalendarServer/trunk/conf/caldavd-test.plist
===================================================================
--- CalendarServer/trunk/conf/caldavd-test.plist	2013-03-04 19:07:05 UTC (rev 10836)
+++ CalendarServer/trunk/conf/caldavd-test.plist	2013-03-04 20:05:53 UTC (rev 10837)
@@ -656,19 +656,11 @@
       <key>CoalesceSeconds</key>
       <integer>3</integer>
 
-      <key>InternalNotificationHost</key>
-      <string>localhost</string>
-
-      <key>InternalNotificationPort</key>
-      <integer>62309</integer>
-
       <key>Services</key>
       <dict>
 
-        <key>AMPNotifier</key>
+        <key>AMP</key>
         <dict>
-          <key>Service</key>
-          <string>calendarserver.push.amppush.AMPPushNotifierService</string>
           <key>Enabled</key>
           <false/>
           <key>Port</key>
@@ -679,81 +671,6 @@
           <integer>3</integer>
         </dict>
 
-        <key>SimpleLineNotifier</key>
-        <dict>
-          <!-- Simple line notification service (for testing) -->
-          <key>Service</key>
-          <string>twistedcaldav.notify.SimpleLineNotifierService</string>
-          <key>Enabled</key>
-          <false/>
-          <key>Port</key>
-          <integer>62308</integer>
-        </dict>
-
-        <key>XMPPNotifier</key>
-        <dict>
-          <!-- XMPP notification service -->
-          <key>Service</key>
-          <string>twistedcaldav.notify.XMPPNotifierService</string>
-          <key>Enabled</key>
-          <false/>
-
-          <!-- XMPP host and port to contact -->
-          <key>Host</key>
-          <string>xmpp.host.name</string>
-          <key>Port</key>
-          <integer>5222</integer>
-
-          <!-- Jabber ID and password for the server -->
-          <key>JID</key>
-          <string>jid at xmpp.host.name/resource</string>
-          <key>Password</key>
-          <string>password_goes_here</string>
-
-          <!-- PubSub service address -->
-          <key>ServiceAddress</key>
-          <string>pubsub.xmpp.host.name</string>
-
-          <!-- Apple-specific config -->
-          <key>CalDAV</key>
-          <dict>
-              <key>APSBundleID</key>
-              <string></string>
-              <key>SubscriptionURL</key>
-              <string></string>
-          </dict>
-          <key>CardDAV</key>
-          <dict>
-              <key>APSBundleID</key>
-              <string></string>
-              <key>SubscriptionURL</key>
-              <string></string>
-          </dict>
-
-          <key>NodeConfiguration</key>
-          <dict>
-            <key>pubsub#deliver_payloads</key>
-            <string>1</string>
-            <key>pubsub#persist_items</key>
-            <string>1</string>
-          </dict>
-
-          <!-- Sends a presence notification to XMPP server at this interval (prevents disconnect) -->
-          <key>KeepAliveSeconds</key>
-          <integer>120</integer>
-
-          <!-- Sends a pubsub publish to a particular heartbeat node at this interval -->
-          <key>HeartbeatMinutes</key>
-          <integer>30</integer>
-
-          <!-- List of glob-like expressions defining which XMPP JIDs can converse with the server (for debugging) -->
-          <key>AllowedJIDs</key>
-          <array>
-            <!--
-            <string>*.example.com</string>
-             -->
-          </array>
-        </dict>
       </dict>
     </dict>
 

Modified: CalendarServer/trunk/twistedcaldav/resource.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/resource.py	2013-03-04 19:07:05 UTC (rev 10836)
+++ CalendarServer/trunk/twistedcaldav/resource.py	2013-03-04 20:05:53 UTC (rev 10837)
@@ -2320,7 +2320,7 @@
 
         elif qname == (customxml.calendarserver_namespace, "push-transports"):
 
-            if config.Notifications.Services.ApplePushNotifier.Enabled:
+            if config.Notifications.Services.APNS.Enabled:
 
                 nodeName = (yield self._newStoreHome.nodeName())
                 if nodeName:
@@ -2355,8 +2355,8 @@
             returnValue(None)
 
         elif qname == (customxml.calendarserver_namespace, "pushkey"):
-            if (config.Notifications.Services.AMPNotifier.Enabled or
-                config.Notifications.Services.ApplePushNotifier.Enabled):
+            if (config.Notifications.Services.AMP.Enabled or
+                config.Notifications.Services.APNS.Enabled):
                 nodeName = (yield self._newStoreHome.nodeName())
                 if nodeName:
                     returnValue(customxml.PubSubXMPPPushKeyProperty(nodeName))

Modified: CalendarServer/trunk/twistedcaldav/stdconfig.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/stdconfig.py	2013-03-04 19:07:05 UTC (rev 10836)
+++ CalendarServer/trunk/twistedcaldav/stdconfig.py	2013-03-04 20:05:53 UTC (rev 10837)
@@ -736,14 +736,12 @@
         "CoalesceSeconds" : 3,
 
         "Services" : {
-            "ApplePushNotifier" : {
-                "Service" : "calendarserver.push.applepush.ApplePushNotifierService",
+            "APNS" : {
                 "Enabled" : False,
                 "SubscriptionURL" : "apns",
                 "SubscriptionRefreshIntervalSeconds" : 2 * 24 * 60 * 60, # How often the client should re-register (2 days)
                 "SubscriptionPurgeIntervalSeconds" : 12 * 60 * 60, # How often a purge is done (12 hours)
                 "SubscriptionPurgeSeconds" : 14 * 24 * 60 * 60, # How old a subscription must be before it's purged (14 days)
-                "DataHost" : "",
                 "ProviderHost" : "gateway.push.apple.com",
                 "ProviderPort" : 2195,
                 "FeedbackHost" : "feedback.push.apple.com",
@@ -767,13 +765,11 @@
                     "Topic" : "",
                 },
             },
-            "AMPNotifier" : {
-                "Service" : "calendarserver.push.amppush.AMPPushNotifierService",
+            "AMP" : {
                 "Enabled" : False,
                 "Port" : 62311,
                 "EnableStaggering" : False,
                 "StaggerSeconds" : 3,
-                "DataHost" : "",
             },
         }
     },
@@ -1354,16 +1350,9 @@
     else:
         configDict.Notifications["Enabled"] = False
 
-    for _ignore_key, service in configDict.Notifications["Services"].iteritems():
+    for key, service in configDict.Notifications["Services"].iteritems():
 
-        if (
-            service["Service"] == "calendarserver.push.applepush.ApplePushNotifierService" and
-            service["Enabled"]
-        ):
-            # The default for apple push DataHost is ServerHostName
-            if service["DataHost"] == "":
-                service["DataHost"] = configDict.ServerHostName
-
+        if (key == "APNS" and service["Enabled"]):
             # Retrieve APN topics from certificates if not explicitly set
             for protocol, accountName in (
                 ("CalDAV", "apns:com.apple.calendar"),
@@ -1392,16 +1381,8 @@
                     # The password doesn't exist in the keychain.
                     log.info("%s APN certificate passphrase not found in keychain" % (protocol,))
 
-        if (
-            service["Service"] == "calendarserver.push.amppush.AMPPushNotifierService" and
-            service["Enabled"]
-        ):
-            # The default for apple push DataHost is ServerHostName
-            if service["DataHost"] == "":
-                service["DataHost"] = configDict.ServerHostName
 
 
-
 def _updateScheduling(configDict, reloading=False):
     #
     # Scheduling
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130304/092bd785/attachment-0001.html>


More information about the calendarserver-changes mailing list