[CalendarServer-changes] [2952] CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav
source_changes at macosforge.org
source_changes at macosforge.org
Tue Sep 9 08:42:42 PDT 2008
Revision: 2952
http://trac.macosforge.org/projects/calendarserver/changeset/2952
Author: sagen at apple.com
Date: 2008-09-09 08:42:42 -0700 (Tue, 09 Sep 2008)
Log Message:
-----------
Checkpoint of work done so far; breaks out client notification class from memcachenotifier
Modified Paths:
--------------
CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/cache.py
CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/customxml.py
CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/directory/calendar.py
CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/notify.py
CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/static.py
CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/test/test_notify.py
Modified: CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/cache.py
===================================================================
--- CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/cache.py 2008-09-09 15:41:12 UTC (rev 2951)
+++ CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/cache.py 2008-09-09 15:42:42 UTC (rev 2952)
@@ -30,19 +30,12 @@
from twistedcaldav.config import config
from twistedcaldav.log import LoggingMixIn
from twistedcaldav.memcachepool import CachePoolUserMixIn
-from twistedcaldav.notify import NotificationClientUserMixIn
class DisabledCacheNotifier(object):
def __init__(self, *args, **kwargs):
pass
- def enableNotify(self, arg):
- pass
-
- def disableNotify(self):
- pass
-
def changed(self):
return succeed(None)
@@ -66,29 +59,12 @@
self.uri)
-#
-# FIXME: This should be a generic notifier class, not specific to
-# memcache, as evidenced by the addition of the sendNotification()
-# addition in changed() below.
-#
-class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn,
- NotificationClientUserMixIn):
+class MemcacheChangeNotifier(LoggingMixIn, CachePoolUserMixIn):
def __init__(self, resource, cachePool=None):
self._resource = resource
self._cachePool = cachePool
- self._notify = True
- def enableNotify(self, arg):
- url = self._resource.url()
- self.log_debug("enableNotify: %s" % (url,))
- self._notify = True
-
- def disableNotify(self):
- url = self._resource.url()
- self.log_debug("disableNotify: %s" % (url,))
- self._notify = False
-
def _newCacheToken(self):
return str(uuid.uuid4())
@@ -98,16 +74,8 @@
return: A L{Deferred} that fires when the token has been changed.
"""
-
url = self._resource.url()
- if config.Notifications["Enabled"]:
- if self._notify:
- self.log_debug("Notifications are enabled: %s" % (url,))
- self.sendNotification(url)
- else:
- self.log_debug("Skipping notification for: %s" % (url,))
-
self.log_debug("Changing Cache Token for %r" % (url,))
return self.getCachePool().set(
'cacheToken:%s' % (url,),
Modified: CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/customxml.py
===================================================================
--- CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/customxml.py 2008-09-09 15:41:12 UTC (rev 2951)
+++ CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/customxml.py 2008-09-09 15:42:42 UTC (rev 2952)
@@ -252,6 +252,7 @@
namespace = calendarserver_namespace
name = "xmpp-uri"
protected = True
+ hidden = True
class PubSubHeartbeatURIProperty (davxml.WebDAVTextElement):
"""
@@ -261,6 +262,7 @@
namespace = calendarserver_namespace
name = "xmpp-heartbeat-uri"
protected = True
+ hidden = True
class PubSubXMPPServerProperty (davxml.WebDAVTextElement):
"""
@@ -270,6 +272,7 @@
namespace = calendarserver_namespace
name = "xmpp-server"
protected = True
+ hidden = True
class IScheduleInbox (davxml.WebDAVEmptyElement):
"""
Modified: CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/directory/calendar.py
===================================================================
--- CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/directory/calendar.py 2008-09-09 15:41:12 UTC (rev 2951)
+++ CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/directory/calendar.py 2008-09-09 15:42:42 UTC (rev 2952)
@@ -281,8 +281,8 @@
def provisionDefaultCalendars(self):
# Disable notifications during provisioning
- if hasattr(self, "cacheNotifier"):
- self.cacheNotifier.disableNotify()
+ if hasattr(self, "clientNotifier"):
+ self.clientNotifier.disableNotify()
def setupFreeBusy(_):
# Default calendar is initially opaque to freebusy
@@ -317,14 +317,14 @@
except:
# We want to make sure to re-enable notifications, so do so
# if there is an immediate exception above, or via errback, below
- if hasattr(self, "cacheNotifier"):
- self.cacheNotifier.enableNotify(None)
+ if hasattr(self, "clientNotifier"):
+ self.clientNotifier.enableNotify(None)
raise
# Re-enable notifications
- if hasattr(self, "cacheNotifier"):
- d.addCallback(self.cacheNotifier.enableNotify)
- d.addErrback(self.cacheNotifier.enableNotify)
+ if hasattr(self, "clientNotifier"):
+ d.addCallback(self.clientNotifier.enableNotify)
+ d.addErrback(self.clientNotifier.enableNotify)
return d
Modified: CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/notify.py
===================================================================
--- CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/notify.py 2008-09-09 15:41:12 UTC (rev 2951)
+++ CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/notify.py 2008-09-09 15:42:42 UTC (rev 2952)
@@ -35,7 +35,9 @@
# TODO: bindAddress to local
# TODO: add CalDAVTester test for examining new xmpp-uri property
-from twisted.internet import protocol
+from twisted.internet import protocol, defer
+from twisted.internet.address import IPv4Address
+from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.protocols import basic
from twisted.plugin import IPlugin
from twisted.application import internet, service
@@ -48,11 +50,15 @@
from twisted.words.xish import domish
from twistedcaldav.log import LoggingMixIn
from twistedcaldav.config import config, parseConfig, defaultConfig
+from twistedcaldav.memcacher import Memcacher
+from twistedcaldav import memcachepool
from zope.interface import Interface, implements
from fnmatch import fnmatch
__all__ = [
+ "ClientNotifier",
"Coalescer",
+ "getNodeCacher",
"getNotificationClient",
"getPubSubConfiguration",
"getPubSubHeartbeatURI",
@@ -65,8 +71,6 @@
"NotificationClient",
"NotificationClientFactory",
"NotificationClientLineProtocol",
- "NotificationClientUserMixIn",
- "NotificationOptions",
"NotificationServiceMaker",
"SimpleLineNotificationFactory",
"SimpleLineNotificationProtocol",
@@ -80,18 +84,37 @@
# Classes used within calendarserver itself
#
-class NotificationClientUserMixIn(object):
+class ClientNotifier(LoggingMixIn):
"""
- Notification Client User (Mixin)
-
Provides a method to send change notifications to the L{NotificationClient}.
"""
- def sendNotification(self, uri):
- getNotificationClient().send(uri)
+ def __init__(self, resource, configOverride=None):
+ self._resource = resource
+ self._notify = True
+ self.config = configOverride or config
+ def enableNotify(self, arg):
+ url = self._resource.url()
+ self.log_debug("enableNotify: %s" % (url,))
+ self._notify = True
+ def disableNotify(self):
+ url = self._resource.url()
+ self.log_debug("disableNotify: %s" % (url,))
+ self._notify = False
+ def notify(self, op="update"):
+ url = self._resource.url()
+
+ if self.config.Notifications["Enabled"]:
+ if self._notify:
+ self.log_debug("Notifications are enabled: %s %s" % (op, url))
+ return getNotificationClient().send(op, url)
+ else:
+ self.log_debug("Skipping notification for: %s" % (url,))
+
+
class NotificationClientLineProtocol(basic.LineReceiver, LoggingMixIn):
"""
Notification Client Line Protocol
@@ -154,7 +177,7 @@
"""
Notification Client
- Forwards on notifications from NotificationClientUserMixIns to the
+ Forwards on notifications from ClientNotifiers to the
notification server. A NotificationClient is installed by the tap at
startup.
"""
@@ -170,26 +193,27 @@
from twisted.internet import reactor
self.reactor = reactor
- def send(self, uri):
+ def send(self, op, uri):
if self.factory is None:
self.factory = NotificationClientFactory(self)
self.reactor.connectTCP(self.host, self.port, self.factory)
self.log_debug("Creating factory")
+ msg = "%s %s" % (op, str(uri))
if self.factory.isReady() and self.observers:
for observer in self.observers:
- self.log_debug("Sending to notification server: %s" % (uri,))
- observer.sendLine(str(uri))
+ self.log_debug("Sending to notification server: %s" % (msg,))
+ observer.sendLine(msg)
else:
- self.log_debug("Queuing: %s" % (uri,))
- self.queued.add(uri)
+ self.log_debug("Queuing: %s" % (msg,))
+ self.queued.add(msg)
def connectionMade(self):
if self.factory.isReady() and self.observers:
for observer in self.observers:
- for uri in self.queued:
- self.log_debug("Sending from queue: %s" % (uri,))
- observer.sendLine(str(uri))
+ for msg in self.queued:
+ self.log_debug("Sending from queue: %s" % (msg,))
+ observer.sendLine(msg)
self.queued.clear()
def addObserver(self, observer):
@@ -211,9 +235,84 @@
+class NodeCacher(Memcacher, LoggingMixIn):
+ def __init__(self, reactor=None):
+ if reactor is None:
+ from twisted.internet import reactor
+ self.reactor = reactor
+ super(NodeCacher, self).__init__("pubsubnodes")
+ @inlineCallbacks
+ def nodeExists(self, nodeName):
+ result = (yield self.get(nodeName))
+ self.log_debug("nodeExists result = %s" % (result,))
+ returnValue(result is not None)
+ @inlineCallbacks
+ def storeNode(self, nodeName):
+ return
+ self.log_debug("Storing node %s" % (nodeName,))
+ try:
+ yield self.set(nodeName, "1")
+ except Exception, e:
+ import pdb; pdb.set_trace()
+ self.log_error(e)
+ raise
+
+ @inlineCallbacks
+ def waitForNode(self, notifier, nodeName):
+ self.log_debug("in waitForNode %s" % (nodeName,))
+ doesExist = (yield self.nodeExists(nodeName))
+ self.log_debug("doesExist = %s" % (doesExist,))
+ if doesExist:
+ self.log_debug("waitForNode returning True")
+ returnValue(True)
+ else:
+ self.log_debug("waitForNode calling notify()")
+ notifier.notify(op="create")
+ self.log_debug("waitForNode called notify()")
+ (yield self._waitForNode(None, nodeName))
+
+ def _waitForNode(self, result, nodeName, retries=5, deferred=None):
+ self.log_debug("waiting for node %s, retries %d" % (nodeName, retries))
+
+ if deferred == None:
+ deferred = defer.Deferred()
+
+ def _exists(result, nodeName, retries, deferred):
+ if result is True:
+ self.log_debug("node exists %s" % (nodeName,))
+ deferred.callback(True)
+ return
+ else:
+ retries -= 1
+ if retries == 0:
+ self.log_debug("giving up on node %s" % (nodeName,))
+ deferred.errback()
+ return
+ self.log_debug("scheduling a retry of node %s" % (nodeName,))
+ self.reactor.callLater(2, self._waitForNode, result, nodeName,
+ retries=retries, deferred=deferred)
+
+ self.nodeExists(nodeName).addCallback(_exists, nodeName, retries,
+ deferred)
+ return deferred
+
+
+
+_nodeCacher = None
+
+def getNodeCacher():
+ global _nodeCacher
+ if _nodeCacher is None:
+ _nodeCacher = NodeCacher()
+ return _nodeCacher
+
+
+
+
+
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# Classes used within Notification Server
#
@@ -230,8 +329,8 @@
"""
def lineReceived(self, line):
- val = str(line.strip())
- self.factory.coalescer.add(val)
+ op, uri = line.strip().split()
+ self.factory.coalescer.add(op, uri)
class InternalNotificationFactory(protocol.ServerFactory):
@@ -277,28 +376,37 @@
self.uris = {}
self.notifiers = notifiers
- def add(self, uri):
- delayed, count = self.uris.get(uri, [None, 0])
+ def add(self, op, uri):
- if delayed and delayed.active():
- count += 1
- if count < self.sendAnywayAfterCount:
- # reschedule for delaySeconds in the future
- delayed.reset(self.delaySeconds)
- self.uris[uri][1] = count
- self.log_info("Delaying: %s" % (uri,))
+ if op == "create":
+ # we don't want to delay a "create" notification; this opcode
+ # is meant for XMPP pubsub -- it means create and configure the
+ # node but don't publish to it
+ for notifier in self.notifiers:
+ notifier.enqueue(op, uri)
+
+ else: # normal update notification
+ delayed, count = self.uris.get(uri, [None, 0])
+
+ if delayed and delayed.active():
+ count += 1
+ if count < self.sendAnywayAfterCount:
+ # reschedule for delaySeconds in the future
+ delayed.reset(self.delaySeconds)
+ self.uris[uri][1] = count
+ self.log_info("Delaying: %s" % (uri,))
+ else:
+ self.log_info("Not delaying to avoid starvation: %s" % (uri,))
else:
- self.log_info("Not delaying to avoid starvation: %s" % (uri,))
- else:
- self.log_info("Scheduling: %s" % (uri,))
- self.uris[uri] = [self.reactor.callLater(self.delaySeconds,
- self.delayedEnqueue, uri), 0]
+ self.log_info("Scheduling: %s" % (uri,))
+ self.uris[uri] = [self.reactor.callLater(self.delaySeconds,
+ self.delayedEnqueue, op, uri), 0]
- def delayedEnqueue(self, uri):
+ def delayedEnqueue(self, op, uri):
self.log_info("Time to send: %s" % (uri,))
self.uris[uri][1] = 0
for notifier in self.notifiers:
- notifier.enqueue(uri)
+ notifier.enqueue(op, uri)
@@ -313,11 +421,12 @@
Defines an enqueue method that Notifier classes need to implement.
"""
- def enqueue(uri):
+ def enqueue(self, op, uri):
"""
Let's the notifier object know that a change has been made for this
uri, and enough time has passed to allow for coalescence.
+ @type op: C{str}
@type uri: C{str}
"""
@@ -351,18 +460,20 @@
self.observers = set()
self.sentReset = False
- def enqueue(self, uri):
+ def enqueue(self, op, uri):
- self.latestSeq += 1L
+ if op == "update":
- # Update history
- self.history[uri] = self.latestSeq
+ self.latestSeq += 1L
- for observer in self.observers:
- msg = "%d %s" % (self.latestSeq, uri)
- self.log_debug("Sending %s" % (msg,))
- observer.sendLine(msg)
+ # Update history
+ self.history[uri] = self.latestSeq
+ for observer in self.observers:
+ msg = "%d %s" % (self.latestSeq, uri)
+ self.log_debug("Sending %s" % (msg,))
+ observer.sendLine(msg)
+
def reset(self):
self.latestSeq = 0L
self.history = { } # keys=uri, values=sequenceNumber
@@ -518,15 +629,22 @@
def sendHeartbeat(self):
if self.doHeartbeat and self.xmlStream is not None:
- self.enqueue("", lock=False)
+ self.enqueue("update", "", lock=False)
self.reactor.callLater(self.settings['HeartbeatSeconds'],
self.sendHeartbeat)
- def enqueue(self, uri, lock=True):
+ def enqueue(self, op, uri, lock=True):
if self.xmlStream is not None:
# Convert uri to node
nodeName = self.uriToNodeName(uri)
- self.publishNode(nodeName, lock=lock)
+ if op == "create":
+ if not self.lockNode(nodeName):
+ # this node is busy, so it must already be created, or at
+ # least in the process
+ return
+ self.createNode(nodeName, publish=False)
+ else:
+ self.publishNode(nodeName, lock=lock)
def uriToNodeName(self, uri):
return getPubSubPath(uri, getPubSubConfiguration(self.config))
@@ -569,7 +687,7 @@
if iq.name == "error":
if iq['code'] == '400':
- self.requestConfigurationForm(nodeName)
+ self.requestConfigurationForm(nodeName, True)
elif iq['code'] == '404':
self.createNode(nodeName)
@@ -580,7 +698,7 @@
self.unlockNode(None, nodeName)
raise
- def createNode(self, nodeName):
+ def createNode(self, nodeName, publish=True):
if self.xmlStream is None:
# We lost our connection
self.unlockNode(None, nodeName)
@@ -592,32 +710,39 @@
child = pubsubElement.addElement('create')
child['node'] = nodeName
d = iq.send(to=self.settings['ServiceAddress'])
- d.addCallback(self.createNodeSuccess, nodeName)
- d.addErrback(self.createNodeFailure, nodeName)
+ d.addCallback(self.createNodeSuccess, nodeName, publish)
+ d.addErrback(self.createNodeFailure, nodeName, publish)
except:
self.unlockNode(None, nodeName)
raise
- def createNodeSuccess(self, iq, nodeName):
+ def createNodeSuccess(self, iq, nodeName, publish):
try:
self.sendDebug("Node creation successful (%s)" % (nodeName,), iq)
# now time to configure; fetch the form
- self.requestConfigurationForm(nodeName)
+ self.requestConfigurationForm(nodeName, publish)
except:
self.unlockNode(None, nodeName)
raise
- def createNodeFailure(self, result, nodeName):
- # If we get here we're giving up
+ def createNodeFailure(self, result, nodeName, publish):
try:
iq = result.value.getElement()
- self.log_error("PubSub node creation error: %s" %
- (iq.toXml().encode('ascii', 'replace')),)
- self.sendError("Node creation failed (%s)" % (nodeName,), iq)
- finally:
+ if iq['code'] == '409':
+ # node already exists, proceed to configure
+ self.sendDebug("Node already exists (%s)" % (nodeName,), iq)
+ self.requestConfigurationForm(nodeName, publish)
+ else:
+ # couldn't create node, give up
+ self.unlockNode(None, nodeName)
+ self.log_error("PubSub node creation error: %s" %
+ (iq.toXml().encode('ascii', 'replace')),)
+ self.sendError("Node creation failed (%s)" % (nodeName,), iq)
+ except:
self.unlockNode(None, nodeName)
+ raise
- def requestConfigurationForm(self, nodeName):
+ def requestConfigurationForm(self, nodeName, publish):
if self.xmlStream is None:
# We lost our connection
self.unlockNode(None, nodeName)
@@ -630,7 +755,8 @@
child = child.addElement('configure')
child['node'] = nodeName
d = iq.send(to=self.settings['ServiceAddress'])
- d.addCallback(self.requestConfigurationFormSuccess, nodeName)
+ d.addCallback(self.requestConfigurationFormSuccess, nodeName,
+ publish)
d.addErrback(self.requestConfigurationFormFailure, nodeName)
except:
self.unlockNode(None, nodeName)
@@ -642,7 +768,7 @@
return child
return None
- def requestConfigurationFormSuccess(self, iq, nodeName):
+ def requestConfigurationFormSuccess(self, iq, nodeName, publish):
if self.xmlStream is None:
# We lost our connection
self.unlockNode(None, nodeName)
@@ -683,7 +809,8 @@
self.sendDebug("Sending configuration form (%s)"
% (nodeName,), filledIq)
d = filledIq.send(to=self.settings['ServiceAddress'])
- d.addCallback(self.configurationSuccess, nodeName)
+ d.addCallback(self.configurationSuccess, nodeName,
+ publish)
d.addErrback(self.configurationFailure, nodeName)
return
@@ -706,7 +833,7 @@
finally:
self.unlockNode(None, nodeName)
- def configurationSuccess(self, iq, nodeName):
+ def configurationSuccess(self, iq, nodeName, publish):
if self.xmlStream is None:
# We lost our connection
self.unlockNode(None, nodeName)
@@ -715,7 +842,12 @@
try:
self.log_debug("PubSub node %s is configured" % (nodeName,))
self.sendDebug("Configured node (%s)" % (nodeName,), iq)
- self.publishNode(nodeName, lock=False)
+ nodeCacher = getNodeCacher()
+ nodeCacher.storeNode(nodeName)
+ if publish:
+ self.publishNode(nodeName, lock=False)
+ else:
+ self.unlockNode(None, nodeName)
except:
self.unlockNode(None, nodeName)
raise
@@ -852,7 +984,7 @@
if frm in self.roster:
txt = str(body).lower()
if txt == "help":
- response = "debug on, debug off, roster, publish <nodename>, hammer <count>"
+ response = "debug on, debug off, roster, create <nodename>, publish <nodename>, hammer <count>"
elif txt == "roster":
response = "Roster: %s" % (str(self.roster),)
elif txt == "debug on":
@@ -870,7 +1002,17 @@
response = "Please phrase it like 'publish nodename'"
else:
response = "Publishing node %s" % (nodeName,)
- self.reactor.callLater(1, self.publishNode, nodeName)
+ self.reactor.callLater(1, self.enqueue, "update",
+ nodeName)
+ elif txt.startswith("create"):
+ try:
+ publish, nodeName = txt.split()
+ except ValueError:
+ response = "Please phrase it like 'create nodename'"
+ else:
+ response = "Creating and configuring node %s" % (nodeName,)
+ self.reactor.callLater(1, self.enqueue, "create",
+ nodeName)
elif txt.startswith("hammer"):
try:
hammer, count = txt.split()
@@ -894,7 +1036,7 @@
def hammer(self, count):
for i in xrange(count):
- self.enqueue("hammertesting%d" % (i,))
+ self.enqueue("update", "hammertesting%d" % (i,))
class XMPPNotificationFactory(xmlstream.XmlStreamFactory, LoggingMixIn):
@@ -1101,6 +1243,17 @@
def makeService(self, options):
+ #
+ # Configure Memcached Client Pool
+ #
+ if config.Memcached["ClientEnabled"]:
+ memcachepool.installPool(
+ IPv4Address(
+ 'TCP',
+ config.Memcached["BindAddress"],
+ config.Memcached["Port"]),
+ config.Memcached["MaxClients"])
+
multiService = service.MultiService()
notifiers = []
@@ -1126,8 +1279,8 @@
self.server = internet.TCPServer(settings["Port"],
SimpleLineNotificationFactory(self.notifier))
- def enqueue(self, uri):
- self.notifier.enqueue(uri)
+ def enqueue(self, op, uri):
+ self.notifier.enqueue(op, uri)
def startService(self):
self.server.startService()
@@ -1143,8 +1296,8 @@
self.client = internet.TCPClient(settings["Host"], settings["Port"],
XMPPNotificationFactory(self.notifier, settings))
- def enqueue(self, uri):
- self.notifier.enqueue(uri)
+ def enqueue(self, op, uri):
+ self.notifier.enqueue(op, uri)
def startService(self):
self.client.startService()
Modified: CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/static.py
===================================================================
--- CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/static.py 2008-09-09 15:41:12 UTC (rev 2951)
+++ CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/static.py 2008-09-09 15:42:42 UTC (rev 2952)
@@ -77,7 +77,8 @@
from twistedcaldav.timezoneservice import TimezoneServiceResource
from twistedcaldav.cache import DisabledCacheNotifier, PropfindCacheMixin
from twistedcaldav.notify import getPubSubConfiguration, getPubSubXMPPURI
-from twistedcaldav.notify import getPubSubHeartbeatURI
+from twistedcaldav.notify import getPubSubHeartbeatURI, getPubSubPath
+from twistedcaldav.notify import ClientNotifier, getNodeCacher
log = Logger()
@@ -332,11 +333,18 @@
except:
return fail(Failure())
+ if hasattr(self, 'clientNotifier'):
+ self.clientNotifier.notify(op="update")
+ else:
+ log.debug("%r does not have a clientNotifier but the CTag changed"
+ % (self,))
+
if hasattr(self, 'cacheNotifier'):
return self.cacheNotifier.changed()
else:
log.debug("%r does not have a cacheNotifier but the CTag changed"
% (self,))
+
return succeed(True)
##
@@ -610,6 +618,7 @@
@param path: the path to the file which will back the resource.
"""
self.cacheNotifier = self.cacheNotifierFactory(self)
+ self.clientNotifier = ClientNotifier(self)
CalDAVFile.__init__(self, path)
DirectoryCalendarHomeResource.__init__(self, parent, record)
@@ -634,6 +643,7 @@
if cls is not None:
child = cls(self.fp.child(name).path, self)
child.cacheNotifier = self.cacheNotifier
+ child.clientNotifier = self.clientNotifier
return child
return self.createSimilarFile(self.fp.child(name).path)
@@ -644,6 +654,7 @@
else:
similar = CalDAVFile(path, principalCollections=self.principalCollections())
similar.cacheNotifier = self.cacheNotifier
+ similar.clientNotifier = self.clientNotifier
return similar
def getChild(self, name):
@@ -660,11 +671,29 @@
else:
qname = property.qname()
+
+ def _succeeded(result, propVal):
+ self.log_info("RESULT: SUCCESS")
+ return propVal
+
+ def _failed(failure):
+ self.log_info("RESULT: FAILURE")
+ return customxml.PubSubXMPPURIProperty()
+
if qname == (customxml.calendarserver_namespace, "xmpp-uri"):
pubSubConfiguration = getPubSubConfiguration(config)
if pubSubConfiguration['enabled']:
- return succeed(customxml.PubSubXMPPURIProperty(
- getPubSubXMPPURI(self.url(), pubSubConfiguration)))
+ if getattr(self, "clientNotifier", None) is not None:
+ url = self.url()
+ nodeName = getPubSubPath(url, pubSubConfiguration)
+ propVal = customxml.PubSubXMPPURIProperty(
+ getPubSubXMPPURI(url, pubSubConfiguration))
+
+ nodeCacher = getNodeCacher()
+ d = nodeCacher.waitForNode(self.clientNotifier, nodeName)
+ d.addCallback(_succeeded, propVal)
+ d.addErrback(_failed)
+ return d
else:
return succeed(customxml.PubSubXMPPURIProperty())
Modified: CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/test/test_notify.py
===================================================================
--- CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/test/test_notify.py 2008-09-09 15:41:12 UTC (rev 2951)
+++ CalendarServer/branches/users/sagen/create-nodes-2950/twistedcaldav/test/test_notify.py 2008-09-09 15:42:42 UTC (rev 2952)
@@ -23,12 +23,17 @@
from twistedcaldav.config import Config
+class StubResource(object):
+ def __init__(self, url):
+ self._url = url
+
+ def url(self):
+ return self._url
+
+
class NotificationClientUserTests(TestCase):
- class NotificationClientUser(NotificationClientUserMixIn):
- pass
-
def test_installNoficationClient(self):
self.assertEquals(getNotificationClient(), None)
self.clock = Clock()
@@ -37,8 +42,11 @@
notificationClient = getNotificationClient()
self.assertNotEquals(notificationClient, None)
- clientUser = self.NotificationClientUser()
- clientUser.sendNotification("a")
+ enabledConfig = Config(config_mod.defaultConfig)
+ enabledConfig.Notifications['Enabled'] = True
+ clientNotifier = ClientNotifier(StubResource("a"),
+ configOverride=enabledConfig)
+ clientNotifier.notify()
self.assertEquals(notificationClient.lines, ["a"])
@@ -67,7 +75,7 @@
self.lines = []
self.observers = set()
- def send(self, uri):
+ def send(self, op, uri):
self.lines.append(uri)
def addObserver(self, observer):
@@ -103,25 +111,25 @@
self.client.factory = StubNotificationClientFactory()
def test_sendWhileNotConnected(self):
- self.client.send("a")
- self.assertEquals(self.client.queued, set(["a"]))
+ self.client.send("update", "a")
+ self.assertEquals(self.client.queued, set(["update a"]))
def test_sendWhileConnected(self):
protocol = StubNotificationClientProtocol()
self.client.addObserver(protocol)
self.client.factory.connected = True
- self.client.send("a")
+ self.client.send("update", "a")
self.assertEquals(self.client.queued, set())
- self.assertEquals(protocol.lines, ["a"])
+ self.assertEquals(protocol.lines, ["update a"])
def test_sendQueue(self):
- self.client.send("a")
- self.assertEquals(self.client.queued, set(["a"]))
+ self.client.send("update", "a")
+ self.assertEquals(self.client.queued, set(["update a"]))
protocol = StubNotificationClientProtocol()
self.client.addObserver(protocol)
self.client.factory.connected = True
self.client.connectionMade()
- self.assertEquals(protocol.lines, ["a"])
+ self.assertEquals(protocol.lines, ["update a"])
self.assertEquals(self.client.queued, set())
@@ -142,14 +150,14 @@
self.coalescer = Coalescer([self.notifier], reactor=self.clock)
def test_delayedNotifications(self):
- self.coalescer.add("A")
+ self.coalescer.add("update", "A")
self.assertEquals(self.notifier.notifications, [])
self.clock.advance(5)
self.assertEquals(self.notifier.notifications, ["A"])
def test_removeDuplicates(self):
- self.coalescer.add("A")
- self.coalescer.add("A")
+ self.coalescer.add("update", "A")
+ self.coalescer.add("update", "A")
self.clock.advance(5)
self.assertEquals(self.notifier.notifications, ["A"])
@@ -161,7 +169,7 @@
self.observers = set()
self.playbackHistory = []
- def enqueue(self, uri):
+ def enqueue(self, op, uri):
self.notifications.append(uri)
def playback(self, protocol, old_seq):
@@ -198,27 +206,27 @@
def test_send(self):
protocol = StubProtocol()
self.notifier.addObserver(protocol)
- self.notifier.enqueue("A")
+ self.notifier.enqueue("update", "A")
self.assertEquals(protocol.lines, ["1 A"])
def test_incrementSequence(self):
protocol = StubProtocol()
self.notifier.addObserver(protocol)
- self.notifier.enqueue("A")
- self.notifier.enqueue("B")
+ self.notifier.enqueue("update", "A")
+ self.notifier.enqueue("update", "B")
self.assertEquals(protocol.lines, ["1 A", "2 B"])
def test_addObserver(self):
protocol = StubProtocol()
self.notifier.addObserver(protocol)
- self.notifier.enqueue("A")
+ self.notifier.enqueue("update", "A")
self.assertEquals(protocol.lines, ["1 A"])
def test_removeObserver(self):
protocol = StubProtocol()
self.notifier.addObserver(protocol)
self.notifier.removeObserver(protocol)
- self.notifier.enqueue("A")
+ self.notifier.enqueue("update", "A")
self.assertEquals(protocol.lines, [])
def test_multipleObservers(self):
@@ -226,7 +234,7 @@
protocol2 = StubProtocol()
self.notifier.addObserver(protocol1)
self.notifier.addObserver(protocol2)
- self.notifier.enqueue("A")
+ self.notifier.enqueue("update", "A")
self.assertEquals(protocol1.lines, ["1 A"])
self.assertEquals(protocol2.lines, ["1 A"])
@@ -234,20 +242,20 @@
protocol = StubProtocol()
self.notifier.addObserver(protocol)
self.notifier.addObserver(protocol)
- self.notifier.enqueue("A")
+ self.notifier.enqueue("update", "A")
self.assertEquals(protocol.lines, ["1 A"])
def test_playback(self):
- self.notifier.enqueue("A")
- self.notifier.enqueue("B")
- self.notifier.enqueue("C")
+ self.notifier.enqueue("update", "A")
+ self.notifier.enqueue("update", "B")
+ self.notifier.enqueue("update", "C")
protocol = StubProtocol()
self.notifier.addObserver(protocol)
self.notifier.playback(protocol, 1)
self.assertEquals(protocol.lines, ["2 B", "3 C"])
def test_reset(self):
- self.notifier.enqueue("A")
+ self.notifier.enqueue("update", "A")
self.assertEquals(self.notifier.history, {"A" : 1})
self.assertEquals(self.notifier.latestSeq, 1)
self.notifier.reset()
@@ -345,7 +353,7 @@
self.notifier.streamOpened(self.xmlStream)
def test_sendWhileConnected(self):
- self.notifier.enqueue("/principals/__uids__/test")
+ self.notifier.enqueue("update", "/principals/__uids__/test")
iq = self.xmlStream.elements[1]
self.assertEquals(iq.name, "iq")
@@ -363,7 +371,7 @@
def test_sendWhileNotConnected(self):
notifier = XMPPNotifier(self.settings, reactor=Clock(),
configOverride=self.xmppDisabledConfig)
- notifier.enqueue("/principals/__uids__/test")
+ notifier.enqueue("update", "/principals/__uids__/test")
self.assertEquals(len(self.xmlStream.elements), 1)
def test_publishNewNode(self):
@@ -437,7 +445,8 @@
fieldElement.addElement('value', content=field[1])
self.assertEquals(len(self.xmlStream.elements), 1)
- self.notifier.requestConfigurationFormSuccess(response, "testNodeName")
+ self.notifier.requestConfigurationFormSuccess(response, "testNodeName",
+ False)
self.assertEquals(len(self.xmlStream.elements), 2)
iq = self.xmlStream.elements[1]
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080909/818edf6c/attachment-0001.html
More information about the calendarserver-changes
mailing list