[CalendarServer-changes] [4460] CalendarServer/branches/users/wsanchez/deployment/twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Tue Jul 14 18:25:22 PDT 2009


Revision: 4460
          http://trac.macosforge.org/projects/calendarserver/changeset/4460
Author:   wsanchez at apple.com
Date:     2009-07-14 18:25:22 -0700 (Tue, 14 Jul 2009)
Log Message:
-----------
Backport of XMPP notifications to deployment branch

Modified Paths:
--------------
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cache.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/customxml.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/calendar.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/notify.py
    CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/static.py

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cache.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cache.py	2009-07-15 01:21:01 UTC (rev 4459)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cache.py	2009-07-15 01:25:22 UTC (rev 4460)
@@ -31,9 +31,7 @@
 from twistedcaldav.memcachepool import CachePoolUserMixIn
 from twistedcaldav.config import config
 
-from twistedcaldav.notify import NotificationClientUserMixIn
 
-
 class DisabledCacheNotifier(object):
     def __init__(self, *args, **kwargs):
         pass
@@ -65,13 +63,7 @@
 
 
 
-#
-# FIXME: This should be a generic notifier class, not specific to
-# memcache, as evidenced by the addition of the sendNotification()
-# addition in changed() below.
-#
-class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn,
-    NotificationClientUserMixIn):
+class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn):
 
     def __init__(self, resource, cachePool=None):
         self._resource = resource
@@ -91,10 +83,6 @@
 
         url = self._resource.url()
 
-        if config.Notifications["Enabled"]:
-            self.log_debug("Notifications are enabled: %s" % (url,))
-            self.sendNotification(url)
-
         self.log_debug("Changing Cache Token for %r" % (url,))
         return self.getCachePool().set(
             'cacheToken:%s' % (url,),

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py	2009-07-15 01:21:01 UTC (rev 4459)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py	2009-07-15 01:25:22 UTC (rev 4460)
@@ -572,19 +572,6 @@
                 service["Service"] == "twistedcaldav.notify.XMPPNotifierService" and
                 service["Enabled"]
             ):
-                # Get password from keychain.  If not there, fall back to what
-                # is in the plist.
-                try:
-                    password = getPasswordFromKeychain(service["JID"])
-                    service["Password"] = password
-                    log.info("XMPP password successfully retreived from keychain")
-                except KeychainAccessError:
-                    # The system doesn't support keychain
-                    pass
-                except KeychainPasswordNotFound:
-                    # The password doesn't exist in the keychain.
-                    log.error("XMPP password not found in keychain")
-
                 # Check for empty fields
                 for key, value in service.iteritems():
                     if not value and key not in ("AllowedJIDs", "HeartbeatMinutes", "Password"):

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/customxml.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/customxml.py	2009-07-15 01:21:01 UTC (rev 4459)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/customxml.py	2009-07-15 01:25:22 UTC (rev 4460)
@@ -277,9 +277,47 @@
     namespace = calendarserver_namespace
     name = "xmpp-uri"
     protected = True
+    hidden = True
 
+class PubSubHeartbeatProperty (davxml.WebDAVElement):
+    """
+    A calendarhomefile property to indicate the pubsub XMPP URI to subscribe to
+    for server heartbeats.
+    """
+    namespace = calendarserver_namespace
+    name = "xmpp-heartbeat"
+    protected = True
+    hidden = True
+    allowed_children = {
+        (calendarserver_namespace, "xmpp-heartbeat-uri" )  : (1, 1),
+        (calendarserver_namespace, "xmpp-heartbeat-minutes" ) : (1, 1),
+    }
 
+class PubSubHeartbeatURIProperty (davxml.WebDAVTextElement):
+    namespace = calendarserver_namespace
+    name = "xmpp-heartbeat-uri"
+    protected = True
+    hidden = True
 
+class PubSubHeartbeatMinutesProperty (davxml.WebDAVTextElement):
+    namespace = calendarserver_namespace
+    name = "xmpp-heartbeat-minutes"
+    protected = True
+    hidden = True
+
+class PubSubXMPPServerProperty (davxml.WebDAVTextElement):
+    """
+    A calendarhomefile property to indicate the pubsub XMPP hostname to
+    contact for notifications.
+    """
+    namespace = calendarserver_namespace
+    name = "xmpp-server"
+    protected = True
+    hidden = True
+
+
+
+
 ##
 # Extensions to davxml.ResourceType
 ##

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/calendar.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/calendar.py	2009-07-15 01:21:01 UTC (rev 4459)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/calendar.py	2009-07-15 01:25:22 UTC (rev 4460)
@@ -274,13 +274,25 @@
             self.putChild(name, child)
 
     def provisionDefaultCalendars(self):
-        self.provision()
 
-        childName = "calendar"
-        childURL = joinURL(self.url(), childName)
-        child = self.provisionChild(childName)
-        assert isinstance(child, CalDAVResource), "Child %r is not a %s: %r" % (childName, CalDAVResource.__name__, child)
+        # Disable notifications during provisioning
+        if hasattr(self, "clientNotifier"):
+            self.clientNotifier.disableNotify()
 
+        try:
+            self.provision()
+
+            childName = "calendar"
+            childURL = joinURL(self.url(), childName)
+            child = self.provisionChild(childName)
+            assert isinstance(child, CalDAVResource), "Child %r is not a %s: %r" % (childName, CalDAVResource.__name__, child)
+        except:
+            # We want to make sure to re-enable notifications, so do so
+            # if there is an immediate exception above, or via errback, below
+            if hasattr(self, "clientNotifier"):
+                self.clientNotifier.enableNotify(None)
+            raise
+
         def setupChild(_):
             # Set calendar-free-busy-set on inbox
             inbox = self.getChild("inbox")
@@ -295,6 +307,9 @@
 
         d = child.createCalendarCollection()
         d.addCallback(setupChild)
