Skip to content

Commit

Permalink
Issue 6280 - Changelog trims updates from a given RID even if a consu…
Browse files Browse the repository at this point in the history
…mer has not received any of them

Bug description:
        If a consumer has not received any of the updates from a given RID, the RUV
        (in the replication agreement) contains no csn.
        In such case, the changelog trimming thread compute the starting purge csn
        using the supplier RUV instead of the RA RUV.
        Later the supplier will miss this csn to update the consumer and
        replication is broken

Fix description:
	The function that iterates the elements of the replication agreement RUV
        should call the callback (_cl5EnumConsumerRUV) with all elements even those
        containing empty CSN.
	The callback (_cl5EnumConsumerRUV) should remove the RID
	from the replica RUV in case the RA.RUV is empty.

relates: 389ds#6280

Reviewed by: Pierre Rogier, Simon Pichugin (Thanks)
  • Loading branch information
tbordaz committed Aug 26, 2024
1 parent e70c22f commit e17f751
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 10 deletions.
109 changes: 109 additions & 0 deletions dirsrvtests/tests/suites/replication/changelog_trimming_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@
from lib389._constants import *
from lib389.properties import *
from lib389.topologies import topology_m1 as topo
from lib389.topologies import topology_m1c1
from lib389.tasks import *
from lib389.replica import Changelog5
from lib389.idm.domain import Domain
from lib389.idm.user import UserAccounts
from lib389.utils import ensure_bytes, ds_supports_new_changelog
from lib389.replica import ReplicationManager
from lib389._constants import DN_LDBM

pytestmark = pytest.mark.tier1

Expand Down Expand Up @@ -168,6 +173,110 @@ def test_max_entries(topo, setup_max_entries):
log.fatal('Trimming event did not occur')
assert False

def test_cl_trim_ignore_empty_ruv_element(topology_m1c1):
"""Test trimming is not done on RID not yet replicated
:id: b4b42d8d-dfd1-482e-aca4-8f484761c541
:setup: single supplier and single consumer
:steps:
1. Initialize S1 with new dataset (S1 and C1 are out of sync)
2. Export LDIF the replicated dataset from S1. This LDIF does contain an empty RUV
3. Do a dummy update on S1 (still not replicated to C1)
4. Stop S1 so that it will not try to update C1
5. Reinit C1 from LDIF from step 2
6. Switch to readonly mode the default backend so that it will NOT apply any change (direct or replicated)
7. Start S1, as C1 is in readlonly
8. Adjust trim interval/maxage on S1
9. Wait for maxage+5s for the trimming to occur
10. Switch to not readlonly the default backend so that it will apply updates sent by S1
11. Check that C1 catch up and a new update is also replicated
12. Check S1 had not trim the starting point CSN as we can not find the message "Can't locate CSN"
:expectedresults:
1. successfull export/import on S1
2. successfull export on S1
3. successfull update
4. success
5. successfull init of C1
6. successfull enable readonly
7. success
8. successfull setting for trimming
9. success
10. successfull disable readonly
11. success
12. message not found
"""
log.info("Test trimming is not done on RID not yet replicated...")

s1 = topology_m1c1.ms['supplier1']
s1_tasks = Tasks(s1)
s1_tasks.log = log
c1 = topology_m1c1.cs['consumer1']
c1_tasks = Tasks(c1)
c1_tasks.log = log

# Step 1 - reinit (export/import) S1 with an empty RUV
ldif_file = "{}/export.ldif".format(s1.get_ldif_dir())
args = {EXPORT_REPL_INFO: False, # to get a ldif without RUV
TASK_WAIT: True}
s1_tasks.exportLDIF(DEFAULT_SUFFIX, None, ldif_file, args)
args = {TASK_WAIT: True}
s1_tasks.importLDIF(DEFAULT_SUFFIX, None, ldif_file, args)

# Step 2 - At this point S1 and C1 are out of sync as we reinited S1
# with new dataset
# Export the DB (with empty RUV) to be used to reinit C1
ldif_file = "{}/export_empty_ruv.ldif".format(s1.get_ldif_dir())
args = {EXPORT_REPL_INFO: True,
TASK_WAIT: True}
s1_tasks.exportLDIF(DEFAULT_SUFFIX, None, ldif_file, args)

# - Step 3 - Dummy update on s1 to kick the RUV
test_users_s1 = UserAccounts(s1, DEFAULT_SUFFIX, rdn=None)
user_1 = test_users_s1.create_test_user(uid=1000)

