Revision: 12767 http://trac.calendarserver.org//changeset/12767 Author: wsanchez@apple.com Date: 2014-02-27 12:17:30 -0800 (Thu, 27 Feb 2014) Log Message: ----------- Start tests. Modified Paths: -------------- CalendarServer/trunk/calendarserver/webadmin/eventsource.py Added Paths: ----------- CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py Modified: CalendarServer/trunk/calendarserver/webadmin/eventsource.py =================================================================== --- CalendarServer/trunk/calendarserver/webadmin/eventsource.py 2014-02-27 18:31:01 UTC (rev 12766) +++ CalendarServer/trunk/calendarserver/webadmin/eventsource.py 2014-02-27 20:17:30 UTC (rev 12767) @@ -46,6 +46,11 @@ binary data should be encoded as text if it is to be used in an EventSource stream. + UTF-8 encoded L{bytes} are returned because + http://www.w3.org/TR/eventsource/ states that the only allowed encoding + for C{text/event-stream} is UTF-8. + + @param text: The text (ie. the message) to send in the event. @type text: L{unicode} @@ -56,8 +61,11 @@ @type eventClass: L{unicode} @return: An HTML5 EventSource event as text. - @rtype: L{unicode} + @rtype: UTF-8 encoded L{bytes} """ + if text is None: + raise TypeError("text may not be None") + event = [] if eventID is not None: @@ -70,7 +78,7 @@ u"data: {0}".format(l) for l in text.split("\n") ) - return u"\n".join(event) + u"\n\n" + return (u"\n".join(event) + u"\n\n").encode("utf-8") @@ -122,9 +130,14 @@ """ Resource.__init__(self) + self._eventDecoder = eventDecoder self._events = deque(maxlen=bufferSize) + def addEvent(self, event): + self._events.append(event) + + def render(self, request): lastID = request.headers.getRawHeaders(u"last-event-id") @@ -167,10 +180,6 @@ self._closed = False - def addEvent(self, event): - self._events.append(event) - - def read(self): if self._closed: return None @@ -178,9 +187,9 @@ lastID = self._lastID eventID = None - idForEvent = self.eventDecoder.idForEvent - classForEvent = self.eventDecoder.classForEvent - textForEvent = self.eventDecoder.textForEvent + idForEvent = self._eventDecoder.idForEvent + classForEvent = self._eventDecoder.classForEvent + textForEvent = self._eventDecoder.textForEvent for event in self._events: eventID = idForEvent(event) @@ -197,6 +206,8 @@ eventClass = classForEvent(event) eventText = textForEvent(event) + self._lastID = eventID + return succeed( textAsEvent(eventText, eventID, eventClass).encode("utf-8") ) Added: CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py =================================================================== --- CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py (rev 0) +++ CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py 2014-02-27 20:17:30 UTC (rev 12767) @@ -0,0 +1,229 @@ +## +# Copyright (c) 2011-2014 Apple Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +## + +""" +Tests for L{calendarserver.webadmin.eventsource}. +""" + +from __future__ import print_function + +from twisted.internet.defer import inlineCallbacks +from twisted.trial.unittest import TestCase + +from txweb2.http import Request +from txweb2.http_headers import Headers + +from ..eventsource import textAsEvent, EventSourceResource + + + +class EventGenerationTests(TestCase): + """ + Tests for emitting HTML5 EventSource events. + """ + + def test_textAsEvent(self): + """ + Generate an event from some text. + """ + self.assertEquals( + textAsEvent(u"Hello, World!"), + b"data: Hello, World!\n\n" + ) + + + def test_textAsEvent_newlines(self): + """ + Text with newlines generates multiple C{data:} lines in event. + """ + self.assertEquals( + textAsEvent(u"AAPL\nApple Inc.\n527.10"), + b"data: AAPL\ndata: Apple Inc.\ndata: 527.10\n\n" + ) + + + def test_textAsEvent_newlineTrailing(self): + """ + Text with trailing newline generates trailing blank C{data:} line. + """ + self.assertEquals( + textAsEvent(u"AAPL\nApple Inc.\n527.10\n"), + b"data: AAPL\ndata: Apple Inc.\ndata: 527.10\ndata: \n\n" + ) + + + def test_textAsEvent_encoding(self): + """ + Event text is encoded as UTF-8. + """ + self.assertEquals( + textAsEvent(u"S\xe1nchez"), + b"data: S\xc3\xa1nchez\n\n" + ) + + + def test_textAsEvent_emptyText(self): + """ + Text may not be L{None}. + """ + self.assertEquals( + textAsEvent(u""), + b"data: \n\n" # Note that b"data\n\n" is a valid result here also + ) + + + def test_textAsEvent_NoneText(self): + """ + Text may not be L{None}. + """ + self.assertRaises(TypeError, textAsEvent, None) + + + def test_textAsEvent_eventID(self): + """ + Event ID goes into the C{id} field. + """ + self.assertEquals( + textAsEvent(u"", eventID=u"1234"), + b"id: 1234\ndata: \n\n" # Note order is allowed to vary + ) + + + def test_textAsEvent_eventClass(self): + """ + Event class goes into the C{event} field. + """ + self.assertEquals( + textAsEvent(u"", eventClass=u"buckets"), + b"event: buckets\ndata: \n\n" # Note order is allowed to vary + ) + + + +class EventSourceResourceTests(TestCase): + """ + Tests for L{EventSourceResource}. + """ + + def eventSourceResource(self, events=()): + resource = EventSourceResource(DictionaryEventDecoder) + + for event in events: + resource.addEvent(event) + + return resource + + + def render(self, resource): + headers = Headers() + + request = Request( + chanRequest=None, + command=None, + path="/", + version=None, + contentLength=None, + headers=headers + ) + + return resource.render(request) + + + def test_status(self): + """ + Response status is C{200}. + """ + resource = self.eventSourceResource() + response = self.render(resource) + + self.assertEquals(response.code, 200) + + + def test_contentType(self): + """ + Response content type is C{text/event-stream}. + """ + resource = self.eventSourceResource() + response = self.render(resource) + + self.assertEquals( + response.headers.getRawHeaders(b"Content-Type"), + ["text/event-stream"] + ) + + + def test_contentLength(self): + """ + Response content length is not provided. + """ + resource = self.eventSourceResource() + response = self.render(resource) + + self.assertEquals( + response.headers.getRawHeaders(b"Content-Length"), + None + ) + + + @inlineCallbacks + def test_streamBufferedEvents(self): + """ + Events already buffered are vended immediately, then we get EOF. + """ + events = ( + dict(eventID=u"1", eventText=u"A"), + dict(eventID=u"2", eventText=u"B"), + dict(eventID=u"3", eventText=u"C"), + dict(eventID=u"4", eventText=u"D"), + ) + + resource = self.eventSourceResource(events) + response = self.render(resource) + + # Each result from read() is another event + for i in range(4): + result = yield response.stream.read() + self.assertEquals( + result, + textAsEvent( + text=events[i]["eventText"], + eventID=events[i]["eventID"] + ) + ) + + result = yield response.stream.read() + self.assertEquals(result, None) + + + +class DictionaryEventDecoder(object): + """ + Decodes events represented as dictionaries. + """ + + @staticmethod + def idForEvent(event): + return event.get("eventID") + + + @staticmethod + def classForEvent(event): + return event.get("eventClass") + + + @staticmethod + def textForEvent(event): + return event.get("eventText")
participants (1)
-
source_changes@macosforge.org