+        if hasattr(self, "clientNotifier"):
+            d.addCallback(self.clientNotifier.enableNotify)
+            d.addErrback(self.clientNotifier.enableNotify)
         return d
 
     def provisionChild(self, name):

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/notify.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/notify.py	2009-07-15 01:21:01 UTC (rev 4459)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/notify.py	2009-07-15 01:25:22 UTC (rev 4460)
@@ -32,28 +32,37 @@
 These notifications originate from cache.py:MemcacheChangeNotifier.changed().
 """
 
-# TODO: bindAddress to local
 # TODO: add CalDAVTester test for examining new xmpp-uri property
-# TODO: auto-registration and roster management for XMPP
 
-from twisted.internet import protocol
-from twisted.protocols import basic
+from twisted.internet.protocol import ReconnectingClientFactory, ServerFactory
+from twisted.internet.address import IPv4Address
+from twisted.internet.ssl import ClientContextFactory
+from twisted.internet.defer import inlineCallbacks, Deferred
+from twisted.protocols.basic import LineReceiver
 from twisted.plugin import IPlugin
 from twisted.application import internet, service
 from twisted.python.usage import Options, UsageError
 from twisted.python.reflect import namedClass
 from twisted.words.protocols.jabber import xmlstream
 from twisted.words.protocols.jabber.jid import JID
-from twisted.words.protocols.jabber.client import BasicAuthenticator, IQ
+from twisted.words.protocols.jabber.client import XMPPAuthenticator, IQAuthInitializer
+from twisted.words.protocols.jabber.xmlstream import IQ
 from twisted.words.xish import domish
 from twistedcaldav.log import LoggingMixIn
 from twistedcaldav.config import config, parseConfig, defaultConfig
+from twistedcaldav.memcacher import Memcacher
+from twistedcaldav import memcachepool
 from zope.interface import Interface, implements
+from fnmatch import fnmatch
+import uuid
 
 __all__ = [
+    "ClientNotifier",
     "Coalescer",
+    "getNodeCacher",
     "getNotificationClient",
     "getPubSubConfiguration",
+    "getPubSubHeartbeatURI",
     "getPubSubPath",
     "getPubSubXMPPURI",
     "INotifier",
@@ -63,8 +72,6 @@
     "NotificationClient",
     "NotificationClientFactory",
     "NotificationClientLineProtocol",
-    "NotificationClientUserMixIn",
-    "NotificationOptions",
     "NotificationServiceMaker",
     "SimpleLineNotificationFactory",
     "SimpleLineNotificationProtocol",
@@ -78,19 +85,38 @@
 # Classes used within calendarserver itself
 #
 
-class NotificationClientUserMixIn(object):
+class ClientNotifier(LoggingMixIn):
     """
-    Notification Client User (Mixin)
-
     Provides a method to send change notifications to the L{NotificationClient}.
     """
 
-    def sendNotification(self, uri):
-        getNotificationClient().send(uri)
+    def __init__(self, resource, configOverride=None):
+        self._resource = resource
+        self._notify = True
+        self.config = configOverride or config
 
+    def enableNotify(self, arg):
+        url = self._resource.url()
+        self.log_debug("enableNotify: %s" % (url,))
+        self._notify = True
 
+    def disableNotify(self):
+        url = self._resource.url()
+        self.log_debug("disableNotify: %s" % (url,))
+        self._notify = False
 
-class NotificationClientLineProtocol(basic.LineReceiver, LoggingMixIn):
+    def notify(self, op="update"):
+        url = self._resource.url()
+
+        if self.config.Notifications.Enabled:
+            if self._notify:
+                self.log_debug("Notifications are enabled: %s %s" % (op, url))
+                return getNotificationClient().send(op, url)
+            else:
+                self.log_debug("Skipping notification for: %s" % (url,))
+
+
+class NotificationClientLineProtocol(LineReceiver, LoggingMixIn):
     """
     Notification Client Line Protocol
 
@@ -105,7 +131,7 @@
         self.client.removeObserver(self)
 
 
