[CalendarServer-changes] [9899] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Fri Oct 5 15:52:17 PDT 2012
Revision: 9899
http://trac.calendarserver.org//changeset/9899
Author: glyph at apple.com
Date: 2012-10-05 15:52:17 -0700 (Fri, 05 Oct 2012)
Log Message:
-----------
Stream attachments to the client rather than reading them all into memory first.
Modified Paths:
--------------
CalendarServer/trunk/twistedcaldav/storebridge.py
CalendarServer/trunk/txdav/caldav/datastore/file.py
CalendarServer/trunk/txdav/caldav/datastore/sql.py
CalendarServer/trunk/txdav/caldav/datastore/test/common.py
CalendarServer/trunk/txdav/caldav/datastore/util.py
Property Changed:
----------------
CalendarServer/trunk/
Modified: CalendarServer/trunk/twistedcaldav/storebridge.py
===================================================================
--- CalendarServer/trunk/twistedcaldav/storebridge.py 2012-10-05 21:40:57 UTC (rev 9898)
+++ CalendarServer/trunk/twistedcaldav/storebridge.py 2012-10-05 22:52:17 UTC (rev 9899)
@@ -1634,6 +1634,8 @@
stream = ProducerStream()
class StreamProtocol(Protocol):
+ def connectionMade(self):
+ stream.registerProducer(self.transport, False)
def dataReceived(self, data):
stream.write(data)
def connectionLost(self, reason):
@@ -1643,7 +1645,6 @@
except IOError, e:
log.error("Unable to read attachment: %s, due to: %s" % (self, e,))
raise HTTPError(responsecode.NOT_FOUND)
-
return Response(OK, {"content-type":self.contentType()}, stream)
Modified: CalendarServer/trunk/txdav/caldav/datastore/file.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/file.py 2012-10-05 21:40:57 UTC (rev 9898)
+++ CalendarServer/trunk/txdav/caldav/datastore/file.py 2012-10-05 22:52:17 UTC (rev 9899)
@@ -34,8 +34,6 @@
from twisted.internet.defer import inlineCallbacks, returnValue, succeed, fail
-from twisted.python.failure import Failure
-
from twext.python.vcomponent import VComponent
from txdav.xml import element as davxml
from txdav.xml.rfc2518 import ResourceType, GETContentType
@@ -56,7 +54,7 @@
IndexSchedule as OldInboxIndex
from txdav.caldav.datastore.util import (
validateCalendarComponent, dropboxIDFromCalendarObject, CalendarObjectBase,
- StorageTransportBase
+ StorageTransportBase, AttachmentRetrievalTransport
)
from txdav.common.datastore.file import (
@@ -737,6 +735,7 @@
def write(self, data):
# FIXME: multiple chunks
self._file.write(data)
+ return super(AttachmentStorageTransport, self).write(data)
def loseConnection(self):
@@ -801,12 +800,7 @@
def retrieve(self, protocol):
- # FIXME: makeConnection
- # FIXME: actually stream
- # FIMXE: connectionLost
- protocol.dataReceived(self._path.getContent())
- # FIXME: ConnectionDone, not NotImplementedError
- protocol.connectionLost(Failure(NotImplementedError()))
+ return AttachmentRetrievalTransport(self._path).start(protocol)
@property
Modified: CalendarServer/trunk/txdav/caldav/datastore/sql.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/sql.py 2012-10-05 21:40:57 UTC (rev 9898)
+++ CalendarServer/trunk/txdav/caldav/datastore/sql.py 2012-10-05 22:52:17 UTC (rev 9899)
@@ -32,9 +32,7 @@
from twext.python.filepath import CachingFilePath
from twisted.internet.defer import inlineCallbacks, returnValue
-from twisted.internet.error import ConnectionLost
from twisted.python import hashlib
-from twisted.python.failure import Failure
from twistedcaldav import caldavxml, customxml
from twistedcaldav.caldavxml import ScheduleCalendarTransp, Opaque
@@ -80,6 +78,8 @@
from pycalendar.duration import PyCalendarDuration
from pycalendar.timezone import PyCalendarTimezone
+from txdav.caldav.datastore.util import AttachmentRetrievalTransport
+
from zope.interface.declarations import implements
import os
@@ -1531,8 +1531,7 @@
def retrieve(self, protocol):
- protocol.dataReceived(self._path.getContent())
- protocol.connectionLost(Failure(ConnectionLost()))
+ return AttachmentRetrievalTransport(self._path).start(protocol)
_removeStatement = Delete(
Modified: CalendarServer/trunk/txdav/caldav/datastore/test/common.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/test/common.py 2012-10-05 21:40:57 UTC (rev 9898)
+++ CalendarServer/trunk/txdav/caldav/datastore/test/common.py 2012-10-05 22:52:17 UTC (rev 9899)
@@ -2080,24 +2080,21 @@
t.write(sampleData)
yield t.loseConnection()
yield self.exceedQuotaTest(get)
+ @inlineCallbacks
def checkOriginal():
- catch = StringIO()
- catch.dataReceived = catch.write
- lost = []
- catch.connectionLost = lost.append
- attachment.retrieve(catch)
+ actual = yield self.attachmentToString(attachment)
expected = sampleData
# note: 60 is less than len(expected); trimming is just to make
# the error message look sane when the test fails.
- actual = catch.getvalue()[:60]
+ actual = actual[:60]
self.assertEquals(actual, expected)
- checkOriginal()
+ yield checkOriginal()
yield self.commit()
# Make sure that things go back to normal after a commit of that
# transaction.
obj = yield self.calendarObjectUnderTest()
attachment = yield get()
- checkOriginal()
+ yield checkOriginal()
def test_removeAttachmentWithName(self, refresh=lambda x:x):
Modified: CalendarServer/trunk/txdav/caldav/datastore/util.py
===================================================================
--- CalendarServer/trunk/txdav/caldav/datastore/util.py 2012-10-05 21:40:57 UTC (rev 9898)
+++ CalendarServer/trunk/txdav/caldav/datastore/util.py 2012-10-05 22:52:17 UTC (rev 9899)
@@ -45,6 +45,11 @@
InternalDataStoreError, HomeChildNameAlreadyExistsError
)
from txdav.base.datastore.util import normalizeUUIDOrNot
+from twisted.protocols.basic import FileSender
+from twisted.internet.interfaces import ITransport
+from twisted.internet.interfaces import IConsumer
+from twisted.internet.error import ConnectionLost
+from twisted.internet.task import LoopingCall
log = Logger()
@@ -299,6 +304,9 @@
self.storeTransport = storeTransport
self.done = Deferred()
+ def connectionMade(self):
+ self.storeTransport.registerProducer(self.transport, False)
+
def dataReceived(self, data):
self.storeTransport.write(data)
@@ -462,14 +470,34 @@
Create a storage transport with a reference to an L{IAttachment} and a
L{twext.web2.http_headers.MimeType}.
"""
+ from twisted.internet import reactor
+ self._clock = reactor
self._attachment = attachment
self._contentType = contentType
+ self._producer = None
# Make sure we have some kind of content-type
if self._contentType is None:
self._contentType = http_headers.MimeType.fromString(getType(self._attachment.name(), self.contentTypes))
+ def write(self, data):
+ """
+ Children must override this to actually write the data, but should
+ upcall this implementation to interact properly with producers.
+ """
+ if self._producer and self._streamingProducer:
+ # XXX this needs to be in a callLater because otherwise
+ # resumeProducing will call write which will call resumeProducing
+ # (etc) forever.
+ self._clock.callLater(0, self._producer.resumeProducing)
+
+
+ def registerProducer(self, producer, streaming):
+ self._producer = producer
+ self._streamingProducer = streaming
+
+
def getPeer(self):
return StorageTransportAddress(self._attachment, False)
@@ -482,7 +510,86 @@
return self.write(''.join(seq))
+ def stopProducing(self):
+ return self.loseConnection()
+
+
+class AttachmentRetrievalTransport(FileSender, object):
+ """
+ The transport for a protocol that does L{IAttachment.retrieve}.
+ """
+ implements(ITransport)
+
+ def __init__(self, filePath):
+ from twisted.internet import reactor
+ self.filePath = filePath
+ self.clock = reactor
+
+
+ def start(self, protocol):
+ this = self
+ class Consumer(object):
+ implements(IConsumer)
+ def registerProducer(self, producer, streaming):
+ protocol.makeConnection(producer)
+ this._maybeLoopDelivery()
+ def write(self, data):
+ protocol.dataReceived(data)
+ def unregisterProducer(self):
+ this._done(protocol)
+ self.beginFileTransfer(self.filePath.open(), Consumer())
+
+
+ def _done(self, protocol):
+ if self._deliveryLoop:
+ self._deliveryLoop.stop()
+ protocol.connectionLost(Failure(ConnectionLost()))
+
+
+ def write(self, data):
+ raise NotImplemented("This is a read-only transport.")
+
+
+ def writeSequence(self, datas):
+ self.write("".join(datas))
+
+
+ def loseConnection(self):
+ pass
+
+
+ def getPeer(self):
+ return self
+
+
+ def getHost(self):
+ return self
+
+
+ _everResumedProducing = False
+
+ def resumeProducing(self):
+ self._everResumedProducing = True
+ super(AttachmentRetrievalTransport, self).resumeProducing()
+
+
+ _deliveryLoop = None
+
+ def _maybeLoopDelivery(self):
+ """
+ If no consumer was registered (as inferred by the fact that
+ resumeProducing() wasn't called), deliver the file ourselves by
+ calling resumeProducing() repeatedly from a timed loop.
+ """
+ if not self._everResumedProducing:
+ # Not registered as a streaming producer.
+ def deliverNextChunk():
+ super(AttachmentRetrievalTransport, self).resumeProducing()
+ self._deliveryLoop = LoopingCall(deliverNextChunk)
+ self._deliveryLoop.start(0.01, True)
+
+
+
def fixOneCalendarObject(component):
"""
Correct the properties which may contain a user's directory UUID within a
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20121005/fe05c2ea/attachment-0001.html>
More information about the calendarserver-changes
mailing list