[CalendarServer-changes] [15111] twext/trunk/twext/who/ldap

source_changes at macosforge.org source_changes at macosforge.org
Wed Sep 9 13:08:14 PDT 2015


Revision: 15111
          http://trac.calendarserver.org/changeset/15111
Author:   sagen at apple.com
Date:     2015-09-09 13:08:14 -0700 (Wed, 09 Sep 2015)
Log Message:
-----------
If we get ldap.SERVER_DOWN, retry up to two more times, each with a new connection

Modified Paths:
--------------
    twext/trunk/twext/who/ldap/__init__.py
    twext/trunk/twext/who/ldap/_service.py
    twext/trunk/twext/who/ldap/test/test_service.py

Modified: twext/trunk/twext/who/ldap/__init__.py
===================================================================
--- twext/trunk/twext/who/ldap/__init__.py	2015-09-08 19:11:06 UTC (rev 15110)
+++ twext/trunk/twext/who/ldap/__init__.py	2015-09-09 20:08:14 UTC (rev 15111)
@@ -23,6 +23,7 @@
     "LDAPConfigurationError",
     "LDAPConnectionError",
     "LDAPBindAuthError",
+    "LDAPQueryError",
     "RecordTypeSchema",
     "DirectoryService",
     "LDAPAttribute",
@@ -36,6 +37,7 @@
     LDAPConfigurationError,
     LDAPConnectionError,
     LDAPBindAuthError,
+    LDAPQueryError,
     RecordTypeSchema,
     DirectoryService,
     FieldName

