[CalendarServer-changes] [2690] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Fri Jul 11 11:37:40 PDT 2008


Revision: 2690
          http://trac.macosforge.org/projects/calendarserver/changeset/2690
Author:   wsanchez at apple.com
Date:     2008-07-11 11:37:40 -0700 (Fri, 11 Jul 2008)
Log Message:
-----------
Merge:
 http://svn.calendarserver.org/repository/calendarserver/CalendarServer/branches/users/sagen/notifications
 http://svn.calendarserver.org/repository/calendarserver/CalendarServer/branches/users/sagen/xmpp

Adds Notification Service.

Modified Paths:
--------------
    CalendarServer/trunk/conf/caldavd-test.plist
    CalendarServer/trunk/conf/caldavd.plist
    CalendarServer/trunk/twisted/plugins/caldav.py
    CalendarServer/trunk/twistedcaldav/cache.py
    CalendarServer/trunk/twistedcaldav/cluster.py
    CalendarServer/trunk/twistedcaldav/config.py
    CalendarServer/trunk/twistedcaldav/customxml.py
    CalendarServer/trunk/twistedcaldav/static.py
    CalendarServer/trunk/twistedcaldav/tap.py

Added Paths:
-----------
    CalendarServer/trunk/twistedcaldav/notify.py
    CalendarServer/trunk/twistedcaldav/test/test_notify.py

Modified: CalendarServer/trunk/conf/caldavd-test.plist
===================================================================
--- CalendarServer/trunk/conf/caldavd-test.plist	2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/conf/caldavd-test.plist	2008-07-11 18:37:40 UTC (rev 2690)
@@ -405,6 +405,66 @@
 
 
   <!--
+    Notifications
+  -->
+
+  <key>Notifications</key>
+  <dict>
+    <key>CoalesceSeconds</key>
+    <integer>10</integer>
+    <key>InternalNotificationHost</key>
+    <string>localhost</string>
+    <key>InternalNotificationPort</key>
+    <integer>62309</integer>
+
+    <key>Services</key>
+    <array>
+      <dict>
+        <!-- Simple notification service (for testing) -->
+        <key>Service</key>
+        <string>twistedcaldav.notify.SimpleLineNotifierService</string>
+        <key>Enabled</key>
+        <false/>
+        <key>Port</key>
+        <integer>62308</integer>
+      </dict>
+
+      <dict>
+        <!-- XMPP notification service -->
+        <key>Service</key>
+        <string>twistedcaldav.notify.XMPPNotifierService</string>
+        <key>Enabled</key>
+        <false/>
+
+        <!-- XMPP host and port to contact -->
+        <key>Host</key>
+        <string>xmpp.host.name</string>
+        <key>Port</key>
+        <integer>5222</integer>
+
+        <!-- Jabber ID and password for the server -->
+        <key>JID</key>
+        <string>jid at xmpp.host.name/resource</string>
+        <key>Password</key>
+        <string>password_goes_here</string>
+
+        <!-- PubSub service address -->
+        <key>ServiceAddress</key>
+        <string>pubsub.xmpp.host.name</string>
+
+        <!-- Sends a presence notification to XMPP server at this interval (prevents disconnect) -->
+        <key>KeepAliveSeconds</key>
+        <integer>120</integer>
+
+        <!-- Sends messages to this account for debugging -->
+        <key>TestJID</key>
+        <string>otherjid at xmpp.host.name</string>
+      </dict>
+    </array>
+  </dict>
+
+
+  <!--
     Miscellaneous items
   -->
 

Modified: CalendarServer/trunk/conf/caldavd.plist
===================================================================
--- CalendarServer/trunk/conf/caldavd.plist	2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/conf/caldavd.plist	2008-07-11 18:37:40 UTC (rev 2690)
@@ -277,6 +277,45 @@
 
 
   <!--
+    Notifications
+  -->
+
+  <key>Notifications</key>
+  <dict>
+    <key>Services</key>
+    <array>
+      <dict>
+        <!-- XMPP notification service -->
+        <key>Service</key>
+        <string>twistedcaldav.notify.XMPPNotifierService</string>
+        <key>Enabled</key>
+        <false/>
+
+        <!-- XMPP host and port to contact -->
+        <key>Host</key>
+        <string>xmpp.host.name</string>
+        <key>Port</key>
+        <integer>5222</integer>
+
+        <!-- Jabber ID and password for the server -->
+        <key>JID</key>
+        <string>jid at xmpp.host.name/resource</string>
+        <key>Password</key>
+        <string>password_goes_here</string>
+
+        <!-- PubSub service address -->
+        <key>ServiceAddress</key>
+        <string>pubsub.xmpp.host.name</string>
+
+        <!-- Sends a presence notification to XMPP server at this interval (prevents disconnect) -->
+        <key>KeepAliveSeconds</key>
+        <integer>120</integer>
+      </dict>
+    </array>
+  </dict>
+
+
+  <!--
     Miscellaneous items
   -->
 

Modified: CalendarServer/trunk/twisted/plugins/caldav.py
===================================================================
--- CalendarServer/trunk/twisted/plugins/caldav.py	2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twisted/plugins/caldav.py	2008-07-11 18:37:40 UTC (rev 2690)
@@ -30,3 +30,5 @@
 
 
 TwistedCalDAV = TAP('twistedcaldav.tap.CalDAVServiceMaker')
+
+CalDAVNotifier = TAP('twistedcaldav.notify.NotificationServiceMaker')

Modified: CalendarServer/trunk/twistedcaldav/cache.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/cache.py	2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twistedcaldav/cache.py	2008-07-11 18:37:40 UTC (rev 2690)
@@ -29,8 +29,11 @@
 
 from twistedcaldav.log import LoggingMixIn
 from twistedcaldav.memcachepool import CachePoolUserMixIn
+from twistedcaldav.config import config
 
+from twistedcaldav.notify import NotificationClientUserMixIn
 
+
 class DisabledCacheNotifier(object):
     def __init__(self, *args, **kwargs):
         pass
@@ -62,7 +65,14 @@
 
 
 
-class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn):
+#
+# FIXME: This should be a generic notifier class, not specific to
+# memcache, as evidenced by the addition of the sendNotification()
+# call in changed() below.
+#
+class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn,
+    NotificationClientUserMixIn):
+
     def __init__(self, resource, cachePool=None):
         self._resource = resource
         self._cachePool = cachePool
