[CalendarServer-changes] [13857] twext/trunk/twext/who

source_changes at macosforge.org source_changes at macosforge.org
Fri Aug 8 12:36:13 PDT 2014


Revision: 13857
          http://trac.calendarserver.org//changeset/13857
Author:   cdaboo at apple.com
Date:     2014-08-08 12:36:13 -0700 (Fri, 08 Aug 2014)
Log Message:
-----------
Use a thread pool for LDAP connections to avoid internal python-ldap locking issues when doing concurrent LDAP requests.

Modified Paths:
--------------
    twext/trunk/twext/who/ldap/_service.py

Added Paths:
-----------
    twext/trunk/twext/who/test/test_concurrency.py

Modified: twext/trunk/twext/who/ldap/_service.py
===================================================================
--- twext/trunk/twext/who/ldap/_service.py	2014-08-08 19:33:37 UTC (rev 13856)
+++ twext/trunk/twext/who/ldap/_service.py	2014-08-08 19:36:13 UTC (rev 13857)
@@ -21,14 +21,19 @@
 LDAP directory service implementation.
 """
 
+from Queue import Queue, Empty
+from threading import RLock
 from uuid import UUID
 
+import collections
 import ldap
 
 from twisted.python.constants import Names, NamedConstant
 from twisted.internet.defer import succeed, inlineCallbacks, returnValue
-from twisted.internet.threads import deferToThread
+from twisted.internet.threads import deferToThreadPool
 from twisted.cred.credentials import IUsernamePassword
+from twisted.python.threadpool import ThreadPool
+from twisted.internet import reactor
 
 from twext.python.log import Logger
 from twext.python.types import MappingProxyType
@@ -101,7 +106,7 @@
 
 
 #
-# Data type extentions
+# Data type extensions
 #
 
 class FieldName(Names):
@@ -219,6 +224,9 @@
         useTLS=False,
         fieldNameToAttributesMap=DEFAULT_FIELDNAME_ATTRIBUTE_MAP,
         recordTypeSchemas=DEFAULT_RECORDTYPE_SCHEMAS,
+        ownThreadpool=True,
+        threadPoolMax=10,
+        connectionMax=10,
         _debug=False,
     ):
         """
@@ -294,19 +302,133 @@
                 attributesToFetch.add(attribute.encode("utf-8"))
         self._attributesToFetch = list(attributesToFetch)
 
+        # Threaded connection pool. The connection limit here applies only to the pooled connections used for queries.
+        # Authentications additionally open one-off connections, and those also run in threadpool threads. The
+        # threadpool max should therefore be larger than the connection max, so that the pooled query connections
+        # and the one-off auth-only connections can run at the same time.
 
+        self.ownThreadpool = ownThreadpool
+        if self.ownThreadpool:
+            self.threadpool = ThreadPool(minthreads=1, maxthreads=threadPoolMax, name="LDAPDirectoryService")
+        else:
+            # Use the default threadpool but adjust its size to fit our needs
+            self.threadpool = reactor.getThreadPool()
+            self.threadpool.adjustPoolsize(
+                max(threadPoolMax, self.threadpool.max)
+            )
+        self.connectionMax = connectionMax
+        self.connectionCreateLock = RLock()
+        self.connections = []
+        self.connectionQueue = Queue()
+        self.poolStats = collections.defaultdict(int)
+        self.activeCount = 0
+
+        reactor.callWhenRunning(self.start)
+        reactor.addSystemEventTrigger('during', 'shutdown', self.stop)
+
+
+    def start(self):
+        """
+        Start up this service. Initialize the threadpool (if we own it).
+        """
+        if self.ownThreadpool:
+            self.threadpool.start()
+
+
+    def stop(self):
+        """
+        Stop the service. Stop the threadpool if we own it and do other clean-up.
+        """
+        if self.ownThreadpool:
+            self.threadpool.stop()
+
+        # FIXME: we should probably also close the pool of active connections too
+
+
     @property
     def realmName(self):
         return u"{self.url}".format(self=self)
 
 
