[CalendarServer-changes] [13857] twext/trunk/twext/who
source_changes at macosforge.org
source_changes at macosforge.org
Fri Aug 8 12:36:13 PDT 2014
Revision: 13857
http://trac.calendarserver.org//changeset/13857
Author: cdaboo at apple.com
Date: 2014-08-08 12:36:13 -0700 (Fri, 08 Aug 2014)
Log Message:
-----------
Use a thread pool for LDAP connections to avoid internal python-ldap locking issues when doing concurrent LDAP requests.
Modified Paths:
--------------
twext/trunk/twext/who/ldap/_service.py
Added Paths:
-----------
twext/trunk/twext/who/test/test_concurrency.py
Modified: twext/trunk/twext/who/ldap/_service.py
===================================================================
--- twext/trunk/twext/who/ldap/_service.py 2014-08-08 19:33:37 UTC (rev 13856)
+++ twext/trunk/twext/who/ldap/_service.py 2014-08-08 19:36:13 UTC (rev 13857)
@@ -21,14 +21,19 @@
LDAP directory service implementation.
"""
+from Queue import Queue, Empty
+from threading import RLock
from uuid import UUID
+import collections
import ldap
from twisted.python.constants import Names, NamedConstant
from twisted.internet.defer import succeed, inlineCallbacks, returnValue
-from twisted.internet.threads import deferToThread
+from twisted.internet.threads import deferToThreadPool
from twisted.cred.credentials import IUsernamePassword
+from twisted.python.threadpool import ThreadPool
+from twisted.internet import reactor
from twext.python.log import Logger
from twext.python.types import MappingProxyType
@@ -101,7 +106,7 @@
#
-# Data type extentions
+# Data type extensions
#
class FieldName(Names):
@@ -219,6 +224,9 @@
useTLS=False,
fieldNameToAttributesMap=DEFAULT_FIELDNAME_ATTRIBUTE_MAP,
recordTypeSchemas=DEFAULT_RECORDTYPE_SCHEMAS,
+ ownThreadpool=True,
+ threadPoolMax=10,
+ connectionMax=10,
_debug=False,
):
"""
@@ -294,19 +302,133 @@
attributesToFetch.add(attribute.encode("utf-8"))
self._attributesToFetch = list(attributesToFetch)
+ # Threaded connection pool. The connection size limit here is the size for connections doing queries.
+ # There will also be one-off connections for authentications which also run in their own threads. Thus
+ # the threadpool max ought to be larger than the connection max to allow for both pooled query connections
+ # and one-off auth-only connections.
+ self.ownThreadpool = ownThreadpool
+ if self.ownThreadpool:
+ self.threadpool = ThreadPool(minthreads=1, maxthreads=threadPoolMax, name="LDAPDirectoryService")
+ else:
+ # Use the default threadpool but adjust its size to fit our needs
+ self.threadpool = reactor.getThreadPool()
+ self.threadpool.adjustPoolsize(
+ max(threadPoolMax, self.threadpool.max)
+ )
+ self.connectionMax = connectionMax
+ self.connectionCreateLock = RLock()
+ self.connections = []
+ self.connectionQueue = Queue()
+ self.poolStats = collections.defaultdict(int)
+ self.activeCount = 0
+
+ reactor.callWhenRunning(self.start)
+ reactor.addSystemEventTrigger('during', 'shutdown', self.stop)
+
+
+    def start(self):
+        """
+        Begin servicing requests. Only a threadpool that this service
+        created for itself is started here; a shared threadpool is not
+        touched.
+        """
+        if not self.ownThreadpool:
+            return
+        self.threadpool.start()
+
+
+    def stop(self):
+        """
+        Shut this service down. Only a threadpool that this service created
+        for itself is stopped here; a shared threadpool is not stopped.
+        """
+        # FIXME: we should probably also close the pool of active connections too
+        if not self.ownThreadpool:
+            return
+        self.threadpool.stop()
+
+
@property
def realmName(self):
+ """
+ The realm name for this service: C{self.url} formatted as a unicode string.
+ """
return u"{self.url}".format(self=self)
- @inlineCallbacks
+    class Connection(object):
+        """
+        Context manager that checks a connection out of the pool of the
+        directory service C{ds} on entry. Leaving the block normally puts
+        the connection back into the pool. Leaving it because of an
+        exception reports the connection as failed, so it is removed from
+        the active connection list and a "clean" replacement can be created
+        later if needed. Exceptions are never suppressed.
+        """
+
+        def __init__(self, ds):
+            self.ds = ds
+
+        def __enter__(self):
+            pooled = self.ds._getConnection()
+            self.connection = pooled
+            return pooled
+
+        def __exit__(self, exc_type, exc_val, exc_tb):
+            if exc_type is not None:
+                # Returning False lets the exception propagate.
+                self.ds._failedConnection(self.connection)
+                return False
+            self.ds._returnConnection(self.connection)
+            return True
+
+
+    def _getConnection(self):
+        """
+        Get a connection from the connection pool.
+
+        If an idle connection is available in the pool L{Queue}, it is used.
+        Otherwise, if fewer than C{self.connectionMax} connections exist, a
+        new one is created and used. If neither is possible, this blocks on
+        the L{Queue} until an in-use connection is put back, or until a
+        failed connection frees up a slot (see L{_failedConnection}).
+
+        This blocks, so it must only be called in a thread.
+
+        @return: a connection for the exclusive use of the caller, which
+            must hand it back via L{_returnConnection} or
+            L{_failedConnection}.
+        @rtype: L{ldap.ldapobject.LDAPObject}
+
+        @raises: whatever L{_connect} raises if a new connection cannot be
+            established.
+        """
+        # A None taken from the queue is not a connection. It is a marker
+        # queued by _failedConnection to say "a connection slot was freed".
+        try:
+            connection = self.connectionQueue.get(block=False)
+        except Empty:
+            connection = None
+            holdingMarker = False
+        else:
+            holdingMarker = connection is None
+
+        while connection is None:
+            # The lock prevents a race in which several threads each see a
+            # free slot and all create a connection, exceeding the maximum
+            # (self._connect() can take a while). Using "with" guarantees
+            # the lock is released if self._connect() raises; otherwise
+            # every other thread would deadlock on its next acquire.
+            with self.connectionCreateLock:
+                if len(self.connections) < self.connectionMax:
+                    try:
+                        connection = self._connect()
+                    except Exception:
+                        if holdingMarker:
+                            # Pass the freed-slot marker on so a thread
+                            # blocked below is not left waiting forever.
+                            self.connectionQueue.put(None)
+                        raise
+                    self.connections.append(connection)
+
+            if connection is None:
+                # The pool is full: wait for a connection to be returned,
+                # or for a marker saying that a slot has been freed.
+                self.poolStats["connection-blocked"] += 1
+                connection = self.connectionQueue.get()
+                holdingMarker = connection is None
+
+        # NOTE: these statistics are updated without holding a lock, so
+        # concurrent updates from several threads may be lost.
+        self.poolStats["connection-{}".format(self.connections.index(connection))] += 1
+        self.activeCount += 1
+        self.poolStats["connection-max"] = max(self.poolStats["connection-max"], self.activeCount)
+        return connection
+
+
+    def _returnConnection(self, connection):
+        """
+        A connection is no longer needed - return it to the pool so that
+        L{_getConnection} can hand it out again.
+        """
+        self.activeCount -= 1
+        self.connectionQueue.put(connection)
+
+
+    def _failedConnection(self, connection):
+        """
+        A connection has failed - remove it from the list of active
+        connections and release it. A new one will be created if needed.
+
+        The failed connection is not put back in the pool. Instead a marker
+        (C{None}) is queued, so that a thread blocked in L{_getConnection}
+        wakes up and creates a replacement rather than waiting forever.
+        """
+        self.activeCount -= 1
+        self.poolStats["connection-errors"] += 1
+        self.connections.remove(connection)
+        self.connectionQueue.put(None)
+
+        # Close the discarded connection now rather than leaving it to
+        # garbage collection. It may already be broken, so this is only a
+        # best-effort attempt.
+        try:
+            connection.unbind_s()
+        except ldap.LDAPError as e:
+            self.log.debug(
+                "Unable to unbind failed LDAP connection: {error}", error=e
+            )
+
def _connect(self):
"""
Connect to the directory server.
+ This will always be called in a thread to prevent blocking.
+ It is called by _getConnection when the pool has room for another
+ connection; the caller then appends the result to the pool's list.
- @returns: A deferred connection object.
- @rtype: deferred L{ldap.ldapobject.LDAPObject}
+ @returns: The connection object.
+ @rtype: L{ldap.ldapobject.LDAPObject}
@raises: L{LDAPConnectionError} if unable to connect.
"""
@@ -314,75 +436,54 @@
# FIXME: ldap connection objects are not thread safe, so let's set up
# a connection pool
+ # NOTE(review): _getConnection now provides this pooling; this FIXME looks obsolete.
- if not hasattr(self, "_connection"):
- self.log.debug("Connecting to LDAP at {log_source.url}")
- connection = ldap.initialize(self.url)
+ self.log.debug("Connecting to LDAP at {log_source.url}")
+ connection = self._newConnection()
- # FIXME: Use trace_file option to wire up debug logging when
- # Twisted adopts the new logging stuff.
-
- for option, value in (
- (ldap.OPT_TIMEOUT, self._timeout),
- (ldap.OPT_X_TLS_CACERTFILE, self._tlsCACertificateFile),
- (ldap.OPT_X_TLS_CACERTDIR, self._tlsCACertificateDirectory),
- (ldap.OPT_DEBUG_LEVEL, self._debug),
- ):
- if value is not None:
- connection.set_option(option, value)
-
- if self._useTLS:
- self.log.debug("Starting TLS for {log_source.url}")
- yield deferToThread(connection.start_tls_s)
-
- if self._credentials is not None:
- if IUsernamePassword.providedBy(self._credentials):
- try:
- yield deferToThread(
- connection.simple_bind_s,
- self._credentials.username,
- self._credentials.password,
- )
- self.log.debug(
- "Bound to LDAP as {credentials.username}",
- credentials=self._credentials
- )
- except (
- ldap.INVALID_CREDENTIALS, ldap.INVALID_DN_SYNTAX
- ) as e:
- self.log.error(
- "Unable to bind to LDAP as {credentials.username}",
- credentials=self._credentials
- )
- raise LDAPBindAuthError(
- self._credentials.username, e
- )
-
- else:
- raise LDAPConnectionError(
- "Unknown credentials type: {0}"
- .format(self._credentials)
+ if self._credentials is not None:
+ if IUsernamePassword.providedBy(self._credentials):
+ try:
+ connection.simple_bind_s(
+ self._credentials.username,
+ self._credentials.password,
)
+ self.log.debug(
+ "Bound to LDAP as {credentials.username}",
+ credentials=self._credentials
+ )
+ except (
+ ldap.INVALID_CREDENTIALS, ldap.INVALID_DN_SYNTAX
+ ) as e:
+ self.log.error(
+ "Unable to bind to LDAP as {credentials.username}",
+ credentials=self._credentials
+ )
+ # NOTE(review): the new connection is not unbound before raising; it is left for garbage collection.
+ raise LDAPBindAuthError(
+ self._credentials.username, e
+ )
- self._connection = connection
+ else:
+ raise LDAPConnectionError(
+ "Unknown credentials type: {0}"
+ .format(self._credentials)
+ )
- returnValue(self._connection)
+ return connection
- @inlineCallbacks
- def _authenticateUsernamePassword(self, dn, password):
+ def _newConnection(self):
"""
- Open a secondary connection to the LDAP server and try binding to it
- with the given credentials
+ Create a new LDAP connection and initialize and start TLS if required.
+ This will always be called in a thread to prevent blocking.
- @returns: True if the password is correct, False otherwise
- @rtype: deferred C{bool}
+ @returns: The connection object.
+ @rtype: L{ldap.ldapobject.LDAPObject}
@raises: L{LDAPConnectionError} if unable to connect.
"""
- self.log.debug("Authenticating {dn}", dn=dn)
connection = ldap.initialize(self.url)
- # FIXME: Use a separate connection pool perhaps
+ # FIXME: Use trace_file option to wire up debug logging when
+ # Twisted adopts the new logging stuff.
for option, value in (
(ldap.OPT_TIMEOUT, self._timeout),
@@ -395,102 +496,158 @@
if self._useTLS:
self.log.debug("Starting TLS for {log_source.url}")
- yield deferToThread(connection.start_tls_s)
+ connection.start_tls_s()
+ return connection
+
+
+    def _authenticateUsernamePassword(self, dn, password):
+        """
+        Check a DN/password pair by binding with it on a secondary, one-off
+        connection to the LDAP server. The blocking work is run on
+        C{self.threadpool}, so the caller only receives a Deferred.
+
+        @returns: True if the password is correct, False otherwise
+        @rtype: deferred C{bool}
+
+        @raises: L{LDAPConnectionError} if unable to connect.
+        """
+        worker = self._authenticateUsernamePassword_inThread
+        return deferToThreadPool(reactor, self.threadpool, worker, dn, password)
+
+
+    def _authenticateUsernamePassword_inThread(self, dn, password):
+        """
+        Open a secondary connection to the LDAP server and try binding to it
+        with the given credentials.
+        This method is always called in a thread.
+
+        The one-off connection is unbound before returning, so that its
+        resources are released immediately rather than whenever the
+        connection object happens to be garbage collected.
+
+        @param dn: the DN to bind as.
+        @param password: the password to bind with.
+
+        @returns: True if the password is correct, False otherwise
+        @rtype: C{bool}
+
+        @raises: L{LDAPConnectionError} if unable to connect.
+        """
+        self.log.debug("Authenticating {dn}", dn=dn)
+        connection = self._newConnection()
+
         try:
-            yield deferToThread(
-                connection.simple_bind_s,
-                dn,
-                password,
-            )
+            connection.simple_bind_s(dn, password)
             self.log.debug("Authenticated {dn}", dn=dn)
-            returnValue(True)
+            return True
         except (
             ldap.INVALID_CREDENTIALS, ldap.INVALID_DN_SYNTAX
         ):
             self.log.debug("Unable to authenticate {dn}", dn=dn)
-            returnValue(False)
+            return False
+        finally:
+            # Explicitly close the one-off connection. This is best effort:
+            # the connection may already be unusable, and an unbind failure
+            # must not mask the authentication result (or exception).
+            try:
+                connection.unbind_s()
+            except ldap.LDAPError as e:
+                self.log.debug(
+                    "Unable to unbind LDAP connection for {dn}: {error}",
+                    dn=dn, error=e
+                )
- @inlineCallbacks
     def _recordsFromQueryString(self, queryString, recordTypes=None):
+        """
+        Run an LDAP query for the given record types (all record types if
+        C{None}), doing the blocking work on C{self.threadpool}.
+
+        @return: a deferred firing with the list of matching records.
+        """
+        inThread = self._recordsFromQueryString_inThread
+        return deferToThreadPool(
+            reactor, self.threadpool, inThread, queryString, recordTypes
+        )
+
+
+ def _recordsFromQueryString_inThread(self, queryString, recordTypes=None):
+ """
+ Run the LDAP filter C{queryString} under the search base of each record
+ type in C{recordTypes} (all of C{self.recordTypes()} if C{None}), using a
+ connection checked out of the pool, and return the records found as a list.
+ Record types whose search base does not exist are skipped.
+ This method is always called in a thread.
+
+ @raises: L{LDAPQueryError} if the server rejects C{queryString} as a filter.
+ """
records = []
- connection = yield self._connect()
+ with DirectoryService.Connection(self) as connection:
- if recordTypes is None:
- recordTypes = self.recordTypes()
+ if recordTypes is None:
+ recordTypes = self.recordTypes()
- for recordType in recordTypes:
- rdn = self._recordTypeSchemas[recordType].relativeDN
- rdn = (
- ldap.dn.str2dn(rdn.lower()) +
- ldap.dn.str2dn(self._baseDN.lower())
- )
- self.log.debug(
- "Performing LDAP query: {rdn} {query} {recordType}",
- rdn=rdn,
- query=queryString,
- recordType=recordType
- )
- try:
- reply = yield deferToThread(
- connection.search_s,
- ldap.dn.dn2str(rdn),
- ldap.SCOPE_SUBTREE,
- queryString,
- attrlist=self._attributesToFetch
+ for recordType in recordTypes:
+ rdn = self._recordTypeSchemas[recordType].relativeDN
+ rdn = (
+ ldap.dn.str2dn(rdn.lower()) +
+ ldap.dn.str2dn(self._baseDN.lower())
)
+ self.log.debug(
+ "Performing LDAP query: {rdn} {query} {recordType}",
+ rdn=rdn,
+ query=queryString,
+ recordType=recordType
+ )
+ try:
+ reply = connection.search_s(
+ ldap.dn.dn2str(rdn),
+ ldap.SCOPE_SUBTREE,
+ queryString,
+ attrlist=self._attributesToFetch
+ )
- except ldap.FILTER_ERROR as e:
- self.log.error(
- "Unable to perform query {0!r}: {1}"
- .format(queryString, e)
+ except ldap.FILTER_ERROR as e:
+ self.log.error(
+ "Unable to perform query {0!r}: {1}"
+ .format(queryString, e)
+ )
+ # NOTE(review): raising here leaves the "with" block via an exception, so DirectoryService.Connection
+ # reports this connection as failed and discards it, although a bad filter probably does not mean the
+ # connection itself is broken.
+ raise LDAPQueryError("Unable to perform query", e)
+
+ except ldap.NO_SUCH_OBJECT as e:
+ self.log.warn("RDN {rdn} does not exist, skipping", rdn=rdn)
+ continue
+
+ records.extend(
+ self._recordsFromReply(reply, recordType=recordType)
)
- raise LDAPQueryError("Unable to perform query", e)
- except ldap.NO_SUCH_OBJECT as e:
- self.log.warn("RDN {rdn} does not exist, skipping", rdn=rdn)
- continue
+ return records
- records.extend(
- (yield self._recordsFromReply(reply, recordType=recordType))
- )
- returnValue(records)
+    def _recordWithDN(self, dn):
+        """
+        Look up the record with DN C{dn}, doing the blocking LDAP work on
+        C{self.threadpool}.
+
+        @param dn: The DN of the record to search for
+        @type dn: C{str}
+
+        @return: a deferred firing with the record, or C{None} if there is
+            no matching record.
+        """
+        inThread = self._recordWithDN_inThread
+        return deferToThreadPool(reactor, self.threadpool, inThread, dn)
- @inlineCallbacks
- def _recordWithDN(self, dn):
+
+ def _recordWithDN_inThread(self, dn):
"""
+ Look up the record for C{dn} using a connection checked out of the pool.
+ This method is always called in a thread.
+
@param dn: The DN of the record to search for
@type dn: C{str}
+
+ @return: the first record built from the search reply, or C{None} if
+ the reply produced no records.
"""
- connection = yield self._connect()
+ with DirectoryService.Connection(self) as connection:
- self.log.debug("Performing LDAP DN query: {dn}", dn=dn)
+ self.log.debug("Performing LDAP DN query: {dn}", dn=dn)
- reply = yield deferToThread(
- connection.search_s,
- dn,
- ldap.SCOPE_SUBTREE,
- "(objectClass=*)",
- attrlist=self._attributesToFetch
- )
- records = self._recordsFromReply(reply)
+ # NOTE(review): SCOPE_SUBTREE also returns entries below dn; this relies on the entry for dn itself
+ # coming first in the reply - confirm (ldap.SCOPE_BASE may be what is intended).
+ reply = connection.search_s(
+ dn,
+ ldap.SCOPE_SUBTREE,
+ "(objectClass=*)",
+ attrlist=self._attributesToFetch
+ )
+ records = self._recordsFromReply(reply)
+
if len(records):
- returnValue(records[0])
+ return records[0]
else:
- returnValue(None)
+ return None
def _recordsFromReply(self, reply, recordType=None):
records = []
-
for dn, recordData in reply:
# Determine the record type
-
if recordType is None:
recordType = recordTypeForDN(
self._baseDN, self._recordTypeSchemas, dn
@@ -510,7 +667,6 @@
continue
# Populate a fields dictionary
-
fields = {}
for attribute, values in recordData.iteritems():
Added: twext/trunk/twext/who/test/test_concurrency.py
===================================================================
--- twext/trunk/twext/who/test/test_concurrency.py (rev 0)
+++ twext/trunk/twext/who/test/test_concurrency.py 2014-08-08 19:36:13 UTC (rev 13857)
@@ -0,0 +1,203 @@
+##
+# Copyright (c) 2013-2014 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from __future__ import print_function
+from twext.python.types import MappingProxyType
+from twext.who.directory import DirectoryRecord
+from twext.who.idirectory import RecordType, FieldName
+from twext.who.ldap._constants import LDAPAttribute, LDAPObjectClass
+from twext.who.ldap._service import DirectoryService as LDAPDirectoryService, \
+ DEFAULT_FIELDNAME_ATTRIBUTE_MAP, RecordTypeSchema
+from twext.who.opendirectory._service import DirectoryService as ODDirectoryService
+from twisted.internet.defer import inlineCallbacks, DeferredList, returnValue
+from twisted.trial import unittest
+import time
+
+"""
+Test the concurrency of DS implementations against real servers.
+"""
+
+TEST_FIELDNAME_MAP = dict(DEFAULT_FIELDNAME_ATTRIBUTE_MAP)
+TEST_FIELDNAME_MAP[FieldName.uid] = (u"uid",)
+
+TEST_RECORDTYPE_SCHEMAS_OSX = MappingProxyType({
+
+ RecordType.user: RecordTypeSchema(
+ # cn=users
+ relativeDN=u"cn={0}".format("users"),
+
+ # (objectClass=inetOrgPerson)
+ attributes=(
+ (
+ LDAPAttribute.objectClass.value,
+ LDAPObjectClass.inetOrgPerson.value,
+ ),
+ ),
+ ),
+
+ RecordType.group: RecordTypeSchema(
+ # cn=groups
+ relativeDN=u"cn={0}".format("groups"),
+
+ # (objectClass=groupOfNames)
+ attributes=(
+ (
+ LDAPAttribute.objectClass.value,
+ LDAPObjectClass.groupOfNames.value,
+ ),
+ ),
+ ),
+
+})
+
+TEST_RECORDTYPE_SCHEMAS_OTHER = MappingProxyType({
+
+ RecordType.user: RecordTypeSchema(
+ # ou=person
+ relativeDN=u"ou={0}".format("people"),
+
+ # (objectClass=inetOrgPerson)
+ attributes=(
+ (
+ LDAPAttribute.objectClass.value,
+ LDAPObjectClass.inetOrgPerson.value,
+ ),
+ ),
+ ),
+
+ RecordType.group: RecordTypeSchema(
+ # ou=groupOfNames
+ relativeDN=u"ou={0}".format(LDAPObjectClass.groupOfNames.value),
+
+ # (objectClass=groupOfNames)
+ attributes=(
+ (
+ LDAPAttribute.objectClass.value,
+ LDAPObjectClass.groupOfNames.value,
+ ),
+ ),
+ ),
+
+})
+
+class DirectoryServiceConcurrencyTest(unittest.TestCase):
+    """
+    Concurrency tests for directory service implementations, run against
+    a real directory server.
+    """
+
+    @inlineCallbacks
+    def _runTest(self, num_threads, multiple_services, service_maker, details, num_requests, do_auth):
+        """
+        Time C{num_threads} concurrent request loops, each issuing
+        C{num_requests} record lookups (or, if C{do_auth}, password
+        checks) against a directory service.
+
+        @param num_threads: number of concurrent request loops.
+        @param multiple_services: if true, give each loop its own service
+            from C{service_maker}; otherwise all loops share one service.
+        @param service_maker: zero-argument callable returning a service.
+        @param details: configuration mapping; its C{"user"} and C{"pswd"}
+            entries are format strings filled in with the 1-based loop
+            number.
+        @param num_requests: number of requests issued by each loop.
+        @param do_auth: if true, verify passwords instead of looking up
+            records.
+
+        @return: a deferred firing with a C{(elapsed seconds, completed
+            request count)} tuple.
+        """
+
+        if multiple_services:
+            services = [service_maker() for _ in range(num_threads)]
+        else:
+            services = [service_maker()] * num_threads
+
+        # Warm up each service before starting timer
+        for n, svc in enumerate(services):
+            record = yield svc.recordWithShortName(RecordType.user, details["user"].format(n + 1))
+            self.assertIsInstance(record, DirectoryRecord)
+
+        start = time.time()
+        ctr = [0]
+
+        @inlineCallbacks
+        def _records(n):
+            for _ in range(num_requests):
+                record = yield services[n].recordWithShortName(RecordType.user, details["user"].format(n + 1))
+                self.assertIsInstance(record, DirectoryRecord)
+                ctr[0] += 1
+
+        @inlineCallbacks
+        def _auth(n):
+            record = yield services[n].recordWithShortName(RecordType.user, details["user"].format(n + 1))
+            for _ in range(num_requests):
+                result = yield record.verifyPlaintextPassword(details["pswd"].format(n + 1))
+                self.assertTrue(result)
+                ctr[0] += 1
+
+        requestLoop = _auth if do_auth else _records
+        loops = [requestLoop(i) for i in range(num_threads)]
+
+        # A plain DeferredList succeeds even when some of its Deferreds
+        # fail, which would silently swallow the assertion failures raised
+        # in the request loops above. Fail as soon as any loop fails, and
+        # consume the other errors so they are not logged as unhandled.
+        yield DeferredList(loops, fireOnOneErrback=True, consumeErrors=True)
+
+        returnValue((time.time() - start, ctr[0],))
+
+
+    @inlineCallbacks
+    def test_ldap_multi_service(self):
+        """
+        See if L{ldap._service.DirectoryService} is concurrent.
+
+        The elapsed time and request count are printed rather than
+        asserted; edit the settings below to choose the scenario and server.
+        """
+
+        num_threads = 20
+        multiple_services = False
+        num_requests = 100
+        do_auth = False
+        use_od = False
+        configChoice = "local"
+
+        configs = {
+            "local": {
+                "url": "ldap://localhost",
+                "baseDN": "dc=example,dc=com",
+                "rschema": TEST_RECORDTYPE_SCHEMAS_OSX,
+                "user": u"user{:02d}",
+                "pswd": u"user{:02d}",
+            },
+            "example": {
+                "url": "ldap://example.com",
+                "baseDN": "o=example.com,o=email",
+                "rschema": TEST_RECORDTYPE_SCHEMAS_OTHER,
+                "user": u"TestAccount{}",
+                "pswd": u"pswd",
+            },
+        }
+
+        details = configs[configChoice]
+
+        def _serviceMaker():
+            if use_od:
+                return ODDirectoryService(
+                    nodeName="/LDAPv3/127.0.0.1",
+                )
+            else:
+                return LDAPDirectoryService(
+                    url=details["url"],
+                    baseDN=details["baseDN"],
+                    fieldNameToAttributesMap=TEST_FIELDNAME_MAP,
+                    recordTypeSchemas=details["rschema"],
+                    threadPoolMax=20,
+                )
+
+
+        duration, count = yield self._runTest(num_threads, multiple_services, _serviceMaker, details, num_requests, do_auth)
+
+        print(
+            "\n\nType: {} {} {}\nNumber of Services/Requests: {}/{}\nTime: {}\nCount: {}\n".format(
+                "OD" if use_od else "LDAP",
+                "Multiple" if multiple_services else "Single",
+                "Auth" if do_auth else "query",
+                num_threads,
+                num_requests,
+                duration,
+                count,
+            )
+        )
+
+    test_ldap_multi_service.skip = "Not really a unit test - requires actually server to work"
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20140808/9c8c812f/attachment-0001.html>
More information about the calendarserver-changes
mailing list