[CalendarServer-changes] [10763] CalendarServer/branches/users/gaya/sharedgroups
source_changes at macosforge.org
source_changes at macosforge.org
Tue Feb 19 15:19:21 PST 2013
Revision: 10763
http://trac.calendarserver.org//changeset/10763
Author: gaya at apple.com
Date: 2013-02-19 15:19:21 -0800 (Tue, 19 Feb 2013)
Log Message:
-----------
fix sharing.py merge
Modified Paths:
--------------
CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/sharing.py
CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/test/test_sharing.py
CalendarServer/branches/users/gaya/sharedgroups/txdav/common/datastore/upgrade/test/test_migrate.py
Removed Paths:
-------------
CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/notify.py
CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/test/test_notify.py
Deleted: CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/notify.py
===================================================================
--- CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/notify.py 2013-02-19 22:15:27 UTC (rev 10762)
+++ CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/notify.py 2013-02-19 23:19:21 UTC (rev 10763)
@@ -1,1554 +0,0 @@
-# -*- test-case-name: twistedcaldav.test.test_notify -*-
-##
-# Copyright (c) 2005-2013 Apple Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-"""
-Notification framework for Calendar Server
-
-This module implements client code which is executed within the context of
-icalserver itself, and also server code (the "notification server") which is
-run as a separate process, launched as part of "./run".
-
-The notification server process is implemented as a twistd plugin
-(with a tapname of "caldav_notifier"), and is comprised of two
-services -- one handling the internal channel between icalserver
-and notification server, the other handling the external channel
-between notification server and a remote consumer.
-"""
-
-# TODO: add CalDAVTester test for examining new xmpp-uri property
-
-import uuid
-from fnmatch import fnmatch
-
-from zope.interface import Interface, implements
-
-from twext.python.log import LoggingMixIn, Logger
-
-from twisted.internet.protocol import ReconnectingClientFactory, ServerFactory
-from twisted.internet.ssl import ClientContextFactory
-from twisted.internet.defer import inlineCallbacks, Deferred, returnValue
-from twisted.protocols.basic import LineReceiver
-from twisted.plugin import IPlugin
-from twisted.application import internet, service
-from twisted.python.usage import Options, UsageError
-from twisted.python.reflect import namedClass
-from twisted.words.protocols.jabber import xmlstream
-from twisted.words.protocols.jabber.jid import JID
-from twisted.words.protocols.jabber.client import XMPPAuthenticator, IQAuthInitializer
-from twisted.words.protocols.jabber.xmlstream import IQ
-from twisted.words.xish import domish
-from twistedcaldav.config import config
-from twistedcaldav.memcacher import Memcacher
-from twistedcaldav.stdconfig import DEFAULT_CONFIG, DEFAULT_CONFIG_FILE
-from twistedcaldav import memcachepool
-from twext.internet.gaiendpoint import GAIEndpoint
-from twext.internet.adaptendpoint import connect
-
-log = Logger()
-
-__all__ = [
- "Coalescer",
- "INotifier",
- "InternalNotificationFactory",
- "InternalNotificationProtocol",
- "NotificationClientFactory",
- "NotificationClientLineProtocol",
- "NotificationServiceMaker",
- "Notifier",
- "NotifierFactory",
- "SimpleLineNotificationFactory",
- "SimpleLineNotificationProtocol",
- "SimpleLineNotifier",
- "SimpleLineNotifierService",
- "XMPPNotificationFactory",
- "XMPPNotifier",
- "getNodeCacher",
- "getPubSubAPSConfiguration",
- "getPubSubConfiguration",
- "getPubSubHeartbeatURI",
- "getPubSubPath",
- "getPubSubXMPPURI",
- "getXMPPSettings",
-]
-
-
-# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-# Classes used within calendarserver itself
-#
-
-class Notifier(LoggingMixIn):
- """
- Provides a hook for sending change notifications to the
- L{NotifierFactory}.
- """
-
- def __init__(self, notifierFactory, label="default", id=None, prefix=None):
- self._notifierFactory = notifierFactory
- self._ids = { label : self.normalizeID(id) }
- self._notify = True
- self._prefix = prefix
-
- def normalizeID(self, id):
- urn = "urn:uuid:"
- try:
- if id.startswith(urn):
- return id[len(urn):]
- except AttributeError:
- pass
- return id
-
- def enableNotify(self, arg):
- self.log_debug("enableNotify: %s" % (self._ids['default'][1],))
- self._notify = True
-
- def disableNotify(self):
- self.log_debug("disableNotify: %s" % (self._ids['default'][1],))
- self._notify = False
-
- def notify(self, op="update"):
- for label in self._ids.iterkeys():
- id = self.getID(label=label)
- if id is not None:
- if self._notify:
- self.log_debug("Notifications are enabled: %s %s %s" %
- (op, label, id))
- self._notifierFactory.send(op, id)
- else:
- self.log_debug("Skipping notification for: %s" % (id,))
-
- def clone(self, label="default", id=None):
- newNotifier = self.__class__(self._notifierFactory)
- newNotifier._ids = self._ids.copy()
- newNotifier._ids[label] = id
- newNotifier._prefix = self._prefix
- return newNotifier
-
- def addID(self, label="default", id=None):
- self._ids[label] = self.normalizeID(id)
-
- def getID(self, label="default"):
- id = self._ids.get(label, None)
- if self._prefix is None:
- return id
- else:
- return "%s|%s" % (self._prefix, id)
-
- @inlineCallbacks
- def nodeName(self, label="default"):
- id = self.getID(label=label)
- pubSubConfig = self._notifierFactory.pubSubConfig
- name = getPubSubPath(id, pubSubConfig)
- if pubSubConfig["enabled"]:
- try:
- if self._notifierFactory.nodeCacher:
- nodeCacher = self._notifierFactory.nodeCacher
- else:
- nodeCacher = getNodeCacher()
- (yield nodeCacher.waitForNode(self, name))
- except NodeCreationException, e:
- self.log_warn(e)
- returnValue(None)
- returnValue(name)
-
-class NotificationClientLineProtocol(LineReceiver, LoggingMixIn):
- """
- Notification Client Line Protocol
-
- Sends updates to the notification server.
- """
-
- def connectionMade(self):
- self.client.addObserver(self)
- self.factory.connectionMade()
-
- def connectionLost(self, reason):
- self.client.removeObserver(self)
-
-
-class NotificationClientFactory(ReconnectingClientFactory,
- LoggingMixIn):
- """
- Notification Client Factory
-
- Sends updates to the notification server.
- """
-
- protocol = NotificationClientLineProtocol
-
- def __init__(self, client):
- self.connected = False
- self.client = client
-
- def clientConnectionLost(self, connector, reason):
- self.log_error("Connect to notification server lost: %s" % (reason,))
- self.connected = False
- ReconnectingClientFactory.clientConnectionLost(self, connector, reason)
-
- def clientConnectionFailed(self, connector, reason):
- self.log_error("Unable to connect to notification server: %s" % (reason,))
- self.connected = False
- ReconnectingClientFactory.clientConnectionFailed(self, connector, reason)
-
- def connectionMade(self):
- self.connected = True
- self.resetDelay()
- self.client.connectionMade()
-
- def isReady(self):
- return self.connected
-
- def buildProtocol(self, addr):
- p = self.protocol()
- p.factory = self
- p.client = self.client
- return p
-
-
-class NotifierFactory(LoggingMixIn):
- """
- Notifier Factory
-
- Creates Notifier instances and forwards notifications from them to the
- gateway.
- """
-
- def __init__(self, gatewayHost, gatewayPort, pubSubConfig=None,
- nodeCacher=None, reactor=None):
-
- self.factory = None
-
- self.gatewayHost = gatewayHost
- self.gatewayPort = gatewayPort
- self.pubSubConfig = pubSubConfig
- self.nodeCacher = nodeCacher
-
- self.observers = set()
- self.queued = set()
-
- if reactor is None:
- from twisted.internet import reactor
- self.reactor = reactor
-
- def send(self, op, id):
- if self.factory is None:
- self.factory = NotificationClientFactory(self)
- connect(
- GAIEndpoint(self.reactor, self.gatewayHost, self.gatewayPort),
- self.factory)
- self.log_debug("Creating factory")
-
- msg = "%s %s" % (op, str(id))
- if self.factory.isReady() and self.observers:
- for observer in self.observers:
- self.log_debug("Sending to notification server: %s" % (msg,))
- observer.sendLine(msg)
- else:
- self.log_debug("Queuing: %s" % (msg,))
- self.queued.add(msg)
-
- def connectionMade(self):
- if self.factory.isReady() and self.observers:
- for observer in self.observers:
- for msg in self.queued:
- self.log_debug("Sending from queue: %s" % (msg,))
- observer.sendLine(msg)
- self.queued.clear()
-
- def addObserver(self, observer):
- self.observers.add(observer)
-
- def removeObserver(self, observer):
- self.observers.remove(observer)
-
- def newNotifier(self, label="default", id=None, prefix=None):
- return Notifier(self, label=label, id=id, prefix=prefix)
-
-
-
-
-class NodeCreationException(Exception):
- pass
-
-class NodeCacher(Memcacher, LoggingMixIn):
-
- def __init__(self, reactor=None):
- if reactor is None:
- from twisted.internet import reactor
- self.reactor = reactor
- super(NodeCacher, self).__init__("pubsubnodes")
-
- def nodeExists(self, nodeName):
- return self.get(nodeName)
-
- def storeNode(self, nodeName):
- return self.set(nodeName, "1")
-
- @inlineCallbacks
- def waitForNode(self, notifier, nodeName):
- retryCount = 0
- verified = False
- requestedCreation = False
- while(retryCount < 5):
- if (yield self.nodeExists(nodeName)):
- verified = True
- break
-
- if not requestedCreation:
- notifier.notify(op="create")
- requestedCreation = True
-
- retryCount += 1
-
- pause = Deferred()
- def _timedDeferred():
- pause.callback(True)
- self.reactor.callLater(1, _timedDeferred)
- yield pause
-
- if not verified:
- self.log_debug("Giving up!")
- raise NodeCreationException("Could not create node %s" % (nodeName,))
-
- def createNode(self, notifier, nodeName):
- """
- Check with memcached to see if this node is known to exist, and if
- not, request it be created (without waiting)
- """
- def _nodeExistenceChecked(result):
- if not result:
- notifier.notify(op="create")
-
- d = self.nodeExists(nodeName)
- d.addCallback(_nodeExistenceChecked)
- return d
-
-
-_nodeCacher = None
-
-def getNodeCacher():
- global _nodeCacher
- if _nodeCacher is None:
- _nodeCacher = NodeCacher()
- return _nodeCacher
-
-
-
-
-
-# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
-# Classes used within Notification Server
-#
-
-#
-# Internal Channel (from icalserver to notification server)
-#
-
-class InternalNotificationProtocol(LineReceiver):
- """
- InternalNotificationProtocol
-
- Receives notifications from the calendar server.
- """
-
- def lineReceived(self, line):
- try:
- op, id = line.strip().split()
- self.factory.coalescer.add(op, id)
- except ValueError:
- # ignore invalid input
- log.error("Invalid input received on internal notification port: %s"
- % (line,))
-
-
-class InternalNotificationFactory(ServerFactory):
- """
- Internal Notification Factory
-
- Receives notifications from the calendar server.
- """
-
- protocol = InternalNotificationProtocol
-
- def __init__(self, notifiers, delaySeconds=None):
- self.coalescer = Coalescer(notifiers, delaySeconds=delaySeconds)
-
-
-
-class Coalescer(LoggingMixIn):
- """
- Coalescer
-
- A queue which hangs on to incoming ids for some period of time before
- passing them along to the external notifier listening for these updates.
- A chatty CalDAV client can make several changes in a short period of time,
- and the Coalescer buffers the external clients somewhat.
- """
-
- delaySeconds = 5
-
- # sendAnywayAfterCount can be used to control how many times a notification
- # countdown timer is reset because of new changes. Once a notification
- # has been delayed 'sendAnywayAfterCount' times, it is sent anyway,
- # otherwise a busy calendar might never have a notification sent out.
- # Set this to 0 to disable the timer reset feature.
- sendAnywayAfterCount = 0
-
- def __init__(self, notifiers, reactor=None, delaySeconds=None,
- sendAnywayAfterCount=None):
-
- if sendAnywayAfterCount:
- self.sendAnywayAfterCount = sendAnywayAfterCount
-
- if delaySeconds is not None:
- self.delaySeconds = delaySeconds
-
- if reactor is None:
- from twisted.internet import reactor
- self.reactor = reactor
-
- self.ids = {}
- self.notifiers = notifiers
-
- def add(self, op, id):
-
- if op == "create":
- # we don't want to delay a "create" notification; this opcode
- # is meant for XMPP pubsub -- it means create and configure the
- # node but don't publish to it
- for notifier in self.notifiers:
- notifier.enqueue(op, id)
-
- else: # normal update notification
- delayed, count = self.ids.get(id, [None, 0])
-
- if delayed and delayed.active():
- count += 1
- if count < self.sendAnywayAfterCount:
- # reschedule for delaySeconds in the future
- delayed.reset(self.delaySeconds)
- self.ids[id][1] = count
- self.log_debug("Delaying: %s" % (id,))
- else:
- self.log_debug("Not delaying to avoid starvation: %s" % (id,))
- else:
- self.log_debug("Scheduling: %s" % (id,))
- self.ids[id] = [self.reactor.callLater(self.delaySeconds,
- self.delayedEnqueue, op, id), 0]
-
- def delayedEnqueue(self, op, id):
- self.log_debug("Time to send: %s" % (id,))
- self.ids[id][1] = 0
- for notifier in self.notifiers:
- notifier.enqueue(op, id)
-
-
-
-#
-# External Channel (from notification server to other consumers)
-#
-
-class INotifier(Interface):
- """
- Notifier Interface
-
- Defines an enqueue method that Notifier classes need to implement.
- """
-
- def enqueue(self, op, id):
- """
- Let the notifier object know that a change has been made for this
- id, and enough time has passed to allow for coalescence.
-
- @type op: C{str}
- @type id: C{str}
- """
-
-
-class SimpleLineNotifier(LoggingMixIn):
- """
- Simple Line Notifier
-
- Listens for ids from the coalescer and writes them out to any
- connected clients. Each line is simply a sequence number, a
- space, and an id string. If the external client sends a sequence
- number, this notifier will send notification lines for each id
- that was changed since that sequence number was originally sent.
- A history of such sequence numbers is stored in a python dict.
- If the external client sends a zero, then the history is cleared
- and the next sequence number to use is reset to 1.
-
- The sequence number is stored as a python long which means it has
- essentially infinite precision. We discussed rolling over at the
- 64-bit boundary, but even if we limit the sequence number to a 64-bit
- signed integer (or 2^63), and we had 100,000 users generating the
- maximum number of notifications (which by default is 6/minute since
- we're coalescing over 10 seconds), it would take 29 million years to
- rollover.
- """
-
- implements(INotifier)
-
- def __init__(self, settings):
- self.reset()
- self.observers = set()
- self.sentReset = False
-
- def enqueue(self, op, id):
-
- if op == "update":
-
- self.latestSeq += 1L
-
- # Update history
- self.history[id] = self.latestSeq
-
- for observer in self.observers:
- msg = "%d %s" % (self.latestSeq, id)
- self.log_debug("Sending %s" % (msg,))
- observer.sendLine(msg)
-
- def reset(self):
- self.latestSeq = 0L
- self.history = { } # keys=id, values=sequenceNumber
-
- def playback(self, observer, oldSeq):
-
- hist = self.history
- toSend = [(hist[id], id) for id in hist if hist[id] > oldSeq]
- toSend.sort() # sorts the tuples based on numeric sequence number
-
- for seq, id in toSend:
- msg = "%d %s" % (seq, id)
- self.log_debug("Sending %s" % (msg,))
- observer.sendLine(msg)
-
-
- def addObserver(self, observer):
- self.observers.add(observer)
-
- def removeObserver(self, observer):
- self.observers.remove(observer)
-
- def connectionMade(self, observer):
- if not self.sentReset:
- self.log_debug("Sending 0")
- observer.sendLine("0")
- self.sentReset = True
-
-
-class SimpleLineNotificationProtocol(LineReceiver, LoggingMixIn):
- """
- Simple Line Notification Protocol
-
- Sends notifications to external consumers. Also responds to history-
- playback requests. If an integer is received from an external consumer,
- it is interpreted as a sequence number; all notifications sent since that
- sequence number was sent are resent.
- """
-
- def connectionMade(self):
- # we just received a connection from the outside; if it's the first
- # since we started running, it means we need to let them know that
- # a reset has happened. This assumes that only one connection will
- # be made to this channel; if we end up having multiple consumers
- # of this protocol, we would need to uniquely identify them.
- self.notifier.connectionMade(self)
-
- def lineReceived(self, line):
- val = line.strip()
-
- # Should be a number requesting all updates since that sequence
- try:
- oldSeq = int(val)
- except ValueError, e:
- self.log_warn("Error parsing %s: %s (from %s)" % (val, e,
- self.transport.getPeer()))
- return
-
- if oldSeq == 0:
- self.notifier.reset()
- else:
- self.notifier.playback(self, oldSeq)
-
- def connectionLost(self, reason):
- self.notifier.removeObserver(self)
-
-
-class SimpleLineNotificationFactory(ServerFactory):
- """
- Simple Line Notification Factory
-
- Sends notifications to external consumers.
- """
-
- protocol = SimpleLineNotificationProtocol
-
- def __init__(self, notifier):
- self.notifier = notifier
-
- def buildProtocol(self, addr):
- p = self.protocol()
- self.notifier.addObserver(p)
- p.notifier = self.notifier
- return p
-
-
-
-
-
-
-class XMPPNotifier(LoggingMixIn):
- """
- XMPP Notifier
-
- Uses pubsub XMPP requests to let subscribers know when there
- has been a change made to a DAV resource (currently just
- CalendarHomeResources). Uses XMPP login info from the config file
- to determine which pubsub service to connect to. When it's
- time to send a notification, XMPPNotifier computes a node path
- corresponding to the DAV resource and emits a publish request
- for that node. If the request comes back 404 XMPPNotifier will
- create the node and then go through the configuration process,
- followed by a publish retry.
-
- For monitoring purposes, you can subscribe to the server's JID
- as long as your own JID matches the "AllowedJIDs" pattern(s) in
- the config file; XMPPNotifier will send error messages to your
- JID. If you also want to receive non-error, debug messages,
- send the calendar server JID the message, "debug on". Send
- "help" for other commands.
-
- To let clients know that the notifications from the calendar server
- are still flowing, a "heartbeat" node is published to every 30
- minutes (configurable).
-
- """
-
- implements(INotifier)
-
- pubsubNS = 'http://jabber.org/protocol/pubsub'
-
- def __init__(self, settings, reactor=None, configOverride=None,
- heartbeat=True, roster=True):
- self.xmlStream = None
- self.settings = settings
- if reactor is None:
- from twisted.internet import reactor
- self.reactor = reactor
- self.config = configOverride or config
- self.doHeartbeat = heartbeat and self.settings['HeartbeatMinutes'] != 0
- self.doRoster = roster
-
- self.roster = {}
- self.outstanding = {}
-
- def lockNode(self, nodeName):
- if self.outstanding.has_key(nodeName):
- return False
- else:
- self.outstanding[nodeName] = 1
- return True
-
- def unlockNode(self, failure, nodeName):
- try:
- del self.outstanding[nodeName]
- except KeyError:
- pass
-
- def sendHeartbeat(self):
- if self.doHeartbeat and self.xmlStream is not None:
- self.enqueue("update", "", lock=False)
- self.reactor.callLater(self.settings['HeartbeatMinutes'] * 60,
- self.sendHeartbeat)
-
- def enqueue(self, op, id, lock=True):
- if self.xmlStream is not None:
- # Convert id to node
- nodeName = getPubSubPath(id, getPubSubConfiguration(self.config))
- if op == "create":
- if not self.lockNode(nodeName):
- # this node is busy, so it must already be created, or at
- # least in the proccess
- return
- self.createNode(nodeName, publish=False)
- else:
- self.publishNode(nodeName, lock=lock)
-
- def publishNode(self, nodeName, lock=True):
- if self.xmlStream is None:
- # We lost our connection
- self.unlockNode(None, nodeName)
- return
-
- try:
- if lock and not self.lockNode(nodeName):
- return
-
- iq = IQ(self.xmlStream)
- pubsubElement = iq.addElement('pubsub', defaultUri=self.pubsubNS)
- publishElement = pubsubElement.addElement('publish')
- publishElement['node'] = nodeName.decode("utf-8")
- if self.settings["NodeConfiguration"]["pubsub#deliver_payloads"] == '1':
- itemElement = publishElement.addElement('item')
- itemElement.addElement('plistfrag', defaultUri='plist-apple')
-
- self.sendDebug("Publishing (%s)" % (nodeName,), iq)
- d = iq.send(to=self.settings['ServiceAddress'])
- d.addCallback(self.publishNodeSuccess, nodeName)
- d.addErrback(self.publishNodeFailure, nodeName)
- except:
- self.unlockNode(None, nodeName)
- raise
-
- def publishNodeSuccess(self, iq, nodeName):
- self.unlockNode(None, nodeName)
- self.sendDebug("Node publish successful (%s)" % (nodeName,), iq)
-
- def publishNodeFailure(self, result, nodeName):
- try:
- iq = result.value.getElement()
-
- if iq.name == "error":
- if iq['code'] == '400':
- self.requestConfigurationForm(nodeName, True)
-
- elif iq['code'] == '404':
- self.createNode(nodeName)
- else:
- self.log_error("PubSub node publish error: %s" %
- (iq.toXml().encode('ascii', 'replace')),)
- self.sendDebug("Node publish failed (%s)" % (nodeName,), iq)
- # Don't know how to proceed
- self.unlockNode(None, nodeName)
- except AttributeError:
- # We did not get an XML response; most likely it was a disconnection
- self.unlockNode(None, nodeName)
- # Don't re-raise, just unlock and ignore
- except:
- # Note: this block is not a "finally" because in the case of a 404
- # we don't want to unlock yet
- self.unlockNode(None, nodeName)
- raise
-
- def createNode(self, nodeName, publish=True):
- if self.xmlStream is None:
- # We lost our connection
- self.unlockNode(None, nodeName)
- return
-
- try:
- iq = IQ(self.xmlStream)
- pubsubElement = iq.addElement('pubsub', defaultUri=self.pubsubNS)
- child = pubsubElement.addElement('create')
- child['node'] = nodeName.decode("utf-8")
- d = iq.send(to=self.settings['ServiceAddress'])
- d.addCallback(self.createNodeSuccess, nodeName, publish)
- d.addErrback(self.createNodeFailure, nodeName, publish)
- except:
- self.unlockNode(None, nodeName)
- raise
-
- def createNodeSuccess(self, iq, nodeName, publish):
- try:
- self.sendDebug("Node creation successful (%s)" % (nodeName,), iq)
- # now time to configure; fetch the form
- self.requestConfigurationForm(nodeName, publish)
- except:
- self.unlockNode(None, nodeName)
- raise
-
- def createNodeFailure(self, result, nodeName, publish):
- try:
- iq = result.value.getElement()
- if iq['code'] == '409':
- # node already exists, proceed to configure
- self.sendDebug("Node already exists (%s)" % (nodeName,), iq)
- self.requestConfigurationForm(nodeName, publish)
- else:
- # couldn't create node, give up
- self.unlockNode(None, nodeName)
- self.log_error("PubSub node creation error: %s" %
- (iq.toXml().encode('ascii', 'replace')),)
- self.sendError("Node creation failed (%s)" % (nodeName,), iq)
- except AttributeError:
- # We did not get an XML response; most likely it was a disconnection
- self.unlockNode(None, nodeName)
- # Don't re-raise, just unlock and ignore
- except:
- # Note: this block is not a "finally" because in the case of a 409
- # we don't want to unlock yet
- self.unlockNode(None, nodeName)
- raise
-
- def requestConfigurationForm(self, nodeName, publish):
- if self.xmlStream is None:
- # We lost our connection
- self.unlockNode(None, nodeName)
- return
-
- try:
- # XXX This codepath is not unit tested
- iq = IQ(self.xmlStream, 'get')
- child = iq.addElement('pubsub',
- defaultUri=self.pubsubNS+"#owner")
- child = child.addElement('configure')
- child['node'] = nodeName.decode("utf-8")
- d = iq.send(to=self.settings['ServiceAddress'])
- d.addCallback(self.requestConfigurationFormSuccess, nodeName,
- publish)
- d.addErrback(self.requestConfigurationFormFailure, nodeName)
- except:
- self.unlockNode(None, nodeName)
- raise
-
- def _getChild(self, element, name):
- for child in element.elements():
- if child.name == name:
- return child
- return None
-
- def requestConfigurationFormSuccess(self, iq, nodeName, publish):
- if self.xmlStream is None:
- # We lost our connection
- self.unlockNode(None, nodeName)
- return
-
- try:
- nodeConf = self.settings["NodeConfiguration"]
- self.sendDebug("Received configuration form (%s)" % (nodeName,), iq)
- pubsubElement = self._getChild(iq, 'pubsub')
- if pubsubElement:
- configureElement = self._getChild(pubsubElement, 'configure')
- if configureElement:
- formElement = configureElement.firstChildElement()
- if formElement['type'] == 'form':
- # We've found the form; start building a response
- filledIq = IQ(self.xmlStream, 'set')
- filledPubSub = filledIq.addElement('pubsub',
- defaultUri=self.pubsubNS+"#owner")
- filledConfigure = filledPubSub.addElement('configure')
- filledConfigure['node'] = nodeName.decode("utf-8")
- filledForm = filledConfigure.addElement('x',
- defaultUri='jabber:x:data')
- filledForm['type'] = 'submit'
-
- configMatches = True
- for field in formElement.elements():
- if field.name == 'field':
- var = field['var']
- if var == "FORM_TYPE":
- filledForm.addChild(field)
- else:
- value = nodeConf.get(var, None)
- if (value is not None and
- (str(self._getChild(field,
- "value")) != value)):
- # this field needs configuring
- configMatches = False
- filledField = filledForm.addElement('field')
- filledField['var'] = var
- filledField['type'] = field['type']
- valueElement = filledField.addElement('value')
- valueElement.addContent(value)
- # filledForm.addChild(field)
- if configMatches:
- # XXX This codepath is not unit tested
- cancelIq = IQ(self.xmlStream, 'set')
- cancelPubSub = cancelIq.addElement('pubsub',
- defaultUri=self.pubsubNS+"#owner")
- cancelConfig = cancelPubSub.addElement('configure')
- cancelConfig['node'] = nodeName.decode("utf-8")
- cancelX = cancelConfig.addElement('x',
- defaultUri='jabber:x:data')
- cancelX['type'] = 'cancel'
- self.sendDebug("Cancelling configuration (%s)"
- % (nodeName,), cancelIq)
- d = cancelIq.send(to=self.settings['ServiceAddress'])
- else:
- self.sendDebug("Sending configuration form (%s)"
- % (nodeName,), filledIq)
- d = filledIq.send(to=self.settings['ServiceAddress'])
- d.addCallback(self.configurationSuccess, nodeName,
- publish)
- d.addErrback(self.configurationFailure, nodeName)
- return
-
- # Couldn't process configuration form, give up
- self.unlockNode(None, nodeName)
-
- except:
- # Couldn't process configuration form, give up
- self.unlockNode(None, nodeName)
- raise
-
- def requestConfigurationFormFailure(self, result, nodeName):
- # If we get here we're giving up
- try:
- iq = result.value.getElement()
- self.log_error("PubSub configuration form request error: %s" %
- (iq.toXml().encode('ascii', 'replace')),)
- self.sendError("Failed to receive configuration form (%s)" %
- (nodeName,), iq)
- except AttributeError:
- # We did not get an XML response; most likely it was a disconnection
- pass
- finally:
- self.unlockNode(None, nodeName)
-
- def configurationSuccess(self, iq, nodeName, publish):
- if self.xmlStream is None:
- # We lost our connection
- self.unlockNode(None, nodeName)
- return
-
- try:
- self.log_debug("PubSub node %s is configured" % (nodeName,))
- self.sendDebug("Configured node (%s)" % (nodeName,), iq)
- nodeCacher = getNodeCacher()
- nodeCacher.storeNode(nodeName)
- if publish:
- self.publishNode(nodeName, lock=False)
- else:
- self.unlockNode(None, nodeName)
- except:
- self.unlockNode(None, nodeName)
- raise
-
- def configurationFailure(self, result, nodeName):
- # If we get here we're giving up
- try:
- iq = result.value.getElement()
- self.log_error("PubSub node configuration error: %s" %
- (iq.toXml().encode('ascii', 'replace')),)
- self.sendError("Failed to configure node (%s)" % (nodeName,), iq)
- except AttributeError:
- # We did not get an XML response; most likely it was a disconnection
- pass
- finally:
- self.unlockNode(None, nodeName)
-
- def deleteNode(self, nodeName):
- if self.xmlStream is None:
- # We lost our connection
- self.unlockNode(None, nodeName)
- return
-
- try:
- if not self.lockNode(nodeName):
- return
-
- iq = IQ(self.xmlStream)
- pubsubElement = iq.addElement('pubsub',
- defaultUri=self.pubsubNS+"#owner")
- publishElement = pubsubElement.addElement('delete')
- publishElement['node'] = nodeName.decode("utf-8")
- self.sendDebug("Deleting (%s)" % (nodeName,), iq)
- d = iq.send(to=self.settings['ServiceAddress'])
- d.addCallback(self.deleteNodeSuccess, nodeName)
- d.addErrback(self.deleteNodeFailure, nodeName)
- except:
- self.unlockNode(None, nodeName)
- raise
-
- def deleteNodeSuccess(self, iq, nodeName):
- self.unlockNode(None, nodeName)
- self.sendDebug("Node delete successful (%s)" % (nodeName,), iq)
-
- def deleteNodeFailure(self, result, nodeName):
- try:
- iq = result.value.getElement()
- self.log_error("PubSub node delete error: %s" %
- (iq.toXml().encode('ascii', 'replace')),)
- self.sendDebug("Node delete failed (%s)" % (nodeName,), iq)
- except AttributeError:
- # We did not get an XML response; most likely it was a disconnection
- pass
- finally:
- self.unlockNode(None, nodeName)
-
-
- def requestRoster(self):
- if self.doRoster:
- self.roster = {}
- rosterIq = IQ(self.xmlStream, 'get')
- rosterIq.addElement("query", "jabber:iq:roster")
- d = rosterIq.send()
- d.addCallback(self.handleRoster)
-
- def allowedInRoster(self, jid):
- """ Returns True if jid matches any of the patterns in AllowedJIDs,
- or is our own JID. False otherwise. """
-
- # Always allow our own JID (in case multiple servers are sharing it)
- settings = self.settings
- if settings is not None:
- if settings["JID"] == jid:
- return True
-
- for pattern in self.settings.get("AllowedJIDs", []):
- if fnmatch(jid, pattern):
- return True
- return False
-
- def handleRoster(self, iq):
- for child in iq.children[0].children:
- jid = child['jid']
- if self.allowedInRoster(jid):
- self.log_debug("In roster: %s" % (jid,))
- if not self.roster.has_key(jid):
- self.roster[jid] = { 'debug' : False, 'available' : False }
- else:
- self.log_info("JID not allowed in roster: %s" % (jid,))
-
- def handlePresence(self, iq):
- self.log_debug("Presence IQ: %s" %
- (iq.toXml().encode('ascii', 'replace')),)
- presenceType = iq.getAttribute('type')
-
- if presenceType == 'subscribe':
- frm = JID(iq['from']).userhost()
- if self.allowedInRoster(frm):
- self.roster[frm] = { 'debug' : False, 'available' : True }
- response = domish.Element(('jabber:client', 'presence'))
- response['to'] = iq['from']
- response['type'] = 'subscribed'
- self.xmlStream.send(response)
-
- # request subscription as well
- subscribe = domish.Element(('jabber:client', 'presence'))
- subscribe['to'] = iq['from']
- subscribe['type'] = 'subscribe'
- self.xmlStream.send(subscribe)
- else:
- self.log_info("JID not allowed in roster: %s" % (frm,))
- # Reject
- response = domish.Element(('jabber:client', 'presence'))
- response['to'] = iq['from']
- response['type'] = 'unsubscribed'
- self.xmlStream.send(response)
-
- elif presenceType == 'unsubscribe':
- frm = JID(iq['from']).userhost()
- if self.roster.has_key(frm):
- del self.roster[frm]
- response = domish.Element(('jabber:client', 'presence'))
- response['to'] = iq['from']
- response['type'] = 'unsubscribed'
- self.xmlStream.send(response)
-
- # remove from roster as well
- # XXX This codepath is not unit tested
- removal = IQ(self.xmlStream, 'set')
- query = removal.addElement("query", "jabber:iq:roster")
- query.addElement("item")
- query.item["jid"] = iq["from"]
- query.item["subscription"] = "remove"
- removal.send()
-
- elif presenceType == 'unavailable':
- frm = JID(iq['from']).userhost()
- if self.roster.has_key(frm):
- self.roster[frm]['available'] = False
-
- else:
- frm = JID(iq['from']).userhost()
- if self.allowedInRoster(frm):
- if self.roster.has_key(frm):
- self.roster[frm]['available'] = True
- else:
- self.roster[frm] = { 'debug' : False, 'available' : True }
- else:
- self.log_info("JID not allowed in roster: %s" % (frm,))
-
- def streamOpened(self, xmlStream):
- self.xmlStream = xmlStream
- xmlStream.addObserver('/message', self.handleMessage)
- xmlStream.addObserver('/presence', self.handlePresence)
- self.requestRoster()
- self.sendHeartbeat()
-
-
- def streamClosed(self):
- self.xmlStream = None
-
- def sendDebug(self, txt, element):
- txt = "DEBUG: %s %s" % (txt, element.toXml().encode('ascii', 'replace'))
- for jid, info in self.roster.iteritems():
- if info['available'] and info['debug']:
- self.sendAlert(jid, txt)
-
- def sendError(self, txt, element):
- txt = "ERROR: %s %s" % (txt, element.toXml().encode('ascii', 'replace'))
- for jid, info in self.roster.iteritems():
- if info['available']:
- self.sendAlert(jid, txt)
-
- def sendAlert(self, jid, txt):
- if self.xmlStream is not None:
- message = domish.Element(('jabber:client', 'message'))
- message['to'] = JID(jid).full()
- message.addElement('body', content=txt)
- self.xmlStream.send(message)
-
- def handleMessage(self, iq):
- body = getattr(iq, 'body', None)
- if body:
- response = None
- frm = JID(iq['from']).userhost()
- if frm in self.roster:
- txt = str(body).lower()
- if txt == "help":
- response = "debug on, debug off, roster, create <nodename>, publish <nodename>, hammer <count>"
- elif txt == "roster":
- response = "Roster: %s" % (str(self.roster),)
- elif txt == "debug on":
- self.roster[frm]['debug'] = True
- response = "Debugging on"
- elif txt == "debug off":
- self.roster[frm]['debug'] = False
- response = "Debugging off"
- elif txt == "outstanding":
- response = "Outstanding: %s" % (str(self.outstanding),)
- elif txt.startswith("publish"):
- try:
- publish, nodeName = str(body).split()
- except ValueError:
- response = "Please phrase it like 'publish nodename'"
- else:
- response = "Publishing node %s" % (nodeName,)
- self.reactor.callLater(1, self.enqueue, "update",
- nodeName)
- elif txt.startswith("delete"):
- try:
- delete, nodeName = str(body).split()
- except ValueError:
- response = "Please phrase it like 'delete nodename'"
- else:
- response = "Deleting node %s" % (nodeName,)
- self.reactor.callLater(1, self.deleteNode, nodeName)
- elif txt.startswith("create"):
- try:
- publish, nodeName = str(body).split()
- except ValueError:
- response = "Please phrase it like 'create nodename'"
- else:
- response = "Creating and configuring node %s" % (nodeName,)
- self.reactor.callLater(1, self.enqueue, "create",
- nodeName)
- elif txt.startswith("hammer"):
- try:
- hammer, count = txt.split()
- count = int(count)
- except ValueError:
- response = "Please phrase it like 'hammer 100'"
- else:
- response = "Hammer will commence now, %d times" % (count,)
- self.reactor.callLater(1, self.hammer, count)
- else:
- response = "I don't understand. Try 'help'."
- else:
- response = "Sorry, you are not authorized to converse with this server"
-
- if response:
- message = domish.Element(('jabber:client', 'message'))
- message['to'] = JID(iq['from']).full()
- message.addElement('body', content=response)
- self.xmlStream.send(message)
-
-
- def hammer(self, count):
- for i in xrange(count):
- self.enqueue("update", "hammertesting%d" % (i,))
-
-
-class XMPPNotificationFactory(xmlstream.XmlStreamFactory, LoggingMixIn):
-
- def __init__(self, notifier, settings, reactor=None, keepAlive=True):
- self.log_warn("Setting up XMPPNotificationFactory")
-
- self.notifier = notifier
- self.settings = settings
-
- self.jid = settings['JID']
-
- # Ignore JID resource from plist
- slash = self.jid.find("/")
- if slash > -1:
- self.jid = self.jid[0:slash]
-
- # Generate a unique JID resource value
- resource = "icalserver.%s" % uuid.uuid4().hex
- self.jid = "%s/%s" % (self.jid, resource)
-
- self.keepAliveSeconds = settings.get('KeepAliveSeconds', 120)
- self.xmlStream = None
- self.presenceCall = None
- self.doKeepAlive = keepAlive
- if reactor is None:
- from twisted.internet import reactor
- self.reactor = reactor
-
- xmlstream.XmlStreamFactory.__init__(self,
- XMPPAuthenticator(JID(self.jid), settings['Password']))
-
- self.addBootstrap(xmlstream.STREAM_CONNECTED_EVENT, self.connected)
- self.addBootstrap(xmlstream.STREAM_END_EVENT, self.disconnected)
- self.addBootstrap(xmlstream.INIT_FAILED_EVENT, self.initFailed)
-
- self.addBootstrap(xmlstream.STREAM_AUTHD_EVENT, self.authenticated)
- self.addBootstrap(IQAuthInitializer.INVALID_USER_EVENT,
- self.authFailed)
- self.addBootstrap(IQAuthInitializer.AUTH_FAILED_EVENT,
- self.authFailed)
-
- def connected(self, xmlStream):
- self.xmlStream = xmlStream
- self.log_warn("XMPP connection successful")
- # Log all traffic
- xmlStream.rawDataInFn = self.rawDataIn
- xmlStream.rawDataOutFn = self.rawDataOut
-
- def disconnected(self, xmlStream):
- self.notifier.streamClosed()
- self.xmlStream = None
- if self.presenceCall is not None:
- self.presenceCall.cancel()
- self.presenceCall = None
- self.log_warn("XMPP disconnected")
-
- def initFailed(self, failure):
- self.xmlStream = None
- self.log_warn("XMPP Initialization failed: %s" % (failure,))
-
- def authenticated(self, xmlStream):
- self.log_warn("XMPP authentication successful: %s" % (self.jid,))
- # xmlStream.addObserver('/message', self.handleMessage)
- self.sendPresence()
- self.notifier.streamOpened(xmlStream)
-
- def authFailed(self, e):
- self.log_error("Failed to log in XMPP (%s); check JID and password" %
- (self.jid,))
-
- def sendPresence(self):
- if self.doKeepAlive and self.xmlStream is not None:
- presence = domish.Element(('jabber:client', 'presence'))
- self.xmlStream.send(presence)
- self.presenceCall = self.reactor.callLater(self.keepAliveSeconds,
- self.sendPresence)
-
- def rawDataIn(self, buf):
- self.log_debug("RECV: %s" % unicode(buf, 'utf-8').encode('ascii',
- 'replace'))
-
- def rawDataOut(self, buf):
- self.log_debug("SEND: %s" % unicode(buf, 'utf-8').encode('ascii',
- 'replace'))
-
-def getXMPPSettings(config):
- """ Return the XMPP settings if both overall notifications are enabled
- and XMPP is enabled; None otherwise.
- """
- if config.Notifications.Enabled:
- # return the first enabled xmpp service settings in the config file
- for key, settings in config.Notifications.Services.iteritems():
- if (settings["Service"] == "twistedcaldav.notify.XMPPNotifierService"
- and settings["Enabled"]):
- return settings
- return None
-
-def getPubSubConfiguration(config):
- # TODO: Should probably cache this
- results = { 'enabled' : False, 'host' : config.ServerHostName }
- settings = getXMPPSettings(config)
- if settings is not None:
- results['enabled'] = True
- results['service'] = settings['ServiceAddress']
- results['port'] = config.SSLPort or config.HTTPPort
- results['xmpp-server'] = (
- settings['Host'] if settings['Port'] == 5222
- else "%s:%d" % (settings['Host'], settings['Port'])
- )
- results['heartrate'] = settings['HeartbeatMinutes']
-
- return results
-
-def getPubSubAPSConfiguration(id, config):
- """
- Returns the Apple push notification settings specific to the notifier
- ID, which includes a prefix that is either "CalDAV" or "CardDAV"
- """
- try:
- prefix, id = id.split("|", 1)
- except ValueError:
- # id has no prefix, so we can't look up APS config
- return None
-
- # If we are directly talking to apple push, advertise those settings
- applePushSettings = config.Notifications.Services.ApplePushNotifier
- if applePushSettings.Enabled:
- settings = {}
- settings["APSBundleID"] = applePushSettings[prefix]["Topic"]
- if config.EnableSSL:
- url = "https://%s:%s/%s" % (config.ServerHostName, config.SSLPort,
- applePushSettings.SubscriptionURL)
- else:
- url = "http://%s:%s/%s" % (config.ServerHostName, config.HTTPPort,
- applePushSettings.SubscriptionURL)
- settings["SubscriptionURL"] = url
- settings["SubscriptionRefreshIntervalSeconds"] = applePushSettings.SubscriptionRefreshIntervalSeconds
- settings["APSEnvironment"] = applePushSettings.Environment
- return settings
-
- # ...otherwise pick up the apple push settings we get via XMPP and
- # apn bridge
- settings = getXMPPSettings(config)
- if settings is None:
- return None
-
- if (settings.has_key(prefix) and
- settings[prefix]["APSBundleID"] and
- settings[prefix]["SubscriptionURL"]):
- return settings[prefix]
-
- return None
-
-
-
-
-def getPubSubPath(id, pubSubConfiguration):
- """
- Generate a pubsub node path from an id and the pubsub configuration
- @param id: a string identifying the resource that was modified. If
- the id has a "|" in it, what is to the left of the first "|" is
- treated as a prefix and will be used for the root of the path.
- @type id: C{str}
-
- @param pubSubConfiguration: a dictionary containing various relevant
- configuration data
- @type pubSubConfiguration: C{dict}
-
- """
-
- path = "/"
-
- try:
- prefix, id = id.split("|", 1)
- path += "%s/" % (prefix,)
- except ValueError:
- # id has no prefix
- pass
-
- path += "%s/" % (pubSubConfiguration['host'],)
- if id:
- path += "%s/" % (id,)
- return path
-
-def getPubSubXMPPURI(id, pubSubConfiguration):
- return "xmpp:%s?pubsub;node=%s" % (pubSubConfiguration['service'],
- getPubSubPath(id, pubSubConfiguration))
-
-def getPubSubHeartbeatURI(pubSubConfiguration):
- return "xmpp:%s?pubsub;node=%s" % (pubSubConfiguration['service'],
- getPubSubPath("", pubSubConfiguration))
-
-#
-# Notification Server service config
-#
-
-class NotificationOptions(Options):
- optParameters = [[
- "config", "f", DEFAULT_CONFIG_FILE, "Path to configuration file."
- ]]
-
- def __init__(self, *args, **kwargs):
- super(NotificationOptions, self).__init__(*args, **kwargs)
-
- self.overrides = {}
-
- def _coerceOption(self, configDict, key, value):
- """
- Coerce the given C{val} to type of C{configDict[key]}
- """
- if key in configDict:
- if isinstance(configDict[key], bool):
- value = value == "True"
-
- elif isinstance(configDict[key], (int, float, long)):
- value = type(configDict[key])(value)
-
- elif isinstance(configDict[key], (list, tuple)):
- value = value.split(',')
-
- elif isinstance(configDict[key], dict):
- raise UsageError(
- "Dict options not supported on the command line"
- )
-
- elif value == 'None':
- value = None
-
- return value
-
- def _setOverride(self, configDict, path, value, overrideDict):
- """
- Set the value at path in configDict
- """
- key = path[0]
-
- if len(path) == 1:
- overrideDict[key] = self._coerceOption(configDict, key, value)
- return
-
- if key in configDict:
- if not isinstance(configDict[key], dict):
- raise UsageError(
- "Found intermediate path element that is not a dictionary"
- )
-
- if key not in overrideDict:
- overrideDict[key] = {}
-
- self._setOverride(
- configDict[key], path[1:],
- value, overrideDict[key]
- )
-
-
- def opt_option(self, option):
- """
- Set an option to override a value in the config file. True, False, int,
- and float options are supported, as well as comma seperated lists. Only
- one option may be given for each --option flag, however multiple
- --option flags may be specified.
- """
-
- if "=" in option:
- path, value = option.split('=')
- self._setOverride(
- DEFAULT_CONFIG,
- path.split('/'),
- value,
- self.overrides
- )
- else:
- self.opt_option('%s=True' % (option,))
-
- opt_o = opt_option
-
- def postOptions(self):
- config.load(self['config'])
- config.updateDefaults(self.overrides)
- self.parent['pidfile'] = None
-
-
-class NotificationServiceMaker(object):
- implements(IPlugin, service.IServiceMaker)
-
- tapname = "caldav_notifier"
- description = "Notification Server"
- options = NotificationOptions
-
- def makeService(self, options):
- try:
- from setproctitle import setproctitle
- except ImportError:
- pass
- else:
- setproctitle("CalendarServer [Notification Gateway]")
-
- #
- # Configure Memcached Client Pool
- #
- memcachepool.installPools(
- config.Memcached.Pools,
- config.Memcached.MaxClients,
- )
-
- multiService = service.MultiService()
-
- from calendarserver.tap.util import storeFromConfig, getDBPool
- pool, txnFactory = getDBPool(config)
- if pool is not None:
- pool.setServiceParent(multiService)
- store = storeFromConfig(config, txnFactory)
-
- notifiers = []
- for key, settings in config.Notifications.Services.iteritems():
- if settings["Enabled"]:
- notifier = namedClass(settings["Service"]).makeService(settings,
- store, config.ServerHostName)
- notifier.setServiceParent(multiService)
- notifiers.append(notifier)
-
- internet.TCPServer(
- config.Notifications.InternalNotificationPort,
- InternalNotificationFactory(notifiers,
- delaySeconds=config.Notifications.CoalesceSeconds),
- interface=config.Notifications.BindAddress
- ).setServiceParent(multiService)
-
- return multiService
-
-
-class SimpleLineNotifierService(service.Service):
-
- @classmethod
- def makeService(cls, settings, store, serverHostName):
- return cls(settings)
-
- def __init__(self, settings):
- self.notifier = SimpleLineNotifier(settings)
- self.server = internet.TCPServer(settings["Port"],
- SimpleLineNotificationFactory(self.notifier))
-
- def enqueue(self, op, id):
- self.notifier.enqueue(op, id)
-
- def startService(self):
- self.server.startService()
-
- def stopService(self):
- self.server.stopService()
-
-
-class XMPPNotifierService(service.Service):
-
- @classmethod
- def makeService(cls, settings, store, serverHostName):
- return cls(settings)
-
- def __init__(self, settings):
- self.notifier = XMPPNotifier(settings)
-
- if settings["Port"] == 5223: # use old SSL method
- self.client = internet.SSLClient(settings["Host"], settings["Port"],
- XMPPNotificationFactory(self.notifier, settings),
- ClientContextFactory())
- else:
- # TLS and SASL
- self.client = internet.TCPClient(settings["Host"], settings["Port"],
- XMPPNotificationFactory(self.notifier, settings))
-
- def enqueue(self, op, id):
- self.notifier.enqueue(op, id)
-
- def startService(self):
- self.client.startService()
-
- def stopService(self):
- self.client.stopService()
Modified: CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/sharing.py
===================================================================
--- CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/sharing.py 2013-02-19 22:15:27 UTC (rev 10762)
+++ CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/sharing.py 2013-02-19 23:19:21 UTC (rev 10763)
@@ -575,7 +575,7 @@
mode=invitationAccessToBindModeMap[access],
status=_BIND_STATUS_INVITED,
message=summary)
- shareeStoreObject = yield self._newStoreHome.objectWithShareUID(shareUID)
+ shareeStoreObject = yield shareeHome.objectWithShareUID(shareUID)
invitation = Invitation(shareeStoreObject)
returnValue(invitation)
@@ -584,11 +584,7 @@
def _updateInvitation(self, invitation, access=None, state=None, summary=None):
mode = None if access is None else invitationAccessToBindModeMap[access]
status = None if state is None else invitationStateToBindStatusMap[state]
-
yield self._newStoreObject.updateShare(invitation._shareeStoreObject, mode=mode, status=status, message=summary)
- assert not access or access == invitation.access(), "access=%s != invitation.access()=%s" % (access, invitation.access())
- assert not state or state == invitation.state(), "state=%s != invitation.state()=%s" % (state, invitation.state())
- assert not summary or summary == invitation.summary(), "summary=%s != invitation.summary()=%s" % (summary, invitation.summary())
@inlineCallbacks
@@ -1066,13 +1062,35 @@
@inlineCallbacks
def provisionShare(self, child, request=None):
+ """
+ If the given child resource (a L{SharedCollectionMixin}) of this
+ L{SharedHomeMixin} is a I{sharee}'s view of a shared calendar object,
+ associate it with a L{Share}.
+ """
share = yield self._shareForStoreObject(child._newStoreObject, request)
if share:
child.setShare(share)
+ access = yield child._checkAccessControl()
+ if access is None:
+ returnValue(None)
+ returnValue(child)
@inlineCallbacks
def _shareForStoreObject(self, storeObject, request=None):
+ """
+ Determine the L{Share} associated with the given child.
+
+ @param child: A calendar or addressbook data store object, a child of
+ the resource represented by this L{SharedHomeMixin} instance, which
+ may be shared.
+ @type child: L{txdav.caldav.icalendarstore.ICalendar} or
+ L{txdav.carddav.iaddressbookstore.IAddressBook}
+
+ @return: a L{Share} if C{child} is not the owner's view of the share,
+ or C{None}.
+ @rtype: L{Share} or L{NoneType}
+ """
# Find a matching share
if not storeObject or storeObject.owned():
returnValue(None)
@@ -1135,21 +1153,26 @@
oldShare = yield self._shareForUID(inviteUID, request)
# Send the invite reply then add the link
- yield self._changeShare(request, "ACCEPTED", hostUrl, inviteUID, displayname)
+ yield self._changeShare(request, "ACCEPTED", hostUrl, inviteUID,
+ displayname)
if oldShare:
share = oldShare
else:
sharedResource = yield request.locateResource(hostUrl)
shareeStoreObject = yield self._newStoreHome.objectWithShareUID(inviteUID)
- share = Share(shareeStoreObject=shareeStoreObject, ownerStoreObject=sharedResource._newStoreObject, url=hostUrl)
+ share = Share(shareeStoreObject=shareeStoreObject,
+ ownerStoreObject=sharedResource._newStoreObject,
+ url=hostUrl)
- response = yield self._acceptShare(request, not oldShare, share, displayname)
+ response = yield self._acceptShare(request, not oldShare, share,
+ displayname)
returnValue(response)
@inlineCallbacks
- def acceptDirectShare(self, request, hostUrl, resourceUID, displayname=None):
+ def acceptDirectShare(self, request, hostUrl, resourceUID,
+ displayname=None):
# Just add the link
oldShare = yield self._shareForUID(resourceUID, request)
@@ -1157,20 +1180,46 @@
share = oldShare
else:
sharedCollection = yield request.locateResource(hostUrl)
- shareeStoreObject = yield sharedCollection._newStoreObject.shareWith(shareeHome=self._newStoreHome,
- mode=_BIND_MODE_DIRECT,
- status=_BIND_STATUS_ACCEPTED,
- message=displayname)
+ shareUID = yield sharedCollection._newStoreObject.shareWith(
+ shareeHome=self._newStoreHome,
+ mode=_BIND_MODE_DIRECT,
+ status=_BIND_STATUS_ACCEPTED,
+ message=displayname
+ )
- share = Share(shareeStoreObject=shareeStoreObject, ownerStoreObject=sharedCollection._newStoreObject, url=hostUrl)
+ shareeStoreObject = yield self._newStoreHome.objectWithShareUID(shareUID)
+ share = Share(shareeStoreObject=shareeStoreObject,
+ ownerStoreObject=sharedCollection._newStoreObject,
+ url=hostUrl)
- response = yield self._acceptShare(request, not oldShare, share, displayname)
+ response = yield self._acceptShare(request, not oldShare, share,
+ displayname)
returnValue(response)
@inlineCallbacks
def _acceptShare(self, request, isNewShare, share, displayname=None):
+ """
+ Mark a pending shared invitation I{to} this, the owner's collection, as
+ accepted, generating the HTTP response to the request that accepted it.
+ @param request: The HTTP request that is accepting it.
+ @type request: L{twext.web2.iweb.IRequest}
+
+ @param isNewShare: a boolean indicating whether this share is new.
+ @type isNewShare: L{bool}
+
+ @param share: The share referencing the proposed sharer and sharee.
+ @type share: L{Share}
+
+ @param displayname: the UTF-8 encoded contents of the display-name
+ property on the resource to be created while accepting.
+ @type displayname: L{bytes}
+
+ @return: a L{twext.web2.iweb.IResponse} containing a serialized
+ L{customxml.SharedAs} element as its body.
+ @rtype: L{Deferred} firing L{XMLResponse}
+ """
# Get shared collection in non-share mode first
sharedResource = yield request.locateResource(share.url())
sharee = self.principalForUID(share.shareeUID())
@@ -1402,8 +1451,26 @@
class Share(object):
+ """
+ A L{Share} represents information about a collection which has been shared
+ from one user to another.
+ """
def __init__(self, ownerStoreObject, shareeStoreObject, url):
+ """
+ @param sharerHomeChild: The data store object representing the shared
+ collection as present in the owner's home collection; the owner's
+ reference.
+ @type sharerHomeChild: L{txdav.caldav.icalendarstore.ICalendar}
+
+ @param shareeHomeChild: The data store object representing the
+ collection as present in the sharee's home collection; the sharee's
+ reference.
+ @type shareeHomeChild: L{txdav.caldav.icalendarstore.ICalendar}
+
+ @param url: The URL referring to the sharer's version of the resource.
+ @type url: L{bytes}
+ """
self._shareeStoreObject = shareeStoreObject
self._ownerStoreObject = ownerStoreObject
self._ownerResourceURL = url
@@ -1411,22 +1478,32 @@
@classmethod
def directUID(cls, shareeHome, ownerHomeChild):
- return "Direct-%s-%s" % (shareeHome._resourceID, ownerHomeChild._resourceID,)
+ return "Direct-%s-%s" % (shareeHome._resourceID,
+ ownerHomeChild._resourceID,)
def uid(self):
# Move to CommonHomeChild shareUID?
if self._shareeStoreObject.shareMode() == _BIND_MODE_DIRECT:
- return self.directUID(shareeHome=self._shareeStoreObject.viewerHome(), ownerHomeChild=self._ownerStoreObject,)
+ return self.directUID(shareeHome=self._shareeStoreObject.viewerHome(),
+ ownerHomeChild=self._ownerStoreObject,)
else:
return self._shareeStoreObject.shareUID()
def direct(self):
+ """
+ Is this L{Share} a "direct" share?
+
+ @return: a boolean indicating whether it's direct.
+ """
return self._shareeStoreObject.shareMode() == _BIND_MODE_DIRECT
def url(self):
+ """
+ @return: The URL to the owner's version of the shared collection.
+ """
return self._ownerResourceURL
Deleted: CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/test/test_notify.py
===================================================================
--- CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/test/test_notify.py 2013-02-19 22:15:27 UTC (rev 10762)
+++ CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/test/test_notify.py 2013-02-19 23:19:21 UTC (rev 10763)
@@ -1,621 +0,0 @@
-##
-# Copyright (c) 2008-2013 Apple Inc. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-##
-
-from twisted.internet.task import Clock
-from twisted.words.protocols.jabber.client import IQ
-from twisted.words.protocols.jabber.error import StanzaError
-from twistedcaldav.notify import *
-from twistedcaldav.config import Config
-from twistedcaldav.stdconfig import DEFAULT_CONFIG, PListConfigProvider
-from twistedcaldav.test.util import TestCase
-
-
-class StubResource(object):
-
- def __init__(self, id):
- self._id = id
-
- def resourceID(self):
- return self._id
-
-
-
-class NotifierTests(TestCase):
-
- def test_notifier(self):
- enabledConfig = Config(PListConfigProvider(DEFAULT_CONFIG))
- enabledConfig.Notifications["Enabled"] = True
- notifier = Notifier(None, id="test")
-
- self.assertEquals(notifier._ids, {"default": "test"})
- clone = notifier.clone(label="alt", id="altID")
- self.assertEquals("altID", clone.getID(label="alt"))
- self.assertEquals(clone._ids, {
- "default" : "test",
- "alt" : "altID",
- })
- self.assertEquals("test", notifier.getID())
- self.assertEquals(notifier._ids, {
- "default" : "test",
- })
- self.assertEquals(None, notifier.getID(label="notthere"))
-
- notifier = Notifier(None, id="urn:uuid:foo")
- self.assertEquals("foo", notifier.getID())
-
- notifier.disableNotify()
- self.assertEquals(notifier._notify, False)
- notifier.enableNotify(None)
- self.assertEquals(notifier._notify, True)
-
- notifier = Notifier(None, id="test", prefix="CalDAV")
- self.assertEquals("CalDAV|test", notifier.getID())
-
-
-
-class NotificationClientFactoryTests(TestCase):
-
- def setUp(self):
- TestCase.setUp(self)
- self.client = StubNotificationClient(None, None)
- self.factory = NotificationClientFactory(self.client)
- self.factory.protocol = StubNotificationClientProtocol
-
- def test_connect(self):
- self.assertEquals(self.factory.isReady(), False)
- protocol = self.factory.buildProtocol(None)
- protocol.connectionMade()
- self.assertEquals(self.client.observers, set([protocol]))
- self.assertEquals(self.factory.isReady(), True)
-
- protocol.connectionLost(None)
- self.assertEquals(self.client.observers, set())
- self.assertEquals(self.factory.isReady(), False)
-
-
-class StubNotificationClient(object):
-
- def __init__(self, host, port, reactor=None):
- self.lines = []
- self.observers = set()
-
- def send(self, op, id):
- self.lines.append(id)
-
- def addObserver(self, observer):
- self.observers.add(observer)
-
- def removeObserver(self, observer):
- self.observers.remove(observer)
-
- def connectionMade(self):
- pass
-
- def clear(self):
- self.lines = []
-
-class StubNotificationClientProtocol(object):
-
- def __init__(self):
- self.lines = []
-
- def sendLine(self, line):
- self.lines.append(line)
-
- def connectionMade(self):
- self.client.addObserver(self)
- self.factory.connectionMade()
-
- def connectionLost(self, reason):
- self.client.removeObserver(self)
- self.factory.connected = False
-
-
-class NotifierFactoryTests(TestCase):
-
- def setUp(self):
- TestCase.setUp(self)
- self.client = NotifierFactory(None, None, reactor=Clock())
- self.client.factory = StubNotificationClientFactory()
-
- def test_sendWhileNotConnected(self):
- self.client.send("update", "a")
- self.assertEquals(self.client.queued, set(["update a"]))
-
- def test_sendWhileConnected(self):
- protocol = StubNotificationClientProtocol()
- self.client.addObserver(protocol)
- self.client.factory.connected = True
- self.client.send("update", "a")
- self.assertEquals(self.client.queued, set())
- self.assertEquals(protocol.lines, ["update a"])
-
- def test_sendQueue(self):
- self.client.send("update", "a")
- self.assertEquals(self.client.queued, set(["update a"]))
- protocol = StubNotificationClientProtocol()
- self.client.addObserver(protocol)
- self.client.factory.connected = True
- self.client.connectionMade()
- self.assertEquals(protocol.lines, ["update a"])
- self.assertEquals(self.client.queued, set())
-
-
-class StubNotificationClientFactory(object):
-
- def __init__(self):
- self.connected = False
-
- def isReady(self):
- return self.connected
-
-
-class CoalescerTests(TestCase):
-
- def setUp(self):
- TestCase.setUp(self)
- self.clock = Clock()
- self.notifier = StubNotifier()
- self.coalescer = Coalescer([self.notifier], reactor=self.clock)
-
- def test_delayedNotifications(self):
- self.coalescer.add("update", "A")
- self.assertEquals(self.notifier.notifications, [])
- self.clock.advance(5)
- self.assertEquals(self.notifier.notifications, ["A"])
-
- def test_removeDuplicates(self):
- self.coalescer.add("update", "A")
- self.coalescer.add("update", "A")
- self.clock.advance(5)
- self.assertEquals(self.notifier.notifications, ["A"])
-
-
-class StubNotifier(object):
-
- def __init__(self):
- self.notifications = []
- self.observers = set()
- self.playbackHistory = []
-
- def enqueue(self, op, id):
- self.notifications.append(id)
-
- def playback(self, protocol, old_seq):
- self.playbackHistory.append((protocol, old_seq))
-
- def addObserver(self, observer):
- self.observers.add(observer)
-
- def removeObserver(self, observer):
- self.observers.remove(observer)
-
-
-class SimpleLineNotifierTests(TestCase):
-
- def setUp(self):
- TestCase.setUp(self)
- self.clock = Clock()
- self.notifier = SimpleLineNotifier(None)
- self.coalescer = Coalescer([self.notifier], reactor=self.clock)
-
- def test_initialConnection(self):
- protocol = StubProtocol()
- self.notifier.addObserver(protocol)
- self.notifier.connectionMade(protocol)
- self.assertEquals(protocol.lines, ["0"])
-
- def test_subsequentConnection(self):
- protocol = StubProtocol()
- self.notifier.addObserver(protocol)
- self.notifier.connectionMade(protocol)
- protocol.lines = []
- self.notifier.connectionMade(protocol)
- self.assertEquals(protocol.lines, [])
-
- def test_send(self):
- protocol = StubProtocol()
- self.notifier.addObserver(protocol)
- self.notifier.enqueue("update", "A")
- self.assertEquals(protocol.lines, ["1 A"])
-
- def test_incrementSequence(self):
- protocol = StubProtocol()
- self.notifier.addObserver(protocol)
- self.notifier.enqueue("update", "A")
- self.notifier.enqueue("update", "B")
- self.assertEquals(protocol.lines, ["1 A", "2 B"])
-
- def test_addObserver(self):
- protocol = StubProtocol()
- self.notifier.addObserver(protocol)
- self.notifier.enqueue("update", "A")
- self.assertEquals(protocol.lines, ["1 A"])
-
- def test_removeObserver(self):
- protocol = StubProtocol()
- self.notifier.addObserver(protocol)
- self.notifier.removeObserver(protocol)
- self.notifier.enqueue("update", "A")
- self.assertEquals(protocol.lines, [])
-
- def test_multipleObservers(self):
- protocol1 = StubProtocol()
- protocol2 = StubProtocol()
- self.notifier.addObserver(protocol1)
- self.notifier.addObserver(protocol2)
- self.notifier.enqueue("update", "A")
- self.assertEquals(protocol1.lines, ["1 A"])
- self.assertEquals(protocol2.lines, ["1 A"])
-
- def test_duplicateObservers(self):
- protocol = StubProtocol()
- self.notifier.addObserver(protocol)
- self.notifier.addObserver(protocol)
- self.notifier.enqueue("update", "A")
- self.assertEquals(protocol.lines, ["1 A"])
-
- def test_playback(self):
- self.notifier.enqueue("update", "A")
- self.notifier.enqueue("update", "B")
- self.notifier.enqueue("update", "C")
- protocol = StubProtocol()
- self.notifier.addObserver(protocol)
- self.notifier.playback(protocol, 1)
- self.assertEquals(protocol.lines, ["2 B", "3 C"])
-
- def test_reset(self):
- self.notifier.enqueue("update", "A")
- self.assertEquals(self.notifier.history, {"A" : 1})
- self.assertEquals(self.notifier.latestSeq, 1)
- self.notifier.reset()
- self.assertEquals(self.notifier.history, {})
- self.assertEquals(self.notifier.latestSeq, 0)
-
-
-class SimpleLineNotificationFactoryTests(TestCase):
-
- def test_buildProtocol(self):
- notifier = StubNotifier()
- factory = SimpleLineNotificationFactory(notifier)
- protocol = factory.buildProtocol(None)
- self.assertEquals(protocol.notifier, notifier)
- self.assertIn(protocol, notifier.observers)
-
-
-class SimpleLineNotificationProtocolTests(TestCase):
-
- def setUp(self):
- TestCase.setUp(self)
- self.notifier = StubNotifier()
- self.protocol = SimpleLineNotificationProtocol()
- self.protocol.notifier = self.notifier
- self.protocol.transport = StubTransport()
- self.notifier.addObserver(self.protocol)
-
- def test_connectionLost(self):
- self.protocol.connectionLost(None)
- self.assertNotIn(self.protocol, self.notifier.observers)
-
- def test_lineReceived(self):
- self.protocol.lineReceived("2")
- self.assertEquals(self.notifier.playbackHistory, [(self.protocol, 2)])
-
- def test_lineReceivedInvalid(self):
- self.protocol.lineReceived("bogus")
- self.assertEquals(self.notifier.playbackHistory, [])
-
-
-
-class StubProtocol(object):
-
- def __init__(self):
- self.lines = []
-
- def sendLine(self, line):
- self.lines.append(line)
-
-
-class StubTransport(object):
-
- def getPeer(self):
- return "peer"
-
-
-
-
-
-
-class StubXmlStream(object):
-
- def __init__(self):
- self.elements = []
-
- def send(self, element):
- self.elements.append(element)
-
- def addOnetimeObserver(self, *args, **kwds):
- pass
-
- def addObserver(self, *args, **kwds):
- pass
-
-
-class StubFailure(object):
-
- def __init__(self, value):
- self.value = value
-
-class XMPPNotifierTests(TestCase):
-
- xmppEnabledConfig = Config(PListConfigProvider(DEFAULT_CONFIG))
- xmppEnabledConfig.Notifications["Enabled"] = True
- xmppEnabledConfig.Notifications["Services"]["XMPPNotifier"]["Enabled"] = True
- xmppEnabledConfig.ServerHostName = "server.example.com"
- xmppEnabledConfig.HTTPPort = 80
-
- xmppDisabledConfig = Config(PListConfigProvider(DEFAULT_CONFIG))
- xmppDisabledConfig.Notifications["Services"]["XMPPNotifier"]["Enabled"] = False
-
- def setUp(self):
- TestCase.setUp(self)
- self.xmlStream = StubXmlStream()
- self.settings = { "ServiceAddress" : "pubsub.example.com",
- "NodeConfiguration" : { "pubsub#deliver_payloads" : "1" },
- "HeartbeatMinutes" : 30,
- }
- self.notifier = XMPPNotifier(self.settings, reactor=Clock(),
- configOverride=self.xmppEnabledConfig, heartbeat=False)
- self.notifier.streamOpened(self.xmlStream)
-
- def test_sendWhileConnected(self):
- self.notifier.enqueue("update", "test")
-
- iq = self.xmlStream.elements[1]
- self.assertEquals(iq.name, "iq")
-
- pubsubElement = list(iq.elements())[0]
- self.assertEquals(pubsubElement.name, "pubsub")
- self.assertEquals(pubsubElement.uri, "http://jabber.org/protocol/pubsub")
-
- publishElement = list(pubsubElement.elements())[0]
- self.assertEquals(publishElement.name, "publish")
- self.assertEquals(publishElement.uri, "http://jabber.org/protocol/pubsub")
- self.assertEquals(publishElement["node"],
- "/server.example.com/test/")
-
- def test_sendWhileNotConnected(self):
- notifier = XMPPNotifier(self.settings, reactor=Clock(),
- configOverride=self.xmppDisabledConfig)
- notifier.enqueue("update", "/principals/__uids__/test")
- self.assertEquals(len(self.xmlStream.elements), 1)
-
- def test_publishNewNode(self):
- self.notifier.publishNode("testNodeName")
- iq = self.xmlStream.elements[1]
- self.assertEquals(iq.name, "iq")
-
- def test_publishReponse400(self):
- failure = StubFailure(StanzaError("bad-request"))
- self.assertEquals(len(self.xmlStream.elements), 1)
- self.notifier.publishNodeFailure(failure, "testNodeName")
- self.assertEquals(len(self.xmlStream.elements), 2)
- iq = self.xmlStream.elements[1]
- self.assertEquals(iq.name, "iq")
- self.assertEquals(iq["type"], "get")
-
- pubsubElement = list(iq.elements())[0]
- self.assertEquals(pubsubElement.name, "pubsub")
- self.assertEquals(pubsubElement.uri,
- "http://jabber.org/protocol/pubsub#owner")
- configElement = list(pubsubElement.elements())[0]
- self.assertEquals(configElement.name, "configure")
- self.assertEquals(configElement["node"], "testNodeName")
-
-
- def test_publishReponse404(self):
- self.assertEquals(len(self.xmlStream.elements), 1)
- failure = StubFailure(StanzaError("item-not-found"))
- self.notifier.publishNodeFailure(failure, "testNodeName")
- self.assertEquals(len(self.xmlStream.elements), 2)
- iq = self.xmlStream.elements[1]
- self.assertEquals(iq.name, "iq")
- self.assertEquals(iq["type"], "set")
-
- pubsubElement = list(iq.elements())[0]
- self.assertEquals(pubsubElement.name, "pubsub")
- self.assertEquals(pubsubElement.uri,
- "http://jabber.org/protocol/pubsub")
- createElement = list(pubsubElement.elements())[0]
- self.assertEquals(createElement.name, "create")
- self.assertEquals(createElement["node"], "testNodeName")
-
-
- def test_configureResponse(self):
-
- def _getChild(element, name):
- for child in element.elements():
- if child.name == name:
- return child
- return None
-
- response = IQ(self.xmlStream, type="result")
- pubsubElement = response.addElement("pubsub")
- configElement = pubsubElement.addElement("configure")
- formElement = configElement.addElement("x")
- formElement["type"] = "form"
- fields = [
- ( "unknown", "don't edit me", "text-single" ),
- ( "pubsub#deliver_payloads", "0", "boolean" ),
- ( "pubsub#persist_items", "0", "boolean" ),
- ]
- expectedFields = {
- "unknown" : "don't edit me",
- "pubsub#deliver_payloads" : "1",
- "pubsub#persist_items" : "1",
- }
- for field in fields:
- fieldElement = formElement.addElement("field")
- fieldElement["var"] = field[0]
- fieldElement["type"] = field[2]
- fieldElement.addElement("value", content=field[1])
-
- self.assertEquals(len(self.xmlStream.elements), 1)
- self.notifier.requestConfigurationFormSuccess(response, "testNodeName",
- False)
- self.assertEquals(len(self.xmlStream.elements), 2)
-
- iq = self.xmlStream.elements[1]
- self.assertEquals(iq.name, "iq")
- self.assertEquals(iq["type"], "set")
-
- pubsubElement = list(iq.elements())[0]
- self.assertEquals(pubsubElement.name, "pubsub")
- configElement = list(pubsubElement.elements())[0]
- self.assertEquals(configElement.name, "configure")
- self.assertEquals(configElement["node"], "testNodeName")
- formElement = list(configElement.elements())[0]
- self.assertEquals(formElement["type"], "submit")
- for field in formElement.elements():
- valueElement = _getChild(field, "value")
- if valueElement is not None:
- self.assertEquals(expectedFields[field["var"]],
- str(valueElement))
-
-
- def test_sendHeartbeat(self):
-
- xmppConfig = Config(PListConfigProvider(DEFAULT_CONFIG))
- xmppConfig.Notifications["Enabled"] = True
- xmppConfig.Notifications["Services"]["XMPPNotifier"]["Enabled"] = True
- xmppConfig.ServerHostName = "server.example.com"
- xmppConfig.HTTPPort = 80
-
- clock = Clock()
- xmlStream = StubXmlStream()
- settings = { "ServiceAddress" : "pubsub.example.com", "JID" : "jid",
- "Password" : "password", "KeepAliveSeconds" : 5,
- "NodeConfiguration" : { "pubsub#deliver_payloads" : "1" },
- "HeartbeatMinutes" : 30 }
- notifier = XMPPNotifier(settings, reactor=clock, heartbeat=True,
- roster=False, configOverride=xmppConfig)
- factory = XMPPNotificationFactory(notifier, settings, reactor=clock,
- keepAlive=False)
- factory.connected(xmlStream)
- factory.authenticated(xmlStream)
-
- self.assertEquals(len(xmlStream.elements), 1)
- heartbeat = xmlStream.elements[0]
- self.assertEquals(heartbeat.name, "iq")
-
- clock.advance(1800)
-
- self.assertEquals(len(xmlStream.elements), 2)
- heartbeat = xmlStream.elements[1]
- self.assertEquals(heartbeat.name, "iq")
-
- factory.disconnected(xmlStream)
- clock.advance(1800)
- self.assertEquals(len(xmlStream.elements), 2)
-
-
-
-
-class XMPPNotificationFactoryTests(TestCase):
-
- def test_sendPresence(self):
- clock = Clock()
- xmlStream = StubXmlStream()
- settings = { "ServiceAddress" : "pubsub.example.com", "JID" : "jid",
- "NodeConfiguration" : { "pubsub#deliver_payloads" : "1" },
- "Password" : "password", "KeepAliveSeconds" : 5 }
- notifier = XMPPNotifier(settings, reactor=clock, heartbeat=False)
- factory = XMPPNotificationFactory(notifier, settings, reactor=clock)
- factory.connected(xmlStream)
- factory.authenticated(xmlStream)
-
- self.assertEquals(len(xmlStream.elements), 2)
- presence = xmlStream.elements[0]
- self.assertEquals(presence.name, "presence")
- iq = xmlStream.elements[1]
- self.assertEquals(iq.name, "iq")
-
- clock.advance(5)
-
- self.assertEquals(len(xmlStream.elements), 3)
- presence = xmlStream.elements[2]
- self.assertEquals(presence.name, "presence")
-
- factory.disconnected(xmlStream)
- clock.advance(5)
- self.assertEquals(len(xmlStream.elements), 3)
-
-
-
-class ConfigurationTests(TestCase):
-
- def test_disabled(self):
- disabledConfig = Config(PListConfigProvider(DEFAULT_CONFIG))
-
- # Overall notifications are disabled
- disabledConfig.Notifications["Enabled"] = False
- conf = getPubSubConfiguration(disabledConfig)
- self.assertEquals(conf, { "enabled" : False, "host" : "" })
- conf = getXMPPSettings(disabledConfig)
- self.assertEquals(conf, None)
-
- # Overall notifications are enabled, but XMPP disabled
- disabledConfig.Notifications["Enabled"] = True
- settings = getXMPPSettings(disabledConfig)
- self.assertEquals(settings, None)
-
- # Overall notifications are enabled, XMPP enabled, but no APS
- service = disabledConfig.Notifications["Services"]["XMPPNotifier"]
- service.Enabled = True
- conf = getPubSubAPSConfiguration("CalDAV|foo", disabledConfig)
- self.assertEquals(conf, None)
-
- def test_enabled(self):
- enabledConfig = Config(PListConfigProvider(DEFAULT_CONFIG))
- enabledConfig.Notifications["Enabled"] = True
- service = enabledConfig.Notifications["Services"]["XMPPNotifier"]
- service.Enabled = True
- service.Host = "example.com"
- service.Port = 5222
- service.ServiceAddress = "pubsub.example.com"
- service.CalDAV.APSBundleID = "CalDAVAPSBundleID"
- service.CalDAV.SubscriptionURL = "CalDAVSubscriptionURL"
- conf = getPubSubConfiguration(enabledConfig)
- self.assertEquals(conf, {'heartrate': 30, 'service': 'pubsub.example.com', 'xmpp-server': 'example.com', 'enabled': True, 'host': '', 'port': 0} )
- conf = getPubSubAPSConfiguration("CalDAV|foo", enabledConfig)
- self.assertEquals(conf, {'SubscriptionURL': 'CalDAVSubscriptionURL', 'APSBundleID': 'CalDAVAPSBundleID', 'APSEnvironment' : 'PRODUCTION'} )
- conf = getPubSubAPSConfiguration("noprefix", enabledConfig)
- self.assertEquals(conf, None)
- conf = getPubSubAPSConfiguration("UnknownPrefix|foo", enabledConfig)
- self.assertEquals(conf, None)
-
- def test_allowedInRoster(self):
- """
- Our own JID is implicitly included in AllowedJIDs
- """
- settings = {
- "JID" : "test1 at example.com",
- "AllowedJIDs" : ["test2 at example.com"]
- }
- notifier = XMPPNotifier(settings, heartbeat=False)
- self.assertTrue(notifier.allowedInRoster("test1 at example.com"))
- self.assertTrue(notifier.allowedInRoster("test2 at example.com"))
- self.assertFalse(notifier.allowedInRoster("test3 at example.com"))
Modified: CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/test/test_sharing.py
===================================================================
--- CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/test/test_sharing.py 2013-02-19 22:15:27 UTC (rev 10762)
+++ CalendarServer/branches/users/gaya/sharedgroups/twistedcaldav/test/test_sharing.py 2013-02-19 23:19:21 UTC (rev 10763)
@@ -734,7 +734,7 @@
@inlineCallbacks
def test_noWikiAccess(self):
"""
- If L{SharedCollectionMixin.shareeAccessControlList} detects missing
+ If L{SharedResourceMixin.shareeAccessControlList} detects missing
access controls for a directly shared collection, it will automatically
un-share that collection.
"""
Modified: CalendarServer/branches/users/gaya/sharedgroups/txdav/common/datastore/upgrade/test/test_migrate.py
===================================================================
--- CalendarServer/branches/users/gaya/sharedgroups/txdav/common/datastore/upgrade/test/test_migrate.py 2013-02-19 22:15:27 UTC (rev 10762)
+++ CalendarServer/branches/users/gaya/sharedgroups/txdav/common/datastore/upgrade/test/test_migrate.py 2013-02-19 23:19:21 UTC (rev 10763)
@@ -46,6 +46,10 @@
StoreSpawnerService, swapAMP
import copy
+def _todo(f, why):
+ f.todo = why
+ return f
+rewriteOrRemove = lambda f: _todo(f, "Rewrite or remove")
@@ -378,6 +382,7 @@
@inlineCallbacks
+ @rewriteOrRemove
def test_upgradeAddressBookHomes(self):
"""
L{UpgradeToDatabaseService.startService} will do the upgrade, then
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130219/43521570/attachment-0001.html>
More information about the calendarserver-changes
mailing list