-    @inlineCallbacks
+    class Connection(object):
+        """
+        ContextManager object for getting a connection from the pool. On exit the connection
+        will be put back in the pool if no exception was raised. Otherwise, the connection will be
+        removed from the active connection list, which will allow a new "clean" connection to
+        be created later if needed.
+        """
+
+        def __init__(self, ds):
+            self.ds = ds
+
+        def __enter__(self):
+            self.connection = self.ds._getConnection()
+            return self.connection
+
+        def __exit__(self, exc_type, exc_val, exc_tb):
+            if exc_type is None:
+                self.ds._returnConnection(self.connection)
+                return True
+            else:
+                self.ds._failedConnection(self.connection)
+                return False
+
+
+    def _getConnection(self):
+        """
+        Get a connection from the connection pool. This will retrieve a connection from the connection
+        pool L{Queue} object. If the L{Queue} is empty, it will check to see whether a new connection can
+        be created (based on the connection limit), and if so create that and use it. If no new
+        connections can be created, it will block on the L{Queue} until an existing, in-use, connection
+        is put back.
+        """
+        try:
+            connection = self.connectionQueue.get(block=False)
+        except Empty:
+            # Note we use a lock here to prevent a race condition in which multiple concurrent requests for a new
+            # connection could all succeed when the connection count starts out just one below the maximum, pushing
+            # the pool over its limit. This can happen because self._connect() can take a while.
+            self.connectionCreateLock.acquire()
+            if len(self.connections) < self.connectionMax:
+                connection = self._connect()
+                self.connections.append(connection)
+                self.connectionCreateLock.release()
+            else:
+                self.connectionCreateLock.release()
+                self.poolStats["connection-blocked"] += 1
+                connection = self.connectionQueue.get()
+
+        self.poolStats["connection-{}".format(self.connections.index(connection))] += 1
+        self.activeCount += 1
+        self.poolStats["connection-max"] = max(self.poolStats["connection-max"], self.activeCount)
+        return connection
+
+
+    def _returnConnection(self, connection):
+        """
+        A connection is no longer needed - return it to the pool.
+        """
+        self.activeCount -= 1
+        self.connectionQueue.put(connection)
+
+
+    def _failedConnection(self, connection):
+        """
+        A connection has failed - remove it from the list of active connections. A new
+        one will be created if needed.
+        """
+        self.activeCount -= 1
+        self.poolStats["connection-errors"] += 1
+        self.connections.remove(connection)
+
+
     def _connect(self):
         """
         Connect to the directory server.
+        This will always be called in a thread to prevent blocking.
 
-        @returns: A deferred connection object.
-        @rtype: deferred L{ldap.ldapobject.LDAPObject}
+        @returns: The connection object.
+        @rtype: L{ldap.ldapobject.LDAPObject}
 
         @raises: L{LDAPConnectionError} if unable to connect.
         """
@@ -314,75 +436,54 @@
         # FIXME: ldap connection objects are not thread safe, so let's set up
         # a connection pool
 
-        if not hasattr(self, "_connection"):
-            self.log.debug("Connecting to LDAP at {log_source.url}")
-            connection = ldap.initialize(self.url)
+        self.log.debug("Connecting to LDAP at {log_source.url}")
+        connection = self._newConnection()
 
