[CalendarServer-changes] [9899] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Fri Oct 5 15:52:17 PDT 2012


Revision: 9899
          http://trac.calendarserver.org//changeset/9899
Author:   glyph at apple.com
Date:     2012-10-05 15:52:17 -0700 (Fri, 05 Oct 2012)
Log Message:
-----------
Stream attachments to the client rather than reading them all into memory first.

Modified Paths:
--------------
    CalendarServer/trunk/twistedcaldav/storebridge.py
    CalendarServer/trunk/txdav/caldav/datastore/file.py
    CalendarServer/trunk/txdav/caldav/datastore/sql.py
    CalendarServer/trunk/txdav/caldav/datastore/test/common.py
    CalendarServer/trunk/txdav/caldav/datastore/util.py

Property Changed:
----------------
    CalendarServer/trunk/

Modified: CalendarServer/trunk/twistedcaldav/storebridge.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/storebridge.py	2012-10-05 21:40:57 UTC (rev 9898)
+++ CalendarServer/trunk/twistedcaldav/storebridge.py	2012-10-05 22:52:17 UTC (rev 9899)
@@ -1634,6 +1634,8 @@
 
         stream = ProducerStream()
         class StreamProtocol(Protocol):
+            def connectionMade(self):
+                stream.registerProducer(self.transport, False)
             def dataReceived(self, data):
                 stream.write(data)
             def connectionLost(self, reason):
@@ -1643,7 +1645,6 @@
         except IOError, e:
             log.error("Unable to read attachment: %s, due to: %s" % (self, e,))
             raise HTTPError(responsecode.NOT_FOUND)
-            
         return Response(OK, {"content-type":self.contentType()}, stream)
 
 

Modified: CalendarServer/trunk/txdav/caldav/datastore/file.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/file.py	2012-10-05 21:40:57 UTC (rev 9898)
+++ CalendarServer/trunk/txdav/caldav/datastore/file.py	2012-10-05 22:52:17 UTC (rev 9899)
@@ -34,8 +34,6 @@
 
 from twisted.internet.defer import inlineCallbacks, returnValue, succeed, fail
 
-from twisted.python.failure import Failure
-
 from twext.python.vcomponent import VComponent
 from txdav.xml import element as davxml
 from txdav.xml.rfc2518 import ResourceType, GETContentType
