[CalendarServer-changes] [14091] twext/trunk/twext/who/ldap

source_changes at macosforge.org source_changes at macosforge.org
Fri Oct 17 16:40:28 PDT 2014


Revision: 14091
          http://trac.calendarserver.org//changeset/14091
Author:   sagen at apple.com
Date:     2014-10-17 16:40:27 -0700 (Fri, 17 Oct 2014)
Log Message:
-----------
Support mapping LDAP attribute values to NamedConstants; adds tests for boolean LDAP attributes

Modified Paths:
--------------
    twext/trunk/twext/who/ldap/_service.py
    twext/trunk/twext/who/ldap/test/test_service.py

Modified: twext/trunk/twext/who/ldap/_service.py
===================================================================
--- twext/trunk/twext/who/ldap/_service.py	2014-10-17 19:49:34 UTC (rev 14090)
+++ twext/trunk/twext/who/ldap/_service.py	2014-10-17 23:40:27 UTC (rev 14091)
@@ -296,14 +296,21 @@
             raise DirectoryConfigurationError("Mapping for uid required")
 
         self._fieldNameToAttributesMap = fieldNameToAttributesMap
-        self._attributeToFieldNameMap = reverseDict(
-            fieldNameToAttributesMap
-        )
+
+        self._attributeToFieldNameMap = {}
+        for name, attributes in fieldNameToAttributesMap.iteritems():
+            for attribute in attributes:
+                if ":" in attribute:
+                    attribute, ignored = attribute.split(":", 1)
+                self._attributeToFieldNameMap[attribute] = name
+
         self._recordTypeSchemas = recordTypeSchemas
 
         attributesToFetch = set()
         for attributes in fieldNameToAttributesMap.values():
             for attribute in attributes:
+                if ":" in attribute:
+                    attribute, ignored = attribute.split(":", 1)
                 attributesToFetch.add(attribute.encode("utf-8"))
         self._attributesToFetch = list(attributesToFetch)
 
@@ -751,9 +758,10 @@
             fields = {}
 
             for attribute, values in recordData.iteritems():
-                fieldNames = self._attributeToFieldNameMap.get(attribute)
+                fieldName = self._attributeToFieldNameMap.get(attribute)
+                attributeRules = self._fieldNameToAttributesMap[fieldName]
 
-                if fieldNames is None:
+                if fieldName is None:
                     # self.log.debug(
                     #     "Unmapped LDAP attribute {attribute!r} in record "
                     #     "data: {recordData!r}",
@@ -761,52 +769,70 @@
                     # )
                     continue
 
-                for fieldName in fieldNames:
-                    valueType = self.fieldName.valueType(fieldName)
+                valueType = self.fieldName.valueType(fieldName)
 
-                    if valueType in (unicode, UUID):
-                        if not isinstance(values, list):
-                            values = [values]
+                if valueType in (unicode, UUID):
+                    if not isinstance(values, list):
+                        values = [values]
 
-                        if valueType is unicode:
-                            newValues = []
-                            for v in values:
-                                if isinstance(v, unicode):
-                                    # because the ldap unit test produces
-                                    # unicode values (?)
-                                    newValues.append(v)
-                                else:
-                                    newValues.append(unicode(v, "utf-8"))
-                            # newValues = [unicode(v, "utf-8") for v in values]
-                        else:
-                            newValues = [valueType(v) for v in values]
+                    if valueType is unicode:
+                        newValues = []
+                        for v in values:
+                            if isinstance(v, unicode):
+                                # because the ldap unit test produces
+                                # unicode values (?)
+                                newValues.append(v)
+                            else:
+                                newValues.append(unicode(v, "utf-8"))
+                        # newValues = [unicode(v, "utf-8") for v in values]
+                    else:
+                        newValues = [valueType(v) for v in values]
 
-                        if self.fieldName.isMultiValue(fieldName):
-                            fields[fieldName] = newValues
-                        else:
-                            fields[fieldName] = newValues[0]
+                    if self.fieldName.isMultiValue(fieldName):
+                        fields[fieldName] = newValues
+                    else:
+                        fields[fieldName] = newValues[0]
 
-                    elif valueType is bool:
-                        if not isinstance(values, list):
-                            values = [values]
-                        if "=" in attribute:
-                            attribute, trueValue = attribute.split("=")
-                        else:
-                            trueValue = "true"
+                elif valueType is bool:
+                    if not isinstance(values, list):
+                        values = [values]
 
+                    rule = attributeRules[0]  # there is only one true value
+                    if ":" in rule:
+                        ignored, trueValue = rule.split(":")
+                    else:
+                        trueValue = "true"
+
+                    for value in values:
+                        if value == trueValue:
+                            fields[fieldName] = True
+                            break
+                    else:
+                        fields[fieldName] = False
+
+                elif issubclass(valueType, Names):
+                    if not isinstance(values, list):
+                        values = [values]
+
+                    for rule in attributeRules:
+                        attribute, attributeValue, fieldValue = rule.split(":")
+
                         for value in values:
-                            if value == trueValue:
-                                fields[fieldName] = True
+                            if value == attributeValue:
+                                # convert to a constant
+                                try:
+                                    fieldValue = valueType.lookupByName(fieldValue)
+                                    fields[fieldName] = fieldValue
+                                except ValueError:
+                                    pass
                                 break
-                        else:
-                            fields[fieldName] = False
 
-                    else:
-                        raise LDAPConfigurationError(
-                            "Unknown value type {0} for field {1}".format(
-                                valueType, fieldName
-                            )
+                else:
+                    raise LDAPConfigurationError(
+                        "Unknown value type {0} for field {1}".format(
+                            valueType, fieldName
                         )
+                    )
 
             # Skip any results missing the uid, which is a required field
             if self.fieldName.uid not in fields:

Modified: twext/trunk/twext/who/ldap/test/test_service.py
===================================================================
--- twext/trunk/twext/who/ldap/test/test_service.py	2014-10-17 19:49:34 UTC (rev 14090)
+++ twext/trunk/twext/who/ldap/test/test_service.py	2014-10-17 23:40:27 UTC (rev 14091)
@@ -60,8 +60,9 @@
 from twext.who.ldap import (
     LDAPAttribute, RecordTypeSchema, LDAPObjectClass
 )
+from twext.who.util import ConstantsContainer
 
-from twisted.python.constants import NamedConstant, ValueConstant
+from twisted.python.constants import Names, NamedConstant, ValueConstant
 from twisted.python.filepath import FilePath
 from twisted.internet.defer import inlineCallbacks
 from twisted.cred.credentials import UsernamePassword
@@ -86,8 +87,41 @@
 )
 
 
+class TestFieldWithChoices(Names):
 
+    none = NamedConstant()
+    none.description = u"none"
+
+    one = NamedConstant()
+    one.description = u"one"
+
+    two = NamedConstant()
+    two.description = u"two"
+
+    three = NamedConstant()
+    three.description = u"three"
+
+
+class TestFieldName(Names):
+    multiChoice = NamedConstant()
+    multiChoice.description = u"Multiple Choice Test Field"
+    multiChoice.valueType = TestFieldWithChoices
+
+    trueFalse = NamedConstant()
+    trueFalse.description = u"Boolean Test Field"
+    trueFalse.valueType = bool
+
+
+
 TEST_FIELDNAME_MAP = dict(DEFAULT_FIELDNAME_ATTRIBUTE_MAP)
+TEST_FIELDNAME_MAP[TestFieldName.multiChoice] = (
+    u"testField:One:one",
+    u"testField:Two:two",
+    u"testField:Three:three",
+)
+TEST_FIELDNAME_MAP[TestFieldName.trueFalse] = (
+    u"foo:active",
+)
 TEST_FIELDNAME_MAP[BaseFieldName.uid] = (u"__who_uid__",)
 
 
@@ -140,7 +174,7 @@
 
 
     def service(self, **kwargs):
-        return TestService(
+        svc = TestService(
             url=self.url,
             baseDN=self.baseDN,
             fieldNameToAttributesMap=TEST_FIELDNAME_MAP,
@@ -171,6 +205,10 @@
             }),
             **kwargs
         )
+        svc.fieldName = ConstantsContainer(
+            (svc.fieldName, TestFieldName)
+        )
+        return svc
 
 
 
@@ -382,6 +420,56 @@
 
 
 
+class RecordsFromReplyTest(BaseTestCase, unittest.TestCase):
+
+    def test_boolean(self):
+        service = self.service()
+        reply = (
+            (
+                "dn",
+                {
+                    "__who_uid__": u"active",
+                    "foo": u"active",
+                }
+            ),
+            (
+                "dn",
+                {
+                    "__who_uid__": u"inactive",
+                    "foo": u"inactive",
+                }
+            ),
+        )
+        records = service._recordsFromReply(reply, recordType=RecordType.user)
+        self.assertTrue(records[0].trueFalse)
+        self.assertFalse(records[1].trueFalse)
+
+
+    def test_multipleChoice(self):
+        service = self.service()
+        reply = (
+            (
+                "dn",
+                {
+                    "__who_uid__": u"two",
+                    "testField": u"Two",
+                }
+            ),
+            (
+                "dn",
+                {
+                    "__who_uid__": u"four",
+                    "testField": u"Four",
+                }
+            ),
+        )
+        records = service._recordsFromReply(reply, recordType=RecordType.user)
+        self.assertEquals(records[0].multiChoice, TestFieldWithChoices.two)
+
+        # "Four" is not a valid value, so it won't get set
+        self.assertFalse(service.fieldName.multiChoice in records[1].fields)
+
+
 def mockDirectoryDataFromXMLService(service):
     dc0 = u"org"
     dc1 = u"calendarserver"
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20141017/c6d83df9/attachment-0001.html>


More information about the calendarserver-changes mailing list