[CalendarServer-changes] [13605] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Tue Jun 3 21:28:02 PDT 2014


Revision: 13605
          http://trac.calendarserver.org//changeset/13605
Author:   sagen at apple.com
Date:     2014-06-03 21:28:02 -0700 (Tue, 03 Jun 2014)
Log Message:
-----------
Handle AMP size limits

Modified Paths:
--------------
    CalendarServer/trunk/requirements-stable.txt
    CalendarServer/trunk/txdav/dps/client.py
    CalendarServer/trunk/txdav/dps/commands.py
    CalendarServer/trunk/txdav/dps/server.py
    CalendarServer/trunk/txdav/dps/test/test_client.py
    CalendarServer/trunk/txdav/who/test/support.py

Modified: CalendarServer/trunk/requirements-stable.txt
===================================================================
--- CalendarServer/trunk/requirements-stable.txt	2014-06-04 04:27:23 UTC (rev 13604)
+++ CalendarServer/trunk/requirements-stable.txt	2014-06-04 04:28:02 UTC (rev 13605)
@@ -5,7 +5,7 @@
 # For CalendarServer development, don't try to get these projects from PyPI; use svn.
 
 -e .
--e svn+http://svn.calendarserver.org/repository/calendarserver/twext/trunk@13602#egg=twextpy
+-e svn+http://svn.calendarserver.org/repository/calendarserver/twext/trunk@13604#egg=twextpy
 -e svn+http://svn.calendarserver.org/repository/calendarserver/PyKerberos/trunk@13420#egg=kerberos
 -e svn+http://svn.calendarserver.org/repository/calendarserver/PyCalendar/trunk@13576#egg=pycalendar
 

Modified: CalendarServer/trunk/txdav/dps/client.py
===================================================================
--- CalendarServer/trunk/txdav/dps/client.py	2014-06-04 04:27:23 UTC (rev 13604)
+++ CalendarServer/trunk/txdav/dps/client.py	2014-06-04 04:28:02 UTC (rev 13605)
@@ -39,7 +39,7 @@
     RecordsMatchingTokensCommand, RecordsMatchingFieldsCommand,
     MembersCommand, GroupsCommand, SetMembersCommand,
     VerifyPlaintextPasswordCommand, VerifyHTTPDigestCommand,
-    WikiAccessForUID
+    WikiAccessForUID, ContinuationCommand
 )
 from txdav.who.directory import (
     CalendarDirectoryRecordMixin, CalendarDirectoryServiceMixin
@@ -127,12 +127,24 @@
 
 
     def _processSingleRecord(self, result):
+        """
+        Takes a dictionary with a "fields" key whose value is a pickled
+        dictionary of a record's fields, and returns a record.
+        """
         serializedFields = pickle.loads(result['fields'])
         return self._dictToRecord(serializedFields)
 
 
     def _processMultipleRecords(self, result):
-        serializedFieldsList = pickle.loads(result['fieldsList'])
+        """
+        Takes a dictionary with a "fieldsList" key whose value is an iterable
+        of pickled dictionaries (of records' fields), and returns a list of
+        records.
+        """
+        serializedFieldsList = []
+        for fields in result["fieldsList"]:
+            fields = pickle.loads(fields)
+            serializedFieldsList.append(fields)
         results = []
         for serializedFields in serializedFieldsList:
             record = self._dictToRecord(serializedFields)
@@ -158,6 +170,26 @@
 
 
     @inlineCallbacks
+    def _sendCommand(self, command, **kwds):
+        """
+        Execute a remote AMP command, first making the connection to the peer.
+        Any kwds are passed on to the AMP command.
+
+        @param command: the AMP command to call
+        @type command: L{twisted.protocols.amp.Command}
+        """
+        ampProto = (yield self._getConnection())
+        try:
+            results = (yield ampProto.callRemote(command, **kwds))
+        except Exception, e:
+            log.error("Failed AMP command", error=e)
+            #  FIXME: is there a way to hook into ConnectionLost?
+            self._connection = None
+            raise
+        returnValue(results)
+
+
+    @inlineCallbacks
     def _call(self, command, postProcess, **kwds):
         """
         Execute a remote AMP command, first making the connection to the peer,
@@ -172,14 +204,27 @@
             L{Deferred} which fires with the post-processed results
         @type postProcess: callable
         """
-        ampProto = (yield self._getConnection())
-        try:
-            results = (yield ampProto.callRemote(command, **kwds))
-        except Exception, e:
-            log.error("Failed AMP command", error=e)
-            #  FIXME: is there a way to hook into ConnectionLost?
-            self._connection = None
-            raise
+        results = yield self._sendCommand(command, **kwds)
+        if results.get("continuation", None) is None:
+            # We have all the results
+            returnValue(postProcess(results))
+
+        # There are more results to fetch, so loop until the continuation
+        # keyword we get back is None
+
+        multi = [results]
+
+        while results.get("continuation", None) is not None:
+            results = yield self._sendCommand(
+                ContinuationCommand,
+                continuation=results["continuation"]
+            )
+            multi.append(results)
+
+        results = {"fieldsList": []}
+        for result in multi:
+            results["fieldsList"].extend(result["fieldsList"])
+
         returnValue(postProcess(results))
 
 

Modified: CalendarServer/trunk/txdav/dps/commands.py
===================================================================
--- CalendarServer/trunk/txdav/dps/commands.py	2014-06-04 04:27:23 UTC (rev 13604)
+++ CalendarServer/trunk/txdav/dps/commands.py	2014-06-04 04:28:02 UTC (rev 13605)
@@ -53,7 +53,8 @@
         ('recordType', amp.String()),
     ]
     response = [
-        ('fieldsList', amp.String()),
+        ('fieldsList', amp.ListOf(amp.String())),
+        ('continuation', amp.String(optional=True)),
     ]
 
 
