[CalendarServer-changes] [2952] CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav

source_changes at macosforge.org source_changes at macosforge.org
Tue Sep 9 08:42:42 PDT 2008


Revision: 2952
          http://trac.macosforge.org/projects/calendarserver/changeset/2952
Author:   sagen at apple.com
Date:     2008-09-09 08:42:42 -0700 (Tue, 09 Sep 2008)
Log Message:
-----------
Checkpoint of work done so far; breaks out the client notification class (ClientNotifier) from MemcacheChangeNotifier

Modified Paths:
--------------
    CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/cache.py
    CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/customxml.py
    CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/directory/calendar.py
    CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/notify.py
    CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/static.py
    CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/test/test_notify.py

Modified: CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/cache.py
===================================================================
--- CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/cache.py	2008-09-09 15:41:12 UTC (rev 2951)
+++ CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/cache.py	2008-09-09 15:42:42 UTC (rev 2952)
@@ -30,19 +30,12 @@
 from twistedcaldav.config import config
 from twistedcaldav.log import LoggingMixIn
 from twistedcaldav.memcachepool import CachePoolUserMixIn
-from twistedcaldav.notify import NotificationClientUserMixIn
 
 
 class DisabledCacheNotifier(object):
     def __init__(self, *args, **kwargs):
         pass
 
-    def enableNotify(self, arg):
-        pass
-
-    def disableNotify(self):
-        pass
-
     def changed(self):
         return succeed(None)
 
@@ -66,29 +59,12 @@
             self.uri)
 
 
-#
-# FIXME: This should be a generic notifier class, not specific to
-# memcache, as evidenced by the addition of the sendNotification()
-# addition in changed() below.
-#
-class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn,
-    NotificationClientUserMixIn):
+class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn):
 
     def __init__(self, resource, cachePool=None):
         self._resource = resource
         self._cachePool = cachePool
-        self._notify = True
 
-    def enableNotify(self, arg):
-        url = self._resource.url()
-        self.log_debug("enableNotify: %s" % (url,))
-        self._notify = True
-
-    def disableNotify(self):
-        url = self._resource.url()
-        self.log_debug("disableNotify: %s" % (url,))
-        self._notify = False
-
     def _newCacheToken(self):
         return str(uuid.uuid4())
 
@@ -98,16 +74,8 @@
 
         return: A L{Deferred} that fires when the token has been changed.
         """
-
         url = self._resource.url()
 
-        if config.Notifications["Enabled"]:
-            if self._notify:
-                self.log_debug("Notifications are enabled: %s" % (url,))
-                self.sendNotification(url)
-            else:
-                self.log_debug("Skipping notification for: %s" % (url,))
-
         self.log_debug("Changing Cache Token for %r" % (url,))
         return self.getCachePool().set(
             'cacheToken:%s' % (url,),

Modified: CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/customxml.py
===================================================================
--- CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/customxml.py	2008-09-09 15:41:12 UTC (rev 2951)
+++ CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/customxml.py	2008-09-09 15:42:42 UTC (rev 2952)
@@ -252,6 +252,7 @@
     namespace = calendarserver_namespace
     name = "xmpp-uri"
     protected = True
+    hidden = True
 
 class PubSubHeartbeatURIProperty (davxml.WebDAVTextElement):
     """
@@ -261,6 +262,7 @@
     namespace = calendarserver_namespace
     name = "xmpp-heartbeat-uri"
     protected = True
+    hidden = True
 
 class PubSubXMPPServerProperty (davxml.WebDAVTextElement):
     """
@@ -270,6 +272,7 @@
     namespace = calendarserver_namespace
     name = "xmpp-server"
     protected = True
+    hidden = True
 
 class IScheduleInbox (davxml.WebDAVEmptyElement):
     """

Modified: CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/directory/calendar.py
===================================================================
--- CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/directory/calendar.py	2008-09-09 15:41:12 UTC (rev 2951)
+++ CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/directory/calendar.py	2008-09-09 15:42:42 UTC (rev 2952)
@@ -281,8 +281,8 @@
     def provisionDefaultCalendars(self):
 
         # Disable notifications during provisioning
-        if hasattr(self, "cacheNotifier"):
-            self.cacheNotifier.disableNotify()
+        if hasattr(self, "clientNotifier"):
+            self.clientNotifier.disableNotify()
 
         def setupFreeBusy(_):
             # Default calendar is initially opaque to freebusy
