[CalendarServer-changes] [2690] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Fri Jul 11 11:37:40 PDT 2008
Revision: 2690
http://trac.macosforge.org/projects/calendarserver/changeset/2690
Author: wsanchez at apple.com
Date: 2008-07-11 11:37:40 -0700 (Fri, 11 Jul 2008)
Log Message:
-----------
Merge:
http://svn.calendarserver.org/repository/calendarserver/CalendarServer/branches/users/sagen/notifications
http://svn.calendarserver.org/repository/calendarserver/CalendarServer/branches/users/sagen/xmpp
Adds Notification Service.
Modified Paths:
--------------
CalendarServer/trunk/conf/caldavd-test.plist
CalendarServer/trunk/conf/caldavd.plist
CalendarServer/trunk/twisted/plugins/caldav.py
CalendarServer/trunk/twistedcaldav/cache.py
CalendarServer/trunk/twistedcaldav/cluster.py
CalendarServer/trunk/twistedcaldav/config.py
CalendarServer/trunk/twistedcaldav/customxml.py
CalendarServer/trunk/twistedcaldav/static.py
CalendarServer/trunk/twistedcaldav/tap.py
Added Paths:
-----------
CalendarServer/trunk/twistedcaldav/notify.py
CalendarServer/trunk/twistedcaldav/test/test_notify.py
Modified: CalendarServer/trunk/conf/caldavd-test.plist
===================================================================
--- CalendarServer/trunk/conf/caldavd-test.plist 2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/conf/caldavd-test.plist 2008-07-11 18:37:40 UTC (rev 2690)
@@ -405,6 +405,66 @@
<!--
+ Notifications
+ -->
+
+ <key>Notifications</key>
+ <dict>
+ <key>CoalesceSeconds</key>
+ <integer>10</integer>
+ <key>InternalNotificationHost</key>
+ <string>localhost</string>
+ <key>InternalNotificationPort</key>
+ <integer>62309</integer>
+
+ <key>Services</key>
+ <array>
+ <dict>
+ <!-- Simple notification service (for testing) -->
+ <key>Service</key>
+ <string>twistedcaldav.notify.SimpleLineNotifierService</string>
+ <key>Enabled</key>
+ <false/>
+ <key>Port</key>
+ <integer>62308</integer>
+ </dict>
+
+ <dict>
+ <!-- XMPP notification service -->
+ <key>Service</key>
+ <string>twistedcaldav.notify.XMPPNotifierService</string>
+ <key>Enabled</key>
+ <false/>
+
+ <!-- XMPP host and port to contact -->
+ <key>Host</key>
+ <string>xmpp.host.name</string>
+ <key>Port</key>
+ <integer>5222</integer>
+
+ <!-- Jabber ID and password for the server -->
+ <key>JID</key>
+ <string>jid@xmpp.host.name/resource</string>
+ <key>Password</key>
+ <string>password_goes_here</string>
+
+ <!-- PubSub service address -->
+ <key>ServiceAddress</key>
+ <string>pubsub.xmpp.host.name</string>
+
+ <!-- Sends a presence notification to XMPP server at this interval (prevents disconnect) -->
+ <key>KeepAliveSeconds</key>
+ <integer>120</integer>
+
+ <!-- Sends messages to this account for debugging -->
+ <key>TestJID</key>
+ <string>otherjid@xmpp.host.name</string>
+ </dict>
+ </array>
+ </dict>
+
+
+ <!--
Miscellaneous items
-->
Modified: CalendarServer/trunk/conf/caldavd.plist
===================================================================
--- CalendarServer/trunk/conf/caldavd.plist 2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/conf/caldavd.plist 2008-07-11 18:37:40 UTC (rev 2690)
@@ -277,6 +277,45 @@
<!--
+ Notifications
+ -->
+
+ <key>Notifications</key>
+ <dict>
+ <key>Services</key>
+ <array>
+ <dict>
+ <!-- XMPP notification service -->
+ <key>Service</key>
+ <string>twistedcaldav.notify.XMPPNotifierService</string>
+ <key>Enabled</key>
+ <false/>
+
+ <!-- XMPP host and port to contact -->
+ <key>Host</key>
+ <string>xmpp.host.name</string>
+ <key>Port</key>
+ <integer>5222</integer>
+
+ <!-- Jabber ID and password for the server -->
+ <key>JID</key>
+ <string>jid@xmpp.host.name/resource</string>
+ <key>Password</key>
+ <string>password_goes_here</string>
+
+ <!-- PubSub service address -->
+ <key>ServiceAddress</key>
+ <string>pubsub.xmpp.host.name</string>
+
+ <!-- Sends a presence notification to XMPP server at this interval (prevents disconnect) -->
+ <key>KeepAliveSeconds</key>
+ <integer>120</integer>
+ </dict>
+ </array>
+ </dict>
+
+
+ <!--
Miscellaneous items
-->
Modified: CalendarServer/trunk/twisted/plugins/caldav.py
===================================================================
--- CalendarServer/trunk/twisted/plugins/caldav.py 2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twisted/plugins/caldav.py 2008-07-11 18:37:40 UTC (rev 2690)
@@ -30,3 +30,5 @@
TwistedCalDAV = TAP('twistedcaldav.tap.CalDAVServiceMaker')
+
+CalDAVNotifier = TAP('twistedcaldav.notify.NotificationServiceMaker')
Modified: CalendarServer/trunk/twistedcaldav/cache.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/cache.py 2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twistedcaldav/cache.py 2008-07-11 18:37:40 UTC (rev 2690)
@@ -29,8 +29,11 @@
from twistedcaldav.log import LoggingMixIn
from twistedcaldav.memcachepool import CachePoolUserMixIn
+from twistedcaldav.config import config
+from twistedcaldav.notify import NotificationClientUserMixIn
+
class DisabledCacheNotifier(object):
def __init__(self, *args, **kwargs):
pass
@@ -62,7 +65,14 @@
-class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn):
+#
+# FIXME: This should be a generic notifier class, not specific to
+# memcache, as evidenced by the addition of the sendNotification()
+# call in changed() below.
+#
+class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn,
+ NotificationClientUserMixIn):
+
def __init__(self, resource, cachePool=None):
self._resource = resource
self._cachePool = cachePool
@@ -78,9 +88,16 @@
return: A L{Deferred} that fires when the token has been changed.
"""
- self.log_debug("Changing Cache Token for %r" % (self._resource.url(),))
+
+ url = self._resource.url()
+
+ if config.Notifications["Enabled"]:
+ self.log_debug("Notifications are enabled: %s" % (url,))
+ self.sendNotification(url)
+
+ self.log_debug("Changing Cache Token for %r" % (url,))
return self.getCachePool().set(
- 'cacheToken:%s' % (self._resource.url(),),
+ 'cacheToken:%s' % (url,),
self._newCacheToken())
Modified: CalendarServer/trunk/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/cluster.py 2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twistedcaldav/cluster.py 2008-07-11 18:37:40 UTC (rev 2690)
@@ -309,7 +309,18 @@
monitor.addProcess('memcached', memcachedArgv, env=parentEnv)
+ if (config.Notifications["Enabled"] and
+ config.Notifications["InternalNotificationHost"] == "localhost"):
+ log.msg("Adding notification service")
+ notificationsArgv = [
+ config.Twisted['twistd'],
+ '-n', 'caldav_notifier',
+ '-f', options['config'],
+ ]
+ monitor.addProcess('notifications', notificationsArgv, env=parentEnv)
+
+
logger = AMPLoggingFactory(
RotatingFileAccessLoggingObserver(config.AccessLogFile))
Modified: CalendarServer/trunk/twistedcaldav/config.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/config.py 2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twistedcaldav/config.py 2008-07-11 18:37:40 UTC (rev 2690)
@@ -175,6 +175,34 @@
"EnableTimezoneService" : False, # Timezone service
#
+ # Notifications
+ #
+ "Notifications" : {
+ "Enabled": False,
+ "CoalesceSeconds" : 10,
+ "InternalNotificationHost" : "localhost",
+ "InternalNotificationPort" : 62309,
+
+ "Services" : [
+ {
+ "Service" : "twistedcaldav.notify.SimpleLineNotifierService",
+ "Enabled" : False,
+ "Port" : 62308,
+ },
+ {
+ "Service" : "twistedcaldav.notify.XMPPNotifierService",
+ "Enabled" : False,
+ "Host" : "", # "xmpp.host.name"
+ "Port" : 5222,
+ "JID" : "", # "jid@xmpp.host.name/resource"
+ "Password" : "",
+ "ServiceAddress" : "", # "pubsub.xmpp.host.name"
+ "KeepAliveSeconds" : 120,
+ },
+ ]
+ },
+
+ #
# Implementation details
#
# The following are specific to how the server is built, and useful
@@ -250,6 +278,7 @@
self.updateDropBox,
self.updateLogLevels,
self.updateThreadPoolSize,
+ self.updateNotifications,
]
def __str__(self):
@@ -441,6 +470,29 @@
configDict = _cleanup(configDict)
self.update(configDict)
+ @staticmethod
+ def updateNotifications(self, items):
+ #
+ # Notifications
+ #
+ for service in self.Notifications["Services"]:
+ if service["Enabled"]:
+ self.Notifications["Enabled"] = True
+ break
+ else:
+ self.Notifications["Enabled"] = False
+
+ for service in self.Notifications["Services"]:
+ if (
+ service["Service"] == "twistedcaldav.notify.XMPPNotifierService" and
+ service["Enabled"]
+ ):
+ for key, value in service.iteritems():
+ if not value:
+ raise ConfigurationError("Invalid %s for XMPPNotifierService: %r"
+ % (key, value))
+
+
def _mergeData(oldData, newData):
for key, value in newData.iteritems():
if isinstance(value, (dict,)):
Modified: CalendarServer/trunk/twistedcaldav/customxml.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/customxml.py 2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twistedcaldav/customxml.py 2008-07-11 18:37:40 UTC (rev 2690)
@@ -244,6 +244,17 @@
namespace = calendarserver_namespace
name = "utc-offset"
+class PubSubXMPPURIProperty (davxml.WebDAVTextElement):
+ """
+ A calendarhomefile property to indicate the pubsub XMPP URI to subscribe to
+ for notifications.
+ """
+ namespace = calendarserver_namespace
+ name = "xmpp-uri"
+ protected = True
+
+
+
##
# Extensions to davxml.ResourceType
##
Copied: CalendarServer/trunk/twistedcaldav/notify.py (from rev 2689, CalendarServer/branches/users/sagen/notifications/twistedcaldav/notify.py)
===================================================================
--- CalendarServer/trunk/twistedcaldav/notify.py (rev 0)
+++ CalendarServer/trunk/twistedcaldav/notify.py 2008-07-11 18:37:40 UTC (rev 2690)
@@ -0,0 +1,897 @@
+##
+# Copyright (c) 2005-2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Notification framework for Calendar Server
+
+This module implements client code which is executed within the context of
+icalserver itself, and also server code (the "notification server") which is
+run as a separate process, launched as part of "./run".
+
+The notification server process is implemented as a twistd plugin
+(with a tapname of "caldav_notifier"), and is comprised of two
+services -- one handling the internal channel between icalserver
+and notification server, the other handling the external channel
+between notification server and a remote consumer.
+
+The icalserver tap creates a NotificationClient object at startup;
+it deals with passing along notifications to the notification server.
+These notifications originate from cache.py:MemcacheChangeNotifier.changed().
+"""
+
+# TODO: bindAddress to local
+# TODO: add CalDAVTester test for examining new xmpp-uri property
+# TODO: auto-registration and roster management for XMPP
+
+import os
+from twisted.internet import reactor, protocol
+from twisted.protocols import basic
+from twisted.plugin import IPlugin
+from twisted.application import internet, service
+from twisted.python.usage import Options, UsageError
+from twisted.python.reflect import namedClass
+from twisted.words.protocols.jabber import xmlstream
+from twisted.words.protocols.jabber.jid import JID
+from twisted.words.protocols.jabber.client import BasicAuthenticator, IQ
+from twisted.words.xish import domish
+from twistedcaldav.log import LoggingMixIn
+from twistedcaldav.config import config, parseConfig, defaultConfig
+from zope.interface import Interface, implements
+
+__all__ = [
+ "Coalescer",
+ "getNotificationClient",
+ "getPubSubConfiguration",
+ "getPubSubPath",
+ "getPubSubXMPPURI",
+ "INotifier",
+ "installNotificationClient",
+ "InternalNotificationFactory",
+ "InternalNotificationProtocol",
+ "NotificationClient",
+ "NotificationClientFactory",
+ "NotificationClientLineProtocol",
+ "NotificationClientUserMixIn",
+ "NotificationOptions",
+ "NotificationServiceMaker",
+ "SimpleLineNotificationFactory",
+ "SimpleLineNotificationProtocol",
+ "SimpleLineNotifier",
+ "SimpleLineNotifierService",
+ "XMPPNotificationFactory",
+ "XMPPNotifier",
+]
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Classes used within calendarserver itself
+#
+
+class NotificationClientUserMixIn(object):
+ """
+ Notification Client User (Mixin)
+
+ Provides a method to send change notifications to the L{NotificationClient}.
+ """
+
+ def sendNotification(self, uri):
+ getNotificationClient().send(uri)
+
+
+
+class NotificationClientLineProtocol(basic.LineReceiver, LoggingMixIn):
+ """
+ Notification Client Line Protocol
+
+ Sends updates to the notification server.
+ """
+
+ def connectionMade(self):
+ self.client.addObserver(self)
+ self.factory.connectionMade()
+
+ def connectionLost(self, reason):
+ self.client.removeObserver(self)
+
+
+class NotificationClientFactory(protocol.ReconnectingClientFactory,
+ LoggingMixIn):
+ """
+ Notification Client Factory
+
+ Sends updates to the notification server.
+ """
+
+ protocol = NotificationClientLineProtocol
+
+ def __init__(self, client):
+ self.connected = False
+ self.client = client
+
+ def clientConnectionLost(self, connector, reason):
+ self.log_error("Connect to notification server lost: %s" %
+ (reason,))
+ self.connected = False
+ protocol.ReconnectingClientFactory.clientConnectionLost(self,
+ connector, reason)
+
+ def clientConnectionFailed(self, connector, reason):
+ self.log_error("Unable to connect to notification server: %s" %
+ (reason,))
+ self.connected = False
+ protocol.ReconnectingClientFactory.clientConnectionFailed(self,
+ connector, reason)
+
+ def connectionMade(self):
+ self.connected = True
+ self.resetDelay()
+ self.client.connectionMade()
+
+ def isReady(self):
+ return self.connected
+
+ def buildProtocol(self, addr):
+ p = self.protocol()
+ p.factory = self
+ p.client = self.client
+ return p
+
+
+class NotificationClient(LoggingMixIn):
+ """
+ Notification Client
+
+ Forwards on notifications from NotificationClientUserMixIns to the
+ notification server. A NotificationClient is installed by the tap at
+ startup.
+ """
+
+ def __init__(self, reactor, host, port):
+ self.factory = None
+ self.reactor = reactor
+ self.host = host
+ self.port = port
+ self.observers = set()
+ self.queued = set()
+
+ def send(self, uri):
+ if self.factory is None:
+ self.factory = NotificationClientFactory(self)
+ self.reactor.connectTCP(self.host, self.port, self.factory)
+ self.log_debug("Creating factory")
+
+ if self.factory.isReady() and self.observers:
+ for observer in self.observers:
+ self.log_debug("Sending to notification server: %s" % (uri,))
+ observer.sendLine(str(uri))
+ else:
+ self.log_debug("Queing: %s" % (uri,))
+ self.queued.add(uri)
+
+ def connectionMade(self):
+ if self.factory.isReady() and self.observers:
+ for observer in self.observers:
+ for uri in self.queued:
+ self.log_debug("Sending from queue: %s" % (uri,))
+ observer.sendLine(str(uri))
+ self.queued.clear()
+
+ def addObserver(self, observer):
+ self.observers.add(observer)
+
+ def removeObserver(self, observer):
+ self.observers.remove(observer)
+
+
+_notificationClient = None
+
+def installNotificationClient(reactor, host, port, klass=NotificationClient):
+ global _notificationClient
+ _notificationClient = klass(reactor, host, port)
+
+def getNotificationClient():
+ return _notificationClient
+
+
+
+
+
+
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Classes used within Notification Server
+#
+
+#
+# Internal Channel (from icalserver to notification server)
+#
+
+class InternalNotificationProtocol(basic.LineReceiver):
+ """
+ InternalNotificationProtocol
+
+ Receives notifications from the calendar server.
+ """
+
+ def lineReceived(self, line):
+ val = str(line.strip())
+ self.factory.coalescer.add(val)
+
+
+class InternalNotificationFactory(protocol.ServerFactory):
+ """
+ Internal Notification Factory
+
+ Receives notifications from the calendar server.
+ """
+
+ protocol = InternalNotificationProtocol
+
+ def __init__(self, notifiers, delaySeconds=None):
+ self.coalescer = Coalescer(notifiers, delaySeconds=delaySeconds)
+
+
+
+class Coalescer(LoggingMixIn):
+ """
+ Coalescer
+
+ A queue which hangs on to incoming uris for some period of time before
+ passing them along to the external notifier listening for these updates.
+ A chatty CalDAV client can make several changes in a short period of time,
+ and the Coalescer buffers the external clients somewhat.
+ """
+
+ delaySeconds = 5
+
+ def __init__(self, notifiers, reactor=None, delaySeconds=None):
+
+ if delaySeconds:
+ self.delaySeconds = delaySeconds
+
+ if reactor is None:
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+ self.uris = dict()
+ self.notifiers = notifiers
+
+ def add(self, uri):
+ delayed = self.uris.get(uri, None)
+ if delayed and delayed.active():
+ delayed.reset(self.delaySeconds)
+ else:
+ self.uris[uri] = self.reactor.callLater(self.delaySeconds,
+ self.delayedEnqueue, uri)
+
+ def delayedEnqueue(self, uri):
+ for notifier in self.notifiers:
+ notifier.enqueue(uri)
+
+
+
+#
+# External Channel (from notification server to other consumers)
+#
+
+class INotifier(Interface):
+ """
+ Notifier Interface
+
+ Defines an enqueue method that Notifier classes need to implement.
+ """
+
+ def enqueue(uri):
+ """
+ Lets the notifier object know that a change has been made for this
+ uri, and enough time has passed to allow for coalescence.
+
+ @type uri: C{str}
+ """
+
+
+class SimpleLineNotifier(object):
+ """
+ Simple Line Notifier
+
+ Listens for uris from the coalescer and writes them out to any
+ connected clients. Each line is simply a sequence number, a
+ space, and a uri string. If the external client sends a sequence
+ number, this notifier will send notification lines for each uri
+ that was changed since that sequence number was originally sent.
+ A history of such sequence numbers is stored in a python dict.
+ If the external client sends a zero, then the history is cleared
+ and the next sequence number to use is reset to 1.
+
+ The sequence number is stored as a python long which means it has
+ essentially infinite precision. We discussed rolling over at the
+ 64-bit boundary, but even if we limit the sequence number to a 64-bit
+ signed integer (or 2^63), and we had 100,000 users generating the
+ maximum number of notifications (which by default is 6/minute since
+ we're coalescing over 10 seconds), it would take 29 million years to
+ rollover.
+ """
+
+ implements(INotifier)
+
+ def __init__(self, settings):
+ self.reset()
+ self.observers = set()
+ self.sentReset = False
+
+ def enqueue(self, uri):
+
+ self.latestSeq += 1L
+
+ # Update history
+ self.history[uri] = self.latestSeq
+
+ for observer in self.observers:
+ observer.sendLine("%d %s" % (self.latestSeq, uri))
+
+ def reset(self):
+ self.latestSeq = 0L
+ self.history = { } # keys=uri, values=sequenceNumber
+
+ def playback(self, observer, oldSeq):
+
+ hist = self.history
+ toSend = [(hist[uri], uri) for uri in hist if hist[uri] > oldSeq]
+ toSend.sort() # sorts the tuples based on numeric sequence number
+
+ for seq, uri in toSend:
+ observer.sendLine("%d %s" % (seq, str(uri)))
+
+
+ def addObserver(self, observer):
+ self.observers.add(observer)
+
+ def removeObserver(self, observer):
+ self.observers.remove(observer)
+
+ def connectionMade(self, observer):
+ if not self.sentReset:
+ observer.sendLine("0")
+ self.sentReset = True
+
+
+class SimpleLineNotificationProtocol(basic.LineReceiver, LoggingMixIn):
+ """
+ Simple Line Notification Protocol
+
+ Sends notifications to external consumers. Also responds to history-
+ playback requests. If an integer is received from an external consumer,
+ it is interpreted as a sequence number; all notifications sent since that
+ sequence number was sent are resent.
+ """
+
+ def connectionMade(self):
+ # we just received a connection from the outside; if it's the first
+ # since we started running, it means we need to let them know that
+ # a reset has happened. This assumes that only one connection will
+ # be made to this channel; if we end up having multiple consumers
+ # of this protocol, we would need to uniquely identify them.
+ self.notifier.connectionMade(self)
+
+ def lineReceived(self, line):
+ val = line.strip()
+
+ # Should be a number requesting all updates since that sequence
+ try:
+ oldSeq = int(val)
+ except ValueError, e:
+ self.log_warn("Error parsing %s: %s (from %s)" % (val, e,
+ self.transport.getPeer()))
+ return
+
+ if oldSeq == 0:
+ self.notifier.reset()
+ else:
+ self.notifier.playback(self, oldSeq)
+
+ def connectionLost(self, reason):
+ self.notifier.removeObserver(self)
+
+
+class SimpleLineNotificationFactory(protocol.ServerFactory):
+ """
+ Simple Line Notification Factory
+
+ Sends notifications to external consumers.
+ """
+
+ protocol = SimpleLineNotificationProtocol
+
+ def __init__(self, notifier):
+ self.notifier = notifier
+
+ def buildProtocol(self, addr):
+ p = self.protocol()
+ self.notifier.addObserver(p)
+ p.notifier = self.notifier
+ return p
+
+
+
+
+
+
+
+
+class XMPPNotifier(LoggingMixIn):
+ """
+ XMPP Notifier
+
+ Uses pubsub XMPP requests to let subscribers know when there
+ has been a change made to a DAV resource (currently just
+ CalendarHomeFiles). Uses XMPP login info from the config file
+ to determine which pubsub service to connect to. When it's
+ time to send a notification, XMPPNotifier computes a node path
+ corresponding to the DAV resource and emits a publish request
+ for that node. If the request comes back 404 XMPPNotifier will
+ create the node and then go through the configuration process,
+ followed by a publish retry.
+
+ For monitoring purposes, you can specify a "TestJID" value in
+ the config file; XMPPNotifier will send error messages to that
+ JID. If you also want to receive non-error, debug messages,
+ send the calendar server JID the message, "debug on". Send
+ "help" for other commands. Note, XMPPNotifier doesn't yet
+ handle registration or roster management, so you'll need to set
+ up the JID accounts out-of-band, using another XMPP client, for
+ example.
+
+ """
+
+ implements(INotifier)
+
+ pubsubNS = 'http://jabber.org/protocol/pubsub'
+
+ nodeConf = {
+ 'pubsub#deliver_payloads' : '0',
+ 'pubsub#persist_items' : '0',
+ }
+
+ def __init__(self, settings, reactor=None, configOverride=None):
+ self.xmlStream = None
+ self.settings = settings
+ if reactor is None:
+ from twisted.internet import reactor
+ self.reactor = reactor
+ self.config = configOverride or config
+
+ self.sendDebugMessages = False
+
+ def enqueue(self, uri):
+ if self.xmlStream is not None:
+ # Convert uri to node
+ nodeName = self.uriToNodeName(uri)
+ self.publishNode(nodeName)
+
+ def uriToNodeName(self, uri):
+ return getPubSubPath(uri, getPubSubConfiguration(self.config))
+
+ def publishNode(self, nodeName):
+ if self.xmlStream is not None:
+ iq = IQ(self.xmlStream)
+ pubsubElement = iq.addElement('pubsub', defaultUri=self.pubsubNS)
+ publishElement = pubsubElement.addElement('publish')
+ publishElement['node'] = nodeName
+ iq.addCallback(self.responseFromPublish, nodeName)
+ iq.send(to=self.settings['ServiceAddress'])
+
+ def responseFromPublish(self, nodeName, iq):
+ if iq['type'] == 'result':
+ self.sendDebug("Node publish successful (%s)" % (nodeName,), iq)
+ else:
+ self.log_error("PubSub node publish error: %s" %
+ (iq.toXml().encode('ascii', 'replace')),)
+ self.sendDebug("Node publish failed (%s)" % (nodeName,), iq)
+
+ errorElement = None
+ pubsubElement = None
+ for child in iq.elements():
+ if child.name == 'error':
+ errorElement = child
+ if child.name == 'pubsub':
+ pubsubElement = child
+
+ if errorElement:
+ if errorElement['code'] == '400':
+ self.requestConfigurationForm(nodeName)
+
+ elif errorElement['code'] == '404':
+ self.createNode(nodeName)
+
+ def createNode(self, nodeName):
+ if self.xmlStream is not None:
+ iq = IQ(self.xmlStream)
+ pubsubElement = iq.addElement('pubsub', defaultUri=self.pubsubNS)
+ child = pubsubElement.addElement('create')
+ child['node'] = nodeName
+ iq.addCallback(self.responseFromCreate, nodeName)
+ iq.send(to=self.settings['ServiceAddress'])
+
+ def responseFromCreate(self, nodeName, iq):
+ if iq['type'] == 'result':
+ self.sendDebug("Node creation successful (%s)" % (nodeName,), iq)
+ # now time to configure; fetch the form
+ self.requestConfigurationForm(nodeName)
+ else:
+ self.log_error("PubSub node creation error: %s" %
+ (iq.toXml().encode('ascii', 'replace')),)
+ self.sendError("Node creation failed (%s)" % (nodeName,), iq)
+
+ def requestConfigurationForm(self, nodeName):
+ if self.xmlStream is not None:
+ iq = IQ(self.xmlStream, type='get')
+ child = iq.addElement('pubsub', defaultUri=self.pubsubNS+"#owner")
+ child = child.addElement('configure')
+ child['node'] = nodeName
+ iq.addCallback(self.responseFromConfigurationForm, nodeName)
+ iq.send(to=self.settings['ServiceAddress'])
+
+ def _getChild(self, element, name):
+ for child in element.elements():
+ if child.name == name:
+ return child
+ return None
+
+ def responseFromConfigurationForm(self, nodeName, iq):
+ if iq['type'] == 'result':
+ self.sendDebug("Received configuration form (%s)" % (nodeName,), iq)
+ pubsubElement = self._getChild(iq, 'pubsub')
+ if pubsubElement:
+ configureElement = self._getChild(pubsubElement, 'configure')
+ if configureElement:
+ formElement = configureElement.firstChildElement()
+ if formElement['type'] == 'form':
+ # We've found the form; start building a response
+ filledIq = IQ(self.xmlStream, type='set')
+ filledPubSub = filledIq.addElement('pubsub',
+ defaultUri=self.pubsubNS+"#owner")
+ filledConfigure = filledPubSub.addElement('configure')
+ filledConfigure['node'] = nodeName
+ filledForm = filledConfigure.addElement('x',
+ defaultUri='jabber:x:data')
+ filledForm['type'] = 'submit'
+
+ for field in formElement.elements():
+ if field.name == 'field':
+ value = self.nodeConf.get(field['var'], None)
+ if value is not None:
+ valueElement = self._getChild(field,
+ 'value')
+ valueElement.children = []
+ valueElement.addContent(value)
+ filledForm.addChild(field)
+ filledIq.addCallback(self.responseFromConfiguration,
+ nodeName)
+ filledIq.send(to=self.settings['ServiceAddress'])
+ else:
+ self.log_error("PubSub configuration form request error: %s" %
+ (iq.toXml().encode('ascii', 'replace')),)
+ self.sendError("Failed to receive configuration form (%s)" % (nodeName,), iq)
+
+
+ def responseFromConfiguration(self, nodeName, iq):
+ if iq['type'] == 'result':
+ self.log_debug("PubSub node %s is configured" % (nodeName,))
+ self.sendDebug("Configured node (%s)" % (nodeName,), iq)
+ self.publishNode(nodeName)
+
+ else:
+ self.log_error("PubSub node configuration error: %s" %
+ (iq.toXml().encode('ascii', 'replace')),)
+ self.sendError("Failed to configure node (%s)" % (nodeName,), iq)
+
+
+ def streamOpened(self, xmlStream):
+ self.xmlStream = xmlStream
+ xmlStream.addObserver('/message', self.handleMessage)
+
+ def streamClosed(self):
+ self.xmlStream = None
+
+ def sendDebug(self, txt, element):
+ if self.sendDebugMessages:
+ testJid = self.settings.get("TestJID", "")
+ if testJid:
+ txt = "DEBUG: %s %s" % (txt, element.toXml().encode('ascii',
+ 'replace'))
+ self.sendAlert(testJid, txt)
+
+ def sendError(self, txt, element):
+ testJid = self.settings.get("TestJID", "")
+ if testJid:
+ txt = "ERROR: %s %s" % (txt, element.toXml().encode('ascii',
+ 'replace'))
+ self.sendAlert(testJid, txt)
+
+ def sendAlert(self, jid, txt):
+ if self.xmlStream is not None:
+ message = domish.Element(('jabber:client', 'message'))
+ message['to'] = JID(jid).full()
+ message.addElement('body', content=txt)
+ self.xmlStream.send(message)
+
+ def handleMessage(self, iq):
+ body = getattr(iq, 'body', None)
+ if body:
+ response = None
+ txt = str(body).lower()
+ if txt == "help":
+ response = "debug on, debug off"
+ elif txt == "debug on":
+ self.sendDebugMessages = True
+ response = "Debugging on"
+ elif txt == "debug off":
+ self.sendDebugMessages = False
+ response = "Debugging off"
+ else:
+ response = "I don't understand. Try 'help'."
+
+ if response:
+ message = domish.Element(('jabber:client', 'message'))
+ message['to'] = JID(iq['from']).full()
+ message.addElement('body', content=response)
+ self.xmlStream.send(message)
+
+
+
+
+class XMPPNotificationFactory(xmlstream.XmlStreamFactory, LoggingMixIn):
+
+ def __init__(self, notifier, settings, reactor=None):
+ self.notifier = notifier
+ self.settings = settings
+ self.jid = settings['JID']
+ self.keepAliveSeconds = settings.get('KeepAliveSeconds', 120)
+ self.xmlStream = None
+ self.presenceCall = None
+ if reactor is None:
+ from twisted.internet import reactor
+ self.reactor = reactor
+
+ xmlstream.XmlStreamFactory.__init__(self,
+ BasicAuthenticator(JID(self.jid), settings['Password']))
+
+ self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.connected)
+ self.addBootstrap(xmlstream.STREAM_END_EVENT, self.disconnected)
+ self.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.initFailed)
+
+ self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.authenticated)
+ self.addBootstrap(BasicAuthenticator.INVALID_USER_EVENT,
+ self.authFailed)
+ self.addBootstrap(BasicAuthenticator.AUTH_FAILED_EVENT,
+ self.authFailed)
+
+ def connected(self, xmlStream):
+ self.xmlStream = xmlStream
+ self.log_info("XMPP connection successful")
+ # Log all traffic
+ xmlStream.rawDataInFn = self.rawDataIn
+ xmlStream.rawDataOutFn = self.rawDataOut
+
+ def disconnected(self, xmlStream):
+ self.notifier.streamClosed()
+ self.xmlStream = None
+ if self.presenceCall is not None:
+ self.presenceCall.cancel()
+ self.presenceCall = None
+ self.log_info("XMPP disconnected")
+
+ def initFailed(self, failure):
+ self.xmlStream = None
+ self.log_info("XMPP Initialization failed: %s" % (failure,))
+
+ def authenticated(self, xmlStream):
+ self.log_info("XMPP authentication successful: %s" % (self.jid,))
+ # xmlStream.addObserver('/message', self.handleMessage)
+ self.sendPresence()
+ self.notifier.streamOpened(xmlStream)
+
+ def authFailed(self, e):
+ self.log_error("Failed to log in XMPP (%s); check JID and password" %
+ (self.jid,))
+
+ def sendPresence(self):
+ if self.xmlStream is not None:
+ presence = domish.Element(('jabber:client', 'presence'))
+ self.xmlStream.send(presence)
+ self.presenceCall = self.reactor.callLater(self.keepAliveSeconds,
+ self.sendPresence)
+
+ def rawDataIn(self, buf):
+ self.log_debug("RECV: %s" % unicode(buf, 'utf-8').encode('ascii',
+ 'replace'))
+
+ def rawDataOut(self, buf):
+ self.log_debug("SEND: %s" % unicode(buf, 'utf-8').encode('ascii',
+ 'replace'))
+
+
+def getPubSubConfiguration(config):
+ # TODO: Should probably cache this
+ results = { 'enabled' : False }
+
+ # return the first enabled xmpp service settings in the config file
+ for settings in config.Notifications["Services"]:
+ if (settings["Service"] == "twistedcaldav.notify.XMPPNotifierService"
+ and settings["Enabled"]):
+ results['enabled'] = True
+ results['service'] = settings['ServiceAddress']
+ results['host'] = config.ServerHostName
+ results['port'] = config.SSLPort or config.HTTPPort
+
+ return results
+
+def getPubSubPath(uri, pubSubConfiguration):
+ return ("/Public/CalDAV/%s/%d/%s/" % (pubSubConfiguration['host'],
+ pubSubConfiguration['port'], uri.strip("/")))
+
+def getPubSubXMPPURI(uri, pubSubConfiguration):
+ return "xmpp:%s?pubsub;node=%s" % (pubSubConfiguration['service'],
+ getPubSubPath(uri, pubSubConfiguration))
+
+#
+# Notification Server service config
+#
+
+class NotificationOptions(Options):
+ optParameters = [[
+ "config", "f", "/etc/caldavd/caldavd.plist", "Path to configuration file."
+ ]]
+
+ def __init__(self, *args, **kwargs):
+ super(NotificationOptions, self).__init__(*args, **kwargs)
+
+ self.overrides = {}
+
+ def _coerceOption(self, configDict, key, value):
+ """
+ Coerce the given C{val} to type of C{configDict[key]}
+ """
+ if key in configDict:
+ if isinstance(configDict[key], bool):
+ value = value == "True"
+
+ elif isinstance(configDict[key], (int, float, long)):
+ value = type(configDict[key])(value)
+
+ elif isinstance(configDict[key], (list, tuple)):
+ value = value.split(',')
+
+ elif isinstance(configDict[key], dict):
+ raise UsageError(
+ "Dict options not supported on the command line"
+ )
+
+ elif value == 'None':
+ value = None
+
+ return value
+
+ def _setOverride(self, configDict, path, value, overrideDict):
+ """
+ Set the value at path in configDict
+ """
+ key = path[0]
+
+ if len(path) == 1:
+ overrideDict[key] = self._coerceOption(configDict, key, value)
+ return
+
+ if key in configDict:
+ if not isinstance(configDict[key], dict):
+ raise UsageError(
+ "Found intermediate path element that is not a dictionary"
+ )
+
+ if key not in overrideDict:
+ overrideDict[key] = {}
+
+ self._setOverride(
+ configDict[key], path[1:],
+ value, overrideDict[key]
+ )
+
+
+ def opt_option(self, option):
+ """
+ Set an option to override a value in the config file. True, False, int,
+ and float options are supported, as well as comma seperated lists. Only
+ one option may be given for each --option flag, however multiple
+ --option flags may be specified.
+ """
+
+ if "=" in option:
+ path, value = option.split('=')
+ self._setOverride(
+ defaultConfig,
+ path.split('/'),
+ value,
+ self.overrides
+ )
+ else:
+ self.opt_option('%s=True' % (option,))
+
+ opt_o = opt_option
+
+ def postOptions(self):
+ parseConfig(self['config'])
+ config.updateDefaults(self.overrides)
+
+
+class NotificationServiceMaker(object):
+ implements(IPlugin, service.IServiceMaker)
+
+ tapname = "caldav_notifier"
+ description = "Notification Server"
+ options = NotificationOptions
+
+ def makeService(self, options):
+
+ multiService = service.MultiService()
+
+ notifiers = []
+ for settings in config.Notifications["Services"]:
+ if settings["Enabled"]:
+ notifier = namedClass(settings["Service"])(settings)
+ notifier.setServiceParent(multiService)
+ notifiers.append(notifier)
+
+ internet.TCPServer(
+ config.Notifications["InternalNotificationPort"],
+ InternalNotificationFactory(notifiers,
+ delaySeconds=config.Notifications["CoalesceSeconds"])
+ ).setServiceParent(multiService)
+
+ return multiService
+
+
+class SimpleLineNotifierService(service.Service):
+
+ def __init__(self, settings):
+ self.notifier = SimpleLineNotifier(settings)
+ self.server = internet.TCPServer(settings["Port"],
+ SimpleLineNotificationFactory(self.notifier))
+
+ def enqueue(self, uri):
+ self.notifier.enqueue(uri)
+
+ def startService(self):
+ self.server.startService()
+
+ def stopService(self):
+ self.server.stopService()
+
+
+class XMPPNotifierService(service.Service):
+
+ def __init__(self, settings):
+ self.notifier = XMPPNotifier(settings)
+ self.client = internet.TCPClient(settings["Host"], settings["Port"],
+ XMPPNotificationFactory(self.notifier, settings))
+
+ def enqueue(self, uri):
+ self.notifier.enqueue(uri)
+
+ def startService(self):
+ self.client.startService()
+
+ def stopService(self):
+ self.client.stopService()
+
Modified: CalendarServer/trunk/twistedcaldav/static.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/static.py 2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twistedcaldav/static.py 2008-07-11 18:37:40 UTC (rev 2690)
@@ -73,6 +73,9 @@
from twistedcaldav.cache import DisabledCacheNotifier, PropfindCacheMixin
+from twistedcaldav.notify import getPubSubConfiguration, getPubSubPath
+from twistedcaldav.notify import getPubSubXMPPURI
+
log = Logger()
class CalDAVFile (CalDAVResource, DAVFile):
@@ -559,6 +562,10 @@
"""
cacheNotifierFactory = DisabledCacheNotifier
+ # Extend the inherited live properties with the calendarserver "xmpp-uri"
+ # property; its value is produced by readProperty() further down in this
+ # change.
+ liveProperties = CalDAVFile.liveProperties + (
+ (customxml.calendarserver_namespace, "xmpp-uri"),
+ )
+
def __init__(self, path, parent, record):
"""
@param path: the path to the file which will back the resource.
@@ -602,6 +609,23 @@
return super(CalendarHomeFile, self).getChild(name)
+ def readProperty(self, property, request):
+ if type(property) is tuple:
+ qname = property
+ else:
+ qname = property.qname()
+
+ if qname == (customxml.calendarserver_namespace, "xmpp-uri"):
+ pubSubConfiguration = getPubSubConfiguration(config)
+ if pubSubConfiguration['enabled']:
+ return succeed(customxml.PubSubXMPPURIProperty(
+ getPubSubXMPPURI(self.url(), pubSubConfiguration)))
+ else:
+ return succeed(customxml.PubSubXMPPURIProperty())
+
+ return super(CalendarHomeFile, self).readProperty(property, request)
+
+
class ScheduleFile (AutoProvisioningFileMixIn, CalDAVFile):
def __init__(self, path, parent):
super(ScheduleFile, self).__init__(path, principalCollections=parent.principalCollections())
Modified: CalendarServer/trunk/twistedcaldav/tap.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/tap.py 2008-07-11 18:09:51 UTC (rev 2689)
+++ CalendarServer/trunk/twistedcaldav/tap.py 2008-07-11 18:37:40 UTC (rev 2690)
@@ -55,6 +55,7 @@
from twistedcaldav.timezones import TimezoneCache
from twistedcaldav import pdmonster
from twistedcaldav import memcachepool
+from twistedcaldav.notify import installNotificationClient
log = Logger()
@@ -491,6 +492,14 @@
config.Memcached["MaxClients"])
#
+ # Configure NotificationClient
+ #
+ if config.Notifications["Enabled"]:
+ installNotificationClient(reactor,
+ config.Notifications["InternalNotificationHost"],
+ config.Notifications["InternalNotificationPort"])
+
+ #
# Setup Resource hierarchy
#
Copied: CalendarServer/trunk/twistedcaldav/test/test_notify.py (from rev 2689, CalendarServer/branches/users/sagen/notifications/twistedcaldav/test/test_notify.py)
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_notify.py (rev 0)
+++ CalendarServer/trunk/twistedcaldav/test/test_notify.py 2008-07-11 18:37:40 UTC (rev 2690)
@@ -0,0 +1,485 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from copy import deepcopy
+
+from twisted.trial.unittest import TestCase
+from twisted.internet.task import Clock
+from twisted.words.protocols.jabber.client import IQ
+from twistedcaldav.notify import *
+from twistedcaldav import config as config_mod
+from twistedcaldav.config import Config
+
+
+
+class NotificationClientUserTests(TestCase):
+
+ class NotificationClientUser(NotificationClientUserMixIn):
+ pass
+
+ def test_installNoficationClient(self):
+ self.assertEquals(getNotificationClient(), None)
+ self.clock = Clock()
+ installNotificationClient(self.clock, None, None,
+ klass=StubNotificationClient)
+ notificationClient = getNotificationClient()
+ self.assertNotEquals(notificationClient, None)
+
+ clientUser = self.NotificationClientUser()
+ clientUser.sendNotification("a")
+ self.assertEquals(notificationClient.lines, ["a"])
+
+
+class NotificationClientFactoryTests(TestCase):
+
+ def setUp(self):
+ self.client = StubNotificationClient(None, None, None)
+ self.factory = NotificationClientFactory(self.client)
+ self.factory.protocol = StubNotificationClientProtocol
+
+ def test_connect(self):
+ self.assertEquals(self.factory.isReady(), False)
+ protocol = self.factory.buildProtocol(None)
+ protocol.connectionMade()
+ self.assertEquals(self.client.observers, set([protocol]))
+ self.assertEquals(self.factory.isReady(), True)
+
+ protocol.connectionLost(None)
+ self.assertEquals(self.client.observers, set())
+ self.assertEquals(self.factory.isReady(), False)
+
+
+class StubNotificationClient(object):
+
+ def __init__(self, reactor, host, port):
+ self.lines = []
+ self.observers = set()
+
+ def send(self, uri):
+ self.lines.append(uri)
+
+ def addObserver(self, observer):
+ self.observers.add(observer)
+
+ def removeObserver(self, observer):
+ self.observers.remove(observer)
+
+ def connectionMade(self):
+ pass
+
+class StubNotificationClientProtocol(object):
+
+ def __init__(self):
+ self.lines = []
+
+ def sendLine(self, line):
+ self.lines.append(line)
+
+ def connectionMade(self):
+ self.client.addObserver(self)
+ self.factory.connectionMade()
+
+ def connectionLost(self, reason):
+ self.client.removeObserver(self)
+ self.factory.connected = False
+
+
+class NotificationClientTests(TestCase):
+
+ def setUp(self):
+ self.client = NotificationClient(Clock(), None, None)
+ self.client.factory = StubNotificationClientFactory()
+
+ def test_sendWhileNotConnected(self):
+ self.client.send("a")
+ self.assertEquals(self.client.queued, set(["a"]))
+
+ def test_sendWhileConnected(self):
+ protocol = StubNotificationClientProtocol()
+ self.client.addObserver(protocol)
+ self.client.factory.connected = True
+ self.client.send("a")
+ self.assertEquals(self.client.queued, set())
+ self.assertEquals(protocol.lines, ["a"])
+
+ def test_sendQueue(self):
+ self.client.send("a")
+ self.assertEquals(self.client.queued, set(["a"]))
+ protocol = StubNotificationClientProtocol()
+ self.client.addObserver(protocol)
+ self.client.factory.connected = True
+ self.client.connectionMade()
+ self.assertEquals(protocol.lines, ["a"])
+ self.assertEquals(self.client.queued, set())
+
+
+class StubNotificationClientFactory(object):
+
+ def __init__(self):
+ self.connected = False
+
+ def isReady(self):
+ return self.connected
+
+
+class CoalescerTests(TestCase):
+
+ def setUp(self):
+ self.clock = Clock()
+ self.notifier = StubNotifier()
+ self.coalescer = Coalescer([self.notifier], reactor=self.clock)
+
+ def test_delayedNotifications(self):
+ self.coalescer.add("A")
+ self.assertEquals(self.notifier.notifications, [])
+ self.clock.advance(5)
+ self.assertEquals(self.notifier.notifications, ["A"])
+
+ def test_removeDuplicates(self):
+ self.coalescer.add("A")
+ self.coalescer.add("A")
+ self.clock.advance(5)
+ self.assertEquals(self.notifier.notifications, ["A"])
+
+
+class StubNotifier(object):
+
+ def __init__(self):
+ self.notifications = []
+ self.observers = set()
+ self.playbackHistory = []
+
+ def enqueue(self, uri):
+ self.notifications.append(uri)
+
+ def playback(self, protocol, old_seq):
+ self.playbackHistory.append((protocol, old_seq))
+
+ def addObserver(self, observer):
+ self.observers.add(observer)
+
+ def removeObserver(self, observer):
+ self.observers.remove(observer)
+
+
+class SimpleLineNotifierTests(TestCase):
+
+ def setUp(self):
+ self.clock = Clock()
+ self.notifier = SimpleLineNotifier(None)
+ self.coalescer = Coalescer([self.notifier], reactor=self.clock)
+
+ def test_initialConnection(self):
+ protocol = StubProtocol()
+ self.notifier.addObserver(protocol)
+ self.notifier.connectionMade(protocol)
+ self.assertEquals(protocol.lines, ["0"])
+
+ def test_subsequentConnection(self):
+ protocol = StubProtocol()
+ self.notifier.addObserver(protocol)
+ self.notifier.connectionMade(protocol)
+ protocol.lines = []
+ self.notifier.connectionMade(protocol)
+ self.assertEquals(protocol.lines, [])
+
+ def test_send(self):
+ protocol = StubProtocol()
+ self.notifier.addObserver(protocol)
+ self.notifier.enqueue("A")
+ self.assertEquals(protocol.lines, ["1 A"])
+
+ def test_incrementSequence(self):
+ protocol = StubProtocol()
+ self.notifier.addObserver(protocol)
+ self.notifier.enqueue("A")
+ self.notifier.enqueue("B")
+ self.assertEquals(protocol.lines, ["1 A", "2 B"])
+
+ def test_addObserver(self):
+ protocol = StubProtocol()
+ self.notifier.addObserver(protocol)
+ self.notifier.enqueue("A")
+ self.assertEquals(protocol.lines, ["1 A"])
+
+ def test_removeObserver(self):
+ protocol = StubProtocol()
+ self.notifier.addObserver(protocol)
+ self.notifier.removeObserver(protocol)
+ self.notifier.enqueue("A")
+ self.assertEquals(protocol.lines, [])
+
+ def test_multipleObservers(self):
+ protocol1 = StubProtocol()
+ protocol2 = StubProtocol()
+ self.notifier.addObserver(protocol1)
+ self.notifier.addObserver(protocol2)
+ self.notifier.enqueue("A")
+ self.assertEquals(protocol1.lines, ["1 A"])
+ self.assertEquals(protocol2.lines, ["1 A"])
+
+ def test_duplicateObservers(self):
+ protocol = StubProtocol()
+ self.notifier.addObserver(protocol)
+ self.notifier.addObserver(protocol)
+ self.notifier.enqueue("A")
+ self.assertEquals(protocol.lines, ["1 A"])
+
+ def test_playback(self):
+ self.notifier.enqueue("A")
+ self.notifier.enqueue("B")
+ self.notifier.enqueue("C")
+ protocol = StubProtocol()
+ self.notifier.addObserver(protocol)
+ self.notifier.playback(protocol, 1)
+ self.assertEquals(protocol.lines, ["2 B", "3 C"])
+
+ def test_reset(self):
+ self.notifier.enqueue("A")
+ self.assertEquals(self.notifier.history, {"A" : 1})
+ self.assertEquals(self.notifier.latestSeq, 1)
+ self.notifier.reset()
+ self.assertEquals(self.notifier.history, {})
+ self.assertEquals(self.notifier.latestSeq, 0)
+
+
+class SimpleLineNotificationFactoryTests(TestCase):
+
+ def test_buildProtocol(self):
+ notifier = StubNotifier()
+ factory = SimpleLineNotificationFactory(notifier)
+ protocol = factory.buildProtocol(None)
+ self.assertEquals(protocol.notifier, notifier)
+ self.assertIn(protocol, notifier.observers)
+
+
+class SimpleLineNotificationProtocolTests(TestCase):
+
+ def setUp(self):
+ self.notifier = StubNotifier()
+ self.protocol = SimpleLineNotificationProtocol()
+ self.protocol.notifier = self.notifier
+ self.protocol.transport = StubTransport()
+ self.notifier.addObserver(self.protocol)
+
+ def test_connectionLost(self):
+ self.protocol.connectionLost(None)
+ self.assertNotIn(self.protocol, self.notifier.observers)
+
+ def test_lineReceived(self):
+ self.protocol.lineReceived("2")
+ self.assertEquals(self.notifier.playbackHistory, [(self.protocol, 2)])
+
+ def test_lineReceivedInvalid(self):
+ self.protocol.lineReceived("bogus")
+ self.assertEquals(self.notifier.playbackHistory, [])
+
+
+
+class StubProtocol(object):
+
+ def __init__(self):
+ self.lines = []
+
+ def sendLine(self, line):
+ self.lines.append(line)
+
+
+class StubTransport(object):
+
+ def getPeer(self):
+ return "peer"
+
+
+
+
+
+
+class StubXmlStream(object):
+
+ def __init__(self):
+ self.elements = []
+
+ def send(self, element):
+ self.elements.append(element)
+
+ def addOnetimeObserver(*args, **kwds):
+ pass
+
+ def addObserver(*args, **kwds):
+ pass
+
+
+class XMPPNotifierTests(TestCase):
+
+ xmppEnabledConfig = Config(config_mod.defaultConfig)
+ xmppEnabledConfig.Notifications['Services'][1]['Enabled'] = True
+ xmppEnabledConfig.ServerHostName = "server.example.com"
+ xmppEnabledConfig.HTTPPort = 80
+
+ xmppDisabledConfig = Config(config_mod.defaultConfig)
+ xmppDisabledConfig.Notifications['Services'][1]['Enabled'] = False
+
+ def setUp(self):
+ self.xmlStream = StubXmlStream()
+ self.settings = { 'ServiceAddress' : 'pubsub.example.com' }
+ self.notifier = XMPPNotifier(self.settings, reactor=Clock(),
+ configOverride=self.xmppEnabledConfig)
+ self.notifier.streamOpened(self.xmlStream)
+
+ def test_sendWhileConnected(self):
+ self.notifier.enqueue("/principals/__uids__/test")
+
+ iq = self.xmlStream.elements[0]
+ self.assertEquals(iq.name, "iq")
+
+ pubsubElement = list(iq.elements())[0]
+ self.assertEquals(pubsubElement.name, "pubsub")
+ self.assertEquals(pubsubElement.uri, 'http://jabber.org/protocol/pubsub')
+
+ publishElement = list(pubsubElement.elements())[0]
+ self.assertEquals(publishElement.name, "publish")
+ self.assertEquals(publishElement.uri, 'http://jabber.org/protocol/pubsub')
+ self.assertEquals(publishElement['node'],
+ "/Public/CalDAV/server.example.com/80/principals/__uids__/test/")
+
+ def test_sendWhileNotConnected(self):
+ notifier = XMPPNotifier(self.settings, reactor=Clock(),
+ configOverride=self.xmppDisabledConfig)
+ notifier.enqueue("/principals/__uids__/test")
+ self.assertEquals(self.xmlStream.elements, [])
+
+ def test_publishNewNode(self):
+ self.notifier.publishNode("testNodeName")
+ iq = self.xmlStream.elements[0]
+ self.assertEquals(iq.name, "iq")
+
+ def test_publishReponse400(self):
+ response = IQ(self.xmlStream, type='error')
+ errorElement = response.addElement('error')
+ errorElement['code'] = '400'
+ self.assertEquals(len(self.xmlStream.elements), 0)
+ self.notifier.responseFromPublish("testNodeName", response)
+ self.assertEquals(len(self.xmlStream.elements), 1)
+ iq = self.xmlStream.elements[0]
+ self.assertEquals(iq.name, "iq")
+ self.assertEquals(iq['type'], "get")
+
+ pubsubElement = list(iq.elements())[0]
+ self.assertEquals(pubsubElement.name, "pubsub")
+ self.assertEquals(pubsubElement.uri,
+ 'http://jabber.org/protocol/pubsub#owner')
+ configElement = list(pubsubElement.elements())[0]
+ self.assertEquals(configElement.name, "configure")
+ self.assertEquals(configElement['node'], "testNodeName")
+
+
+ def test_publishReponse404(self):
+ response = IQ(self.xmlStream, type='error')
+ errorElement = response.addElement('error')
+ errorElement['code'] = '404'
+ self.assertEquals(len(self.xmlStream.elements), 0)
+ self.notifier.responseFromPublish("testNodeName", response)
+ self.assertEquals(len(self.xmlStream.elements), 1)
+ iq = self.xmlStream.elements[0]
+ self.assertEquals(iq.name, "iq")
+ self.assertEquals(iq['type'], "set")
+
+ pubsubElement = list(iq.elements())[0]
+ self.assertEquals(pubsubElement.name, "pubsub")
+ self.assertEquals(pubsubElement.uri,
+ 'http://jabber.org/protocol/pubsub')
+ createElement = list(pubsubElement.elements())[0]
+ self.assertEquals(createElement.name, "create")
+ self.assertEquals(createElement['node'], "testNodeName")
+
+
+ def test_configureResponse(self):
+
+ def _getChild(element, name):
+ for child in element.elements():
+ if child.name == name:
+ return child
+ return None
+
+ response = IQ(self.xmlStream, type='result')
+ pubsubElement = response.addElement('pubsub')
+ configElement = pubsubElement.addElement('configure')
+ formElement = configElement.addElement('x')
+ formElement['type'] = 'form'
+ fields = [
+ ( "unknown", "don't edit me" ),
+ ( "pubsub#deliver_payloads", "1" ),
+ ( "pubsub#persist_items", "1" ),
+ ]
+ expectedFields = {
+ "unknown" : "don't edit me",
+ "pubsub#deliver_payloads" : "0",
+ "pubsub#persist_items" : "0",
+ }
+ for field in fields:
+ fieldElement = formElement.addElement("field")
+ fieldElement['var'] = field[0]
+ fieldElement.addElement('value', content=field[1])
+
+ self.assertEquals(len(self.xmlStream.elements), 0)
+ self.notifier.responseFromConfigurationForm("testNodeName", response)
+ self.assertEquals(len(self.xmlStream.elements), 1)
+
+ iq = self.xmlStream.elements[0]
+ self.assertEquals(iq.name, "iq")
+ self.assertEquals(iq['type'], "set")
+
+ pubsubElement = list(iq.elements())[0]
+ self.assertEquals(pubsubElement.name, "pubsub")
+ configElement = list(pubsubElement.elements())[0]
+ self.assertEquals(configElement.name, "configure")
+ self.assertEquals(configElement['node'], "testNodeName")
+ formElement = list(configElement.elements())[0]
+ self.assertEquals(formElement['type'], "submit")
+ for field in formElement.elements():
+ valueElement = _getChild(field, "value")
+ if valueElement is not None:
+ self.assertEquals(expectedFields[field['var']],
+ str(valueElement))
+
+
+
+class XMPPNotificationFactoryTests(TestCase):
+
+ def test_sendPresence(self):
+ clock = Clock()
+ xmlStream = StubXmlStream()
+ settings = { 'ServiceAddress' : 'pubsub.example.com', 'JID' : 'jid',
+ 'Password' : 'password', 'KeepAliveSeconds' : 5 }
+ notifier = XMPPNotifier(settings, reactor=clock)
+ factory = XMPPNotificationFactory(notifier, settings, reactor=clock)
+ factory.connected(xmlStream)
+ factory.authenticated(xmlStream)
+
+ self.assertEquals(len(xmlStream.elements), 1)
+ presence = xmlStream.elements[0]
+ self.assertEquals(presence.name, 'presence')
+
+ clock.advance(5)
+
+ self.assertEquals(len(xmlStream.elements), 2)
+ presence = xmlStream.elements[1]
+ self.assertEquals(presence.name, 'presence')
+
+ factory.disconnected(xmlStream)
+ clock.advance(5)
+ self.assertEquals(len(xmlStream.elements), 2)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080711/06623f7f/attachment-0001.html
More information about the calendarserver-changes
mailing list