@@ -63,18 +64,31 @@
         ('emailAddress', amp.String()),
     ]
     response = [
-        ('fieldsList', amp.String()),
+        ('fieldsList', amp.ListOf(amp.String())),
+        ('continuation', amp.String(optional=True)),
     ]
 
 
 
+class ContinuationCommand(amp.Command):
+    arguments = [
+        ('continuation', amp.String(optional=True)),
+    ]
+    response = [
+        ('fieldsList', amp.ListOf(amp.String())),
+        ('continuation', amp.String(optional=True)),
+    ]
+
+
+
 class RecordsMatchingTokensCommand(amp.Command):
     arguments = [
         ('tokens', amp.ListOf(amp.String())),
         ('context', amp.String(optional=True)),
     ]
     response = [
-        ('fieldsList', amp.String()),
+        ('fieldsList', amp.ListOf(amp.String())),
+        ('continuation', amp.String(optional=True)),
     ]
 
 
@@ -86,14 +100,15 @@
         ('recordType', amp.String(optional=True)),
     ]
     response = [
-        ('fieldsList', amp.String()),
+        ('fieldsList', amp.ListOf(amp.String())),
+        ('continuation', amp.String(optional=True)),
     ]
 
 
 
 class UpdateRecordsCommand(amp.Command):
     arguments = [
-        ('fieldsList', amp.String()),
+        ('fieldsList', amp.ListOf(amp.String())),
         ('create', amp.Boolean(optional=True)),
     ]
     response = [
@@ -117,7 +132,8 @@
         ('uid', amp.String()),
     ]
     response = [
-        ('fieldsList', amp.String()),
+        ('fieldsList', amp.ListOf(amp.String())),
+        ('continuation', amp.String(optional=True)),
     ]
 
 
@@ -127,7 +143,7 @@
         ('uid', amp.String()),
     ]
     response = [
-        ('fieldsList', amp.String()),
+        ('fieldsList', amp.ListOf(amp.String())),
     ]
 
 

Modified: CalendarServer/trunk/txdav/dps/server.py
===================================================================
--- CalendarServer/trunk/txdav/dps/server.py	2014-06-04 04:27:23 UTC (rev 13604)
+++ CalendarServer/trunk/txdav/dps/server.py	2014-06-04 04:28:02 UTC (rev 13605)
@@ -15,6 +15,7 @@
 ##
 
 import cPickle as pickle
+import datetime
 import uuid
 
 from twext.python.log import Logger
