[CalendarServer-changes] [2644] CalendarServer/branches/users/sagen/notifications

source_changes at macosforge.org source_changes at macosforge.org
Mon Jun 30 11:12:25 PDT 2008


Revision: 2644
          http://trac.macosforge.org/projects/calendarserver/changeset/2644
Author:   sagen at apple.com
Date:     2008-06-30 11:12:25 -0700 (Mon, 30 Jun 2008)
Log Message:
-----------
Initial check-in of calendar notification server

Starts up a notification server process which listens for messages from
the calendar server processes, coalesces them (to reduce chattiness),
and forwards them on to external consumers.

Modified Paths:
--------------
    CalendarServer/branches/users/sagen/notifications/conf/caldavd-test.plist
    CalendarServer/branches/users/sagen/notifications/twisted/plugins/caldav.py
    CalendarServer/branches/users/sagen/notifications/twistedcaldav/cache.py
    CalendarServer/branches/users/sagen/notifications/twistedcaldav/cluster.py
    CalendarServer/branches/users/sagen/notifications/twistedcaldav/config.py
    CalendarServer/branches/users/sagen/notifications/twistedcaldav/tap.py

Added Paths:
-----------
    CalendarServer/branches/users/sagen/notifications/twistedcaldav/notify.py
    CalendarServer/branches/users/sagen/notifications/twistedcaldav/test/test_notify.py

Modified: CalendarServer/branches/users/sagen/notifications/conf/caldavd-test.plist
===================================================================
--- CalendarServer/branches/users/sagen/notifications/conf/caldavd-test.plist	2008-06-30 17:23:33 UTC (rev 2643)
+++ CalendarServer/branches/users/sagen/notifications/conf/caldavd-test.plist	2008-06-30 18:12:25 UTC (rev 2644)
@@ -405,6 +405,23 @@
 
 
   <!--
+    Notifications
+  -->
+  <key>EnableNotifications</key>
+  <true/>
+  <key>CoalesceSeconds</key>
+  <integer>10</integer>
+  <key>InternalNotificationHost</key>
+  <string>localhost</string>
+  <key>InternalNotificationPort</key>
+  <integer>62309</integer>
+  <key>ExternalNotificationService</key>
+  <string>twistedcaldav.notify.SimpleLineNotifierService</string>
+  <key>SimpleLineNotificationPort</key>
+  <integer>62308</integer>
+
+
+  <!--
     Miscellaneous items
   -->
 

Modified: CalendarServer/branches/users/sagen/notifications/twisted/plugins/caldav.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twisted/plugins/caldav.py	2008-06-30 17:23:33 UTC (rev 2643)
+++ CalendarServer/branches/users/sagen/notifications/twisted/plugins/caldav.py	2008-06-30 18:12:25 UTC (rev 2644)
@@ -30,3 +30,5 @@
 
 
 TwistedCalDAV = TAP('twistedcaldav.tap.CalDAVServiceMaker')
+
+CalDAVNotifier = TAP('twistedcaldav.notify.NotificationServiceMaker')

Modified: CalendarServer/branches/users/sagen/notifications/twistedcaldav/cache.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twistedcaldav/cache.py	2008-06-30 17:23:33 UTC (rev 2643)
+++ CalendarServer/branches/users/sagen/notifications/twistedcaldav/cache.py	2008-06-30 18:12:25 UTC (rev 2644)
@@ -29,8 +29,11 @@
 
 from twistedcaldav.log import LoggingMixIn
 from twistedcaldav.memcachepool import CachePoolUserMixIn
+from twistedcaldav.config import config
 
+from twistedcaldav.notify import NotificationClientUserMixIn
 
+
 class DisabledCacheNotifier(object):
     def __init__(self, *args, **kwargs):
         pass
@@ -62,7 +65,9 @@
 
 
 
-class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn):
+class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn,
+    NotificationClientUserMixIn):
+
     def __init__(self, resource, cachePool=None):
         self._resource = resource
         self._cachePool = cachePool
