<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[13857] twext/trunk/twext/who</title>
</head>
<body>
<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; }
#msg dl a { font-weight: bold}
#msg dl a:link { color:#fc3; }
#msg dl a:active { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
#patch .lines, .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org/changeset/13857">13857</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2014-08-08 12:36:13 -0700 (Fri, 08 Aug 2014)</dd>
</dl>
<h3>Log Message</h3>
<pre>Use a thread pool for LDAP connections to avoid internal python-ldap locking issues when doing concurrent LDAP requests.</pre>
<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextwholdap_servicepy">twext/trunk/twext/who/ldap/_service.py</a></li>
</ul>
<h3>Added Paths</h3>
<ul>
<li><a href="#twexttrunktwextwhotesttest_concurrencypy">twext/trunk/twext/who/test/test_concurrency.py</a></li>
</ul>
</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextwholdap_servicepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/who/ldap/_service.py (13856 => 13857)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/who/ldap/_service.py        2014-08-08 19:33:37 UTC (rev 13856)
+++ twext/trunk/twext/who/ldap/_service.py        2014-08-08 19:36:13 UTC (rev 13857)
</span><span class="lines">@@ -21,14 +21,19 @@
</span><span class="cx"> LDAP directory service implementation.
</span><span class="cx"> """
</span><span class="cx">
</span><ins>+from Queue import Queue, Empty
+from threading import RLock
</ins><span class="cx"> from uuid import UUID
</span><span class="cx">
</span><ins>+import collections
</ins><span class="cx"> import ldap
</span><span class="cx">
</span><span class="cx"> from twisted.python.constants import Names, NamedConstant
</span><span class="cx"> from twisted.internet.defer import succeed, inlineCallbacks, returnValue
</span><del>-from twisted.internet.threads import deferToThread
</del><ins>+from twisted.internet.threads import deferToThreadPool
</ins><span class="cx"> from twisted.cred.credentials import IUsernamePassword
</span><ins>+from twisted.python.threadpool import ThreadPool
+from twisted.internet import reactor
</ins><span class="cx">
</span><span class="cx"> from twext.python.log import Logger
</span><span class="cx"> from twext.python.types import MappingProxyType
</span><span class="lines">@@ -101,7 +106,7 @@
</span><span class="cx">
</span><span class="cx">
</span><span class="cx"> #
</span><del>-# Data type extentions
</del><ins>+# Data type extensions
</ins><span class="cx"> #
</span><span class="cx">
</span><span class="cx"> class FieldName(Names):
</span><span class="lines">@@ -219,6 +224,9 @@
</span><span class="cx"> useTLS=False,
</span><span class="cx"> fieldNameToAttributesMap=DEFAULT_FIELDNAME_ATTRIBUTE_MAP,
</span><span class="cx"> recordTypeSchemas=DEFAULT_RECORDTYPE_SCHEMAS,
</span><ins>+ ownThreadpool=True,
+ threadPoolMax=10,
+ connectionMax=10,
</ins><span class="cx"> _debug=False,
</span><span class="cx"> ):
</span><span class="cx"> """
</span><span class="lines">@@ -294,19 +302,133 @@
</span><span class="cx"> attributesToFetch.add(attribute.encode("utf-8"))
</span><span class="cx"> self._attributesToFetch = list(attributesToFetch)
</span><span class="cx">
</span><ins>+ # Threaded connection pool. The connection size limit here is the size for connections doing queries.
+ # There will also be one-off connections for authentications which also run in their own threads. Thus
+ # the threadpool max ought to be larger than the connection max to allow for both pooled query connections
+ # and one-off auth-only connections.
</ins><span class="cx">
</span><ins>+ self.ownThreadpool = ownThreadpool
+ if self.ownThreadpool:
+ self.threadpool = ThreadPool(minthreads=1, maxthreads=threadPoolMax, name="LDAPDirectoryService")
+ else:
+ # Use the default threadpool but adjust its size to fit our needs
+ self.threadpool = reactor.getThreadPool()
+ self.threadpool.adjustPoolsize(
+ max(threadPoolMax, self.threadpool.max)
+ )
+ self.connectionMax = connectionMax
+ self.connectionCreateLock = RLock()
+ self.connections = []
+ self.connectionQueue = Queue()
+ self.poolStats = collections.defaultdict(int)
+ self.activeCount = 0
+
+ reactor.callWhenRunning(self.start)
+ reactor.addSystemEventTrigger('during', 'shutdown', self.stop)
+
+
+ def start(self):
+ """
+ Start up this service. Initialize the threadpool (if we own it).
+ """
+ if self.ownThreadpool:
+ self.threadpool.start()
+
+
+ def stop(self):
+ """
+ Stop the service. Stop the threadpool if we own it and do other clean-up.
+ """
+ if self.ownThreadpool:
+ self.threadpool.stop()
+
+ # FIXME: we should probably also close the pool of active connections too
+
+
</ins><span class="cx"> @property
</span><span class="cx"> def realmName(self):
</span><span class="cx"> return u"{self.url}".format(self=self)
</span><span class="cx">
</span><span class="cx">
</span><del>- @inlineCallbacks
</del><ins>+ class Connection(object):
+ """
+ ContextManager object for getting a connection from the pool. On exit the connection
+ will be put back in the pool if no exception was raised. Otherwise, the connection will be
+ removed from the active connection list, which will allow a new "clean" connection to
+ be created later if needed.
+ """
+
+ def __init__(self, ds):
+ self.ds = ds
+
+ def __enter__(self):
+ self.connection = self.ds._getConnection()
+ return self.connection
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if exc_type is None:
+ self.ds._returnConnection(self.connection)
+ return True
+ else:
+ self.ds._failedConnection(self.connection)
+ return False
+
+
+ def _getConnection(self):
+ """
+ Get a connection from the connection pool. This will retrieve a connection from the connection
+ pool L{Queue} object. If the L{Queue} is empty, it will check to see whether a new connection can
+ be created (based on the connection limit), and if so create that and use it. If no new
+ connections can be created, it will block on the L{Queue} until an existing, in-use, connection
+ is put back.
+ """
+ try:
+ connection = self.connectionQueue.get(block=False)
+ except Empty:
+ # Note we use a lock here to prevent a race condition in which multiple requests for a new connection
+ # could succeed even though the connection count starts out one less than the maximum. This can happen
+ # because self._connect() can take a while.
+ self.connectionCreateLock.acquire()
+ if len(self.connections) < self.connectionMax:
+ connection = self._connect()
+ self.connections.append(connection)
+ self.connectionCreateLock.release()
+ else:
+ self.connectionCreateLock.release()
+ self.poolStats["connection-blocked"] += 1
+ connection = self.connectionQueue.get()
+
+ self.poolStats["connection-{}".format(self.connections.index(connection))] += 1
+ self.activeCount += 1
+ self.poolStats["connection-max"] = max(self.poolStats["connection-max"], self.activeCount)
+ return connection
+
+
+ def _returnConnection(self, connection):
+ """
+ A connection is no longer needed - return it to the pool.
+ """
+ self.activeCount -= 1
+ self.connectionQueue.put(connection)
+
+
+ def _failedConnection(self, connection):
+ """
+ A connection has failed - remove it from the list of active connections. A new
+ one will be created if needed.
+ """
+ self.activeCount -= 1
+ self.poolStats["connection-errors"] += 1
+ self.connections.remove(connection)
+
+
</ins><span class="cx"> def _connect(self):
</span><span class="cx"> """
</span><span class="cx"> Connect to the directory server.
</span><ins>+ This will always be called in a thread to prevent blocking.
</ins><span class="cx">
</span><del>- @returns: A deferred connection object.
- @rtype: deferred L{ldap.ldapobject.LDAPObject}
</del><ins>+ @returns: The connection object.
+ @rtype: L{ldap.ldapobject.LDAPObject}
</ins><span class="cx">
</span><span class="cx"> @raises: L{LDAPConnectionError} if unable to connect.
</span><span class="cx"> """
</span><span class="lines">@@ -314,75 +436,54 @@
</span><span class="cx"> # FIXME: ldap connection objects are not thread safe, so let's set up
</span><span class="cx"> # a connection pool
</span><span class="cx">
</span><del>- if not hasattr(self, "_connection"):
- self.log.debug("Connecting to LDAP at {log_source.url}")
- connection = ldap.initialize(self.url)
</del><ins>+ self.log.debug("Connecting to LDAP at {log_source.url}")
+ connection = self._newConnection()
</ins><span class="cx">
</span><del>- # FIXME: Use trace_file option to wire up debug logging when
- # Twisted adopts the new logging stuff.
-
- for option, value in (
- (ldap.OPT_TIMEOUT, self._timeout),
- (ldap.OPT_X_TLS_CACERTFILE, self._tlsCACertificateFile),
- (ldap.OPT_X_TLS_CACERTDIR, self._tlsCACertificateDirectory),
- (ldap.OPT_DEBUG_LEVEL, self._debug),
- ):
- if value is not None:
- connection.set_option(option, value)
-
- if self._useTLS:
- self.log.debug("Starting TLS for {log_source.url}")
- yield deferToThread(connection.start_tls_s)
-
- if self._credentials is not None:
- if IUsernamePassword.providedBy(self._credentials):
- try:
- yield deferToThread(
- connection.simple_bind_s,
- self._credentials.username,
- self._credentials.password,
- )
- self.log.debug(
- "Bound to LDAP as {credentials.username}",
- credentials=self._credentials
- )
- except (
- ldap.INVALID_CREDENTIALS, ldap.INVALID_DN_SYNTAX
- ) as e:
- self.log.error(
- "Unable to bind to LDAP as {credentials.username}",
- credentials=self._credentials
- )
- raise LDAPBindAuthError(
- self._credentials.username, e
- )
-
- else:
- raise LDAPConnectionError(
- "Unknown credentials type: {0}"
- .format(self._credentials)
</del><ins>+ if self._credentials is not None:
+ if IUsernamePassword.providedBy(self._credentials):
+ try:
+ connection.simple_bind_s(
+ self._credentials.username,
+ self._credentials.password,
</ins><span class="cx"> )
</span><ins>+ self.log.debug(
+ "Bound to LDAP as {credentials.username}",
+ credentials=self._credentials
+ )
+ except (
+ ldap.INVALID_CREDENTIALS, ldap.INVALID_DN_SYNTAX
+ ) as e:
+ self.log.error(
+ "Unable to bind to LDAP as {credentials.username}",
+ credentials=self._credentials
+ )
+ raise LDAPBindAuthError(
+ self._credentials.username, e
+ )
</ins><span class="cx">
</span><del>- self._connection = connection
</del><ins>+ else:
+ raise LDAPConnectionError(
+ "Unknown credentials type: {0}"
+ .format(self._credentials)
+ )
</ins><span class="cx">
</span><del>- returnValue(self._connection)
</del><ins>+ return connection
</ins><span class="cx">
</span><span class="cx">
</span><del>- @inlineCallbacks
- def _authenticateUsernamePassword(self, dn, password):
</del><ins>+ def _newConnection(self):
</ins><span class="cx"> """
</span><del>- Open a secondary connection to the LDAP server and try binding to it
- with the given credentials
</del><ins>+ Create a new LDAP connection and initialize and start TLS if required.
+ This will always be called in a thread to prevent blocking.
</ins><span class="cx">
</span><del>- @returns: True if the password is correct, False otherwise
- @rtype: deferred C{bool}
</del><ins>+ @returns: The connection object.
+ @rtype: L{ldap.ldapobject.LDAPObject}
</ins><span class="cx">
</span><span class="cx"> @raises: L{LDAPConnectionError} if unable to connect.
</span><span class="cx"> """
</span><del>- self.log.debug("Authenticating {dn}", dn=dn)
</del><span class="cx"> connection = ldap.initialize(self.url)
</span><span class="cx">
</span><del>- # FIXME: Use a separate connection pool perhaps
</del><ins>+ # FIXME: Use trace_file option to wire up debug logging when
+ # Twisted adopts the new logging stuff.
</ins><span class="cx">
</span><span class="cx"> for option, value in (
</span><span class="cx"> (ldap.OPT_TIMEOUT, self._timeout),
</span><span class="lines">@@ -395,102 +496,158 @@
</span><span class="cx">
</span><span class="cx"> if self._useTLS:
</span><span class="cx"> self.log.debug("Starting TLS for {log_source.url}")
</span><del>- yield deferToThread(connection.start_tls_s)
</del><ins>+ connection.start_tls_s()
</ins><span class="cx">
</span><ins>+ return connection
+
+
+ def _authenticateUsernamePassword(self, dn, password):
+ """
+ Open a secondary connection to the LDAP server and try binding to it
+ with the given credentials
+
+ @returns: True if the password is correct, False otherwise
+ @rtype: deferred C{bool}
+
+ @raises: L{LDAPConnectionError} if unable to connect.
+ """
+ return deferToThreadPool(
+ reactor, self.threadpool,
+ self._authenticateUsernamePassword_inThread, dn, password
+ )
+
+
+ def _authenticateUsernamePassword_inThread(self, dn, password):
+ """
+ Open a secondary connection to the LDAP server and try binding to it
+ with the given credentials.
+ This method is always called in a thread.
+
+ @returns: True if the password is correct, False otherwise
+ @rtype: C{bool}
+
+ @raises: L{LDAPConnectionError} if unable to connect.
+ """
+ self.log.debug("Authenticating {dn}", dn=dn)
+ connection = self._newConnection()
+
+
</ins><span class="cx"> try:
</span><del>- yield deferToThread(
- connection.simple_bind_s,
- dn,
- password,
- )
</del><ins>+ connection.simple_bind_s(dn, password)
</ins><span class="cx"> self.log.debug("Authenticated {dn}", dn=dn)
</span><del>- returnValue(True)
</del><ins>+ return True
</ins><span class="cx"> except (
</span><span class="cx"> ldap.INVALID_CREDENTIALS, ldap.INVALID_DN_SYNTAX
</span><span class="cx"> ):
</span><span class="cx"> self.log.debug("Unable to authenticate {dn}", dn=dn)
</span><del>- returnValue(False)
</del><ins>+ return False
+ finally:
+ # TODO: should we explicitly "close" the connection in a finally
+ # clause rather than just let it go out of scope and be garbage collected
+ # at some indeterminate point in the future? Up side is that we won't hang
+ # on to the connection or other resources for longer than needed. Down side
+ # is we will take a little bit of extra time in this call to close it down.
+ # If we do decide to "close" then we probably have to use one of the "unbind"
+ # methods on the L{LDAPObject}.
+ connection = None
</ins><span class="cx">
</span><span class="cx">
</span><del>- @inlineCallbacks
</del><span class="cx"> def _recordsFromQueryString(self, queryString, recordTypes=None):
</span><ins>+ return deferToThreadPool(
+ reactor, self.threadpool,
+ self._recordsFromQueryString_inThread,
+ queryString,
+ recordTypes,
+ )
+
+
+ def _recordsFromQueryString_inThread(self, queryString, recordTypes=None):
+ """
+ This method is always called in a thread.
+ """
</ins><span class="cx"> records = []
</span><span class="cx">
</span><del>- connection = yield self._connect()
</del><ins>+ with DirectoryService.Connection(self) as connection:
</ins><span class="cx">
</span><del>- if recordTypes is None:
- recordTypes = self.recordTypes()
</del><ins>+ if recordTypes is None:
+ recordTypes = self.recordTypes()
</ins><span class="cx">
</span><del>- for recordType in recordTypes:
- rdn = self._recordTypeSchemas[recordType].relativeDN
- rdn = (
- ldap.dn.str2dn(rdn.lower()) +
- ldap.dn.str2dn(self._baseDN.lower())
- )
- self.log.debug(
- "Performing LDAP query: {rdn} {query} {recordType}",
- rdn=rdn,
- query=queryString,
- recordType=recordType
- )
- try:
- reply = yield deferToThread(
- connection.search_s,
- ldap.dn.dn2str(rdn),
- ldap.SCOPE_SUBTREE,
- queryString,
- attrlist=self._attributesToFetch
</del><ins>+ for recordType in recordTypes:
+ rdn = self._recordTypeSchemas[recordType].relativeDN
+ rdn = (
+ ldap.dn.str2dn(rdn.lower()) +
+ ldap.dn.str2dn(self._baseDN.lower())
</ins><span class="cx"> )
</span><ins>+ self.log.debug(
+ "Performing LDAP query: {rdn} {query} {recordType}",
+ rdn=rdn,
+ query=queryString,
+ recordType=recordType
+ )
+ try:
+ reply = connection.search_s(
+ ldap.dn.dn2str(rdn),
+ ldap.SCOPE_SUBTREE,
+ queryString,
+ attrlist=self._attributesToFetch
+ )
</ins><span class="cx">
</span><del>- except ldap.FILTER_ERROR as e:
- self.log.error(
- "Unable to perform query {0!r}: {1}"
- .format(queryString, e)
</del><ins>+ except ldap.FILTER_ERROR as e:
+ self.log.error(
+ "Unable to perform query {0!r}: {1}"
+ .format(queryString, e)
+ )
+ raise LDAPQueryError("Unable to perform query", e)
+
+ except ldap.NO_SUCH_OBJECT as e:
+ self.log.warn("RDN {rdn} does not exist, skipping", rdn=rdn)
+ continue
+
+ records.extend(
+ self._recordsFromReply(reply, recordType=recordType)
</ins><span class="cx"> )
</span><del>- raise LDAPQueryError("Unable to perform query", e)
</del><span class="cx">
</span><del>- except ldap.NO_SUCH_OBJECT as e:
- self.log.warn("RDN {rdn} does not exist, skipping", rdn=rdn)
- continue
</del><ins>+ return records
</ins><span class="cx">
</span><del>- records.extend(
- (yield self._recordsFromReply(reply, recordType=recordType))
- )
- returnValue(records)
</del><span class="cx">
</span><ins>+ def _recordWithDN(self, dn):
+ return deferToThreadPool(
+ reactor, self.threadpool,
+ self._recordWithDN_inThread, dn
+ )
</ins><span class="cx">
</span><del>- @inlineCallbacks
- def _recordWithDN(self, dn):
</del><ins>+
+ def _recordWithDN_inThread(self, dn):
</ins><span class="cx"> """
</span><ins>+ This method is always called in a thread.
+
</ins><span class="cx"> @param dn: The DN of the record to search for
</span><span class="cx"> @type dn: C{str}
</span><span class="cx"> """
</span><del>- connection = yield self._connect()
</del><ins>+ with DirectoryService.Connection(self) as connection:
</ins><span class="cx">
</span><del>- self.log.debug("Performing LDAP DN query: {dn}", dn=dn)
</del><ins>+ self.log.debug("Performing LDAP DN query: {dn}", dn=dn)
</ins><span class="cx">
</span><del>- reply = yield deferToThread(
- connection.search_s,
- dn,
- ldap.SCOPE_SUBTREE,
- "(objectClass=*)",
- attrlist=self._attributesToFetch
- )
- records = self._recordsFromReply(reply)
</del><ins>+ reply = connection.search_s(
+ dn,
+ ldap.SCOPE_SUBTREE,
+ "(objectClass=*)",
+ attrlist=self._attributesToFetch
+ )
+ records = self._recordsFromReply(reply)
+
</ins><span class="cx"> if len(records):
</span><del>- returnValue(records[0])
</del><ins>+ return records[0]
</ins><span class="cx"> else:
</span><del>- returnValue(None)
</del><ins>+ return None
</ins><span class="cx">
</span><span class="cx">
</span><span class="cx"> def _recordsFromReply(self, reply, recordType=None):
</span><span class="cx"> records = []
</span><span class="cx">
</span><del>-
</del><span class="cx"> for dn, recordData in reply:
</span><span class="cx">
</span><span class="cx"> # Determine the record type
</span><del>-
</del><span class="cx"> if recordType is None:
</span><span class="cx"> recordType = recordTypeForDN(
</span><span class="cx"> self._baseDN, self._recordTypeSchemas, dn
</span><span class="lines">@@ -510,7 +667,6 @@
</span><span class="cx"> continue
</span><span class="cx">
</span><span class="cx"> # Populate a fields dictionary
</span><del>-
</del><span class="cx"> fields = {}
</span><span class="cx">
</span><span class="cx"> for attribute, values in recordData.iteritems():
</span></span></pre></div>
<a id="twexttrunktwextwhotesttest_concurrencypy"></a>
<div class="addfile"><h4>Added: twext/trunk/twext/who/test/test_concurrency.py (0 => 13857)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/who/test/test_concurrency.py         (rev 0)
+++ twext/trunk/twext/who/test/test_concurrency.py        2014-08-08 19:36:13 UTC (rev 13857)
</span><span class="lines">@@ -0,0 +1,203 @@
</span><ins>+##
+# Copyright (c) 2013-2014 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from __future__ import print_function
+from twext.python.types import MappingProxyType
+from twext.who.directory import DirectoryRecord
+from twext.who.idirectory import RecordType, FieldName
+from twext.who.ldap._constants import LDAPAttribute, LDAPObjectClass
+from twext.who.ldap._service import DirectoryService as LDAPDirectoryService, \
+ DEFAULT_FIELDNAME_ATTRIBUTE_MAP, RecordTypeSchema
+from twext.who.opendirectory._service import DirectoryService as ODDirectoryService
+from twisted.internet.defer import inlineCallbacks, DeferredList, returnValue
+from twisted.trial import unittest
+import time
+
+"""
+Test the concurrency of DS implementations against real servers.
+"""
+
+TEST_FIELDNAME_MAP = dict(DEFAULT_FIELDNAME_ATTRIBUTE_MAP)
+TEST_FIELDNAME_MAP[FieldName.uid] = (u"uid",)
+
+TEST_RECORDTYPE_SCHEMAS_OSX = MappingProxyType({
+
+ RecordType.user: RecordTypeSchema(
+ # cn=users
+ relativeDN=u"cn={0}".format("users"),
+
+ # (objectClass=inetOrgPerson)
+ attributes=(
+ (
+ LDAPAttribute.objectClass.value,
+ LDAPObjectClass.inetOrgPerson.value,
+ ),
+ ),
+ ),
+
+ RecordType.group: RecordTypeSchema(
+ # cn=groups
+ relativeDN=u"cn={0}".format("groups"),
+
+ # (objectClass=groupOfNames)
+ attributes=(
+ (
+ LDAPAttribute.objectClass.value,
+ LDAPObjectClass.groupOfNames.value,
+ ),
+ ),
+ ),
+
+})
+
+TEST_RECORDTYPE_SCHEMAS_OTHER = MappingProxyType({
+
+ RecordType.user: RecordTypeSchema(
+ # ou=person
+ relativeDN=u"ou={0}".format("people"),
+
+ # (objectClass=inetOrgPerson)
+ attributes=(
+ (
+ LDAPAttribute.objectClass.value,
+ LDAPObjectClass.inetOrgPerson.value,
+ ),
+ ),
+ ),
+
+ RecordType.group: RecordTypeSchema(
+ # ou=groupOfNames
+ relativeDN=u"ou={0}".format(LDAPObjectClass.groupOfNames.value),
+
+ # (objectClass=groupOfNames)
+ attributes=(
+ (
+ LDAPAttribute.objectClass.value,
+ LDAPObjectClass.groupOfNames.value,
+ ),
+ ),
+ ),
+
+})
+
+class DirectoryServiceConcurrencyTest(unittest.TestCase):
+ """
+ Tests for directory records.
+ """
+
+ @inlineCallbacks
+ def _runTest(self, num_threads, multiple_services, service_maker, details, num_requests, do_auth):
+
+ if multiple_services:
+ services = [service_maker() for _ in range(num_threads)]
+ else:
+ services = [service_maker()] * num_threads
+
+ # Warm up each service before starting timer
+ for n, svc in enumerate(services):
+ record = yield svc.recordWithShortName(RecordType.user, details["user"].format(n + 1))
+ self.assertTrue(isinstance(record, DirectoryRecord))
+
+ start = time.time()
+ ctr = [0]
+
+ @inlineCallbacks
+ def _records(n):
+ for _ in range(num_requests):
+ record = yield services[n].recordWithShortName(RecordType.user, details["user"].format(n + 1))
+ self.assertTrue(isinstance(record, DirectoryRecord))
+ ctr[0] += 1
+
+ @inlineCallbacks
+ def _auth(n):
+ record = yield services[n].recordWithShortName(RecordType.user, details["user"].format(n + 1))
+ for _ in range(num_requests):
+ result = yield record.verifyPlaintextPassword(details["pswd"].format(n + 1))
+ self.assertTrue(result)
+ ctr[0] += 1
+
+ dl = []
+ for i in range(num_threads):
+ dl.append((_auth if do_auth else _records)(i))
+
+ dl = DeferredList(dl)
+ yield dl
+
+ returnValue((time.time() - start, ctr[0],))
+
+
+ @inlineCallbacks
+ def test_ldap_multi_service(self):
+ """
+ See if L{ldap._service.DirectoryService} is concurrent.
+ """
+
+ num_threads = 20
+ multiple_services = False
+ num_requests = 100
+ do_auth = False
+ use_od = False
+ configChoice = "local"
+
+ configs = {
+ "local": {
+ "url": "ldap://localhost",
+ "baseDN": "dc=example,dc=com",
+ "rschema": TEST_RECORDTYPE_SCHEMAS_OSX,
+ "user": u"user{:02d}",
+ "pswd": u"user{:02d}",
+ },
+ "example": {
+ "url": "ldap://example.com",
+ "baseDN": "o=example.com,o=email",
+ "rschema": TEST_RECORDTYPE_SCHEMAS_OTHER,
+ "user": u"TestAccount{}",
+ "pswd": u"pswd",
+ },
+ }
+
+ details = configs[configChoice]
+
+ def _serviceMaker():
+ if use_od:
+ return ODDirectoryService(
+ nodeName="/LDAPv3/127.0.0.1",
+ )
+ else:
+ return LDAPDirectoryService(
+ url=details["url"],
+ baseDN=details["baseDN"],
+ fieldNameToAttributesMap=TEST_FIELDNAME_MAP,
+ recordTypeSchemas=details["rschema"],
+ threadPoolMax=20,
+ )
+
+
+ duration, count = yield self._runTest(num_threads, multiple_services, _serviceMaker, details, num_requests, do_auth)
+
+ print(
+ "\n\nType: {} {} {}\nNumber of Services/Requests: {}/{}\nTime: {}\nCount: {}\n".format(
+ "OD" if use_od else "LDAP",
+ "Multiple" if multiple_services else "Single",
+ "Auth" if do_auth else "query",
+ num_threads,
+ num_requests,
+ duration,
+ count,
+ )
+ )
+
+ test_ldap_multi_service.skip = "Not really a unit test - requires actually server to work"
</ins></span></pre>
</div>
</div>
</body>
</html>