Skip to content

Commit

Permalink
Issue 6280 - Changelog trims updates from a given RID even if a consu…
Browse files Browse the repository at this point in the history
…mer has not received any of them

Bug description:
        If a consumer has not received any of the updates from a given RID, the RUV
        (in the replication agreement) contains no csn.
        In such case, the changelog trimming thread compute the starting purge csn
        using the supplier RUV instead of the RA RUV.
        Later the supplier will miss this csn to update the consumer and
        replication is broken

Fix description:
	The function that iterates the elements of the replication agreement RUV
        should call the callback (_cl5EnumConsumerRUV) with all elements even those
        containing empty CSN.
	The callback (_cl5EnumConsumerRUV) should remove the RID
	from the replica RUV in case the RA.RUV is empty.

relates: 389ds#6280

Reviewed by:
  • Loading branch information
tbordaz committed Jul 31, 2024
1 parent aef1668 commit 1601239
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 10 deletions.
23 changes: 19 additions & 4 deletions ldap/servers/plugins/replication/cl5_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -2455,7 +2455,7 @@ _cl5CheckMaxRUV(cldb_Handle *cldb, RUV *maxruv)
{
int rc = 0;

rc = ruv_enumerate_elements(maxruv, _cl5CheckCSNinCL, (void *)cldb);
rc = ruv_enumerate_elements(maxruv, _cl5CheckCSNinCL, (void *)cldb, 0 /* all_elements */);

return rc;
}
Expand Down Expand Up @@ -3154,6 +3154,12 @@ _cl5UpdateRUV (cldb_Handle *cldb, CSN *csn, PRBool newReplica, PRBool purge)
return CL5_SUCCESS;
}