@@ -56,7 +54,7 @@
     IndexSchedule as OldInboxIndex
 from txdav.caldav.datastore.util import (
     validateCalendarComponent, dropboxIDFromCalendarObject, CalendarObjectBase,
-    StorageTransportBase
+    StorageTransportBase, AttachmentRetrievalTransport
 )
 
 from txdav.common.datastore.file import (
@@ -737,6 +735,7 @@
     def write(self, data):
         # FIXME: multiple chunks
         self._file.write(data)
+        return super(AttachmentStorageTransport, self).write(data)
 
 
     def loseConnection(self):
@@ -801,12 +800,7 @@
 
 
     def retrieve(self, protocol):
-        # FIXME: makeConnection
-        # FIXME: actually stream
-        # FIMXE: connectionLost
-        protocol.dataReceived(self._path.getContent())
-        # FIXME: ConnectionDone, not NotImplementedError
-        protocol.connectionLost(Failure(NotImplementedError()))
+        return AttachmentRetrievalTransport(self._path).start(protocol)
 
 
     @property

Modified: CalendarServer/trunk/txdav/caldav/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/sql.py	2012-10-05 21:40:57 UTC (rev 9898)
+++ CalendarServer/trunk/txdav/caldav/datastore/sql.py	2012-10-05 22:52:17 UTC (rev 9899)
@@ -32,9 +32,7 @@
 from twext.python.filepath import CachingFilePath
 
 from twisted.internet.defer import inlineCallbacks, returnValue
-from twisted.internet.error import ConnectionLost
 from twisted.python import hashlib
-from twisted.python.failure import Failure
 
 from twistedcaldav import caldavxml, customxml
 from twistedcaldav.caldavxml import ScheduleCalendarTransp, Opaque
@@ -80,6 +78,8 @@
 from pycalendar.duration import PyCalendarDuration
 from pycalendar.timezone import PyCalendarTimezone
 
+from txdav.caldav.datastore.util import AttachmentRetrievalTransport
+
 from zope.interface.declarations import implements
 
 import os
@@ -1531,8 +1531,7 @@
 
 
     def retrieve(self, protocol):
-        protocol.dataReceived(self._path.getContent())
-        protocol.connectionLost(Failure(ConnectionLost()))
+        return AttachmentRetrievalTransport(self._path).start(protocol)
 
 
     _removeStatement = Delete(

Modified: CalendarServer/trunk/txdav/caldav/datastore/test/common.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/test/common.py	2012-10-05 21:40:57 UTC (rev 9898)
+++ CalendarServer/trunk/txdav/caldav/datastore/test/common.py	2012-10-05 22:52:17 UTC (rev 9899)
@@ -2080,24 +2080,21 @@
         t.write(sampleData)
         yield t.loseConnection()
         yield self.exceedQuotaTest(get)
+        @inlineCallbacks
         def checkOriginal():
-            catch = StringIO()
-            catch.dataReceived = catch.write
-            lost = []
-            catch.connectionLost = lost.append
-            attachment.retrieve(catch)
+            actual = yield self.attachmentToString(attachment)
             expected = sampleData
             # note: 60 is less than len(expected); trimming is just to make
             # the error message look sane when the test fails.
-            actual = catch.getvalue()[:60]
+            actual = actual[:60]
             self.assertEquals(actual, expected)
-        checkOriginal()
+        yield checkOriginal()
         yield self.commit()
         # Make sure that things go back to normal after a commit of that
         # transaction.
         obj = yield self.calendarObjectUnderTest()
         attachment = yield get()
-        checkOriginal()
+        yield checkOriginal()
 
 
     def test_removeAttachmentWithName(self, refresh=lambda x:x):

Modified: CalendarServer/trunk/txdav/caldav/datastore/util.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/util.py	2012-10-05 21:40:57 UTC (rev 9898)
+++ CalendarServer/trunk/txdav/caldav/datastore/util.py	2012-10-05 22:52:17 UTC (rev 9899)
@@ -45,6 +45,11 @@
     InternalDataStoreError, HomeChildNameAlreadyExistsError
 )
 from txdav.base.datastore.util import normalizeUUIDOrNot
+from twisted.protocols.basic import FileSender
+from twisted.internet.interfaces import ITransport
+from twisted.internet.interfaces import IConsumer
+from twisted.internet.error import ConnectionLost
+from twisted.internet.task import LoopingCall
 
 log = Logger()
 
@@ -299,6 +304,9 @@
         self.storeTransport = storeTransport
         self.done = Deferred()
 
+    def connectionMade(self):
+        self.storeTransport.registerProducer(self.transport, False)
+
     def dataReceived(self, data):
         self.storeTransport.write(data)
 
@@ -462,14 +470,34 @@
         Create a storage transport with a reference to an L{IAttachment} and a
         L{twext.web2.http_headers.MimeType}.
         """
+        from twisted.internet import reactor
+        self._clock = reactor
         self._attachment = attachment
         self._contentType = contentType
+        self._producer = None
 
         # Make sure we have some kind of contrent-type
         if self._contentType is None:
             self._contentType = http_headers.MimeType.fromString(getType(self._attachment.name(), self.contentTypes))
 
 
+    def write(self, data):
+        """
+        Children must override this to actually write the data, but should
+        upcall this implementation to interact properly with producers.
+        """
+        if self._producer and self._streamingProducer:
+            # XXX this needs to be in a callLater because otherwise
+            # resumeProducing will call write which will call resumeProducing
+            # (etc) forever.
+            self._clock.callLater(0, self._producer.resumeProducing)
+
+
+    def registerProducer(self, producer, streaming):
+        self._producer = producer
+        self._streamingProducer = streaming
+
+
     def getPeer(self):
         return StorageTransportAddress(self._attachment, False)
 
@@ -482,7 +510,86 @@
         return self.write(''.join(seq))
 
 
+    def stopProducing(self):
+        return self.loseConnection()
 
+
+
+class AttachmentRetrievalTransport(FileSender, object):
+    """
+    The transport for a protocol that does L{IAttachment.retrieve}.
+    """
+    implements(ITransport)
+
+    def __init__(self, filePath):
+        from twisted.internet import reactor
+        self.filePath = filePath
+        self.clock = reactor
+
+
+    def start(self, protocol):
+        this = self
+        class Consumer(object):
+            implements(IConsumer)
+            def registerProducer(self, producer, streaming):
+                protocol.makeConnection(producer)
+                this._maybeLoopDelivery()
+            def write(self, data):
+                protocol.dataReceived(data)
+            def unregisterProducer(self):
+                this._done(protocol)
+        self.beginFileTransfer(self.filePath.open(), Consumer())
+
+
+    def _done(self, protocol):
+        if self._deliveryLoop:
+            self._deliveryLoop.stop()
+        protocol.connectionLost(Failure(ConnectionLost()))
+
+
+    def write(self, data):
+        raise NotImplementedError("This is a read-only transport.")
+
+
+    def writeSequence(self, datas):
+        self.write("".join(datas))
+
+
+    def loseConnection(self):
+        pass
+
+
+    def getPeer(self):
+        return self
+
+
+    def getHost(self):
+        return self
+
+
+    _everResumedProducing = False
+
+    def resumeProducing(self):
+        self._everResumedProducing = True
+        super(AttachmentRetrievalTransport, self).resumeProducing()
+
+
+    _deliveryLoop = None
+
+    def _maybeLoopDelivery(self):
+        """
+        If the protocol never pulled data (resumeProducing() was never called),
+        push the file to it ourselves, one chunk per L{LoopingCall} tick.
+        """
+        if not self._everResumedProducing:
+            # Not registered as a streaming producer.
+            def deliverNextChunk():
+                super(AttachmentRetrievalTransport, self).resumeProducing()
+            self._deliveryLoop = LoopingCall(deliverNextChunk)
+            self._deliveryLoop.start(0.01, True)
+
+
+
 def fixOneCalendarObject(component):
     """
     Correct the properties which may contain a user's directory UUID within a
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20121005/fe05c2ea/attachment-0001.html>


More information about the calendarserver-changes mailing list