-class NotificationClientFactory(protocol.ReconnectingClientFactory,
+class NotificationClientFactory(ReconnectingClientFactory,
     LoggingMixIn):
     """
     Notification Client Factory
@@ -120,18 +146,14 @@
         self.client = client
 
     def clientConnectionLost(self, connector, reason):
-        self.log_error("Connect to notification server lost: %s" %
-            (reason,))
+        self.log_error("Connect to notification server lost: %s" % (reason,))
         self.connected = False
-        protocol.ReconnectingClientFactory.clientConnectionLost(self,
-            connector, reason)
+        ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
 
     def clientConnectionFailed(self, connector, reason):
-        self.log_error("Unable to connect to notification server: %s" %
-            (reason,))
+        self.log_error("Unable to connect to notification server: %s" % (reason,))
         self.connected = False
-        protocol.ReconnectingClientFactory.clientConnectionFailed(self,
-            connector, reason)
+        ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
 
     def connectionMade(self):
         self.connected = True
@@ -152,7 +174,7 @@
     """
     Notification Client
 
-    Forwards on notifications from NotificationClientUserMixIns to the
+    Forwards on notifications from ClientNotifiers to the
     notification server.  A NotificationClient is installed by the tap at
     startup.
     """
@@ -168,26 +190,27 @@
             from twisted.internet import reactor
         self.reactor = reactor
 
-    def send(self, uri):
+    def send(self, op, uri):
         if self.factory is None:
             self.factory = NotificationClientFactory(self)
             self.reactor.connectTCP(self.host, self.port, self.factory)
             self.log_debug("Creating factory")
 
+        msg = "%s %s" % (op, str(uri))
         if self.factory.isReady() and self.observers:
             for observer in self.observers:
-                self.log_debug("Sending to notification server: %s" % (uri,))
-                observer.sendLine(str(uri))
+                self.log_debug("Sending to notification server: %s" % (msg,))
+                observer.sendLine(msg)
         else:
-            self.log_debug("Queing: %s" % (uri,))
-            self.queued.add(uri)
+            self.log_debug("Queuing: %s" % (msg,))
+            self.queued.add(msg)
 
     def connectionMade(self):
         if self.factory.isReady() and self.observers:
             for observer in self.observers:
-                for uri in self.queued:
-                    self.log_debug("Sending from queue: %s" % (uri,))
-                    observer.sendLine(str(uri))
+                for msg in self.queued:
+                    self.log_debug("Sending from queue: %s" % (msg,))
+                    observer.sendLine(msg)
             self.queued.clear()
 
     def addObserver(self, observer):
@@ -208,10 +231,62 @@
 
 
 
+class NodeCreationException(Exception):
+    pass
 
+class NodeCacher(Memcacher, LoggingMixIn):
 
+    def __init__(self, reactor=None):
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+        super(NodeCacher, self).__init__("pubsubnodes")
 
+    def nodeExists(self, nodeName):
+        return self.get(nodeName)
 
+    def storeNode(self, nodeName):
+        return self.set(nodeName, "1")
+
+    @inlineCallbacks
+    def waitForNode(self, notifier, nodeName):
+        retryCount = 0
+        verified = False
+        requestedCreation = False
+        while(retryCount < 5):
+            if (yield self.nodeExists(nodeName)):
+                verified = True
+                break
+
+            if not requestedCreation:
+                notifier.notify(op="create")
+                requestedCreation = True
+
+            retryCount += 1
+
+            pause = Deferred()
+            def _timedDeferred():
+                pause.callback(True)
+            self.reactor.callLater(1, _timedDeferred)
+            yield pause
+
+        if not verified:
+            self.log_debug("Giving up!")
+            raise NodeCreationException("Could not create node %s" % (nodeName,))
+
+
+_nodeCacher = None
+
+def getNodeCacher():
+    global _nodeCacher
+    if _nodeCacher is None:
+        _nodeCacher = NodeCacher()
+    return _nodeCacher
+
+
+
+
+
 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
 # Classes used within Notification Server
 #
@@ -220,7 +295,7 @@
 # Internal Channel (from icalserver to notification server)
 #
 
-class InternalNotificationProtocol(basic.LineReceiver):
+class InternalNotificationProtocol(LineReceiver):
     """
     InternalNotificationProtocol
 
@@ -228,11 +303,11 @@
     """
 
     def lineReceived(self, line):
-        val = str(line.strip())
-        self.factory.coalescer.add(val)
+        op, uri = line.strip().split()
+        self.factory.coalescer.add(op, uri)
 
 
-class InternalNotificationFactory(protocol.ServerFactory):
+class InternalNotificationFactory(ServerFactory):
     """
     Internal Notification Factory
 
@@ -257,30 +332,55 @@
     """
 
     delaySeconds = 5
+    sendAnywayAfterCount = 5
 
-    def __init__(self, notifiers, reactor=None, delaySeconds=None):
+    def __init__(self, notifiers, reactor=None, delaySeconds=None,
+        sendAnywayAfterCount=None):
 
-        if delaySeconds:
+        if sendAnywayAfterCount:
+            self.sendAnywayAfterCount = sendAnywayAfterCount
+
+        if delaySeconds is not None:
             self.delaySeconds = delaySeconds
 
         if reactor is None:
             from twisted.internet import reactor
         self.reactor = reactor
 
-        self.uris = dict()
+        self.uris = {}
         self.notifiers = notifiers
 
-    def add(self, uri):
-        delayed = self.uris.get(uri, None)
-        if delayed and delayed.active():
-            delayed.reset(self.delaySeconds)
-        else:
-            self.uris[uri] = self.reactor.callLater(self.delaySeconds,
-                self.delayedEnqueue, uri)
+    def add(self, op, uri):
 
-    def delayedEnqueue(self, uri):
+        if op == "create":
+            # we don't want to delay a "create" notification; this opcode
+            # is meant for XMPP pubsub -- it means create and configure the
+            # node but don't publish to it
+            for notifier in self.notifiers:
+                notifier.enqueue(op, uri)
+
+        else: # normal update notification
+            delayed, count = self.uris.get(uri, [None, 0])
+
+            if delayed and delayed.active():
+                count += 1
+                if count < self.sendAnywayAfterCount:
+                    # reschedule for delaySeconds in the future
+                    delayed.reset(self.delaySeconds)
+                    self.uris[uri][1] = count
+                    self.log_debug("Delaying: %s" % (uri,))
+                else:
+                    self.log_debug("Not delaying to avoid starvation: %s" % (uri,))
+            else:
+                self.log_debug("Scheduling: %s" % (uri,))
+                self.uris[uri] = [self.reactor.callLater(self.delaySeconds,
+                    self.delayedEnqueue, op, uri), 0]
+
+    def delayedEnqueue(self, op, uri):
+        self.log_debug("Time to send: %s" % (uri,))
+        self.uris[uri][1] = 0
         for notifier in self.notifiers:
-            notifier.enqueue(uri)
+            notifier.enqueue(op, uri)
 
 
 
@@ -295,11 +395,12 @@
     Defines an enqueue method that Notifier classes need to implement.
     """
 
-    def enqueue(uri):
+    def enqueue(self, op, uri):
         """
         Let's the notifier object know that a change has been made for this
         uri, and enough time has passed to allow for coalescence.
 
+        @type op: C{str}
         @type uri: C{str}
         """
 
@@ -333,18 +434,20 @@
         self.observers = set()
         self.sentReset = False
 
-    def enqueue(self, uri):
+    def enqueue(self, op, uri):
 
-        self.latestSeq += 1L
+        if op == "update":
 
-        # Update history
-        self.history[uri] = self.latestSeq
+            self.latestSeq += 1L
 
-        for observer in self.observers:
-            msg = "%d %s" % (self.latestSeq, uri)
-            self.log_debug("Sending %s" % (msg,))
-            observer.sendLine(msg)
+            # Update history
+            self.history[uri] = self.latestSeq
 
+            for observer in self.observers:
+                msg = "%d %s" % (self.latestSeq, uri)
+                self.log_debug("Sending %s" % (msg,))
+                observer.sendLine(msg)
+
     def reset(self):
         self.latestSeq = 0L
         self.history = { } # keys=uri, values=sequenceNumber
@@ -374,7 +477,7 @@
             self.sentReset = True
 
 
-class SimpleLineNotificationProtocol(basic.LineReceiver, LoggingMixIn):
+class SimpleLineNotificationProtocol(LineReceiver, LoggingMixIn):
     """
     Simple Line Notification Protocol
 
@@ -412,7 +515,7 @@
         self.notifier.removeObserver(self)
 
 
-class SimpleLineNotificationFactory(protocol.ServerFactory):
+class SimpleLineNotificationFactory(ServerFactory):
     """
     Simple Line Notification Factory
 
@@ -435,8 +538,6 @@
 
 
 
-
-
 class XMPPNotifier(LoggingMixIn):
     """
     XMPP Notifier
@@ -451,104 +552,187 @@
     create the node and then go through the configuration process,
     followed by a publish retry.
 
-    For monitoring purposes, you can specify a "TestJID" value in
-    the config file; XMPPNotifier will send error messages to that
+    For monitoring purposes, you can subscribe to the server's JID
+    as long as your own JID matches the "AllowedJIDs" pattern(s) in
+    the config file; XMPPNotifier will send error messages to your
     JID.  If you also want to receive non-error, debug messages,
     send the calendar server JID the message, "debug on".  Send
-    "help" for other commands.  Note, XMPPNotifier doesn't yet
-    handle registration or roster management, so you'll need to set
-    up the JID accounts out-of-band, using another XMPP client, for
-    example.
+    "help" for other commands.
 
+    To let clients know that the notifications from the calendar server
+    are still flowing, a "heartbeat" node is published to every 30
+    minutes (configurable).
+
     """
 
     implements(INotifier)
 
     pubsubNS = 'http://jabber.org/protocol/pubsub'
 
-    nodeConf = {
-        'pubsub#deliver_payloads': '0',
-        'pubsub#persist_items'   : '0',
-    }
-
-    def __init__(self, settings, reactor=None, configOverride=None):
+    def __init__(self, settings, reactor=None, configOverride=None,
+        heartbeat=True, roster=True):
         self.xmlStream = None
         self.settings = settings
         if reactor is None:
             from twisted.internet import reactor
         self.reactor = reactor
         self.config = configOverride or config
+        self.doHeartbeat = heartbeat and self.settings['HeartbeatMinutes'] != 0
+        self.doRoster = roster
 
-        self.sendDebugMessages = False
+        self.roster = {}
+        self.outstanding = {}
 
-    def enqueue(self, uri):
+    def lockNode(self, nodeName):
+        if self.outstanding.has_key(nodeName):
+            return False
+        else:
+            self.outstanding[nodeName] = 1
+            return True
+
+    def unlockNode(self, failure, nodeName):
+        try:
+            del self.outstanding[nodeName]
+        except KeyError:
+            pass
+
+    def sendHeartbeat(self):
+        if self.doHeartbeat and self.xmlStream is not None:
+            self.enqueue("update", "", lock=False)
+            self.reactor.callLater(self.settings['HeartbeatMinutes'] * 60,
+                self.sendHeartbeat)
+
+    def enqueue(self, op, uri, lock=True):
         if self.xmlStream is not None:
             # Convert uri to node
             nodeName = self.uriToNodeName(uri)
-            self.publishNode(nodeName)
+            if op == "create":
+                if not self.lockNode(nodeName):
+                    # this node is busy, so it must already be created, or at
+                    # least in the process
+                    return
+                self.createNode(nodeName, publish=False)
+            else:
+                self.publishNode(nodeName, lock=lock)
 
     def uriToNodeName(self, uri):
         return getPubSubPath(uri, getPubSubConfiguration(self.config))
 
-    def publishNode(self, nodeName):
-        if self.xmlStream is not None:
+    def publishNode(self, nodeName, lock=True):
+        if self.xmlStream is None:
+            # We lost our connection
+            self.unlockNode(None, nodeName)
+            return
+
+        try:
+            if lock and not self.lockNode(nodeName):
+                return
+
             iq = IQ(self.xmlStream)
             pubsubElement = iq.addElement('pubsub', defaultUri=self.pubsubNS)
             publishElement = pubsubElement.addElement('publish')
             publishElement['node'] = nodeName
-            iq.addCallback(self.responseFromPublish, nodeName)
-            iq.send(to=self.settings['ServiceAddress'])
+            if self.settings["NodeConfiguration"]["pubsub#deliver_payloads"] == '1':
+                itemElement = publishElement.addElement('item')
+                payloadElement = itemElement.addElement('plistfrag',
+                    defaultUri='plist-apple')
 
-    def responseFromPublish(self, nodeName, iq):
-        if iq['type'] == 'result':
-            self.sendDebug("Node publish successful (%s)" % (nodeName,), iq)
-        else:
-            self.log_error("PubSub node publish error: %s" %
-                (iq.toXml().encode('ascii', 'replace')),)
-            self.sendDebug("Node publish failed (%s)" % (nodeName,), iq)
+            self.sendDebug("Publishing (%s)" % (nodeName,), iq)
+            d = iq.send(to=self.settings['ServiceAddress'])
+            d.addCallback(self.publishNodeSuccess, nodeName)
+            d.addErrback(self.publishNodeFailure, nodeName)
+        except:
+            self.unlockNode(None, nodeName)
+            raise
 
-            errorElement = None
-            pubsubElement = None
-            for child in iq.elements():
-                if child.name == 'error':
-                    errorElement = child
-                if child.name == 'pubsub':
-                    pubsubElement = child
+    def publishNodeSuccess(self, iq, nodeName):
+        self.unlockNode(None, nodeName)
+        self.sendDebug("Node publish successful (%s)" % (nodeName,), iq)
 
-            if errorElement:
-                if errorElement['code'] == '400':
-                    self.requestConfigurationForm(nodeName)
+    def publishNodeFailure(self, result, nodeName):
+        try:
+            iq = result.value.getElement()
 
-                elif errorElement['code'] == '404':
+            if iq.name == "error":
+                if iq['code'] == '400':
+                    self.requestConfigurationForm(nodeName, True)
+
+                elif iq['code'] == '404':
                     self.createNode(nodeName)
+            else:
+                self.log_error("PubSub node publish error: %s" %
+                    (iq.toXml().encode('ascii', 'replace')),)
+                self.sendDebug("Node publish failed (%s)" % (nodeName,), iq)
+                # Don't know how to proceed
+                self.unlockNode(None, nodeName)
+        except:
+            self.unlockNode(None, nodeName)
+            raise
 
-    def createNode(self, nodeName):
-        if self.xmlStream is not None:
+    def createNode(self, nodeName, publish=True):
+        if self.xmlStream is None:
+            # We lost our connection
+            self.unlockNode(None, nodeName)
+            return
+
+        try:
             iq = IQ(self.xmlStream)
             pubsubElement = iq.addElement('pubsub', defaultUri=self.pubsubNS)
             child = pubsubElement.addElement('create')
             child['node'] = nodeName
-            iq.addCallback(self.responseFromCreate, nodeName)
-            iq.send(to=self.settings['ServiceAddress'])
+            d = iq.send(to=self.settings['ServiceAddress'])
+            d.addCallback(self.createNodeSuccess, nodeName, publish)
+            d.addErrback(self.createNodeFailure, nodeName, publish)
+        except:
+            self.unlockNode(None, nodeName)
+            raise
 
-    def responseFromCreate(self, nodeName, iq):
-        if iq['type'] == 'result':
+    def createNodeSuccess(self, iq, nodeName, publish):
+        try:
             self.sendDebug("Node creation successful (%s)" % (nodeName,), iq)
             # now time to configure; fetch the form
-            self.requestConfigurationForm(nodeName)
-        else:
-            self.log_error("PubSub node creation error: %s" %
-                (iq.toXml().encode('ascii', 'replace')),)
-            self.sendError("Node creation failed (%s)" % (nodeName,), iq)
+            self.requestConfigurationForm(nodeName, publish)
+        except:
+            self.unlockNode(None, nodeName)
+            raise
 
-    def requestConfigurationForm(self, nodeName):
-        if self.xmlStream is not None:
-            iq = IQ(self.xmlStream, type='get')
-            child = iq.addElement('pubsub', defaultUri=self.pubsubNS+"#owner")
+    def createNodeFailure(self, result, nodeName, publish):
+        try:
+            iq = result.value.getElement()
+            if iq['code'] == '409':
+                # node already exists, proceed to configure
+                self.sendDebug("Node already exists (%s)" % (nodeName,), iq)
+                self.requestConfigurationForm(nodeName, publish)
+            else:
+                # couldn't create node, give up
+                self.unlockNode(None, nodeName)
+                self.log_error("PubSub node creation error: %s" %
+                    (iq.toXml().encode('ascii', 'replace')),)
+                self.sendError("Node creation failed (%s)" % (nodeName,), iq)
+        except:
+            self.unlockNode(None, nodeName)
+            raise
+
+    def requestConfigurationForm(self, nodeName, publish):
+        if self.xmlStream is None:
+            # We lost our connection
+            self.unlockNode(None, nodeName)
+            return
+
+        try:
+            # XXX This codepath is not unit tested
+            iq = IQ(self.xmlStream, 'get')
+            child = iq.addElement('pubsub',
+                defaultUri=self.pubsubNS+"#owner")
             child = child.addElement('configure')
             child['node'] = nodeName
-            iq.addCallback(self.responseFromConfigurationForm, nodeName)
-            iq.send(to=self.settings['ServiceAddress'])
+            d = iq.send(to=self.settings['ServiceAddress'])
+            d.addCallback(self.requestConfigurationFormSuccess, nodeName,
+                publish)
+            d.addErrback(self.requestConfigurationFormFailure, nodeName)
+        except:
+            self.unlockNode(None, nodeName)
+            raise
 
     def _getChild(self, element, name):
         for child in element.elements():
@@ -556,8 +740,14 @@
                 return child
         return None
 
-    def responseFromConfigurationForm(self, nodeName, iq):
-        if iq['type'] == 'result':
+    def requestConfigurationFormSuccess(self, iq, nodeName, publish):
+        if self.xmlStream is None:
+            # We lost our connection
+            self.unlockNode(None, nodeName)
+            return
+
+        try:
+            nodeConf = self.settings["NodeConfiguration"]
             self.sendDebug("Received configuration form (%s)" % (nodeName,), iq)
             pubsubElement = self._getChild(iq, 'pubsub')
             if pubsubElement:
@@ -566,7 +756,7 @@
                     formElement = configureElement.firstChildElement()
                     if formElement['type'] == 'form':
                         # We've found the form; start building a response
-                        filledIq = IQ(self.xmlStream, type='set')
+                        filledIq = IQ(self.xmlStream, 'set')
                         filledPubSub = filledIq.addElement('pubsub',
                             defaultUri=self.pubsubNS+"#owner")
                         filledConfigure = filledPubSub.addElement('configure')
@@ -575,64 +765,238 @@
                             defaultUri='jabber:x:data')
                         filledForm['type'] = 'submit'
 