Modified: twext/trunk/twext/who/ldap/_service.py
===================================================================
--- twext/trunk/twext/who/ldap/_service.py	2015-09-08 19:11:06 UTC (rev 15110)
+++ twext/trunk/twext/who/ldap/_service.py	2015-09-09 20:08:14 UTC (rev 15111)
@@ -603,105 +603,124 @@
         This method is always called in a thread.
         """
 
-        records = []
+        if recordTypes is None:
+            recordTypes = list(self.recordTypes())
 
-        with DirectoryService.Connection(self) as connection:
+        # Retry if we get ldap.SERVER_DOWN
+        TRIES = 3
 
+        for self._retryNumber in xrange(TRIES):
 
-            if recordTypes is None:
-                recordTypes = self.recordTypes()
+            records = []
 
-            for recordType in recordTypes:
+            try:
 
-                if limitResults is not None:
-                    if limitResults < 1:
-                        break
+                with DirectoryService.Connection(self) as connection:
 
-                try:
-                    rdn = self._recordTypeSchemas[recordType].relativeDN
-                except KeyError:
-                    # Skip this unknown record type
-                    continue
+                    for recordType in recordTypes:
 
-                rdn = (
-                    ldap.dn.str2dn(rdn.lower()) +
-                    ldap.dn.str2dn(self._baseDN.lower())
-                )
-                filteredQuery = self._addExtraFilter(recordType, queryString)
-                self.log.debug(
-                    "Performing LDAP query: {rdn} {query} {recordType}{limit}{timeout}",
-                    rdn=rdn,
-                    query=filteredQuery,
-                    recordType=recordType,
-                    limit=" limit={}".format(limitResults) if limitResults else "",
-                    timeout=" timeout={}".format(timeoutSeconds) if timeoutSeconds else "",
-                )
-                try:
-                    s = ldap.async.List(connection)
-                    s.startSearch(
-                        ldap.dn.dn2str(rdn),
-                        ldap.SCOPE_SUBTREE,
-                        filteredQuery,
-                        attrList=self._attributesToFetch,
-                        timeout=timeoutSeconds if timeoutSeconds else -1,
-                        sizelimit=limitResults if limitResults else 0
-                    )
-                    s.processResults()
+                        if limitResults is not None:
+                            if limitResults < 1:
+                                break
 
-                except ldap.SIZELIMIT_EXCEEDED as e:
-                    self.log.debug("LDAP result limit exceeded: {}".format(limitResults,))
+                        try:
+                            rdn = self._recordTypeSchemas[recordType].relativeDN
+                        except KeyError:
+                            # Skip this unknown record type
+                            continue
 
-                except ldap.TIMELIMIT_EXCEEDED as e:
-                    self.log.warn("LDAP timeout exceeded: {} seconds".format(timeoutSeconds,))
+                        rdn = (
+                            ldap.dn.str2dn(rdn.lower()) +
+                            ldap.dn.str2dn(self._baseDN.lower())
+                        )
+                        filteredQuery = self._addExtraFilter(recordType, queryString)
+                        self.log.debug(
+                            "Performing LDAP query: {rdn} {query} {recordType}{limit}{timeout}",
+                            rdn=rdn,
+                            query=filteredQuery,
+                            recordType=recordType,
+                            limit=" limit={}".format(limitResults) if limitResults else "",
+                            timeout=" timeout={}".format(timeoutSeconds) if timeoutSeconds else "",
+                        )
+                        try:
+                            s = ldap.async.List(connection)
+                            s.startSearch(
+                                ldap.dn.dn2str(rdn),
+                                ldap.SCOPE_SUBTREE,
+                                filteredQuery,
+                                attrList=self._attributesToFetch,
+                                timeout=timeoutSeconds if timeoutSeconds else -1,
+                                sizelimit=limitResults if limitResults else 0
+                            )
+                            s.processResults()
 
-                except ldap.FILTER_ERROR as e:
-                    self.log.error(
-                        "Unable to perform query {query!r}: {err}",
-                        query=queryString, err=e
-                    )
-                    raise LDAPQueryError("Unable to perform query", e)
+                        except ldap.SIZELIMIT_EXCEEDED as e:
+                            self.log.debug("LDAP result limit exceeded: {}".format(limitResults,))
 
-                except ldap.NO_SUCH_OBJECT as e:
-                    # self.log.warn("RDN {rdn} does not exist, skipping", rdn=rdn)
-                    continue
+                        except ldap.TIMELIMIT_EXCEEDED as e:
+                            self.log.warn("LDAP timeout exceeded: {} seconds".format(timeoutSeconds,))
 
-                except ldap.INVALID_SYNTAX as e:
-                    self.log.error(
-                        "LDAP invalid syntax {query!r}: {err}",
-                        query=queryString, err=e
-                    )
-                    continue
+                        except ldap.FILTER_ERROR as e:
+                            self.log.error(
+                                "Unable to perform query {query!r}: {err}",
+                                query=queryString, err=e
+                            )
+                            raise LDAPQueryError("Unable to perform query", e)
 
-                except ldap.SERVER_DOWN as e:
-                    self.log.error(
-                        "LDAP server unavailable"
-                    )
-                    raise LDAPQueryError("LDAP server down", e)
+                        except ldap.NO_SUCH_OBJECT as e:
+                            # self.log.warn("RDN {rdn} does not exist, skipping", rdn=rdn)
+                            continue
 
-                except Exception as e:
-                    self.log.error(
-                        "LDAP error {query!r}: {err}",
-                        query=queryString, err=e
-                    )
-                    raise LDAPQueryError("Unable to perform query", e)
+                        except ldap.INVALID_SYNTAX as e:
+                            self.log.error(
+                                "LDAP invalid syntax {query!r}: {err}",
+                                query=queryString, err=e
+                            )
+                            continue
 
-                reply = [resultItem for _ignore_resultType, resultItem in s.allResults]
+                        except ldap.SERVER_DOWN as e:
+                            # Catch this below for retry
+                            raise e
 
-                newRecords = self._recordsFromReply(reply, recordType=recordType)
+                        except Exception as e:
+                            self.log.error(
+                                "LDAP error {query!r}: {err}",
+                                query=queryString, err=e
+                            )
+                            raise LDAPQueryError("Unable to perform query", e)
 
-                self.log.debug(
-                    "Records from LDAP query ({rdn} {query} {recordType}): {count}",
-                    rdn=rdn,
-                    query=queryString,
-                    recordType=recordType,
-                    count=len(newRecords)
+                        reply = [resultItem for _ignore_resultType, resultItem in s.allResults]
+
+                        newRecords = self._recordsFromReply(reply, recordType=recordType)
+
+                        self.log.debug(
+                            "Records from LDAP query ({rdn} {query} {recordType}): {count}",
+                            rdn=rdn,
+                            query=queryString,
+                            recordType=recordType,
+                            count=len(newRecords)
+                        )
+
+                        if limitResults is not None:
+                            limitResults = limitResults - len(newRecords)
+
+                        records.extend(newRecords)
+
+            except ldap.SERVER_DOWN as e:
+                self.log.error(
+                    "LDAP server unavailable"
                 )
+                if self._retryNumber + 1 == TRIES:
+                    # We've hit SERVER_DOWN TRIES times, giving up
+                    raise LDAPQueryError("LDAP server down", e)
+                else:
+                    self.log.error("LDAP connection failure; retrying...")
 
-                if limitResults is not None:
-                    limitResults = limitResults - len(newRecords)
+            else:
+                # Only retry if we got ldap.SERVER_DOWN, otherwise break out of
+                # loop
+                break
 
-                records.extend(newRecords)
-
         self.log.debug(
             "LDAP result count ({query}): {count}",
             query=queryString,
@@ -725,24 +744,49 @@
         @param dn: The DN of the record to search for
         @type dn: C{str}
         """
-        with DirectoryService.Connection(self) as connection:
 
-            self.log.debug("Performing LDAP DN query: {dn}", dn=dn)
+        records = []
 