@@ -317,14 +317,14 @@
         except:
             # We want to make sure to re-enable notifications, so do so
             # if there is an immediate exception above, or via errback, below
-            if hasattr(self, "cacheNotifier"):
-                self.cacheNotifier.enableNotify(None)
+            if hasattr(self, "clientNotifier"):
+                self.clientNotifier.enableNotify(None)
             raise
 
         # Re-enable notifications
-        if hasattr(self, "cacheNotifier"):
-            d.addCallback(self.cacheNotifier.enableNotify)
-            d.addErrback(self.cacheNotifier.enableNotify)
+        if hasattr(self, "clientNotifier"):
+            d.addCallback(self.clientNotifier.enableNotify)
+            d.addErrback(self.clientNotifier.enableNotify)
 
         return d
 

Modified: CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/notify.py
===================================================================
--- CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/notify.py	2008-09-09 15:41:12 UTC (rev 2951)
+++ CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/notify.py	2008-09-09 15:42:42 UTC (rev 2952)
@@ -35,7 +35,9 @@
 # TODO: bindAddress to local
 # TODO: add CalDAVTester test for examining new xmpp-uri property
 
-from twisted.internet import protocol
+from twisted.internet import protocol, defer
+from twisted.internet.address import IPv4Address
+from twisted.internet.defer import inlineCallbacks, returnValue
 from twisted.protocols import basic
 from twisted.plugin import IPlugin
 from twisted.application import internet, service
@@ -48,11 +50,15 @@
 from twisted.words.xish import domish
 from twistedcaldav.log import LoggingMixIn
 from twistedcaldav.config import config, parseConfig, defaultConfig