# - Step 4 - Stop supplier1 to prevent it to update C1 once this one
s1.stop()

# - Step 5 - reinit C1 with empty RUV ldif file
args = {TASK_WAIT: True}
c1_tasks.importLDIF(DEFAULT_SUFFIX, None, ldif_file, args)

# - Step 6 - Put default backend of C1 into read only mode
c1.modify_s('cn=%s,%s' % (DEFAULT_BENAME, DN_LDBM), [(ldap.MOD_REPLACE, 'nsslapd-readonly', b'on')])

# - Step 7 - Now it is safe to start S1, no update will apply on C1
# and C1 RUV will stay empty
s1.start()

# - Step 8 - set Triming interval to 2sec to have frequent trimming
# and max-age to 8sec to not wait too long
if ds_supports_new_changelog():
set_value(s1, MAXAGE, '8')
set_value(s1, TRIMINTERVAL, '2')
else:
cl = Changelog5(s1)
cl.set_max_age('8')
cl.set_trim_interval('2')

# - Step 9 - Wait for the trimming to occur
time.sleep(30)

# - Step 10 - disable read-only mode on the default backend of C1
c1.modify_s('cn=%s,%s' % (DEFAULT_BENAME, DN_LDBM), [(ldap.MOD_REPLACE, 'nsslapd-readonly', b'off')])

# - Step 11 - Check that M1 and M2 are in sync
repl = ReplicationManager(DEFAULT_SUFFIX)
repl.wait_for_replication(s1, c1, timeout=20)

# Recheck after another update
user_2 = test_users_s1.create_test_user(uid=2000)
repl.wait_for_replication(s1, c1, timeout=20)

# - Step 12 - check in infamous message when a CSN
# is missing from the changelog
assert not s1.ds_error_log.match('.*Can.t locate CSN}.*')

if __name__ == '__main__':
# Run isolated
Expand Down
23 changes: 19 additions & 4 deletions ldap/servers/plugins/replication/cl5_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -2455,7 +2455,7 @@ _cl5CheckMaxRUV(cldb_Handle *cldb, RUV *maxruv)
{
int rc = 0;

rc = ruv_enumerate_elements(maxruv, _cl5CheckCSNinCL, (void *)cldb);
rc = ruv_enumerate_elements(maxruv, _cl5CheckCSNinCL, (void *)cldb, 0 /* all_elements */);

return rc;
}
Expand Down Expand Up @@ -3154,6 +3154,12 @@ _cl5UpdateRUV (cldb_Handle *cldb, CSN *csn, PRBool newReplica, PRBool purge)
return CL5_SUCCESS;
}