+                        configMatches = True
                         for field in formElement.elements():
                             if field.name == 'field':
                                 var = field['var']
                                 if var == "FORM_TYPE":
                                     filledForm.addChild(field)
                                 else:
-                                    value = self.nodeConf.get(var, None)
-                                    if value is not None:
+                                    value = nodeConf.get(var, None)
+                                    if (value is not None and
+                                        (str(self._getChild(field,
+                                        "value")) != value)):
+                                        # this field needs configuring
+                                        configMatches = False
                                         filledField = filledForm.addElement('field')
                                         filledField['var'] = var
                                         filledField['type'] = field['type']
                                         valueElement = filledField.addElement('value')
                                         valueElement.addContent(value)
-                                        filledForm.addChild(field)
-                        filledIq.addCallback(self.responseFromConfiguration,
-                            nodeName)
-                        self.sendDebug("Sending configuration form (%s)"
-                                       % (nodeName,), filledIq)
-                        filledIq.send(to=self.settings['ServiceAddress'])
-        else:
+                                        # filledForm.addChild(field)
+                        if configMatches:
+                            # XXX This codepath is not unit tested
+                            cancelIq = IQ(self.xmlStream, 'set')
+                            cancelPubSub = cancelIq.addElement('pubsub',
+                                defaultUri=self.pubsubNS+"#owner")
+                            cancelConfig = cancelPubSub.addElement('configure')
+                            cancelConfig['node'] = nodeName
+                            cancelX = cancelConfig.addElement('x',
+                                defaultUri='jabber:x:data')
+                            cancelX['type'] = 'cancel'
+                            self.sendDebug("Cancelling configuration (%s)"
+                                           % (nodeName,), cancelIq)
+                            d = cancelIq.send(to=self.settings['ServiceAddress'])
+                        else:
+                            self.sendDebug("Sending configuration form (%s)"
+                                           % (nodeName,), filledIq)
+                            d = filledIq.send(to=self.settings['ServiceAddress'])
+                        d.addCallback(self.configurationSuccess, nodeName,
+                            publish)
+                        d.addErrback(self.configurationFailure, nodeName)
+                        return
+
+            # Couldn't process configuration form, give up
+            self.unlockNode(None, nodeName)
+
+        except:
+            # Couldn't process configuration form, give up
+            self.unlockNode(None, nodeName)
+            raise
+
+    def requestConfigurationFormFailure(self, result, nodeName):
+        # If we get here we're giving up
+        try:
+            iq = result.value.getElement()
             self.log_error("PubSub configuration form request error: %s" %
                 (iq.toXml().encode('ascii', 'replace')),)
