[CalendarServer-changes] [2807] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Tue Aug 12 12:45:36 PDT 2008
Revision: 2807
http://trac.macosforge.org/projects/calendarserver/changeset/2807
Author: sagen at apple.com
Date: 2008-08-12 12:45:36 -0700 (Tue, 12 Aug 2008)
Log Message:
-----------
Landing my xmpp-2764 branch: adds XMPP auto-subscription (for interacting with the server) and starvation prevention, and sends (empty) plistfrag payloads.
Modified Paths:
--------------
CalendarServer/trunk/conf/caldavd-test.plist
CalendarServer/trunk/twistedcaldav/config.py
CalendarServer/trunk/twistedcaldav/notify.py
CalendarServer/trunk/twistedcaldav/test/test_notify.py
Modified: CalendarServer/trunk/conf/caldavd-test.plist
===================================================================
--- CalendarServer/trunk/conf/caldavd-test.plist 2008-08-12 14:51:27 UTC (rev 2806)
+++ CalendarServer/trunk/conf/caldavd-test.plist 2008-08-12 19:45:36 UTC (rev 2807)
@@ -460,9 +460,13 @@
<key>KeepAliveSeconds</key>
<integer>120</integer>
- <!-- Sends messages to this account for debugging -->
- <key>TestJID</key>
- <string></string>
+ <!-- List of glob-like expressions defining which XMPP JIDs can converse with the server -->
+ <key>AllowedJIDs</key>
+ <array>
+ <!--
+ <string>*.apple.com</string>
+ -->
+ </array>
</dict>
</array>
</dict>
Modified: CalendarServer/trunk/twistedcaldav/config.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/config.py 2008-08-12 14:51:27 UTC (rev 2806)
+++ CalendarServer/trunk/twistedcaldav/config.py 2008-08-12 19:45:36 UTC (rev 2807)
@@ -200,7 +200,7 @@
"Password" : "",
"ServiceAddress" : "", # "pubsub.xmpp.host.name"
"KeepAliveSeconds" : 120,
- "TestJID": "",
+ "AllowedJIDs": [],
},
]
},
@@ -483,7 +483,7 @@
service["Enabled"]
):
for key, value in service.iteritems():
- if not value and key not in ("TestJID"):
+ if not value and key not in ("AllowedJIDs"):
raise ConfigurationError("Invalid %s for XMPPNotifierService: %r"
% (key, value))
Modified: CalendarServer/trunk/twistedcaldav/notify.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/notify.py 2008-08-12 14:51:27 UTC (rev 2806)
+++ CalendarServer/trunk/twistedcaldav/notify.py 2008-08-12 19:45:36 UTC (rev 2807)
@@ -49,6 +49,7 @@
from twistedcaldav.log import LoggingMixIn
from twistedcaldav.config import config, parseConfig, defaultConfig
from zope.interface import Interface, implements
+from fnmatch import fnmatch
__all__ = [
"Coalescer",
@@ -179,7 +180,7 @@
self.log_debug("Sending to notification server: %s" % (uri,))
observer.sendLine(str(uri))
else:
- self.log_debug("Queing: %s" % (uri,))
+ self.log_debug("Queuing: %s" % (uri,))
self.queued.add(uri)
def connectionMade(self):
@@ -257,9 +258,14 @@
"""
delaySeconds = 5
+ sendAnywayAfterCount = 5
- def __init__(self, notifiers, reactor=None, delaySeconds=None):
+ def __init__(self, notifiers, reactor=None, delaySeconds=None,
+ sendAnywayAfterCount=None):
+ if sendAnywayAfterCount:
+ self.sendAnywayAfterCount = sendAnywayAfterCount
+
if delaySeconds:
self.delaySeconds = delaySeconds
@@ -267,18 +273,29 @@
from twisted.internet import reactor
self.reactor = reactor
- self.uris = dict()
+ self.uris = {}
self.notifiers = notifiers
def add(self, uri):
- delayed = self.uris.get(uri, None)
+ delayed, count = self.uris.get(uri, [None, 0])
+
if delayed and delayed.active():
- delayed.reset(self.delaySeconds)
+ count += 1
+ if count < self.sendAnywayAfterCount:
+ # reschedule for delaySeconds in the future
+ delayed.reset(self.delaySeconds)
+ self.uris[uri][1] = count
+ self.log_info("Delaying: %s" % (uri,))
+ else:
+ self.log_info("Not delaying to avoid starvation: %s" % (uri,))
else:
- self.uris[uri] = self.reactor.callLater(self.delaySeconds,
- self.delayedEnqueue, uri)
+ self.log_info("Scheduling: %s" % (uri,))
+ self.uris[uri] = [self.reactor.callLater(self.delaySeconds,
+ self.delayedEnqueue, uri), 0]
def delayedEnqueue(self, uri):
+ self.log_info("Time to send: %s" % (uri,))
+ self.uris[uri][1] = 0
for notifier in self.notifiers:
notifier.enqueue(uri)
@@ -467,7 +484,7 @@
pubsubNS = 'http://jabber.org/protocol/pubsub'
nodeConf = {
- 'pubsub#deliver_payloads': '0',
+ 'pubsub#deliver_payloads': '1',
'pubsub#persist_items' : '0',
}
@@ -479,7 +496,7 @@
self.reactor = reactor
self.config = configOverride or config
- self.sendDebugMessages = False
+ self.roster = {}
def enqueue(self, uri):
if self.xmlStream is not None:
@@ -496,6 +513,10 @@
pubsubElement = iq.addElement('pubsub', defaultUri=self.pubsubNS)
publishElement = pubsubElement.addElement('publish')
publishElement['node'] = nodeName
+ itemElement = publishElement.addElement('item')
+ payloadElement = itemElement.addElement('plistfrag',
+ defaultUri='plist-apple')
+ self.sendDebug("Publishing (%s)" % (nodeName,), iq)
iq.addCallback(self.responseFromPublish, nodeName)
iq.send(to=self.settings['ServiceAddress'])
@@ -588,7 +609,7 @@
filledField['type'] = field['type']
valueElement = filledField.addElement('value')
valueElement.addContent(value)
- filledForm.addChild(field)
+ # filledForm.addChild(field)
filledIq.addCallback(self.responseFromConfiguration,
nodeName)
self.sendDebug("Sending configuration form (%s)"
@@ -612,27 +633,109 @@
self.sendError("Failed to configure node (%s)" % (nodeName,), iq)
+ def requestRoster(self):
+ self.roster = {}
+ rosterIq = IQ(self.xmlStream, type='get')
+ rosterIq.addElement("query", "jabber:iq:roster")
+ rosterIq.addCallback(self.handleRoster)
+ rosterIq.send()
+
+ def allowedInRoster(self, jid):
+ for pattern in self.settings.get("AllowedJIDs", []):
+ if fnmatch(jid, pattern):
+ return True
+ return False
+
+ def handleRoster(self, iq):
+ for child in iq.children[0].children:
+ jid = child['jid']
+ if self.allowedInRoster(jid):
+ self.log_info("In roster: %s" % (jid,))
+ if not self.roster.has_key(jid):
+ self.roster[jid] = { 'debug' : False, 'available' : False }
+ else:
+ self.log_info("JID not allowed in roster: %s" % (jid,))
+
+ def handlePresence(self, iq):
+ self.log_info("Presence IQ: %s" %
+ (iq.toXml().encode('ascii', 'replace')),)
+ presenceType = iq.getAttribute('type')
+
+ if presenceType == 'subscribe':
+ frm = JID(iq['from']).userhost()
+ if self.allowedInRoster(frm):
+ self.roster[frm] = { 'debug' : False, 'available' : True }
+ response = domish.Element(('jabber:client', 'presence'))
+ response['to'] = iq['from']
+ response['type'] = 'subscribed'
+ self.xmlStream.send(response)
+
+ # request subscription as well
+ subscribe = domish.Element(('jabber:client', 'presence'))
+ subscribe['to'] = iq['from']
+ subscribe['type'] = 'subscribe'
+ self.xmlStream.send(subscribe)
+ else:
+ self.log_info("JID not allowed in roster: %s" % (frm,))
+ # Reject
+ response = domish.Element(('jabber:client', 'presence'))
+ response['to'] = iq['from']
+ response['type'] = 'unsubscribed'
+ self.xmlStream.send(response)
+
+ elif presenceType == 'unsubscribe':
+ frm = JID(iq['from']).userhost()
+ if self.roster.has_key(frm):
+ del self.roster[frm]
+ response = domish.Element(('jabber:client', 'presence'))
+ response['to'] = iq['from']
+ response['type'] = 'unsubscribed'
+ self.xmlStream.send(response)
+
+ # remove from roster as well
+ removal = IQ(self.xmlStream, type='set')
+ query = removal.addElement("query", "jabber:iq:roster")
+ query.addElement("item")
+ query.item["jid"] = iq["from"]
+ query.item["subscription"] = "remove"
+ removal.send()
+
+ elif presenceType == 'unavailable':
+ frm = JID(iq['from']).userhost()
+ if self.roster.has_key(frm):
+ self.roster[frm]['available'] = False
+
+ else:
+ frm = JID(iq['from']).userhost()
+ if self.allowedInRoster(frm):
+ if self.roster.has_key(frm):
+ self.roster[frm]['available'] = True
+ else:
+ self.roster[frm] = { 'debug' : False, 'available' : True }
+ else:
+ self.log_info("JID not allowed in roster: %s" % (frm,))
+
def streamOpened(self, xmlStream):
self.xmlStream = xmlStream
xmlStream.addObserver('/message', self.handleMessage)
+ xmlStream.addObserver('/presence', self.handlePresence)
+ self.requestRoster()
+
def streamClosed(self):
self.xmlStream = None
def sendDebug(self, txt, element):
- if self.sendDebugMessages:
- testJid = self.settings.get("TestJID", "")
- if testJid:
- txt = "DEBUG: %s %s" % (txt, element.toXml().encode('ascii',
- 'replace'))
- self.sendAlert(testJid, txt)
+ txt = "DEBUG: %s %s" % (txt, element.toXml().encode('ascii', 'replace'))
+ for jid, info in self.roster.iteritems():
+ if info['available'] and info['debug']:
+ self.sendAlert(jid, txt)
def sendError(self, txt, element):
- testJid = self.settings.get("TestJID", "")
- if testJid:
- txt = "ERROR: %s %s" % (txt, element.toXml().encode('ascii',
- 'replace'))
- self.sendAlert(testJid, txt)
+ txt = "ERROR: %s %s" % (txt, element.toXml().encode('ascii', 'replace'))
+ for jid, info in self.roster.iteritems():
+ if info['available']:
+ self.sendAlert(jid, txt)
def sendAlert(self, jid, txt):
if self.xmlStream is not None:
@@ -645,17 +748,32 @@
body = getattr(iq, 'body', None)
if body:
response = None
- txt = str(body).lower()
- if txt == "help":
- response = "debug on, debug off"
- elif txt == "debug on":
- self.sendDebugMessages = True
- response = "Debugging on"
- elif txt == "debug off":
- self.sendDebugMessages = False
- response = "Debugging off"
+ frm = JID(iq['from']).userhost()
+ if frm in self.roster:
+ txt = str(body).lower()
+ if txt == "help":
+ response = "debug on, debug off, roster, hammer <count>"
+ elif txt == "roster":
+ response = "Roster: %s" % (str(self.roster),)
+ elif txt == "debug on":
+ self.roster[frm]['debug'] = True
+ response = "Debugging on"
+ elif txt == "debug off":
+ self.roster[frm]['debug'] = False
+ response = "Debugging off"
+ elif txt.startswith("hammer"):
+ try:
+ hammer, count = txt.split()
+ count = int(count)
+ except ValueError:
+ response = "Please phrase it like 'hammer 100'"
+ else:
+ response = "Hammer will commence now, %d times" % (count,)
+ self.reactor.callLater(1, self.hammer, count)
+ else:
+ response = "I don't understand. Try 'help'."
else:
- response = "I don't understand. Try 'help'."
+ response = "Sorry, you are not authorized to converse with this server"
if response:
message = domish.Element(('jabber:client', 'message'))
@@ -664,6 +782,9 @@
self.xmlStream.send(message)
+ def hammer(self, count):
+ for i in xrange(count):
+ self.enqueue("hammertesting%d" % (count,))
class XMPPNotificationFactory(xmlstream.XmlStreamFactory, LoggingMixIn):
Modified: CalendarServer/trunk/twistedcaldav/test/test_notify.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_notify.py 2008-08-12 14:51:27 UTC (rev 2806)
+++ CalendarServer/trunk/twistedcaldav/test/test_notify.py 2008-08-12 19:45:36 UTC (rev 2807)
@@ -341,7 +341,7 @@
def test_sendWhileConnected(self):
self.notifier.enqueue("/principals/__uids__/test")
- iq = self.xmlStream.elements[0]
+ iq = self.xmlStream.elements[1]
self.assertEquals(iq.name, "iq")
pubsubElement = list(iq.elements())[0]
@@ -358,21 +358,21 @@
notifier = XMPPNotifier(self.settings, reactor=Clock(),
configOverride=self.xmppDisabledConfig)
notifier.enqueue("/principals/__uids__/test")
- self.assertEquals(self.xmlStream.elements, [])
+ self.assertEquals(len(self.xmlStream.elements), 1)
def test_publishNewNode(self):
self.notifier.publishNode("testNodeName")
- iq = self.xmlStream.elements[0]
+ iq = self.xmlStream.elements[1]
self.assertEquals(iq.name, "iq")
def test_publishReponse400(self):
response = IQ(self.xmlStream, type='error')
errorElement = response.addElement('error')
errorElement['code'] = '400'
- self.assertEquals(len(self.xmlStream.elements), 0)
+ self.assertEquals(len(self.xmlStream.elements), 1)
self.notifier.responseFromPublish("testNodeName", response)
- self.assertEquals(len(self.xmlStream.elements), 1)
- iq = self.xmlStream.elements[0]
+ self.assertEquals(len(self.xmlStream.elements), 2)
+ iq = self.xmlStream.elements[1]
self.assertEquals(iq.name, "iq")
self.assertEquals(iq['type'], "get")
@@ -389,10 +389,10 @@
response = IQ(self.xmlStream, type='error')
errorElement = response.addElement('error')
errorElement['code'] = '404'
- self.assertEquals(len(self.xmlStream.elements), 0)
+ self.assertEquals(len(self.xmlStream.elements), 1)
self.notifier.responseFromPublish("testNodeName", response)
- self.assertEquals(len(self.xmlStream.elements), 1)
- iq = self.xmlStream.elements[0]
+ self.assertEquals(len(self.xmlStream.elements), 2)
+ iq = self.xmlStream.elements[1]
self.assertEquals(iq.name, "iq")
self.assertEquals(iq['type'], "set")
@@ -419,25 +419,26 @@
formElement = configElement.addElement('x')
formElement['type'] = 'form'
fields = [
- ( "unknown", "don't edit me" ),
- ( "pubsub#deliver_payloads", "1" ),
- ( "pubsub#persist_items", "1" ),
+ ( "unknown", "don't edit me", "text-single" ),
+ ( "pubsub#deliver_payloads", "1", "boolean" ),
+ ( "pubsub#persist_items", "1", "boolean" ),
]
expectedFields = {
"unknown" : "don't edit me",
- "pubsub#deliver_payloads" : "0",
+ "pubsub#deliver_payloads" : "1",
"pubsub#persist_items" : "0",
}
for field in fields:
fieldElement = formElement.addElement("field")
fieldElement['var'] = field[0]
+ fieldElement['type'] = field[2]
fieldElement.addElement('value', content=field[1])
- self.assertEquals(len(self.xmlStream.elements), 0)
+ self.assertEquals(len(self.xmlStream.elements), 1)
self.notifier.responseFromConfigurationForm("testNodeName", response)
- self.assertEquals(len(self.xmlStream.elements), 1)
+ self.assertEquals(len(self.xmlStream.elements), 2)
- iq = self.xmlStream.elements[0]
+ iq = self.xmlStream.elements[1]
self.assertEquals(iq.name, "iq")
self.assertEquals(iq['type'], "set")
@@ -468,16 +469,18 @@
factory.connected(xmlStream)
factory.authenticated(xmlStream)
- self.assertEquals(len(xmlStream.elements), 1)
+ self.assertEquals(len(xmlStream.elements), 2)
presence = xmlStream.elements[0]
self.assertEquals(presence.name, 'presence')
+ iq = xmlStream.elements[1]
+ self.assertEquals(iq.name, 'iq')
clock.advance(5)
- self.assertEquals(len(xmlStream.elements), 2)
- presence = xmlStream.elements[1]
+ self.assertEquals(len(xmlStream.elements), 3)
+ presence = xmlStream.elements[2]
self.assertEquals(presence.name, 'presence')
factory.disconnected(xmlStream)
clock.advance(5)
- self.assertEquals(len(xmlStream.elements), 2)
+ self.assertEquals(len(xmlStream.elements), 3)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080812/44083013/attachment-0001.html
More information about the calendarserver-changes
mailing list