Revision: 15477 http://trac.calendarserver.org//changeset/15477 Author: wsanchez@apple.com Date: 2016-03-17 15:16:13 -0700 (Thu, 17 Mar 2016) Log Message: ----------- lint Modified Paths: -------------- twext/trunk/twext/who/ldap/_service.py Modified: twext/trunk/twext/who/ldap/_service.py =================================================================== --- twext/trunk/twext/who/ldap/_service.py 2016-03-16 22:56:28 UTC (rev 15476) +++ twext/trunk/twext/who/ldap/_service.py 2016-03-17 22:16:13 UTC (rev 15477) @@ -15,12 +15,12 @@ # limitations under the License. ## -from __future__ import print_function - """ LDAP directory service implementation. """ +from __future__ import print_function + from Queue import Queue, Empty from threading import RLock from uuid import UUID @@ -115,6 +115,9 @@ # class FieldName(Names): + """ + LDAP field names. + """ dn = NamedConstant() dn.description = u"distinguished name" @@ -271,9 +274,7 @@ @param extraFilters: A dict (keyed off recordType) of extra filter fragments to AND in to any generated queries. @type extraFilters: L{dicts} of L{unicode} - """ - self.url = url self._baseDN = baseDN self._credentials = credentials @@ -325,14 +326,21 @@ attributesToFetch.add(attribute.encode("utf-8")) self._attributesToFetch = list(attributesToFetch) - # Threaded connection pool. The connection size limit here is the size for connections doing queries. - # There will also be one-off connections for authentications which also run in their own threads. Thus - # the threadpool max ought to be larger than the connection max to allow for both pooled query connections - # and one-off auth-only connections. + # Threaded connection pool. + # The connection size limit here is the size for connections doing + # queries. + # There will also be one-off connections for authentications which also + # run in their own threads. 
+ # Thus the threadpool max ought to be larger than the connection max to + # allow for both pooled query connections and one-off auth-only + # connections. self.ownThreadpool = ownThreadpool if self.ownThreadpool: - self.threadpool = ThreadPool(minthreads=1, maxthreads=threadPoolMax, name="LDAPDirectoryService") + self.threadpool = ThreadPool( + minthreads=1, maxthreads=threadPoolMax, + name="LDAPDirectoryService", + ) else: # Use the default threadpool but adjust its size to fit our needs self.threadpool = reactor.getThreadPool() @@ -347,7 +355,7 @@ self.activeCount = 0 reactor.callWhenRunning(self.start) - reactor.addSystemEventTrigger('during', 'shutdown', self.stop) + reactor.addSystemEventTrigger("during", "shutdown", self.stop) def start(self): @@ -360,12 +368,14 @@ def stop(self): """ - Stop the service. Stop the threadpool if we own it and do other clean-up. + Stop the service. + Stop the threadpool if we own it and do other clean-up. """ if self.ownThreadpool: self.threadpool.stop() - # FIXME: we should probably also close the pool of active connections too + # FIXME: we should probably also close the pool of active connections + # too. @property @@ -375,10 +385,12 @@ class Connection(object): """ - ContextManager object for getting a connection from the pool. On exit the connection - will be put back in the pool if no exception was raised. Otherwise, the connection will be - removed from the active connection list, which will allow a new "clean" connection to - be created later if needed. + ContextManager object for getting a connection from the pool. + On exit the connection will be put back in the pool if no exception was + raised. + Otherwise, the connection will be removed from the active connection + list, which will allow a new "clean" connection to be created later if + needed. """ def __init__(self, ds): @@ -399,18 +411,22 @@ def _getConnection(self): """ - Get a connection from the connection pool. 
This will retrieve a connection from the connection - pool L{Queue} object. If the L{Queue} is empty, it will check to see whether a new connection can - be created (based on the connection limit), and if so create that and use it. If no new - connections can be created, it will block on the L{Queue} until an existing, in-use, connection - is put back. + Get a connection from the connection pool. + This will retrieve a connection from the connection pool L{Queue} + object. + If the L{Queue} is empty, it will check to see whether a new connection + can be created (based on the connection limit), and if so create that + and use it. + If no new connections can be created, it will block on the L{Queue} + until an existing, in-use, connection is put back. """ try: connection = self.connectionQueue.get(block=False) except Empty: - # Note we use a lock here to prevent a race condition in which multiple requests for a new connection - # could succeed even though the connection counts starts out one less than the maximum. This can happen - # because self._connect() can take a while. + # Note we use a lock here to prevent a race condition in which + # multiple requests for a new connection could succeed even though + # the connection counts starts out one less than the maximum. + # This can happen because self._connect() can take a while. 
self.connectionCreateLock.acquire() if len(self.connections) < self.connectionMax: connection = self._connect() @@ -421,9 +437,16 @@ self.poolStats["connection-blocked"] += 1 connection = self.connectionQueue.get() - self.poolStats["connection-{}".format(self.connections.index(connection))] += 1 + + connectionID = "connection-{}".format( + self.connections.index(connection) + ) + + self.poolStats[connectionID] += 1 self.activeCount += 1 - self.poolStats["connection-max"] = max(self.poolStats["connection-max"], self.activeCount) + self.poolStats["connection-max"] = max( + self.poolStats["connection-max"], self.activeCount + ) return connection @@ -437,8 +460,8 @@ def _failedConnection(self, connection): """ - A connection has failed - remove it from the list of active connections. A new - one will be created if needed. + A connection has failed; remove it from the list of active connections. + A new one will be created if needed. """ self.activeCount -= 1 self.poolStats["connection-errors"] += 1 @@ -455,7 +478,6 @@ @raises: L{LDAPConnectionError} if unable to connect. """ - # FIXME: ldap connection objects are not thread safe, so let's set up # a connection pool @@ -568,12 +590,14 @@ return False finally: # TODO: should we explicitly "close" the connection in a finally - # clause rather than just let it go out of scope and be garbage collected - # at some indeterminate point in the future? Up side is that we won't hang - # on to the connection or other resources for longer than needed. Down side - # is we will take a little bit of extra time in this call to close it down. - # If we do decide to "close" then we probably have to use one of the "unbind" - # methods on the L{LDAPObject}. + # clause rather than just let it go out of scope and be garbage + # collected at some indeterminate point in the future? + # Up side is that we won't hang on to the connection or other + # resources for longer than needed. 
+ # Down side is we will take a little bit of extra time in this call + # to close it down. + # If we do decide to "close" then we probably have to use one of + # the "unbind" methods on the L{LDAPObject}. connection = None @@ -603,9 +627,8 @@ self, queryString, recordTypes=None, limitResults=None, timeoutSeconds=None ): - """ - This method is always called in a thread. - """ + # This method is always called in a thread. + if recordTypes is None: recordTypes = list(self.recordTypes()) @@ -634,14 +657,23 @@ ldap.dn.str2dn(rdn.lower()) + ldap.dn.str2dn(self._baseDN.lower()) ) - filteredQuery = self._addExtraFilter(recordType, queryString) + filteredQuery = self._addExtraFilter( + recordType, queryString + ) self.log.debug( - "Performing LDAP query: {rdn} {query} {recordType}{limit}{timeout}", + "Performing LDAP query: " + "{rdn} {query} {recordType}{limit}{timeout}", rdn=rdn, query=filteredQuery, recordType=recordType, - limit=" limit={}".format(limitResults) if limitResults else "", - timeout=" timeout={}".format(timeoutSeconds) if timeoutSeconds else "", + limit=( + " limit={}".format(limitResults) + if limitResults else "" + ), + timeout=( + " timeout={}".format(timeoutSeconds) + if timeoutSeconds else "" + ), ) try: s = ldap.async.List(connection) @@ -650,8 +682,14 @@ ldap.SCOPE_SUBTREE, filteredQuery, attrList=self._attributesToFetch, - timeout=timeoutSeconds if timeoutSeconds else -1, - sizelimit=limitResults if limitResults else 0 + timeout=( + timeoutSeconds + if timeoutSeconds else -1 + ), + sizelimit=( + limitResults + if limitResults else 0 + ), ) s.processResults() @@ -669,7 +707,9 @@ raise LDAPQueryError("Unable to perform query", e) except ldap.NO_SUCH_OBJECT as e: - # self.log.warn("RDN {rdn} does not exist, skipping", rdn=rdn) + # self.log.warn( + # "RDN {rdn} does not exist, skipping", rdn=rdn + # ) continue except ldap.INVALID_SYNTAX as e: @@ -690,12 +730,19 @@ ) raise LDAPQueryError("Unable to perform query", e) - reply = [resultItem for 
_ignore_resultType, resultItem in s.allResults] + reply = [ + resultItem + for _ignore_resultType, resultItem + in s.allResults + ] - newRecords = self._recordsFromReply(reply, recordType=recordType) + newRecords = self._recordsFromReply( + reply, recordType=recordType + ) self.log.debug( - "Records from LDAP query ({rdn} {query} {recordType}): {count}", + "Records from LDAP query " + "({rdn} {query} {recordType}): {count}", rdn=rdn, query=queryString, recordType=recordType, @@ -708,18 +755,16 @@ records.extend(newRecords) except ldap.SERVER_DOWN as e: - self.log.error( - "LDAP server unavailable" - ) + self.log.error("LDAP server unavailable") if self._retryNumber + 1 == self._tries: - # We've hit SERVER_DOWN self._tries times, giving up + # We've hit SERVER_DOWN self._tries times, giving up. raise LDAPQueryError("LDAP server down", e) else: self.log.error("LDAP connection failure; retrying...") else: # Only retry if we got ldap.SERVER_DOWN, otherwise break out of - # loop + # loop. break self.log.debug( @@ -740,11 +785,10 @@ def _recordWithDN_inThread(self, dn): """ - This method is always called in a thread. - @param dn: The DN of the record to search for @type dn: C{str} """ + # This method is always called in a thread. 
records = [] @@ -819,7 +863,9 @@ # Populate a fields dictionary fields = {} - for fieldName, attributeRules in self._fieldNameToAttributesMap.iteritems(): + for fieldName, attributeRules in ( + self._fieldNameToAttributesMap.iteritems() + ): valueType = self.fieldName.valueType(fieldName) for attributeRule in attributeRules: @@ -845,8 +891,10 @@ newValues = [valueType(v) for v in values] except Exception, e: self.log.warn( - "Can't parse value {name} {values} ({error})", - name=fieldName, values=values, error=str(e) + "Can't parse value {name} {values} " + "({error})", + name=fieldName, values=values, + error=str(e) ) continue @@ -879,13 +927,17 @@ if not isinstance(values, list): values = [values] - _ignore_attribute, attributeValue, fieldValue = attributeRule.split(":") + _ignore_attribute, attributeValue, fieldValue = ( + attributeRule.split(":") + ) for value in values: if value == attributeValue: # convert to a constant try: - fieldValue = valueType.lookupByName(fieldValue) + fieldValue = ( + valueType.lookupByName(fieldValue) + ) fields[fieldName] = fieldValue except ValueError: pass @@ -1034,7 +1086,8 @@ dn = ldap.dn.str2dn(dnStr.lower()) attrName, value, ignored = dn[0][0] fieldName = self.service._attributeToFieldNameMap[attrName][0] - fieldValuesByRecordType.setdefault(recordType, []).append((fieldName, value)) + fieldValues = fieldValuesByRecordType.setdefault(recordType, []) + fieldValues.append((fieldName, value)) continue except: @@ -1098,6 +1151,9 @@ def reverseDict(source): + """ + Reverse keys and values in a mapping. + """ new = {} for key, values in source.iteritems(): @@ -1155,7 +1211,6 @@ @param recordData: LDAP record data. @type recordData: mapping """ - for recordType, schema in recordTypeSchemas.iteritems(): for attribute, value in schema.attributes: dataValue = recordData.get(attribute)
Participants (1): source_changes@macosforge.org