-            self.sendError("Failed to receive configuration form (%s)" % (nodeName,), iq)
+            self.sendError("Failed to receive configuration form (%s)" %
+                (nodeName,), iq)
+        finally:
+            self.unlockNode(None, nodeName)
 
+    def configurationSuccess(self, iq, nodeName, publish):
+        if self.xmlStream is None:
+            # We lost our connection
+            self.unlockNode(None, nodeName)
+            return
 
-    def responseFromConfiguration(self, nodeName, iq):
-        if iq['type'] == 'result':
+        try:
             self.log_debug("PubSub node %s is configured" % (nodeName,))
             self.sendDebug("Configured node (%s)" % (nodeName,), iq)
-            self.publishNode(nodeName)
+            nodeCacher = getNodeCacher()
+            nodeCacher.storeNode(nodeName)
+            if publish:
+                self.publishNode(nodeName, lock=False)
+            else:
+                self.unlockNode(None, nodeName)
+        except:
+            self.unlockNode(None, nodeName)
+            raise
 
-        else:
+    def configurationFailure(self, result, nodeName):
+        # If we get here we're giving up
+        try:
+            iq = result.value.getElement()
             self.log_error("PubSub node configuration error: %s" %
                 (iq.toXml().encode('ascii', 'replace')),)
             self.sendError("Failed to configure node (%s)" % (nodeName,), iq)
+        finally:
+            self.unlockNode(None, nodeName)
 
+    def deleteNode(self, nodeName):
+        if self.xmlStream is None:
+            # We lost our connection
+            self.unlockNode(None, nodeName)
+            return
 