/*
* This callback is used to determine the point from where
* the replication changelog will be trimmed.
* It uses the RUV of the replica (ruv) and the element of RUV of the replication agreement.
* It keeps, in the replica.RUV, the smallest csn between replica.RUV and RA.RUV.
*/
static int
_cl5EnumConsumerRUV(const ruv_enum_data *element, void *arg)
{
Expand All @@ -3165,6 +3171,15 @@ _cl5EnumConsumerRUV(const ruv_enum_data *element, void *arg)

ruv = (RUV *)arg;

/*
* If RA contains no csn (the consumer never received update generated from this RID)
* then the trimming should ignore the RID that is in the ruv of the replica
*/
if (element->csn == NULL) {
ruv_delete_replica(ruv, element->rid);
return 0;
}

rc = ruv_get_largest_csn_for_replica(ruv, csn_get_replicaid(element->csn), &csn);
if (rc != RUV_SUCCESS || csn == NULL || csn_compare(element->csn, csn) < 0) {
ruv_set_max_csn(ruv, element->csn, NULL);
Expand Down Expand Up @@ -3213,7 +3228,7 @@ _cl5GetRUV2Purge2(Replica *replica, RUV **ruv)
consRUVObj = agmt_get_consumer_ruv(agmt);
if (consRUVObj) {
consRUV = (RUV *)object_get_data(consRUVObj);
rc = ruv_enumerate_elements(consRUV, _cl5EnumConsumerRUV, *ruv);
rc = ruv_enumerate_elements(consRUV, _cl5EnumConsumerRUV, *ruv, 1 /* all_elements */);
if (rc != RUV_SUCCESS) {
slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name_cl, "_cl5GetRUV2Purge2 - "
"Failed to construct ruv; ruv error - %d\n",
Expand Down Expand Up @@ -4118,10 +4133,10 @@ cl5BuildCSNList(const RUV *consRuv, const RUV *supRuv)
data.pos = 0;

/* add consumer elements to the list */
rc = ruv_enumerate_elements(consRuv, ruv_consumer_iterator, &data);
rc = ruv_enumerate_elements(consRuv, ruv_consumer_iterator, &data, 0 /* all_elements */);
if (rc == 0 && supRuv) {
/* add supplier elements to the list */
rc = ruv_enumerate_elements(supRuv, ruv_supplier_iterator, &data);
rc = ruv_enumerate_elements(supRuv, ruv_supplier_iterator, &data, 0 /* all_elements */);
}

/* we have no csns */
Expand Down
2 changes: 1 addition & 1 deletion ldap/servers/plugins/replication/cl5_clcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ static int
clcache_refresh_local_maxcsns(CLC_Buffer *buf)
{

return ruv_enumerate_elements(buf->buf_local_ruv, clcache_refresh_local_maxcsn, buf);
return ruv_enumerate_elements(buf->buf_local_ruv, clcache_refresh_local_maxcsn, buf, 0 /* all_elements */);
}

/*
Expand Down
2 changes: 1 addition & 1 deletion ldap/servers/plugins/replication/repl5_replica.c
Original file line number Diff line number Diff line change
Expand Up @@ -3694,7 +3694,7 @@ replica_log_ruv_elements_nolock(const Replica *r)
/* we log it as a delete operation to have the least number of fields
to set. the entry can be identified by a special target uniqueid and
special target dn */
rc = ruv_enumerate_elements(ruv, replica_log_start_iteration, (void *)r);
rc = ruv_enumerate_elements(ruv, replica_log_start_iteration, (void *)r, 0 /* all_elements */);
return rc;
}

Expand Down
10 changes: 7 additions & 3 deletions ldap/servers/plugins/replication/repl5_ruv.c
Original file line number Diff line number Diff line change
Expand Up @@ -972,7 +972,7 @@ ruv_get_min_csn_ext(const RUV *ruv, CSN **csn, int ignore_cleaned_rid)
}

int
ruv_enumerate_elements(const RUV *ruv, FNEnumRUV fn, void *arg)
ruv_enumerate_elements(const RUV *ruv, FNEnumRUV fn, void *arg, int all_elements)
{
int cookie;
RUVElement *elem;
Expand All @@ -987,8 +987,12 @@ ruv_enumerate_elements(const RUV *ruv, FNEnumRUV fn, void *arg)
slapi_rwlock_rdlock(ruv->lock);
for (elem = (RUVElement *)dl_get_first(ruv->elements, &cookie); elem;
elem = (RUVElement *)dl_get_next(ruv->elements, &cookie)) {
/* we only return elements that contains both minimal and maximal CSNs */
if (elem->csn && elem->min_csn) {
/*
* Either we return all elements or
* only those that contains both minimal and maximal CSNs
*/
if (all_elements || (elem->csn && elem->min_csn)) {
enum_data.rid = elem->rid;
enum_data.csn = elem->csn;
enum_data.min_csn = elem->min_csn;
rc = fn(&enum_data, arg);
Expand Down
3 changes: 2 additions & 1 deletion ldap/servers/plugins/replication/repl5_ruv.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ enum

typedef struct ruv_enum_data
{
ReplicaId rid;
CSN *csn;
CSN *min_csn;
} ruv_enum_data;
Expand Down Expand Up @@ -99,7 +100,7 @@ int ruv_get_max_csn(const RUV *ruv, CSN **csn);
int ruv_get_max_csn_ext(const RUV *ruv, CSN **csn, int ignore_cleaned_rid);
int ruv_get_rid_max_csn(const RUV *ruv, CSN **csn, ReplicaId rid);
int ruv_get_rid_max_csn_ext(const RUV *ruv, CSN **csn, ReplicaId rid, int ignore_cleaned_rid);
int ruv_enumerate_elements(const RUV *ruv, FNEnumRUV fn, void *arg);
int ruv_enumerate_elements(const RUV *ruv, FNEnumRUV fn, void *arg, int all_elements);
Slapi_Value **ruv_last_modified_to_valuearray(RUV *ruv);
Slapi_Value **ruv_to_valuearray(RUV *ruv);
int ruv_to_smod(const RUV *ruv, Slapi_Mod *smod);
Expand Down

0 comments on commit 1601239

Please sign in to comment.