[CalendarServer-changes] [4460] CalendarServer/branches/users/wsanchez/deployment/twistedcaldav
source_changes at macosforge.org
source_changes at macosforge.org
Tue Jul 14 18:25:22 PDT 2009
Revision: 4460
http://trac.macosforge.org/projects/calendarserver/changeset/4460
Author: wsanchez at apple.com
Date: 2009-07-14 18:25:22 -0700 (Tue, 14 Jul 2009)
Log Message:
-----------
Backport of XMPP notifications to deployment branch
Modified Paths:
--------------
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cache.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/customxml.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/calendar.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/notify.py
CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/static.py
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cache.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cache.py 2009-07-15 01:21:01 UTC (rev 4459)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/cache.py 2009-07-15 01:25:22 UTC (rev 4460)
@@ -31,9 +31,7 @@
from twistedcaldav.memcachepool import CachePoolUserMixIn
from twistedcaldav.config import config
-from twistedcaldav.notify import NotificationClientUserMixIn
-
class DisabledCacheNotifier(object):
def __init__(self, *args, **kwargs):
pass
@@ -65,13 +63,7 @@
-#
-# FIXME: This should be a generic notifier class, not specific to
-# memcache, as evidenced by the addition of the sendNotification()
-# addition in changed() below.
-#
-class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn,
- NotificationClientUserMixIn):
+class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn):
def __init__(self, resource, cachePool=None):
self._resource = resource
@@ -91,10 +83,6 @@
url = self._resource.url()
- if config.Notifications["Enabled"]:
- self.log_debug("Notifications are enabled: %s" % (url,))
- self.sendNotification(url)
-
self.log_debug("Changing Cache Token for %r" % (url,))
return self.getCachePool().set(
'cacheToken:%s' % (url,),
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py 2009-07-15 01:21:01 UTC (rev 4459)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/config.py 2009-07-15 01:25:22 UTC (rev 4460)
@@ -572,19 +572,6 @@
service["Service"] == "twistedcaldav.notify.XMPPNotifierService" and
service["Enabled"]
):
- # Get password from keychain. If not there, fall back to what
- # is in the plist.
- try:
- password = getPasswordFromKeychain(service["JID"])
- service["Password"] = password
- log.info("XMPP password successfully retreived from keychain")
- except KeychainAccessError:
- # The system doesn't support keychain
- pass
- except KeychainPasswordNotFound:
- # The password doesn't exist in the keychain.
- log.error("XMPP password not found in keychain")
-
# Check for empty fields
for key, value in service.iteritems():
if not value and key not in ("AllowedJIDs", "HeartbeatMinutes", "Password"):
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/customxml.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/customxml.py 2009-07-15 01:21:01 UTC (rev 4459)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/customxml.py 2009-07-15 01:25:22 UTC (rev 4460)
@@ -277,9 +277,47 @@
namespace = calendarserver_namespace
name = "xmpp-uri"
protected = True
+ hidden = True
+class PubSubHeartbeatProperty (davxml.WebDAVElement):
+ """
+ A calendarhomefile property to indicate the pubsub XMPP URI to subscribe to
+ for server heartbeats.
+ """
+ namespace = calendarserver_namespace
+ name = "xmpp-heartbeat"
+ protected = True
+ hidden = True
+ allowed_children = {
+ (calendarserver_namespace, "xmpp-heartbeat-uri" ) : (1, 1),
+ (calendarserver_namespace, "xmpp-heartbeat-minutes" ) : (1, 1),
+ }
+class PubSubHeartbeatURIProperty (davxml.WebDAVTextElement):
+ namespace = calendarserver_namespace
+ name = "xmpp-heartbeat-uri"
+ protected = True
+ hidden = True
+class PubSubHeartbeatMinutesProperty (davxml.WebDAVTextElement):
+ namespace = calendarserver_namespace
+ name = "xmpp-heartbeat-minutes"
+ protected = True
+ hidden = True
+
+class PubSubXMPPServerProperty (davxml.WebDAVTextElement):
+ """
+ A calendarhomefile property to indicate the pubsub XMPP hostname to
+ contact for notifications.
+ """
+ namespace = calendarserver_namespace
+ name = "xmpp-server"
+ protected = True
+ hidden = True
+
+
+
+
##
# Extensions to davxml.ResourceType
##
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/calendar.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/calendar.py 2009-07-15 01:21:01 UTC (rev 4459)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/directory/calendar.py 2009-07-15 01:25:22 UTC (rev 4460)
@@ -274,13 +274,25 @@
self.putChild(name, child)
def provisionDefaultCalendars(self):
- self.provision()
- childName = "calendar"
- childURL = joinURL(self.url(), childName)
- child = self.provisionChild(childName)
- assert isinstance(child, CalDAVResource), "Child %r is not a %s: %r" % (childName, CalDAVResource.__name__, child)
+ # Disable notifications during provisioning
+ if hasattr(self, "clientNotifier"):
+ self.clientNotifier.disableNotify()
+ try:
+ self.provision()
+
+ childName = "calendar"
+ childURL = joinURL(self.url(), childName)
+ child = self.provisionChild(childName)
+ assert isinstance(child, CalDAVResource), "Child %r is not a %s: %r" % (childName, CalDAVResource.__name__, child)
+ except:
+ # We want to make sure to re-enable notifications, so do so
+ # if there is an immediate exception above, or via errback, below
+ if hasattr(self, "clientNotifier"):
+ self.clientNotifier.enableNotify(None)
+ raise
+
def setupChild(_):
# Set calendar-free-busy-set on inbox
inbox = self.getChild("inbox")
@@ -295,6 +307,9 @@
d = child.createCalendarCollection()
d.addCallback(setupChild)
+ if hasattr(self, "clientNotifier"):
+ d.addCallback(self.clientNotifier.enableNotify)
+ d.addErrback(self.clientNotifier.enableNotify)
return d
def provisionChild(self, name):
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/notify.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/notify.py 2009-07-15 01:21:01 UTC (rev 4459)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/notify.py 2009-07-15 01:25:22 UTC (rev 4460)
@@ -32,28 +32,37 @@
These notifications originate from cache.py:MemcacheChangeNotifier.changed().
"""
-# TODO: bindAddress to local
# TODO: add CalDAVTester test for examining new xmpp-uri property
-# TODO: auto-registration and roster management for XMPP
-from twisted.internet import protocol
-from twisted.protocols import basic
+from twisted.internet.protocol import ReconnectingClientFactory, ServerFactory
+from twisted.internet.address import IPv4Address
+from twisted.internet.ssl import ClientContextFactory
+from twisted.internet.defer import inlineCallbacks, Deferred
+from twisted.protocols.basic import LineReceiver
from twisted.plugin import IPlugin
from twisted.application import internet, service
from twisted.python.usage import Options, UsageError
from twisted.python.reflect import namedClass
from twisted.words.protocols.jabber import xmlstream
from twisted.words.protocols.jabber.jid import JID
-from twisted.words.protocols.jabber.client import BasicAuthenticator, IQ
+from twisted.words.protocols.jabber.client import XMPPAuthenticator, IQAuthInitializer
+from twisted.words.protocols.jabber.xmlstream import IQ
from twisted.words.xish import domish
from twistedcaldav.log import LoggingMixIn
from twistedcaldav.config import config, parseConfig, defaultConfig
+from twistedcaldav.memcacher import Memcacher
+from twistedcaldav import memcachepool
from zope.interface import Interface, implements
+from fnmatch import fnmatch
+import uuid
__all__ = [
+ "ClientNotifier",
"Coalescer",
+ "getNodeCacher",
"getNotificationClient",
"getPubSubConfiguration",
+ "getPubSubHeartbeatURI",
"getPubSubPath",
"getPubSubXMPPURI",
"INotifier",
@@ -63,8 +72,6 @@
"NotificationClient",
"NotificationClientFactory",
"NotificationClientLineProtocol",
- "NotificationClientUserMixIn",
- "NotificationOptions",
"NotificationServiceMaker",
"SimpleLineNotificationFactory",
"SimpleLineNotificationProtocol",
@@ -78,19 +85,38 @@
# Classes used within calendarserver itself
#
-class NotificationClientUserMixIn(object):
+class ClientNotifier(LoggingMixIn):
"""
- Notification Client User (Mixin)
-
Provides a method to send change notifications to the L{NotificationClient}.
"""
- def sendNotification(self, uri):
- getNotificationClient().send(uri)
+ def __init__(self, resource, configOverride=None):
+ self._resource = resource
+ self._notify = True
+ self.config = configOverride or config
+ def enableNotify(self, arg):
+ url = self._resource.url()
+ self.log_debug("enableNotify: %s" % (url,))
+ self._notify = True
+ def disableNotify(self):
+ url = self._resource.url()
+ self.log_debug("disableNotify: %s" % (url,))
+ self._notify = False
-class NotificationClientLineProtocol(basic.LineReceiver, LoggingMixIn):
+ def notify(self, op="update"):
+ url = self._resource.url()
+
+ if self.config.Notifications.Enabled:
+ if self._notify:
+ self.log_debug("Notifications are enabled: %s %s" % (op, url))
+ return getNotificationClient().send(op, url)
+ else:
+ self.log_debug("Skipping notification for: %s" % (url,))
+
+
+class NotificationClientLineProtocol(LineReceiver, LoggingMixIn):
"""
Notification Client Line Protocol
@@ -105,7 +131,7 @@
self.client.removeObserver(self)
-class NotificationClientFactory(protocol.ReconnectingClientFactory,
+class NotificationClientFactory(ReconnectingClientFactory,
LoggingMixIn):
"""
Notification Client Factory
@@ -120,18 +146,14 @@
self.client = client
def clientConnectionLost(self, connector, reason):
- self.log_error("Connect to notification server lost: %s" %
- (reason,))
+ self.log_error("Connect to notification server lost: %s" % (reason,))
self.connected = False
- protocol.ReconnectingClientFactory.clientConnectionLost(self,
- connector, reason)
+ ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
def clientConnectionFailed(self, connector, reason):
- self.log_error("Unable to connect to notification server: %s" %
- (reason,))
+ self.log_error("Unable to connect to notification server: %s" % (reason,))
self.connected = False
- protocol.ReconnectingClientFactory.clientConnectionFailed(self,
- connector, reason)
+ ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
def connectionMade(self):
self.connected = True
@@ -152,7 +174,7 @@
"""
Notification Client
- Forwards on notifications from NotificationClientUserMixIns to the
+ Forwards on notifications from ClientNotifiers to the
notification server. A NotificationClient is installed by the tap at
startup.
"""
@@ -168,26 +190,27 @@
from twisted.internet import reactor
self.reactor = reactor
- def send(self, uri):
+ def send(self, op, uri):
if self.factory is None:
self.factory = NotificationClientFactory(self)
self.reactor.connectTCP(self.host, self.port, self.factory)
self.log_debug("Creating factory")
+ msg = "%s %s" % (op, str(uri))
if self.factory.isReady() and self.observers:
for observer in self.observers:
- self.log_debug("Sending to notification server: %s" % (uri,))
- observer.sendLine(str(uri))
+ self.log_debug("Sending to notification server: %s" % (msg,))
+ observer.sendLine(msg)
else:
- self.log_debug("Queing: %s" % (uri,))
- self.queued.add(uri)
+ self.log_debug("Queuing: %s" % (msg,))
+ self.queued.add(msg)
def connectionMade(self):
if self.factory.isReady() and self.observers:
for observer in self.observers:
- for uri in self.queued:
- self.log_debug("Sending from queue: %s" % (uri,))
- observer.sendLine(str(uri))
+ for msg in self.queued:
+ self.log_debug("Sending from queue: %s" % (msg,))
+ observer.sendLine(msg)
self.queued.clear()
def addObserver(self, observer):
@@ -208,10 +231,62 @@
+class NodeCreationException(Exception):
+ pass
+class NodeCacher(Memcacher, LoggingMixIn):
+ def __init__(self, reactor=None):
+ if reactor is None:
+ from twisted.internet import reactor
+ self.reactor = reactor
+ super(NodeCacher, self).__init__("pubsubnodes")
+ def nodeExists(self, nodeName):
+ return self.get(nodeName)
+ def storeNode(self, nodeName):
+ return self.set(nodeName, "1")
+
+ @inlineCallbacks
+ def waitForNode(self, notifier, nodeName):
+ retryCount = 0
+ verified = False
+ requestedCreation = False
+ while(retryCount < 5):
+ if (yield self.nodeExists(nodeName)):
+ verified = True
+ break
+
+ if not requestedCreation:
+ notifier.notify(op="create")
+ requestedCreation = True
+
+ retryCount += 1
+
+ pause = Deferred()
+ def _timedDeferred():
+ pause.callback(True)
+ self.reactor.callLater(1, _timedDeferred)
+ yield pause
+
+ if not verified:
+ self.log_debug("Giving up!")
+ raise NodeCreationException("Could not create node %s" % (nodeName,))
+
+
+_nodeCacher = None
+
+def getNodeCacher():
+ global _nodeCacher
+ if _nodeCacher is None:
+ _nodeCacher = NodeCacher()
+ return _nodeCacher
+
+
+
+
+
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Classes used within Notification Server
#
@@ -220,7 +295,7 @@
# Internal Channel (from icalserver to notification server)
#
-class InternalNotificationProtocol(basic.LineReceiver):
+class InternalNotificationProtocol(LineReceiver):
"""
InternalNotificationProtocol
@@ -228,11 +303,11 @@
"""
def lineReceived(self, line):
- val = str(line.strip())
- self.factory.coalescer.add(val)
+ op, uri = line.strip().split()
+ self.factory.coalescer.add(op, uri)
-class InternalNotificationFactory(protocol.ServerFactory):
+class InternalNotificationFactory(ServerFactory):
"""
Internal Notification Factory
@@ -257,30 +332,55 @@
"""
delaySeconds = 5
+ sendAnywayAfterCount = 5
- def __init__(self, notifiers, reactor=None, delaySeconds=None):
+ def __init__(self, notifiers, reactor=None, delaySeconds=None,
+ sendAnywayAfterCount=None):
- if delaySeconds:
+ if sendAnywayAfterCount:
+ self.sendAnywayAfterCount = sendAnywayAfterCount
+
+ if delaySeconds is not None:
self.delaySeconds = delaySeconds
if reactor is None:
from twisted.internet import reactor
self.reactor = reactor
- self.uris = dict()
+ self.uris = {}
self.notifiers = notifiers
- def add(self, uri):
- delayed = self.uris.get(uri, None)
- if delayed and delayed.active():
- delayed.reset(self.delaySeconds)
- else:
- self.uris[uri] = self.reactor.callLater(self.delaySeconds,
- self.delayedEnqueue, uri)
+ def add(self, op, uri):
- def delayedEnqueue(self, uri):
+ if op == "create":
+ # we don't want to delay a "create" notification; this opcode
+ # is meant for XMPP pubsub -- it means create and configure the
+ # node but don't publish to it
+ for notifier in self.notifiers:
+ notifier.enqueue(op, uri)
+
+ else: # normal update notification
+ delayed, count = self.uris.get(uri, [None, 0])
+
+ if delayed and delayed.active():
+ count += 1
+ if count < self.sendAnywayAfterCount:
+ # reschedule for delaySeconds in the future
+ delayed.reset(self.delaySeconds)
+ self.uris[uri][1] = count
+ self.log_debug("Delaying: %s" % (uri,))
+ else:
+ self.log_debug("Not delaying to avoid starvation: %s" % (uri,))
+ else:
+ self.log_debug("Scheduling: %s" % (uri,))
+ self.uris[uri] = [self.reactor.callLater(self.delaySeconds,
+ self.delayedEnqueue, op, uri), 0]
+
+ def delayedEnqueue(self, op, uri):
+ self.log_debug("Time to send: %s" % (uri,))
+ self.uris[uri][1] = 0
for notifier in self.notifiers:
- notifier.enqueue(uri)
+ notifier.enqueue(op, uri)
@@ -295,11 +395,12 @@
Defines an enqueue method that Notifier classes need to implement.
"""
- def enqueue(uri):
+ def enqueue(self, op, uri):
"""
Let's the notifier object know that a change has been made for this
uri, and enough time has passed to allow for coalescence.
+ @type op: C{str}
@type uri: C{str}
"""
@@ -333,18 +434,20 @@
self.observers = set()
self.sentReset = False
- def enqueue(self, uri):
+ def enqueue(self, op, uri):
- self.latestSeq += 1L
+ if op == "update":
- # Update history
- self.history[uri] = self.latestSeq
+ self.latestSeq += 1L
- for observer in self.observers:
- msg = "%d %s" % (self.latestSeq, uri)
- self.log_debug("Sending %s" % (msg,))
- observer.sendLine(msg)
+ # Update history
+ self.history[uri] = self.latestSeq
+ for observer in self.observers:
+ msg = "%d %s" % (self.latestSeq, uri)
+ self.log_debug("Sending %s" % (msg,))
+ observer.sendLine(msg)
+
def reset(self):
self.latestSeq = 0L
self.history = { } # keys=uri, values=sequenceNumber
@@ -374,7 +477,7 @@
self.sentReset = True
-class SimpleLineNotificationProtocol(basic.LineReceiver, LoggingMixIn):
+class SimpleLineNotificationProtocol(LineReceiver, LoggingMixIn):
"""
Simple Line Notification Protocol
@@ -412,7 +515,7 @@
self.notifier.removeObserver(self)
-class SimpleLineNotificationFactory(protocol.ServerFactory):
+class SimpleLineNotificationFactory(ServerFactory):
"""
Simple Line Notification Factory
@@ -435,8 +538,6 @@
-
-
class XMPPNotifier(LoggingMixIn):
"""
XMPP Notifier
@@ -451,104 +552,187 @@
create the node and then go through the configuration process,
followed by a publish retry.
- For monitoring purposes, you can specify a "TestJID" value in
- the config file; XMPPNotifier will send error messages to that
+ For monitoring purposes, you can subscribe to the server's JID
+ as long as your own JID matches the "AllowedJIDs" pattern(s) in
+ the config file; XMPPNotifier will send error messages to your
JID. If you also want to receive non-error, debug messages,
send the calendar server JID the message, "debug on". Send
- "help" for other commands. Note, XMPPNotifier doesn't yet
- handle registration or roster management, so you'll need to set
- up the JID accounts out-of-band, using another XMPP client, for
- example.
+ "help" for other commands.
+ To let clients know that the notifications from the calendar server
+ are still flowing, a "heartbeat" node is published to every 30
+ minutes (configurable).
+
"""
implements(INotifier)
pubsubNS = 'http://jabber.org/protocol/pubsub'
- nodeConf = {
- 'pubsub#deliver_payloads': '0',
- 'pubsub#persist_items' : '0',
- }
-
- def __init__(self, settings, reactor=None, configOverride=None):
+ def __init__(self, settings, reactor=None, configOverride=None,
+ heartbeat=True, roster=True):
self.xmlStream = None
self.settings = settings
if reactor is None:
from twisted.internet import reactor
self.reactor = reactor
self.config = configOverride or config
+ self.doHeartbeat = heartbeat and self.settings['HeartbeatMinutes'] != 0
+ self.doRoster = roster
- self.sendDebugMessages = False
+ self.roster = {}
+ self.outstanding = {}
- def enqueue(self, uri):
+ def lockNode(self, nodeName):
+ if self.outstanding.has_key(nodeName):
+ return False
+ else:
+ self.outstanding[nodeName] = 1
+ return True
+
+ def unlockNode(self, failure, nodeName):
+ try:
+ del self.outstanding[nodeName]
+ except KeyError:
+ pass
+
+ def sendHeartbeat(self):
+ if self.doHeartbeat and self.xmlStream is not None:
+ self.enqueue("update", "", lock=False)
+ self.reactor.callLater(self.settings['HeartbeatMinutes'] * 60,
+ self.sendHeartbeat)
+
+ def enqueue(self, op, uri, lock=True):
if self.xmlStream is not None:
# Convert uri to node
nodeName = self.uriToNodeName(uri)
- self.publishNode(nodeName)
+ if op == "create":
+ if not self.lockNode(nodeName):
+ # this node is busy, so it must already be created, or at
+ # least in the process
+ return
+ self.createNode(nodeName, publish=False)
+ else:
+ self.publishNode(nodeName, lock=lock)
def uriToNodeName(self, uri):
return getPubSubPath(uri, getPubSubConfiguration(self.config))
- def publishNode(self, nodeName):
- if self.xmlStream is not None:
+ def publishNode(self, nodeName, lock=True):
+ if self.xmlStream is None:
+ # We lost our connection
+ self.unlockNode(None, nodeName)
+ return
+
+ try:
+ if lock and not self.lockNode(nodeName):
+ return
+
iq = IQ(self.xmlStream)
pubsubElement = iq.addElement('pubsub', defaultUri=self.pubsubNS)
publishElement = pubsubElement.addElement('publish')
publishElement['node'] = nodeName
- iq.addCallback(self.responseFromPublish, nodeName)
- iq.send(to=self.settings['ServiceAddress'])
+ if self.settings["NodeConfiguration"]["pubsub#deliver_payloads"] == '1':
+ itemElement = publishElement.addElement('item')
+ payloadElement = itemElement.addElement('plistfrag',
+ defaultUri='plist-apple')
- def responseFromPublish(self, nodeName, iq):
- if iq['type'] == 'result':
- self.sendDebug("Node publish successful (%s)" % (nodeName,), iq)
- else:
- self.log_error("PubSub node publish error: %s" %
- (iq.toXml().encode('ascii', 'replace')),)
- self.sendDebug("Node publish failed (%s)" % (nodeName,), iq)
+ self.sendDebug("Publishing (%s)" % (nodeName,), iq)
+ d = iq.send(to=self.settings['ServiceAddress'])
+ d.addCallback(self.publishNodeSuccess, nodeName)
+ d.addErrback(self.publishNodeFailure, nodeName)
+ except:
+ self.unlockNode(None, nodeName)
+ raise
- errorElement = None
- pubsubElement = None
- for child in iq.elements():
- if child.name == 'error':
- errorElement = child
- if child.name == 'pubsub':
- pubsubElement = child
+ def publishNodeSuccess(self, iq, nodeName):
+ self.unlockNode(None, nodeName)
+ self.sendDebug("Node publish successful (%s)" % (nodeName,), iq)
- if errorElement:
- if errorElement['code'] == '400':
- self.requestConfigurationForm(nodeName)
+ def publishNodeFailure(self, result, nodeName):
+ try:
+ iq = result.value.getElement()
- elif errorElement['code'] == '404':
+ if iq.name == "error":
+ if iq['code'] == '400':
+ self.requestConfigurationForm(nodeName, True)
+
+ elif iq['code'] == '404':
self.createNode(nodeName)
+ else:
+ self.log_error("PubSub node publish error: %s" %
+ (iq.toXml().encode('ascii', 'replace')),)
+ self.sendDebug("Node publish failed (%s)" % (nodeName,), iq)
+ # Don't know how to proceed
+ self.unlockNode(None, nodeName)
+ except:
+ self.unlockNode(None, nodeName)
+ raise
- def createNode(self, nodeName):
- if self.xmlStream is not None:
+ def createNode(self, nodeName, publish=True):
+ if self.xmlStream is None:
+ # We lost our connection
+ self.unlockNode(None, nodeName)
+ return
+
+ try:
iq = IQ(self.xmlStream)
pubsubElement = iq.addElement('pubsub', defaultUri=self.pubsubNS)
child = pubsubElement.addElement('create')
child['node'] = nodeName
- iq.addCallback(self.responseFromCreate, nodeName)
- iq.send(to=self.settings['ServiceAddress'])
+ d = iq.send(to=self.settings['ServiceAddress'])
+ d.addCallback(self.createNodeSuccess, nodeName, publish)
+ d.addErrback(self.createNodeFailure, nodeName, publish)
+ except:
+ self.unlockNode(None, nodeName)
+ raise
- def responseFromCreate(self, nodeName, iq):
- if iq['type'] == 'result':
+ def createNodeSuccess(self, iq, nodeName, publish):
+ try:
self.sendDebug("Node creation successful (%s)" % (nodeName,), iq)
# now time to configure; fetch the form
- self.requestConfigurationForm(nodeName)
- else:
- self.log_error("PubSub node creation error: %s" %
- (iq.toXml().encode('ascii', 'replace')),)
- self.sendError("Node creation failed (%s)" % (nodeName,), iq)
+ self.requestConfigurationForm(nodeName, publish)
+ except:
+ self.unlockNode(None, nodeName)
+ raise
- def requestConfigurationForm(self, nodeName):
- if self.xmlStream is not None:
- iq = IQ(self.xmlStream, type='get')
- child = iq.addElement('pubsub', defaultUri=self.pubsubNS+"#owner")
+ def createNodeFailure(self, result, nodeName, publish):
+ try:
+ iq = result.value.getElement()
+ if iq['code'] == '409':
+ # node already exists, proceed to configure
+ self.sendDebug("Node already exists (%s)" % (nodeName,), iq)
+ self.requestConfigurationForm(nodeName, publish)
+ else:
+ # couldn't create node, give up
+ self.unlockNode(None, nodeName)
+ self.log_error("PubSub node creation error: %s" %
+ (iq.toXml().encode('ascii', 'replace')),)
+ self.sendError("Node creation failed (%s)" % (nodeName,), iq)
+ except:
+ self.unlockNode(None, nodeName)
+ raise
+
+ def requestConfigurationForm(self, nodeName, publish):
+ if self.xmlStream is None:
+ # We lost our connection
+ self.unlockNode(None, nodeName)
+ return
+
+ try:
+ # XXX This codepath is not unit tested
+ iq = IQ(self.xmlStream, 'get')
+ child = iq.addElement('pubsub',
+ defaultUri=self.pubsubNS+"#owner")
child = child.addElement('configure')
child['node'] = nodeName
- iq.addCallback(self.responseFromConfigurationForm, nodeName)
- iq.send(to=self.settings['ServiceAddress'])
+ d = iq.send(to=self.settings['ServiceAddress'])
+ d.addCallback(self.requestConfigurationFormSuccess, nodeName,
+ publish)
+ d.addErrback(self.requestConfigurationFormFailure, nodeName)
+ except:
+ self.unlockNode(None, nodeName)
+ raise
def _getChild(self, element, name):
for child in element.elements():
@@ -556,8 +740,14 @@
return child
return None
- def responseFromConfigurationForm(self, nodeName, iq):
- if iq['type'] == 'result':
+ def requestConfigurationFormSuccess(self, iq, nodeName, publish):
+ if self.xmlStream is None:
+ # We lost our connection
+ self.unlockNode(None, nodeName)
+ return
+
+ try:
+ nodeConf = self.settings["NodeConfiguration"]
self.sendDebug("Received configuration form (%s)" % (nodeName,), iq)
pubsubElement = self._getChild(iq, 'pubsub')
if pubsubElement:
@@ -566,7 +756,7 @@
formElement = configureElement.firstChildElement()
if formElement['type'] == 'form':
# We've found the form; start building a response
- filledIq = IQ(self.xmlStream, type='set')
+ filledIq = IQ(self.xmlStream, 'set')
filledPubSub = filledIq.addElement('pubsub',
defaultUri=self.pubsubNS+"#owner")
filledConfigure = filledPubSub.addElement('configure')
@@ -575,64 +765,238 @@
defaultUri='jabber:x:data')
filledForm['type'] = 'submit'
+ configMatches = True
for field in formElement.elements():
if field.name == 'field':
var = field['var']
if var == "FORM_TYPE":
filledForm.addChild(field)
else:
- value = self.nodeConf.get(var, None)
- if value is not None:
+ value = nodeConf.get(var, None)
+ if (value is not None and
+ (str(self._getChild(field,
+ "value")) != value)):
+ # this field needs configuring
+ configMatches = False
filledField = filledForm.addElement('field')
filledField['var'] = var
filledField['type'] = field['type']
valueElement = filledField.addElement('value')
valueElement.addContent(value)
- filledForm.addChild(field)
- filledIq.addCallback(self.responseFromConfiguration,
- nodeName)
- self.sendDebug("Sending configuration form (%s)"
- % (nodeName,), filledIq)
- filledIq.send(to=self.settings['ServiceAddress'])
- else:
+ # filledForm.addChild(field)
+ if configMatches:
+ # XXX This codepath is not unit tested
+ cancelIq = IQ(self.xmlStream, 'set')
+ cancelPubSub = cancelIq.addElement('pubsub',
+ defaultUri=self.pubsubNS+"#owner")
+ cancelConfig = cancelPubSub.addElement('configure')
+ cancelConfig['node'] = nodeName
+ cancelX = cancelConfig.addElement('x',
+ defaultUri='jabber:x:data')
+ cancelX['type'] = 'cancel'
+ self.sendDebug("Cancelling configuration (%s)"
+ % (nodeName,), cancelIq)
+ d = cancelIq.send(to=self.settings['ServiceAddress'])
+ else:
+ self.sendDebug("Sending configuration form (%s)"
+ % (nodeName,), filledIq)
+ d = filledIq.send(to=self.settings['ServiceAddress'])
+ d.addCallback(self.configurationSuccess, nodeName,
+ publish)
+ d.addErrback(self.configurationFailure, nodeName)
+ return
+
+ # Couldn't process configuration form, give up
+ self.unlockNode(None, nodeName)
+
+ except:
+ # Couldn't process configuration form, give up
+ self.unlockNode(None, nodeName)
+ raise
+
+ def requestConfigurationFormFailure(self, result, nodeName):
+ # If we get here we're giving up
+ try:
+ iq = result.value.getElement()
self.log_error("PubSub configuration form request error: %s" %
(iq.toXml().encode('ascii', 'replace')),)
- self.sendError("Failed to receive configuration form (%s)" % (nodeName,), iq)
+ self.sendError("Failed to receive configuration form (%s)" %
+ (nodeName,), iq)
+ finally:
+ self.unlockNode(None, nodeName)
+ def configurationSuccess(self, iq, nodeName, publish):
+ if self.xmlStream is None:
+ # We lost our connection
+ self.unlockNode(None, nodeName)
+ return
- def responseFromConfiguration(self, nodeName, iq):
- if iq['type'] == 'result':
+ try:
self.log_debug("PubSub node %s is configured" % (nodeName,))
self.sendDebug("Configured node (%s)" % (nodeName,), iq)
- self.publishNode(nodeName)
+ nodeCacher = getNodeCacher()
+ nodeCacher.storeNode(nodeName)
+ if publish:
+ self.publishNode(nodeName, lock=False)
+ else:
+ self.unlockNode(None, nodeName)
+ except:
+ self.unlockNode(None, nodeName)
+ raise
- else:
+ def configurationFailure(self, result, nodeName):
+ # If we get here we're giving up
+ try:
+ iq = result.value.getElement()
self.log_error("PubSub node configuration error: %s" %
(iq.toXml().encode('ascii', 'replace')),)
self.sendError("Failed to configure node (%s)" % (nodeName,), iq)
+ finally:
+ self.unlockNode(None, nodeName)
+ def deleteNode(self, nodeName):
+ if self.xmlStream is None:
+ # We lost our connection
+ self.unlockNode(None, nodeName)
+ return
+ try:
+ if not self.lockNode(nodeName):
+ return
+
+ iq = IQ(self.xmlStream)
+ pubsubElement = iq.addElement('pubsub',
+ defaultUri=self.pubsubNS+"#owner")
+ publishElement = pubsubElement.addElement('delete')
+ publishElement['node'] = nodeName
+ self.sendDebug("Deleting (%s)" % (nodeName,), iq)
+ d = iq.send(to=self.settings['ServiceAddress'])
+ d.addCallback(self.deleteNodeSuccess, nodeName)
+ d.addErrback(self.deleteNodeFailure, nodeName)
+ except:
+ self.unlockNode(None, nodeName)
+ raise
+
+ def deleteNodeSuccess(self, iq, nodeName):
+ self.unlockNode(None, nodeName)
+ self.sendDebug("Node delete successful (%s)" % (nodeName,), iq)
+
+ def deleteNodeFailure(self, result, nodeName):
+ try:
+ iq = result.value.getElement()
+ self.log_error("PubSub node delete error: %s" %
+ (iq.toXml().encode('ascii', 'replace')),)
+ self.sendDebug("Node delete failed (%s)" % (nodeName,), iq)
+ finally:
+ self.unlockNode(None, nodeName)
+
+
+ def requestRoster(self):
+ if self.doRoster:
+ self.roster = {}
+ rosterIq = IQ(self.xmlStream, 'get')
+ rosterIq.addElement("query", "jabber:iq:roster")
+ d = rosterIq.send()
+ d.addCallback(self.handleRoster)
+
+ def allowedInRoster(self, jid):
+ for pattern in self.settings.get("AllowedJIDs", []):
+ if fnmatch(jid, pattern):
+ return True
+ return False
+
+ def handleRoster(self, iq):
+ for child in iq.children[0].children:
+ jid = child['jid']
+ if self.allowedInRoster(jid):
+ self.log_debug("In roster: %s" % (jid,))
+ if not self.roster.has_key(jid):
+ self.roster[jid] = { 'debug' : False, 'available' : False }
+ else:
+ self.log_info("JID not allowed in roster: %s" % (jid,))
+
+ def handlePresence(self, iq):
+ self.log_debug("Presence IQ: %s" %
+ (iq.toXml().encode('ascii', 'replace')),)
+ presenceType = iq.getAttribute('type')
+
+ if presenceType == 'subscribe':
+ frm = JID(iq['from']).userhost()
+ if self.allowedInRoster(frm):
+ self.roster[frm] = { 'debug' : False, 'available' : True }
+ response = domish.Element(('jabber:client', 'presence'))
+ response['to'] = iq['from']
+ response['type'] = 'subscribed'
+ self.xmlStream.send(response)
+
+ # request subscription as well
+ subscribe = domish.Element(('jabber:client', 'presence'))
+ subscribe['to'] = iq['from']
+ subscribe['type'] = 'subscribe'
+ self.xmlStream.send(subscribe)
+ else:
+ self.log_info("JID not allowed in roster: %s" % (frm,))
+ # Reject
+ response = domish.Element(('jabber:client', 'presence'))
+ response['to'] = iq['from']
+ response['type'] = 'unsubscribed'
+ self.xmlStream.send(response)
+
+ elif presenceType == 'unsubscribe':
+ frm = JID(iq['from']).userhost()
+ if self.roster.has_key(frm):
+ del self.roster[frm]
+ response = domish.Element(('jabber:client', 'presence'))
+ response['to'] = iq['from']
+ response['type'] = 'unsubscribed'
+ self.xmlStream.send(response)
+
+ # remove from roster as well
+ # XXX This codepath is not unit tested
+ removal = IQ(self.xmlStream, 'set')
+ query = removal.addElement("query", "jabber:iq:roster")
+ query.addElement("item")
+ query.item["jid"] = iq["from"]
+ query.item["subscription"] = "remove"
+ removal.send()
+
+ elif presenceType == 'unavailable':
+ frm = JID(iq['from']).userhost()
+ if self.roster.has_key(frm):
+ self.roster[frm]['available'] = False
+
+ else:
+ frm = JID(iq['from']).userhost()
+ if self.allowedInRoster(frm):
+ if self.roster.has_key(frm):
+ self.roster[frm]['available'] = True
+ else:
+ self.roster[frm] = { 'debug' : False, 'available' : True }
+ else:
+ self.log_info("JID not allowed in roster: %s" % (frm,))
+
def streamOpened(self, xmlStream):
self.xmlStream = xmlStream
xmlStream.addObserver('/message', self.handleMessage)
+ xmlStream.addObserver('/presence', self.handlePresence)
+ self.requestRoster()
+ self.sendHeartbeat()
+
def streamClosed(self):
self.xmlStream = None
def sendDebug(self, txt, element):
- if self.sendDebugMessages:
- testJid = self.settings.get("TestJID", "")
- if testJid:
- txt = "DEBUG: %s %s" % (txt, element.toXml().encode('ascii',
- 'replace'))
- self.sendAlert(testJid, txt)
+ txt = "DEBUG: %s %s" % (txt, element.toXml().encode('ascii', 'replace'))
+ for jid, info in self.roster.iteritems():
+ if info['available'] and info['debug']:
+ self.sendAlert(jid, txt)
def sendError(self, txt, element):
- testJid = self.settings.get("TestJID", "")
- if testJid:
- txt = "ERROR: %s %s" % (txt, element.toXml().encode('ascii',
- 'replace'))
- self.sendAlert(testJid, txt)
+ txt = "ERROR: %s %s" % (txt, element.toXml().encode('ascii', 'replace'))
+ for jid, info in self.roster.iteritems():
+ if info['available']:
+ self.sendAlert(jid, txt)
def sendAlert(self, jid, txt):
if self.xmlStream is not None:
@@ -645,17 +1009,60 @@
body = getattr(iq, 'body', None)
if body:
response = None
- txt = str(body).lower()
- if txt == "help":
- response = "debug on, debug off"
- elif txt == "debug on":
- self.sendDebugMessages = True
- response = "Debugging on"
- elif txt == "debug off":
- self.sendDebugMessages = False
- response = "Debugging off"
+ frm = JID(iq['from']).userhost()
+ if frm in self.roster:
+ txt = str(body).lower()
+ if txt == "help":
+ response = "debug on, debug off, roster, create <nodename>, publish <nodename>, hammer <count>"
+ elif txt == "roster":
+ response = "Roster: %s" % (str(self.roster),)
+ elif txt == "debug on":
+ self.roster[frm]['debug'] = True
+ response = "Debugging on"
+ elif txt == "debug off":
+ self.roster[frm]['debug'] = False
+ response = "Debugging off"
+ elif txt == "outstanding":
+ response = "Outstanding: %s" % (str(self.outstanding),)
+ elif txt.startswith("publish"):
+ try:
+ publish, nodeName = str(body).split()
+ except ValueError:
+ response = "Please phrase it like 'publish nodename'"
+ else:
+ response = "Publishing node %s" % (nodeName,)
+ self.reactor.callLater(1, self.enqueue, "update",
+ nodeName)
+ elif txt.startswith("delete"):
+ try:
+ delete, nodeName = str(body).split()
+ except ValueError:
+ response = "Please phrase it like 'delete nodename'"
+ else:
+ response = "Deleting node %s" % (nodeName,)
+ self.reactor.callLater(1, self.deleteNode, nodeName)
+ elif txt.startswith("create"):
+ try:
+ publish, nodeName = str(body).split()
+ except ValueError:
+ response = "Please phrase it like 'create nodename'"
+ else:
+ response = "Creating and configuring node %s" % (nodeName,)
+ self.reactor.callLater(1, self.enqueue, "create",
+ nodeName)
+ elif txt.startswith("hammer"):
+ try:
+ hammer, count = txt.split()
+ count = int(count)
+ except ValueError:
+ response = "Please phrase it like 'hammer 100'"
+ else:
+ response = "Hammer will commence now, %d times" % (count,)
+ self.reactor.callLater(1, self.hammer, count)
+ else:
+ response = "I don't understand. Try 'help'."
else:
- response = "I don't understand. Try 'help'."
+ response = "Sorry, you are not authorized to converse with this server"
if response:
message = domish.Element(('jabber:client', 'message'))
@@ -664,39 +1071,54 @@
self.xmlStream.send(message)
+ def hammer(self, count):
+ for i in xrange(count):
+ self.enqueue("update", "hammertesting%d" % (i,))
class XMPPNotificationFactory(xmlstream.XmlStreamFactory, LoggingMixIn):
- def __init__(self, notifier, settings, reactor=None):
- self.log_info("Setting up XMPPNotificationFactory")
+ def __init__(self, notifier, settings, reactor=None, keepAlive=True):
+ self.log_warn("Setting up XMPPNotificationFactory")
self.notifier = notifier
self.settings = settings
+
self.jid = settings['JID']
+
+ # Ignore JID resource from plist
+ slash = self.jid.find("/")
+ if slash > -1:
+ self.jid = self.jid[0:slash]
+
+ # Generate a unique JID resource value
+ resource = "icalserver.%s" % uuid.uuid4().hex
+ self.jid = "%s/%s" % (self.jid, resource)
+
self.keepAliveSeconds = settings.get('KeepAliveSeconds', 120)
self.xmlStream = None
self.presenceCall = None
+ self.doKeepAlive = keepAlive
if reactor is None:
from twisted.internet import reactor
self.reactor = reactor
xmlstream.XmlStreamFactory.__init__(self,
- BasicAuthenticator(JID(self.jid), settings['Password']))
+ XMPPAuthenticator(JID(self.jid), settings['Password']))
self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.connected)
self.addBootstrap(xmlstream.STREAM_END_EVENT, self.disconnected)
self.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.initFailed)
self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.authenticated)
- self.addBootstrap(BasicAuthenticator.INVALID_USER_EVENT,
+ self.addBootstrap(IQAuthInitializer.INVALID_USER_EVENT,
self.authFailed)
- self.addBootstrap(BasicAuthenticator.AUTH_FAILED_EVENT,
+ self.addBootstrap(IQAuthInitializer.AUTH_FAILED_EVENT,
self.authFailed)
def connected(self, xmlStream):
self.xmlStream = xmlStream
- self.log_info("XMPP connection successful")
+ self.log_warn("XMPP connection successful")
# Log all traffic
xmlStream.rawDataInFn = self.rawDataIn
xmlStream.rawDataOutFn = self.rawDataOut
@@ -707,14 +1129,14 @@
if self.presenceCall is not None:
self.presenceCall.cancel()
self.presenceCall = None
- self.log_info("XMPP disconnected")
+ self.log_warn("XMPP disconnected")
def initFailed(self, failure):
self.xmlStream = None
- self.log_info("XMPP Initialization failed: %s" % (failure,))
+ self.log_warn("XMPP Initialization failed: %s" % (failure,))
def authenticated(self, xmlStream):
- self.log_info("XMPP authentication successful: %s" % (self.jid,))
+ self.log_warn("XMPP authentication successful: %s" % (self.jid,))
# xmlStream.addObserver('/message', self.handleMessage)
self.sendPresence()
self.notifier.streamOpened(xmlStream)
@@ -724,7 +1146,7 @@
(self.jid,))
def sendPresence(self):
- if self.xmlStream is not None:
+ if self.doKeepAlive and self.xmlStream is not None:
presence = domish.Element(('jabber:client', 'presence'))
self.xmlStream.send(presence)
self.presenceCall = self.reactor.callLater(self.keepAliveSeconds,
@@ -744,24 +1166,33 @@
results = { 'enabled' : False }
# return the first enabled xmpp service settings in the config file
- for settings in config.Notifications["Services"].itervalues():
+ for key, settings in config.Notifications.Services.iteritems():
if (settings["Service"] == "twistedcaldav.notify.XMPPNotifierService"
and settings["Enabled"]):
results['enabled'] = True
results['service'] = settings['ServiceAddress']
results['host'] = config.ServerHostName
results['port'] = config.SSLPort or config.HTTPPort
+ results['xmpp-server'] = settings['Host']
+ results['heartrate'] = settings['HeartbeatMinutes']
return results
def getPubSubPath(uri, pubSubConfiguration):
- return ("/Public/CalDAV/%s/%d/%s/" % (pubSubConfiguration['host'],
- pubSubConfiguration['port'], uri.strip("/")))
+ path = "/Public/CalDAV/%s/%d/" % (pubSubConfiguration['host'],
+ pubSubConfiguration['port'])
+ if uri:
+ path += "%s/" % (uri.strip("/"),)
+ return path
def getPubSubXMPPURI(uri, pubSubConfiguration):
return "xmpp:%s?pubsub;node=%s" % (pubSubConfiguration['service'],
getPubSubPath(uri, pubSubConfiguration))
+def getPubSubHeartbeatURI(pubSubConfiguration):
+ return "xmpp:%s?pubsub;node=%s" % (pubSubConfiguration['service'],
+ getPubSubPath("", pubSubConfiguration))
+
#
# Notification Server service config
#
@@ -861,19 +1292,31 @@
def makeService(self, options):
+ #
+ # Configure Memcached Client Pool
+ #
+ if config.Memcached.ClientEnabled:
+ memcachepool.installPool(
+ IPv4Address(
+ 'TCP',
+ config.Memcached.BindAddress,
+ config.Memcached.Port),
+ config.Memcached.MaxClients)
+
multiService = service.MultiService()
notifiers = []
- for settings in config.Notifications["Services"].itervalues():
+ for key, settings in config.Notifications.Services.iteritems():
if settings["Enabled"]:
notifier = namedClass(settings["Service"])(settings)
notifier.setServiceParent(multiService)
notifiers.append(notifier)
internet.TCPServer(
- config.Notifications["InternalNotificationPort"],
+ config.Notifications.InternalNotificationPort,
InternalNotificationFactory(notifiers,
- delaySeconds=config.Notifications["CoalesceSeconds"])
+ delaySeconds=config.Notifications.CoalesceSeconds),
+ interface=config.Notifications.BindAddress
).setServiceParent(multiService)
return multiService
@@ -886,8 +1329,8 @@
self.server = internet.TCPServer(settings["Port"],
SimpleLineNotificationFactory(self.notifier))
- def enqueue(self, uri):
- self.notifier.enqueue(uri)
+ def enqueue(self, op, uri):
+ self.notifier.enqueue(op, uri)
def startService(self):
self.server.startService()
@@ -900,12 +1343,19 @@
def __init__(self, settings):
self.notifier = XMPPNotifier(settings)
- self.client = internet.TCPClient(settings["Host"], settings["Port"],
- XMPPNotificationFactory(self.notifier, settings))
- def enqueue(self, uri):
- self.notifier.enqueue(uri)
+ if settings["Port"] == 5223: # use old SSL method
+ self.client = internet.SSLClient(settings["Host"], settings["Port"],
+ XMPPNotificationFactory(self.notifier, settings),
+ ClientContextFactory())
+ else:
+ # TLS and SASL
+ self.client = internet.TCPClient(settings["Host"], settings["Port"],
+ XMPPNotificationFactory(self.notifier, settings))
+ def enqueue(self, op, uri):
+ self.notifier.enqueue(op, uri)
+
def startService(self):
self.client.startService()
Modified: CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/static.py
===================================================================
--- CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/static.py 2009-07-15 01:21:01 UTC (rev 4459)
+++ CalendarServer/branches/users/wsanchez/deployment/twistedcaldav/static.py 2009-07-15 01:25:22 UTC (rev 4460)
@@ -74,7 +74,10 @@
from twistedcaldav.timezoneservice import TimezoneServiceResource
from twistedcaldav.cache import DisabledCacheNotifier, PropfindCacheMixin
from twistedcaldav.notify import getPubSubConfiguration, getPubSubXMPPURI
+from twistedcaldav.notify import getPubSubHeartbeatURI, getPubSubPath
+from twistedcaldav.notify import ClientNotifier, getNodeCacher
+
log = Logger()
class CalDAVFile (CalDAVResource, DAVFile):
@@ -381,6 +384,12 @@
except:
return fail(Failure())
+ if hasattr(self, 'clientNotifier'):
+ self.clientNotifier.notify(op="update")
+ else:
+ log.debug("%r does not have a clientNotifier but the CTag changed"
+ % (self,))
+
if hasattr(self, 'cacheNotifier'):
return self.cacheNotifier.changed()
else:
@@ -650,6 +659,8 @@
liveProperties = CalDAVFile.liveProperties + (
(customxml.calendarserver_namespace, "xmpp-uri"),
+ (customxml.calendarserver_namespace, "xmpp-heartbeat-uri"),
+ (customxml.calendarserver_namespace, "xmpp-server"),
)
def __init__(self, path, parent, record):
@@ -657,6 +668,7 @@
@param path: the path to the file which will back the resource.
"""
self.cacheNotifier = self.cacheNotifierFactory(self)
+ self.clientNotifier = ClientNotifier(self)
CalDAVFile.__init__(self, path)
DirectoryCalendarHomeResource.__init__(self, parent, record)
@@ -675,6 +687,7 @@
if cls is not None:
child = cls(self.fp.child(name).path, self)
child.cacheNotifier = self.cacheNotifier
+ child.clientNotifier = self.clientNotifier
return child
return self.createSimilarFile(self.fp.child(name).path)
@@ -685,6 +698,7 @@
else:
similar = CalDAVFile(path, principalCollections=self.principalCollections())
similar.cacheNotifier = self.cacheNotifier
+ similar.clientNotifier = self.clientNotifier
return similar
def getChild(self, name):
@@ -701,14 +715,50 @@
else:
qname = property.qname()
+ def doneWaiting(result, propVal):
+ return propVal
+
if qname == (customxml.calendarserver_namespace, "xmpp-uri"):
pubSubConfiguration = getPubSubConfiguration(config)
if pubSubConfiguration['enabled']:
- return succeed(customxml.PubSubXMPPURIProperty(
- getPubSubXMPPURI(self.url(), pubSubConfiguration)))
+ if getattr(self, "clientNotifier", None) is not None:
+ url = self.url()
+ nodeName = getPubSubPath(url, pubSubConfiguration)
+ propVal = customxml.PubSubXMPPURIProperty(
+ getPubSubXMPPURI(url, pubSubConfiguration))
+ nodeCacher = getNodeCacher()
+ d = nodeCacher.waitForNode(self.clientNotifier, nodeName)
+ # In either case we're going to return the xmpp-uri value
+ d.addCallback(doneWaiting, propVal)
+ d.addErrback(doneWaiting, propVal)
+ return d
else:
return succeed(customxml.PubSubXMPPURIProperty())
+ elif qname == (customxml.calendarserver_namespace, "xmpp-heartbeat-uri"):
+ pubSubConfiguration = getPubSubConfiguration(config)
+ if pubSubConfiguration['enabled']:
+ return succeed(
+ customxml.PubSubHeartbeatProperty(
+ customxml.PubSubHeartbeatURIProperty(
+ getPubSubHeartbeatURI(pubSubConfiguration)
+ ),
+ customxml.PubSubHeartbeatMinutesProperty(
+ str(pubSubConfiguration['heartrate'])
+ )
+ )
+ )
+ else:
+ return succeed(customxml.PubSubHeartbeatURIProperty())
+
+ elif qname == (customxml.calendarserver_namespace, "xmpp-server"):
+ pubSubConfiguration = getPubSubConfiguration(config)
+ if pubSubConfiguration['enabled']:
+ return succeed(customxml.PubSubXMPPServerProperty(
+ pubSubConfiguration['xmpp-server']))
+ else:
+ return succeed(customxml.PubSubXMPPServerProperty())
+
return super(CalendarHomeFile, self).readProperty(property, request)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20090714/53f8416e/attachment-0001.html>
More information about the calendarserver-changes
mailing list