[CalendarServer-changes] [10559] CalendarServer/trunk

source_changes at macosforge.org source_changes at macosforge.org
Mon Jan 28 17:08:37 PST 2013


Revision: 10559
          http://trac.calendarserver.org//changeset/10559
Author:   wsanchez at apple.com
Date:     2013-01-28 17:08:37 -0800 (Mon, 28 Jan 2013)
Log Message:
-----------
Add base directory implementation.

Modified Paths:
--------------
    CalendarServer/trunk/calendarserver/tools/shell/cmd.py
    CalendarServer/trunk/twext/who/idirectory.py

Added Paths:
-----------
    CalendarServer/trunk/twext/who/directory.py
    CalendarServer/trunk/twext/who/test/test_directory.py

Modified: CalendarServer/trunk/calendarserver/tools/shell/cmd.py
===================================================================
--- CalendarServer/trunk/calendarserver/tools/shell/cmd.py	2013-01-28 20:44:40 UTC (rev 10558)
+++ CalendarServer/trunk/calendarserver/tools/shell/cmd.py	2013-01-29 01:08:37 UTC (rev 10559)
@@ -858,3 +858,11 @@
         self.protocol.reloadCommands()
 
     cmd_reload.hidden = "test tool"
+
+    def cmd_xyzzy(self, tokens):
+        """
+        """
+        self.terminal.write("Nothing happens.")
+        self.terminal.nextLine()
+        
+    cmd_sql.hidden = ""

Added: CalendarServer/trunk/twext/who/directory.py
===================================================================
--- CalendarServer/trunk/twext/who/directory.py	                        (rev 0)
+++ CalendarServer/trunk/twext/who/directory.py	2013-01-29 01:08:37 UTC (rev 10559)
@@ -0,0 +1,100 @@
+##
+# Copyright (c) 2006-2013 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Generic directory service base implementation
+"""
+
+__all__ = [
+    "DirectoryService",
+    "DirectoryRecord",
+]
+
+from zope.interface import implements
+
+from twisted.python.util import FancyEqMixin
+
+from twext.who.idirectory import QueryNotSupportedError
+from twext.who.idirectory import FieldName, RecordType
+from twext.who.idirectory import Operand
+from twext.who.idirectory import IDirectoryService, IDirectoryRecord
+
+
+
+class DirectoryService(FancyEqMixin, object):
+    implements(IDirectoryService)
+
+    compareAttributes = (
+        "realmName",
+    )
+
+    RecordTypeClass = RecordType
+    FieldNameClass  = FieldName
+
+    def __init__(self, realmName):
+        self.realmName = realmName
+
+    def recordTypes(self):
+        return self.RecordTypeClass.iterconstants()
+
+    def recordFromQuery(self, expressions, operand=Operand.AND):
+        raise QueryNotSupportedError("")
+
+
+
+class DirectoryRecord(FancyEqMixin, object):
+    implements(IDirectoryRecord)
+
+    requiredFields = (
+        FieldName.recordType,
+        FieldName.uid,
+        FieldName.shortNames,
+    )
+
+    def __init__(self, service, fields):
+        for fieldName in self.requiredFields:
+            if fieldName not in fields or not fields[fieldName]:
+                raise ValueError("%s field is required." % (fieldName,))
+
+            if FieldName.isMultiValue(fieldName):
+                values = fields[fieldName]
+                if len(values) == 0:
+                    raise ValueError("%s field must have at least one value." % (fieldName,))
+                for value in values:
+                    if not value:
+                        raise ValueError("%s field must not be empty." % (fieldName,))
+
+        self.service = service
+        self.fields  = fields
+
+    def __eq__(self, other):
+        if isinstance(self, other.__class__):
+            return (
+                self.service == other.service and
+                self.fields[FieldName.uid] == other.fields[FieldName.uid]
+            )
+        return NotImplemented
+
+    def __getattr__(self, name):
+        try:
+            fieldName = self.service.FieldNameClass.lookupByName(name)
+        except ValueError:
+            raise AttributeError(name)
+
+        try:
+            return self.fields[fieldName]
+        except KeyError:
+            raise AttributeError(name)

Modified: CalendarServer/trunk/twext/who/idirectory.py
===================================================================
--- CalendarServer/trunk/twext/who/idirectory.py	2013-01-28 20:44:40 UTC (rev 10558)
+++ CalendarServer/trunk/twext/who/idirectory.py	2013-01-29 01:08:37 UTC (rev 10559)
@@ -25,7 +25,7 @@
     "MatchType",
     "Operand",
     "QueryFlags",
-    "IDirectoryQueryMatchExpression",
+    "DirectoryQueryMatchExpression",
     "IDirectoryService",
     "IDirectoryRecord",
 ]
@@ -54,9 +54,13 @@
 
 
 ##
-# Constants
+# Data Types
 ##
 
+class RecordType(Names):
+    user  = NamedConstant()
+    group = NamedConstant()
+
 class FieldName(Names):
     """
     Constants for common field names.