+        # Retry if we get ldap.SERVER_DOWN
+        TRIES = 3
+
+        for self._retryNumber in xrange(TRIES):
+
             try:
-                reply = connection.search_s(
-                    dn,
-                    ldap.SCOPE_SUBTREE,
-                    "(objectClass=*)",
-                    attrlist=self._attributesToFetch
+
+                with DirectoryService.Connection(self) as connection:
+
+                    self.log.debug("Performing LDAP DN query: {dn}", dn=dn)
+
+                    try:
+                        reply = connection.search_s(
+                            dn,
+                            ldap.SCOPE_SUBTREE,
+                            "(objectClass=*)",
+                            attrlist=self._attributesToFetch
+                        )
+                        records = self._recordsFromReply(reply)
+                    except ldap.NO_SUCH_OBJECT:
+                        records = []
+                    except ldap.INVALID_DN_SYNTAX:
+                        self.log.warn("Invalid LDAP DN syntax: '{dn}'", dn=dn)
+                        records = []
+
+            except ldap.SERVER_DOWN as e:
+                self.log.error(
+                    "LDAP server unavailable"
                 )
-                records = self._recordsFromReply(reply)
-            except ldap.NO_SUCH_OBJECT:
-                records = []
-            except ldap.INVALID_DN_SYNTAX:
-                self.log.warn("Invalid LDAP DN syntax: '{dn}'", dn=dn)
-                records = []
+                if self._retryNumber + 1 == TRIES:
+                    # We've hit SERVER_DOWN TRIES times, giving up
+                    raise LDAPQueryError("LDAP server down", e)
+                else:
+                    self.log.error("LDAP connection failure; retrying...")
 
+            else:
+                # Only retry if we got ldap.SERVER_DOWN, otherwise break out of
+                # loop
+                break
+
         if len(records):
             return records[0]
         else:

Modified: twext/trunk/twext/who/ldap/test/test_service.py
===================================================================
--- twext/trunk/twext/who/ldap/test/test_service.py	2015-09-08 19:11:06 UTC (rev 15110)
+++ twext/trunk/twext/who/ldap/test/test_service.py	2015-09-09 20:08:14 UTC (rev 15111)
@@ -58,7 +58,7 @@
 from twext.python.types import MappingProxyType
 from twext.who.idirectory import RecordType
 from twext.who.ldap import (
-    LDAPAttribute, RecordTypeSchema, LDAPObjectClass
+    LDAPAttribute, RecordTypeSchema, LDAPObjectClass, LDAPQueryError
 )
 from twext.who.util import ConstantsContainer
 
@@ -445,6 +445,55 @@
         self.assertEquals(repr(service), u"<TestService u'ldap://localhost/'>")
 
 
+    def test_server_down(self):
+        """
+        Verify an ldap.SERVER_DOWN error will retry 2 more times and that
+        the connection is closed if all attempts fail.
+        """
+
+        service = self.service()
+
+        # Verify that without a SERVER_DOWN we don't need to retry, and we
+        # still have a connection in the pool
+        service._recordsFromQueryString_inThread("(this=that)")
+        self.assertEquals(service._retryNumber, 0)
+        self.assertEquals(len(service.connections), 1)
+
+        service._recordWithDN_inThread("cn=test")
+        self.assertEquals(service._retryNumber, 0)
+        self.assertEquals(len(service.connections), 1)
+
+        # Force a search to raise SERVER_DOWN
+        def raiseServerDown(*args, **kwds):
+            raise ldap.SERVER_DOWN
+        self.patch(LDAPObject, "search_ext", raiseServerDown)
+        self.patch(LDAPObject, "search_s", raiseServerDown)
+
+        # Now try recordsFromQueryString
+        try:
+            service._recordsFromQueryString_inThread("(this=that)")
+        except LDAPQueryError:
+            # Verify the number of times we retried
+            self.assertEquals(service._retryNumber, 2)
+        except:
+            self.fail("Should have raised LDAPQueryError")
+
+        # Verify the connections are all closed
+        self.assertEquals(len(service.connections), 0)
+
+        # Now try recordWithDN
+        try:
+            service._recordWithDN_inThread("cn=test")
+        except LDAPQueryError:
+            # Verify the number of times we retried
+            self.assertEquals(service._retryNumber, 2)
+        except:
+            self.fail("Should have raised LDAPQueryError")
+
+        # Verify the connections are all closed
+        self.assertEquals(len(service.connections), 0)
+
+
 class ExtraFiltersTest(BaseTestCase, unittest.TestCase):
 
     def test_extraFilters(self):
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <https://lists.macosforge.org/pipermail/calendarserver-changes/attachments/20150909/bfcf8922/attachment-0001.html>


More information about the calendarserver-changes mailing list