@@ -78,9 +83,16 @@
 
         return: A L{Deferred} that fires when the token has been changed.
         """
-        self.log_debug("Changing Cache Token for %r" % (self._resource.url(),))
+
+        url = self._resource.url()
+
+        if config.EnableNotifications:
+            self.log_debug("Notifications are enabled: %s" % (url,))
+            self.sendNotification(url)
+
+        self.log_debug("Changing Cache Token for %r" % (url,))
         return self.getCachePool().set(
-            'cacheToken:%s' % (self._resource.url(),),
+            'cacheToken:%s' % (url,),
             self._newCacheToken())
 
 

Modified: CalendarServer/branches/users/sagen/notifications/twistedcaldav/cluster.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twistedcaldav/cluster.py	2008-06-30 17:23:33 UTC (rev 2643)
+++ CalendarServer/branches/users/sagen/notifications/twistedcaldav/cluster.py	2008-06-30 18:12:25 UTC (rev 2644)
@@ -309,7 +309,18 @@
 
         monitor.addProcess('memcached', memcachedArgv, env=parentEnv)
 
+    if (config.EnableNotifications and
+        config.InternalNotificationHost == "localhost"):
+        log.msg("Adding notification service")
 
+        notificationsArgv = [
+            config.Twisted['twistd'],
+            '-n', 'caldav_notifier',
+            '-f', options['config'],
+        ]
+        monitor.addProcess('notifications', notificationsArgv, env=parentEnv)
+
+
     logger = AMPLoggingFactory(
         RotatingFileAccessLoggingObserver(config.AccessLogFile))
 

Modified: CalendarServer/branches/users/sagen/notifications/twistedcaldav/config.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twistedcaldav/config.py	2008-06-30 17:23:33 UTC (rev 2643)
+++ CalendarServer/branches/users/sagen/notifications/twistedcaldav/config.py	2008-06-30 18:12:25 UTC (rev 2644)
@@ -169,6 +169,16 @@
     "EnableTimezoneService" : False, # Timezone service
 
     #
+    # Notifications
+    #
+    "EnableNotifications" : False,
+    "CoalesceSeconds" : 10,
+    "InternalNotificationHost" : "localhost",
+    "InternalNotificationPort" : 62309,
+    "ExternalNotificationService" : "twistedcaldav.notify.SimpleLineNotifierService",
+    "SimpleLineNotificationPort" : 62308,
+
+    #
     # Implementation details
     #
     #    The following are specific to how the server is built, and useful

Added: CalendarServer/branches/users/sagen/notifications/twistedcaldav/notify.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twistedcaldav/notify.py	                        (rev 0)
+++ CalendarServer/branches/users/sagen/notifications/twistedcaldav/notify.py	2008-06-30 18:12:25 UTC (rev 2644)
@@ -0,0 +1,522 @@
+##
+# Copyright (c) 2005-2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Notification framework for Calendar Server
+
+This module implements client code which is executed within the context of
+icalserver itself, and also server code (the "notification server") which is
+run as a separate process, launched as part of "./run".
+
+The notification server process is implemented as a twistd plugin
+(with a tapname of "caldav_notifier"), and is comprised of two
+services -- one handling the internal channel between icalserver
+and notification server, the other handling the external channel
+between notification server and a remote consumer.
+
+The icalserver tap creates a NotificationClient object at startup;
+it deals with passing along notifications to the notification server.
+These notifications originate from cache.py:MemcacheChangeNotifier.changed().
+"""
+
+# TODO: bindAddress to local
+# TODO: "reset" / "all" upon startup
+# TODO: sequence number rollover
+
+import os
+from twisted.internet import reactor, protocol
+from twisted.protocols import basic
+from twisted.plugin import IPlugin
+from twisted.application import internet, service
+from twistedcaldav.log import LoggingMixIn
+from twisted.python.usage import Options, UsageError
+from twisted.python.reflect import namedClass
+from twistedcaldav.config import config, parseConfig, defaultConfig
+from zope.interface import Interface, implements
+import sqlite3
+
+__all__ = '''
+Coalescer getNotificationClient INotifier installNotificationClient
+InternalNotificationFactory InternalNotificationProtocol
+NotificationClient NotificationClientFactory NotificationClientLineProtocol
+NotificationClientUserMixIn NotificationOptions NotificationServiceMaker
+SimpleLineNotificationFactory SimpleLineNotificationProtocol
+SimpleLineNotifier SimpleLineNotifierService
+'''.split()
+
+
+
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Classes used within calendarserver itself
+#
+
+class NotificationClientUserMixIn(object):
+    """
+    Notification Client User (Mixin)
+
+    Provides a method to send change notifications to the L{NotificationClient}.
+    """
+
+    def sendNotification(self, uri):
+        getNotificationClient().send(uri)
+
+
+class NotificationClientLineProtocol(basic.LineReceiver, LoggingMixIn):
+    """
+    Notification Client Line Protocol
+
+    Sends updates to the notification server.
+    """
+
+    def connectionMade(self):
+        self.client.addObserver(self)
+        self.factory.connectionMade()
+
+    def connectionLost(self, reason):
+        self.client.removeObserver(self)
+
+
+class NotificationClientFactory(protocol.ReconnectingClientFactory,
+    LoggingMixIn):
+    """
+    Notification Client Factory
+
+    Sends updates to the notification server.
+    """
+
+    protocol = NotificationClientLineProtocol
+
+    def __init__(self, client):
+        self.connected = False
+        self.client = client
+
+    def clientConnectionLost(self, connector, reason):
+        self.log_error("Connect to notification server lost: %s" %
+            (reason,))
+        self.connected = False
+        protocol.ReconnectingClientFactory.clientConnectionLost(self,
+            connector, reason)
+
+    def clientConnectionFailed(self, connector, reason):
+        self.log_error("Unable to connect to notification server: %s" %
+            (reason,))
+        self.connected = False
+        protocol.ReconnectingClientFactory.clientConnectionFailed(self,
+            connector, reason)
+
+    def connectionMade(self):
+        self.connected = True
+        self.resetDelay()
+        self.client.connectionMade()
+
+    def isReady(self):
+        return self.connected
+
+    def buildProtocol(self, addr):
+        p = self.protocol()
+        p.factory = self
+        p.client = self.client
+        return p
+
+
+class NotificationClient(LoggingMixIn):
+    """
+    Notification Client
+
+    Forwards on notifications from NotificationClientUserMixIns to the
+    notification server.  A NotificationClient is installed by the tap at
+    startup.
+    """
+
+    def __init__(self, reactor, host, port):
+        self.factory = None
+        self.reactor = reactor
+        self.host = host
+        self.port = port
+        self.observers = set()
+        self.queued = set()
+
+    def send(self, uri):
+        if self.factory is None:
+            self.factory = NotificationClientFactory(self)
+            self.reactor.connectTCP(self.host, self.port, self.factory)
+            self.log_debug("Creating factory")
+
+        if self.factory.isReady() and self.observers:
+            for observer in self.observers:
+                self.log_debug("Sending to notification server: %s" % (uri,))
+                observer.sendLine(str(uri))
+        else:
+            self.log_debug("Queing: %s" % (uri,))
+            self.queued.add(uri)
+
+    def connectionMade(self):
+        if self.factory.isReady() and self.observers:
+            for observer in self.observers:
+                for uri in self.queued:
+                    self.log_debug("Sending from queue: %s" % (uri,))
+                    observer.sendLine(str(uri))
+            self.queued.clear()
+
+    def addObserver(self, observer):
+        self.observers.add(observer)
+
+    def removeObserver(self, observer):
+        self.observers.remove(observer)
+
+
+_notificationClient = None
+
+def installNotificationClient(reactor, host, port, klass=NotificationClient):
+    global _notificationClient
+    _notificationClient = klass(reactor, host, port)
+
+def getNotificationClient():
+    return _notificationClient
+
+
+
+
+
+
+
+# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+# Classes used within Notification Server
+#
+
+#
+# Internal Channel (from icalserver to notification server)
+#
+
+class InternalNotificationProtocol(basic.LineReceiver):
+    """
+    InternalNotificationProtocol
+
+    Receives notifications from the calendar server.
+    """
+
+    def lineReceived(self, line):
+        val = str(line.strip())
+        self.factory.coalescer.add(val)
+
+
+class InternalNotificationFactory(protocol.ServerFactory):
+    """
+    Internal Notification Factory
+
+    Receives notifications from the calendar server.
+    """
+
+    protocol = InternalNotificationProtocol
+
+    def __init__(self, externalService, delaySeconds=None):
+        self.coalescer = Coalescer(externalService, delaySeconds=delaySeconds)
+
+
+
+class Coalescer(LoggingMixIn):
+    """
+    Coalescer
+
+    A queue which hangs on to incoming uris for some period of time before
+    passing them along to the external notifier listening for these updates.
+    A chatty CalDAV client can make several changes in a short period of time,
+    and the Coalescer buffers the external clients somewhat.
+    """
+
+    delaySeconds = 5
+
+    def __init__(self, notifier, reactor=None, delaySeconds=None):
+
+        if delaySeconds:
+            self.delaySeconds = delaySeconds
+
+        self.log_warn("Coalescer seconds: %d" % (self.delaySeconds,))
+
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+
+        self.uris = dict()
+        self.notifier = notifier
+
+    def add(self, uri):
+        delayed = self.uris.get(uri, None)
+        if delayed and delayed.active():
+            delayed.reset(self.delaySeconds)
+        else:
+            self.uris[uri] = self.reactor.callLater(self.delaySeconds,
+                self.notifier.enqueue, uri)
+
+
+
+
+#
+# External Channel (from notification server to other consumers)
+#
+
+class INotifier(Interface):
+    """
+    Notifier Interface
+
+    Defines an enqueue method that Notifier classes need to implement.
+    """
+
+    def enqueue(uri):
+        """
+        Lets the notifier object know that a change has been made for this
+        uri, and enough time has passed to allow for coalescence.
+
+        @type uri: C{str}
+        """
+
+
+class SimpleLineNotifier(object):
+    """
+    Simple Line Notifier
+
+    Listens for uris from the coalescer and writes them out to any
+    connected clients.  Each line is simply a sequence number, a
+    space, and a uri string.  If the external client sends a sequence
+    number, this notifier will send notification lines for each uri
+    that was changed since that sequence number was originally sent.
+    A history of such sequence numbers is stored in a sqlite db.
+    """
+
+    implements(INotifier)
+
+    def __init__(self, dbName=":memory:"):
+
+        self.next_seq = 0
+        self.observers = set()
+
+        self.db = sqlite3.connect(dbName)
+        query = 'CREATE TABLE uris (uri TEXT PRIMARY KEY, seq LONG)'
+        cursor = self.db.cursor()
+        cursor.execute(query)
+        cursor.close()
+        self.db.commit()
+
+
+    def enqueue(self, uri):
+
+        self.next_seq += 1
+
+        # Update database
+        query = 'INSERT OR REPLACE INTO uris (uri, seq) VALUES (:1, :2)'
+        cursor = self.db.cursor()
+        cursor.execute(query, (uri, self.next_seq))
+        cursor.close()
+        self.db.commit()
+
+        for observer in self.observers:
+            observer.sendLine("%d %s" % (self.next_seq, uri))
+
+
+    def playback(self, observer, old_seq):
+
+        query = 'SELECT * FROM uris WHERE seq > :1 ORDER BY seq ASC'
+        cursor = self.db.cursor()
+
+        for uri, seq in cursor.execute(query, (old_seq,)):
+            observer.sendLine("%d %s" % (seq, str(uri)))
+
+        cursor.close()
+
+
+    def addObserver(self, observer):
+        self.observers.add(observer)
+
+    def removeObserver(self, observer):
+        self.observers.remove(observer)
+
+
+class SimpleLineNotificationProtocol(basic.LineReceiver, LoggingMixIn):
+    """
+    Simple Line Notification Protocol
+
+    Sends notifications to external consumers.  Also responds to history-
+    playback requests.  If an integer is received from an external consumer,
+    it is interpreted as a sequence number; all notifications sent since that
+    sequence number was sent are resent.
+    """
+
+    def lineReceived(self, line):
+        val = line.strip()
+
+        # Should be a number requesting all updates since that sequence
+        try:
+            old_seq = int(val)
+        except ValueError, e:
+            self.log_warn("Error parsing %s: %s (from %s)" % (val, e,
+                self.transport.getPeer()))
+            return
+
+        self.notifier.playback(self, old_seq)
+
+    def connectionLost(self, reason):
+        self.notifier.removeObserver(self)
+
+
+class SimpleLineNotificationFactory(protocol.ServerFactory):
+    """
+    Simple Line Notification Factory
+
+    Sends notifications to external consumers.
+    """
+
+    protocol = SimpleLineNotificationProtocol
+
+    def __init__(self, notifier):
+        self.notifier = notifier
+
+    def buildProtocol(self, addr):
+        p = self.protocol()
+        self.notifier.addObserver(p)
+        p.notifier = self.notifier
+        return p
+
+
+
+#
+# Notification Server service config
+#
+
+class NotificationOptions(Options):
+    optParameters = [[
+        "config", "f", "/etc/caldavd/caldavd.plist", "Path to configuration file."
+    ]]
+
+    def __init__(self, *args, **kwargs):
+        super(NotificationOptions, self).__init__(*args, **kwargs)
+
+        self.overrides = {}
+
+    def _coerceOption(self, configDict, key, value):
+        """
+        Coerce the given C{value} to the type of C{configDict[key]}
+        """
+        if key in configDict:
+            if isinstance(configDict[key], bool):
+                value = value == "True"
+
+            elif isinstance(configDict[key], (int, float, long)):
+                value = type(configDict[key])(value)
+
+            elif isinstance(configDict[key], (list, tuple)):
+                value = value.split(',')
+
+            elif isinstance(configDict[key], dict):
+                raise UsageError(
+                    "Dict options not supported on the command line"
+                )
+
+            elif value == 'None':
+                value = None
+
+        return value
+
+    def _setOverride(self, configDict, path, value, overrideDict):
+        """
+        Set the value at path in configDict
+        """
+        key = path[0]
+
+        if len(path) == 1:
+            overrideDict[key] = self._coerceOption(configDict, key, value)
+            return
+
+        if key in configDict:
+            if not isinstance(configDict[key], dict):
+                raise UsageError(
+                    "Found intermediate path element that is not a dictionary"
+                )
+
+            if key not in overrideDict:
+                overrideDict[key] = {}
+
+            self._setOverride(
+                configDict[key], path[1:],
+                value, overrideDict[key]
+            )
+
+
+    def opt_option(self, option):
+        """
+        Set an option to override a value in the config file. True, False, int,
+        and float options are supported, as well as comma-separated lists. Only
+        one option may be given for each --option flag, however multiple
+        --option flags may be specified.
+        """
+
+        if "=" in option:
+            path, value = option.split('=')
+            self._setOverride(
+                defaultConfig,
+                path.split('/'),
+                value,
+                self.overrides
+            )
+        else:
+            self.opt_option('%s=True' % (option,))
+
+    opt_o = opt_option
+
+    def postOptions(self):
+        parseConfig(self['config'])
+        config.updateDefaults(self.overrides)
+
+
+class NotificationServiceMaker(object):
+    implements(IPlugin, service.IServiceMaker)
+
+    tapname = "caldav_notifier"
+    description = "Notification Server"
+    options = NotificationOptions
+
+    def makeService(self, options):
+
+        multiService = service.MultiService()
+
+        externalServiceClass = namedClass(config.ExternalNotificationService)
+        externalService = externalServiceClass()
+        externalService.setServiceParent(multiService)
+
+        internet.TCPServer(
+            config.InternalNotificationPort,
+            InternalNotificationFactory(externalService,
+                delaySeconds=config.CoalesceSeconds)
+        ).setServiceParent(multiService)
+
+        return multiService
+
+
+class SimpleLineNotifierService(service.Service):
+
+    def __init__(self):
+        self.notifier = SimpleLineNotifier()
+        self.server = internet.TCPServer(config.SimpleLineNotificationPort,
+            SimpleLineNotificationFactory(self.notifier))
+
+    def enqueue(self, uri):
+        self.notifier.enqueue(uri)
+
+    def startService(self):
+        self.server.startService()
+
+    def stopService(self):
+        self.server.stopService()

