Revision: 12768 http://trac.calendarserver.org//changeset/12768 Author: wsanchez@apple.com Date: 2014-02-27 15:16:58 -0800 (Thu, 27 Feb 2014) Log Message: ----------- Make EventStream.read() defer on new events. Modified Paths: -------------- CalendarServer/trunk/calendarserver/webadmin/eventsource.py CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py Modified: CalendarServer/trunk/calendarserver/webadmin/eventsource.py =================================================================== --- CalendarServer/trunk/calendarserver/webadmin/eventsource.py 2014-02-27 20:17:30 UTC (rev 12767) +++ CalendarServer/trunk/calendarserver/webadmin/eventsource.py 2014-02-27 23:16:58 UTC (rev 12768) @@ -30,7 +30,7 @@ from zope.interface import implementer, Interface -from twisted.internet.defer import succeed +from twisted.internet.defer import Deferred, succeed from txweb2.stream import IByteStream, fallbackSplit from txweb2.resource import Resource @@ -132,12 +132,17 @@ self._eventDecoder = eventDecoder self._events = deque(maxlen=bufferSize) + self._streams = set() - def addEvent(self, event): - self._events.append(event) + def addEvents(self, events): + self._events.extend(events) + # Notify outbound streams that there is new data to vend + for stream in self._streams: + stream.didAddEvents() + def render(self, request): lastID = request.headers.getRawHeaders(u"last-event-id") @@ -146,6 +151,13 @@ response.headers.setHeader( b"content-type", MimeType.fromString(b"text/event-stream") ) + + # Keep track of the event streams + request.addResponseFilter( + lambda r: self._streams.remove(response.stream) + ) + self._streams.add(response.stream) + return response @@ -178,11 +190,19 @@ self._events = events self._lastID = lastID self._closed = False + self._deferredRead = None + def didAddEvents(self): + d = self._deferredRead + if d is not None: + d.addCallback(lambda _: self.read()) + d.callback(None) + + def read(self): if self._closed: - return None + return succeed(None) lastID = self._lastID eventID = None @@ -209,7 +229,7 @@ self._lastID = eventID return succeed( - textAsEvent(eventText, eventID, eventClass).encode("utf-8") + textAsEvent(eventText, eventID, eventClass) ) @@ -218,19 +238,17 @@ # client saw. self._lastID = None - return succeed("") + return succeed(b"") - return succeed(None) + d = Deferred() + self._deferredRead = d + return d - # from twisted.internet.task import deferLater - # from twisted.internet import reactor - # return deferLater(reactor, 1.0, self.read) - - def split(self, point): return fallbackSplit(self, point) def close(self): + print("************ CLOSE ************") self._closed = True Modified: CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py =================================================================== --- CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py 2014-02-27 20:17:30 UTC (rev 12767) +++ CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py 2014-02-27 23:16:58 UTC (rev 12768) @@ -23,7 +23,7 @@ from twisted.internet.defer import inlineCallbacks from twisted.trial.unittest import TestCase -from txweb2.http import Request +from txweb2.server import Request from txweb2.http_headers import Headers from ..eventsource import textAsEvent, EventSourceResource @@ -118,15 +118,10 @@ Tests for L{EventSourceResource}. """ - def eventSourceResource(self, events=()): - resource = EventSourceResource(DictionaryEventDecoder) + def eventSourceResource(self): + return EventSourceResource(DictionaryEventDecoder) - for event in events: - resource.addEvent(event) - return resource - - def render(self, resource): headers = Headers() @@ -190,11 +185,13 @@ dict(eventID=u"4", eventText=u"D"), ) - resource = self.eventSourceResource(events) + resource = self.eventSourceResource() + resource.addEvents(events) + response = self.render(resource) # Each result from read() is another event - for i in range(4): + for i in range(len(events)): result = yield response.stream.read() self.assertEquals( result, @@ -204,11 +201,68 @@ ) ) - result = yield response.stream.read() - self.assertEquals(result, None) + # The next read should block on new events. + d = response.stream.read() + self.assertFalse(d.called) + d.addErrback(lambda f: None) + d.cancel() + @inlineCallbacks + def test_streamNewEvents(self): + """ + Events not already buffered are vended after they are posted. + """ + events = ( + dict(eventID=u"1", eventText=u"A"), + dict(eventID=u"2", eventText=u"B"), + dict(eventID=u"3", eventText=u"C"), + dict(eventID=u"4", eventText=u"D"), + ) + + resource = self.eventSourceResource() + + response = self.render(resource) + + # The first read should block on new events. + d = response.stream.read() + self.assertFalse(d.called) + + # Add some events + resource.addEvents(events) + + # We should now be unblocked + self.assertTrue(d.called) + + # Each result from read() is another event + for i in range(len(events)): + if d is None: + result = yield response.stream.read() + else: + result = yield d + d = None + + self.assertEquals( + result, + textAsEvent( + text=events[i]["eventText"], + eventID=(events[i]["eventID"]) + ) + ) + + # The next read should block on new events. + d = response.stream.read() + self.assertFalse(d.called) + + d.addErrback(lambda f: None) + d.cancel() + + + + # Test closed + + class DictionaryEventDecoder(object): """ Decodes events represented as dictionaries.