-            # FIXME: Use trace_file option to wire up debug logging when
-            # Twisted adopts the new logging stuff.
-
-            for option, value in (
-                (ldap.OPT_TIMEOUT, self._timeout),
-                (ldap.OPT_X_TLS_CACERTFILE, self._tlsCACertificateFile),
-                (ldap.OPT_X_TLS_CACERTDIR, self._tlsCACertificateDirectory),
-                (ldap.OPT_DEBUG_LEVEL, self._debug),
-            ):
-                if value is not None:
-                    connection.set_option(option, value)
-
-            if self._useTLS:
-                self.log.debug("Starting TLS for {log_source.url}")
-                yield deferToThread(connection.start_tls_s)
-
-            if self._credentials is not None:
-                if IUsernamePassword.providedBy(self._credentials):
-                    try:
-                        yield deferToThread(
-                            connection.simple_bind_s,
-                            self._credentials.username,
-                            self._credentials.password,
-                        )
-                        self.log.debug(
-                            "Bound to LDAP as {credentials.username}",
-                            credentials=self._credentials
-                        )
-                    except (
-                        ldap.INVALID_CREDENTIALS, ldap.INVALID_DN_SYNTAX
-                    ) as e:
-                        self.log.error(
-                            "Unable to bind to LDAP as {credentials.username}",
-                            credentials=self._credentials
-                        )
-                        raise LDAPBindAuthError(
-                            self._credentials.username, e
-                        )
-
-                else:
-                    raise LDAPConnectionError(
-                        "Unknown credentials type: {0}"
-                        .format(self._credentials)
+        if self._credentials is not None:
+            if IUsernamePassword.providedBy(self._credentials):
+                try:
+                    connection.simple_bind_s(
+                        self._credentials.username,
+                        self._credentials.password,
                     )
+                    self.log.debug(
+                        "Bound to LDAP as {credentials.username}",
+                        credentials=self._credentials
+                    )
+                except (
+                    ldap.INVALID_CREDENTIALS, ldap.INVALID_DN_SYNTAX
+                ) as e:
+                    self.log.error(
+                        "Unable to bind to LDAP as {credentials.username}",
+                        credentials=self._credentials
+                    )
+                    raise LDAPBindAuthError(
+                        self._credentials.username, e
+                    )
 
-            self._connection = connection
+            else:
+                raise LDAPConnectionError(
+                    "Unknown credentials type: {0}"
+                    .format(self._credentials)
+                )
 
-        returnValue(self._connection)
+        return connection
 
 
-    @inlineCallbacks
-    def _authenticateUsernamePassword(self, dn, password):
+    def _newConnection(self):
         """
-        Open a secondary connection to the LDAP server and try binding to it
-        with the given credentials
+        Create a new LDAP connection and initialize and start TLS if required.
+        This will always be called in a thread to prevent blocking.
 
-        @returns: True if the password is correct, False otherwise
-        @rtype: deferred C{bool}
+        @returns: The connection object.
+        @rtype: L{ldap.ldapobject.LDAPObject}
 
         @raises: L{LDAPConnectionError} if unable to connect.
         """
-        self.log.debug("Authenticating {dn}", dn=dn)
         connection = ldap.initialize(self.url)
 
-        # FIXME:  Use a separate connection pool perhaps
+        # FIXME: Use trace_file option to wire up debug logging when
+        # Twisted adopts the new logging stuff.
 
         for option, value in (
             (ldap.OPT_TIMEOUT, self._timeout),
@@ -395,102 +496,158 @@
 
         if self._useTLS:
             self.log.debug("Starting TLS for {log_source.url}")
-            yield deferToThread(connection.start_tls_s)
+            connection.start_tls_s()
 
+        return connection
+
+
+    def _authenticateUsernamePassword(self, dn, password):
+        """
+        Open a secondary connection to the LDAP server and try binding to it
+        with the given credentials
+
+        @returns: True if the password is correct, False otherwise
+        @rtype: deferred C{bool}
+
+        @raises: L{LDAPConnectionError} if unable to connect.
+        """
+        return deferToThreadPool(
+            reactor, self.threadpool,
+            self._authenticateUsernamePassword_inThread, dn, password
+        )
+
+
+    def _authenticateUsernamePassword_inThread(self, dn, password):
+        """
+        Open a secondary connection to the LDAP server and try binding to it
+        with the given credentials.
+        This method is always called in a thread.
+
+        @returns: True if the password is correct, False otherwise
+        @rtype: C{bool}
+
+        @raises: L{LDAPConnectionError} if unable to connect.
+        """
+        self.log.debug("Authenticating {dn}", dn=dn)
+        connection = self._newConnection()
+
+
         try:
-            yield deferToThread(
-                connection.simple_bind_s,
-                dn,
-                password,
-            )
+            connection.simple_bind_s(dn, password)
             self.log.debug("Authenticated {dn}", dn=dn)
-            returnValue(True)
+            return True
         except (
             ldap.INVALID_CREDENTIALS, ldap.INVALID_DN_SYNTAX
         ):
             self.log.debug("Unable to authenticate {dn}", dn=dn)
-            returnValue(False)
+            return False
+        finally:
+            # TODO: should we explicitly "close" the connection in a finally
+            # clause rather than just let it go out of scope and be garbage collected
+            # at some indeterminate point in the future? The upside is that we won't hang
+            # on to the connection or other resources for longer than needed. The downside
+            # is that this call will take a little extra time to close it down.
+            # If we do decide to "close" it, we probably have to use one of the "unbind"
+            # methods on the L{LDAPObject}.
+            connection = None
 
 
-    @inlineCallbacks
     def _recordsFromQueryString(self, queryString, recordTypes=None):
+        return deferToThreadPool(
+            reactor, self.threadpool,
+            self._recordsFromQueryString_inThread,
+            queryString,
+            recordTypes,
+        )
+
+
+    def _recordsFromQueryString_inThread(self, queryString, recordTypes=None):
+        """
+        This method is always called in a thread.
+        """
         records = []
 
-        connection = yield self._connect()
+        with DirectoryService.Connection(self) as connection:
 
-        if recordTypes is None:
-            recordTypes = self.recordTypes()
+            if recordTypes is None:
+                recordTypes = self.recordTypes()
 
-        for recordType in recordTypes:
-            rdn = self._recordTypeSchemas[recordType].relativeDN
-            rdn = (
-                ldap.dn.str2dn(rdn.lower()) +
-                ldap.dn.str2dn(self._baseDN.lower())
-            )
-            self.log.debug(
-                "Performing LDAP query: {rdn} {query} {recordType}",
-                rdn=rdn,
-                query=queryString,
-                recordType=recordType
-            )
-            try:
-                reply = yield deferToThread(
-                    connection.search_s,
-                    ldap.dn.dn2str(rdn),
-                    ldap.SCOPE_SUBTREE,
-                    queryString,
-                    attrlist=self._attributesToFetch
+            for recordType in recordTypes:
+                rdn = self._recordTypeSchemas[recordType].relativeDN
+                rdn = (
+                    ldap.dn.str2dn(rdn.lower()) +
+                    ldap.dn.str2dn(self._baseDN.lower())
                 )
+                self.log.debug(
+                    "Performing LDAP query: {rdn} {query} {recordType}",
+                    rdn=rdn,
+                    query=queryString,
+                    recordType=recordType
+                )
+                try:
+                    reply = connection.search_s(
+                        ldap.dn.dn2str(rdn),
+                        ldap.SCOPE_SUBTREE,
+                        queryString,
+                        attrlist=self._attributesToFetch
+                    )
 
-            except ldap.FILTER_ERROR as e:
-                self.log.error(
-                    "Unable to perform query {0!r}: {1}"
-                    .format(queryString, e)
+                except ldap.FILTER_ERROR as e:
+                    self.log.error(
+                        "Unable to perform query {0!r}: {1}"
+                        .format(queryString, e)
+                    )
+                    raise LDAPQueryError("Unable to perform query", e)
+
+                except ldap.NO_SUCH_OBJECT as e:
+                    self.log.warn("RDN {rdn} does not exist, skipping", rdn=rdn)
+                    continue
+
+                records.extend(
+                    self._recordsFromReply(reply, recordType=recordType)
                 )
-                raise LDAPQueryError("Unable to perform query", e)
 
-            except ldap.NO_SUCH_OBJECT as e:
-                self.log.warn("RDN {rdn} does not exist, skipping", rdn=rdn)
-                continue
+        return records
 
-            records.extend(
-                (yield self._recordsFromReply(reply, recordType=recordType))
-            )
-        returnValue(records)
 
+    def _recordWithDN(self, dn):
+        return deferToThreadPool(
+            reactor, self.threadpool,
+            self._recordWithDN_inThread, dn
+        )
 
-    @inlineCallbacks
-    def _recordWithDN(self, dn):
+
+    def _recordWithDN_inThread(self, dn):
         """
+        This method is always called in a thread.
+
         @param dn: The DN of the record to search for
         @type dn: C{str}
         """
-        connection = yield self._connect()
+        with DirectoryService.Connection(self) as connection:
 
-        self.log.debug("Performing LDAP DN query: {dn}", dn=dn)
+            self.log.debug("Performing LDAP DN query: {dn}", dn=dn)
 
-        reply = yield deferToThread(
-            connection.search_s,
-            dn,
-            ldap.SCOPE_SUBTREE,
-            "(objectClass=*)",
-            attrlist=self._attributesToFetch
-        )
-        records = self._recordsFromReply(reply)
+            reply = connection.search_s(
+                dn,
+                ldap.SCOPE_SUBTREE,
+                "(objectClass=*)",
+                attrlist=self._attributesToFetch
+            )
+            records = self._recordsFromReply(reply)
+
         if len(records):
-            returnValue(records[0])
+            return records[0]
         else:
-            returnValue(None)
+            return None
 
 
     def _recordsFromReply(self, reply, recordType=None):
         records = []
 
-
         for dn, recordData in reply:
 
             # Determine the record type
-
             if recordType is None:
                 recordType = recordTypeForDN(
                     self._baseDN, self._recordTypeSchemas, dn
@@ -510,7 +667,6 @@
                 continue
 
             # Populate a fields dictionary
-
             fields = {}
 
             for attribute, values in recordData.iteritems():

Added: twext/trunk/twext/who/test/test_concurrency.py
===================================================================
--- twext/trunk/twext/who/test/test_concurrency.py	                        (rev 0)
+++ twext/trunk/twext/who/test/test_concurrency.py	2014-08-08 19:36:13 UTC (rev 13857)
@@ -0,0 +1,203 @@
+##
+# Copyright (c) 2013-2014 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from __future__ import print_function
+from twext.python.types import MappingProxyType
+from twext.who.directory import DirectoryRecord
+from twext.who.idirectory import RecordType, FieldName
+from twext.who.ldap._constants import LDAPAttribute, LDAPObjectClass
+from twext.who.ldap._service import DirectoryService as LDAPDirectoryService, \
+    DEFAULT_FIELDNAME_ATTRIBUTE_MAP, RecordTypeSchema
+from twext.who.opendirectory._service import DirectoryService as ODDirectoryService
+from twisted.internet.defer import inlineCallbacks, DeferredList, returnValue
+from twisted.trial import unittest
+import time
+
+"""
+Test the concurrency of DS implementations against real servers.
+"""
+
+TEST_FIELDNAME_MAP = dict(DEFAULT_FIELDNAME_ATTRIBUTE_MAP)
+TEST_FIELDNAME_MAP[FieldName.uid] = (u"uid",)
+
+TEST_RECORDTYPE_SCHEMAS_OSX = MappingProxyType({
+
+    RecordType.user: RecordTypeSchema(
+        # cn=users
+        relativeDN=u"cn={0}".format("users"),
+
+        # (objectClass=inetOrgPerson)
+        attributes=(
+            (
+                LDAPAttribute.objectClass.value,
+                LDAPObjectClass.inetOrgPerson.value,
+            ),
+        ),
+    ),
+
+    RecordType.group: RecordTypeSchema(
+        # cn=groups
+        relativeDN=u"cn={0}".format("groups"),
+
+        # (objectClass=groupOfNames)
+        attributes=(
+            (
+                LDAPAttribute.objectClass.value,
+                LDAPObjectClass.groupOfNames.value,
+            ),
+        ),
+    ),
+
+})
+
+TEST_RECORDTYPE_SCHEMAS_OTHER = MappingProxyType({
+
+    RecordType.user: RecordTypeSchema(
+        # ou=people
+        relativeDN=u"ou={0}".format("people"),
+
+        # (objectClass=inetOrgPerson)
+        attributes=(
+            (
+                LDAPAttribute.objectClass.value,
+                LDAPObjectClass.inetOrgPerson.value,
+            ),
+        ),
+    ),
+
+    RecordType.group: RecordTypeSchema(
+        # ou=groupOfNames
+        relativeDN=u"ou={0}".format(LDAPObjectClass.groupOfNames.value),
+
+        # (objectClass=groupOfNames)
+        attributes=(
+            (
+                LDAPAttribute.objectClass.value,
+                LDAPObjectClass.groupOfNames.value,
+            ),
+        ),
+    ),
+
+})
+
+class DirectoryServiceConcurrencyTest(unittest.TestCase):
+    """
+    Concurrency tests for directory service implementations against real servers.
+    """
+
+    @inlineCallbacks
+    def _runTest(self, num_threads, multiple_services, service_maker, details, num_requests, do_auth):
+
+        if multiple_services:
+            services = [service_maker() for _ in range(num_threads)]
+        else:
+            services = [service_maker()] * num_threads
+
+        # Warm up each service before starting timer
+        for n, svc in enumerate(services):
+            record = yield svc.recordWithShortName(RecordType.user, details["user"].format(n + 1))
+            self.assertTrue(isinstance(record, DirectoryRecord))
+
+        start = time.time()
+        ctr = [0]
+
+        @inlineCallbacks
+        def _records(n):
+            for _ in range(num_requests):
+                record = yield services[n].recordWithShortName(RecordType.user, details["user"].format(n + 1))
+                self.assertTrue(isinstance(record, DirectoryRecord))
+                ctr[0] += 1
+
+        @inlineCallbacks
+        def _auth(n):
+            record = yield services[n].recordWithShortName(RecordType.user, details["user"].format(n + 1))
+            for _ in range(num_requests):
+                result = yield record.verifyPlaintextPassword(details["pswd"].format(n + 1))
+                self.assertTrue(result)
+                ctr[0] += 1
+
+        dl = []
+        for i in range(num_threads):
+            dl.append((_auth if do_auth else _records)(i))
+
+        dl = DeferredList(dl)
+        yield dl
+
+        returnValue((time.time() - start, ctr[0],))
+
+
+    @inlineCallbacks
+    def test_ldap_multi_service(self):
+        """
+        See if L{ldap._service.DirectoryService} is concurrent.
+        """
+
+        num_threads = 20
+        multiple_services = False
+        num_requests = 100
+        do_auth = False
+        use_od = False
+        configChoice = "local"
+
+        configs = {
+            "local": {
+                "url": "ldap://localhost",
+                "baseDN": "dc=example,dc=com",
+                "rschema": TEST_RECORDTYPE_SCHEMAS_OSX,
+                "user": u"user{:02d}",
+                "pswd": u"user{:02d}",
+            },
+            "example": {
+                "url": "ldap://example.com",
+                "baseDN": "o=example.com,o=email",
+                "rschema": TEST_RECORDTYPE_SCHEMAS_OTHER,
+                "user": u"TestAccount{}",
+                "pswd": u"pswd",
+            },
+        }
+
+        details = configs[configChoice]
+
+        def _serviceMaker():
+            if use_od:
+                return ODDirectoryService(
+                    nodeName="/LDAPv3/127.0.0.1",
+                )
+            else:
+                return LDAPDirectoryService(
+                    url=details["url"],
+                    baseDN=details["baseDN"],
+                    fieldNameToAttributesMap=TEST_FIELDNAME_MAP,
+                    recordTypeSchemas=details["rschema"],
+                    threadPoolMax=20,
+                )
+
+
+        duration, count = yield self._runTest(num_threads, multiple_services, _serviceMaker, details, num_requests, do_auth)
+
+        print(
+            "\n\nType: {} {} {}\nNumber of Services/Requests: {}/{}\nTime: {}\nCount: {}\n".format(
+                "OD" if use_od else "LDAP",
+                "Multiple" if multiple_services else "Single",
+                "Auth" if do_auth else "query",
+                num_threads,
+                num_requests,
+                duration,
+                count,
+            )
+        )
+
+    test_ldap_multi_service.skip = "Not really a unit test - requires actually server to work"
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140808/9c8c812f/attachment-0001.html>


More information about the calendarserver-changes mailing list