diff --git a/dirsrvtests/tests/suites/filter/filter_test.py b/dirsrvtests/tests/suites/filter/filter_test.py index 471b595948..79d87959ba 100644 --- a/dirsrvtests/tests/suites/filter/filter_test.py +++ b/dirsrvtests/tests/suites/filter/filter_test.py @@ -1,13 +1,13 @@ # --- BEGIN COPYRIGHT BLOCK --- -# Copyright (C) 2020 Red Hat, Inc. +# Copyright (C) 2020-2025 Red Hat, Inc. # All rights reserved. # # License: GPL (version 3 or any later version). # See LICENSE for details. # --- END COPYRIGHT BLOCK --- # +import ldap import logging - import pytest import time from ldap import SCOPE_SUBTREE @@ -111,6 +111,7 @@ def test_filter_search_original_attrs(topology_st): log.info('test_filter_search_original_attrs: PASSED') + def test_filter_scope_one(topology_st): """Test ldapsearch with scope one gives only single entry @@ -132,6 +133,7 @@ def test_filter_scope_one(topology_st): log.info('Search should only have one entry') assert len(results) == 1 + def test_filter_with_attribute_subtype(topology_st): """Adds 2 test entries and Search with filters including subtype and ! @@ -183,10 +185,12 @@ def test_filter_with_attribute_subtype(topology_st): entry_en_only.setValues('cn', entry_name_en_only) entry_en_only.setValues('cn;en', entry_name_en) - topology_st.standalone.log.info("Try to add Add %s: %r" % (entry_dn_both, entry_both)) + topology_st.standalone.log.info("Try to add Add %s: %r" % (entry_dn_both, + entry_both)) topology_st.standalone.add_s(entry_both) - topology_st.standalone.log.info("Try to add Add %s: %r" % (entry_dn_en_only, entry_en_only)) + topology_st.standalone.log.info("Try to add Add %s: %r" % (entry_dn_en_only, + entry_en_only)) topology_st.standalone.add_s(entry_en_only) topology_st.standalone.log.info("\n\n######################### SEARCH ######################\n") @@ -224,6 +228,7 @@ def test_filter_with_attribute_subtype(topology_st): log.info('Testcase PASSED') + def test_extended_search(topology_st): """Test we can search with equality extended matching rule @@ -305,6 +310,7 @@ def test_extended_search(topology_st): ents = topology_st.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, myfilter) assert len(ents) == 1 + def test_match_large_valueset(topology_st): """Test that when returning a big number of entries and that we need to match the filter from a large valueset @@ -364,24 +370,27 @@ def test_match_large_valueset(topology_st): perf_user_rdn = "user1000" # Step 1. Prepare the backends and tune the groups entrycache - try: - be_users = backends.create(properties={'parent': DEFAULT_SUFFIX, 'nsslapd-suffix': users_suffix, 'name': users_backend}) - be_groups = backends.create(properties={'parent': DEFAULT_SUFFIX, 'nsslapd-suffix': groups_suffix, 'name': groups_backend}) - - # set the entry cache to 200Mb as the 1K groups of 2K users require at least 170Mb - if get_default_db_lib() == "bdb": - config_ldbm = DSLdapObject(inst, DN_CONFIG_LDBM) - config_ldbm.set('nsslapd-cache-autosize', '0') - be_groups.replace('nsslapd-cachememsize', groups_entrycache) - except: - raise + backends.create(properties={'parent': DEFAULT_SUFFIX, + 'nsslapd-suffix': users_suffix, + 'name': users_backend}) + be_groups = backends.create(properties={'parent': DEFAULT_SUFFIX, + 'nsslapd-suffix': groups_suffix, + 'name': groups_backend}) + + # Set the entry cache to 200Mb as the 1K groups of 2K users require at + # least 170Mb + if get_default_db_lib() == "bdb": + config_ldbm = DSLdapObject(inst, DN_CONFIG_LDBM) + config_ldbm.set('nsslapd-cache-autosize', '0') + be_groups.replace('nsslapd-cachememsize', groups_entrycache) # Step 2. Generate a test ldif (10k users entries) log.info("Generating users LDIF...") ldif_dir = inst.get_ldif_dir() users_import_ldif = "%s/%s" % (ldif_dir, users_ldif) groups_import_ldif = "%s/%s" % (ldif_dir, groups_ldif) - dbgen_users(inst, users_number, users_import_ldif, suffix=users_suffix, generic=True, parent=users_suffix) + dbgen_users(inst, users_number, users_import_ldif, suffix=DEFAULT_SUFFIX, + generic=True, parent=users_suffix) # Generate a test ldif (800 groups with 10k members) that fit in 700Mb entry cache props = { @@ -421,13 +430,15 @@ def test_match_large_valueset(topology_st): # Step 6. Gather the etime from the access log inst.stop() access_log = DirsrvAccessLog(inst) - search_result = access_log.match(".*RESULT err=0 tag=101 nentries=%s.*" % groups_number) + search_result = access_log.match(".*RESULT err=0 tag=101 nentries=%s.*" % + groups_number) log.info("Found patterns are %s", search_result[0]) log.info("Found patterns are %s", search_result[1]) etime = float(search_result[1].split('etime=')[1]) log.info("Duration of the search from access log was %f", etime) assert len(entries) == groups_number - assert (etime < 5) + assert etime < 5 + def test_filter_not_operator(topology_st, request): """Test ldapsearch with scope one gives only single entry @@ -467,38 +478,47 @@ def test_filter_not_operator(topology_st, request): 'cn': f'bit {user}', 'sn': user.title(), 'manager': f'uid={user},{SUFFIX}', - 'userpassword': PW_DM, + 'userpassword': "password", 'homeDirectory': '/home/' + user, 'uidNumber': '1000', 'gidNumber': '2000', }) # Adding specific values to the users log.info('Adding telephonenumber values') - user = UserAccount(topology_st.standalone, 'uid=user1, ou=people, %s' % DEFAULT_SUFFIX) + user = UserAccount(topology_st.standalone, + 'uid=user1, ou=people, %s' % DEFAULT_SUFFIX) user.add('telephonenumber', ['1234', '2345']) - user = UserAccount(topology_st.standalone, 'uid=user2, ou=people, %s' % DEFAULT_SUFFIX) + user = UserAccount(topology_st.standalone, + 'uid=user2, ou=people, %s' % DEFAULT_SUFFIX) user.add('telephonenumber', ['1234', '4567']) - user = UserAccount(topology_st.standalone, 'uid=user3, ou=people, %s' % DEFAULT_SUFFIX) + user = UserAccount(topology_st.standalone, + 'uid=user3, ou=people, %s' % DEFAULT_SUFFIX) user.add('telephonenumber', ['1234', '4567']) - user = UserAccount(topology_st.standalone, 'uid=user4, ou=people, %s' % DEFAULT_SUFFIX) + user = UserAccount(topology_st.standalone, + 'uid=user4, ou=people, %s' % DEFAULT_SUFFIX) user.add('telephonenumber', ['1234']) - user = UserAccount(topology_st.standalone, 'uid=user5, ou=people, %s' % DEFAULT_SUFFIX) + user = UserAccount(topology_st.standalone, + 'uid=user5, ou=people, %s' % DEFAULT_SUFFIX) user.add('telephonenumber', ['2345']) - user = UserAccount(topology_st.standalone, 'uid=user6, ou=people, %s' % DEFAULT_SUFFIX) + user = UserAccount(topology_st.standalone, + 'uid=user6, ou=people, %s' % DEFAULT_SUFFIX) user.add('telephonenumber', ['3456']) - user = UserAccount(topology_st.standalone, 'uid=user7, ou=people, %s' % DEFAULT_SUFFIX) + user = UserAccount(topology_st.standalone, + 'uid=user7, ou=people, %s' % DEFAULT_SUFFIX) user.add('telephonenumber', ['4567']) - user = UserAccount(topology_st.standalone, 'uid=user8, ou=people, %s' % DEFAULT_SUFFIX) + user = UserAccount(topology_st.standalone, + 'uid=user8, ou=people, %s' % DEFAULT_SUFFIX) user.add('telephonenumber', ['1234']) - user = UserAccount(topology_st.standalone, 'uid=user9, ou=people, %s' % DEFAULT_SUFFIX) + user = UserAccount(topology_st.standalone, + 'uid=user9, ou=people, %s' % DEFAULT_SUFFIX) user.add('telephonenumber', ['1234', '4567']) # Do a first test of filter containing a NOT @@ -506,7 +526,8 @@ def test_filter_not_operator(topology_st, request): log.info('Search with filter containing NOT') log.info('expect user2, user3, user6, user8 and user9') filter1 = "(|(telephoneNumber=3456)(&(telephoneNumber=1234)(!(|(uid=user1)(uid=user4)))))" - entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, SCOPE_SUBTREE, filter1) + entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, SCOPE_SUBTREE, + filter1) uids = [] for entry in entries: assert entry.hasAttr('uid') @@ -521,7 +542,8 @@ def test_filter_not_operator(topology_st, request): log.info('Search with a second filter containing NOT') log.info('expect user2, user3, user8 and user9') filter1 = "(|(&(telephoneNumber=1234)(!(|(uid=user1)(uid=user4)))))" - entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, SCOPE_SUBTREE, filter1) + entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, SCOPE_SUBTREE, + filter1) uids = [] for entry in entries: assert entry.hasAttr('uid') @@ -539,7 +561,6 @@ def fin(): pass user.delete() - request.addfinalizer(fin)