+from twistedcaldav.memcacher import Memcacher
+from twistedcaldav import memcachepool
 from zope.interface import Interface, implements
 from fnmatch import fnmatch
 
 __all__ = [
+    "ClientNotifier",
     "Coalescer",
+    "getNodeCacher",
     "getNotificationClient",
     "getPubSubConfiguration",
     "getPubSubHeartbeatURI",
@@ -65,8 +71,6 @@
     "NotificationClient",
     "NotificationClientFactory",
     "NotificationClientLineProtocol",
-    "NotificationClientUserMixIn",
-    "NotificationOptions",
     "NotificationServiceMaker",
     "SimpleLineNotificationFactory",
     "SimpleLineNotificationProtocol",
@@ -80,18 +84,37 @@
 # Classes used within calendarserver itself
 #
 
-class NotificationClientUserMixIn(object):
+class ClientNotifier(LoggingMixIn):
     """
-    Notification Client User (Mixin)
-
     Provides a method to send change notifications to the L{NotificationClient}.
     """
 
-    def sendNotification(self, uri):
-        getNotificationClient().send(uri)
+    def __init__(self, resource, configOverride=None):
+        self._resource = resource
+        self._notify = True
+        self.config = configOverride or config
 
+    def enableNotify(self, arg):
+        url = self._resource.url()
+        self.log_debug("enableNotify: %s" % (url,))
+        self._notify = True
 
+    def disableNotify(self):
+        url = self._resource.url()
+        self.log_debug("disableNotify: %s" % (url,))
+        self._notify = False
 
+    def notify(self, op="update"):
+        url = self._resource.url()
+
+        if self.config.Notifications["Enabled"]:
+            if self._notify:
+                self.log_debug("Notifications are enabled: %s %s" % (op, url))
+                return getNotificationClient().send(op, url)
+            else:
+                self.log_debug("Skipping notification for: %s" % (url,))
+
+
 class NotificationClientLineProtocol(basic.LineReceiver, LoggingMixIn):
     """
     Notification Client Line Protocol
@@ -154,7 +177,7 @@
     """
     Notification Client
 
-    Forwards on notifications from NotificationClientUserMixIns to the
+    Forwards on notifications from ClientNotifiers to the
     notification server.  A NotificationClient is installed by the tap at
     startup.
     """
@@ -170,26 +193,27 @@
             from twisted.internet import reactor
         self.reactor = reactor
 
-    def send(self, uri):
+    def send(self, op, uri):
         if self.factory is None:
             self.factory = NotificationClientFactory(self)
             self.reactor.connectTCP(self.host, self.port, self.factory)
             self.log_debug("Creating factory")
 
+        msg = "%s %s" % (op, str(uri))
         if self.factory.isReady() and self.observers:
             for observer in self.observers:
-                self.log_debug("Sending to notification server: %s" % (uri,))
-                observer.sendLine(str(uri))
+                self.log_debug("Sending to notification server: %s" % (msg,))
+                observer.sendLine(msg)
         else:
-            self.log_debug("Queuing: %s" % (uri,))
-            self.queued.add(uri)
+            self.log_debug("Queuing: %s" % (msg,))
+            self.queued.add(msg)
 
     def connectionMade(self):
         if self.factory.isReady() and self.observers:
             for observer in self.observers:
-                for uri in self.queued:
-                    self.log_debug("Sending from queue: %s" % (uri,))
-                    observer.sendLine(str(uri))
+                for msg in self.queued:
+                    self.log_debug("Sending from queue: %s" % (msg,))
+                    observer.sendLine(msg)
             self.queued.clear()
 
     def addObserver(self, observer):
@@ -211,9 +235,84 @@
 
 
 
+class NodeCacher(Memcacher, LoggingMixIn):
 
+    def __init__(self, reactor=None):
+        if reactor is None:
+            from twisted.internet import reactor
+        self.reactor = reactor
+        super(NodeCacher, self).__init__("pubsubnodes")
 
+    @inlineCallbacks
+    def nodeExists(self, nodeName):
+        result = (yield self.get(nodeName))
+        self.log_debug("nodeExists result = %s" % (result,))
+        returnValue(result is not None)
 
+    @inlineCallbacks
+    def storeNode(self, nodeName):
+        return
+        self.log_debug("Storing node %s" % (nodeName,))
+        try:
+            yield self.set(nodeName, "1")
+        except Exception, e:
+            import pdb; pdb.set_trace()
+            self.log_error(e)
+            raise
+
+    @inlineCallbacks
+    def waitForNode(self, notifier, nodeName):
+        self.log_debug("in waitForNode %s" % (nodeName,))
+        doesExist = (yield self.nodeExists(nodeName))
+        self.log_debug("doesExist = %s" % (doesExist,))
+        if doesExist:
+            self.log_debug("waitForNode returning True")
+            returnValue(True)
+        else:
+            self.log_debug("waitForNode calling notify()")
+            notifier.notify(op="create")
+            self.log_debug("waitForNode called notify()")
+            (yield self._waitForNode(None, nodeName))
+
+    def _waitForNode(self, result, nodeName, retries=5, deferred=None):
+        self.log_debug("waiting for node %s, retries %d" % (nodeName, retries))
+
+        if deferred == None:
+            deferred = defer.Deferred()
+
+        def _exists(result, nodeName, retries, deferred):
+            if result is True:
+                self.log_debug("node exists %s" % (nodeName,))
+                deferred.callback(True)
+                return
+            else:
+                retries -= 1
+                if retries == 0:
+                    self.log_debug("giving up on node %s" % (nodeName,))
+                    deferred.errback()
+                    return
+                self.log_debug("scheduling a retry of node %s" % (nodeName,))
+                self.reactor.callLater(2, self._waitForNode, result, nodeName,
+                    retries=retries, deferred=deferred)
+
+        self.nodeExists(nodeName).addCallback(_exists, nodeName, retries,
+            deferred)
+        return deferred
+
+
+
+_nodeCacher = None
+
+def getNodeCacher():
+    global _nodeCacher
+    if _nodeCacher is None:
+        _nodeCacher = NodeCacher()
+    return _nodeCacher
+
+
+
+
+
 # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
 # Classes used within Notification Server
 #
@@ -230,8 +329,8 @@
     """
 
     def lineReceived(self, line):
-        val = str(line.strip())
-        self.factory.coalescer.add(val)
+        op, uri = line.strip().split()
+        self.factory.coalescer.add(op, uri)
 
 
 class InternalNotificationFactory(protocol.ServerFactory):
@@ -277,28 +376,37 @@
         self.uris = {}
         self.notifiers = notifiers
 
-    def add(self, uri):
-        delayed, count = self.uris.get(uri, [None, 0])
+    def add(self, op, uri):
 
-        if delayed and delayed.active():
-            count += 1
-            if count < self.sendAnywayAfterCount:
-                # reschedule for delaySeconds in the future
-                delayed.reset(self.delaySeconds)
-                self.uris[uri][1] = count
-                self.log_info("Delaying: %s" % (uri,))
+        if op == "create":
+            # we don't want to delay a "create" notification; this opcode
+            # is meant for XMPP pubsub -- it means create and configure the
+            # node but don't publish to it
+            for notifier in self.notifiers:
+                notifier.enqueue(op, uri)
+
+        else: # normal update notification
+            delayed, count = self.uris.get(uri, [None, 0])
+
+            if delayed and delayed.active():
+                count += 1
+                if count < self.sendAnywayAfterCount:
+                    # reschedule for delaySeconds in the future
+                    delayed.reset(self.delaySeconds)
+                    self.uris[uri][1] = count
+                    self.log_info("Delaying: %s" % (uri,))
+                else:
+                    self.log_info("Not delaying to avoid starvation: %s" % (uri,))
             else:
-                self.log_info("Not delaying to avoid starvation: %s" % (uri,))
-        else:
-            self.log_info("Scheduling: %s" % (uri,))
-            self.uris[uri] = [self.reactor.callLater(self.delaySeconds,
-                self.delayedEnqueue, uri), 0]
+                self.log_info("Scheduling: %s" % (uri,))
+                self.uris[uri] = [self.reactor.callLater(self.delaySeconds,
+                    self.delayedEnqueue, op, uri), 0]
 
-    def delayedEnqueue(self, uri):
+    def delayedEnqueue(self, op, uri):
         self.log_info("Time to send: %s" % (uri,))
         self.uris[uri][1] = 0
         for notifier in self.notifiers:
-            notifier.enqueue(uri)
+            notifier.enqueue(op, uri)
 
 
 
@@ -313,11 +421,12 @@
     Defines an enqueue method that Notifier classes need to implement.
     """
 
-    def enqueue(uri):
+    def enqueue(self, op, uri):
         """
         Let's the notifier object know that a change has been made for this
         uri, and enough time has passed to allow for coalescence.
 
+        @type op: C{str}
         @type uri: C{str}
         """
 
@@ -351,18 +460,20 @@
         self.observers = set()
         self.sentReset = False
 
-    def enqueue(self, uri):
+    def enqueue(self, op, uri):
 
-        self.latestSeq += 1L
+        if op == "update":
 
-        # Update history
-        self.history[uri] = self.latestSeq
+            self.latestSeq += 1L
 
-        for observer in self.observers:
-            msg = "%d %s" % (self.latestSeq, uri)
-            self.log_debug("Sending %s" % (msg,))
-            observer.sendLine(msg)
+            # Update history
+            self.history[uri] = self.latestSeq
 
+            for observer in self.observers:
+                msg = "%d %s" % (self.latestSeq, uri)
+                self.log_debug("Sending %s" % (msg,))
+                observer.sendLine(msg)
+
     def reset(self):
         self.latestSeq = 0L
         self.history = { } # keys=uri, values=sequenceNumber
@@ -518,15 +629,22 @@
 
     def sendHeartbeat(self):
         if self.doHeartbeat and self.xmlStream is not None:
-            self.enqueue("", lock=False)
+            self.enqueue("update", "", lock=False)
             self.reactor.callLater(self.settings['HeartbeatSeconds'],
                 self.sendHeartbeat)
 
-    def enqueue(self, uri, lock=True):
+    def enqueue(self, op, uri, lock=True):
         if self.xmlStream is not None:
             # Convert uri to node
             nodeName = self.uriToNodeName(uri)
-            self.publishNode(nodeName, lock=lock)
+            if op == "create":
+                if not self.lockNode(nodeName):
+                    # this node is busy, so it must already be created, or at
+                    # least in the process
+                    return
+                self.createNode(nodeName, publish=False)
+            else:
+                self.publishNode(nodeName, lock=lock)
 
     def uriToNodeName(self, uri):
         return getPubSubPath(uri, getPubSubConfiguration(self.config))
@@ -569,7 +687,7 @@
 
             if iq.name == "error":
                 if iq['code'] == '400':
-                    self.requestConfigurationForm(nodeName)
+                    self.requestConfigurationForm(nodeName, True)
 
                 elif iq['code'] == '404':
                     self.createNode(nodeName)
@@ -580,7 +698,7 @@
             self.unlockNode(None, nodeName)
             raise
 
-    def createNode(self, nodeName):
+    def createNode(self, nodeName, publish=True):
         if self.xmlStream is None:
             # We lost our connection
             self.unlockNode(None, nodeName)
@@ -592,32 +710,39 @@
             child = pubsubElement.addElement('create')
             child['node'] = nodeName
             d = iq.send(to=self.settings['ServiceAddress'])
-            d.addCallback(self.createNodeSuccess, nodeName)
-            d.addErrback(self.createNodeFailure, nodeName)
+            d.addCallback(self.createNodeSuccess, nodeName, publish)
+            d.addErrback(self.createNodeFailure, nodeName, publish)
         except:
             self.unlockNode(None, nodeName)
             raise
 
-    def createNodeSuccess(self, iq, nodeName):
+    def createNodeSuccess(self, iq, nodeName, publish):
         try:
             self.sendDebug("Node creation successful (%s)" % (nodeName,), iq)
             # now time to configure; fetch the form
-            self.requestConfigurationForm(nodeName)
+            self.requestConfigurationForm(nodeName, publish)
         except:
             self.unlockNode(None, nodeName)
             raise
 
-    def createNodeFailure(self, result, nodeName):
-        # If we get here we're giving up
+    def createNodeFailure(self, result, nodeName, publish):
         try:
             iq = result.value.getElement()
-            self.log_error("PubSub node creation error: %s" %
-                (iq.toXml().encode('ascii', 'replace')),)
-            self.sendError("Node creation failed (%s)" % (nodeName,), iq)
-        finally:
+            if iq['code'] == '409':
+                # node already exists, proceed to configure
+                self.sendDebug("Node already exists (%s)" % (nodeName,), iq)
+                self.requestConfigurationForm(nodeName, publish)
+            else:
+                # couldn't create node, give up
+                self.unlockNode(None, nodeName)
+                self.log_error("PubSub node creation error: %s" %
+                    (iq.toXml().encode('ascii', 'replace')),)
+                self.sendError("Node creation failed (%s)" % (nodeName,), iq)
+        except:
             self.unlockNode(None, nodeName)
+            raise
 
-    def requestConfigurationForm(self, nodeName):
+    def requestConfigurationForm(self, nodeName, publish):
         if self.xmlStream is None:
             # We lost our connection
             self.unlockNode(None, nodeName)
@@ -630,7 +755,8 @@
             child = child.addElement('configure')
             child['node'] = nodeName
             d = iq.send(to=self.settings['ServiceAddress'])
-            d.addCallback(self.requestConfigurationFormSuccess, nodeName)
+            d.addCallback(self.requestConfigurationFormSuccess, nodeName,
+                publish)
             d.addErrback(self.requestConfigurationFormFailure, nodeName)
         except:
             self.unlockNode(None, nodeName)
@@ -642,7 +768,7 @@
                 return child
         return None
 
-    def requestConfigurationFormSuccess(self, iq, nodeName):
+    def requestConfigurationFormSuccess(self, iq, nodeName, publish):
         if self.xmlStream is None:
             # We lost our connection
             self.unlockNode(None, nodeName)
@@ -683,7 +809,8 @@
                         self.sendDebug("Sending configuration form (%s)"
                                        % (nodeName,), filledIq)
                         d = filledIq.send(to=self.settings['ServiceAddress'])
-                        d.addCallback(self.configurationSuccess, nodeName)
+                        d.addCallback(self.configurationSuccess, nodeName,
+                            publish)
                         d.addErrback(self.configurationFailure, nodeName)
                         return
 
@@ -706,7 +833,7 @@
         finally:
             self.unlockNode(None, nodeName)
 
-    def configurationSuccess(self, iq, nodeName):
+    def configurationSuccess(self, iq, nodeName, publish):
         if self.xmlStream is None:
             # We lost our connection
             self.unlockNode(None, nodeName)
@@ -715,7 +842,12 @@
         try:
             self.log_debug("PubSub node %s is configured" % (nodeName,))
             self.sendDebug("Configured node (%s)" % (nodeName,), iq)
-            self.publishNode(nodeName, lock=False)
+            nodeCacher = getNodeCacher()
+            nodeCacher.storeNode(nodeName)
+            if publish:
+                self.publishNode(nodeName, lock=False)
+            else:
+                self.unlockNode(None, nodeName)
         except:
             self.unlockNode(None, nodeName)
             raise
@@ -852,7 +984,7 @@
             if frm in self.roster:
                 txt = str(body).lower()
                 if txt == "help":
-                    response = "debug on, debug off, roster, publish <nodename>, hammer <count>"
+                    response = "debug on, debug off, roster, create <nodename>, publish <nodename>, hammer <count>"
                 elif txt == "roster":
                     response = "Roster: %s" % (str(self.roster),)
                 elif txt == "debug on":
@@ -870,7 +1002,17 @@
                         response = "Please phrase it like 'publish nodename'"
                     else:
                         response = "Publishing node %s" % (nodeName,)
-                        self.reactor.callLater(1, self.publishNode, nodeName)
+                        self.reactor.callLater(1, self.enqueue, "update",
+                            nodeName)
+                elif txt.startswith("create"):
+                    try:
+                        publish, nodeName = txt.split()
+                    except ValueError:
+                        response = "Please phrase it like 'create nodename'"
+                    else:
+                        response = "Creating and configuring node %s" % (nodeName,)
+                        self.reactor.callLater(1, self.enqueue, "create",
+                            nodeName)
                 elif txt.startswith("hammer"):
                     try:
                         hammer, count = txt.split()
@@ -894,7 +1036,7 @@
 
     def hammer(self, count):
         for i in xrange(count):
-            self.enqueue("hammertesting%d" % (i,))
+            self.enqueue("update", "hammertesting%d" % (i,))
 
 
 class XMPPNotificationFactory(xmlstream.XmlStreamFactory, LoggingMixIn):
@@ -1101,6 +1243,17 @@
 
     def makeService(self, options):
 
+        #
+        # Configure Memcached Client Pool
+        #
+        if config.Memcached["ClientEnabled"]:
+            memcachepool.installPool(
+                IPv4Address(
+                    'TCP',
+                    config.Memcached["BindAddress"],
+                    config.Memcached["Port"]),
+                config.Memcached["MaxClients"])
+
         multiService = service.MultiService()
 
         notifiers = []
@@ -1126,8 +1279,8 @@
         self.server = internet.TCPServer(settings["Port"],
             SimpleLineNotificationFactory(self.notifier))
 
-    def enqueue(self, uri):
-        self.notifier.enqueue(uri)
+    def enqueue(self, op, uri):
+        self.notifier.enqueue(op, uri)
 
     def startService(self):
         self.server.startService()
@@ -1143,8 +1296,8 @@
         self.client = internet.TCPClient(settings["Host"], settings["Port"],
             XMPPNotificationFactory(self.notifier, settings))
 
-    def enqueue(self, uri):
-        self.notifier.enqueue(uri)
+    def enqueue(self, op, uri):
+        self.notifier.enqueue(op, uri)
 
     def startService(self):
         self.client.startService()

Modified: CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/static.py
===================================================================
--- CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/static.py	2008-09-09 15:41:12 UTC (rev 2951)
+++ CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/static.py	2008-09-09 15:42:42 UTC (rev 2952)
@@ -77,7 +77,8 @@
 from twistedcaldav.timezoneservice import TimezoneServiceResource
 from twistedcaldav.cache import DisabledCacheNotifier, PropfindCacheMixin
 from twistedcaldav.notify import getPubSubConfiguration, getPubSubXMPPURI
-from twistedcaldav.notify import getPubSubHeartbeatURI
+from twistedcaldav.notify import getPubSubHeartbeatURI, getPubSubPath
+from twistedcaldav.notify import ClientNotifier, getNodeCacher
 
 log = Logger()
 
@@ -332,11 +333,18 @@
         except:
             return fail(Failure())
 
+        if hasattr(self, 'clientNotifier'):
+            self.clientNotifier.notify(op="update")
+        else:
+            log.debug("%r does not have a clientNotifier but the CTag changed"
+                      % (self,))
+
         if hasattr(self, 'cacheNotifier'):
             return self.cacheNotifier.changed()
         else:
             log.debug("%r does not have a cacheNotifier but the CTag changed"
                       % (self,))
+
         return succeed(True)
 
     ##
@@ -610,6 +618,7 @@
         @param path: the path to the file which will back the resource.
         """
         self.cacheNotifier = self.cacheNotifierFactory(self)
+        self.clientNotifier = ClientNotifier(self)
         CalDAVFile.__init__(self, path)
         DirectoryCalendarHomeResource.__init__(self, parent, record)
 
@@ -634,6 +643,7 @@
         if cls is not None:
             child = cls(self.fp.child(name).path, self)
             child.cacheNotifier = self.cacheNotifier
+            child.clientNotifier = self.clientNotifier
             return child
 
         return self.createSimilarFile(self.fp.child(name).path)
@@ -644,6 +654,7 @@
         else:
             similar = CalDAVFile(path, principalCollections=self.principalCollections())
             similar.cacheNotifier = self.cacheNotifier
+            similar.clientNotifier = self.clientNotifier
             return similar
 
     def getChild(self, name):
@@ -660,11 +671,29 @@
         else:
             qname = property.qname()
 
+
+        def _succeeded(result, propVal):
+            self.log_info("RESULT: SUCCESS")
+            return propVal
+
+        def _failed(failure):
+            self.log_info("RESULT: FAILURE")
+            return customxml.PubSubXMPPURIProperty()
+
         if qname == (customxml.calendarserver_namespace, "xmpp-uri"):
             pubSubConfiguration = getPubSubConfiguration(config)
             if pubSubConfiguration['enabled']:
-                return succeed(customxml.PubSubXMPPURIProperty(
-                    getPubSubXMPPURI(self.url(), pubSubConfiguration)))
+                if getattr(self, "clientNotifier", None) is not None:
+                    url = self.url()
+                    nodeName = getPubSubPath(url, pubSubConfiguration)
+                    propVal = customxml.PubSubXMPPURIProperty(
+                        getPubSubXMPPURI(url, pubSubConfiguration))
+
+                    nodeCacher = getNodeCacher()
+                    d = nodeCacher.waitForNode(self.clientNotifier, nodeName)
+                    d.addCallback(_succeeded, propVal)
+                    d.addErrback(_failed)
+                    return d
             else:
                 return succeed(customxml.PubSubXMPPURIProperty())
 

Modified: CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/test/test_notify.py
===================================================================
--- CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/test/test_notify.py	2008-09-09 15:41:12 UTC (rev 2951)
+++ CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/test/test_notify.py	2008-09-09 15:42:42 UTC (rev 2952)
@@ -23,12 +23,17 @@
 from twistedcaldav.config import Config
 
 
+class StubResource(object):
 
+    def __init__(self, url):
+        self._url = url
+
+    def url(self):
+        return self._url
+
+
 class NotificationClientUserTests(TestCase):
 
-    class NotificationClientUser(NotificationClientUserMixIn):
-        pass
-
     def test_installNoficationClient(self):
         self.assertEquals(getNotificationClient(), None)
         self.clock = Clock()
@@ -37,8 +42,11 @@
         notificationClient = getNotificationClient()
         self.assertNotEquals(notificationClient, None)
 
-        clientUser = self.NotificationClientUser()
-        clientUser.sendNotification("a")
+        enabledConfig = Config(config_mod.defaultConfig)
+        enabledConfig.Notifications['Enabled'] = True
+        clientNotifier = ClientNotifier(StubResource("a"),
+            configOverride=enabledConfig)
+        clientNotifier.notify()
         self.assertEquals(notificationClient.lines, ["a"])
 
 
@@ -67,7 +75,7 @@
         self.lines = []
         self.observers = set()
 
-    def send(self, uri):
+    def send(self, op, uri):
         self.lines.append(uri)
 
     def addObserver(self, observer):
@@ -103,25 +111,25 @@
         self.client.factory = StubNotificationClientFactory()
 
     def test_sendWhileNotConnected(self):
-        self.client.send("a")
-        self.assertEquals(self.client.queued, set(["a"]))
+        self.client.send("update", "a")
+        self.assertEquals(self.client.queued, set(["update a"]))
 
     def test_sendWhileConnected(self):
         protocol = StubNotificationClientProtocol()
         self.client.addObserver(protocol)
         self.client.factory.connected = True
-        self.client.send("a")
+        self.client.send("update", "a")
         self.assertEquals(self.client.queued, set())
-        self.assertEquals(protocol.lines, ["a"])
+        self.assertEquals(protocol.lines, ["update a"])
 
     def test_sendQueue(self):
-        self.client.send("a")
-        self.assertEquals(self.client.queued, set(["a"]))
+        self.client.send("update", "a")
+        self.assertEquals(self.client.queued, set(["update a"]))
         protocol = StubNotificationClientProtocol()
         self.client.addObserver(protocol)
         self.client.factory.connected = True
         self.client.connectionMade()
-        self.assertEquals(protocol.lines, ["a"])
+        self.assertEquals(protocol.lines, ["update a"])
         self.assertEquals(self.client.queued, set())
 
 
@@ -142,14 +150,14 @@
         self.coalescer = Coalescer([self.notifier], reactor=self.clock)
 
     def test_delayedNotifications(self):
-        self.coalescer.add("A")
+        self.coalescer.add("update", "A")
         self.assertEquals(self.notifier.notifications, [])
         self.clock.advance(5)
         self.assertEquals(self.notifier.notifications, ["A"])
 
     def test_removeDuplicates(self):
-        self.coalescer.add("A")
-        self.coalescer.add("A")
+        self.coalescer.add("update", "A")
+        self.coalescer.add("update", "A")
         self.clock.advance(5)
         self.assertEquals(self.notifier.notifications, ["A"])
 
@@ -161,7 +169,7 @@
         self.observers = set()
         self.playbackHistory = []
 
-    def enqueue(self, uri):
+    def enqueue(self, op, uri):
         self.notifications.append(uri)
 
     def playback(self, protocol, old_seq):
@@ -198,27 +206,27 @@
     def test_send(self):
         protocol = StubProtocol()
         self.notifier.addObserver(protocol)
-        self.notifier.enqueue("A")
+        self.notifier.enqueue("update", "A")
         self.assertEquals(protocol.lines, ["1 A"])
 
     def test_incrementSequence(self):
         protocol = StubProtocol()
         self.notifier.addObserver(protocol)
-        self.notifier.enqueue("A")
-        self.notifier.enqueue("B")
+        self.notifier.enqueue("update", "A")
+        self.notifier.enqueue("update", "B")
         self.assertEquals(protocol.lines, ["1 A", "2 B"])
 
     def test_addObserver(self):
         protocol = StubProtocol()
         self.notifier.addObserver(protocol)
-        self.notifier.enqueue("A")
+        self.notifier.enqueue("update", "A")
         self.assertEquals(protocol.lines, ["1 A"])
 
     def test_removeObserver(self):
         protocol = StubProtocol()
         self.notifier.addObserver(protocol)
         self.notifier.removeObserver(protocol)
-        self.notifier.enqueue("A")
+        self.notifier.enqueue("update", "A")
         self.assertEquals(protocol.lines, [])
 
     def test_multipleObservers(self):
@@ -226,7 +234,7 @@
         protocol2 = StubProtocol()
         self.notifier.addObserver(protocol1)
         self.notifier.addObserver(protocol2)
-        self.notifier.enqueue("A")
+        self.notifier.enqueue("update", "A")
         self.assertEquals(protocol1.lines, ["1 A"])
         self.assertEquals(protocol2.lines, ["1 A"])
 
@@ -234,20 +242,20 @@
         protocol = StubProtocol()
         self.notifier.addObserver(protocol)
         self.notifier.addObserver(protocol)
-        self.notifier.enqueue("A")
+        self.notifier.enqueue("update", "A")
         self.assertEquals(protocol.lines, ["1 A"])
 
     def test_playback(self):
-        self.notifier.enqueue("A")
-        self.notifier.enqueue("B")
-        self.notifier.enqueue("C")
+        self.notifier.enqueue("update", "A")
+        self.notifier.enqueue("update", "B")
+        self.notifier.enqueue("update", "C")
         protocol = StubProtocol()
         self.notifier.addObserver(protocol)
         self.notifier.playback(protocol, 1)
         self.assertEquals(protocol.lines, ["2 B", "3 C"])
 
     def test_reset(self):
-        self.notifier.enqueue("A")
+        self.notifier.enqueue("update", "A")
         self.assertEquals(self.notifier.history, {"A" : 1})
         self.assertEquals(self.notifier.latestSeq, 1)
         self.notifier.reset()
@@ -345,7 +353,7 @@
         self.notifier.streamOpened(self.xmlStream)
 
     def test_sendWhileConnected(self):
-        self.notifier.enqueue("/principals/__uids__/test")
+        self.notifier.enqueue("update", "/principals/__uids__/test")
 
         iq = self.xmlStream.elements[1]
         self.assertEquals(iq.name, "iq")
@@ -363,7 +371,7 @@
     def test_sendWhileNotConnected(self):
         notifier = XMPPNotifier(self.settings, reactor=Clock(),
             configOverride=self.xmppDisabledConfig)
-        notifier.enqueue("/principals/__uids__/test")
+        notifier.enqueue("update", "/principals/__uids__/test")
         self.assertEquals(len(self.xmlStream.elements), 1)
 
     def test_publishNewNode(self):
@@ -437,7 +445,8 @@
             fieldElement.addElement('value', content=field[1])
 
         self.assertEquals(len(self.xmlStream.elements), 1)
-        self.notifier.requestConfigurationFormSuccess(response, "testNodeName")
+        self.notifier.requestConfigurationFormSuccess(response, "testNodeName",
+            False)
         self.assertEquals(len(self.xmlStream.elements), 2)
 
         iq = self.xmlStream.elements[1]
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080909/818edf6c/attachment-0001.html 


More information about the calendarserver-changes mailing list