<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN"
"http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head><meta http-equiv="content-type" content="text/html; charset=utf-8" />
<title>[13857] twext/trunk/twext/who</title>
</head>
<body>

<style type="text/css"><!--
#msg dl.meta { border: 1px #006 solid; background: #369; padding: 6px; color: #fff; }
#msg dl.meta dt { float: left; width: 6em; font-weight: bold; }
#msg dt:after { content:':';}
#msg dl, #msg dt, #msg ul, #msg li, #header, #footer, #logmsg { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt;  }
#msg dl a { font-weight: bold}
#msg dl a:link    { color:#fc3; }
#msg dl a:active  { color:#ff0; }
#msg dl a:visited { color:#cc6; }
h3 { font-family: verdana,arial,helvetica,sans-serif; font-size: 10pt; font-weight: bold; }
#msg pre { overflow: auto; background: #ffc; border: 1px #fa0 solid; padding: 6px; }
#logmsg { background: #ffc; border: 1px #fa0 solid; padding: 1em 1em 0 1em; }
#logmsg p, #logmsg pre, #logmsg blockquote { margin: 0 0 1em 0; }
#logmsg p, #logmsg li, #logmsg dt, #logmsg dd { line-height: 14pt; }
#logmsg h1, #logmsg h2, #logmsg h3, #logmsg h4, #logmsg h5, #logmsg h6 { margin: .5em 0; }
#logmsg h1:first-child, #logmsg h2:first-child, #logmsg h3:first-child, #logmsg h4:first-child, #logmsg h5:first-child, #logmsg h6:first-child { margin-top: 0; }
#logmsg ul, #logmsg ol { padding: 0; list-style-position: inside; margin: 0 0 0 1em; }
#logmsg ul { text-indent: -1em; padding-left: 1em; }#logmsg ol { text-indent: -1.5em; padding-left: 1.5em; }
#logmsg > ul, #logmsg > ol { margin: 0 0 1em 0; }
#logmsg pre { background: #eee; padding: 1em; }
#logmsg blockquote { border: 1px solid #fa0; border-left-width: 10px; padding: 1em 1em 0 1em; background: white;}
#logmsg dl { margin: 0; }
#logmsg dt { font-weight: bold; }
#logmsg dd { margin: 0; padding: 0 0 0.5em 0; }
#logmsg dd:before { content:'\00bb';}
#logmsg table { border-spacing: 0px; border-collapse: collapse; border-top: 4px solid #fa0; border-bottom: 1px solid #fa0; background: #fff; }
#logmsg table th { text-align: left; font-weight: normal; padding: 0.2em 0.5em; border-top: 1px dotted #fa0; }
#logmsg table td { text-align: right; border-top: 1px dotted #fa0; padding: 0.2em 0.5em; }
#logmsg table thead th { text-align: center; border-bottom: 1px solid #fa0; }
#logmsg table th.Corner { text-align: left; }
#logmsg hr { border: none 0; border-top: 2px dashed #fa0; height: 1px; }
#header, #footer { color: #fff; background: #636; border: 1px #300 solid; padding: 6px; }
#patch { width: 100%; }
#patch h4 {font-family: verdana,arial,helvetica,sans-serif;font-size:10pt;padding:8px;background:#369;color:#fff;margin:0;}
#patch .propset h4, #patch .binary h4 {margin:0;}
#patch pre {padding:0;line-height:1.2em;margin:0;}
#patch .diff {width:100%;background:#eee;padding: 0 0 10px 0;overflow:auto;}
#patch .propset .diff, #patch .binary .diff  {padding:10px 0;}
#patch span {display:block;padding:0 10px;}
#patch .modfile, #patch .addfile, #patch .delfile, #patch .propset, #patch .binary, #patch .copfile {border:1px solid #ccc;margin:10px 0;}
#patch ins {background:#dfd;text-decoration:none;display:block;padding:0 10px;}
#patch del {background:#fdd;text-decoration:none;display:block;padding:0 10px;}
#patch .lines, .info {color:#888;background:#fff;}
--></style>
<div id="msg">
<dl class="meta">
<dt>Revision</dt> <dd><a href="http://trac.calendarserver.org//changeset/13857">13857</a></dd>
<dt>Author</dt> <dd>cdaboo@apple.com</dd>
<dt>Date</dt> <dd>2014-08-08 12:36:13 -0700 (Fri, 08 Aug 2014)</dd>
</dl>

