[CalendarServer-changes] [2807] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Tue Aug 12 12:45:36 PDT 2008


Revision: 2807
          http://trac.macosforge.org/projects/calendarserver/changeset/2807
Author:   sagen at apple.com
Date:     2008-08-12 12:45:36 -0700 (Tue, 12 Aug 2008)
Log Message:
-----------
Landing my xmpp-2764 branch: adds xmpp auto-subscription (for interacting with server), starvation prevention, and sends (empty) plistfrag payloads.

Modified Paths:
--------------
    CalendarServer/trunk/conf/caldavd-test.plist
    CalendarServer/trunk/twistedcaldav/config.py
    CalendarServer/trunk/twistedcaldav/notify.py
    CalendarServer/trunk/twistedcaldav/test/test_notify.py

Modified: CalendarServer/trunk/conf/caldavd-test.plist
===================================================================
--- CalendarServer/trunk/conf/caldavd-test.plist	2008-08-12 14:51:27 UTC (rev 2806)
+++ CalendarServer/trunk/conf/caldavd-test.plist	2008-08-12 19:45:36 UTC (rev 2807)
@@ -460,9 +460,13 @@
         <key>KeepAliveSeconds</key>
         <integer>120</integer>
 
-        <!-- Sends messages to this account for debugging -->
-        <key>TestJID</key>
-        <string></string>
+        <!-- List of glob-like expressions defining which XMPP JIDs can converse with the server -->
+        <key>AllowedJIDs</key>
+        <array>
+          <!--
+          <string>*.apple.com</string>
+          -->
+        </array>
       </dict>
     </array>
   </dict>

Modified: CalendarServer/trunk/twistedcaldav/config.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/config.py	2008-08-12 14:51:27 UTC (rev 2806)
+++ CalendarServer/trunk/twistedcaldav/config.py	2008-08-12 19:45:36 UTC (rev 2807)
@@ -200,7 +200,7 @@
                 "Password" : "",
                 "ServiceAddress" : "", # "pubsub.xmpp.host.name"
                 "KeepAliveSeconds" : 120,
-                "TestJID": "",
+                "AllowedJIDs": [],
             },
         ]
     },
@@ -483,7 +483,7 @@
                 service["Enabled"]
             ):
                 for key, value in service.iteritems():
-                    if not value and key not in ("TestJID"):
+                    if not value and key not in ("AllowedJIDs"):
                         raise ConfigurationError("Invalid %s for XMPPNotifierService: %r"
                                                  % (key, value))
 

Modified: CalendarServer/trunk/twistedcaldav/notify.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/notify.py	2008-08-12 14:51:27 UTC (rev 2806)
+++ CalendarServer/trunk/twistedcaldav/notify.py	2008-08-12 19:45:36 UTC (rev 2807)
@@ -49,6 +49,7 @@
 from twistedcaldav.log import LoggingMixIn
 from twistedcaldav.config import config, parseConfig, defaultConfig
 from zope.interface import Interface, implements
