[CalendarServer-changes] [13605] CalendarServer/trunk
source_changes at macosforge.org
source_changes at macosforge.org
Tue Jun 3 21:28:02 PDT 2014
Revision: 13605
http://trac.calendarserver.org//changeset/13605
Author: sagen at apple.com
Date: 2014-06-03 21:28:02 -0700 (Tue, 03 Jun 2014)
Log Message:
-----------
Handle AMP size limits
Modified Paths:
--------------
CalendarServer/trunk/requirements-stable.txt
CalendarServer/trunk/txdav/dps/client.py
CalendarServer/trunk/txdav/dps/commands.py
CalendarServer/trunk/txdav/dps/server.py
CalendarServer/trunk/txdav/dps/test/test_client.py
CalendarServer/trunk/txdav/who/test/support.py
Modified: CalendarServer/trunk/requirements-stable.txt
===================================================================
--- CalendarServer/trunk/requirements-stable.txt 2014-06-04 04:27:23 UTC (rev 13604)
+++ CalendarServer/trunk/requirements-stable.txt 2014-06-04 04:28:02 UTC (rev 13605)
@@ -5,7 +5,7 @@
# For CalendarServer development, don't try to get these projects from PyPI; use svn.
-e .
--e svn+http://svn.calendarserver.org/repository/calendarserver/twext/trunk@13602#egg=twextpy
+-e svn+http://svn.calendarserver.org/repository/calendarserver/twext/trunk@13604#egg=twextpy
-e svn+http://svn.calendarserver.org/repository/calendarserver/PyKerberos/trunk@13420#egg=kerberos
-e svn+http://svn.calendarserver.org/repository/calendarserver/PyCalendar/trunk@13576#egg=pycalendar
Modified: CalendarServer/trunk/txdav/dps/client.py
===================================================================
--- CalendarServer/trunk/txdav/dps/client.py 2014-06-04 04:27:23 UTC (rev 13604)
+++ CalendarServer/trunk/txdav/dps/client.py 2014-06-04 04:28:02 UTC (rev 13605)
@@ -39,7 +39,7 @@
RecordsMatchingTokensCommand, RecordsMatchingFieldsCommand,
MembersCommand, GroupsCommand, SetMembersCommand,
VerifyPlaintextPasswordCommand, VerifyHTTPDigestCommand,
- WikiAccessForUID
+ WikiAccessForUID, ContinuationCommand
)
from txdav.who.directory import (
CalendarDirectoryRecordMixin, CalendarDirectoryServiceMixin
@@ -127,12 +127,24 @@
def _processSingleRecord(self, result):
+ """
+ Takes a dictionary with a "fields" key whose value is a pickled
+ dictionary of a record's fields, and returns a record.
+ """
serializedFields = pickle.loads(result['fields'])
return self._dictToRecord(serializedFields)
def _processMultipleRecords(self, result):
- serializedFieldsList = pickle.loads(result['fieldsList'])
+ """
+ Takes a dictionary with a "fieldsList" key whose value is an iterable
+ of pickled dictionaries (of records' fields), and returns a list of
+ records.
+ """
+ serializedFieldsList = []
+ for fields in result["fieldsList"]:
+ fields = pickle.loads(fields)
+ serializedFieldsList.append(fields)
results = []
for serializedFields in serializedFieldsList:
record = self._dictToRecord(serializedFields)
@@ -158,6 +170,26 @@
@inlineCallbacks
+ def _sendCommand(self, command, **kwds):
+ """
+ Execute a remote AMP command, first making the connection to the peer.
+ Any kwds are passed on to the AMP command.
+
+ @param command: the AMP command to call
+ @type command: L{twisted.protocols.amp.Command}
+ """
+ ampProto = (yield self._getConnection())
+ try:
+ results = (yield ampProto.callRemote(command, **kwds))
+ except Exception, e:
+ log.error("Failed AMP command", error=e)
+ # FIXME: is there a way to hook into ConnectionLost?
+ self._connection = None
+ raise
+ returnValue(results)
+
+
+ @inlineCallbacks
def _call(self, command, postProcess, **kwds):
"""
Execute a remote AMP command, first making the connection to the peer,
@@ -172,14 +204,27 @@
L{Deferred} which fires with the post-processed results
@type postProcess: callable
"""
- ampProto = (yield self._getConnection())
- try:
- results = (yield ampProto.callRemote(command, **kwds))
- except Exception, e:
- log.error("Failed AMP command", error=e)
- # FIXME: is there a way to hook into ConnectionLost?
- self._connection = None
- raise
+ results = yield self._sendCommand(command, **kwds)
+ if results.get("continuation", None) is None:
+ # We have all the results
+ returnValue(postProcess(results))
+
+ # There are more results to fetch, so loop until the continuation
+ # keyword we get back is None
+
+ multi = [results]
+
+ while results.get("continuation", None) is not None:
+ results = yield self._sendCommand(
+ ContinuationCommand,
+ continuation=results["continuation"]
+ )
+ multi.append(results)
+
+ results = {"fieldsList": []}
+ for result in multi:
+ results["fieldsList"].extend(result["fieldsList"])
+
returnValue(postProcess(results))
Modified: CalendarServer/trunk/txdav/dps/commands.py
===================================================================
--- CalendarServer/trunk/txdav/dps/commands.py 2014-06-04 04:27:23 UTC (rev 13604)
+++ CalendarServer/trunk/txdav/dps/commands.py 2014-06-04 04:28:02 UTC (rev 13605)
@@ -53,7 +53,8 @@
('recordType', amp.String()),
]
response = [
- ('fieldsList', amp.String()),
+ ('fieldsList', amp.ListOf(amp.String())),
+ ('continuation', amp.String(optional=True)),
]
@@ -63,18 +64,31 @@
('emailAddress', amp.String()),
]
response = [
- ('fieldsList', amp.String()),
+ ('fieldsList', amp.ListOf(amp.String())),
+ ('continuation', amp.String(optional=True)),
]
+class ContinuationCommand(amp.Command):
+ arguments = [
+ ('continuation', amp.String(optional=True)),
+ ]
+ response = [
+ ('fieldsList', amp.ListOf(amp.String())),
+ ('continuation', amp.String(optional=True)),
+ ]
+
+
+
class RecordsMatchingTokensCommand(amp.Command):
arguments = [
('tokens', amp.ListOf(amp.String())),
('context', amp.String(optional=True)),
]
response = [
- ('fieldsList', amp.String()),
+ ('fieldsList', amp.ListOf(amp.String())),
+ ('continuation', amp.String(optional=True)),
]
@@ -86,14 +100,15 @@
('recordType', amp.String(optional=True)),
]
response = [
- ('fieldsList', amp.String()),
+ ('fieldsList', amp.ListOf(amp.String())),
+ ('continuation', amp.String(optional=True)),
]
class UpdateRecordsCommand(amp.Command):
arguments = [
- ('fieldsList', amp.String()),
+ ('fieldsList', amp.ListOf(amp.String())),
('create', amp.Boolean(optional=True)),
]
response = [
@@ -117,7 +132,8 @@
('uid', amp.String()),
]
response = [
- ('fieldsList', amp.String()),
+ ('fieldsList', amp.ListOf(amp.String())),
+ ('continuation', amp.String(optional=True)),
]
@@ -127,7 +143,7 @@
('uid', amp.String()),
]
response = [
- ('fieldsList', amp.String()),
+ ('fieldsList', amp.ListOf(amp.String())),
]
Modified: CalendarServer/trunk/txdav/dps/server.py
===================================================================
--- CalendarServer/trunk/txdav/dps/server.py 2014-06-04 04:27:23 UTC (rev 13604)
+++ CalendarServer/trunk/txdav/dps/server.py 2014-06-04 04:28:02 UTC (rev 13605)
@@ -15,6 +15,7 @@
##
import cPickle as pickle
+import datetime
import uuid
from twext.python.log import Logger
@@ -35,7 +36,7 @@
RecordsMatchingTokensCommand, RecordsMatchingFieldsCommand,
MembersCommand, GroupsCommand, SetMembersCommand,
VerifyPlaintextPasswordCommand, VerifyHTTPDigestCommand,
- WikiAccessForUID
+ WikiAccessForUID, ContinuationCommand
# UpdateRecordsCommand, RemoveRecordsCommand
)
from txdav.who.util import directoryFromConfig
@@ -62,7 +63,104 @@
amp.AMP.__init__(self)
self._directory = directory
+ # How large (in bytes) we let an AMP response get before breaking it up
+ self._maxSize = 60000
+ # The cache of results we have not fully responded with. A dictionary
+ # whose keys are "continuation tokens" and whose values are tuples of
+ # (timestamp, list-of-records). When a response does not fit within
+ # AMP size limits, the remaining records are stored in this dictionary
+ # keyed by an opaque token we generate to return to the client so that
+ # it can ask for the remaining results later.
+ self._continuations = {}
+
+
+ def _storeContinuation(self, records):
+ """
+ Store an iterable of records and generate an opaque token we can
+ give back to the client so they can later retrieve these remaining
+ results that did not fit in the previous AMP response.
+
+ @param records: an iterable of records
+ @return: a C{str} token
+ """
+ token = str(uuid.uuid4())
+ # FIXME: nothing expires stale continuations yet; the timestamp is
+ # stored so that code to expire them can be added later.
+ self._continuations[token] = (datetime.datetime.now(), records)
+ return token
+
+
+ def _retrieveContinuation(self, token):
+ """
+ Retrieve the previously stored iterable of records associated with
+ the token, and remove the token.
+
+ @param token: a C{str} token previously returned by _storeContinuation
+ @return: an iterable of records, or None if the token does not exist
+ """
+ if token in self._continuations:
+ timestamp, records = self._continuations[token]
+ del self._continuations[token]
+ else:
+ records = None
+ return records
+
+
+ @ContinuationCommand.responder
+ def continuation(self, continuation):
+ """
+ The client calls this command in order to retrieve records that did
+ not fit into an earlier response.
+
+ @param continuation: the token returned via the "continuation" key
+ in the previous response.
+ """
+ log.debug("Continuation: {c}", c=continuation)
+ records = self._retrieveContinuation(continuation)
+ response = self._recordsToResponse(records)
+ log.debug("Responding with: {response}", response=response)
+ return response
+
+
+ def _recordsToResponse(self, records):
+ """
+ Craft an AMP response containing as many records as will fit within
+ the size limit. Remaining records are stored as a "continuation",
+ identified by a token that is returned to the client to fetch later
+ via the ContinuationCommand.
+
+ @param records: a list or set of records; consumed via pop()
+ @return: the response dictionary, with a list of pickled records
+ stored in the "fieldsList" key, and if there are leftover
+ records that did not fit, there will be a "continuation" key
+ containing the token the client must send via ContinuationCommand.
+ """
+ fieldsList = []
+ count = 0
+ if records:
+ size = 0
+ while size < self._maxSize:
+ try:
+ record = records.pop()
+ except (KeyError, IndexError):
+ # We're done.
+ # Note: records may be a list (pop() raises IndexError
+ # when empty) or a set (pop() raises KeyError when empty).
+ break
+ pickled = pickle.dumps(self.recordToDict(record))
+ size = size + len(pickled)
+ fieldsList.append(pickled)
+ count += 1
+
+ response = {"fieldsList": fieldsList}
+
+ if records:
+ response["continuation"] = self._storeContinuation(records)
+
+ return response
+
+
def recordToDict(self, record):
"""
Turn a record in a dictionary of fields which can be reconstituted
@@ -140,12 +238,7 @@
records = (yield self._directory.recordsWithRecordType(
self._directory.recordType.lookupByName(recordType))
)
- fieldsList = []
- for record in records:
- fieldsList.append(self.recordToDict(record))
- response = {
- "fieldsList": pickle.dumps(fieldsList),
- }
+ response = self._recordsToResponse(records)
log.debug("Responding with: {response}", response=response)
returnValue(response)
@@ -156,16 +249,13 @@
emailAddress = emailAddress.decode("utf-8")
log.debug("RecordsWithEmailAddress: {e}", e=emailAddress)
records = (yield self._directory.recordsWithEmailAddress(emailAddress))
- fieldsList = []
- for record in records:
- fieldsList.append(self.recordToDict(record))
- response = {
- "fieldsList": pickle.dumps(fieldsList),
- }
+ response = self._recordsToResponse(records)
log.debug("Responding with: {response}", response=response)
returnValue(response)
+
+
@RecordsMatchingTokensCommand.responder
@inlineCallbacks
def recordsMatchingTokens(self, tokens, context=None):
@@ -174,16 +264,12 @@
records = yield self._directory.recordsMatchingTokens(
tokens, context=context
)
- fieldsList = []
- for record in records:
- fieldsList.append(self.recordToDict(record))
- response = {
- "fieldsList": pickle.dumps(fieldsList),
- }
+ response = self._recordsToResponse(records)
log.debug("Responding with: {response}", response=response)
returnValue(response)
+
@RecordsMatchingFieldsCommand.responder
@inlineCallbacks
def recordsMatchingFields(self, fields, operand="OR", recordType=None):
@@ -209,12 +295,7 @@
records = yield self._directory.recordsMatchingFields(
newFields, operand=operand, recordType=recordType
)
- fieldsList = []
- for record in records:
- fieldsList.append(self.recordToDict(record))
- response = {
- "fieldsList": pickle.dumps(fieldsList),
- }
+ response = self._recordsToResponse(records)
log.debug("Responding with: {response}", response=response)
returnValue(response)
@@ -230,13 +311,12 @@
log.error("Failed in members", error=e)
record = None
- fieldsList = []
+ records = []
if record is not None:
for member in (yield record.members()):
- fieldsList.append(self.recordToDict(member))
- response = {
- "fieldsList": pickle.dumps(fieldsList),
- }
+ records.append(member)
+
+ response = self._recordsToResponse(records)
log.debug("Responding with: {response}", response=response)
returnValue(response)
Modified: CalendarServer/trunk/txdav/dps/test/test_client.py
===================================================================
--- CalendarServer/trunk/txdav/dps/test/test_client.py 2014-06-04 04:27:23 UTC (rev 13604)
+++ CalendarServer/trunk/txdav/dps/test/test_client.py 2014-06-04 04:28:02 UTC (rev 13605)
@@ -30,6 +30,9 @@
from txdav.dps.client import DirectoryService
from txdav.dps.server import DirectoryProxyAMPProtocol
from txdav.who.directory import CalendarDirectoryServiceMixin
+from txdav.who.test.support import (
+ TestRecord, CalendarInMemoryDirectoryService
+)
from twistedcaldav.test.util import StoreTestCase
from twistedcaldav.config import config
@@ -590,3 +593,109 @@
)
)
self.assertEquals(authenticated, answer)
+
+
+class DPSClientLargeResultsTest(unittest.TestCase):
+ """
+ Tests the client against a single in-memory directory service populated
+ with enough records that the DPS server must split its responses across
+ multiple AMP messages using continuations.
+ """
+
+ @inlineCallbacks
+ def setUp(self):
+
+ self.numUsers = 1000
+
+ # The "local" directory service
+ self.directory = DirectoryService(None)
+
+ # The "remote" directory service
+ remoteDirectory = CalendarInMemoryDirectoryService(None)
+
+ # Add users
+ records = []
+ fieldName = remoteDirectory.fieldName
+ for i in xrange(self.numUsers):
+ records.append(
+ TestRecord(
+ remoteDirectory,
+ {
+ fieldName.uid: u"foo{ctr:05d}".format(ctr=i),
+ fieldName.shortNames: (u"foo{ctr:05d}".format(ctr=i),),
+ fieldName.fullNames: (u"foo{ctr:05d}".format(ctr=i),),
+ fieldName.recordType: RecordType.user,
+ }
+ )
+ )
+
+ # Add a big group
+ records.append(
+ TestRecord(
+ remoteDirectory,
+ {
+ fieldName.uid: u"bigGroup",
+ fieldName.recordType: RecordType.group,
+ }
+ )
+ )
+
+ yield remoteDirectory.updateRecords(records, create=True)
+
+ group = yield remoteDirectory.recordWithUID(u"bigGroup")
+ members = yield remoteDirectory.recordsWithRecordType(RecordType.user)
+ yield group.setMembers(members)
+
+ # Connect the two services directly via an IOPump
+ client = AMP()
+ server = DirectoryProxyAMPProtocol(remoteDirectory)
+ pump = returnConnected(server, client)
+
+ # Replace the normal _getConnection method with one that bypasses any
+ # actual networking
+ self.patch(self.directory, "_getConnection", lambda: succeed(client))
+
+ # Wrap the normal _call method with one that flushes the IOPump
+ # afterwards
+ origCall = self.directory._call
+
+ def newCall(*args, **kwds):
+ d = origCall(*args, **kwds)
+ pump.flush()
+ return d
+
+ self.patch(self.directory, "_call", newCall)
+
+
+ @inlineCallbacks
+ def test_tooBigResults(self):
+ """
+ The AMP protocol limits values to 65,535 bytes, so the DPS server
+ breaks up the responses to fit. This test uses 1000 records to verify
+ the various methods work seamlessly in the face of large results.
+ Normally only a couple hundred records would fit in a single response.
+ """
+
+ # recordsMatchingTokens
+ records = yield self.directory.recordsMatchingTokens([u"foo"])
+ self.assertEquals(len(records), self.numUsers)
+
+ # recordsMatchingFields
+ fields = (
+ (u"fullNames", "foo", MatchFlags.caseInsensitive, MatchType.contains),
+ )
+ records = yield self.directory.recordsMatchingFields(
+ fields, operand=Operand.OR, recordType=RecordType.user
+ )
+ self.assertEquals(len(records), self.numUsers)
+
+ # recordsWithRecordType
+ records = yield self.directory.recordsWithRecordType(
+ RecordType.user
+ )
+ self.assertEquals(len(records), self.numUsers)
+
+ # members()
+ group = yield self.directory.recordWithUID(u"bigGroup")
+ members = yield group.members()
+ self.assertEquals(len(members), self.numUsers)
Modified: CalendarServer/trunk/txdav/who/test/support.py
===================================================================
--- CalendarServer/trunk/txdav/who/test/support.py 2014-06-04 04:27:23 UTC (rev 13604)
+++ CalendarServer/trunk/txdav/who/test/support.py 2014-06-04 04:28:02 UTC (rev 13605)
@@ -18,15 +18,24 @@
RecordType,
NoSuchRecordError
)
-from twext.who.index import DirectoryService as IndexDirectoryService
+from twext.who.index import (
+ DirectoryService as IndexDirectoryService,
+ DirectoryRecord as IndexedDirectoryRecord
+)
from twext.who.util import ConstantsContainer
+from twisted.internet.defer import succeed, inlineCallbacks
+from txdav.who.directory import (
+ CalendarDirectoryRecordMixin, CalendarDirectoryServiceMixin
+)
from txdav.who.idirectory import (
RecordType as CalRecordType
)
-from twisted.internet.defer import succeed, inlineCallbacks
+class TestRecord(IndexedDirectoryRecord, CalendarDirectoryRecordMixin):
+ pass
+
class InMemoryDirectoryService(IndexDirectoryService):
"""
An in-memory IDirectoryService. You must call updateRecords( ) if you want
@@ -69,3 +78,11 @@
if record.uid in uids:
recordSet.remove(record)
return succeed(None)
+
+
+
+class CalendarInMemoryDirectoryService(
+ InMemoryDirectoryService,
+ CalendarDirectoryServiceMixin
+):
+ pass
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140603/17c6f3d3/attachment-0001.html>
More information about the calendarserver-changes
mailing list