+        try:
+            if not self.lockNode(nodeName):
+                return
+
+            iq = IQ(self.xmlStream)
+            pubsubElement = iq.addElement('pubsub',
+                defaultUri=self.pubsubNS+"#owner")
+            publishElement = pubsubElement.addElement('delete')
+            publishElement['node'] = nodeName
+            self.sendDebug("Deleting (%s)" % (nodeName,), iq)
+            d = iq.send(to=self.settings['ServiceAddress'])
+            d.addCallback(self.deleteNodeSuccess, nodeName)
+            d.addErrback(self.deleteNodeFailure, nodeName)
+        except:
+            self.unlockNode(None, nodeName)
+            raise
+
+    def deleteNodeSuccess(self, iq, nodeName):
+        self.unlockNode(None, nodeName)
+        self.sendDebug("Node delete successful (%s)" % (nodeName,), iq)
+
+    def deleteNodeFailure(self, result, nodeName):
+        try:
+            iq = result.value.getElement()
+            self.log_error("PubSub node delete error: %s" %
+                (iq.toXml().encode('ascii', 'replace')),)
+            self.sendDebug("Node delete failed (%s)" % (nodeName,), iq)
+        finally:
+            self.unlockNode(None, nodeName)
+
+
+    def requestRoster(self):
+        if self.doRoster:
+            self.roster = {}
+            rosterIq = IQ(self.xmlStream, 'get')
+            rosterIq.addElement("query", "jabber:iq:roster")
+            d = rosterIq.send()
+            d.addCallback(self.handleRoster)
+
+    def allowedInRoster(self, jid):
+        for pattern in self.settings.get("AllowedJIDs", []):
+            if fnmatch(jid, pattern):
+                return True
+        return False
+
+    def handleRoster(self, iq):
+        for child in iq.children[0].children:
+            jid = child['jid']
+            if self.allowedInRoster(jid):
+                self.log_debug("In roster: %s" % (jid,))
+                if not self.roster.has_key(jid):
+                    self.roster[jid] = { 'debug' : False, 'available' : False }
+            else:
+                self.log_info("JID not allowed in roster: %s" % (jid,))
+
+    def handlePresence(self, iq):
+        self.log_debug("Presence IQ: %s" %
+            (iq.toXml().encode('ascii', 'replace')),)
+        presenceType = iq.getAttribute('type')
+
+        if presenceType == 'subscribe':
+            frm = JID(iq['from']).userhost()
+            if self.allowedInRoster(frm):
+                self.roster[frm] = { 'debug' : False, 'available' : True }
+                response = domish.Element(('jabber:client', 'presence'))
+                response['to'] = iq['from']
+                response['type'] = 'subscribed'
+                self.xmlStream.send(response)
+
+                # request subscription as well
+                subscribe = domish.Element(('jabber:client', 'presence'))
+                subscribe['to'] = iq['from']
+                subscribe['type'] = 'subscribe'
+                self.xmlStream.send(subscribe)
+            else:
+                self.log_info("JID not allowed in roster: %s" % (frm,))
+                # Reject
+                response = domish.Element(('jabber:client', 'presence'))
+                response['to'] = iq['from']
+                response['type'] = 'unsubscribed'
+                self.xmlStream.send(response)
+
+        elif presenceType == 'unsubscribe':
+            frm = JID(iq['from']).userhost()
+            if self.roster.has_key(frm):
+                del self.roster[frm]
+            response = domish.Element(('jabber:client', 'presence'))
+            response['to'] = iq['from']
+            response['type'] = 'unsubscribed'
+            self.xmlStream.send(response)
+
+            # remove from roster as well
+            # XXX This codepath is not unit tested
+            removal = IQ(self.xmlStream, 'set')
+            query = removal.addElement("query", "jabber:iq:roster")
+            query.addElement("item")
+            query.item["jid"] = iq["from"]
+            query.item["subscription"] = "remove"
+            removal.send()
+
+        elif presenceType == 'unavailable':
+            frm = JID(iq['from']).userhost()
+            if self.roster.has_key(frm):
+                self.roster[frm]['available'] = False
+
+        else:
+            frm = JID(iq['from']).userhost()
+            if self.allowedInRoster(frm):
+                if self.roster.has_key(frm):
+                    self.roster[frm]['available'] = True
+                else:
+                    self.roster[frm] = { 'debug' : False, 'available' : True }
+            else:
+                self.log_info("JID not allowed in roster: %s" % (frm,))
+
     def streamOpened(self, xmlStream):
         self.xmlStream = xmlStream
         xmlStream.addObserver('/message', self.handleMessage)
+        xmlStream.addObserver('/presence', self.handlePresence)
+        self.requestRoster()
+        self.sendHeartbeat()
 
+
     def streamClosed(self):
         self.xmlStream = None
 
     def sendDebug(self, txt, element):
-        if self.sendDebugMessages:
-            testJid = self.settings.get("TestJID", "")
-            if testJid:
-                txt = "DEBUG: %s %s" % (txt, element.toXml().encode('ascii',
-                    'replace'))
-                self.sendAlert(testJid, txt)
+        txt = "DEBUG: %s %s" % (txt, element.toXml().encode('ascii', 'replace'))
+        for jid, info in self.roster.iteritems():
+            if info['available'] and info['debug']:
+                self.sendAlert(jid, txt)
 
     def sendError(self, txt, element):
-        testJid = self.settings.get("TestJID", "")
-        if testJid:
-            txt = "ERROR: %s %s" % (txt, element.toXml().encode('ascii',
-                'replace'))
-            self.sendAlert(testJid, txt)
+        txt = "ERROR: %s %s" % (txt, element.toXml().encode('ascii', 'replace'))
+        for jid, info in self.roster.iteritems():
+            if info['available']:
+                self.sendAlert(jid, txt)
 
     def sendAlert(self, jid, txt):
         if self.xmlStream is not None:
@@ -645,17 +1009,60 @@
         body = getattr(iq, 'body', None)
         if body:
             response = None
-            txt = str(body).lower()
-            if txt == "help":
-                response = "debug on, debug off"
-            elif txt == "debug on":
-                self.sendDebugMessages = True
-                response = "Debugging on"
-            elif txt == "debug off":
-                self.sendDebugMessages = False
-                response = "Debugging off"
+            frm = JID(iq['from']).userhost()
+            if frm in self.roster:
+                txt = str(body).lower()
+                if txt == "help":
+                    response = "debug on, debug off, roster, create <nodename>, publish <nodename>, hammer <count>"
+                elif txt == "roster":
+                    response = "Roster: %s" % (str(self.roster),)
+                elif txt == "debug on":
+                    self.roster[frm]['debug'] = True
+                    response = "Debugging on"
+                elif txt == "debug off":
+                    self.roster[frm]['debug'] = False
+                    response = "Debugging off"
+                elif txt == "outstanding":
+                    response = "Outstanding: %s" % (str(self.outstanding),)
+                elif txt.startswith("publish"):
+                    try:
+                        publish, nodeName = str(body).split()
+                    except ValueError:
+                        response = "Please phrase it like 'publish nodename'"
+                    else:
+                        response = "Publishing node %s" % (nodeName,)
+                        self.reactor.callLater(1, self.enqueue, "update",
+                            nodeName)
+                elif txt.startswith("delete"):
+                    try:
+                        delete, nodeName = str(body).split()
+                    except ValueError:
+                        response = "Please phrase it like 'delete nodename'"
+                    else:
+                        response = "Deleting node %s" % (nodeName,)
+                        self.reactor.callLater(1, self.deleteNode, nodeName)
+                elif txt.startswith("create"):
+                    try:
+                        publish, nodeName = str(body).split()
+                    except ValueError:
+                        response = "Please phrase it like 'create nodename'"
+                    else:
+                        response = "Creating and configuring node %s" % (nodeName,)
+                        self.reactor.callLater(1, self.enqueue, "create",
+                            nodeName)
+                elif txt.startswith("hammer"):
+                    try:
+                        hammer, count = txt.split()
+                        count = int(count)
+                    except ValueError:
+                        response = "Please phrase it like 'hammer 100'"
+                    else:
+                        response = "Hammer will commence now, %d times" % (count,)
+                        self.reactor.callLater(1, self.hammer, count)
+                else:
+                    response = "I don't understand.  Try 'help'."
             else:
-                response = "I don't understand.  Try 'help'."
+                response = "Sorry, you are not authorized to converse with this server"
 
             if response:
                 message = domish.Element(('jabber:client', 'message'))
@@ -664,39 +1071,54 @@
                 self.xmlStream.send(message)
 
 
+    def hammer(self, count):
+        for i in xrange(count):
+            self.enqueue("update", "hammertesting%d" % (i,))
 
 
 class XMPPNotificationFactory(xmlstream.XmlStreamFactory, LoggingMixIn):
 
-    def __init__(self, notifier, settings, reactor=None):
-        self.log_info("Setting up XMPPNotificationFactory")
+    def __init__(self, notifier, settings, reactor=None, keepAlive=True):
+        self.log_warn("Setting up XMPPNotificationFactory")
 
         self.notifier = notifier
         self.settings = settings
+
         self.jid = settings['JID']
+
+        # Ignore JID resource from plist
+        slash = self.jid.find("/")
+        if slash > -1:
+            self.jid = self.jid[0:slash]
+
+        # Generate a unique JID resource value
+        resource = "icalserver.%s" % uuid.uuid4().hex
+        self.jid = "%s/%s" % (self.jid, resource)
+
         self.keepAliveSeconds = settings.get('KeepAliveSeconds', 120)
         self.xmlStream = None
         self.presenceCall = None
+        self.doKeepAlive = keepAlive
         if reactor is None:
             from twisted.internet import reactor
         self.reactor = reactor
 
         xmlstream.XmlStreamFactory.__init__(self,
-            BasicAuthenticator(JID(self.jid), settings['Password']))
+            XMPPAuthenticator(JID(self.jid), settings['Password']))
 
         self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.connected)
         self.addBootstrap(xmlstream.STREAM_END_EVENT, self.disconnected)
         self.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.initFailed)
 
         self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.authenticated)