<h3>Log Message</h3>
<pre>Use a thread pool for LDAP connections to avoid internal python-ldap locking issues when doing concurrent LDAP requests.</pre>

<h3>Modified Paths</h3>
<ul>
<li><a href="#twexttrunktwextwholdap_servicepy">twext/trunk/twext/who/ldap/_service.py</a></li>
</ul>

<h3>Added Paths</h3>
<ul>
<li><a href="#twexttrunktwextwhotesttest_concurrencypy">twext/trunk/twext/who/test/test_concurrency.py</a></li>
</ul>

</div>
<div id="patch">
<h3>Diff</h3>
<a id="twexttrunktwextwholdap_servicepy"></a>
<div class="modfile"><h4>Modified: twext/trunk/twext/who/ldap/_service.py (13856 => 13857)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/who/ldap/_service.py        2014-08-08 19:33:37 UTC (rev 13856)
+++ twext/trunk/twext/who/ldap/_service.py        2014-08-08 19:36:13 UTC (rev 13857)
</span><span class="lines">@@ -21,14 +21,19 @@
</span><span class="cx"> LDAP directory service implementation.
</span><span class="cx"> &quot;&quot;&quot;
</span><span class="cx"> 
</span><ins>+from Queue import Queue, Empty
+from threading import RLock
</ins><span class="cx"> from uuid import UUID
</span><span class="cx"> 
</span><ins>+import collections
</ins><span class="cx"> import ldap
</span><span class="cx"> 
</span><span class="cx"> from twisted.python.constants import Names, NamedConstant
</span><span class="cx"> from twisted.internet.defer import succeed, inlineCallbacks, returnValue
</span><del>-from twisted.internet.threads import deferToThread
</del><ins>+from twisted.internet.threads import deferToThreadPool
</ins><span class="cx"> from twisted.cred.credentials import IUsernamePassword
</span><ins>+from twisted.python.threadpool import ThreadPool
+from twisted.internet import reactor
</ins><span class="cx"> 
</span><span class="cx"> from twext.python.log import Logger
</span><span class="cx"> from twext.python.types import MappingProxyType
</span><span class="lines">@@ -101,7 +106,7 @@
</span><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx"> #
</span><del>-# Data type extentions
</del><ins>+# Data type extensions
</ins><span class="cx"> #
</span><span class="cx"> 
</span><span class="cx"> class FieldName(Names):
</span><span class="lines">@@ -219,6 +224,9 @@
</span><span class="cx">         useTLS=False,
</span><span class="cx">         fieldNameToAttributesMap=DEFAULT_FIELDNAME_ATTRIBUTE_MAP,
</span><span class="cx">         recordTypeSchemas=DEFAULT_RECORDTYPE_SCHEMAS,
</span><ins>+        ownThreadpool=True,
+        threadPoolMax=10,
+        connectionMax=10,
</ins><span class="cx">         _debug=False,
</span><span class="cx">     ):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="lines">@@ -294,19 +302,133 @@
</span><span class="cx">                 attributesToFetch.add(attribute.encode(&quot;utf-8&quot;))
</span><span class="cx">         self._attributesToFetch = list(attributesToFetch)
</span><span class="cx"> 
</span><ins>+        # Threaded connection pool. The connection size limit here is the size for connections doing queries.
+        # There will also be one-off connections for authentications which also run in their own threads. Thus
+        # the threadpool max ought to be larger than the connection max to allow for both pooled query connections
+        # and one-off auth-only connections.
</ins><span class="cx"> 
</span><ins>+        self.ownThreadpool = ownThreadpool
+        if self.ownThreadpool:
+            self.threadpool = ThreadPool(minthreads=1, maxthreads=threadPoolMax, name=&quot;LDAPDirectoryService&quot;)
+        else:
+            # Use the default threadpool but adjust its size to fit our needs
+            self.threadpool = reactor.getThreadPool()
+            self.threadpool.adjustPoolsize(
+                max(threadPoolMax, self.threadpool.max)
+            )
+        self.connectionMax = connectionMax
+        self.connectionCreateLock = RLock()
+        self.connections = []
+        self.connectionQueue = Queue()
+        self.poolStats = collections.defaultdict(int)
+        self.activeCount = 0
+
+        reactor.callWhenRunning(self.start)
+        reactor.addSystemEventTrigger('during', 'shutdown', self.stop)
+
+
+    def start(self):
+        &quot;&quot;&quot;
+        Start up this service. Initialize the threadpool (if we own it).
+        &quot;&quot;&quot;
+        if self.ownThreadpool:
+            self.threadpool.start()
+
+
+    def stop(self):
+        &quot;&quot;&quot;
+        Stop the service. Stop the threadpool if we own it and do other clean-up.
+        &quot;&quot;&quot;
+        if self.ownThreadpool:
+            self.threadpool.stop()
+
+        # FIXME: we should probably also close the pool of active connections too
+
+
</ins><span class="cx">     @property
</span><span class="cx">     def realmName(self):
</span><span class="cx">         return u&quot;{self.url}&quot;.format(self=self)
</span><span class="cx"> 
</span><span class="cx"> 
</span><del>-    @inlineCallbacks
</del><ins>+    class Connection(object):
+        &quot;&quot;&quot;
+        ContextManager object for getting a connection from the pool. On exit the connection
+        will be put back in the pool if no exception was raised. Otherwise, the connection will be
+        removed from the active connection list, which will allow a new &quot;clean&quot; connection to
+        be created later if needed.
+        &quot;&quot;&quot;
+
+        def __init__(self, ds):
+            self.ds = ds
+
+        def __enter__(self):
+            self.connection = self.ds._getConnection()
+            return self.connection
+
+        def __exit__(self, exc_type, exc_val, exc_tb):
+            if exc_type is None:
+                self.ds._returnConnection(self.connection)
+                return True
+            else:
+                self.ds._failedConnection(self.connection)
+                return False
+
+
+    def _getConnection(self):
+        &quot;&quot;&quot;
+        Get a connection from the connection pool. This will retrieve a connection from the connection
+        pool L{Queue} object. If the L{Queue} is empty, it will check to see whether a new connection can
+        be created (based on the connection limit), and if so create that and use it. If no new
+        connections can be created, it will block on the L{Queue} until an existing, in-use, connection
+        is put back.
+        &quot;&quot;&quot;
+        try:
+            connection = self.connectionQueue.get(block=False)
+        except Empty:
+            # Note we use a lock here to prevent a race condition in which multiple requests for a new connection
+            # could succeed even though the connection counts starts out one less than the maximum. This can happen
+            # because self._connect() can take a while.
+            self.connectionCreateLock.acquire()
+            if len(self.connections) &lt; self.connectionMax:
+                connection = self._connect()
+                self.connections.append(connection)
+                self.connectionCreateLock.release()
+            else:
+                self.connectionCreateLock.release()
+                self.poolStats[&quot;connection-blocked&quot;] += 1
+                connection = self.connectionQueue.get()
+
+        self.poolStats[&quot;connection-{}&quot;.format(self.connections.index(connection))] += 1
+        self.activeCount += 1
+        self.poolStats[&quot;connection-max&quot;] = max(self.poolStats[&quot;connection-max&quot;], self.activeCount)
+        return connection
+
+
+    def _returnConnection(self, connection):
+        &quot;&quot;&quot;
+        A connection is no longer needed - return it to the pool.
+        &quot;&quot;&quot;
+        self.activeCount -= 1
+        self.connectionQueue.put(connection)
+
+
+    def _failedConnection(self, connection):
+        &quot;&quot;&quot;
+        A connection has failed - remove it from the list of active connections. A new
+        one will be created if needed.
+        &quot;&quot;&quot;
+        self.activeCount -= 1
+        self.poolStats[&quot;connection-errors&quot;] += 1
+        self.connections.remove(connection)
+
+
</ins><span class="cx">     def _connect(self):
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="cx">         Connect to the directory server.
</span><ins>+        This will always be called in a thread to prevent blocking.
</ins><span class="cx"> 
</span><del>-        @returns: A deferred connection object.
-        @rtype: deferred L{ldap.ldapobject.LDAPObject}
</del><ins>+        @returns: The connection object.
+        @rtype: L{ldap.ldapobject.LDAPObject}
</ins><span class="cx"> 
</span><span class="cx">         @raises: L{LDAPConnectionError} if unable to connect.
</span><span class="cx">         &quot;&quot;&quot;
</span><span class="lines">@@ -314,75 +436,54 @@
</span><span class="cx">         # FIXME: ldap connection objects are not thread safe, so let's set up
</span><span class="cx">         # a connection pool
</span><span class="cx"> 
</span><del>-        if not hasattr(self, &quot;_connection&quot;):
-            self.log.debug(&quot;Connecting to LDAP at {log_source.url}&quot;)
-            connection = ldap.initialize(self.url)
</del><ins>+        self.log.debug(&quot;Connecting to LDAP at {log_source.url}&quot;)
+        connection = self._newConnection()
</ins><span class="cx"> 
</span><del>-            # FIXME: Use trace_file option to wire up debug logging when
-            # Twisted adopts the new logging stuff.
-
-            for option, value in (
-                (ldap.OPT_TIMEOUT, self._timeout),
-                (ldap.OPT_X_TLS_CACERTFILE, self._tlsCACertificateFile),
-                (ldap.OPT_X_TLS_CACERTDIR, self._tlsCACertificateDirectory),
-                (ldap.OPT_DEBUG_LEVEL, self._debug),
-            ):
-                if value is not None:
-                    connection.set_option(option, value)
-
-            if self._useTLS:
-                self.log.debug(&quot;Starting TLS for {log_source.url}&quot;)
-                yield deferToThread(connection.start_tls_s)
-
-            if self._credentials is not None:
-                if IUsernamePassword.providedBy(self._credentials):
-                    try:
-                        yield deferToThread(
-                            connection.simple_bind_s,
-                            self._credentials.username,
-                            self._credentials.password,
-                        )
-                        self.log.debug(
-                            &quot;Bound to LDAP as {credentials.username}&quot;,
-                            credentials=self._credentials
-                        )
-                    except (
-                        ldap.INVALID_CREDENTIALS, ldap.INVALID_DN_SYNTAX
-                    ) as e:
-                        self.log.error(
-                            &quot;Unable to bind to LDAP as {credentials.username}&quot;,
-                            credentials=self._credentials
-                        )
-                        raise LDAPBindAuthError(
-                            self._credentials.username, e
-                        )
-
-                else:
-                    raise LDAPConnectionError(
-                        &quot;Unknown credentials type: {0}&quot;
-                        .format(self._credentials)
</del><ins>+        if self._credentials is not None:
+            if IUsernamePassword.providedBy(self._credentials):
+                try:
+                    connection.simple_bind_s(
+                        self._credentials.username,
+                        self._credentials.password,
</ins><span class="cx">                     )
</span><ins>+                    self.log.debug(
+                        &quot;Bound to LDAP as {credentials.username}&quot;,
+                        credentials=self._credentials
+                    )
+                except (
+                    ldap.INVALID_CREDENTIALS, ldap.INVALID_DN_SYNTAX
+                ) as e:
+                    self.log.error(
+                        &quot;Unable to bind to LDAP as {credentials.username}&quot;,
+                        credentials=self._credentials
+                    )
+                    raise LDAPBindAuthError(
+                        self._credentials.username, e
+                    )
</ins><span class="cx"> 
</span><del>-            self._connection = connection
</del><ins>+            else:
+                raise LDAPConnectionError(
+                    &quot;Unknown credentials type: {0}&quot;
+                    .format(self._credentials)
+                )
</ins><span class="cx"> 
</span><del>-        returnValue(self._connection)
</del><ins>+        return connection
</ins><span class="cx"> 
</span><span class="cx"> 
</span><del>-    @inlineCallbacks
-    def _authenticateUsernamePassword(self, dn, password):
</del><ins>+    def _newConnection(self):
</ins><span class="cx">         &quot;&quot;&quot;
</span><del>-        Open a secondary connection to the LDAP server and try binding to it
-        with the given credentials
</del><ins>+        Create a new LDAP connection and initialize and start TLS if required.
+        This will always be called in a thread to prevent blocking.
</ins><span class="cx"> 
</span><del>-        @returns: True if the password is correct, False otherwise
-        @rtype: deferred C{bool}
</del><ins>+        @returns: The connection object.
+        @rtype: L{ldap.ldapobject.LDAPObject}
</ins><span class="cx"> 
</span><span class="cx">         @raises: L{LDAPConnectionError} if unable to connect.
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        self.log.debug(&quot;Authenticating {dn}&quot;, dn=dn)
</del><span class="cx">         connection = ldap.initialize(self.url)
</span><span class="cx"> 
</span><del>-        # FIXME:  Use a separate connection pool perhaps
</del><ins>+        # FIXME: Use trace_file option to wire up debug logging when
+        # Twisted adopts the new logging stuff.
</ins><span class="cx"> 
</span><span class="cx">         for option, value in (
</span><span class="cx">             (ldap.OPT_TIMEOUT, self._timeout),
</span><span class="lines">@@ -395,102 +496,158 @@
</span><span class="cx"> 
</span><span class="cx">         if self._useTLS:
</span><span class="cx">             self.log.debug(&quot;Starting TLS for {log_source.url}&quot;)
</span><del>-            yield deferToThread(connection.start_tls_s)
</del><ins>+            connection.start_tls_s()
</ins><span class="cx"> 
</span><ins>+        return connection
+
+
+    def _authenticateUsernamePassword(self, dn, password):
+        &quot;&quot;&quot;
+        Open a secondary connection to the LDAP server and try binding to it
+        with the given credentials
+
+        @returns: True if the password is correct, False otherwise
+        @rtype: deferred C{bool}
+
+        @raises: L{LDAPConnectionError} if unable to connect.
+        &quot;&quot;&quot;
+        return deferToThreadPool(
+            reactor, self.threadpool,
+            self._authenticateUsernamePassword_inThread, dn, password
+        )
+
+
+    def _authenticateUsernamePassword_inThread(self, dn, password):
+        &quot;&quot;&quot;
+        Open a secondary connection to the LDAP server and try binding to it
+        with the given credentials.
+        This method is always called in a thread.
+
+        @returns: True if the password is correct, False otherwise
+        @rtype: C{bool}
+
+        @raises: L{LDAPConnectionError} if unable to connect.
+        &quot;&quot;&quot;
+        self.log.debug(&quot;Authenticating {dn}&quot;, dn=dn)
+        connection = self._newConnection()
+
+
</ins><span class="cx">         try:
</span><del>-            yield deferToThread(
-                connection.simple_bind_s,
-                dn,
-                password,
-            )
</del><ins>+            connection.simple_bind_s(dn, password)
</ins><span class="cx">             self.log.debug(&quot;Authenticated {dn}&quot;, dn=dn)
</span><del>-            returnValue(True)
</del><ins>+            return True
</ins><span class="cx">         except (
</span><span class="cx">             ldap.INVALID_CREDENTIALS, ldap.INVALID_DN_SYNTAX
</span><span class="cx">         ):
</span><span class="cx">             self.log.debug(&quot;Unable to authenticate {dn}&quot;, dn=dn)
</span><del>-            returnValue(False)
</del><ins>+            return False
+        finally:
+            # TODO: should we explicitly &quot;close&quot; the connection in a finally
+            # clause rather than just let it go out of scope and be garbage collected
+            # at some indeterminate point in the future? Up side is that we won't hang
+            # on to the connection or other resources for longer than needed. Down side
+            # is we will take a little bit of extra time in this call to close it down.
+            # If we do decide to &quot;close&quot; then we probably have to use one of the &quot;unbind&quot;
+            # methods on the L{LDAPObject}.
+            connection = None
</ins><span class="cx"> 
</span><span class="cx"> 
</span><del>-    @inlineCallbacks
</del><span class="cx">     def _recordsFromQueryString(self, queryString, recordTypes=None):
</span><ins>+        return deferToThreadPool(
+            reactor, self.threadpool,
+            self._recordsFromQueryString_inThread,
+            queryString,
+            recordTypes,
+        )
+
+
+    def _recordsFromQueryString_inThread(self, queryString, recordTypes=None):
+        &quot;&quot;&quot;
+        This method is always called in a thread.
+        &quot;&quot;&quot;
</ins><span class="cx">         records = []
</span><span class="cx"> 
</span><del>-        connection = yield self._connect()
</del><ins>+        with DirectoryService.Connection(self) as connection:
</ins><span class="cx"> 
</span><del>-        if recordTypes is None:
-            recordTypes = self.recordTypes()
</del><ins>+            if recordTypes is None:
+                recordTypes = self.recordTypes()
</ins><span class="cx"> 
</span><del>-        for recordType in recordTypes:
-            rdn = self._recordTypeSchemas[recordType].relativeDN
-            rdn = (
-                ldap.dn.str2dn(rdn.lower()) +
-                ldap.dn.str2dn(self._baseDN.lower())
-            )
-            self.log.debug(
-                &quot;Performing LDAP query: {rdn} {query} {recordType}&quot;,
-                rdn=rdn,
-                query=queryString,
-                recordType=recordType
-            )
-            try:
-                reply = yield deferToThread(
-                    connection.search_s,
-                    ldap.dn.dn2str(rdn),
-                    ldap.SCOPE_SUBTREE,
-                    queryString,
-                    attrlist=self._attributesToFetch
</del><ins>+            for recordType in recordTypes:
+                rdn = self._recordTypeSchemas[recordType].relativeDN
+                rdn = (
+                    ldap.dn.str2dn(rdn.lower()) +
+                    ldap.dn.str2dn(self._baseDN.lower())
</ins><span class="cx">                 )
</span><ins>+                self.log.debug(
+                    &quot;Performing LDAP query: {rdn} {query} {recordType}&quot;,
+                    rdn=rdn,
+                    query=queryString,
+                    recordType=recordType
+                )
+                try:
+                    reply = connection.search_s(
+                        ldap.dn.dn2str(rdn),
+                        ldap.SCOPE_SUBTREE,
+                        queryString,
+                        attrlist=self._attributesToFetch
+                    )
</ins><span class="cx"> 
</span><del>-            except ldap.FILTER_ERROR as e:
-                self.log.error(
-                    &quot;Unable to perform query {0!r}: {1}&quot;
-                    .format(queryString, e)
</del><ins>+                except ldap.FILTER_ERROR as e:
+                    self.log.error(
+                        &quot;Unable to perform query {0!r}: {1}&quot;
+                        .format(queryString, e)
+                    )
+                    raise LDAPQueryError(&quot;Unable to perform query&quot;, e)
+
+                except ldap.NO_SUCH_OBJECT as e:
+                    self.log.warn(&quot;RDN {rdn} does not exist, skipping&quot;, rdn=rdn)
+                    continue
+
+                records.extend(
+                    self._recordsFromReply(reply, recordType=recordType)
</ins><span class="cx">                 )
</span><del>-                raise LDAPQueryError(&quot;Unable to perform query&quot;, e)
</del><span class="cx"> 
</span><del>-            except ldap.NO_SUCH_OBJECT as e:
-                self.log.warn(&quot;RDN {rdn} does not exist, skipping&quot;, rdn=rdn)
-                continue
</del><ins>+        return records
</ins><span class="cx"> 
</span><del>-            records.extend(
-                (yield self._recordsFromReply(reply, recordType=recordType))
-            )
-        returnValue(records)
</del><span class="cx"> 
</span><ins>+    def _recordWithDN(self, dn):
+        return deferToThreadPool(
+            reactor, self.threadpool,
+            self._recordWithDN_inThread, dn
+        )
</ins><span class="cx"> 
</span><del>-    @inlineCallbacks
-    def _recordWithDN(self, dn):
</del><ins>+
+    def _recordWithDN_inThread(self, dn):
</ins><span class="cx">         &quot;&quot;&quot;
</span><ins>+        This method is always called in a thread.
+
</ins><span class="cx">         @param dn: The DN of the record to search for
</span><span class="cx">         @type dn: C{str}
</span><span class="cx">         &quot;&quot;&quot;
</span><del>-        connection = yield self._connect()
</del><ins>+        with DirectoryService.Connection(self) as connection:
</ins><span class="cx"> 
</span><del>-        self.log.debug(&quot;Performing LDAP DN query: {dn}&quot;, dn=dn)
</del><ins>+            self.log.debug(&quot;Performing LDAP DN query: {dn}&quot;, dn=dn)
</ins><span class="cx"> 
</span><del>-        reply = yield deferToThread(
-            connection.search_s,
-            dn,
-            ldap.SCOPE_SUBTREE,
-            &quot;(objectClass=*)&quot;,
-            attrlist=self._attributesToFetch
-        )
-        records = self._recordsFromReply(reply)
</del><ins>+            reply = connection.search_s(
+                dn,
+                ldap.SCOPE_SUBTREE,
+                &quot;(objectClass=*)&quot;,
+                attrlist=self._attributesToFetch
+            )
+            records = self._recordsFromReply(reply)
+
</ins><span class="cx">         if len(records):
</span><del>-            returnValue(records[0])
</del><ins>+            return records[0]
</ins><span class="cx">         else:
</span><del>-            returnValue(None)
</del><ins>+            return None
</ins><span class="cx"> 
</span><span class="cx"> 
</span><span class="cx">     def _recordsFromReply(self, reply, recordType=None):
</span><span class="cx">         records = []
</span><span class="cx"> 
</span><del>-
</del><span class="cx">         for dn, recordData in reply:
</span><span class="cx"> 
</span><span class="cx">             # Determine the record type
</span><del>-
</del><span class="cx">             if recordType is None:
</span><span class="cx">                 recordType = recordTypeForDN(
</span><span class="cx">                     self._baseDN, self._recordTypeSchemas, dn
</span><span class="lines">@@ -510,7 +667,6 @@
</span><span class="cx">                 continue
</span><span class="cx"> 
</span><span class="cx">             # Populate a fields dictionary
</span><del>-
</del><span class="cx">             fields = {}
</span><span class="cx"> 
</span><span class="cx">             for attribute, values in recordData.iteritems():
</span></span></pre></div>
<a id="twexttrunktwextwhotesttest_concurrencypy"></a>
<div class="addfile"><h4>Added: twext/trunk/twext/who/test/test_concurrency.py (0 => 13857)</h4>
<pre class="diff"><span>
<span class="info">--- twext/trunk/twext/who/test/test_concurrency.py                                (rev 0)
+++ twext/trunk/twext/who/test/test_concurrency.py        2014-08-08 19:36:13 UTC (rev 13857)
</span><span class="lines">@@ -0,0 +1,203 @@
</span><ins>+##
+# Copyright (c) 2013-2014 Apple Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the &quot;License&quot;);
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an &quot;AS IS&quot; BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+
+from __future__ import print_function
+from twext.python.types import MappingProxyType
+from twext.who.directory import DirectoryRecord
+from twext.who.idirectory import RecordType, FieldName
+from twext.who.ldap._constants import LDAPAttribute, LDAPObjectClass
+from twext.who.ldap._service import DirectoryService as LDAPDirectoryService, \
+    DEFAULT_FIELDNAME_ATTRIBUTE_MAP, RecordTypeSchema
+from twext.who.opendirectory._service import DirectoryService as ODDirectoryService
+from twisted.internet.defer import inlineCallbacks, DeferredList, returnValue
+from twisted.trial import unittest
+import time
+
+&quot;&quot;&quot;
+Test the concurrency of DS implementations against real servers.
+&quot;&quot;&quot;
+
+TEST_FIELDNAME_MAP = dict(DEFAULT_FIELDNAME_ATTRIBUTE_MAP)
+TEST_FIELDNAME_MAP[FieldName.uid] = (u&quot;uid&quot;,)
+
+TEST_RECORDTYPE_SCHEMAS_OSX = MappingProxyType({
+
+    RecordType.user: RecordTypeSchema(
+        # cn=users
+        relativeDN=u&quot;cn={0}&quot;.format(&quot;users&quot;),
+
+        # (objectClass=inetOrgPerson)
+        attributes=(
+            (
+                LDAPAttribute.objectClass.value,
+                LDAPObjectClass.inetOrgPerson.value,
+            ),
+        ),
+    ),
+
+    RecordType.group: RecordTypeSchema(
+        # cn=groups
+        relativeDN=u&quot;cn={0}&quot;.format(&quot;groups&quot;),
+
+        # (objectClass=groupOfNames)
+        attributes=(
+            (
+                LDAPAttribute.objectClass.value,
+                LDAPObjectClass.groupOfNames.value,
+            ),
+        ),
+    ),
+
+})
+
+TEST_RECORDTYPE_SCHEMAS_OTHER = MappingProxyType({
+
+    RecordType.user: RecordTypeSchema(
+        # ou=person
+        relativeDN=u&quot;ou={0}&quot;.format(&quot;people&quot;),
+
+        # (objectClass=inetOrgPerson)
+        attributes=(
+            (
+                LDAPAttribute.objectClass.value,
+                LDAPObjectClass.inetOrgPerson.value,
+            ),
+        ),
+    ),
+
+    RecordType.group: RecordTypeSchema(
+        # ou=groupOfNames
+        relativeDN=u&quot;ou={0}&quot;.format(LDAPObjectClass.groupOfNames.value),
+
+        # (objectClass=groupOfNames)
+        attributes=(
+            (
+                LDAPAttribute.objectClass.value,
+                LDAPObjectClass.groupOfNames.value,
+            ),
+        ),
+    ),
+
+})
+
+class DirectoryServiceConcurrencyTest(unittest.TestCase):
+    &quot;&quot;&quot;
+    Tests for directory records.
+    &quot;&quot;&quot;
+
+    @inlineCallbacks
+    def _runTest(self, num_threads, multiple_services, service_maker, details, num_requests, do_auth):
+
+        if multiple_services:
+            services = [service_maker() for _ in range(num_threads)]
+        else:
+            services = [service_maker()] * num_threads
+
+        # Warm up each service before starting timer
+        for n, svc in enumerate(services):
+            record = yield svc.recordWithShortName(RecordType.user, details[&quot;user&quot;].format(n + 1))
+            self.assertTrue(isinstance(record, DirectoryRecord))
+
+        start = time.time()
+        ctr = [0]
+
+        @inlineCallbacks
+        def _records(n):
+            for _ in range(num_requests):
+                record = yield services[n].recordWithShortName(RecordType.user, details[&quot;user&quot;].format(n + 1))
+                self.assertTrue(isinstance(record, DirectoryRecord))
+                ctr[0] += 1
+
+        @inlineCallbacks
+        def _auth(n):
+            record = yield services[n].recordWithShortName(RecordType.user, details[&quot;user&quot;].format(n + 1))
+            for _ in range(num_requests):
+                result = yield record.verifyPlaintextPassword(details[&quot;pswd&quot;].format(n + 1))
+                self.assertTrue(result)
+                ctr[0] += 1
+
+        dl = []
+        for i in range(num_threads):
+            dl.append((_auth if do_auth else _records)(i))
+
+        dl = DeferredList(dl)
+        yield dl
+
+        returnValue((time.time() - start, ctr[0],))
+
+
+    @inlineCallbacks
+    def test_ldap_multi_service(self):
+        &quot;&quot;&quot;
+        See if {ldap._service.DirectoryService is concurrent.
+        &quot;&quot;&quot;
+
+        num_threads = 20
+        multiple_services = False
+        num_requests = 100
+        do_auth = False
+        use_od = False
+        configChoice = &quot;local&quot;
+
+        configs = {
+            &quot;local&quot;: {
+                &quot;url&quot;: &quot;ldap://localhost&quot;,
+                &quot;baseDN&quot;: &quot;dc=example,dc=com&quot;,
+                &quot;rschema&quot;: TEST_RECORDTYPE_SCHEMAS_OSX,
+                &quot;user&quot;: u&quot;user{:02d}&quot;,
+                &quot;pswd&quot;: u&quot;user{:02d}&quot;,
+            },
+            &quot;example&quot;: {
+                &quot;url&quot;: &quot;ldap://example.com&quot;,
+                &quot;baseDN&quot;: &quot;o=example.com,o=email&quot;,
+                &quot;rschema&quot;: TEST_RECORDTYPE_SCHEMAS_OTHER,
+                &quot;user&quot;: u&quot;TestAccount{}&quot;,
+                &quot;pswd&quot;: u&quot;pswd&quot;,
+            },
+        }
+
+        details = configs[configChoice]
+
+        def _serviceMaker():
+            if use_od:
+                return ODDirectoryService(
+                    nodeName=&quot;/LDAPv3/127.0.0.1&quot;,
+                )
+            else:
+                return LDAPDirectoryService(
+                    url=details[&quot;url&quot;],
+                    baseDN=details[&quot;baseDN&quot;],
+                    fieldNameToAttributesMap=TEST_FIELDNAME_MAP,
+                    recordTypeSchemas=details[&quot;rschema&quot;],
+                    threadPoolMax=20,
+                )
+
+
+        duration, count = yield self._runTest(num_threads, multiple_services, _serviceMaker, details, num_requests, do_auth)
+
+        print(
+            &quot;\n\nType: {} {} {}\nNumber of Services/Requests: {}/{}\nTime: {}\nCount: {}\n&quot;.format(
+                &quot;OD&quot; if use_od else &quot;LDAP&quot;,
+                &quot;Multiple&quot; if multiple_services else &quot;Single&quot;,
+                &quot;Auth&quot; if do_auth else &quot;query&quot;,
+                num_threads,
+                num_requests,
+                duration,
+                count,
+            )
+        )
+
+    test_ldap_multi_service.skip = &quot;Not really a unit test - requires actually server to work&quot;
</ins></span></pre>
</div>
</div>

</body>
</html>