[CalendarServer-changes] [12768] CalendarServer/trunk/calendarserver/webadmin

source_changes at macosforge.org source_changes at macosforge.org
Thu Feb 27 15:16:58 PST 2014


Revision: 12768
          http://trac.calendarserver.org/changeset/12768
Author:   wsanchez at apple.com
Date:     2014-02-27 15:16:58 -0800 (Thu, 27 Feb 2014)
Log Message:
-----------
Make EventStream.read() return an unfired Deferred when no events are buffered; the Deferred fires once new events are added.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/webadmin/eventsource.py
    CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py

Modified: CalendarServer/trunk/calendarserver/webadmin/eventsource.py
===================================================================
--- CalendarServer/trunk/calendarserver/webadmin/eventsource.py	2014-02-27 20:17:30 UTC (rev 12767)
+++ CalendarServer/trunk/calendarserver/webadmin/eventsource.py	2014-02-27 23:16:58 UTC (rev 12768)
@@ -30,7 +30,7 @@
 
 from zope.interface import implementer, Interface
 
-from twisted.internet.defer import succeed
+from twisted.internet.defer import Deferred, succeed
 
 from txweb2.stream import IByteStream, fallbackSplit
 from txweb2.resource import Resource
@@ -132,12 +132,17 @@
 
         self._eventDecoder = eventDecoder
         self._events = deque(maxlen=bufferSize)
+        self._streams = set()
 
 
-    def addEvent(self, event):
-        self._events.append(event)
+    def addEvents(self, events):
+        self._events.extend(events)
 
+        # Notify outbound streams that there is new data to vend
+        for stream in self._streams:
+            stream.didAddEvents()
 
+
     def render(self, request):
         lastID = request.headers.getRawHeaders(u"last-event-id")
 
@@ -146,6 +151,13 @@
         response.headers.setHeader(
             b"content-type", MimeType.fromString(b"text/event-stream")
         )
+
+        # Keep track of the event streams
+        request.addResponseFilter(
+            lambda r: self._streams.remove(response.stream)
+        )
+        self._streams.add(response.stream)
+
         return response
 
 
@@ -178,11 +190,19 @@
         self._events = events
         self._lastID = lastID
         self._closed = False
+        self._deferredRead = None
 
 
+    def didAddEvents(self):
+        d = self._deferredRead
+        if d is not None:
+            d.addCallback(lambda _: self.read())
+            d.callback(None)
+
+
     def read(self):
         if self._closed:
-            return None
+            return succeed(None)
 
         lastID = self._lastID
         eventID = None
@@ -209,7 +229,7 @@
             self._lastID = eventID
 
             return succeed(
-                textAsEvent(eventText, eventID, eventClass).encode("utf-8")
+                textAsEvent(eventText, eventID, eventClass)
             )
 
 
@@ -218,19 +238,17 @@
             # client saw.
             self._lastID = None
 
-            return succeed("")
+            return succeed(b"")
 
-        return succeed(None)
+        d = Deferred()
+        self._deferredRead = d
+        return d
 
-        # from twisted.internet.task import deferLater
-        # from twisted.internet import reactor
 
-        # return deferLater(reactor, 1.0, self.read)
-
-
     def split(self, point):
         return fallbackSplit(self, point)
 
 
     def close(self):
+        print("************ CLOSE ************")
         self._closed = True

Modified: CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py
===================================================================
--- CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py	2014-02-27 20:17:30 UTC (rev 12767)
+++ CalendarServer/trunk/calendarserver/webadmin/test/test_eventsource.py	2014-02-27 23:16:58 UTC (rev 12768)
@@ -23,7 +23,7 @@
 from twisted.internet.defer import inlineCallbacks
 from twisted.trial.unittest import TestCase
 
-from txweb2.http import Request
+from txweb2.server import Request
 from txweb2.http_headers import Headers
 
 from ..eventsource import textAsEvent, EventSourceResource
@@ -118,15 +118,10 @@
     Tests for L{EventSourceResource}.
     """
 
-    def eventSourceResource(self, events=()):
-        resource = EventSourceResource(DictionaryEventDecoder)
+    def eventSourceResource(self):
+        return EventSourceResource(DictionaryEventDecoder)
 
-        for event in events:
-            resource.addEvent(event)
 
-        return resource
-
-
     def render(self, resource):
         headers = Headers()
 
@@ -190,11 +185,13 @@
             dict(eventID=u"4", eventText=u"D"),
         )
 
-        resource = self.eventSourceResource(events)
+        resource = self.eventSourceResource()
+        resource.addEvents(events)
+
         response = self.render(resource)
 
         # Each result from read() is another event
-        for i in range(4):
+        for i in range(len(events)):
             result = yield response.stream.read()
             self.assertEquals(
                 result,
@@ -204,11 +201,68 @@
                 )
             )
 
-        result = yield response.stream.read()
-        self.assertEquals(result, None)
+        # The next read should block on new events.
+        d = response.stream.read()
+        self.assertFalse(d.called)
 
+        d.addErrback(lambda f: None)
+        d.cancel()
 
 
+    @inlineCallbacks
+    def test_streamNewEvents(self):
+        """
+        Events not already buffered are vended after they are posted.
+        """
+        events = (
+            dict(eventID=u"1", eventText=u"A"),
+            dict(eventID=u"2", eventText=u"B"),
+            dict(eventID=u"3", eventText=u"C"),
+            dict(eventID=u"4", eventText=u"D"),
+        )
+
+        resource = self.eventSourceResource()
+
+        response = self.render(resource)
+
+        # The first read should block on new events.
+        d = response.stream.read()
+        self.assertFalse(d.called)
+
+        # Add some events
+        resource.addEvents(events)
+
+        # We should now be unblocked
+        self.assertTrue(d.called)
+
+        # Each result from read() is another event
+        for i in range(len(events)):
+            if d is None:
+                result = yield response.stream.read()
+            else:
+                result = yield d
+                d = None
+
+            self.assertEquals(
+                result,
+                textAsEvent(
+                    text=events[i]["eventText"],
+                    eventID=(events[i]["eventID"])
+                )
+            )
+
+        # The next read should block on new events.
+        d = response.stream.read()
+        self.assertFalse(d.called)
+
+        d.addErrback(lambda f: None)
+        d.cancel()
+
+
+
+    # Test closed
+
+
 class DictionaryEventDecoder(object):
     """
     Decodes events represented as dictionaries.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140227/2fbc99b5/attachment-0001.html>


More information about the calendarserver-changes mailing list