Modified: CalendarServer/branches/users/sagen/notifications/twistedcaldav/tap.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twistedcaldav/tap.py	2008-06-30 17:23:33 UTC (rev 2643)
+++ CalendarServer/branches/users/sagen/notifications/twistedcaldav/tap.py	2008-06-30 18:12:25 UTC (rev 2644)
@@ -56,6 +56,7 @@
 from twistedcaldav.timezones import TimezoneCache
 from twistedcaldav import pdmonster
 from twistedcaldav import memcachepool
+from twistedcaldav.notify import installNotificationClient
 
 log = Logger()
 
@@ -492,6 +493,14 @@
                 config.Memcached["MaxClients"])
 
         #
+        # Configure NotificationClient
+        #
+        if config.EnableNotifications:
+            installNotificationClient(reactor,
+                config.InternalNotificationHost,
+                config.InternalNotificationPort)
+
+        #
         # Setup Resource hierarchy
         #
 

Added: CalendarServer/branches/users/sagen/notifications/twistedcaldav/test/test_notify.py
===================================================================
--- CalendarServer/branches/users/sagen/notifications/twistedcaldav/test/test_notify.py	                        (rev 0)
+++ CalendarServer/branches/users/sagen/notifications/twistedcaldav/test/test_notify.py	2008-06-30 18:12:25 UTC (rev 2644)
@@ -0,0 +1,277 @@
+##
+# Copyright (c) 2008 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from twisted.trial.unittest import TestCase
+from twisted.internet.task import Clock
+from twistedcaldav.notify import *
+
+
+
+class NotificationClientUserTests(TestCase):
+
+    class NotificationClientUser(NotificationClientUserMixIn):
+        pass
+
+    def test_installNoficationClient(self):
+        self.assertEquals(getNotificationClient(), None)
+        self.clock = Clock()
+        installNotificationClient(self.clock, None, None,
+            klass=StubNotificationClient)
+        notificationClient = getNotificationClient()
+        self.assertNotEquals(notificationClient, None)
+
+        clientUser = self.NotificationClientUser()
+        clientUser.sendNotification("a")
+        self.assertEquals(notificationClient.lines, ["a"])
+
+
+class NotificationClientFactoryTests(TestCase):
+
+    def setUp(self):
+        self.client = StubNotificationClient(None, None, None)
+        self.factory = NotificationClientFactory(self.client)
+        self.factory.protocol = StubNotificationClientProtocol
+
+    def test_connect(self):
+        self.assertEquals(self.factory.isReady(), False)
+        protocol = self.factory.buildProtocol(None)
+        protocol.connectionMade()
+        self.assertEquals(self.client.observers, set([protocol]))
+        self.assertEquals(self.factory.isReady(), True)
+        
+        protocol.connectionLost(None)
+        self.assertEquals(self.client.observers, set())
+        self.assertEquals(self.factory.isReady(), False)
+
+
+class StubNotificationClient(object):
+
+    def __init__(self, reactor, host, port):
+        self.lines = []
+        self.observers = set()
+
+    def send(self, uri):
+        self.lines.append(uri)
+
+    def addObserver(self, observer):
+        self.observers.add(observer)
+
+    def removeObserver(self, observer):
+        self.observers.remove(observer)
+
+    def connectionMade(self):
+        pass
+
+class StubNotificationClientProtocol(object):
+
+    def __init__(self):
+        self.lines = []
+
+    def sendLine(self, line):
+        self.lines.append(line)
+
+    def connectionMade(self):
+        self.client.addObserver(self)
+        self.factory.connectionMade()
+
+    def connectionLost(self, reason):
+        self.client.removeObserver(self)
+        self.factory.connected = False
+
+
+class NotificationClientTests(TestCase):
+
+    def setUp(self):
+        self.client = NotificationClient(Clock(), None, None)
+        self.client.factory = StubNotificationClientFactory()
+
+    def test_sendWhileNotConnected(self):
+        self.client.send("a")
+        self.assertEquals(self.client.queued, set(["a"]))
+
+    def test_sendWhileConnected(self):
+        protocol = StubNotificationClientProtocol()
+        self.client.addObserver(protocol)
+        self.client.factory.connected = True
+        self.client.send("a")
+        self.assertEquals(self.client.queued, set())
+        self.assertEquals(protocol.lines, ["a"])
+
+    def test_sendQueue(self):
+        self.client.send("a")
+        self.assertEquals(self.client.queued, set(["a"]))
+        protocol = StubNotificationClientProtocol()
+        self.client.addObserver(protocol)
+        self.client.factory.connected = True
+        self.client.connectionMade()
+        self.assertEquals(protocol.lines, ["a"])
+        self.assertEquals(self.client.queued, set())
+
+
+class StubNotificationClientFactory(object):
+
+    def __init__(self):
+        self.connected = False
+
+    def isReady(self):
+        return self.connected
+
+
+class CoalescerTests(TestCase):
+
+    def setUp(self):
+        self.clock = Clock()
+        self.notifier = StubNotifier()
+        self.coalescer = Coalescer(self.notifier, reactor=self.clock)
+
+    def test_delayedNotifications(self):
+        self.coalescer.add("A")
+        self.assertEquals(self.notifier.notifications, [])
+        self.clock.advance(5)
+        self.assertEquals(self.notifier.notifications, ["A"])
+
+    def test_removeDuplicates(self):
+        self.coalescer.add("A")
+        self.coalescer.add("A")
+        self.clock.advance(5)
+        self.assertEquals(self.notifier.notifications, ["A"])
+
+
+class StubNotifier(object):
+
+    def __init__(self):
+        self.notifications = []
+        self.observers = set()
+        self.playbackHistory = []
+
+    def enqueue(self, uri):
+        self.notifications.append(uri)
+
+    def playback(self, protocol, old_seq):
+        self.playbackHistory.append((protocol, old_seq))
+
+    def addObserver(self, observer):
+        self.observers.add(observer)
+
+    def removeObserver(self, observer):
+        self.observers.remove(observer)
+
+
+class SimpleLineNotifierTests(TestCase):
+
+    def setUp(self):
+        self.clock = Clock()
+        self.notifier = SimpleLineNotifier()
+        self.coalescer = Coalescer(self.notifier, reactor=self.clock)
+
+
+    def test_send(self):
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.enqueue("A")
+        self.assertEquals(protocol.lines, ["1 A"])
+
+    def test_incrementSequence(self):
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.enqueue("A")
+        self.notifier.enqueue("B")
+        self.assertEquals(protocol.lines, ["1 A", "2 B"])
+
+    def test_addObserver(self):
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.enqueue("A")
+        self.assertEquals(protocol.lines, ["1 A"])
+
+    def test_removeObserver(self):
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.removeObserver(protocol)
+        self.notifier.enqueue("A")
+        self.assertEquals(protocol.lines, [])
+
+    def test_multipleObservers(self):
+        protocol1 = StubProtocol()
+        protocol2 = StubProtocol()
+        self.notifier.addObserver(protocol1)
+        self.notifier.addObserver(protocol2)
+        self.notifier.enqueue("A")
+        self.assertEquals(protocol1.lines, ["1 A"])
+        self.assertEquals(protocol2.lines, ["1 A"])
+
+    def test_duplicateObservers(self):
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.addObserver(protocol)
+        self.notifier.enqueue("A")
+        self.assertEquals(protocol.lines, ["1 A"])
+
+    def test_playback(self):
+        self.notifier.enqueue("A")
+        self.notifier.enqueue("B")
+        self.notifier.enqueue("C")
+        protocol = StubProtocol()
+        self.notifier.addObserver(protocol)
+        self.notifier.playback(protocol, 1)
+        self.assertEquals(protocol.lines, ["2 B", "3 C"])
+
+class SimpleLineNotificationFactoryTests(TestCase):
+
+    def test_buildProtocol(self):
+        notifier = StubNotifier()
+        factory = SimpleLineNotificationFactory(notifier)
+        protocol = factory.buildProtocol(None)
+        self.assertEquals(protocol.notifier, notifier)
+        self.assertIn(protocol, notifier.observers)
+
+
+class SimpleLineNotificationProtocolTests(TestCase):
+
+    def setUp(self):
+        self.notifier = StubNotifier()
+        self.protocol = SimpleLineNotificationProtocol()
+        self.protocol.notifier = self.notifier
+        self.protocol.transport = StubTransport()
+        self.notifier.addObserver(self.protocol)
+
+    def test_connectionLost(self):
+        self.protocol.connectionLost(None)
+        self.assertNotIn(self.protocol, self.notifier.observers)
+
+    def test_lineReceived(self):
+        self.protocol.lineReceived("2")
+        self.assertEquals(self.notifier.playbackHistory, [(self.protocol, 2)])
+
+    def test_lineReceivedInvalid(self):
+        self.protocol.lineReceived("bogus")
+        self.assertEquals(self.notifier.playbackHistory, [])
+
+
+
+class StubProtocol(object):
+
+    def __init__(self):
+        self.lines = []
+
+    def sendLine(self, line):
+        self.lines.append(line)
+
+
+class StubTransport(object):
+
+    def getPeer(self):
+        return "peer"
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080630/d927a086/attachment-0001.html 


More information about the calendarserver-changes mailing list