@@ -78,9 +88,16 @@
 
         return: A L{Deferred} that fires when the token has been changed.
         """
-        self.log_debug("Changing Cache Token for %r" % (self._resource.url(),))
+
+        url = self._resource.url()
+
+        if config.Notifications["Enabled"]:
+            self.log_debug("Notifications are enabled: %s" % (url,))
+            self.sendNotification(url)
+
+        self.log_debug("Changing Cache Token for %r" % (url,))
         return self.getCachePool().set(
-            'cacheToken:%s' % (self._resource.url(),),
+            'cacheToken:%s' % (url,),
             self._newCacheToken())
 
 

Modified: CalendarServer/trunk/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/cluster.py	2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twistedcaldav/cluster.py	2008-07-11 18:37:40 UTC (rev 2690)
@@ -309,7 +309,18 @@
 
         monitor.addProcess('memcached', memcachedArgv, env=parentEnv)
 
+    if (config.Notifications["Enabled"] and
+        config.Notifications["InternalNotificationHost"] == "localhost"):
+        log.msg("Adding notification service")
 
+        notificationsArgv = [
+            config.Twisted['twistd'],
+            '-n', 'caldav_notifier',
+            '-f', options['config'],
+        ]
+        monitor.addProcess('notifications', notificationsArgv, env=parentEnv)
+
+
     logger = AMPLoggingFactory(
         RotatingFileAccessLoggingObserver(config.AccessLogFile))
 

Modified: CalendarServer/trunk/twistedcaldav/config.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/config.py	2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twistedcaldav/config.py	2008-07-11 18:37:40 UTC (rev 2690)
@@ -175,6 +175,34 @@
     "EnableTimezoneService" : False, # Timezone service
 
     #
+    # Notifications
+    #
+    "Notifications" : {
+        "Enabled": False,
+        "CoalesceSeconds" : 10,
+        "InternalNotificationHost" : "localhost",
+        "InternalNotificationPort" : 62309,
+
+        "Services" : [
+            {
+                "Service" : "twistedcaldav.notify.SimpleLineNotifierService",
+                "Enabled" : False,
+                "Port" : 62308,
+            },
+            {
+                "Service" : "twistedcaldav.notify.XMPPNotifierService",
+                "Enabled" : False,
+                "Host" : "", # "xmpp.host.name"
+                "Port" : 5222,
+                "JID" : "", # "jid at xmpp.host.name/resource"
+                "Password" : "",
+                "ServiceAddress" : "", # "pubsub.xmpp.host.name"
+                "KeepAliveSeconds" : 120,
+            },
+        ]
+    },
+
+    #
     # Implementation details
     #
     #    The following are specific to how the server is built, and useful
@@ -250,6 +278,7 @@
             self.updateDropBox,
             self.updateLogLevels,
             self.updateThreadPoolSize,
+            self.updateNotifications,
         ]
 
     def __str__(self):
@@ -441,6 +470,29 @@
             configDict = _cleanup(configDict)
             self.update(configDict)
 
+    @staticmethod
+    def updateNotifications(self, items):
+        #
+        # Notifications
+        #
+        for service in self.Notifications["Services"]:
+            if service["Enabled"]:
+                self.Notifications["Enabled"] = True
+                break
+        else:
+            self.Notifications["Enabled"] = False
+
+        for service in self.Notifications["Services"]:
+            if (
+                service["Service"] == "twistedcaldav.notify.XMPPNotifierService" and
+                service["Enabled"]
+            ):
+                for key, value in service.iteritems():
+                    if not value:
+                        raise ConfigurationError("Invalid %s for XMPPNotifierService: %r"
+                                                 % (key, value))
+
+
 def _mergeData(oldData, newData):
     for key, value in newData.iteritems():
         if isinstance(value, (dict,)):

Modified: CalendarServer/trunk/twistedcaldav/customxml.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/customxml.py	2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twistedcaldav/customxml.py	2008-07-11 18:37:40 UTC (rev 2690)
@@ -244,6 +244,17 @@
     namespace = calendarserver_namespace
     name = "utc-offset"
 
+class PubSubXMPPURIProperty (davxml.WebDAVTextElement):
+    """
+    A calendarhomefile property to indicate the pubsub XMPP URI to subscribe to
+    for notifications.
+    """
+    namespace = calendarserver_namespace
+    name = "xmpp-uri"
+    protected = True
+
+
+
 ##
 # Extensions to davxml.ResourceType
 ##

Copied: CalendarServer/trunk/twistedcaldav/notify.py (from rev 2689, CalendarServer/branches/users/sagen/notifications/twistedcaldav/notify.py)
===================================================================
--- CalendarServer/trunk/twistedcaldav/notify.py	                        (rev 0)
+++ CalendarServer/trunk/twistedcaldav/notify.py	2008-07-11 18:37:40 UTC (rev 2690)
@@ -0,0 +1,897 @@
+##
+# Copyright (c) 2005-2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Notification framework for Calendar Server
+
+This module implements client code which is executed within the context of
+icalserver itself, and also server code (the "notification server") which is
+run as a separate process, launched as part of "./run".
+
+The notification server process is implemented as a twistd plugin
+(with a tapname of "caldav_notifier"), and is comprised of two
+services -- one handling the internal channel between icalserver
+and notification server, the other handling the external channel
+between notification server and a remote consumer.
+
+The icalserver tap creates a NotificationClient object at startup;
+it deals with passing along notifications to the notification server.
+These notifications originate from cache.py:MemcacheChangeNotifier.changed().
+"""
+
+# TODO: bindAddress to local
+# TODO: add CalDAVTester test for examining new xmpp-uri property
+# TODO: auto-registration and roster management for XMPP
+
+import os
+from twisted.internet import reactor, protocol
+from twisted.protocols import basic
+from twisted.plugin import IPlugin
+from twisted.application import internet, service
+from twisted.python.usage import Options, UsageError
+from twisted.python.reflect import namedClass
+from twisted.words.protocols.jabber import xmlstream
+from twisted.words.protocols.jabber.jid import JID
+from twisted.words.protocols.jabber.client import BasicAuthenticator, IQ
+from twisted.words.xish import domish
+from twistedcaldav.log import LoggingMixIn
+from twistedcaldav.config import config, parseConfig, defaultConfig
+from zope.interface import Interface, implements
+
+__all__ = [
+    "Coalescer",
+    "getNotificationClient",
+    "getPubSubConfiguration",
+    "getPubSubPath",
+    "getPubSubXMPPURI",
+    "INotifier",
+    "installNotificationClient",
+    "InternalNotificationFactory",
+    "InternalNotificationProtocol",
+    "NotificationClient",
+    "NotificationClientFactory",
+    "NotificationClientLineProtocol",
+    "NotificationClientUserMixIn",
+    "NotificationOptions",
+    "NotificationServiceMaker",
+    "SimpleLineNotificationFactory",
+    "SimpleLineNotificationProtocol",
+    "SimpleLineNotifier",
+    "SimpleLineNotifierService",
+    "XMPPNotificationFactory",
+    "XMPPNotifier",
+]
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Classes used within calendarserver itself
+#
+
+class NotificationClientUserMixIn(object):
+    """
+    Notification Client User (Mixin)
+
+    Provides a method to send change notifications to the L{NotificationClient}.
+    """
+
+    def sendNotification(self, uri):
+        getNotificationClient().send(uri)
+
+
+
+class NotificationClientLineProtocol(basic.LineReceiver, LoggingMixIn):
+    """
+    Notification Client Line Protocol
+
+    Sends updates to the notification server.
+    """
+
+    def connectionMade(self):
+        self.client.addObserver(self)
+        self.factory.connectionMade()
+
+    def connectionLost(self, reason):
+        self.client.removeObserver(self)
+
+
+class NotificationClientFactory(protocol.ReconnectingClientFactory,
+    LoggingMixIn):
+    """
+    Notification Client Factory
+
+    Sends updates to the notification server.
+    """
+
+    protocol = NotificationClientLineProtocol
+
+    def __init__(self, client):
+        self.connected = False
+        self.client = client
+
+    def clientConnectionLost(self, connector, reason):
+        self.log_error("Connect to notification server lost: %s" %
+            (reason,))
+        self.connected = False
+        protocol.ReconnectingClientFactory.clientConnectionLost(self,
+            connector, reason)
+
+    def clientConnectionFailed(self, connector, reason):
+        self.log_error("Unable to connect to notification server: %s" %
+            (reason,))
+        self.connected = False
+        protocol.ReconnectingClientFactory.clientConnectionFailed(self,
+            connector, reason)
+
+    def connectionMade(self):
+        self.connected = True
+        self.resetDelay()
+        self.client.connectionMade()
+
+    def isReady(self):
+        return self.connected
+
+    def buildProtocol(self, addr):
+        p = self.protocol()
+        p.factory = self
+        p.client = self.client
+        return p
+
+
+class NotificationClient(LoggingMixIn):
+    """
+    Notification Client
+
+    Forwards on notifications from NotificationClientUserMixIns to the
+    notification server.  A NotificationClient is installed by the tap at
+    startup.
+    """
+
+    def __init__(self, reactor, host, port):
+        self.factory = None
+        self.reactor = reactor
+        self.host = host
+        self.port = port
+        self.observers = set()
+        self.queued = set()
+
+    def send(self, uri):
+        if self.factory is None:
+            self.factory = NotificationClientFactory(self)
+            self.reactor.connectTCP(self.host, self.port, self.factory)
+            self.log_debug("Creating factory")
+
+        if self.factory.isReady() and self.observers:
+            for observer in self.observers:
+                self.log_debug("Sending to notification server: %s" % (uri,))
+                observer.sendLine(str(uri))
+        else:
+            self.log_debug("Queing: %s" % (uri,))
+            self.queued.add(uri)
+
+    def connectionMade(self):
+        if self.factory.isReady() and self.observers:
+            for observer in self.observers:
+                for uri in self.queued:
+                    self.log_debug("Sending from queue: %s" % (uri,))
+                    observer.sendLine(str(uri))
+            self.queued.clear()
+
+    def addObserver(self, observer):
+        self.observers.add(observer)
+
+    def removeObserver(self, observer):
+        self.observers.remove(observer)
+
+
+_notificationClient = None
+
+def installNotificationClient(reactor, host, port, klass=NotificationClient):
+    global _notificationClient
+    _notificationClient = klass(reactor, host, port)
+
+def getNotificationClient():
+    return _notificationClient
+
+
+
+
+
+
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Classes used within Notification Server
+#
+
+#
+# Internal Channel (from icalserver to notification server)
+#
+
+class InternalNotificationProtocol(basic.LineReceiver):
+    """
+    InternalNotificationProtocol
+
+    Receives notifications from the calendar server.
+    """
+
+    def lineReceived(self, line):
+        val = str(line.strip())
+        self.factory.coalescer.add(val)
+
+
+class InternalNotificationFactory(protocol.ServerFactory):
+    """
+    Internal Notification Factory
+
+    Receives notifications from the calendar server.
+    """
+
+    protocol = InternalNotificationProtocol
+
+    def __init__(self, notifiers, delaySeconds=None):
+        self.coalescer = Coalescer(notifiers, delaySeconds=delaySeconds)
+
+
+
+class Coalescer(LoggingMixIn):
+    """
+    Coalescer
+
+    A queue which hangs on to incoming uris for some period of time before
+    passing them along to the external notifier listening for these updates.
+    A chatty CalDAV client can make several changes in a short period of time,
+    and the Coalescer buffers the external clients somewhat.
+    """
+
+    delaySeconds = 5
+
+    def __init__(self, notifiers, reactor=None, delaySeconds=None):
+
+        if delaySeconds:
+            self.delaySeconds = delaySeconds
+
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+
+        self.uris = dict()
+        self.notifiers = notifiers
+
+    def add(self, uri):
+        delayed = self.uris.get(uri, None)
+        if delayed and delayed.active():
+            delayed.reset(self.delaySeconds)
+        else:
+            self.uris[uri] = self.reactor.callLater(self.delaySeconds,
+                self.delayedEnqueue, uri)
+
+    def delayedEnqueue(self, uri):
+        for notifier in self.notifiers:
+            notifier.enqueue(uri)
+
+
+
+#
+# External Channel (from notification server to other consumers)
+#
+
+class INotifier(Interface):
+    """
+    Notifier Interface
+
+    Defines an enqueue method that Notifier classes need to implement.
+    """
+
+    def enqueue(uri):
+        """
+        Lets the notifier object know that a change has been made for this
+        uri, and enough time has passed to allow for coalescence.
+
+        @type uri: C{str}
+        """
+
+
+class SimpleLineNotifier(object):
+    """
+    Simple Line Notifier
+
+    Listens for uris from the coalescer and writes them out to any
+    connected clients.  Each line is simply a sequence number, a
+    space, and a uri string.  If the external client sends a sequence
+    number, this notifier will send notification lines for each uri
+    that was changed since that sequence number was originally sent.
+    A history of such sequence numbers is stored in a python dict.
+    If the external client sends a zero, then the history is cleared
+    and the next sequence number to use is reset to 1.
+
+    The sequence number is stored as a python long which means it has
+    essentially infinite precision.  We discussed rolling over at the
+    64-bit boundary, but even if we limit the sequence number to a 64-bit
+    signed integer (or 2^63), and we had 100,000 users generating the
+    maximum number of notifications (which by default is 6/minute since
+    we're coalescing over 10 seconds), it would take 29 million years to
+    roll over.
+    """
+
+    implements(INotifier)
+
+    def __init__(self, settings):
+        self.reset()
+        self.observers = set()
+        self.sentReset = False
+
+    def enqueue(self, uri):
+
+        self.latestSeq += 1L
+
+        # Update history
+        self.history[uri] = self.latestSeq
+
+        for observer in self.observers:
+            observer.sendLine("%d %s" % (self.latestSeq, uri))
+
+    def reset(self):
+        self.latestSeq = 0L
+        self.history = { } # keys=uri, values=sequenceNumber
+
+    def playback(self, observer, oldSeq):
+
+        hist = self.history
+        toSend = [(hist[uri], uri) for uri in hist if hist[uri] > oldSeq]
+        toSend.sort() # sorts the tuples based on numeric sequence number
+
+        for seq, uri in toSend:
+            observer.sendLine("%d %s" % (seq, str(uri)))
+
+
+    def addObserver(self, observer):
+        self.observers.add(observer)
+
+    def removeObserver(self, observer):
+        self.observers.remove(observer)
+
+    def connectionMade(self, observer):
+        if not self.sentReset:
+            observer.sendLine("0")
+            self.sentReset = True
+
+
+class SimpleLineNotificationProtocol(basic.LineReceiver, LoggingMixIn):
+    """
+    Simple Line Notification Protocol
+
+    Sends notifications to external consumers.  Also responds to history-
+    playback requests.  If an integer is received from an external consumer,
+    it is interpreted as a sequence number; all notifications sent since that
+    sequence number was sent are resent.
+    """
+
+    def connectionMade(self):
+        # we just received a connection from the outside; if it's the first
+        # since we started running, it means we need to let them know that
+        # a reset has happened.  This assumes that only one connection will
+        # be made to this channel; if we end up having multiple consumers
+        # of this protocol, we would need to uniquely identify them.
+        self.notifier.connectionMade(self)
+
+    def lineReceived(self, line):
+        val = line.strip()
+
+        # Should be a number requesting all updates since that sequence
+        try:
+            oldSeq = int(val)
+        except ValueError, e:
+            self.log_warn("Error parsing %s: %s (from %s)" % (val, e,
+                self.transport.getPeer()))
+            return
+
+        if oldSeq == 0:
+            self.notifier.reset()
+        else:
+            self.notifier.playback(self, oldSeq)
+
+    def connectionLost(self, reason):
+        self.notifier.removeObserver(self)
+
+
+class SimpleLineNotificationFactory(protocol.ServerFactory):
+    """
+    Simple Line Notification Factory
+
+    Sends notifications to external consumers.
+    """
+
+    protocol = SimpleLineNotificationProtocol
+
+    def __init__(self, notifier):
+        self.notifier = notifier
+
+    def buildProtocol(self, addr):
+        p = self.protocol()
+        self.notifier.addObserver(p)
+        p.notifier = self.notifier
+        return p
+
+
+
+
+
+
+
+
+class XMPPNotifier(LoggingMixIn):
+    """
+    XMPP Notifier
+
+    Uses pubsub XMPP requests to let subscribers know when there
+    has been a change made to a DAV resource (currently just
+    CalendarHomeFiles).  Uses XMPP login info from the config file
+    to determine which pubsub service to connect to.  When it's
+    time to send a notification, XMPPNotifier computes a node path
+    corresponding to the DAV resource and emits a publish request
+    for that node.  If the request comes back 404 XMPPNotifier will
+    create the node and then go through the configuration process,
+    followed by a publish retry.
+
+    For monitoring purposes, you can specify a "TestJID" value in
+    the config file; XMPPNotifier will send error messages to that
+    JID.  If you also want to receive non-error, debug messages,
+    send the calendar server JID the message, "debug on".  Send
+    "help" for other commands.  Note, XMPPNotifier doesn't yet
+    handle registration or roster management, so you'll need to set
+    up the JID accounts out-of-band, using another XMPP client, for
+    example.
+
+    """
+
+    implements(INotifier)
+
+    pubsubNS = 'http://jabber.org/protocol/pubsub'
+
+    nodeConf = {
+        'pubsub#deliver_payloads' : '0',
+        'pubsub#persist_items' : '0',
+    }
+
+    def __init__(self, settings, reactor=None, configOverride=None):
+        self.xmlStream = None
+        self.settings = settings
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+        self.config = configOverride or config
+
+        self.sendDebugMessages = False
+
+    def enqueue(self, uri):
+        if self.xmlStream is not None:
+            # Convert uri to node
+            nodeName = self.uriToNodeName(uri)
+            self.publishNode(nodeName)
+
+    def uriToNodeName(self, uri):
+        return getPubSubPath(uri, getPubSubConfiguration(self.config))
+
+    def publishNode(self, nodeName):
+        if self.xmlStream is not None:
+            iq = IQ(self.xmlStream)
+            pubsubElement = iq.addElement('pubsub', defaultUri=self.pubsubNS)
+            publishElement = pubsubElement.addElement('publish')
+            publishElement['node'] = nodeName
+            iq.addCallback(self.responseFromPublish, nodeName)
+            iq.send(to=self.settings['ServiceAddress'])
+
+    def responseFromPublish(self, nodeName, iq):
+        if iq['type'] == 'result':
+            self.sendDebug("Node publish successful (%s)" % (nodeName,), iq)
+        else:
+            self.log_error("PubSub node publish error: %s" %
+                (iq.toXml().encode('ascii', 'replace')),)
+            self.sendDebug("Node publish failed (%s)" % (nodeName,), iq)
+
+            errorElement = None
+            pubsubElement = None
+            for child in iq.elements():
+                if child.name == 'error':
+                    errorElement = child
+                if child.name == 'pubsub':
+                    pubsubElement = child
+
+            if errorElement:
+                if errorElement['code'] == '400':
+                    self.requestConfigurationForm(nodeName)
+
+                elif errorElement['code'] == '404':
+                    self.createNode(nodeName)
+
+    def createNode(self, nodeName):
+        if self.xmlStream is not None:
+            iq = IQ(self.xmlStream)
+            pubsubElement = iq.addElement('pubsub', defaultUri=self.pubsubNS)
+            child = pubsubElement.addElement('create')
+            child['node'] = nodeName
+            iq.addCallback(self.responseFromCreate, nodeName)
+            iq.send(to=self.settings['ServiceAddress'])
+
+    def responseFromCreate(self, nodeName, iq):
+        if iq['type'] == 'result':
+            self.sendDebug("Node creation successful (%s)" % (nodeName,), iq)
+            # now time to configure; fetch the form
+            self.requestConfigurationForm(nodeName)
+        else:
+            self.log_error("PubSub node creation error: %s" %
+                (iq.toXml().encode('ascii', 'replace')),)
+            self.sendError("Node creation failed (%s)" % (nodeName,), iq)
+
+    def requestConfigurationForm(self, nodeName):
+        if self.xmlStream is not None:
+            iq = IQ(self.xmlStream, type='get')
+            child = iq.addElement('pubsub', defaultUri=self.pubsubNS+"#owner")
+            child = child.addElement('configure')
+            child['node'] = nodeName
+            iq.addCallback(self.responseFromConfigurationForm, nodeName)
+            iq.send(to=self.settings['ServiceAddress'])
+
+    def _getChild(self, element, name):
+        for child in element.elements():
+            if child.name == name:
+                return child
+        return None
+
+    def responseFromConfigurationForm(self, nodeName, iq):
+        if iq['type'] == 'result':
+            self.sendDebug("Received configuration form (%s)" % (nodeName,), iq)
+            pubsubElement = self._getChild(iq, 'pubsub')
+            if pubsubElement:
+                configureElement = self._getChild(pubsubElement, 'configure')
+                if configureElement:
+                    formElement = configureElement.firstChildElement()
+                    if formElement['type'] == 'form':
+                        # We've found the form; start building a response
+                        filledIq = IQ(self.xmlStream, type='set')
+                        filledPubSub = filledIq.addElement('pubsub',
+                            defaultUri=self.pubsubNS+"#owner")
+                        filledConfigure = filledPubSub.addElement('configure')
+                        filledConfigure['node'] = nodeName
+                        filledForm = filledConfigure.addElement('x',
+                            defaultUri='jabber:x:data')
+                        filledForm['type'] = 'submit'
+
+                        for field in formElement.elements():
+                            if field.name == 'field':
+                                value = self.nodeConf.get(field['var'], None)
+                                if value is not None:
+                                    valueElement = self._getChild(field,
+                                        'value')
+                                    valueElement.children = []
+                                    valueElement.addContent(value)
+                            filledForm.addChild(field)
+                        filledIq.addCallback(self.responseFromConfiguration,
+                            nodeName)
+                        filledIq.send(to=self.settings['ServiceAddress'])
+        else:
+            self.log_error("PubSub configuration form request error: %s" %
+                (iq.toXml().encode('ascii', 'replace')),)
+            self.sendError("Failed to receive configuration form (%s)" % (nodeName,), iq)
+
+
+    def responseFromConfiguration(self, nodeName, iq):
+        if iq['type'] == 'result':
+            self.log_debug("PubSub node %s is configured" % (nodeName,))
+            self.sendDebug("Configured node (%s)" % (nodeName,), iq)
+            self.publishNode(nodeName)
+
+        else:
+            self.log_error("PubSub node configuration error: %s" %
+                (iq.toXml().encode('ascii', 'replace')),)
+            self.sendError("Failed to configure node (%s)" % (nodeName,), iq)
+
+
+    def streamOpened(self, xmlStream):
+        self.xmlStream = xmlStream
+        xmlStream.addObserver('/message', self.handleMessage)
+
+    def streamClosed(self):
+        self.xmlStream = None
+
+    def sendDebug(self, txt, element):
+        if self.sendDebugMessages:
+            testJid = self.settings.get("TestJID", "")
+            if testJid:
+                txt = "DEBUG: %s %s" % (txt, element.toXml().encode('ascii',
+                    'replace'))
+                self.sendAlert(testJid, txt)
+
+    def sendError(self, txt, element):
+        testJid = self.settings.get("TestJID", "")
+        if testJid:
+            txt = "ERROR: %s %s" % (txt, element.toXml().encode('ascii',
+                'replace'))
+            self.sendAlert(testJid, txt)
+
+    def sendAlert(self, jid, txt):
+        if self.xmlStream is not None:
+            message = domish.Element(('jabber:client', 'message'))
+            message['to'] = JID(jid).full()
+            message.addElement('body', content=txt)
+            self.xmlStream.send(message)
+
+    def handleMessage(self, iq):
+        body = getattr(iq, 'body', None)
+        if body:
+            response = None
+            txt = str(body).lower()
+            if txt == "help":
+                response = "debug on, debug off"
+            elif txt == "debug on":
+                self.sendDebugMessages = True
+                response = "Debugging on"
+            elif txt == "debug off":
+                self.sendDebugMessages = False
+                response = "Debugging off"
+            else:
+                response = "I don't understand.  Try 'help'."
+
+            if response:
+                message = domish.Element(('jabber:client', 'message'))
+                message['to'] = JID(iq['from']).full()
+                message.addElement('body', content=response)
+                self.xmlStream.send(message)
+
+
+
+
+class XMPPNotificationFactory(xmlstream.XmlStreamFactory, LoggingMixIn):
+
+    def __init__(self, notifier, settings, reactor=None):
+        self.notifier = notifier
+        self.settings = settings
+        self.jid = settings['JID']
+        self.keepAliveSeconds = settings.get('KeepAliveSeconds', 120)
+        self.xmlStream = None
+        self.presenceCall = None
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+
+        xmlstream.XmlStreamFactory.__init__(self,
+            BasicAuthenticator(JID(self.jid), settings['Password']))
+
+        self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.connected)
+        self.addBootstrap(xmlstream.STREAM_END_EVENT, self.disconnected)
+        self.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.initFailed)
+
+        self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.authenticated)
+        self.addBootstrap(BasicAuthenticator.INVALID_USER_EVENT,
+            self.authFailed)
+        self.addBootstrap(BasicAuthenticator.AUTH_FAILED_EVENT,
+            self.authFailed)
+
+    def connected(self, xmlStream):
+        self.xmlStream = xmlStream
+        self.log_info("XMPP connection successful")
+        # Log all traffic
+        xmlStream.rawDataInFn = self.rawDataIn
+        xmlStream.rawDataOutFn = self.rawDataOut
+
+    def disconnected(self, xmlStream):
+        self.notifier.streamClosed()
+        self.xmlStream = None
+        if self.presenceCall is not None:
+            self.presenceCall.cancel()
+            self.presenceCall = None
+        self.log_info("XMPP disconnected")
+
+    def initFailed(self, failure):
+        self.xmlStream = None
+        self.log_info("XMPP Initialization failed: %s" % (failure,))
+
+    def authenticated(self, xmlStream):
+        self.log_info("XMPP authentication successful: %s" % (self.jid,))
+        # xmlStream.addObserver('/message', self.handleMessage)
+        self.sendPresence()
+        self.notifier.streamOpened(xmlStream)
+
+    def authFailed(self, e):
+        self.log_error("Failed to log in XMPP (%s); check JID and password" %
+            (self.jid,))
+
+    def sendPresence(self):
+        if self.xmlStream is not None:
+            presence = domish.Element(('jabber:client', 'presence'))
+            self.xmlStream.send(presence)
+            self.presenceCall = self.reactor.callLater(self.keepAliveSeconds,
+                self.sendPresence)
+
+    def rawDataIn(self, buf):
+        self.log_debug("RECV: %s" % unicode(buf, 'utf-8').encode('ascii',
+            'replace'))
+
+    def rawDataOut(self, buf):
+        self.log_debug("SEND: %s" % unicode(buf, 'utf-8').encode('ascii',
+            'replace'))
+
+
+def getPubSubConfiguration(config):
+    # TODO: Should probably cache this
+    results = { 'enabled' : False }
+
+    # use the settings of the last enabled xmpp service in the config file
+    for settings in config.Notifications["Services"]:
+        if (settings["Service"] == "twistedcaldav.notify.XMPPNotifierService"
+            and settings["Enabled"]):
+            results['enabled'] = True
+            results['service'] = settings['ServiceAddress']
+            results['host'] = config.ServerHostName
+            results['port'] = config.SSLPort or config.HTTPPort
+
+    return results
+
+def getPubSubPath(uri, pubSubConfiguration):
+    return ("/Public/CalDAV/%s/%d/%s/" % (pubSubConfiguration['host'],
+        pubSubConfiguration['port'], uri.strip("/")))
+
+def getPubSubXMPPURI(uri, pubSubConfiguration):
+    return "xmpp:%s?pubsub;node=%s" % (pubSubConfiguration['service'],
+        getPubSubPath(uri, pubSubConfiguration))
+
+#
+# Notification Server service config
+#
+
+class NotificationOptions(Options):
+    optParameters = [[
+        "config", "f", "/etc/caldavd/caldavd.plist", "Path to configuration file."
+    ]]
+
+    def __init__(self, *args, **kwargs):
+        super(NotificationOptions, self).__init__(*args, **kwargs)
+
+        self.overrides = {}
+
+    def _coerceOption(self, configDict, key, value):
+        """
+        Coerce the given C{value} to the type of C{configDict[key]}
+        """
+        if key in configDict:
+            if isinstance(configDict[key], bool):
+                value = value == "True"
+
+            elif isinstance(configDict[key], (int, float, long)):
+                value = type(configDict[key])(value)
+
+            elif isinstance(configDict[key], (list, tuple)):
+                value = value.split(',')
+
+            elif isinstance(configDict[key], dict):
+                raise UsageError(
+                    "Dict options not supported on the command line"
+                )
+
+            elif value == 'None':
+                value = None
+
+        return value
+
+    def _setOverride(self, configDict, path, value, overrideDict):
+        """
+        Set the value at path in configDict
+        """
+        key = path[0]
+
+        if len(path) == 1:
+            overrideDict[key] = self._coerceOption(configDict, key, value)
+            return
+
+        if key in configDict:
+            if not isinstance(configDict[key], dict):
+                raise UsageError(
+                    "Found intermediate path element that is not a dictionary"
+                )
+
+            if key not in overrideDict:
+                overrideDict[key] = {}
+
+            self._setOverride(
+                configDict[key], path[1:],
+                value, overrideDict[key]
+            )
+
+
+    def opt_option(self, option):
+        """
+        Set an option to override a value in the config file. True, False, int,
+        and float options are supported, as well as comma separated lists. Only
+        one option may be given for each --option flag, however multiple
+        --option flags may be specified.
+        """
+
+        if "=" in option:
+            path, value = option.split('=')
+            self._setOverride(
+                defaultConfig,
+                path.split('/'),
+                value,
+                self.overrides
+            )
+        else:
+            self.opt_option('%s=True' % (option,))
+
+    opt_o = opt_option
+
+    def postOptions(self):
+        parseConfig(self['config'])
+        config.updateDefaults(self.overrides)
+
+
+class NotificationServiceMaker(object):
+    implements(IPlugin, service.IServiceMaker)
+
+    tapname = "caldav_notifier"
+    description = "Notification Server"
+    options = NotificationOptions
+
+    def makeService(self, options):
+
+        multiService = service.MultiService()
+
+        notifiers = []
+        for settings in config.Notifications["Services"]:
+            if settings["Enabled"]:
+                notifier = namedClass(settings["Service"])(settings)
+                notifier.setServiceParent(multiService)
+                notifiers.append(notifier)
+
+        internet.TCPServer(
+            config.Notifications["InternalNotificationPort"],
+            InternalNotificationFactory(notifiers,
+                delaySeconds=config.Notifications["CoalesceSeconds"])
+        ).setServiceParent(multiService)
+
+        return multiService
+
+
+class SimpleLineNotifierService(service.Service):
+
+    def __init__(self, settings):
+        self.notifier = SimpleLineNotifier(settings)
+        self.server = internet.TCPServer(settings["Port"],
+            SimpleLineNotificationFactory(self.notifier))
+
+    def enqueue(self, uri):
+        self.notifier.enqueue(uri)
+
+    def startService(self):
+        self.server.startService()
+
+    def stopService(self):
+        self.server.stopService()
+
+
+class XMPPNotifierService(service.Service):
+
+    def __init__(self, settings):
+        self.notifier = XMPPNotifier(settings)
+        self.client = internet.TCPClient(settings["Host"], settings["Port"],
+            XMPPNotificationFactory(self.notifier, settings))
+
+    def enqueue(self, uri):
+        self.notifier.enqueue(uri)
+
+    def startService(self):
+        self.client.startService()
+
+    def stopService(self):
+        self.client.stopService()
+

