[CalendarServer-changes] [12768] CalendarServer/trunk/calendarserver/webadmin
source_changes at macosforge.org
source_changes at macosforge.org
Thu Feb 27 15:16:58 PST 2014
Revision: 12768
http://trac.calendarserver.org/changeset/12768
Author: wsanchez at apple.com
Date: 2014-02-27 15:16:58 -0800 (Thu, 27 Feb 2014)
Log Message:
-----------
Make EventStream.read() defer on new events.
Modified Paths:
--------------
CalendarServer/trunk/calendarserver/webadmin/eventsource.py
CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py
Modified: CalendarServer/trunk/calendarserver/webadmin/eventsource.py
===================================================================
--- CalendarServer/trunk/calendarserver/webadmin/eventsource.py 2014-02-27 20:17:30 UTC (rev 12767)
+++ CalendarServer/trunk/calendarserver/webadmin/eventsource.py 2014-02-27 23:16:58 UTC (rev 12768)
@@ -30,7 +30,7 @@
from zope.interface import implementer, Interface
-from twisted.internet.defer import succeed
+from twisted.internet.defer import Deferred, succeed
from txweb2.stream import IByteStream, fallbackSplit
from txweb2.resource import Resource
@@ -132,12 +132,17 @@
self._eventDecoder = eventDecoder
self._events = deque(maxlen=bufferSize)
+ self._streams = set()
- def addEvent(self, event):
- self._events.append(event)
+ def addEvents(self, events):
+ self._events.extend(events)
+ # Notify outbound streams that there is new data to vend
+ for stream in self._streams:
+ stream.didAddEvents()
+
def render(self, request):
lastID = request.headers.getRawHeaders(u"last-event-id")
@@ -146,6 +151,13 @@
response.headers.setHeader(
b"content-type", MimeType.fromString(b"text/event-stream")
)
+
+ # Keep track of the event streams
+ request.addResponseFilter(
+ lambda r: self._streams.remove(response.stream)
+ )
+ self._streams.add(response.stream)
+
return response
@@ -178,11 +190,19 @@
self._events = events
self._lastID = lastID
self._closed = False
+ self._deferredRead = None
+ def didAddEvents(self):
+ d = self._deferredRead
+ if d is not None:
+ d.addCallback(lambda _: self.read())
+ d.callback(None)
+
+
def read(self):
if self._closed:
- return None
+ return succeed(None)
lastID = self._lastID
eventID = None
@@ -209,7 +229,7 @@
self._lastID = eventID
return succeed(
- textAsEvent(eventText, eventID, eventClass).encode("utf-8")
+ textAsEvent(eventText, eventID, eventClass)
)
@@ -218,19 +238,17 @@
# client saw.
self._lastID = None
- return succeed("")
+ return succeed(b"")
- return succeed(None)
+ d = Deferred()
+ self._deferredRead = d
+ return d
- # from twisted.internet.task import deferLater
- # from twisted.internet import reactor
- # return deferLater(reactor, 1.0, self.read)
-
-
def split(self, point):
return fallbackSplit(self, point)
def close(self):
+ print("************ CLOSE ************")
self._closed = True
Modified: CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py
===================================================================
--- CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py 2014-02-27 20:17:30 UTC (rev 12767)
+++ CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py 2014-02-27 23:16:58 UTC (rev 12768)
@@ -23,7 +23,7 @@
from twisted.internet.defer import inlineCallbacks
from twisted.trial.unittest import TestCase
-from txweb2.http import Request
+from txweb2.server import Request
from txweb2.http_headers import Headers
from ..eventsource import textAsEvent, EventSourceResource
@@ -118,15 +118,10 @@
Tests for L{EventSourceResource}.
"""
- def eventSourceResource(self, events=()):
- resource = EventSourceResource(DictionaryEventDecoder)
+ def eventSourceResource(self):
+ return EventSourceResource(DictionaryEventDecoder)
- for event in events:
- resource.addEvent(event)
- return resource
-
-
def render(self, resource):
headers = Headers()
@@ -190,11 +185,13 @@
dict(eventID=u"4", eventText=u"D"),
)
- resource = self.eventSourceResource(events)
+ resource = self.eventSourceResource()
+ resource.addEvents(events)
+
response = self.render(resource)
# Each result from read() is another event
- for i in range(4):
+ for i in range(len(events)):
result = yield response.stream.read()
self.assertEquals(
result,
@@ -204,11 +201,68 @@
)
)
- result = yield response.stream.read()
- self.assertEquals(result, None)
+ # The next read should block on new events.
+ d = response.stream.read()
+ self.assertFalse(d.called)
+ d.addErrback(lambda f: None)
+ d.cancel()
+ @inlineCallbacks
+ def test_streamNewEvents(self):
+ """
+ Events not already buffered are vended after they are posted.
+ """
+ events = (
+ dict(eventID=u"1", eventText=u"A"),
+ dict(eventID=u"2", eventText=u"B"),
+ dict(eventID=u"3", eventText=u"C"),
+ dict(eventID=u"4", eventText=u"D"),
+ )
+
+ resource = self.eventSourceResource()
+
+ response = self.render(resource)
+
+ # The first read should block on new events.
+ d = response.stream.read()
+ self.assertFalse(d.called)
+
+ # Add some events
+ resource.addEvents(events)
+
+ # We should now be unblocked
+ self.assertTrue(d.called)
+
+ # Each result from read() is another event
+ for i in range(len(events)):
+ if d is None:
+ result = yield response.stream.read()
+ else:
+ result = yield d
+ d = None
+
+ self.assertEquals(
+ result,
+ textAsEvent(
+ text=events[i]["eventText"],
+ eventID=(events[i]["eventID"])
+ )
+ )
+
+ # The next read should block on new events.
+ d = response.stream.read()
+ self.assertFalse(d.called)
+
+ d.addErrback(lambda f: None)
+ d.cancel()
+
+
+
+ # Test closed
+
+
class DictionaryEventDecoder(object):
"""
Decodes events represented as dictionaries.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140227/2fbc99b5/attachment-0001.html>
More information about the calendarserver-changes mailing list