+from fnmatch import fnmatch
 
 __all__ = [
     "Coalescer",
@@ -179,7 +180,7 @@
                 self.log_debug("Sending to notification server: %s" % (uri,))
                 observer.sendLine(str(uri))
         else:
-            self.log_debug("Queing: %s" % (uri,))
+            self.log_debug("Queuing: %s" % (uri,))
             self.queued.add(uri)
 
     def connectionMade(self):
@@ -257,9 +258,14 @@
     """
 
     delaySeconds = 5
+    sendAnywayAfterCount = 5
 
-    def __init__(self, notifiers, reactor=None, delaySeconds=None):
+    def __init__(self, notifiers, reactor=None, delaySeconds=None,
+        sendAnywayAfterCount=None):
 
+        if sendAnywayAfterCount:
+            self.sendAnywayAfterCount = sendAnywayAfterCount
+
         if delaySeconds:
             self.delaySeconds = delaySeconds
 
@@ -267,18 +273,29 @@
             from twisted.internet import reactor
         self.reactor = reactor
 
-        self.uris = dict()
+        self.uris = {}
         self.notifiers = notifiers
 
     def add(self, uri):
-        delayed = self.uris.get(uri, None)
+        delayed, count = self.uris.get(uri, [None, 0])
+
         if delayed and delayed.active():
-            delayed.reset(self.delaySeconds)
+            count += 1
+            if count < self.sendAnywayAfterCount:
+                # reschedule for delaySeconds in the future
+                delayed.reset(self.delaySeconds)
+                self.uris[uri][1] = count
+                self.log_info("Delaying: %s" % (uri,))
+            else:
+                self.log_info("Not delaying to avoid starvation: %s" % (uri,))
         else:
-            self.uris[uri] = self.reactor.callLater(self.delaySeconds,
-                self.delayedEnqueue, uri)
+            self.log_info("Scheduling: %s" % (uri,))
+            self.uris[uri] = [self.reactor.callLater(self.delaySeconds,
+                self.delayedEnqueue, uri), 0]
 
     def delayedEnqueue(self, uri):
+        self.log_info("Time to send: %s" % (uri,))
+        self.uris[uri][1] = 0
         for notifier in self.notifiers:
             notifier.enqueue(uri)
 
@@ -467,7 +484,7 @@
     pubsubNS = 'http://jabber.org/protocol/pubsub'
 
     nodeConf = {
-        'pubsub#deliver_payloads': '0',
+        'pubsub#deliver_payloads': '1',
         'pubsub#persist_items'   : '0',
     }
 
@@ -479,7 +496,7 @@
         self.reactor = reactor
         self.config = configOverride or config
 
-        self.sendDebugMessages = False
+        self.roster = {}
 
     def enqueue(self, uri):
         if self.xmlStream is not None:
@@ -496,6 +513,10 @@
             pubsubElement = iq.addElement('pubsub', defaultUri=self.pubsubNS)
             publishElement = pubsubElement.addElement('publish')
             publishElement['node'] = nodeName
+            itemElement = publishElement.addElement('item')
+            payloadElement = itemElement.addElement('plistfrag',
+                defaultUri='plist-apple')
+            self.sendDebug("Publishing (%s)" % (nodeName,), iq)
             iq.addCallback(self.responseFromPublish, nodeName)
             iq.send(to=self.settings['ServiceAddress'])
 
@@ -588,7 +609,7 @@
                                         filledField['type'] = field['type']
                                         valueElement = filledField.addElement('value')
                                         valueElement.addContent(value)
-                                        filledForm.addChild(field)
+                                        # filledForm.addChild(field)
                         filledIq.addCallback(self.responseFromConfiguration,
                             nodeName)
                         self.sendDebug("Sending configuration form (%s)"
@@ -612,27 +633,109 @@
             self.sendError("Failed to configure node (%s)" % (nodeName,), iq)
 
 
+    def requestRoster(self):
+        self.roster = {}
+        rosterIq = IQ(self.xmlStream, type='get')
+        rosterIq.addElement("query", "jabber:iq:roster")
+        rosterIq.addCallback(self.handleRoster)
+        rosterIq.send()
+
+    def allowedInRoster(self, jid):
+        for pattern in self.settings.get("AllowedJIDs", []):
+            if fnmatch(jid, pattern):
+                return True
+        return False
+
+    def handleRoster(self, iq):
+        for child in iq.children[0].children:
+            jid = child['jid']
+            if self.allowedInRoster(jid):
+                self.log_info("In roster: %s" % (jid,))
+                if not self.roster.has_key(jid):
+                    self.roster[jid] = { 'debug' : False, 'available' : False }
+            else:
+                self.log_info("JID not allowed in roster: %s" % (jid,))
+
+    def handlePresence(self, iq):
+        self.log_info("Presence IQ: %s" %
+            (iq.toXml().encode('ascii', 'replace')),)
+        presenceType = iq.getAttribute('type')
+
+        if presenceType == 'subscribe':
+            frm = JID(iq['from']).userhost()
+            if self.allowedInRoster(frm):
+                self.roster[frm] = { 'debug' : False, 'available' : True }
+                response = domish.Element(('jabber:client', 'presence'))
+                response['to'] = iq['from']
+                response['type'] = 'subscribed'
+                self.xmlStream.send(response)
+
+                # request subscription as well
+                subscribe = domish.Element(('jabber:client', 'presence'))
+                subscribe['to'] = iq['from']
+                subscribe['type'] = 'subscribe'
+                self.xmlStream.send(subscribe)
+            else:
+                self.log_info("JID not allowed in roster: %s" % (frm,))
+                # Reject
+                response = domish.Element(('jabber:client', 'presence'))
+                response['to'] = iq['from']
+                response['type'] = 'unsubscribed'
+                self.xmlStream.send(response)
+
+        elif presenceType == 'unsubscribe':
+            frm = JID(iq['from']).userhost()
+            if self.roster.has_key(frm):
+                del self.roster[frm]
+            response = domish.Element(('jabber:client', 'presence'))
+            response['to'] = iq['from']
+            response['type'] = 'unsubscribed'
+            self.xmlStream.send(response)
+
+            # remove from roster as well
+            removal = IQ(self.xmlStream, type='set')
+            query = removal.addElement("query", "jabber:iq:roster")
+            query.addElement("item")
+            query.item["jid"] = iq["from"]
+            query.item["subscription"] = "remove"
+            removal.send()
+
+        elif presenceType == 'unavailable':
+            frm = JID(iq['from']).userhost()
+            if self.roster.has_key(frm):
+                self.roster[frm]['available'] = False
+
+        else:
+            frm = JID(iq['from']).userhost()
+            if self.allowedInRoster(frm):
+                if self.roster.has_key(frm):
+                    self.roster[frm]['available'] = True
+                else:
+                    self.roster[frm] = { 'debug' : False, 'available' : True }
+            else:
+                self.log_info("JID not allowed in roster: %s" % (frm,))
+
     def streamOpened(self, xmlStream):
         self.xmlStream = xmlStream
         xmlStream.addObserver('/message', self.handleMessage)
+        xmlStream.addObserver('/presence', self.handlePresence)
+        self.requestRoster()
 
+
     def streamClosed(self):
         self.xmlStream = None
 
     def sendDebug(self, txt, element):
-        if self.sendDebugMessages:
-            testJid = self.settings.get("TestJID", "")
-            if testJid:
-                txt = "DEBUG: %s %s" % (txt, element.toXml().encode('ascii',
-                    'replace'))
-                self.sendAlert(testJid, txt)
+        txt = "DEBUG: %s %s" % (txt, element.toXml().encode('ascii', 'replace'))
+        for jid, info in self.roster.iteritems():
+            if info['available'] and info['debug']:
+                self.sendAlert(jid, txt)
 
     def sendError(self, txt, element):
-        testJid = self.settings.get("TestJID", "")
-        if testJid:
-            txt = "ERROR: %s %s" % (txt, element.toXml().encode('ascii',
-                'replace'))
-            self.sendAlert(testJid, txt)
+        txt = "ERROR: %s %s" % (txt, element.toXml().encode('ascii', 'replace'))
+        for jid, info in self.roster.iteritems():
+            if info['available']:
+                self.sendAlert(jid, txt)
 
     def sendAlert(self, jid, txt):
         if self.xmlStream is not None:
@@ -645,17 +748,32 @@
         body = getattr(iq, 'body', None)
         if body:
             response = None
-            txt = str(body).lower()
-            if txt == "help":
-                response = "debug on, debug off"
-            elif txt == "debug on":
-                self.sendDebugMessages = True
-                response = "Debugging on"
-            elif txt == "debug off":
-                self.sendDebugMessages = False
-                response = "Debugging off"
+            frm = JID(iq['from']).userhost()
+            if frm in self.roster:
+                txt = str(body).lower()
+                if txt == "help":
+                    response = "debug on, debug off, roster, hammer <count>"
+                elif txt == "roster":
+                    response = "Roster: %s" % (str(self.roster),)
+                elif txt == "debug on":
+                    self.roster[frm]['debug'] = True
+                    response = "Debugging on"
+                elif txt == "debug off":
+                    self.roster[frm]['debug'] = False
+                    response = "Debugging off"
+                elif txt.startswith("hammer"):
+                    try:
+                        hammer, count = txt.split()
+                        count = int(count)
+                    except ValueError:
+                        response = "Please phrase it like 'hammer 100'"
+                    else:
+                        response = "Hammer will commence now, %d times" % (count,)
+                        self.reactor.callLater(1, self.hammer, count)
+                else:
+                    response = "I don't understand.  Try 'help'."
             else:
-                response = "I don't understand.  Try 'help'."
+                response = "Sorry, you are not authorized to converse with this server"
 
             if response:
                 message = domish.Element(('jabber:client', 'message'))
@@ -664,6 +782,9 @@
                 self.xmlStream.send(message)
 
 
+    def hammer(self, count):
+        for i in xrange(count):
+            self.enqueue("hammertesting%d" % (count,))
 
 
 class XMPPNotificationFactory(xmlstream.XmlStreamFactory, LoggingMixIn):

Modified: CalendarServer/trunk/twistedcaldav/test/test_notify.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/test/test_notify.py	2008-08-12 14:51:27 UTC (rev 2806)
+++ CalendarServer/trunk/twistedcaldav/test/test_notify.py	2008-08-12 19:45:36 UTC (rev 2807)
@@ -341,7 +341,7 @@
     def test_sendWhileConnected(self):
         self.notifier.enqueue("/principals/__uids__/test")
 
-        iq = self.xmlStream.elements[0]
+        iq = self.xmlStream.elements[1]
         self.assertEquals(iq.name, "iq")
 
         pubsubElement = list(iq.elements())[0]
@@ -358,21 +358,21 @@
         notifier = XMPPNotifier(self.settings, reactor=Clock(),
             configOverride=self.xmppDisabledConfig)
         notifier.enqueue("/principals/__uids__/test")
-        self.assertEquals(self.xmlStream.elements, [])
+        self.assertEquals(len(self.xmlStream.elements), 1)
 
     def test_publishNewNode(self):
         self.notifier.publishNode("testNodeName")
-        iq = self.xmlStream.elements[0]
+        iq = self.xmlStream.elements[1]
         self.assertEquals(iq.name, "iq")
 
     def test_publishReponse400(self):
         response = IQ(self.xmlStream, type='error')
         errorElement = response.addElement('error')
         errorElement['code'] = '400'
-        self.assertEquals(len(self.xmlStream.elements), 0)
+        self.assertEquals(len(self.xmlStream.elements), 1)
         self.notifier.responseFromPublish("testNodeName", response)
-        self.assertEquals(len(self.xmlStream.elements), 1)
-        iq = self.xmlStream.elements[0]
+        self.assertEquals(len(self.xmlStream.elements), 2)
+        iq = self.xmlStream.elements[1]
         self.assertEquals(iq.name, "iq")
         self.assertEquals(iq['type'], "get")
 
@@ -389,10 +389,10 @@
         response = IQ(self.xmlStream, type='error')
         errorElement = response.addElement('error')
         errorElement['code'] = '404'
-        self.assertEquals(len(self.xmlStream.elements), 0)
+        self.assertEquals(len(self.xmlStream.elements), 1)
         self.notifier.responseFromPublish("testNodeName", response)
-        self.assertEquals(len(self.xmlStream.elements), 1)
-        iq = self.xmlStream.elements[0]
+        self.assertEquals(len(self.xmlStream.elements), 2)
+        iq = self.xmlStream.elements[1]
         self.assertEquals(iq.name, "iq")
         self.assertEquals(iq['type'], "set")
 
@@ -419,25 +419,26 @@
         formElement = configElement.addElement('x')
         formElement['type'] = 'form'
         fields = [
-            ( "unknown", "don't edit me" ),
-            ( "pubsub#deliver_payloads", "1" ),
-            ( "pubsub#persist_items", "1" ),
+            ( "unknown", "don't edit me", "text-single" ),
+            ( "pubsub#deliver_payloads", "1", "boolean" ),
+            ( "pubsub#persist_items", "1", "boolean" ),
         ]
         expectedFields = {
             "unknown" : "don't edit me",
-            "pubsub#deliver_payloads" : "0",
+            "pubsub#deliver_payloads" : "1",
             "pubsub#persist_items" : "0",
         }
         for field in fields:
             fieldElement = formElement.addElement("field")
             fieldElement['var'] = field[0]
+            fieldElement['type'] = field[2]
             fieldElement.addElement('value', content=field[1])
 
-        self.assertEquals(len(self.xmlStream.elements), 0)
+        self.assertEquals(len(self.xmlStream.elements), 1)
         self.notifier.responseFromConfigurationForm("testNodeName", response)
-        self.assertEquals(len(self.xmlStream.elements), 1)
+        self.assertEquals(len(self.xmlStream.elements), 2)
 
-        iq = self.xmlStream.elements[0]
+        iq = self.xmlStream.elements[1]
         self.assertEquals(iq.name, "iq")
         self.assertEquals(iq['type'], "set")
 
@@ -468,16 +469,18 @@
         factory.connected(xmlStream)
         factory.authenticated(xmlStream)
 
-        self.assertEquals(len(xmlStream.elements), 1)
+        self.assertEquals(len(xmlStream.elements), 2)
         presence = xmlStream.elements[0]
         self.assertEquals(presence.name, 'presence')
+        iq = xmlStream.elements[1]
+        self.assertEquals(iq.name, 'iq')
 
         clock.advance(5)
 
-        self.assertEquals(len(xmlStream.elements), 2)
-        presence = xmlStream.elements[1]
+        self.assertEquals(len(xmlStream.elements), 3)
+        presence = xmlStream.elements[2]
         self.assertEquals(presence.name, 'presence')
 
         factory.disconnected(xmlStream)
         clock.advance(5)
-        self.assertEquals(len(xmlStream.elements), 2)
+        self.assertEquals(len(xmlStream.elements), 3)
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20080812/44083013/attachment-0001.html 


More information about the calendarserver-changes mailing list