Modified: CalendarServer/trunk/twistedcaldav/static.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/static.py	2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twistedcaldav/static.py	2008-07-11 18:37:40 UTC (rev 2690)
@@ -73,6 +73,9 @@
 
 from twistedcaldav.cache import DisabledCacheNotifier, PropfindCacheMixin
 
+from twistedcaldav.notify import getPubSubConfiguration, getPubSubPath
+from twistedcaldav.notify import getPubSubXMPPURI
+
 log = Logger()
 
 class CalDAVFile (CalDAVResource, DAVFile):
@@ -559,6 +562,10 @@
     """
     cacheNotifierFactory = DisabledCacheNotifier
 
+    liveProperties = CalDAVFile.liveProperties + (
+        (customxml.calendarserver_namespace, "xmpp-uri"),
+    )
+
     def __init__(self, path, parent, record):
         """
         @param path: the path to the file which will back the resource.
@@ -602,6 +609,23 @@
         return super(CalendarHomeFile, self).getChild(name)
 
 
+    def readProperty(self, property, request):
+        if type(property) is tuple:
+            qname = property
+        else:
+            qname = property.qname()
+
+        if qname == (customxml.calendarserver_namespace, "xmpp-uri"):
+            pubSubConfiguration = getPubSubConfiguration(config)
+            if pubSubConfiguration['enabled']:
+                return succeed(customxml.PubSubXMPPURIProperty(
+                    getPubSubXMPPURI(self.url(), pubSubConfiguration)))
+            else:
+                return succeed(customxml.PubSubXMPPURIProperty())
+
+        return super(CalendarHomeFile, self).readProperty(property, request)
+
+
 class ScheduleFile (AutoProvisioningFileMixIn, CalDAVFile):
     def __init__(self, path, parent):
         super(ScheduleFile, self).__init__(path, principalCollections=parent.principalCollections())