@@ -67,6 +71,13 @@
     guid           = NamedConstant()
     emailAddresses = NamedConstant()
 
+    shortNames.multiValue     = True
+    emailAddresses.multiValue = True
+
+    @classmethod
+    def isMultiValue(cls, name):
+        return hasattr(name, "multiValue") and name.multiValue
+
 class MatchType(Names):
     """
     Query match types.
@@ -85,23 +96,28 @@
     """
     caseInsensitive = FlagConstant()
 
+class DirectoryQueryMatchExpression(object):
+    """
+    Directory query.
 
+    @ivar fieldName: a L{FieldName}
+    @ivar fieldValue: a text value to match
+    @ivar matchType: a L{MatchType}
+    @ivar flags: L{QueryFlags}
+    """
 
+    def __init__(self, fieldName, fieldValue, matchType=MatchType.equals, flags=None):
+        self.fieldName  = fieldName
+        self.fieldValue = fieldValue
+        self.matchType  = matchType
+        self.flags      = flags
+
+
+
 ##
 # Interfaces
 ##
 
-class IDirectoryQueryMatchExpression(Interface):
-    """
-    Directory query.
-    """
-    fieldName  = Attribute("A L{FieldName}.")
-    fieldValue = Attribute("A text value to match.")
-    matchType  = Attribute("A L{MatchType}.")
-    flags      = Attribute("L{QueryFlags}.")
-
-
-
 class IDirectoryService(Interface):
     """
     Directory service.

Added: CalendarServer/trunk/twext/who/test/test_directory.py
===================================================================
--- CalendarServer/trunk/twext/who/test/test_directory.py	                        (rev 0)
+++ CalendarServer/trunk/twext/who/test/test_directory.py	2013-01-29 01:08:37 UTC (rev 10559)
@@ -0,0 +1,161 @@
+##
+# Copyright (c) 2013 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+"""
+Generic directory service base implementation tests
+"""
+
+from zope.interface.verify import verifyObject, BrokenMethodImplementation
+
+from twisted.trial import unittest
+
+from twext.who.idirectory import IDirectoryService, IDirectoryRecord
+from twext.who.idirectory import RecordType, FieldName
+from twext.who.directory import DirectoryService, DirectoryRecord
+
+
+
+class DirectoryServiceTest(unittest.TestCase):
+    realmName = "xyzzy"
+
+    def _testService(self):
+        return DirectoryService(self.realmName)
+
+    def test_interface(self):
+        service = self._testService()
+        try:
+            verifyObject(IDirectoryService, service)
+        except BrokenMethodImplementation, e:
+            self.fail(e)
+
+    def test_init(self):
+        service = self._testService()
+        self.assertEquals(service.realmName, self.realmName)
+
+    def test_recordTypes(self):
+        service = self._testService()
+        self.assertEquals(
+            set(service.recordTypes()),
+            set(service.RecordTypeClass.iterconstants())
+        )
+
+    def test_recordFromQuery(self):
+        raise NotImplementedError()
+
+    test_recordFromQuery.todo = "Not implemented."
+
+
+
+class DirectoryRecordTest(unittest.TestCase):
+    realmName = "xyzzy"
+
+    fields_wsanchez = {
+        FieldName.recordType:     RecordType.user,
+        FieldName.shortNames:     ("wsanchez",),
+        FieldName.uid:            "wsanchez",
+        FieldName.emailAddresses: ("wsanchez at calendarserver.org", "wsanchez at example.com")
+    }
+
+    fields_glyph = {
+        FieldName.recordType:     RecordType.user,
+        FieldName.shortNames:     ("glyph",),
+        FieldName.uid:            "glyph",
+        FieldName.emailAddresses: ("glyph at calendarserver.org", "glyph at example.com")
+    }
+
+    def _testService(self):
+        if not hasattr(self, "_service"):
+            self._service = DirectoryService(self.realmName)
+        return self._service
+
+    def _testRecord(self, fields=None, service=None):
+        if fields is None:
+            fields = self.fields_wsanchez
+        if service is None:
+            service = self._testService()
+        return DirectoryRecord(service, fields)
+
+    def test_interface(self):
+        record = self._testRecord()
+        try:
+            verifyObject(IDirectoryRecord, record)
+        except BrokenMethodImplementation, e:
+            self.fail(e)
+
+    def test_init(self):
+        service  = self._testService()
+        wsanchez = self._testRecord(self.fields_wsanchez)
+
+        self.assertEquals(wsanchez.service, service)
+        self.assertEquals(wsanchez.fields , self.fields_wsanchez)
+
+    def test_initWithNoUID(self):
+        fields = self.fields_wsanchez.copy()
+        del fields[FieldName.uid]
+        self.assertRaises(ValueError, self._testRecord, fields)
+
+        fields = self.fields_wsanchez.copy()
+        fields[FieldName.uid] = ""
+        self.assertRaises(ValueError, self._testRecord, fields)
+
+    def test_initWithNoRecordType(self):
+        fields = self.fields_wsanchez.copy()
+        del fields[FieldName.recordType]
+        self.assertRaises(ValueError, self._testRecord, fields)
+
+        fields = self.fields_wsanchez.copy()
+        fields[FieldName.recordType] = ""
+        self.assertRaises(ValueError, self._testRecord, fields)
+
+    def test_initWithNoShortNames(self):
+        fields = self.fields_wsanchez.copy()
+        del fields[FieldName.shortNames]
+        self.assertRaises(ValueError, self._testRecord, fields)
+
+        fields = self.fields_wsanchez.copy()
+        fields[FieldName.shortNames] = ()
+        self.assertRaises(ValueError, self._testRecord, fields)
+
+        fields = self.fields_wsanchez.copy()
+        fields[FieldName.shortNames] = ("",)
+        self.assertRaises(ValueError, self._testRecord, fields)
+
+        fields = self.fields_wsanchez.copy()
+        fields[FieldName.shortNames] = ("wsanchez", "")
+        self.assertRaises(ValueError, self._testRecord, fields)
+
+    def test_compare(self):
+        fields_glyphmod = self.fields_glyph.copy()
+        del fields_glyphmod[FieldName.emailAddresses]
+
+        wsanchez    = self._testRecord(self.fields_wsanchez)
+        wsanchezmod = self._testRecord(self.fields_wsanchez, DirectoryService("plugh"))
+        glyph       = self._testRecord(self.fields_glyph)
+        glyphmod    = self._testRecord(fields_glyphmod)
+
+        self.assertEquals(wsanchez, wsanchez)
+        self.assertNotEqual(wsanchez, glyph)
+        self.assertEquals(glyph, glyphmod) # UID matches
+        self.assertNotEqual(glyphmod, wsanchez)
+        self.assertNotEqual(wsanchez, wsanchezmod) # Different service
+
+    def test_attributeAccess(self):
+        wsanchez = self._testRecord(self.fields_wsanchez)
+
+        self.assertEquals(wsanchez.recordType    , wsanchez.fields[FieldName.recordType    ])
+        self.assertEquals(wsanchez.uid           , wsanchez.fields[FieldName.uid           ])
+        self.assertEquals(wsanchez.shortNames    , wsanchez.fields[FieldName.shortNames    ])
+        self.assertEquals(wsanchez.emailAddresses, wsanchez.fields[FieldName.emailAddresses])
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20130128/2cef2bd0/attachment-0001.html>


More information about the calendarserver-changes mailing list