@@ -35,7 +36,7 @@
     RecordsMatchingTokensCommand, RecordsMatchingFieldsCommand,
     MembersCommand, GroupsCommand, SetMembersCommand,
     VerifyPlaintextPasswordCommand, VerifyHTTPDigestCommand,
-    WikiAccessForUID
+    WikiAccessForUID, ContinuationCommand
     # UpdateRecordsCommand, RemoveRecordsCommand
 )
 from txdav.who.util import directoryFromConfig
@@ -62,7 +63,104 @@
         amp.AMP.__init__(self)
         self._directory = directory
 
+        # How large we let an AMP response get before breaking it up
+        self._maxSize = 60000
 
+        # The cache of results we have not fully responded with.  A dictionary
+        # whose keys are "continuation tokens" and whose values are tuples of
+        # (timestamp, list-of-records).  When a response does not fit within
+        # AMP size limits, the remaining records are stored in this dictionary
+        # keyed by an opaque token we generate to return to the client so that
+        # it can ask for the remaining results later.
+        self._continuations = {}
+
+
+    def _storeContinuation(self, records):
+        """
+        Store an iterable of records and generate an opaque token we can
+        give back to the client so they can later retrieve these remaining
+        results that did not fit in the previous AMP response.
+
+        @param records: an iterable of records
+        @return: a C{str} token
+        """
+        token = str(uuid.uuid4())
+        # FIXME: I included a timestamp just in case we want to have code that
+        # looks for stale continuations to expire them.
+        self._continuations[token] = (datetime.datetime.now(), records)
+        return token
+
+
+    def _retrieveContinuation(self, token):
+        """
+        Retrieve the previously stored iterable of records associated with
+        the token, and remove the token.
+
+        @param token: a C{str} token previously returned by _storeContinuation
+        @return: an iterable of records, or None if the token does not exist
+        """
+        if token in self._continuations:
+            timestamp, records = self._continuations[token]
+            del self._continuations[token]
+        else:
+            records = None
+        return records
+
+
+    @ContinuationCommand.responder
+    def continuation(self, continuation):
+        """
+        The client calls this command in order to retrieve records that did
+        not fit into an earlier response.
+
+        @param continuation: the token returned via the "continuation" key
+            in the previous response.
+        """
+        log.debug("Continuation: {c}", c=continuation)
+        records = self._retrieveContinuation(continuation)
+        response = self._recordsToResponse(records)
+        log.debug("Responding with: {response}", response=response)
+        return response
+
+
+    def _recordsToResponse(self, records):
+        """
+        Craft an AMP response containing as many records as will fit within
+        the size limit.  Remaining records are stored as a "continuation",
+        identified by a token that is returned to the client to fetch later
+        via the ContinuationCommand.
+
+        @param records: an iterable of records
+        @return: the response dictionary, with a list of pickled records
+            stored in the "fieldsList" key, and if there are leftover
+            records that did not fit, there will be a "continuation" key
+            containing the token the client must send via ContinuationCommand.
+        """
+        fieldsList = []
+        count = 0
+        if records:
+            size = 0
+            while size < self._maxSize:
+                try:
+                    record = records.pop()
+                except (KeyError, IndexError):
+                    # We're done.
+                    # Note: because records is an iterable (list or set)
+                    # we're catching both KeyError and IndexError.
+                    break
+                pickled = pickle.dumps(self.recordToDict(record))
+                size = size + len(pickled)
+                fieldsList.append(pickled)
+                count += 1
+
+        response = {"fieldsList": fieldsList}
+
+        if records:
+            response["continuation"] = self._storeContinuation(records)
+
+        return response
+
+
     def recordToDict(self, record):
         """
         Turn a record in a dictionary of fields which can be reconstituted
@@ -140,12 +238,7 @@
         records = (yield self._directory.recordsWithRecordType(
             self._directory.recordType.lookupByName(recordType))
         )
-        fieldsList = []
-        for record in records:
-            fieldsList.append(self.recordToDict(record))
-        response = {
-            "fieldsList": pickle.dumps(fieldsList),
-        }
+        response = self._recordsToResponse(records)
         log.debug("Responding with: {response}", response=response)
         returnValue(response)
 
@@ -156,16 +249,13 @@
         emailAddress = emailAddress.decode("utf-8")
         log.debug("RecordsWithEmailAddress: {e}", e=emailAddress)
         records = (yield self._directory.recordsWithEmailAddress(emailAddress))
-        fieldsList = []
-        for record in records:
-            fieldsList.append(self.recordToDict(record))
-        response = {
-            "fieldsList": pickle.dumps(fieldsList),
-        }
+        response = self._recordsToResponse(records)
         log.debug("Responding with: {response}", response=response)
         returnValue(response)
 
 
+
+
     @RecordsMatchingTokensCommand.responder
     @inlineCallbacks
     def recordsMatchingTokens(self, tokens, context=None):
@@ -174,16 +264,12 @@
         records = yield self._directory.recordsMatchingTokens(
             tokens, context=context
         )
-        fieldsList = []
-        for record in records:
-            fieldsList.append(self.recordToDict(record))
-        response = {
-            "fieldsList": pickle.dumps(fieldsList),
-        }
+        response = self._recordsToResponse(records)
         log.debug("Responding with: {response}", response=response)
         returnValue(response)
 
 
+
     @RecordsMatchingFieldsCommand.responder
     @inlineCallbacks
     def recordsMatchingFields(self, fields, operand="OR", recordType=None):
@@ -209,12 +295,7 @@
         records = yield self._directory.recordsMatchingFields(
             newFields, operand=operand, recordType=recordType
         )
-        fieldsList = []
-        for record in records:
-            fieldsList.append(self.recordToDict(record))
-        response = {
-            "fieldsList": pickle.dumps(fieldsList),
-        }
+        response = self._recordsToResponse(records)
         log.debug("Responding with: {response}", response=response)
         returnValue(response)
 
@@ -230,13 +311,12 @@
             log.error("Failed in members", error=e)
             record = None
 
-        fieldsList = []
+        records = []
         if record is not None:
             for member in (yield record.members()):
-                fieldsList.append(self.recordToDict(member))
-        response = {
-            "fieldsList": pickle.dumps(fieldsList),
-        }
+                records.append(member)
+
+        response = self._recordsToResponse(records)
         log.debug("Responding with: {response}", response=response)
         returnValue(response)
 

Modified: CalendarServer/trunk/txdav/dps/test/test_client.py
===================================================================
--- CalendarServer/trunk/txdav/dps/test/test_client.py	2014-06-04 04:27:23 UTC (rev 13604)
+++ CalendarServer/trunk/txdav/dps/test/test_client.py	2014-06-04 04:28:02 UTC (rev 13605)
@@ -30,6 +30,9 @@
 from txdav.dps.client import DirectoryService
 from txdav.dps.server import DirectoryProxyAMPProtocol
 from txdav.who.directory import CalendarDirectoryServiceMixin
+from txdav.who.test.support import (
+    TestRecord, CalendarInMemoryDirectoryService
+)
 from twistedcaldav.test.util import StoreTestCase
 from twistedcaldav.config import config
 
@@ -590,3 +593,109 @@
                     )
                 )
                 self.assertEquals(authenticated, answer)
+
+
+class DPSClientLargeResultsTest(unittest.TestCase):
+    """
+    Tests the client against a single directory service (as opposed to the
+    augmented, aggregated structure you get from directoryFromConfig()),
+    using enough records that responses must be split to fit AMP limits.
+    """
+
+    @inlineCallbacks
+    def setUp(self):
+
+        self.numUsers = 1000
+
+        # The "local" directory service
+        self.directory = DirectoryService(None)
+
+        # The "remote" directory service
+        remoteDirectory = CalendarInMemoryDirectoryService(None)
+
+        # Add users
+        records = []
+        fieldName = remoteDirectory.fieldName
+        for i in xrange(self.numUsers):
+            records.append(
+                TestRecord(
+                    remoteDirectory,
+                    {
+                        fieldName.uid: u"foo{ctr:05d}".format(ctr=i),
+                        fieldName.shortNames: (u"foo{ctr:05d}".format(ctr=i),),
+                        fieldName.fullNames: (u"foo{ctr:05d}".format(ctr=i),),
+                        fieldName.recordType: RecordType.user,
+                    }
+                )
+            )
+
+        # Add a big group
+        records.append(
+            TestRecord(
+                remoteDirectory,
+                {
+                    fieldName.uid: u"bigGroup",
+                    fieldName.recordType: RecordType.group,
+                }
+            )
+        )
+
+        yield remoteDirectory.updateRecords(records, create=True)
+
+        group = yield remoteDirectory.recordWithUID(u"bigGroup")
+        members = yield remoteDirectory.recordsWithRecordType(RecordType.user)
+        yield group.setMembers(members)
+
+        # Connect the two services directly via an IOPump
+        client = AMP()
+        server = DirectoryProxyAMPProtocol(remoteDirectory)
+        pump = returnConnected(server, client)
+
+        # Replace the normal _getConnection method with one that bypasses any
+        # actual networking
+        self.patch(self.directory, "_getConnection", lambda: succeed(client))
+
+        # Wrap the normal _call method with one that flushes the IOPump
+        # afterwards
+        origCall = self.directory._call
+
+        def newCall(*args, **kwds):
+            d = origCall(*args, **kwds)
+            pump.flush()
+            return d
+
+        self.patch(self.directory, "_call", newCall)
+
+
+    @inlineCallbacks
+    def test_tooBigResults(self):
+        """
+        The AMP protocol limits values to 65,535 bytes, so the DPS server
+        breaks up the responses to fit.  This test uses 1000 records to verify
+        the various methods work seamlessly in the face of large results.
+        Normally only a couple hundred records would fit in a single response.
+        """
+
+        # recordsMatchingTokens
+        records = yield self.directory.recordsMatchingTokens([u"foo"])
+        self.assertEquals(len(records), self.numUsers)
+
+        # recordsMatchingFields
+        fields = (
+            (u"fullNames", "foo", MatchFlags.caseInsensitive, MatchType.contains),
+        )
+        records = yield self.directory.recordsMatchingFields(
+            fields, operand=Operand.OR, recordType=RecordType.user
+        )
+        self.assertEquals(len(records), self.numUsers)
+
+        # recordsWithRecordType
+        records = yield self.directory.recordsWithRecordType(
+            RecordType.user
+        )
+        self.assertEquals(len(records), self.numUsers)
+
+        # members()
+        group = yield self.directory.recordWithUID(u"bigGroup")
+        members = yield group.members()
+        self.assertEquals(len(members), self.numUsers)

Modified: CalendarServer/trunk/txdav/who/test/support.py
===================================================================
--- CalendarServer/trunk/txdav/who/test/support.py	2014-06-04 04:27:23 UTC (rev 13604)
+++ CalendarServer/trunk/txdav/who/test/support.py	2014-06-04 04:28:02 UTC (rev 13605)
@@ -18,15 +18,24 @@
     RecordType,
     NoSuchRecordError
 )
-from twext.who.index import DirectoryService as IndexDirectoryService
+from twext.who.index import (
+    DirectoryService as IndexDirectoryService,
+    DirectoryRecord as IndexedDirectoryRecord
+)
 from twext.who.util import ConstantsContainer
+from twisted.internet.defer import succeed, inlineCallbacks
+from txdav.who.directory import (
+    CalendarDirectoryRecordMixin, CalendarDirectoryServiceMixin
+)
 from txdav.who.idirectory import (
     RecordType as CalRecordType
 )
-from twisted.internet.defer import succeed, inlineCallbacks
 
 
+class TestRecord(IndexedDirectoryRecord, CalendarDirectoryRecordMixin):
+    pass
 
+
 class InMemoryDirectoryService(IndexDirectoryService):
     """
     An in-memory IDirectoryService.  You must call updateRecords( ) if you want
@@ -69,3 +78,11 @@
                     if record.uid in uids:
                         recordSet.remove(record)
         return succeed(None)
+
+
+
+class CalendarInMemoryDirectoryService(
+    InMemoryDirectoryService,
+    CalendarDirectoryServiceMixin
+):
+    pass
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140603/17c6f3d3/attachment-0001.html>


More information about the calendarserver-changes mailing list