Modified: CalendarServer/trunk/twistedcaldav/tap.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/tap.py	2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twistedcaldav/tap.py	2008-07-11 18:37:40 UTC (rev 2690)
@@ -55,6 +55,7 @@
 from twistedcaldav.timezones import TimezoneCache
 from twistedcaldav import pdmonster
 from twistedcaldav import memcachepool
+from twistedcaldav.notify import installNotificationClient
 
 log = Logger()
 
@@ -491,6 +492,14 @@
                 config.Memcached["MaxClients"])
 
         #
+        # Configure NotificationClient
+        #
+        if config.Notifications["Enabled"]:
+            installNotificationClient(reactor,
+                config.Notifications["InternalNotificationHost"],
+                config.Notifications["InternalNotificationPort"])
+
+        #
         # Setup Resource hierarchy
         #
 

Copied: CalendarServer/trunk/twistedcaldav/test/test_notify.py (from rev 2689, CalendarServer/branches/users/sagen/notifications/twistedcaldav/test/test_notify.py)
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_notify.py	                        (rev 0)
+++ CalendarServer/trunk/twistedcaldav/test/test_notify.py	2008-07-11 18:37:40 UTC (rev 2690)
@@ -0,0 +1,485 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from copy import deepcopy
+
+from twisted.trial.unittest import TestCase
+from twisted.internet.task import Clock
+from twisted.words.protocols.jabber.client import IQ
+from twistedcaldav.notify import *
+from twistedcaldav import config as config_mod
+from twistedcaldav.config import Config
+
+
+
+class NotificationClientUserTests(TestCase):
+
+    class NotificationClientUser(NotificationClientUserMixIn):
+        pass
+
+    def test_installNoficationClient(self):
+        self.assertEquals(getNotificationClient(), None)
+        self.clock = Clock()
+        installNotificationClient(self.clock, None, None,
+            klass=StubNotificationClient)
+        notificationClient = getNotificationClient()
+        self.assertNotEquals(notificationClient, None)
+
+        clientUser = self.NotificationClientUser()
+        clientUser.sendNotification("a")
+        self.assertEquals(notificationClient.lines, ["a"])
+
+
+class NotificationClientFactoryTests(TestCase):
+
+    def setUp(self):
+        self.client = StubNotificationClient(None, None, None)
+        self.factory = NotificationClientFactory(self.client)
+        self.factory.protocol = StubNotificationClientProtocol
+
+    def test_connect(self):
+        self.assertEquals(self.factory.isReady(), False)
+        protocol = self.factory.buildProtocol(None)
+        protocol.connectionMade()
+        self.assertEquals(self.client.observers, set([protocol]))
+        self.assertEquals(self.factory.isReady(), True)
+
+        protocol.connectionLost(None)
+        self.assertEquals(self.client.observers, set())
+        self.assertEquals(self.factory.isReady(), False)
+
+
+class StubNotificationClient(object):
+
+    def __init__(self, reactor, host, port):
+        self.lines = []
+        self.observers = set()
+
+    def send(self, uri):
+        self.lines.append(uri)
+
+    def addObserver(self, observer):
+        self.observers.add(observer)
+
+    def removeObserver(self, observer):
+        self.observers.remove(observer)
+
+    def connectionMade(self):
+        pass
+
+class StubNotificationClientProtocol(object):
+
+    def __init__(self):
+        self.lines = []
+
+    def sendLine(self, line):
+        self.lines.append(line)
+
+    def connectionMade(self):
+        self.client.addObserver(self)
+        self.factory.connectionMade()
+
+    def connectionLost(self, reason):
+        self.client.removeObserver(self)
+        self.factory.connected = False
+
+
+class NotificationClientTests(TestCase):
+
+    def setUp(self):
+        self.client = NotificationClient(Clock(), None, None)
+        self.client.factory = StubNotificationClientFactory()
+
+    def test_sendWhileNotConnected(self):
+        self.client.send("a")
+        self.assertEquals(self.client.queued, set(["a"]))
+
+    def test_sendWhileConnected(self):
+        protocol = StubNotificationClientProtocol()
+        self.client.addObserver(protocol)
+        self.client.factory.connected = True
+        self.client.send("a")
+        self.assertEquals(self.client.queued, set())
+        self.assertEquals(protocol.lines, ["a"])
+
+    def test_sendQueue(self):
+        self.client.send("a")
+        self.assertEquals(self.client.queued, set(["a"]))
+        protocol = StubNotificationClientProtocol()
+        self.client.addObserver(protocol)
+        self.client.factory.connected = True
+        self.client.connectionMade()
+        self.assertEquals(protocol.lines, ["a"])
+        self.assertEquals(self.client.queued, set())
+
+
+class StubNotificationClientFactory(object):
+
+    def __init__(self):
+        self.connected = False
+
+    def isReady(self):
+        return self.connected
+
+
+class CoalescerTests(TestCase):
+
+    def setUp(self):
+        self.clock = Clock()
+        self.notifier = StubNotifier()
+        self.coalescer = Coalescer([self.notifier], reactor=self.clock)
+
+    def test_delayedNotifications(self):
+        self.coalescer.add("A")
+        self.assertEquals(self.notifier.notifications, [])
+        self.clock.advance(5)
+        self.assertEquals(self.notifier.notifications, ["A"])
+
+    def test_removeDuplicates(self):
+        self.coalescer.add("A")
+        self.coalescer.add("A")
+        self.clock.advance(5)
+        self.assertEquals(self.notifier.notifications, ["A"])
+
+
+class StubNotifier(object):
+
+    def __init__(self):
+        self.notifications = []
+        self.observers = set()
+        self.playbackHistory = []
+
+    def enqueue(self, uri):
+        self.notifications.append(uri)
+
+    def playback(self, protocol, old_seq):
+        self.playbackHistory.append((protocol, old_seq))
+
+    def addObserver(self, observer):
+        self.observers.add(observer)
+
+    def removeObserver(self, observer):
+        self.observers.remove(observer)
+
+
+class SimpleLineNotifierTests(TestCase):
+
+    def setUp(self):
+        self.clock = Clock()
+        self.notifier = SimpleLineNotifier(None)
+        self.coalescer = Coalescer([self.notifier], reactor=self.clock)
+
+    def test_initialConnection(self):
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.connectionMade(protocol)
+        self.assertEquals(protocol.lines, ["0"])
+
+    def test_subsequentConnection(self):
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.connectionMade(protocol)
+        protocol.lines = []
+        self.notifier.connectionMade(protocol)
+        self.assertEquals(protocol.lines, [])
+
+    def test_send(self):
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.enqueue("A")
+        self.assertEquals(protocol.lines, ["1 A"])
+
+    def test_incrementSequence(self):
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.enqueue("A")
+        self.notifier.enqueue("B")
+        self.assertEquals(protocol.lines, ["1 A", "2 B"])
+
+    def test_addObserver(self):
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.enqueue("A")
+        self.assertEquals(protocol.lines, ["1 A"])
+
+    def test_removeObserver(self):
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.removeObserver(protocol)
+        self.notifier.enqueue("A")
+        self.assertEquals(protocol.lines, [])
+
+    def test_multipleObservers(self):
+        protocol1 = StubProtocol()
+        protocol2 = StubProtocol()
+        self.notifier.addObserver(protocol1)
+        self.notifier.addObserver(protocol2)
+        self.notifier.enqueue("A")
+        self.assertEquals(protocol1.lines, ["1 A"])
+        self.assertEquals(protocol2.lines, ["1 A"])
+
+    def test_duplicateObservers(self):
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.addObserver(protocol)
+        self.notifier.enqueue("A")
+        self.assertEquals(protocol.lines, ["1 A"])
+
+    def test_playback(self):
+        self.notifier.enqueue("A")
+        self.notifier.enqueue("B")
+        self.notifier.enqueue("C")
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.playback(protocol, 1)
+        self.assertEquals(protocol.lines, ["2 B", "3 C"])
+
+    def test_reset(self):
+        self.notifier.enqueue("A")
+        self.assertEquals(self.notifier.history, {"A" : 1})
+        self.assertEquals(self.notifier.latestSeq, 1)
+        self.notifier.reset()
+        self.assertEquals(self.notifier.history, {})
+        self.assertEquals(self.notifier.latestSeq, 0)
+        
+
+class SimpleLineNotificationFactoryTests(TestCase):
+
+    def test_buildProtocol(self):
+        notifier = StubNotifier()
+        factory = SimpleLineNotificationFactory(notifier)
+        protocol = factory.buildProtocol(None)
+        self.assertEquals(protocol.notifier, notifier)
+        self.assertIn(protocol, notifier.observers)
+
+
+class SimpleLineNotificationProtocolTests(TestCase):
+
+    def setUp(self):
+        self.notifier = StubNotifier()
+        self.protocol = SimpleLineNotificationProtocol()
+        self.protocol.notifier = self.notifier
+        self.protocol.transport = StubTransport()
+        self.notifier.addObserver(self.protocol)
+
+    def test_connectionLost(self):
+        self.protocol.connectionLost(None)
+        self.assertNotIn(self.protocol, self.notifier.observers)
+
+    def test_lineReceived(self):
+        self.protocol.lineReceived("2")
+        self.assertEquals(self.notifier.playbackHistory, [(self.protocol, 2)])
+
+    def test_lineReceivedInvalid(self):
+        self.protocol.lineReceived("bogus")
+        self.assertEquals(self.notifier.playbackHistory, [])
+
+
+
+class StubProtocol(object):
+
+    def __init__(self):
+        self.lines = []
+
+    def sendLine(self, line):
+        self.lines.append(line)
+
+
+class StubTransport(object):
+
+    def getPeer(self):
+        return "peer"
+
+
+
+
+
+
+class StubXmlStream(object):
+
+    def __init__(self):
+        self.elements = []
+
+    def send(self, element):
+        self.elements.append(element)
+
+    def addOnetimeObserver(*args, **kwds):
+        pass
+
+    def addObserver(*args, **kwds):
+        pass
+
+
+class XMPPNotifierTests(TestCase):
+
+    xmppEnabledConfig = Config(config_mod.defaultConfig)
+    xmppEnabledConfig.Notifications['Services'][1]['Enabled'] = True
+    xmppEnabledConfig.ServerHostName = "server.example.com"
+    xmppEnabledConfig.HTTPPort = 80
+
+    xmppDisabledConfig = Config(config_mod.defaultConfig)
+    xmppDisabledConfig.Notifications['Services'][1]['Enabled'] = False
+
+    def setUp(self):
+        self.xmlStream = StubXmlStream()
+        self.settings = { 'ServiceAddress' : 'pubsub.example.com' }
+        self.notifier = XMPPNotifier(self.settings, reactor=Clock(),
+            configOverride=self.xmppEnabledConfig)
+        self.notifier.streamOpened(self.xmlStream)
+
+    def test_sendWhileConnected(self):
+        self.notifier.enqueue("/principals/__uids__/test")
+
+        iq = self.xmlStream.elements[0]
+        self.assertEquals(iq.name, "iq")
+
+        pubsubElement = list(iq.elements())[0]
+        self.assertEquals(pubsubElement.name, "pubsub")
+        self.assertEquals(pubsubElement.uri, 'http://jabber.org/protocol/pubsub')
+
+        publishElement = list(pubsubElement.elements())[0]
+        self.assertEquals(publishElement.name, "publish")
+        self.assertEquals(publishElement.uri, 'http://jabber.org/protocol/pubsub')
+        self.assertEquals(publishElement['node'],
+            "/Public/CalDAV/server.example.com/80/principals/__uids__/test/")
+
+    def test_sendWhileNotConnected(self):
+        notifier = XMPPNotifier(self.settings, reactor=Clock(),
+            configOverride=self.xmppDisabledConfig)
+        notifier.enqueue("/principals/__uids__/test")
+        self.assertEquals(self.xmlStream.elements, [])
+
+    def test_publishNewNode(self):
+        self.notifier.publishNode("testNodeName")
+        iq = self.xmlStream.elements[0]
+        self.assertEquals(iq.name, "iq")
+
+    def test_publishReponse400(self):
+        response = IQ(self.xmlStream, type='error')
+        errorElement = response.addElement('error')
+        errorElement['code'] = '400'
+        self.assertEquals(len(self.xmlStream.elements), 0)
+        self.notifier.responseFromPublish("testNodeName", response)
+        self.assertEquals(len(self.xmlStream.elements), 1)
+        iq = self.xmlStream.elements[0]
+        self.assertEquals(iq.name, "iq")
+        self.assertEquals(iq['type'], "get")
+
+        pubsubElement = list(iq.elements())[0]
+        self.assertEquals(pubsubElement.name, "pubsub")
+        self.assertEquals(pubsubElement.uri,
+            'http://jabber.org/protocol/pubsub#owner')
+        configElement = list(pubsubElement.elements())[0]
+        self.assertEquals(configElement.name, "configure")
+        self.assertEquals(configElement['node'], "testNodeName")
+
+
+    def test_publishReponse404(self):
+        response = IQ(self.xmlStream, type='error')
+        errorElement = response.addElement('error')
+        errorElement['code'] = '404'
+        self.assertEquals(len(self.xmlStream.elements), 0)
+        self.notifier.responseFromPublish("testNodeName", response)
+        self.assertEquals(len(self.xmlStream.elements), 1)
+        iq = self.xmlStream.elements[0]
+        self.assertEquals(iq.name, "iq")
+        self.assertEquals(iq['type'], "set")
+
+        pubsubElement = list(iq.elements())[0]
+        self.assertEquals(pubsubElement.name, "pubsub")
+        self.assertEquals(pubsubElement.uri,
+            'http://jabber.org/protocol/pubsub')
+        createElement = list(pubsubElement.elements())[0]
+        self.assertEquals(createElement.name, "create")
+        self.assertEquals(createElement['node'], "testNodeName")
+
+
+    def test_configureResponse(self):
+
+        def _getChild(element, name):
+            for child in element.elements():
+                if child.name == name:
+                    return child
+            return None
+
+        response = IQ(self.xmlStream, type='result')
+        pubsubElement = response.addElement('pubsub')
+        configElement = pubsubElement.addElement('configure')
+        formElement = configElement.addElement('x')
+        formElement['type'] = 'form'
+        fields = [
+            ( "unknown", "don't edit me" ),
+            ( "pubsub#deliver_payloads", "1" ),
+            ( "pubsub#persist_items", "1" ),
+        ]
+        expectedFields = {
+            "unknown" : "don't edit me",
+            "pubsub#deliver_payloads" : "0",
+            "pubsub#persist_items" : "0",
+        }
+        for field in fields:
+            fieldElement = formElement.addElement("field")
+            fieldElement['var'] = field[0]
+            fieldElement.addElement('value', content=field[1])
+
+        self.assertEquals(len(self.xmlStream.elements), 0)
+        self.notifier.responseFromConfigurationForm("testNodeName", response)
+        self.assertEquals(len(self.xmlStream.elements), 1)
+
+        iq = self.xmlStream.elements[0]
+        self.assertEquals(iq.name, "iq")
+        self.assertEquals(iq['type'], "set")
+
+        pubsubElement = list(iq.elements())[0]
+        self.assertEquals(pubsubElement.name, "pubsub")
+        configElement = list(pubsubElement.elements())[0]
+        self.assertEquals(configElement.name, "configure")
+        self.assertEquals(configElement['node'], "testNodeName")
+        formElement = list(configElement.elements())[0]
+        self.assertEquals(formElement['type'], "submit")
+        for field in formElement.elements():
+            valueElement = _getChild(field, "value")
+            if valueElement is not None:
+                self.assertEquals(expectedFields[field['var']],
+                    str(valueElement))
+
+
+
+class XMPPNotificationFactoryTests(TestCase):
+
+    def test_sendPresence(self):
+        clock = Clock()
+        xmlStream = StubXmlStream()
+        settings = { 'ServiceAddress' : 'pubsub.example.com', 'JID' : 'jid',
+            'Password' : 'password', 'KeepAliveSeconds' : 5 }
+        notifier = XMPPNotifier(settings, reactor=clock)
+        factory = XMPPNotificationFactory(notifier, settings, reactor=clock)
+        factory.connected(xmlStream)
+        factory.authenticated(xmlStream)
+
+        self.assertEquals(len(xmlStream.elements), 1)
+        presence = xmlStream.elements[0]
+        self.assertEquals(presence.name, 'presence')
+
+        clock.advance(5)
+
+        self.assertEquals(len(xmlStream.elements), 2)
+        presence = xmlStream.elements[1]
+        self.assertEquals(presence.name, 'presence')
+
+        factory.disconnected(xmlStream)
+        clock.advance(5)
+        self.assertEquals(len(xmlStream.elements), 2)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080711/06623f7f/attachment-0001.html 


More information about the calendarserver-changes mailing list