From 5d514030988ba726b347167937d97bb9ab56e767 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Palancher?= Date: Sat, 23 May 2026 11:49:24 +0200 Subject: [PATCH 1/2] fix(auth): ldap exceeded limit with paged search fix #41 --- CHANGELOG.md | 10 +++ src/authentication/rfl/authentication/ldap.py | 77 +++++++++++++++++-- 2 files changed, 82 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a005175..bab488f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,16 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [unreleased] + +### Added +- auth: Add optional `page_size` argument to `LDAPAuthentifier` (default 1000) for + LDAP paged search page size (#41). + +### Fixed +- auth: Support LDAP paged results on subtree searches to avoid + `ldap.SIZELIMIT_EXCEEDED` (#41). + ## [1.8.0] - 2026-05-22 ### Added diff --git a/src/authentication/rfl/authentication/ldap.py b/src/authentication/rfl/authentication/ldap.py index 793b548..52c42d3 100644 --- a/src/authentication/rfl/authentication/ldap.py +++ b/src/authentication/rfl/authentication/ldap.py @@ -11,6 +11,7 @@ try: import ldap import ldap.filter + from ldap.controls import SimplePagedResultsControl except ImportError as err: raise ImportError("python-ldap is required for RFL LDAP Authentication") from err @@ -20,6 +21,8 @@ logger = logging.getLogger(__name__) +DEFAULT_LDAP_PAGE_SIZE = 1000 + class LDAPAuthentifier: def __init__( @@ -40,7 +43,13 @@ def __init__( restricted_groups: Optional[List[str]] = None, lookup_user_dn: bool = False, lookup_as_user: Optional[bool] = True, + page_size: int = DEFAULT_LDAP_PAGE_SIZE, ): + if page_size <= 0: + raise LDAPAuthenticationError( + f"LDAP page_size must be a positive integer, got {page_size}" + ) + self.page_size = page_size self.uri = uri self.cacert = cacert self.user_base = user_base @@ -130,6 +139,49 @@ def connection(self): ) from err return connection + def _search_paged( + self, + connection: ldap.ldapobject.LDAPObject, + base: str, + scope: int, + search_filter: str, + attrlist: Optional[List[str]] = None, + ) -> List: + """Run an LDAP search with RFC 2696 paged results and return all pages.""" + known_ldap_resp_ctrls = { + SimplePagedResultsControl.controlType: SimplePagedResultsControl, + } + page_control = SimplePagedResultsControl(True, size=self.page_size, cookie="") + results = [] + while True: + msgid = connection.search_ext( + base, + scope, + search_filter, + attrlist=attrlist, + serverctrls=[page_control], + ) + _, rdata, _, serverctrls = connection.result3( + msgid, resp_ctrl_classes=known_ldap_resp_ctrls + ) + results.extend(rdata) + pctrls = [ + control + for control in (serverctrls or []) + if control.controlType == SimplePagedResultsControl.controlType + ] + if not pctrls: + logger.debug( + "LDAP server did not return paged results control for search " + "base %s", + base, + ) + break + if not pctrls[0].cookie: + break + page_control.cookie = pctrls[0].cookie + return results + def _get_user_info( self, connection: ldap.ldapobject.LDAPObject, user_dn: str ) -> Tuple[str, int]: @@ -204,7 +256,8 @@ def _get_groups( f"(member={ldap.filter.escape_filter_chars(user_dn)}){gid_filter}))" ) try: - results = connection.search_s( + results = self._search_paged( + connection, self.group_base, ldap.SCOPE_SUBTREE, search_filter, @@ -214,6 +267,10 @@ def _get_groups( raise LDAPAuthenticationError( f"Unable to find group base {self.group_base}" ) from err + except ldap.SIZELIMIT_EXCEEDED as err: + raise LDAPAuthenticationError( + f"LDAP size limit exceeded on group search: {err}" + ) from err logger.debug( "LDAP search base: %s, scope: subtree, filter: %s, results: %s", self.group_base, @@ -292,7 +349,8 @@ def _lookup_user_dn(self, user): f"({self.user_name_attribute}={ldap.filter.escape_filter_chars(user)}))" ) try: - results = connection.search_s( + results = self._search_paged( + connection, self.user_base, ldap.SCOPE_SUBTREE, search_filter, @@ -310,11 +368,15 @@ def _lookup_user_dn(self, user): raise LDAPAuthenticationError( f"Operations error on user DN lookup: {err}" ) from err + except ldap.SIZELIMIT_EXCEEDED as err: + raise LDAPAuthenticationError( + f"LDAP size limit exceeded on user DN lookup: {err}" + ) from err finally: connection.unbind_s() logger.debug( "LDAP search base: %s, scope: subtree, filter: %s, results: %s", - self.group_base, + self.user_base, search_filter, str(results), ) @@ -396,7 +458,8 @@ def _list_user_dn(self, connection): """Return list of all users name/pairs pairs in LDAP directory.""" search_filter = f"(objectClass={self.user_class})" try: - results = connection.search_s( + results = self._search_paged( + connection, self.user_base, ldap.SCOPE_SUBTREE, search_filter, @@ -410,9 +473,13 @@ def _list_user_dn(self, connection): raise LDAPAuthenticationError( f"Operations error on users search: {err}" ) from err + except ldap.SIZELIMIT_EXCEEDED as err: + raise LDAPAuthenticationError( + f"LDAP size limit exceeded on users search: {err}" + ) from err logger.debug( "LDAP search base: %s, scope: subtree, filter: %s, results: %s", - self.group_base, + self.user_base, search_filter, str(results), ) From 669415bb80798d9a1097b8819ccef03d2e17f8e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9mi=20Palancher?= Date: Sat, 23 May 2026 11:50:08 +0200 Subject: [PATCH 2/2] tests(auth): cover ldap paged search --- src/authentication/rfl/tests/test_ldap.py | 416 +++++++++++++++++----- 1 file changed, 322 insertions(+), 94 deletions(-) diff --git a/src/authentication/rfl/tests/test_ldap.py b/src/authentication/rfl/tests/test_ldap.py index 001f31e..4c077cf 100644 --- a/src/authentication/rfl/tests/test_ldap.py +++ b/src/authentication/rfl/tests/test_ldap.py @@ -5,18 +5,54 @@ # SPDX-License-Identifier: LGPL-3.0-or-later import unittest -from unittest.mock import patch, Mock +from unittest.mock import ANY, patch, Mock from pathlib import Path import urllib import ssl import ldap import ldap.filter +from ldap.controls import SimplePagedResultsControl -from rfl.authentication.ldap import LDAPAuthentifier +from rfl.authentication.ldap import ( + DEFAULT_LDAP_PAGE_SIZE, + LDAPAuthentifier, +) from rfl.authentication.errors import LDAPAuthenticationError +def _configure_mock_paged_ldap_connection(connection, pages): + """Configure a mock connection for _search_paged (one or more result pages).""" + page_specs = [] + for index, page_data in enumerate(pages): + cookie = b"next" if index < len(pages) - 1 else b"" + page_specs.append((page_data, cookie)) + + def search_ext_impl( + base, + scope, + filterstr, + attrlist=None, + attrsonly=0, + serverctrls=None, + clientctrls=None, + timeout=-1, + sizelimit=0, + ): + search_ext_impl.call_count += 1 + return search_ext_impl.call_count + + search_ext_impl.call_count = 0 + + def result3(msgid, resp_ctrl_classes=None): + page_data, cookie = page_specs[msgid - 1] + page_control = SimplePagedResultsControl(True, size=1000, cookie=cookie) + return (ldap.RES_SEARCH_RESULT, page_data, msgid, [page_control]) + + connection.search_ext = Mock(side_effect=search_ext_impl) + connection.result3 = Mock(side_effect=result3) + + class MockLDAPObject: pass @@ -118,7 +154,7 @@ def test_lookup_user_dn_disabled(self, mock_ldap_initialize): f"{self.authentifier.user_base}", ) mock_ldap_object.simple_bind_s.assert_not_called() - mock_ldap_object.search_s.assert_not_called() + mock_ldap_object.search_ext.assert_not_called() mock_ldap_object.unbind_s.assert_not_called() @patch.object(ldap, "initialize") @@ -127,19 +163,24 @@ def test_lookup_user_dn_enabled(self, mock_ldap_initialize): self.authentifier.lookup_user_dn = True # setup LDAP mock mock_ldap_object = mock_ldap_initialize.return_value - mock_ldap_object.search_s.return_value = [ - ( - f"uid=john,ou=admins,{self.authentifier.user_base}", - {"cn": [b"John Doe"]}, - ) - ] + _configure_mock_paged_ldap_connection( + mock_ldap_object, + [ + [ + ( + f"uid=john,ou=admins,{self.authentifier.user_base}", + {"cn": [b"John Doe"]}, + ) + ] + ], + ) self.assertEqual( self.authentifier._lookup_user_dn("john"), f"uid=john,ou=admins,{self.authentifier.user_base}", ) mock_ldap_object.simple_bind_s.assert_not_called() - mock_ldap_object.search_s.assert_called_once() + mock_ldap_object.search_ext.assert_called_once() mock_ldap_object.unbind_s.assert_called_once() @patch.object(ldap, "initialize") @@ -151,12 +192,17 @@ def test_lookup_user_dn_enabled_bind_dn(self, mock_ldap_initialize): self.authentifier.lookup_user_dn = True # setup LDAP mock mock_ldap_object = mock_ldap_initialize.return_value - mock_ldap_object.search_s.return_value = [ - ( - f"uid=john,ou=admins,{self.authentifier.user_base}", - {"cn": [b"John Doe"]}, - ) - ] + _configure_mock_paged_ldap_connection( + mock_ldap_object, + [ + [ + ( + f"uid=john,ou=admins,{self.authentifier.user_base}", + {"cn": [b"John Doe"]}, + ) + ] + ], + ) self.assertEqual( self.authentifier._lookup_user_dn("john"), @@ -165,7 +211,7 @@ def test_lookup_user_dn_enabled_bind_dn(self, mock_ldap_initialize): mock_ldap_object.simple_bind_s.assert_called_once_with( self.authentifier.bind_dn, self.authentifier.bind_password ) - mock_ldap_object.search_s.assert_called_once() + mock_ldap_object.search_ext.assert_called_once() mock_ldap_object.unbind_s.assert_called_once() @patch.object(ldap, "initialize") @@ -176,19 +222,25 @@ def test_lookup_user_dn_escape_special_chars(self, mock_ldap_initialize): mock_ldap_object = mock_ldap_initialize.return_value user_with_special_chars = "John Doe(user)" escaped_user = ldap.filter.escape_filter_chars(user_with_special_chars) - mock_ldap_object.search_s.return_value = [ - ( - f"uid={user_with_special_chars},ou=admins,{self.authentifier.user_base}", - {"cn": [b"John Doe(user)"]}, - ) - ] + _configure_mock_paged_ldap_connection( + mock_ldap_object, + [ + [ + ( + f"uid={user_with_special_chars},ou=admins," + f"{self.authentifier.user_base}", + {"cn": [b"John Doe(user)"]}, + ) + ] + ], + ) self.assertEqual( self.authentifier._lookup_user_dn(user_with_special_chars), f"uid={user_with_special_chars},ou=admins,{self.authentifier.user_base}", ) # Verify the filter contains escaped user value - call_args = mock_ldap_object.search_s.call_args + call_args = mock_ldap_object.search_ext.call_args search_filter = call_args[0][2] # Verify the filter contains escaped user value self.assertIn( @@ -219,7 +271,7 @@ def test_lookup_user_dn_enabled_bind_dn_missing_password( r"is required$", ): self.authentifier._lookup_user_dn("john") - mock_ldap_object.search_s.assert_not_called() + mock_ldap_object.search_ext.assert_not_called() mock_ldap_object.unbind_s.assert_not_called() @patch.object(ldap, "initialize") @@ -241,7 +293,7 @@ def test_lookup_user_dn_enabled_bind_dn_invalid_credentials( r"^Invalid bind DN or password$", ): self.authentifier._lookup_user_dn("john") - mock_ldap_object.search_s.assert_not_called() + mock_ldap_object.search_ext.assert_not_called() mock_ldap_object.unbind_s.assert_not_called() @patch.object(ldap, "initialize") @@ -261,7 +313,7 @@ def test_lookup_user_ldap_server_down_error(self, mock_ldap_initialize): rf"^LDAP server {self.authentifier.uri.geturl()} is unreachable$", ): self.authentifier._lookup_user_dn("john") - mock_ldap_object.search_s.assert_not_called() + mock_ldap_object.search_ext.assert_not_called() mock_ldap_object.unbind_s.assert_not_called() @patch.object(ldap, "initialize") @@ -273,7 +325,7 @@ def test_lookup_user_ldap_operations_error(self, mock_ldap_initialize): self.authentifier.lookup_user_dn = True # setup LDAP mock mock_ldap_object = mock_ldap_initialize.return_value - mock_ldap_object.search_s.side_effect = ldap.OPERATIONS_ERROR("fail") + mock_ldap_object.search_ext.side_effect = ldap.OPERATIONS_ERROR("fail") # Check exception is raised due to LDAP server down with self.assertRaisesRegex( @@ -281,7 +333,7 @@ def test_lookup_user_ldap_operations_error(self, mock_ldap_initialize): r"^Operations error on user DN lookup: fail$", ): self.authentifier._lookup_user_dn("john") - mock_ldap_object.search_s.assert_called_once() + mock_ldap_object.search_ext.assert_called_once() mock_ldap_object.unbind_s.assert_called_once() @patch.object(ldap, "initialize") @@ -290,7 +342,7 @@ def test_lookup_user_dn_enabled_not_found(self, mock_ldap_initialize): self.authentifier.lookup_user_dn = True # setup LDAP mock mock_ldap_object = mock_ldap_initialize.return_value - mock_ldap_object.search_s.return_value = [] + _configure_mock_paged_ldap_connection(mock_ldap_object, [[]]) # Check exception is raised due to no result found with self.assertRaisesRegex( @@ -298,7 +350,7 @@ def test_lookup_user_dn_enabled_not_found(self, mock_ldap_initialize): r"^Unable to find user john in base ou=people,dc=corp,dc=org$", ): self.authentifier._lookup_user_dn("john") - mock_ldap_object.search_s.assert_called_once() + mock_ldap_object.search_ext.assert_called_once() mock_ldap_object.unbind_s.assert_called_once() @patch.object(ldap, "initialize") @@ -307,16 +359,21 @@ def test_lookup_user_dn_enabled_too_many_results(self, mock_ldap_initialize): self.authentifier.lookup_user_dn = True # setup LDAP mock mock_ldap_object = mock_ldap_initialize.return_value - mock_ldap_object.search_s.return_value = [ - ( - f"uid=john,ou=admins,{self.authentifier.user_base}", - {"cn": [b"John Doe"]}, - ), - ( - f"uid=alice,ou=admins,{self.authentifier.user_base}", - {"cn": [b"Alice Doe"]}, - ), - ] + _configure_mock_paged_ldap_connection( + mock_ldap_object, + [ + [ + ( + f"uid=john,ou=admins,{self.authentifier.user_base}", + {"cn": [b"John Doe"]}, + ), + ( + f"uid=alice,ou=admins,{self.authentifier.user_base}", + {"cn": [b"Alice Doe"]}, + ), + ] + ], + ) # Check exception is raised due to too many results found with self.assertRaisesRegex( LDAPAuthenticationError, @@ -324,7 +381,7 @@ def test_lookup_user_dn_enabled_too_many_results(self, mock_ldap_initialize): r"ou=people,dc=corp,dc=org$", ): self.authentifier._lookup_user_dn("john") - mock_ldap_object.search_s.assert_called_once() + mock_ldap_object.search_ext.assert_called_once() mock_ldap_object.unbind_s.assert_called_once() @patch.object(LDAPAuthentifier, "_lookup_user_dn") @@ -575,10 +632,15 @@ def test_custom_primary_attribute(self): def test_get_groups(self): connection = Mock(spec=ldap.ldapobject.LDAPObject) - connection.search_s.return_value = [ - ("cn=scientists,ou=groups,dc=corp,dc=org", {"cn": [b"scientists"]}), - ("cn=biology,ou=groups,dc=corp,dc=org", {"cn": [b"biology"]}), - ] + _configure_mock_paged_ldap_connection( + connection, + [ + [ + ("cn=scientists,ou=groups,dc=corp,dc=org", {"cn": [b"scientists"]}), + ("cn=biology,ou=groups,dc=corp,dc=org", {"cn": [b"biology"]}), + ] + ], + ) groups = self.authentifier._get_groups( connection, "john", "uid=john,ou=people,dc=corp,dc=org", 42 ) @@ -586,9 +648,10 @@ def test_get_groups(self): def test_get_groups_escape_special_chars(self): connection = Mock(spec=ldap.ldapobject.LDAPObject) - connection.search_s.return_value = [ - ("cn=scientists,ou=groups,dc=corp,dc=org", {"cn": [b"scientists"]}), - ] + _configure_mock_paged_ldap_connection( + connection, + [[("cn=scientists,ou=groups,dc=corp,dc=org", {"cn": [b"scientists"]})]], + ) user_name_with_special = "John Doe(user)" user_dn_with_special = "uid=John Doe(user),ou=people,dc=corp,dc=org" escaped_user_name = ldap.filter.escape_filter_chars(user_name_with_special) @@ -599,52 +662,70 @@ def test_get_groups_escape_special_chars(self): ) self.assertEqual(groups, ["scientists"]) - connection.search_s.assert_called_once_with( + connection.search_ext.assert_called_once_with( self.authentifier.group_base, ldap.SCOPE_SUBTREE, f"(&(|(objectClass=posixGroup)(objectClass=groupOfNames))" f"(|(memberUid={escaped_user_name})" f"(member={escaped_user_dn})(gidNumber={gid})))", - [self.authentifier.group_name_attribute], + attrlist=[self.authentifier.group_name_attribute], + serverctrls=[ANY], ) def test_get_groups_without_gid(self): connection = Mock(spec=ldap.ldapobject.LDAPObject) - connection.search_s.return_value = [ - ("cn=scientists,ou=groups,dc=corp,dc=org", {"cn": [b"scientists"]}), - ("cn=biology,ou=groups,dc=corp,dc=org", {"cn": [b"biology"]}), - ] + _configure_mock_paged_ldap_connection( + connection, + [ + [ + ("cn=scientists,ou=groups,dc=corp,dc=org", {"cn": [b"scientists"]}), + ("cn=biology,ou=groups,dc=corp,dc=org", {"cn": [b"biology"]}), + ] + ], + ) user = "john" dn = "uid=john,ou=people,dc=corp,dc=org" gid = 42 # First call with gid and check LDAP search filter. groups = self.authentifier._get_groups(connection, user, dn, gid) self.assertEqual(groups, ["scientists", "biology"]) - connection.search_s.assert_called_once_with( + connection.search_ext.assert_called_once_with( self.authentifier.group_base, ldap.SCOPE_SUBTREE, "(&(|(objectClass=posixGroup)(objectClass=groupOfNames))" f"(|(memberUid={user})(member={dn})(gidNumber={gid})))", - [self.authentifier.group_name_attribute], + attrlist=[self.authentifier.group_name_attribute], + serverctrls=[ANY], + ) + connection.search_ext.reset_mock() + connection.result3.reset_mock() + _configure_mock_paged_ldap_connection( + connection, + [ + [ + ("cn=scientists,ou=groups,dc=corp,dc=org", {"cn": [b"scientists"]}), + ("cn=biology,ou=groups,dc=corp,dc=org", {"cn": [b"biology"]}), + ] + ], ) - connection.search_s.reset_mock() # Then a second call with undefined gid must remove gidNumber from LDAP search # filter. groups = self.authentifier._get_groups(connection, user, dn, None) self.assertEqual(groups, ["scientists", "biology"]) - connection.search_s.assert_called_once_with( + connection.search_ext.assert_called_once_with( self.authentifier.group_base, ldap.SCOPE_SUBTREE, "(&(|(objectClass=posixGroup)(objectClass=groupOfNames))" f"(|(memberUid={user})(member={dn})))", - [self.authentifier.group_name_attribute], + attrlist=[self.authentifier.group_name_attribute], + serverctrls=[ANY], ) def test_groups_base_not_found(self): connection = Mock(spec=ldap.ldapobject.LDAPObject) # When group base DN is not found in LDAP, ldap module raises NO_SUCH_OBJECT # exception. - connection.search_s.side_effect = ldap.NO_SUCH_OBJECT("fail") + connection.search_ext.side_effect = ldap.NO_SUCH_OBJECT("fail") with self.assertRaisesRegex( LDAPAuthenticationError, rf"^Unable to find group base {self.authentifier.group_base}$", @@ -657,12 +738,17 @@ def test_get_groups_name_attribute_not_found(self): connection = Mock(spec=ldap.ldapobject.LDAPObject) self.authentifier.group_name_attribute = "fail" # If the group entries in LDAP directory does not contain attributes whose name - # matches group_name_attribute, search_s returns an empty dict in the second + # matches group_name_attribute, search returns an empty dict in the second # element of the result tuple. - connection.search_s.return_value = [ - ("cn=scientists,ou=groups,dc=corp,dc=org", {}), - ("cn=biology,ou=groups,dc=corp,dc=org", {}), - ] + _configure_mock_paged_ldap_connection( + connection, + [ + [ + ("cn=scientists,ou=groups,dc=corp,dc=org", {}), + ("cn=biology,ou=groups,dc=corp,dc=org", {}), + ] + ], + ) with self.assertRaisesRegex( LDAPAuthenticationError, r"^Unable to extract group name with fail attribute from group entries$", @@ -674,8 +760,8 @@ def test_get_groups_name_attribute_not_found(self): def test_get_groups_class_not_found(self): connection = Mock(spec=ldap.ldapobject.LDAPObject) # If entries with one of group_object_classes is not found in group_base - # subtree, search_s returns an empty list. - connection.search_s.return_value = [] + # subtree, search returns an empty list. + _configure_mock_paged_ldap_connection(connection, [[]]) with self.assertLogs("rfl.authentication.ldap", level="WARNING") as cm: groups = self.authentifier._get_groups( connection, "john", "uid=john,ou=people,dc=corp,dc=org", 42 @@ -689,6 +775,7 @@ def test_get_groups_class_not_found(self): ], ) # Test log message without gid + _configure_mock_paged_ldap_connection(connection, [[]]) with self.assertLogs("rfl.authentication.ldap", level="WARNING") as cm: groups = self.authentifier._get_groups( connection, "john", "uid=john,ou=people,dc=corp,dc=org", None @@ -704,10 +791,15 @@ def test_get_groups_class_not_found(self): def test_custom_group_object_classes(self): connection = Mock(spec=ldap.ldapobject.LDAPObject) - connection.search_s.return_value = [ - ("cn=scientists,ou=groups,dc=corp,dc=org", {"cn": [b"scientists"]}), - ("cn=biology,ou=groups,dc=corp,dc=org", {"cn": [b"biology"]}), - ] + _configure_mock_paged_ldap_connection( + connection, + [ + [ + ("cn=scientists,ou=groups,dc=corp,dc=org", {"cn": [b"scientists"]}), + ("cn=biology,ou=groups,dc=corp,dc=org", {"cn": [b"biology"]}), + ] + ], + ) group_object_class = "group" login = "john" gid = 42 @@ -715,12 +807,13 @@ def test_custom_group_object_classes(self): groups = self.authentifier._get_groups( connection, login, f"uid={login},ou=people,dc=corp,dc=org", gid ) - connection.search_s.assert_called_once_with( + connection.search_ext.assert_called_once_with( self.authentifier.group_base, ldap.SCOPE_SUBTREE, f"(&(|(objectClass={group_object_class}))(|(memberUid={login})" f"(member=uid={login},ou=people,dc=corp,dc=org)(gidNumber={gid})))", - [self.authentifier.group_name_attribute], + attrlist=[self.authentifier.group_name_attribute], + serverctrls=[ANY], ) self.assertEqual(groups, ["scientists", "biology"]) @@ -758,10 +851,15 @@ def test_in_restricted_groups(self): def test_list_user_dn(self): connection = Mock(spec=ldap.ldapobject.LDAPObject) - connection.search_s.return_value = [ - ("uid=john,ou=people,dc=corp,dc=org", {"uid": [b"john"]}), - ("uid=marie,ou=people,dc=corp,dc=org", {"uid": [b"marie"]}), - ] + _configure_mock_paged_ldap_connection( + connection, + [ + [ + ("uid=john,ou=people,dc=corp,dc=org", {"uid": [b"john"]}), + ("uid=marie,ou=people,dc=corp,dc=org", {"uid": [b"marie"]}), + ] + ], + ) results = self.authentifier._list_user_dn(connection) self.assertEqual(len(results), 2) self.assertEqual(results[0], ("john", "uid=john,ou=people,dc=corp,dc=org")) @@ -771,7 +869,7 @@ def test_list_user_dn_not_found(self): connection = Mock(spec=ldap.ldapobject.LDAPObject) # When user DN is not found in LDAP, ldap module raises NO_SUCH_OBJECT # exception. - connection.search_s.side_effect = ldap.NO_SUCH_OBJECT("fail") + connection.search_ext.side_effect = ldap.NO_SUCH_OBJECT("fail") with self.assertRaisesRegex( LDAPAuthenticationError, r"^Unable to find user base ou=people,dc=corp,dc=org$", @@ -780,7 +878,7 @@ def test_list_user_dn_not_found(self): def test_list_user_dn_no_result(self): connection = Mock(spec=ldap.ldapobject.LDAPObject) - connection.search_s.return_value = [] + _configure_mock_paged_ldap_connection(connection, [[]]) with self.assertLogs("rfl", level="WARNING") as lc: results = self.authentifier._list_user_dn(connection) self.assertEqual( @@ -796,10 +894,15 @@ def test_list_user_dn_no_user_name_attribute(self): connection = Mock(spec=ldap.ldapobject.LDAPObject) # All results miss the user name attribute, _list_user_dn() is expected to raise # LDAPAuthenticationError. - connection.search_s.return_value = [ - ("uid=john,ou=people,dc=corp,dc=org", {}), - ("uid=jane,ou=people,dc=corp,dc=org", {}), - ] + _configure_mock_paged_ldap_connection( + connection, + [ + [ + ("uid=john,ou=people,dc=corp,dc=org", {}), + ("uid=jane,ou=people,dc=corp,dc=org", {}), + ] + ], + ) with self.assertRaisesRegex( LDAPAuthenticationError, r"^Unable to extract user uid from user entries$", @@ -810,10 +913,15 @@ def test_list_user_dn_missing_user_name_attribute(self): connection = Mock(spec=ldap.ldapobject.LDAPObject) # At least one result has the user name attribute, _list_user_dn() must return # these results and log warning message for other dn that miss this attribute. - connection.search_s.return_value = [ - ("uid=john,ou=people,dc=corp,dc=org", {}), - ("uid=jane,ou=people,dc=corp,dc=org", {"uid": [b"jane"]}), - ] + _configure_mock_paged_ldap_connection( + connection, + [ + [ + ("uid=john,ou=people,dc=corp,dc=org", {}), + ("uid=jane,ou=people,dc=corp,dc=org", {"uid": [b"jane"]}), + ] + ], + ) with self.assertLogs("rfl", level="INFO") as lc: results = self.authentifier._list_user_dn(connection) @@ -969,18 +1077,18 @@ def test_users_bind_dn_invalid_credentials(self, mock_ldap_initialize): ): self.authentifier.users() - @patch.object(ldap.ldapobject.LDAPObject, "search_s") - def test_users_ldap_server_down_error(self, mock_search_s): - mock_search_s.side_effect = ldap.SERVER_DOWN("fail") + @patch.object(ldap.ldapobject.LDAPObject, "search_ext") + def test_users_ldap_server_down_error(self, mock_search_ext): + mock_search_ext.side_effect = ldap.SERVER_DOWN("fail") with self.assertRaisesRegex( LDAPAuthenticationError, rf"^LDAP server {self.authentifier.uri.geturl()} is unreachable$", ): self.authentifier.users() - @patch.object(ldap.ldapobject.LDAPObject, "search_s") - def test_users_ldap_operations_error(self, mock_search_s): - mock_search_s.side_effect = ldap.OPERATIONS_ERROR("fail") + @patch.object(ldap.ldapobject.LDAPObject, "search_ext") + def test_users_ldap_operations_error(self, mock_search_ext): + mock_search_ext.side_effect = ldap.OPERATIONS_ERROR("fail") with self.assertRaisesRegex( LDAPAuthenticationError, r"^Operations error on users search: fail$", @@ -988,7 +1096,127 @@ def test_users_ldap_operations_error(self, mock_search_s): self.authentifier.users() +class TestLDAPPagedSearch(unittest.TestCase): + def setUp(self): + self.authentifier = LDAPAuthentifier( + uri=urllib.parse.urlparse("ldap://localhost"), + user_base="ou=people,dc=corp,dc=org", + group_base="ou=groups,dc=corp,dc=org", + ) + + def test_search_paged_single_page(self): + connection = Mock(spec=ldap.ldapobject.LDAPObject) + entries = [ + ("uid=john,ou=people,dc=corp,dc=org", {"uid": [b"john"]}), + ] + _configure_mock_paged_ldap_connection(connection, [entries]) + results = self.authentifier._search_paged( + connection, + self.authentifier.user_base, + ldap.SCOPE_SUBTREE, + "(objectClass=posixAccount)", + [self.authentifier.user_name_attribute], + ) + self.assertEqual(results, entries) + connection.search_ext.assert_called_once() + connection.result3.assert_called_once() + + def test_search_paged_multiple_pages(self): + connection = Mock(spec=ldap.ldapobject.LDAPObject) + page_one = [("uid=john,ou=people,dc=corp,dc=org", {"uid": [b"john"]})] + page_two = [("uid=marie,ou=people,dc=corp,dc=org", {"uid": [b"marie"]})] + _configure_mock_paged_ldap_connection(connection, [page_one, page_two]) + results = self.authentifier._search_paged( + connection, + self.authentifier.user_base, + ldap.SCOPE_SUBTREE, + "(objectClass=posixAccount)", + [self.authentifier.user_name_attribute], + ) + self.assertEqual(results, page_one + page_two) + self.assertEqual(connection.search_ext.call_count, 2) + self.assertEqual(connection.result3.call_count, 2) + + def test_search_paged_custom_page_size(self): + authentifier = LDAPAuthentifier( + uri=urllib.parse.urlparse("ldap://localhost"), + user_base="ou=people,dc=corp,dc=org", + group_base="ou=groups,dc=corp,dc=org", + page_size=42, + ) + connection = Mock(spec=ldap.ldapobject.LDAPObject) + _configure_mock_paged_ldap_connection(connection, [[]]) + authentifier._search_paged( + connection, + authentifier.user_base, + ldap.SCOPE_SUBTREE, + "(objectClass=posixAccount)", + ) + page_control = connection.search_ext.call_args[1]["serverctrls"][0] + self.assertEqual(page_control.size, 42) + + def test_search_paged_no_control(self): + connection = Mock(spec=ldap.ldapobject.LDAPObject) + entries = [("uid=john,ou=people,dc=corp,dc=org", {"uid": [b"john"]})] + + def search_ext_impl( + base, + scope, + filterstr, + attrlist=None, + attrsonly=0, + serverctrls=None, + clientctrls=None, + timeout=-1, + sizelimit=0, + ): + return 1 + + def result3(msgid, resp_ctrl_classes=None): + return (ldap.RES_SEARCH_RESULT, entries, msgid, None) + + connection.search_ext = Mock(side_effect=search_ext_impl) + connection.result3 = Mock(side_effect=result3) + results = self.authentifier._search_paged( + connection, + self.authentifier.user_base, + ldap.SCOPE_SUBTREE, + "(objectClass=posixAccount)", + ) + self.assertEqual(results, entries) + connection.search_ext.assert_called_once() + + class TestLDAPAuthentifierInit(unittest.TestCase): + def test_page_size_default(self): + auth = LDAPAuthentifier( + uri=urllib.parse.urlparse("ldap://localhost"), + user_base="ou=people,dc=corp,dc=org", + group_base="ou=groups,dc=corp,dc=org", + ) + self.assertEqual(auth.page_size, DEFAULT_LDAP_PAGE_SIZE) + + def test_page_size_custom(self): + auth = LDAPAuthentifier( + uri=urllib.parse.urlparse("ldap://localhost"), + user_base="ou=people,dc=corp,dc=org", + group_base="ou=groups,dc=corp,dc=org", + page_size=250, + ) + self.assertEqual(auth.page_size, 250) + + def test_page_size_invalid(self): + with self.assertRaisesRegex( + LDAPAuthenticationError, + r"^LDAP page_size must be a positive integer, got 0$", + ): + LDAPAuthentifier( + uri=urllib.parse.urlparse("ldap://localhost"), + user_base="ou=people,dc=corp,dc=org", + group_base="ou=groups,dc=corp,dc=org", + page_size=0, + ) + def test_lookup_as_user_auto_bind_dn(self): # lookup_as_user is None, bind_dn and bind_password are set, should do lookup # with service credentials.