[CalendarServer-changes] [2644] CalendarServer/branches/users/sagen/notifications
source_changes at macosforge.org
source_changes at macosforge.org
Mon Jun 30 11:12:25 PDT 2008
Revision: 2644
http://trac.macosforge.org/projects/calendarserver/changeset/2644
Author: sagen at apple.com
Date: 2008-06-30 11:12:25 -0700 (Mon, 30 Jun 2008)
Log Message:
-----------
Initial check-in of calendar notification server
Starts up a notification server process which listens for messages from
the calendar server processes, coalesces them (to reduce chattiness),
and forwards them on to external consumers.
Modified Paths:
--------------
CalendarServer/branches/users/sagen/notifications/conf/caldavd-test.plist
CalendarServer/branches/users/sagen/notifications/twisted/plugins/caldav.py
CalendarServer/branches/users/sagen/notifications/twistedcaldav/cache.py
CalendarServer/branches/users/sagen/notifications/twistedcaldav/cluster.py
CalendarServer/branches/users/sagen/notifications/twistedcaldav/config.py
CalendarServer/branches/users/sagen/notifications/twistedcaldav/tap.py
Added Paths:
-----------
CalendarServer/branches/users/sagen/notifications/twistedcaldav/notify.py
CalendarServer/branches/users/sagen/notifications/twistedcaldav/test/test_notify.py
Modified: CalendarServer/branches/users/sagen/notifications/conf/caldavd-test.plist
===================================================================
--- CalendarServer/branches/users/sagen/notifications/conf/caldavd-test.plist 2008-06-30 17:23:33 UTC (rev 2643)
+++ CalendarServer/branches/users/sagen/notifications/conf/caldavd-test.plist 2008-06-30 18:12:25 UTC (rev 2644)
@@ -405,6 +405,23 @@
<!--
+ Notifications
+ -->
+ <key>EnableNotifications</key>
+ <true/>
+ <key>CoalesceSeconds</key>
+ <integer>10</integer>
+ <key>InternalNotificationHost</key>
+ <string>localhost</string>
+ <key>InternalNotificationPort</key>
+ <integer>62309</integer>
+ <key>ExternalNotificationService</key>
+ <string>twistedcaldav.notify.SimpleLineNotifierService</string>
+ <key>SimpleLineNotificationPort</key>
+ <integer>62308</integer>
+
+
+ <!--
Miscellaneous items
-->
Modified: CalendarServer/branches/users/sagen/notifications/twisted/plugins/caldav.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twisted/plugins/caldav.py 2008-06-30 17:23:33 UTC (rev 2643)
+++ CalendarServer/branches/users/sagen/notifications/twisted/plugins/caldav.py 2008-06-30 18:12:25 UTC (rev 2644)
@@ -30,3 +30,5 @@
TwistedCalDAV = TAP('twistedcaldav.tap.CalDAVServiceMaker')
+
+CalDAVNotifier = TAP('twistedcaldav.notify.NotificationServiceMaker')
Modified: CalendarServer/branches/users/sagen/notifications/twistedcaldav/cache.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twistedcaldav/cache.py 2008-06-30 17:23:33 UTC (rev 2643)
+++ CalendarServer/branches/users/sagen/notifications/twistedcaldav/cache.py 2008-06-30 18:12:25 UTC (rev 2644)
@@ -29,8 +29,11 @@
from twistedcaldav.log import LoggingMixIn
from twistedcaldav.memcachepool import CachePoolUserMixIn
+from twistedcaldav.config import config
+from twistedcaldav.notify import NotificationClientUserMixIn
+
class DisabledCacheNotifier(object):
def __init__(self, *args, **kwargs):
pass
@@ -62,7 +65,9 @@
-class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn):
+class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn,
+ NotificationClientUserMixIn):
+
def __init__(self, resource, cachePool=None):
self._resource = resource
self._cachePool = cachePool
@@ -78,9 +83,16 @@
return: A L{Deferred} that fires when the token has been changed.
"""
- self.log_debug("Changing Cache Token for %r" % (self._resource.url(),))
+
+ url = self._resource.url()
+
+ if config.EnableNotifications:
+ self.log_debug("Notifications are enabled: %s" % (url,))
+ self.sendNotification(url)
+
+ self.log_debug("Changing Cache Token for %r" % (url,))
return self.getCachePool().set(
- 'cacheToken:%s' % (self._resource.url(),),
+ 'cacheToken:%s' % (url,),
self._newCacheToken())
Modified: CalendarServer/branches/users/sagen/notifications/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twistedcaldav/cluster.py 2008-06-30 17:23:33 UTC (rev 2643)
+++ CalendarServer/branches/users/sagen/notifications/twistedcaldav/cluster.py 2008-06-30 18:12:25 UTC (rev 2644)
@@ -309,7 +309,18 @@
monitor.addProcess('memcached', memcachedArgv, env=parentEnv)
+ if (config.EnableNotifications and
+ config.InternalNotificationHost == "localhost"):
+ log.msg("Adding notification service")
+ notificationsArgv = [
+ config.Twisted['twistd'],
+ '-n', 'caldav_notifier',
+ '-f', options['config'],
+ ]
+ monitor.addProcess('notifications', notificationsArgv, env=parentEnv)
+
+
logger = AMPLoggingFactory(
RotatingFileAccessLoggingObserver(config.AccessLogFile))
Modified: CalendarServer/branches/users/sagen/notifications/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twistedcaldav/config.py 2008-06-30 17:23:33 UTC (rev 2643)
+++ CalendarServer/branches/users/sagen/notifications/twistedcaldav/config.py 2008-06-30 18:12:25 UTC (rev 2644)
@@ -169,6 +169,16 @@
"EnableTimezoneService" : False, # Timezone service
#
+ # Notifications
+ #
+ "EnableNotifications" : False,
+ "CoalesceSeconds" : 10,
+ "InternalNotificationHost" : "localhost",
+ "InternalNotificationPort" : 62309,
+ "ExternalNotificationService" : "twistedcaldav.notify.SimpleLineNotifierService",
+ "SimpleLineNotificationPort" : 62308,
+
+ #
# Implementation details
#
# The following are specific to how the server is built, and useful
Added: CalendarServer/branches/users/sagen/notifications/twistedcaldav/notify.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twistedcaldav/notify.py (rev 0)
+++ CalendarServer/branches/users/sagen/notifications/twistedcaldav/notify.py 2008-06-30 18:12:25 UTC (rev 2644)
@@ -0,0 +1,522 @@
+##
+# Copyright (c) 2005-2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Notification framework for Calendar Server
+
+This module implements client code which is executed within the context of
+icalserver itself, and also server code (the "notification server") which is
+run as a separate process, launched as part of "./run".
+
+The notification server process is implemented as a twistd plugin
+(with a tapname of "caldav_notifier") and consists of two
+services -- one handling the internal channel between icalserver
+and the notification server, the other handling the external channel
+between the notification server and a remote consumer.
+
+The icalserver tap creates a NotificationClient object at startup;
+it deals with passing along notifications to the notification server.
+These notifications originate from cache.py:MemcacheChangeNotifier.changed().
+"""
+
+# TODO: bindAddress to local
+# TODO: "reset" / "all" upon startup
+# TODO: sequence number rollover
+
+import os
+from twisted.internet import reactor, protocol
+from twisted.protocols import basic
+from twisted.plugin import IPlugin
+from twisted.application import internet, service
+from twistedcaldav.log import LoggingMixIn
+from twisted.python.usage import Options, UsageError
+from twisted.python.reflect import namedClass
+from twistedcaldav.config import config, parseConfig, defaultConfig
+from zope.interface import Interface, implements
+import sqlite3
+
+__all__ = '''
+Coalescer getNotificationClient INotifier installNotificationClient
+InternalNotificationFactory InternalNotificationProtocol
+NotificationClient NotificationClientFactory NotificationClientLineProtocol
+NotificationClientUserMixIn NotificationOptions NotificationServiceMaker
+SimpleLineNotificationFactory SimpleLineNotificationProtocol
+SimpleLineNotifier SimpleLineNotifierService
+'''.split()
+
+
+
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Classes used within calendarserver itself
+#
+
+class NotificationClientUserMixIn(object):
+    """
+    Mix-in that lets a class report changes.
+
+    The change is handed to whatever object L{getNotificationClient}
+    currently returns.
+    """
+
+    def sendNotification(self, uri):
+        """
+        Pass C{uri} to the C{send} method of the installed client.
+        """
+        client = getNotificationClient()
+        client.send(uri)
+
+
+class NotificationClientLineProtocol(basic.LineReceiver, LoggingMixIn):
+    """
+    Line-oriented connection from the calendar server to the notification
+    server.
+
+    While the connection is up the protocol is registered as an observer
+    of its client, which pushes uris through C{sendLine}.  The C{client}
+    and C{factory} attributes are assigned by the factory that builds
+    the protocol.
+    """
+
+    def connectionMade(self):
+        """
+        Register with the client, then tell the factory the link is up.
+        """
+        owner = self.client
+        owner.addObserver(self)
+        self.factory.connectionMade()
+
+    def connectionLost(self, reason):
+        """
+        Unregister from the client; C{reason} is not inspected.
+        """
+        owner = self.client
+        owner.removeObserver(self)
+
+
+class NotificationClientFactory(protocol.ReconnectingClientFactory,
+                                LoggingMixIn):
+    """
+    Reconnecting factory for the link to the notification server.
+
+    Tracks whether a connection is currently up (C{connected}) and gives
+    every protocol it builds a reference to the owning client.
+    """
+
+    protocol = NotificationClientLineProtocol
+
+    def __init__(self, client):
+        self.client = client
+        self.connected = False
+
+    def _noteDisconnect(self, message, reason):
+        # Bookkeeping shared by both failure callbacks: log the reason,
+        # then flag the factory as not ready.
+        self.log_error(message % (reason,))
+        self.connected = False
+
+    def clientConnectionLost(self, connector, reason):
+        """
+        Record the dropped connection and defer to the reconnecting base
+        class.
+        """
+        self._noteDisconnect("Connect to notification server lost: %s",
+                             reason)
+        base = protocol.ReconnectingClientFactory
+        base.clientConnectionLost(self, connector, reason)
+
+    def clientConnectionFailed(self, connector, reason):
+        """
+        Record the failed attempt and defer to the reconnecting base
+        class.
+        """
+        self._noteDisconnect("Unable to connect to notification server: %s",
+                             reason)
+        base = protocol.ReconnectingClientFactory
+        base.clientConnectionFailed(self, connector, reason)
+
+    def connectionMade(self):
+        """
+        Called by the protocol once connected: mark the factory ready,
+        reset the reconnect back-off and notify the client.
+        """
+        self.connected = True
+        self.resetDelay()
+        self.client.connectionMade()
+
+    def isReady(self):
+        """
+        Return the current value of the C{connected} flag.
+        """
+        return self.connected
+
+    def buildProtocol(self, addr):
+        """
+        Create a protocol and wire it to this factory and its client.
+        C{addr} is unused.
+        """
+        proto = self.protocol()
+        proto.client = self.client
+        proto.factory = self
+        return proto
+
+
+class NotificationClient(LoggingMixIn):
+    """
+    Calendar-server side endpoint of the internal notification channel.
+
+    Uris passed to L{send} are written to every registered observer
+    (connected protocol) when the factory reports ready; otherwise they
+    are kept in the C{queued} set and written out by L{connectionMade}.
+    The factory, and with it the TCP connection, is created lazily on the
+    first L{send}.
+    """
+
+    def __init__(self, reactor, host, port):
+        self.reactor = reactor
+        self.host = host
+        self.port = port
+        self.factory = None
+        self.observers = set()
+        self.queued = set()
+
+    def send(self, uri):
+        """
+        Deliver C{uri} now if a connection is available, else queue it.
+        """
+        if self.factory is None:
+            self.factory = NotificationClientFactory(self)
+            self.reactor.connectTCP(self.host, self.port, self.factory)
+            self.log_debug("Creating factory")
+
+        if not (self.factory.isReady() and self.observers):
+            self.log_debug("Queing: %s" % (uri,))
+            self.queued.add(uri)
+            return
+
+        for observer in self.observers:
+            self.log_debug("Sending to notification server: %s" % (uri,))
+            observer.sendLine(str(uri))
+
+    def connectionMade(self):
+        """
+        Write out everything that was queued while disconnected, then
+        empty the queue.
+        """
+        if not (self.factory.isReady() and self.observers):
+            return
+
+        for observer in self.observers:
+            for uri in self.queued:
+                self.log_debug("Sending from queue: %s" % (uri,))
+                observer.sendLine(str(uri))
+        self.queued.clear()
+
+    def addObserver(self, observer):
+        """
+        Start sending uris to C{observer}.
+        """
+        self.observers.add(observer)
+
+    def removeObserver(self, observer):
+        """
+        Stop sending uris to C{observer}; it must currently be registered.
+        """
+        self.observers.remove(observer)
+
+
+# The one client shared by everything in this process; None until
+# installNotificationClient() has been called.
+_notificationClient = None
+
+
+def installNotificationClient(reactor, host, port, klass=NotificationClient):
+    """
+    Create the process-wide notification client.
+
+    C{klass} is called with C{(reactor, host, port)} and the result
+    replaces any previously installed client.
+    """
+    global _notificationClient
+    client = klass(reactor, host, port)
+    _notificationClient = client
+
+
+def getNotificationClient():
+    """
+    Return the installed notification client, or C{None} if none has
+    been installed.
+    """
+    return _notificationClient
+
+
+
+
+
+
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Classes used within Notification Server
+#
+
+#
+# Internal Channel (from icalserver to notification server)
+#
+
+class InternalNotificationProtocol(basic.LineReceiver):
+    """
+    InternalNotificationProtocol
+
+    Receives notifications from the calendar server.  Each non-blank line
+    is treated as one uri and handed to the factory's coalescer.
+    """
+
+    def lineReceived(self, line):
+        """
+        Strip C{line} and pass it to the coalescer.
+
+        @param line: one line received from a calendar server process.
+        """
+        val = str(line.strip())
+        if not val:
+            # A blank line names no resource; passing it on would end up
+            # as a notification for the empty-string uri.
+            return
+        self.factory.coalescer.add(val)
+
+
+class InternalNotificationFactory(protocol.ServerFactory):
+    """
+    Server factory for the internal channel.
+
+    Owns the single L{Coalescer} that all of its protocols feed.
+    """
+
+    protocol = InternalNotificationProtocol
+
+    def __init__(self, externalService, delaySeconds=None):
+        """
+        @param externalService: object the coalescer delivers uris to.
+        @param delaySeconds: passed through to L{Coalescer}.
+        """
+        coalescer = Coalescer(externalService, delaySeconds=delaySeconds)
+        self.coalescer = coalescer
+
+
+
+class Coalescer(LoggingMixIn):
+    """
+    Coalescer
+
+    A queue which hangs on to incoming uris for some period of time before
+    passing them along to the external notifier listening for these updates.
+    A chatty CalDAV client can make several changes in a short period of time,
+    and the Coalescer buffers the external clients somewhat.
+
+    Every call to L{add} for a uri that is already waiting restarts that
+    uri's timer, so a uri is forwarded only after it has been quiet for
+    C{delaySeconds}.
+    """
+
+    # Default hold time, used when the constructor is given no delay.
+    delaySeconds = 5
+
+    def __init__(self, notifier, reactor=None, delaySeconds=None):
+        """
+        @param notifier: object whose C{enqueue(uri)} is called when a
+            uri's timer expires.
+        @param reactor: provider of C{callLater}; defaults to the global
+            Twisted reactor.
+        @param delaySeconds: seconds to hold a uri; C{None} selects the
+            class default.  An explicit 0 is honoured.
+        """
+        # Compare against None: a plain truth test would silently replace
+        # an explicit 0 with the class default.
+        if delaySeconds is not None:
+            self.delaySeconds = delaySeconds
+
+        self.log_warn("Coalescer seconds: %d" % (self.delaySeconds,))
+
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+
+        # uri -> pending DelayedCall; entries are removed when they fire.
+        self.uris = dict()
+        self.notifier = notifier
+
+    def add(self, uri):
+        """
+        Schedule C{uri} for delivery, or push back its pending delivery.
+        """
+        delayed = self.uris.get(uri, None)
+        if delayed and delayed.active():
+            delayed.reset(self.delaySeconds)
+        else:
+            self.uris[uri] = self.reactor.callLater(self.delaySeconds,
+                                                    self._deliver, uri)
+
+    def _deliver(self, uri):
+        # Drop the finished call first so the table does not grow by one
+        # dead entry per distinct uri over the life of the process.
+        self.uris.pop(uri, None)
+        self.notifier.enqueue(uri)
+
+
+
+
+#
+# External Channel (from notification server to other consumers)
+#
+
+class INotifier(Interface):
+    """
+    Interface for objects that receive coalesced change notifications.
+
+    Implementations must provide L{enqueue}.
+    """
+
+    def enqueue(uri):
+        """
+        Lets the notifier object know that a change has been made for this
+        uri, and that enough time has passed to allow for coalescence.
+
+        @type uri: C{str}
+        """
+
+
+class SimpleLineNotifier(object):
+    """
+    Simple Line Notifier
+
+    Listens for uris from the coalescer and writes them out to any
+    connected clients.  Each line is simply a sequence number, a
+    space, and a uri string.  If the external client sends a sequence
+    number, this notifier will send notification lines for each uri
+    that was changed since that sequence number was originally sent.
+    A history of such sequence numbers is stored in a sqlite db; only
+    the most recent sequence number is kept per uri.
+    """
+
+    implements(INotifier)
+
+    def __init__(self, dbName=":memory:"):
+        """
+        @param dbName: sqlite database to keep the history in; the
+            default is a private in-memory database.
+        """
+        self.observers = set()
+
+        self.db = sqlite3.connect(dbName)
+        cursor = self.db.cursor()
+        try:
+            # IF NOT EXISTS lets an on-disk history be reopened instead of
+            # failing because the table is already there.
+            cursor.execute('CREATE TABLE IF NOT EXISTS uris '
+                           '(uri TEXT PRIMARY KEY, seq LONG)')
+            cursor.execute('SELECT MAX(seq) FROM uris')
+            lastSeq = cursor.fetchone()[0]
+        finally:
+            cursor.close()
+        self.db.commit()
+
+        # Continue after any stored history so new numbers never collide
+        # with ones already handed out.  MAX() is NULL on an empty table.
+        self.next_seq = lastSeq or 0
+
+    def enqueue(self, uri):
+        """
+        Assign the next sequence number to C{uri}, record it and send
+        the resulting line to every observer.
+        """
+        self.next_seq += 1
+        seq = self.next_seq
+
+        # Update database
+        cursor = self.db.cursor()
+        try:
+            cursor.execute(
+                'INSERT OR REPLACE INTO uris (uri, seq) VALUES (?, ?)',
+                (uri, seq))
+        finally:
+            cursor.close()
+        self.db.commit()
+
+        line = "%d %s" % (seq, uri)
+        for observer in self.observers:
+            observer.sendLine(line)
+
+    def playback(self, observer, old_seq):
+        """
+        Send C{observer} one line for each stored uri whose sequence
+        number is greater than C{old_seq}, in ascending order.
+        """
+        cursor = self.db.cursor()
+        try:
+            # Name the columns: the unpacking below must not depend on
+            # the order in which the table happened to be declared.
+            cursor.execute(
+                'SELECT uri, seq FROM uris WHERE seq > ? ORDER BY seq ASC',
+                (old_seq,))
+            rows = cursor.fetchall()
+        finally:
+            cursor.close()
+
+        for uri, seq in rows:
+            observer.sendLine("%d %s" % (seq, str(uri)))
+
+    def addObserver(self, observer):
+        """
+        Start sending notification lines to C{observer}.
+        """
+        self.observers.add(observer)
+
+    def removeObserver(self, observer):
+        """
+        Stop sending notification lines to C{observer}; it must
+        currently be registered.
+        """
+        self.observers.remove(observer)
+
+
+class SimpleLineNotificationProtocol(basic.LineReceiver, LoggingMixIn):
+    """
+    Connection to one external consumer.
+
+    Notifications are written to the consumer by the notifier.  Anything
+    the consumer sends is read as a history request: a line holding an
+    integer sequence number asks for a replay of everything recorded
+    after that number.  The C{notifier} attribute is assigned by the
+    factory.
+    """
+
+    def lineReceived(self, line):
+        """
+        Parse C{line} as a sequence number and request playback from the
+        notifier; log a warning and do nothing else if it is not an
+        integer.
+        """
+        val = line.strip()
+
+        try:
+            old_seq = int(val)
+        except ValueError, e:
+            peer = self.transport.getPeer()
+            self.log_warn("Error parsing %s: %s (from %s)" % (val, e, peer))
+        else:
+            self.notifier.playback(self, old_seq)
+
+    def connectionLost(self, reason):
+        """
+        Unregister from the notifier; C{reason} is not inspected.
+        """
+        notifier = self.notifier
+        notifier.removeObserver(self)
+
+
+class SimpleLineNotificationFactory(protocol.ServerFactory):
+    """
+    Server factory for the external channel.
+
+    Every protocol it builds is registered as an observer of the shared
+    notifier and given a reference back to it.
+    """
+
+    protocol = SimpleLineNotificationProtocol
+
+    def __init__(self, notifier):
+        self.notifier = notifier
+
+    def buildProtocol(self, addr):
+        """
+        Create a protocol bound to the notifier.  C{addr} is unused.
+        """
+        notifier = self.notifier
+        proto = self.protocol()
+        notifier.addObserver(proto)
+        proto.notifier = notifier
+        return proto
+
+
+
+#
+# Notification Server service config
+#
+
+class NotificationOptions(Options):
+    """
+    Command-line options for the notification server plugin.
+
+    C{--config/-f} names the plist to load; each C{--option/-o} overrides
+    one configuration value.  Overrides are collected in C{self.overrides}
+    and applied in L{postOptions}.
+    """
+
+    optParameters = [[
+        "config", "f", "/etc/caldavd/caldavd.plist", "Path to configuration file."
+    ]]
+
+    def __init__(self, *args, **kwargs):
+        super(NotificationOptions, self).__init__(*args, **kwargs)
+
+        # Nested dict of values given with --option.
+        self.overrides = {}
+
+    def _coerceOption(self, configDict, key, value):
+        """
+        Coerce the given C{value} to the type of C{configDict[key]}.
+
+        Keys that are not in C{configDict} are returned unchanged.
+
+        @raise UsageError: if the existing value is a dict.
+        """
+        if key in configDict:
+            # bool must be tested before int: bool is a subclass of int.
+            if isinstance(configDict[key], bool):
+                value = value == "True"
+
+            elif isinstance(configDict[key], (int, float, long)):
+                value = type(configDict[key])(value)
+
+            elif isinstance(configDict[key], (list, tuple)):
+                value = value.split(',')
+
+            elif isinstance(configDict[key], dict):
+                raise UsageError(
+                    "Dict options not supported on the command line"
+                )
+
+            elif value == 'None':
+                value = None
+
+        return value
+
+    def _setOverride(self, configDict, path, value, overrideDict):
+        """
+        Set the value at C{path} in C{overrideDict}, using C{configDict}
+        to decide how to coerce it.
+
+        @param path: list of key names, outermost first.
+        @raise UsageError: if an intermediate element of C{path} exists
+            in the configuration but is not a dictionary.
+        """
+        key = path[0]
+
+        if len(path) == 1:
+            overrideDict[key] = self._coerceOption(configDict, key, value)
+            return
+
+        if key in configDict:
+            if not isinstance(configDict[key], dict):
+                raise UsageError(
+                    "Found intermediate path element that is not a dictionary"
+                )
+
+        if key not in overrideDict:
+            overrideDict[key] = {}
+
+        # A leaf key that is not in the defaults is accepted as-is, so an
+        # unknown intermediate key is treated as an empty section rather
+        # than failing with a KeyError.
+        self._setOverride(
+            configDict.get(key, {}), path[1:],
+            value, overrideDict[key]
+        )
+
+    def opt_option(self, option):
+        """
+        Set an option to override a value in the config file. True, False, int,
+        and float options are supported, as well as comma separated lists. Only
+        one option may be given for each --option flag, however multiple
+        --option flags may be specified.
+        """
+
+        if "=" in option:
+            # Split on the first "=" only, so the value may itself
+            # contain "=" characters.
+            path, value = option.split('=', 1)
+            self._setOverride(
+                defaultConfig,
+                path.split('/'),
+                value,
+                self.overrides
+            )
+        else:
+            # A bare name is shorthand for name=True.
+            self.opt_option('%s=True' % (option,))
+
+    opt_o = opt_option
+
+    def postOptions(self):
+        """
+        Load the configuration file, then apply the collected overrides.
+        """
+        parseConfig(self['config'])
+        config.updateDefaults(self.overrides)
+
+
+class NotificationServiceMaker(object):
+    """
+    twistd plugin that assembles the notification server.
+    """
+    implements(IPlugin, service.IServiceMaker)
+
+    tapname = "caldav_notifier"
+    description = "Notification Server"
+    options = NotificationOptions
+
+    def makeService(self, options):
+        """
+        Build a L{service.MultiService} holding the configured external
+        service and a TCP listener for the internal channel.
+
+        Settings are read from the global C{config}; C{options} itself
+        is not consulted here.
+        """
+        parent = service.MultiService()
+
+        # External channel: the class is named in the configuration.
+        externalService = namedClass(config.ExternalNotificationService)()
+        externalService.setServiceParent(parent)
+
+        # Internal channel: receives uris and coalesces them before
+        # handing them to the external service.
+        internalFactory = InternalNotificationFactory(
+            externalService, delaySeconds=config.CoalesceSeconds)
+        listener = internet.TCPServer(config.InternalNotificationPort,
+                                      internalFactory)
+        listener.setServiceParent(parent)
+
+        return parent
+
+
+class SimpleLineNotifierService(service.Service):
+    """
+    Service wrapper around a L{SimpleLineNotifier}.
+
+    Owns the notifier and the TCP server that external consumers connect
+    to, on the port given by C{config.SimpleLineNotificationPort}.
+    """
+
+    def __init__(self):
+        self.notifier = SimpleLineNotifier()
+        self.server = internet.TCPServer(config.SimpleLineNotificationPort,
+            SimpleLineNotificationFactory(self.notifier))
+
+    def enqueue(self, uri):
+        """
+        Forward C{uri} to the notifier.
+        """
+        self.notifier.enqueue(uri)
+
+    def startService(self):
+        """
+        Mark this service as running and start the listening server.
+        """
+        # Let the base class do its own start-up bookkeeping as well.
+        service.Service.startService(self)
+        self.server.startService()
+
+    def stopService(self):
+        """
+        Stop the listening server.
+
+        @return: whatever the server's C{stopService} returns, so that a
+            parent service can wait for the port to finish closing.
+        """
+        service.Service.stopService(self)
+        return self.server.stopService()
Modified: CalendarServer/branches/users/sagen/notifications/twistedcaldav/tap.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twistedcaldav/tap.py 2008-06-30 17:23:33 UTC (rev 2643)
+++ CalendarServer/branches/users/sagen/notifications/twistedcaldav/tap.py 2008-06-30 18:12:25 UTC (rev 2644)
@@ -56,6 +56,7 @@
from twistedcaldav.timezones import TimezoneCache
from twistedcaldav import pdmonster
from twistedcaldav import memcachepool
+from twistedcaldav.notify import installNotificationClient
log = Logger()
@@ -492,6 +493,14 @@
config.Memcached["MaxClients"])
#
+ # Configure NotificationClient
+ #
+ if config.EnableNotifications:
+ installNotificationClient(reactor,
+ config.InternalNotificationHost,
+ config.InternalNotificationPort)
+
+ #
# Setup Resource hierarchy
#
Added: CalendarServer/branches/users/sagen/notifications/twistedcaldav/test/test_notify.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twistedcaldav/test/test_notify.py (rev 0)
+++ CalendarServer/branches/users/sagen/notifications/twistedcaldav/test/test_notify.py 2008-06-30 18:12:25 UTC (rev 2644)
@@ -0,0 +1,277 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twisted.trial.unittest import TestCase
+from twisted.internet.task import Clock
+from twistedcaldav.notify import *
+
+
+
+class NotificationClientUserTests(TestCase):
+    """
+    Tests for installing the process-wide notification client and
+    reaching it through L{NotificationClientUserMixIn}.
+    """
+
+    class NotificationClientUser(NotificationClientUserMixIn):
+        pass
+
+    def tearDown(self):
+        # installNotificationClient() rebinds a module-level global.
+        # Reset it so the "nothing installed yet" assertion below still
+        # holds when the test runs again in the same process.
+        import twistedcaldav.notify
+        twistedcaldav.notify._notificationClient = None
+
+    def test_installNoficationClient(self):
+        # Nothing is installed before the first call.
+        self.assertEquals(getNotificationClient(), None)
+        self.clock = Clock()
+        installNotificationClient(self.clock, None, None,
+            klass=StubNotificationClient)
+        notificationClient = getNotificationClient()
+        self.assertNotEquals(notificationClient, None)
+
+        # A mix-in user reaches the installed client.
+        clientUser = self.NotificationClientUser()
+        clientUser.sendNotification("a")
+        self.assertEquals(notificationClient.lines, ["a"])
+
+
+class NotificationClientFactoryTests(TestCase):
+    """
+    Check the connected/disconnected bookkeeping of
+    L{NotificationClientFactory} using stub client and protocol objects.
+    """
+
+    def setUp(self):
+        client = StubNotificationClient(None, None, None)
+        factory = NotificationClientFactory(client)
+        factory.protocol = StubNotificationClientProtocol
+        self.client = client
+        self.factory = factory
+
+    def test_connect(self):
+        factory = self.factory
+        client = self.client
+        self.assertEquals(factory.isReady(), False)
+
+        # Bring the connection up ...
+        proto = factory.buildProtocol(None)
+        proto.connectionMade()
+        self.assertEquals(client.observers, set([proto]))
+        self.assertEquals(factory.isReady(), True)
+
+        # ... and take it down again.
+        proto.connectionLost(None)
+        self.assertEquals(client.observers, set())
+        self.assertEquals(factory.isReady(), False)
+
+
+class StubNotificationClient(object):
+    """
+    Stand-in for L{NotificationClient}: records the uris it is sent and
+    the observers registered with it.  The constructor arguments are
+    accepted and ignored.
+    """
+
+    def __init__(self, reactor, host, port):
+        self.observers = set()
+        self.lines = []
+
+    def send(self, uri):
+        self.lines += [uri]
+
+    def addObserver(self, observer):
+        self.observers.add(observer)
+
+    def removeObserver(self, observer):
+        self.observers.remove(observer)
+
+    def connectionMade(self):
+        # Nothing to flush in the stub.
+        return None
+
+class StubNotificationClientProtocol(object):
+    """
+    Stand-in for L{NotificationClientLineProtocol} that needs no
+    transport.  Lines passed to C{sendLine} are collected in C{lines};
+    C{client} and C{factory} must be assigned before the connection
+    callbacks are used.
+    """
+
+    def __init__(self):
+        self.lines = []
+
+    def sendLine(self, line):
+        self.lines += [line]
+
+    def connectionMade(self):
+        owner = self.client
+        owner.addObserver(self)
+        self.factory.connectionMade()
+
+    def connectionLost(self, reason):
+        owner = self.client
+        owner.removeObserver(self)
+        # The stub flips the factory flag itself.
+        self.factory.connected = False
+
+
+class NotificationClientTests(TestCase):
+    """
+    Check when L{NotificationClient} sends immediately and when it
+    queues, using a stub factory so no connection is attempted.
+    """
+
+    def setUp(self):
+        client = NotificationClient(Clock(), None, None)
+        client.factory = StubNotificationClientFactory()
+        self.client = client
+
+    def _connect(self):
+        # Simulate an established connection: register an observer and
+        # mark the stub factory ready.
+        proto = StubNotificationClientProtocol()
+        self.client.addObserver(proto)
+        self.client.factory.connected = True
+        return proto
+
+    def test_sendWhileNotConnected(self):
+        self.client.send("a")
+        self.assertEquals(self.client.queued, set(["a"]))
+
+    def test_sendWhileConnected(self):
+        proto = self._connect()
+        self.client.send("a")
+        self.assertEquals(self.client.queued, set())
+        self.assertEquals(proto.lines, ["a"])
+
+    def test_sendQueue(self):
+        self.client.send("a")
+        self.assertEquals(self.client.queued, set(["a"]))
+        proto = self._connect()
+        self.client.connectionMade()
+        self.assertEquals(proto.lines, ["a"])
+        self.assertEquals(self.client.queued, set())
+
+
+class StubNotificationClientFactory(object):
+    """
+    Stand-in for L{NotificationClientFactory}: only the C{connected} flag
+    and the C{isReady} query, with no networking.
+    """
+
+    def __init__(self):
+        self.connected = False
+
+    def isReady(self):
+        ready = self.connected
+        return ready
+
+
+class CoalescerTests(TestCase):
+    """
+    Check that L{Coalescer} delays delivery and merges repeated uris,
+    driven by a deterministic clock.
+    """
+
+    def setUp(self):
+        self.notifier = StubNotifier()
+        self.clock = Clock()
+        self.coalescer = Coalescer(self.notifier, reactor=self.clock)
+
+    def test_delayedNotifications(self):
+        self.coalescer.add("A")
+        # Nothing is delivered until the delay has elapsed.
+        self.assertEquals(self.notifier.notifications, [])
+        self.clock.advance(5)
+        self.assertEquals(self.notifier.notifications, ["A"])
+
+    def test_removeDuplicates(self):
+        for _ in range(2):
+            self.coalescer.add("A")
+        self.clock.advance(5)
+        self.assertEquals(self.notifier.notifications, ["A"])
+
+
+class StubNotifier(object):
+    """
+    Stand-in notifier that records what it is asked to do: enqueued
+    uris in C{notifications}, playback requests in C{playbackHistory}
+    and registered observers in C{observers}.
+    """
+
+    def __init__(self):
+        self.observers = set()
+        self.notifications = []
+        self.playbackHistory = []
+
+    def enqueue(self, uri):
+        self.notifications += [uri]
+
+    def playback(self, protocol, old_seq):
+        request = (protocol, old_seq)
+        self.playbackHistory.append(request)
+
+    def addObserver(self, observer):
+        self.observers.add(observer)
+
+    def removeObserver(self, observer):
+        self.observers.remove(observer)
+
+
+class SimpleLineNotifierTests(TestCase):
+    """
+    Check sequence numbering, observer management and history playback
+    of L{SimpleLineNotifier} against its default in-memory database.
+    """
+
+    def setUp(self):
+        self.clock = Clock()
+        self.notifier = SimpleLineNotifier()
+        self.coalescer = Coalescer(self.notifier, reactor=self.clock)
+
+    def _observe(self):
+        # Register a fresh stub protocol with the notifier.
+        proto = StubProtocol()
+        self.notifier.addObserver(proto)
+        return proto
+
+    def test_send(self):
+        proto = self._observe()
+        self.notifier.enqueue("A")
+        self.assertEquals(proto.lines, ["1 A"])
+
+    def test_incrementSequence(self):
+        proto = self._observe()
+        for uri in ("A", "B"):
+            self.notifier.enqueue(uri)
+        self.assertEquals(proto.lines, ["1 A", "2 B"])
+
+    def test_addObserver(self):
+        proto = self._observe()
+        self.notifier.enqueue("A")
+        self.assertEquals(proto.lines, ["1 A"])
+
+    def test_removeObserver(self):
+        proto = self._observe()
+        self.notifier.removeObserver(proto)
+        self.notifier.enqueue("A")
+        self.assertEquals(proto.lines, [])
+
+    def test_multipleObservers(self):
+        first = self._observe()
+        second = self._observe()
+        self.notifier.enqueue("A")
+        self.assertEquals(first.lines, ["1 A"])
+        self.assertEquals(second.lines, ["1 A"])
+
+    def test_duplicateObservers(self):
+        proto = self._observe()
+        # Registering the same observer twice must not double its output.
+        self.notifier.addObserver(proto)
+        self.notifier.enqueue("A")
+        self.assertEquals(proto.lines, ["1 A"])
+
+    def test_playback(self):
+        for uri in ("A", "B", "C"):
+            self.notifier.enqueue(uri)
+        proto = self._observe()
+        self.notifier.playback(proto, 1)
+        self.assertEquals(proto.lines, ["2 B", "3 C"])
+
+class SimpleLineNotificationFactoryTests(TestCase):
+    """
+    Check that L{SimpleLineNotificationFactory} wires each new protocol
+    to its notifier.
+    """
+
+    def test_buildProtocol(self):
+        notifier = StubNotifier()
+        built = SimpleLineNotificationFactory(notifier).buildProtocol(None)
+        self.assertEquals(built.notifier, notifier)
+        self.assertIn(built, notifier.observers)
+
+
+class SimpleLineNotificationProtocolTests(TestCase):
+    """
+    Check how L{SimpleLineNotificationProtocol} reacts to disconnects and
+    to valid and invalid playback requests.
+    """
+
+    def setUp(self):
+        notifier = StubNotifier()
+        proto = SimpleLineNotificationProtocol()
+        proto.notifier = notifier
+        proto.transport = StubTransport()
+        notifier.addObserver(proto)
+        self.notifier = notifier
+        self.protocol = proto
+
+    def test_connectionLost(self):
+        self.protocol.connectionLost(None)
+        self.assertNotIn(self.protocol, self.notifier.observers)
+
+    def test_lineReceived(self):
+        self.protocol.lineReceived("2")
+        expected = [(self.protocol, 2)]
+        self.assertEquals(self.notifier.playbackHistory, expected)
+
+    def test_lineReceivedInvalid(self):
+        # A non-numeric request is logged and otherwise ignored.
+        self.protocol.lineReceived("bogus")
+        self.assertEquals(self.notifier.playbackHistory, [])
+
+
+
+class StubProtocol(object):
+    """
+    Minimal observer: collects every line passed to C{sendLine} in
+    C{lines}.
+    """
+
+    def __init__(self):
+        self.lines = []
+
+    def sendLine(self, line):
+        self.lines += [line]
+
+
+class StubTransport(object):
+    """
+    Transport stand-in providing only C{getPeer}, which returns a fixed
+    placeholder.
+    """
+
+    def getPeer(self):
+        peer = "peer"
+        return peer
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080630/d927a086/attachment-0001.html
More information about the calendarserver-changes
mailing list