[CalendarServer-changes] [14091] twext/trunk/twext/who/ldap
source_changes at macosforge.org
source_changes at macosforge.org
Fri Oct 17 16:40:28 PDT 2014
Revision: 14091
http://trac.calendarserver.org//changeset/14091
Author: sagen at apple.com
Date: 2014-10-17 16:40:27 -0700 (Fri, 17 Oct 2014)
Log Message:
-----------
Support mapping LDAP attribute values to NamedConstants; add tests for boolean LDAP attributes
Modified Paths:
--------------
twext/trunk/twext/who/ldap/_service.py
twext/trunk/twext/who/ldap/test/test_service.py
Modified: twext/trunk/twext/who/ldap/_service.py
===================================================================
--- twext/trunk/twext/who/ldap/_service.py 2014-10-17 19:49:34 UTC (rev 14090)
+++ twext/trunk/twext/who/ldap/_service.py 2014-10-17 23:40:27 UTC (rev 14091)
@@ -296,14 +296,21 @@
raise DirectoryConfigurationError("Mapping for uid required")
self._fieldNameToAttributesMap = fieldNameToAttributesMap
- self._attributeToFieldNameMap = reverseDict(
- fieldNameToAttributesMap
- )
+
+ self._attributeToFieldNameMap = {}
+ for name, attributes in fieldNameToAttributesMap.iteritems():
+ for attribute in attributes:
+ if ":" in attribute:
+ attribute, ignored = attribute.split(":", 1)
+ self._attributeToFieldNameMap[attribute] = name
+
self._recordTypeSchemas = recordTypeSchemas
attributesToFetch = set()
for attributes in fieldNameToAttributesMap.values():
for attribute in attributes:
+ if ":" in attribute:
+ attribute, ignored = attribute.split(":", 1)
attributesToFetch.add(attribute.encode("utf-8"))
self._attributesToFetch = list(attributesToFetch)
@@ -751,9 +758,10 @@
fields = {}
for attribute, values in recordData.iteritems():
- fieldNames = self._attributeToFieldNameMap.get(attribute)
+ fieldName = self._attributeToFieldNameMap.get(attribute)
+ attributeRules = self._fieldNameToAttributesMap[fieldName]
- if fieldNames is None:
+ if fieldName is None:
# self.log.debug(
# "Unmapped LDAP attribute {attribute!r} in record "
# "data: {recordData!r}",
@@ -761,52 +769,70 @@
# )
continue
- for fieldName in fieldNames:
- valueType = self.fieldName.valueType(fieldName)
+ valueType = self.fieldName.valueType(fieldName)
- if valueType in (unicode, UUID):
- if not isinstance(values, list):
- values = [values]
+ if valueType in (unicode, UUID):
+ if not isinstance(values, list):
+ values = [values]
- if valueType is unicode:
- newValues = []
- for v in values:
- if isinstance(v, unicode):
- # because the ldap unit test produces
- # unicode values (?)
- newValues.append(v)
- else:
- newValues.append(unicode(v, "utf-8"))
- # newValues = [unicode(v, "utf-8") for v in values]
- else:
- newValues = [valueType(v) for v in values]
+ if valueType is unicode:
+ newValues = []
+ for v in values:
+ if isinstance(v, unicode):
+ # because the ldap unit test produces
+ # unicode values (?)
+ newValues.append(v)
+ else:
+ newValues.append(unicode(v, "utf-8"))
+ # newValues = [unicode(v, "utf-8") for v in values]
+ else:
+ newValues = [valueType(v) for v in values]
- if self.fieldName.isMultiValue(fieldName):
- fields[fieldName] = newValues
- else:
- fields[fieldName] = newValues[0]
+ if self.fieldName.isMultiValue(fieldName):
+ fields[fieldName] = newValues
+ else:
+ fields[fieldName] = newValues[0]
- elif valueType is bool:
- if not isinstance(values, list):
- values = [values]
- if "=" in attribute:
- attribute, trueValue = attribute.split("=")
- else:
- trueValue = "true"
+ elif valueType is bool:
+ if not isinstance(values, list):
+ values = [values]
+ rule = attributeRules[0] # there is only one true value
+ if ":" in rule:
+ ignored, trueValue = rule.split(":")
+ else:
+ trueValue = "true"
+
+ for value in values:
+ if value == trueValue:
+ fields[fieldName] = True
+ break
+ else:
+ fields[fieldName] = False
+
+ elif issubclass(valueType, Names):
+ if not isinstance(values, list):
+ values = [values]
+
+ for rule in attributeRules:
+ attribute, attributeValue, fieldValue = rule.split(":")
+
for value in values:
- if value == trueValue:
- fields[fieldName] = True
+ if value == attributeValue:
+ # convert to a constant
+ try:
+ fieldValue = valueType.lookupByName(fieldValue)
+ fields[fieldName] = fieldValue
+ except ValueError:
+ pass
break
- else:
- fields[fieldName] = False
- else:
- raise LDAPConfigurationError(
- "Unknown value type {0} for field {1}".format(
- valueType, fieldName
- )
+ else:
+ raise LDAPConfigurationError(
+ "Unknown value type {0} for field {1}".format(
+ valueType, fieldName
)
+ )
# Skip any results missing the uid, which is a required field
if self.fieldName.uid not in fields:
Modified: twext/trunk/twext/who/ldap/test/test_service.py
===================================================================
--- twext/trunk/twext/who/ldap/test/test_service.py 2014-10-17 19:49:34 UTC (rev 14090)
+++ twext/trunk/twext/who/ldap/test/test_service.py 2014-10-17 23:40:27 UTC (rev 14091)
@@ -60,8 +60,9 @@
from twext.who.ldap import (
LDAPAttribute, RecordTypeSchema, LDAPObjectClass
)
+from twext.who.util import ConstantsContainer
-from twisted.python.constants import NamedConstant, ValueConstant
+from twisted.python.constants import Names, NamedConstant, ValueConstant
from twisted.python.filepath import FilePath
from twisted.internet.defer import inlineCallbacks
from twisted.cred.credentials import UsernamePassword
@@ -86,8 +87,41 @@
)
+class TestFieldWithChoices(Names):
+ none = NamedConstant()
+ none.description = u"none"
+
+ one = NamedConstant()
+ one.description = u"one"
+
+ two = NamedConstant()
+ two.description = u"two"
+
+ three = NamedConstant()
+ three.description = u"three"
+
+
+class TestFieldName(Names):
+ multiChoice = NamedConstant()
+ multiChoice.description = u"Multiple Choice Test Field"
+ multiChoice.valueType = TestFieldWithChoices
+
+ trueFalse = NamedConstant()
+ trueFalse.description = u"Boolean Test Field"
+ trueFalse.valueType = bool
+
+
+
TEST_FIELDNAME_MAP = dict(DEFAULT_FIELDNAME_ATTRIBUTE_MAP)
+TEST_FIELDNAME_MAP[TestFieldName.multiChoice] = (
+ u"testField:One:one",
+ u"testField:Two:two",
+ u"testField:Three:three",
+)
+TEST_FIELDNAME_MAP[TestFieldName.trueFalse] = (
+ u"foo:active",
+)
TEST_FIELDNAME_MAP[BaseFieldName.uid] = (u"__who_uid__",)
@@ -140,7 +174,7 @@
def service(self, **kwargs):
- return TestService(
+ svc = TestService(
url=self.url,
baseDN=self.baseDN,
fieldNameToAttributesMap=TEST_FIELDNAME_MAP,
@@ -171,6 +205,10 @@
}),
**kwargs
)
+ svc.fieldName = ConstantsContainer(
+ (svc.fieldName, TestFieldName)
+ )
+ return svc
@@ -382,6 +420,56 @@
+class RecordsFromReplyTest(BaseTestCase, unittest.TestCase):
+
+ def test_boolean(self):
+ service = self.service()
+ reply = (
+ (
+ "dn",
+ {
+ "__who_uid__": u"active",
+ "foo": u"active",
+ }
+ ),
+ (
+ "dn",
+ {
+ "__who_uid__": u"inactive",
+ "foo": u"inactive",
+ }
+ ),
+ )
+ records = service._recordsFromReply(reply, recordType=RecordType.user)
+ self.assertTrue(records[0].trueFalse)
+ self.assertFalse(records[1].trueFalse)
+
+
+ def test_multipleChoice(self):
+ service = self.service()
+ reply = (
+ (
+ "dn",
+ {
+ "__who_uid__": u"two",
+ "testField": u"Two",
+ }
+ ),
+ (
+ "dn",
+ {
+ "__who_uid__": u"four",
+ "testField": u"Four",
+ }
+ ),
+ )
+ records = service._recordsFromReply(reply, recordType=RecordType.user)
+ self.assertEquals(records[0].multiChoice, TestFieldWithChoices.two)
+
+ # "Four" is not a valid value, so it won't get set
+ self.assertFalse(service.fieldName.multiChoice in records[1].fields)
+
+
def mockDirectoryDataFromXMLService(service):
dc0 = u"org"
dc1 = u"calendarserver"
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20141017/c6d83df9/attachment-0001.html>
More information about the calendarserver-changes mailing list