[CalendarServer-changes] [15111] twext/trunk/twext/who/ldap
source_changes at macosforge.org
source_changes at macosforge.org
Wed Sep 9 13:08:14 PDT 2015
Revision: 15111
http://trac.calendarserver.org//changeset/15111
Author: sagen at apple.com
Date: 2015-09-09 13:08:14 -0700 (Wed, 09 Sep 2015)
Log Message:
-----------
If we get ldap.SERVER_DOWN, retry a couple of times with a new connection.
Modified Paths:
--------------
twext/trunk/twext/who/ldap/__init__.py
twext/trunk/twext/who/ldap/_service.py
twext/trunk/twext/who/ldap/test/test_service.py
Modified: twext/trunk/twext/who/ldap/__init__.py
===================================================================
--- twext/trunk/twext/who/ldap/__init__.py 2015-09-08 19:11:06 UTC (rev 15110)
+++ twext/trunk/twext/who/ldap/__init__.py 2015-09-09 20:08:14 UTC (rev 15111)
@@ -23,6 +23,7 @@
"LDAPConfigurationError",
"LDAPConnectionError",
"LDAPBindAuthError",
+ "LDAPQueryError",
"RecordTypeSchema",
"DirectoryService",
"LDAPAttribute",
@@ -36,6 +37,7 @@
LDAPConfigurationError,
LDAPConnectionError,
LDAPBindAuthError,
+ LDAPQueryError,
RecordTypeSchema,
DirectoryService,
FieldName
Modified: twext/trunk/twext/who/ldap/_service.py
===================================================================
--- twext/trunk/twext/who/ldap/_service.py 2015-09-08 19:11:06 UTC (rev 15110)
+++ twext/trunk/twext/who/ldap/_service.py 2015-09-09 20:08:14 UTC (rev 15111)
@@ -603,105 +603,124 @@
This method is always called in a thread.
"""
- records = []
+ if recordTypes is None:
+ recordTypes = list(self.recordTypes())
- with DirectoryService.Connection(self) as connection:
+ # Retry if we get ldap.SERVER_DOWN
+ TRIES = 3
+ for self._retryNumber in xrange(TRIES):
- if recordTypes is None:
- recordTypes = self.recordTypes()
+ records = []
- for recordType in recordTypes:
+ try:
- if limitResults is not None:
- if limitResults < 1:
- break
+ with DirectoryService.Connection(self) as connection:
- try:
- rdn = self._recordTypeSchemas[recordType].relativeDN
- except KeyError:
- # Skip this unknown record type
- continue
+ for recordType in recordTypes:
- rdn = (
- ldap.dn.str2dn(rdn.lower()) +
- ldap.dn.str2dn(self._baseDN.lower())
- )
- filteredQuery = self._addExtraFilter(recordType, queryString)
- self.log.debug(
- "Performing LDAP query: {rdn} {query} {recordType}{limit}{timeout}",
- rdn=rdn,
- query=filteredQuery,
- recordType=recordType,
- limit=" limit={}".format(limitResults) if limitResults else "",
- timeout=" timeout={}".format(timeoutSeconds) if timeoutSeconds else "",
- )
- try:
- s = ldap.async.List(connection)
- s.startSearch(
- ldap.dn.dn2str(rdn),
- ldap.SCOPE_SUBTREE,
- filteredQuery,
- attrList=self._attributesToFetch,
- timeout=timeoutSeconds if timeoutSeconds else -1,
- sizelimit=limitResults if limitResults else 0
- )
- s.processResults()
+ if limitResults is not None:
+ if limitResults < 1:
+ break
- except ldap.SIZELIMIT_EXCEEDED as e:
- self.log.debug("LDAP result limit exceeded: {}".format(limitResults,))
+ try:
+ rdn = self._recordTypeSchemas[recordType].relativeDN
+ except KeyError:
+ # Skip this unknown record type
+ continue
- except ldap.TIMELIMIT_EXCEEDED as e:
- self.log.warn("LDAP timeout exceeded: {} seconds".format(timeoutSeconds,))
+ rdn = (
+ ldap.dn.str2dn(rdn.lower()) +
+ ldap.dn.str2dn(self._baseDN.lower())
+ )
+ filteredQuery = self._addExtraFilter(recordType, queryString)
+ self.log.debug(
+ "Performing LDAP query: {rdn} {query} {recordType}{limit}{timeout}",
+ rdn=rdn,
+ query=filteredQuery,
+ recordType=recordType,
+ limit=" limit={}".format(limitResults) if limitResults else "",
+ timeout=" timeout={}".format(timeoutSeconds) if timeoutSeconds else "",
+ )
+ try:
+ s = ldap.async.List(connection)
+ s.startSearch(
+ ldap.dn.dn2str(rdn),
+ ldap.SCOPE_SUBTREE,
+ filteredQuery,
+ attrList=self._attributesToFetch,
+ timeout=timeoutSeconds if timeoutSeconds else -1,
+ sizelimit=limitResults if limitResults else 0
+ )
+ s.processResults()
- except ldap.FILTER_ERROR as e:
- self.log.error(
- "Unable to perform query {query!r}: {err}",
- query=queryString, err=e
- )
- raise LDAPQueryError("Unable to perform query", e)
+ except ldap.SIZELIMIT_EXCEEDED as e:
+ self.log.debug("LDAP result limit exceeded: {}".format(limitResults,))
- except ldap.NO_SUCH_OBJECT as e:
- # self.log.warn("RDN {rdn} does not exist, skipping", rdn=rdn)
- continue
+ except ldap.TIMELIMIT_EXCEEDED as e:
+ self.log.warn("LDAP timeout exceeded: {} seconds".format(timeoutSeconds,))
- except ldap.INVALID_SYNTAX as e:
- self.log.error(
- "LDAP invalid syntax {query!r}: {err}",
- query=queryString, err=e
- )
- continue
+ except ldap.FILTER_ERROR as e:
+ self.log.error(
+ "Unable to perform query {query!r}: {err}",
+ query=queryString, err=e
+ )
+ raise LDAPQueryError("Unable to perform query", e)
- except ldap.SERVER_DOWN as e:
- self.log.error(
- "LDAP server unavailable"
- )
- raise LDAPQueryError("LDAP server down", e)
+ except ldap.NO_SUCH_OBJECT as e:
+ # self.log.warn("RDN {rdn} does not exist, skipping", rdn=rdn)
+ continue
- except Exception as e:
- self.log.error(
- "LDAP error {query!r}: {err}",
- query=queryString, err=e
- )
- raise LDAPQueryError("Unable to perform query", e)
+ except ldap.INVALID_SYNTAX as e:
+ self.log.error(
+ "LDAP invalid syntax {query!r}: {err}",
+ query=queryString, err=e
+ )
+ continue
- reply = [resultItem for _ignore_resultType, resultItem in s.allResults]
+ except ldap.SERVER_DOWN as e:
+ # Catch this below for retry
+ raise e
- newRecords = self._recordsFromReply(reply, recordType=recordType)
+ except Exception as e:
+ self.log.error(
+ "LDAP error {query!r}: {err}",
+ query=queryString, err=e
+ )
+ raise LDAPQueryError("Unable to perform query", e)
- self.log.debug(
- "Records from LDAP query ({rdn} {query} {recordType}): {count}",
- rdn=rdn,
- query=queryString,
- recordType=recordType,
- count=len(newRecords)
+ reply = [resultItem for _ignore_resultType, resultItem in s.allResults]
+
+ newRecords = self._recordsFromReply(reply, recordType=recordType)
+
+ self.log.debug(
+ "Records from LDAP query ({rdn} {query} {recordType}): {count}",
+ rdn=rdn,
+ query=queryString,
+ recordType=recordType,
+ count=len(newRecords)
+ )
+
+ if limitResults is not None:
+ limitResults = limitResults - len(newRecords)
+
+ records.extend(newRecords)
+
+ except ldap.SERVER_DOWN as e:
+ self.log.error(
+ "LDAP server unavailable"
)
+ if self._retryNumber + 1 == TRIES:
+ # We've hit SERVER_DOWN TRIES times, giving up
+ raise LDAPQueryError("LDAP server down", e)
+ else:
+ self.log.error("LDAP connection failure; retrying...")
- if limitResults is not None:
- limitResults = limitResults - len(newRecords)
+ else:
+ # Only retry if we got ldap.SERVER_DOWN, otherwise break out of
+ # loop
+ break
- records.extend(newRecords)
-
self.log.debug(
"LDAP result count ({query}): {count}",
query=queryString,
@@ -725,24 +744,49 @@
@param dn: The DN of the record to search for
@type dn: C{str}
"""
- with DirectoryService.Connection(self) as connection:
- self.log.debug("Performing LDAP DN query: {dn}", dn=dn)
+ records = []
+ # Retry if we get ldap.SERVER_DOWN
+ TRIES = 3
+
+ for self._retryNumber in xrange(TRIES):
+
try:
- reply = connection.search_s(
- dn,
- ldap.SCOPE_SUBTREE,
- "(objectClass=*)",
- attrlist=self._attributesToFetch
+
+ with DirectoryService.Connection(self) as connection:
+
+ self.log.debug("Performing LDAP DN query: {dn}", dn=dn)
+
+ try:
+ reply = connection.search_s(
+ dn,
+ ldap.SCOPE_SUBTREE,
+ "(objectClass=*)",
+ attrlist=self._attributesToFetch
+ )
+ records = self._recordsFromReply(reply)
+ except ldap.NO_SUCH_OBJECT:
+ records = []
+ except ldap.INVALID_DN_SYNTAX:
+ self.log.warn("Invalid LDAP DN syntax: '{dn}'", dn=dn)
+ records = []
+
+ except ldap.SERVER_DOWN as e:
+ self.log.error(
+ "LDAP server unavailable"
)
- records = self._recordsFromReply(reply)
- except ldap.NO_SUCH_OBJECT:
- records = []
- except ldap.INVALID_DN_SYNTAX:
- self.log.warn("Invalid LDAP DN syntax: '{dn}'", dn=dn)
- records = []
+ if self._retryNumber + 1 == TRIES:
+ # We've hit SERVER_DOWN TRIES times, giving up
+ raise LDAPQueryError("LDAP server down", e)
+ else:
+ self.log.error("LDAP connection failure; retrying...")
+ else:
+ # Only retry if we got ldap.SERVER_DOWN, otherwise break out of
+ # loop
+ break
+
if len(records):
return records[0]
else:
Modified: twext/trunk/twext/who/ldap/test/test_service.py
===================================================================
--- twext/trunk/twext/who/ldap/test/test_service.py 2015-09-08 19:11:06 UTC (rev 15110)
+++ twext/trunk/twext/who/ldap/test/test_service.py 2015-09-09 20:08:14 UTC (rev 15111)
@@ -58,7 +58,7 @@
from twext.python.types import MappingProxyType
from twext.who.idirectory import RecordType
from twext.who.ldap import (
- LDAPAttribute, RecordTypeSchema, LDAPObjectClass
+ LDAPAttribute, RecordTypeSchema, LDAPObjectClass, LDAPQueryError
)
from twext.who.util import ConstantsContainer
@@ -445,6 +445,55 @@
self.assertEquals(repr(service), u"<TestService u'ldap://localhost/'>")
+ def test_server_down(self):
+ """
+ Verify an ldap.SERVER_DOWN error will retry 2 more times and that
+ the connection is closed if all attempts fail.
+ """
+
+ service = self.service()
+
+ # Verify that without a SERVER_DOWN we don't need to retry, and we
+ # still have a connection in the pool
+ service._recordsFromQueryString_inThread("(this=that)")
+ self.assertEquals(service._retryNumber, 0)
+ self.assertEquals(len(service.connections), 1)
+
+ service._recordWithDN_inThread("cn=test")
+ self.assertEquals(service._retryNumber, 0)
+ self.assertEquals(len(service.connections), 1)
+
+ # Force a search to raise SERVER_DOWN
+ def raiseServerDown(*args, **kwds):
+ raise ldap.SERVER_DOWN
+ self.patch(LDAPObject, "search_ext", raiseServerDown)
+ self.patch(LDAPObject, "search_s", raiseServerDown)
+
+ # Now try recordsFromQueryString
+ try:
+ service._recordsFromQueryString_inThread("(this=that)")
+ except LDAPQueryError:
+ # Verify the number of times we retried
+ self.assertEquals(service._retryNumber, 2)
+ except:
+ self.fail("Should have raised LDAPQueryError")
+
+ # Verify the connections are all closed
+ self.assertEquals(len(service.connections), 0)
+
+ # Now try recordWithDN
+ try:
+ service._recordWithDN_inThread("cn=test")
+ except LDAPQueryError:
+ # Verify the number of times we retried
+ self.assertEquals(service._retryNumber, 2)
+ except:
+ self.fail("Should have raised LDAPQueryError")
+
+ # Verify the connections are all closed
+ self.assertEquals(len(service.connections), 0)
+
+
class ExtraFiltersTest(BaseTestCase, unittest.TestCase):
def test_extraFilters(self):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150909/bfcf8922/attachment-0001.html>
More information about the calendarserver-changes mailing list