-        self.addBootstrap(BasicAuthenticator.INVALID_USER_EVENT,
+        self.addBootstrap(IQAuthInitializer.INVALID_USER_EVENT,
             self.authFailed)
-        self.addBootstrap(BasicAuthenticator.AUTH_FAILED_EVENT,
+        self.addBootstrap(IQAuthInitializer.AUTH_FAILED_EVENT,
             self.authFailed)
 
     def connected(self, xmlStream):
         self.xmlStream = xmlStream
-        self.log_info("XMPP connection successful")
+        self.log_warn("XMPP connection successful")
         # Log all traffic
         xmlStream.rawDataInFn = self.rawDataIn
         xmlStream.rawDataOutFn = self.rawDataOut
@@ -707,14 +1129,14 @@
         if self.presenceCall is not None:
             self.presenceCall.cancel()
             self.presenceCall = None
-        self.log_info("XMPP disconnected")
+        self.log_warn("XMPP disconnected")
 
     def initFailed(self, failure):
         self.xmlStream = None
-        self.log_info("XMPP Initialization failed: %s" % (failure,))
+        self.log_warn("XMPP Initialization failed: %s" % (failure,))
 
     def authenticated(self, xmlStream):
-        self.log_info("XMPP authentication successful: %s" % (self.jid,))
+        self.log_warn("XMPP authentication successful: %s" % (self.jid,))
         # xmlStream.addObserver('/message', self.handleMessage)
         self.sendPresence()
         self.notifier.streamOpened(xmlStream)
@@ -724,7 +1146,7 @@
             (self.jid,))
 
     def sendPresence(self):
-        if self.xmlStream is not None:
+        if self.doKeepAlive and self.xmlStream is not None:
             presence = domish.Element(('jabber:client', 'presence'))
             self.xmlStream.send(presence)
             self.presenceCall = self.reactor.callLater(self.keepAliveSeconds,
@@ -744,24 +1166,33 @@
     results = { 'enabled' : False }
 
     # return the first enabled xmpp service settings in the config file
-    for settings in config.Notifications["Services"].itervalues():
+    for key, settings in config.Notifications.Services.iteritems():
         if (settings["Service"] == "twistedcaldav.notify.XMPPNotifierService"
             and settings["Enabled"]):
             results['enabled'] = True
             results['service'] = settings['ServiceAddress']
             results['host'] = config.ServerHostName
             results['port'] = config.SSLPort or config.HTTPPort
+            results['xmpp-server'] = settings['Host']
+            results['heartrate'] = settings['HeartbeatMinutes']
 
     return results
 
 def getPubSubPath(uri, pubSubConfiguration):
-    return ("/Public/CalDAV/%s/%d/%s/" % (pubSubConfiguration['host'],
-        pubSubConfiguration['port'], uri.strip("/")))
+    path = "/Public/CalDAV/%s/%d/" % (pubSubConfiguration['host'],
+        pubSubConfiguration['port'])
+    if uri:
+        path += "%s/" % (uri.strip("/"),)
+    return path
 
 def getPubSubXMPPURI(uri, pubSubConfiguration):
     return "xmpp:%s?pubsub;node=%s" % (pubSubConfiguration['service'],
         getPubSubPath(uri, pubSubConfiguration))
 
+def getPubSubHeartbeatURI(pubSubConfiguration):
+    return "xmpp:%s?pubsub;node=%s" % (pubSubConfiguration['service'],
+        getPubSubPath("", pubSubConfiguration))
+
 #
 # Notification Server service config
 #
@@ -861,19 +1292,31 @@
 
     def makeService(self, options):
 
+        #
+        # Configure Memcached Client Pool
+        #
+        if config.Memcached.ClientEnabled:
+            memcachepool.installPool(
+                IPv4Address(
+                    'TCP',
+                    config.Memcached.BindAddress,
+                    config.Memcached.Port),
+                config.Memcached.MaxClients)
+
         multiService = service.MultiService()
 
         notifiers = []
-        for settings in config.Notifications["Services"].itervalues():
+        for key, settings in config.Notifications.Services.iteritems():
             if settings["Enabled"]:
                 notifier = namedClass(settings["Service"])(settings)
                 notifier.setServiceParent(multiService)
                 notifiers.append(notifier)
 
         internet.TCPServer(
-            config.Notifications["InternalNotificationPort"],
+            config.Notifications.InternalNotificationPort,
             InternalNotificationFactory(notifiers,
-                delaySeconds=config.Notifications["CoalesceSeconds"])
+                delaySeconds=config.Notifications.CoalesceSeconds),
+            interface=config.Notifications.BindAddress
         ).setServiceParent(multiService)
 
         return multiService
@@ -886,8 +1329,8 @@
         self.server = internet.TCPServer(settings["Port"],
             SimpleLineNotificationFactory(self.notifier))
 
-    def enqueue(self, uri):
-        self.notifier.enqueue(uri)
+    def enqueue(self, op, uri):
+        self.notifier.enqueue(op, uri)
 
     def startService(self):
         self.server.startService()
@@ -900,12 +1343,19 @@
 
     def __init__(self, settings):
         self.notifier = XMPPNotifier(settings)
-        self.client = internet.TCPClient(settings["Host"], settings["Port"],
-            XMPPNotificationFactory(self.notifier, settings))
 
-    def enqueue(self, uri):
-        self.notifier.enqueue(uri)
+        if settings["Port"] == 5223: # use old SSL method
+            self.client = internet.SSLClient(settings["Host"], settings["Port"],
+                XMPPNotificationFactory(self.notifier, settings),
+                ClientContextFactory())
+        else:
+            # TLS and SASL
+            self.client = internet.TCPClient(settings["Host"], settings["Port"],
+                XMPPNotificationFactory(self.notifier, settings))
 
+    def enqueue(self, op, uri):
+        self.notifier.enqueue(op, uri)
+
     def startService(self):
         self.client.startService()
 

Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/static.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/static.py	2009-07-15 01:21:01 UTC (rev 4459)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/static.py	2009-07-15 01:25:22 UTC (rev 4460)
@@ -74,7 +74,10 @@
 from twistedcaldav.timezoneservice import TimezoneServiceResource
 from twistedcaldav.cache import DisabledCacheNotifier, PropfindCacheMixin
 from twistedcaldav.notify import getPubSubConfiguration, getPubSubXMPPURI
+from twistedcaldav.notify import getPubSubHeartbeatURI, getPubSubPath
+from twistedcaldav.notify import ClientNotifier, getNodeCacher
 
+
 log = Logger()
 
 class CalDAVFile (CalDAVResource, DAVFile):
@@ -381,6 +384,12 @@
         except:
             return fail(Failure())
 
+        if hasattr(self, 'clientNotifier'):
+            self.clientNotifier.notify(op="update")
+        else:
+            log.debug("%r does not have a clientNotifier but the CTag changed"
+                      % (self,))
+
         if hasattr(self, 'cacheNotifier'):
             return self.cacheNotifier.changed()
         else:
@@ -650,6 +659,8 @@
 
     liveProperties = CalDAVFile.liveProperties + (
         (customxml.calendarserver_namespace, "xmpp-uri"),
+        (customxml.calendarserver_namespace, "xmpp-heartbeat-uri"),
+        (customxml.calendarserver_namespace, "xmpp-server"),
     )
 
     def __init__(self, path, parent, record):
@@ -657,6 +668,7 @@
         @param path: the path to the file which will back the resource.
         """
         self.cacheNotifier = self.cacheNotifierFactory(self)
+        self.clientNotifier = ClientNotifier(self)
         CalDAVFile.__init__(self, path)
         DirectoryCalendarHomeResource.__init__(self, parent, record)
 
@@ -675,6 +687,7 @@
         if cls is not None:
             child = cls(self.fp.child(name).path, self)
             child.cacheNotifier = self.cacheNotifier
+            child.clientNotifier = self.clientNotifier
             return child
 
         return self.createSimilarFile(self.fp.child(name).path)
@@ -685,6 +698,7 @@
         else:
             similar = CalDAVFile(path, principalCollections=self.principalCollections())
             similar.cacheNotifier = self.cacheNotifier
+            similar.clientNotifier = self.clientNotifier
             return similar
 
     def getChild(self, name):
@@ -701,14 +715,50 @@
         else:
             qname = property.qname()
 
+        def doneWaiting(result, propVal):
+            return propVal
+
         if qname == (customxml.calendarserver_namespace, "xmpp-uri"):
             pubSubConfiguration = getPubSubConfiguration(config)
             if pubSubConfiguration['enabled']:
-                return succeed(customxml.PubSubXMPPURIProperty(
-                    getPubSubXMPPURI(self.url(), pubSubConfiguration)))
+                if getattr(self, "clientNotifier", None) is not None:
+                    url = self.url()
+                    nodeName = getPubSubPath(url, pubSubConfiguration)
+                    propVal = customxml.PubSubXMPPURIProperty(
+                        getPubSubXMPPURI(url, pubSubConfiguration))
+                    nodeCacher = getNodeCacher()
+                    d = nodeCacher.waitForNode(self.clientNotifier, nodeName)
+                    # In either case we're going to return the xmpp-uri value
+                    d.addCallback(doneWaiting, propVal)
+                    d.addErrback(doneWaiting, propVal)
+                    return d
             else:
                 return succeed(customxml.PubSubXMPPURIProperty())
 
+        elif qname == (customxml.calendarserver_namespace, "xmpp-heartbeat-uri"):
+            pubSubConfiguration = getPubSubConfiguration(config)
+            if pubSubConfiguration['enabled']:
+                return succeed(
+                    customxml.PubSubHeartbeatProperty(
+                        customxml.PubSubHeartbeatURIProperty(
+                            getPubSubHeartbeatURI(pubSubConfiguration)
+                        ),
+                        customxml.PubSubHeartbeatMinutesProperty(
+                            str(pubSubConfiguration['heartrate'])
+                        )
+                    )
+                )
+            else:
+                return succeed(customxml.PubSubHeartbeatURIProperty())
+
+        elif qname == (customxml.calendarserver_namespace, "xmpp-server"):
+            pubSubConfiguration = getPubSubConfiguration(config)
+            if pubSubConfiguration['enabled']:
+                return succeed(customxml.PubSubXMPPServerProperty(
+                    pubSubConfiguration['xmpp-server']))
+            else:
+                return succeed(customxml.PubSubXMPPServerProperty())
+
         return super(CalendarHomeFile, self).readProperty(property, request)
 
 
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20090714/53f8416e/attachment-0001.html>


More information about the calendarserver-changes mailing list