forked from 389ds/389-ds-base
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
1 changed file
with
104 additions
and
0 deletions.
There are no files selected for viewing
104 changes: 104 additions & 0 deletions
104
dirsrvtests/tests/suites/schema/schema_standard_update.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
# --- BEGIN COPYRIGHT BLOCK --- | ||
# Copyright (C) 2024 Red Hat, Inc. | ||
# All rights reserved. | ||
# | ||
# License: GPL (version 3 or any later version). | ||
# See LICENSE for details. | ||
# --- END COPYRIGHT BLOCK --- | ||
# | ||
import logging | ||
import time | ||
import ldap | ||
import pytest | ||
from lib389._constants import SUFFIX, DEFAULT_SUFFIX | ||
from lib389.replica import ReplicationManager | ||
from lib389.topologies import topology_m2 as topology | ||
from lib389.schema import Schema | ||
from lib389.idm.user import UserAccounts | ||
from lib389.utils import * | ||
|
||
logging.getLogger(__name__).setLevel(logging.DEBUG) | ||
log = logging.getLogger(__name__) | ||
|
||
|
||
def trigger_update(topology, user_rdn, num): | ||
"""It triggers an update on the supplier. This will start a replication | ||
session and a schema push | ||
""" | ||
|
||
users_s = UserAccounts(topology.ms["supplier1"], DEFAULT_SUFFIX) | ||
user = users_s.get(user_rdn) | ||
user.replace('telephonenumber', str(num)) | ||
|
||
# wait until the update is replicated (until up to x seconds) | ||
users_c = UserAccounts(topology.ms["supplier2"], DEFAULT_SUFFIX) | ||
for _ in range(30): | ||
try: | ||
user = users_c.get(user_rdn) | ||
val = user.get_attr_val_int('telephonenumber') | ||
if val == num: | ||
return | ||
# the expected value is not yet replicated. try again | ||
time.sleep(1) | ||
log.debug(f"trigger_update: receive {val} (expected {num})") | ||
except ldap.NO_SUCH_OBJECT: | ||
time.sleep(1) | ||
|
||
|
||
def trigger_schema_push(topology, user_rdn, num): | ||
"""Triggers a schema push from the supplier to the consumer or hub.""" | ||
|
||
m1 = topology.ms["supplier1"] | ||
m2 = topology.ms["supplier2"] | ||
|
||
agreements = m1.agreement.list(suffix=SUFFIX, | ||
consumer_host=m2.host, | ||
consumer_port=m2.port) | ||
assert (len(agreements) == 1) | ||
ra = agreements[0] | ||
trigger_update(topology, user_rdn, num) | ||
m1.agreement.pause(ra.dn) | ||
m1.agreement.resume(ra.dn) | ||
trigger_update(topology, user_rdn, num) | ||
|
||
|
||
@pytest.fixture(scope="function") | ||
def schema_replication_init(topology): | ||
"""Initialize the test environment """ | ||
|
||
supplier = topology.ms["supplier1"] | ||
users = UserAccounts(supplier, DEFAULT_SUFFIX) | ||
user = users.create_test_user() | ||
user.replace('telephonenumber', '0') | ||
|
||
return user | ||
|
||
|
||
def test_custom_schema_rewrites_standard_schema(topology, schema_replication_init): | ||
repl = ReplicationManager(DEFAULT_SUFFIX) | ||
user = schema_replication_init | ||
m1 = topology.ms["supplier1"] | ||
m2 = topology.ms["supplier2"] | ||
schema_m1 = Schema(m1) | ||
schema_m2 = Schema(m2) | ||
attr_type = schema_m1.query_attributetype('surname')[0] | ||
|
||
# Replace the standard schema with the custom one | ||
attr_type = attr_type.replace('caseIgnoreMatch', 'caseIgnoreIA5Match') | ||
attr_type = attr_type.replace('caseIgnoreSubstringsMatch', 'caseIgnoreIA5SubstringsMatch') | ||
|
||
schema_m1.add('attributetypes', attr_type) | ||
|
||
trigger_schema_push(topology, user.rdn, 1) | ||
repl.wait_for_replication(m1, m2) | ||
schema_csn_m1 = m1.schema.get_schema_csn() | ||
schema_csn_m2 = m2.schema.get_schema_csn() | ||
assert schema_csn_m1 == schema_csn_m2 | ||
|
||
assert 'caseIgnoreIA5Match' in schema_m1.query_attributetype('surname')[0] | ||
assert 'caseIgnoreIA5Match' in schema_m2.query_attributetype('surname')[0] | ||
|
||
m2.restart() | ||
|
||
assert 'caseIgnoreIA5Match' in schema_m1.query_attributetype('surname')[0] | ||
assert 'caseIgnoreIA5Match' in schema_m2.query_attributetype('surname')[0] |