/*
* This callback is used to determine the point from where
* the replication changelog will be trimmed.
* It uses the RUV of the replica (ruv) and the element of RUV of the replication agreement.
* It keeps, in the replica.RUV, the smallest csn between replica.RUV and RA.RUV.
*/
static int
_cl5EnumConsumerRUV(const ruv_enum_data *element, void *arg)
{
Expand All @@ -3165,6 +3171,15 @@ _cl5EnumConsumerRUV(const ruv_enum_data *element, void *arg)

ruv = (RUV *)arg;

/*
* If RA contains no csn (the consumer never received update generated from this RID)
* then the trimming should ignore the RID that is in the ruv of the replica
*/
if (element->csn == NULL) {
ruv_delete_replica(ruv, element->rid);
return 0;
}

rc = ruv_get_largest_csn_for_replica(ruv, csn_get_replicaid(element->csn), &csn);
if (rc != RUV_SUCCESS || csn == NULL || csn_compare(element->csn, csn) < 0) {
ruv_set_max_csn(ruv, element->csn, NULL);
Expand Down Expand Up @@ -3213,7 +3228,7 @@ _cl5GetRUV2Purge2(Replica *replica, RUV **ruv)
consRUVObj = agmt_get_consumer_ruv(agmt);
if (consRUVObj) {
consRUV = (RUV *)object_get_data(consRUVObj);
rc = ruv_enumerate_elements(consRUV, _cl5EnumConsumerRUV, *ruv);
rc = ruv_enumerate_elements(consRUV, _cl5EnumConsumerRUV, *ruv, 1 /* all_elements */);
if (rc != RUV_SUCCESS) {
slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetRUV2Purge2 - "
"Failed to construct ruv; ruv error - %d\n",
Expand Down Expand Up @@ -4118,10 +4133,10 @@ cl5BuildCSNList(const RUV *consRuv, const RUV *supRuv)
data.pos = 0;

/* add consumer elements to the list */
rc = ruv_enumerate_elements(consRuv, ruv_consumer_iterator, &data);
rc = ruv_enumerate_elements(consRuv, ruv_consumer_iterator, &data, 0 /* all_elements */);
if (rc == 0 && supRuv) {
/* add supplier elements to the list */
rc = ruv_enumerate_elements(supRuv, ruv_supplier_iterator, &data);
rc = ruv_enumerate_elements(supRuv, ruv_supplier_iterator, &data, 0 /* all_elements */);
}

/* we have no csns */
Expand Down
2 changes: 1 addition & 1 deletion ldap/servers/plugins/replication/cl5_clcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ static int
clcache_refresh_local_maxcsns(CLC_Buffer *buf)
{

return ruv_enumerate_elements(buf->buf_local_ruv, clcache_refresh_local_maxcsn, buf);
return ruv_enumerate_elements(buf->buf_local_ruv, clcache_refresh_local_maxcsn, buf, 0 /* all_elements */);
}

/*
Expand Down
2 changes: 1 addition & 1 deletion ldap/servers/plugins/replication/repl5_replica.c
Original file line number Diff line number Diff line change
Expand Up @@ -3694,7 +3694,7 @@ replica_log_ruv_elements_nolock(const Replica *r)
/* we log it as a delete operation to have the least number of fields
to set. the entry can be identified by a special target uniqueid and
special target dn */
rc = ruv_enumerate_elements(ruv, replica_log_start_iteration, (void *)r);
rc = ruv_enumerate_elements(ruv, replica_log_start_iteration, (void *)r, 0 /* all_elements */);
return rc;
}

Expand Down
10 changes: 7 additions & 3 deletions ldap/servers/plugins/replication/repl5_ruv.c
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ ruv_get_min_csn_ext(const RUV *ruv, CSN **csn, int ignore_cleaned_rid)
}

int
ruv_enumerate_elements(const RUV *ruv, FNEnumRUV fn, void *arg)
ruv_enumerate_elements(const RUV *ruv, FNEnumRUV fn, void *arg, int all_elements)
{
int cookie;
RUVElement *elem;
Expand All @@ -987,8 +987,12 @@ ruv_enumerate_elements(const RUV *ruv, FNEnumRUV fn, void *arg)
slapi_rwlock_rdlock(ruv->lock);
for (elem = (RUVElement *)dl_get_first(ruv->elements, &cookie); elem;
elem = (RUVElement *)dl_get_next(ruv->elements, &cookie)) {
/* we only return elements that contains both minimal and maximal CSNs */
if (elem->csn && elem->min_csn) {
/*
* Either we return all elements or
* only those that contains both minimal and maximal CSNs
*/
if (all_elements || (elem->csn && elem->min_csn)) {
enum_data.rid = elem->rid;
enum_data.csn = elem->csn;
enum_data.min_csn = elem->min_csn;
rc = fn(&enum_data, arg);
Expand Down
3 changes: 2 additions & 1 deletion ldap/servers/plugins/replication/repl5_ruv.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ enum

typedef struct ruv_enum_data
{
ReplicaId rid;
CSN *csn;
CSN *min_csn;
} ruv_enum_data;
Expand Down Expand Up @@ -99,7 +100,7 @@ int ruv_get_max_csn(const RUV *ruv, CSN **csn);
int ruv_get_max_csn_ext(const RUV *ruv, CSN **csn, int ignore_cleaned_rid);
int ruv_get_rid_max_csn(const RUV *ruv, CSN **csn, ReplicaId rid);
int ruv_get_rid_max_csn_ext(const RUV *ruv, CSN **csn, ReplicaId rid, int ignore_cleaned_rid);
int ruv_enumerate_elements(const RUV *ruv, FNEnumRUV fn, void *arg);
int ruv_enumerate_elements(const RUV *ruv, FNEnumRUV fn, void *arg, int all_elements);
Slapi_Value **ruv_last_modified_to_valuearray(RUV *ruv);
Slapi_Value **ruv_to_valuearray(RUV *ruv);
int ruv_to_smod(const RUV *ruv, Slapi_Mod *smod);
Expand Down

0 comments on commit e17f751

Please sign in to comment.