Skip to content

Commit

Permalink
CRC removal during diskless full sync with TLS enabled.
Browse files Browse the repository at this point in the history
Signed-off-by: talxsha <[email protected]>
  • Loading branch information
talxsha committed Dec 23, 2024
1 parent d00c856 commit b13cec9
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3156,6 +3156,7 @@ static int applyClientMaxMemoryUsage(const char **err) {
standardConfig static_configs[] = {
/* Bool configs */
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
createBoolConfig("disable-sync-crc", NULL, MODIFIABLE_CONFIG, server.disable_sync_crc, 0, NULL, NULL),
createBoolConfig("daemonize", NULL, IMMUTABLE_CONFIG, server.daemonize, 0, NULL, NULL),
createBoolConfig("always-show-logo", NULL, IMMUTABLE_CONFIG, server.always_show_logo, 0, NULL, NULL),
createBoolConfig("protected-mode", NULL, MODIFIABLE_CONFIG, server.protected_mode, 1, NULL, NULL),
Expand Down
22 changes: 20 additions & 2 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3010,6 +3010,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
int error;
long long empty_keys_skipped = 0;

if (rdb->flags & RIO_FLAG_DISABLE_CRC) server.stat_total_crc_disabled_syncs_stated++;
rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(rdb, buf, 9) == 0) goto eoferr;
Expand Down Expand Up @@ -3354,7 +3355,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if (rioRead(rdb, &cksum, 8) == 0) goto eoferr;
if (server.rdb_checksum && !server.skip_checksum_validation) {
memrev64ifbe(&cksum);
if (cksum == 0) {
if (cksum == 0 || (rdb->flags & RIO_FLAG_DISABLE_CRC) != 0) {
serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
serverLog(LL_WARNING,
Expand Down Expand Up @@ -3545,8 +3546,13 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
safe_to_exit_pipe = pipefds[0]; /* read end */
server.rdb_child_exit_pipe = pipefds[1]; /* write end */
}
/*
* For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req:
* Check replica capabilities, if every replica supports disabled CRC, run with CRC disabled, otherwise, use CRC.
*/
int disable_sync_crc_capa = server.disable_sync_crc;
/* Collect the connections of the replicas we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */
* the RDB to, which are in WAIT_BGSAVE_START state. */
int connsnum = 0;
connection **conns = zmalloc(sizeof(connection *) * listLength(server.replicas));
server.rdb_pipe_conns = NULL;
Expand Down Expand Up @@ -3578,6 +3584,11 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset());
}

// do not disable CRC on the primary if TLS is disabled or if the replica doesn't support it
if (!connIsTLS(replica->conn) || (replica->replica_capa & REPLICA_CAPA_DISABLE_SYNC_CRC) == 0)
disable_sync_crc_capa = 0;

}

/* Create the child process. */
Expand All @@ -3601,6 +3612,12 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
serverSetCpuAffinity(server.bgsave_cpulist);

if (disable_sync_crc_capa == 1) {
serverLog(LL_NOTICE, "CRC checksum is disabled for this RDB transfer");
// mark rdb object to skip CRC checksum calculations
rdb.flags |= RIO_FLAG_DISABLE_CRC;
}

retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi);
if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR;

Expand Down Expand Up @@ -3666,6 +3683,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
}
if (!dual_channel) close(safe_to_exit_pipe);
if (disable_sync_crc_capa) server.stat_total_crc_disabled_syncs_stated++;
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */
Expand Down
28 changes: 23 additions & 5 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1244,11 +1244,12 @@ void syncCommand(client *c) {
* the primary can accurately lists replicas and their listening ports in the
* INFO output.
*
* - capa <eof|psync2|dual-channel>
* - capa <eof|psync2|dual-channel|disable_sync_crc>
* What is the capabilities of this instance.
* eof: supports EOF-style RDB transfer for diskless replication.
* psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
* dual-channel: supports full sync using rdb channel.
* disable_sync_crc: supports disabling CRC during TLS enabled diskless sync.
*
* - ack <offset> [fack <aofofs>]
* Replica informs the primary the amount of replication stream that it
Expand Down Expand Up @@ -1314,7 +1315,8 @@ void replconfCommand(client *c) {
/* If dual-channel is disable on this primary, treat this command as unrecognized
* replconf option. */
c->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL;
}
} else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_DISABLE_SYNC_CRC_STR))
c->replica_capa |= REPLICA_CAPA_DISABLE_SYNC_CRC;
} else if (!strcasecmp(c->argv[j]->ptr, "ack")) {
/* REPLCONF ACK is used by replica to inform the primary the amount
* of replication stream that it processed so far. It is an
Expand Down Expand Up @@ -2084,6 +2086,12 @@ void readSyncBulkPayload(connection *conn) {
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving %lld bytes from primary %s",
(long long)server.repl_transfer_size, use_diskless_load ? "to parser" : "to disk");
}

// Set a flag to determin later whether or not the replica will skip CRC calculations for this sync -
// Disable CRC on replica if: (1) TLS is enabled; (2) replica disable_sync_crc is enabled; (3) diskelss sync enabled on both replica and primary.
// Otherwise, CRC should be enabled/disabled as per server.rdb_checksum
if (connIsTLS(conn) && server.disable_sync_crc && use_diskless_load && usemark)
server.repl_meet_disable_crc_cond = 1;
return;
}

Expand Down Expand Up @@ -2251,6 +2259,7 @@ void readSyncBulkPayload(connection *conn) {

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);
if (server.repl_meet_disable_crc_cond == 1) rdb.flags |= RIO_FLAG_DISABLE_CRC;

int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
Expand Down Expand Up @@ -2493,6 +2502,7 @@ char *sendCommand(connection *conn, ...) {
while (1) {
arg = va_arg(ap, char *);
if (arg == NULL) break;
if (strcmp(arg, "") == 0) continue;
cmdargs = sdscatprintf(cmdargs, "$%zu\r\n%s\r\n", strlen(arg), arg);
argslen++;
}
Expand Down Expand Up @@ -3513,11 +3523,19 @@ void syncWithPrimary(connection *conn) {
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* DISABLE-SYNC-CRC: supports disabling CRC calculations during full sync.
* Inform the primary of this capa only during diskless sync with TLS enabled.
* In disk-based sync, or non-TLS, there is more concern for data corruprion
* so we keep this extra layer of detection.
*
* The primary will ignore capabilities it does not understand. */
server.repl_meet_disable_crc_cond = 0; // reset this value before sync starts
int send_disable_crc_capa = (connIsTLS(conn) && server.disable_sync_crc && useDisklessLoad());
err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2",
server.dual_channel_replication ? "capa" : NULL,
server.dual_channel_replication ? "dual-channel" : NULL, NULL);
send_disable_crc_capa ? "capa" : "",
send_disable_crc_capa ? REPLICA_CAPA_DISABLE_SYNC_CRC_STR : "",
server.dual_channel_replication ? "capa" : "",
server.dual_channel_replication ? "dual-channel" : "", NULL);
if (err) goto write_error;

/* Inform the primary of our (replica) version. */
Expand Down
1 change: 1 addition & 0 deletions src/rio.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ void rioFreeFd(rio *r) {
/* This function can be installed both in memory and file streams when checksum
* computation is needed. */
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
if ((r->flags & RIO_FLAG_DISABLE_CRC) != 0) return; // skip CRC64 calculations
r->cksum = crc64(r->cksum, buf, len);
}

Expand Down
1 change: 1 addition & 0 deletions src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#define RIO_FLAG_READ_ERROR (1 << 0)
#define RIO_FLAG_WRITE_ERROR (1 << 1)
#define RIO_FLAG_DISABLE_CRC (1 << 2)

#define RIO_TYPE_FILE (1 << 0)
#define RIO_TYPE_BUFFER (1 << 1)
Expand Down
3 changes: 3 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2218,6 +2218,7 @@ void initServerConfig(void) {
server.fsynced_reploff_pending = 0;
server.rdb_client_id = -1;
server.loading_process_events_interval_ms = LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT;
server.repl_meet_disable_crc_cond = 0;

/* Replication partial resync backlog */
server.repl_backlog = NULL;
Expand Down Expand Up @@ -2638,6 +2639,7 @@ void resetServerStats(void) {
server.stat_fork_rate = 0;
server.stat_total_forks = 0;
server.stat_rejected_conn = 0;
server.stat_total_crc_disabled_syncs_stated = 0;
server.stat_sync_full = 0;
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
Expand Down Expand Up @@ -5878,6 +5880,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"instantaneous_input_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION) / 1024,
"instantaneous_output_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION) / 1024,
"rejected_connections:%lld\r\n", server.stat_rejected_conn,
"total_crc_disabled_syncs_stated:%ld\r\n", server.stat_total_crc_disabled_syncs_stated,
"sync_full:%lld\r\n", server.stat_sync_full,
"sync_partial_ok:%lld\r\n", server.stat_sync_partial_ok,
"sync_partial_err:%lld\r\n", server.stat_sync_partial_err,
Expand Down
16 changes: 12 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,11 +433,15 @@ typedef enum {
* a replica that only wants RDB without replication buffer */
#define REPLICA_STATE_BG_RDB_LOAD 11 /* Main channel of a replica which uses dual channel replication. */

/* Replica capabilities. */
/* Replica capability flags */
#define REPLICA_CAPA_NONE 0
#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */
#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */
#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */
#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */
#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */
#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */
#define REPLICA_CAPA_DISABLE_SYNC_CRC (1 << 3) /* Disable CRC checks for sync requests. */

/* Replica capability strings */
#define REPLICA_CAPA_DISABLE_SYNC_CRC_STR "disable-sync-crc" /* Disable CRC calculations during full sync */

/* Replica requirements */
#define REPLICA_REQ_NONE 0
Expand Down Expand Up @@ -1838,6 +1842,7 @@ struct valkeyServer {
double stat_fork_rate; /* Fork rate in GB/sec. */
long long stat_total_forks; /* Total count of fork. */
long long stat_rejected_conn; /* Clients rejected because of maxclients */
size_t stat_total_crc_disabled_syncs_stated; /* Total number of full syncs stated with CRC checksum disabled */ // AMZN
long long stat_sync_full; /* Number of full resyncs with replicas. */
long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */
long long stat_sync_partial_err; /* Number of unaccepted PSYNC requests. */
Expand Down Expand Up @@ -1986,6 +1991,7 @@ struct valkeyServer {
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
int rdb_checksum; /* Use RDB checksum? */
int disable_sync_crc; /* Use RDB checksum during sync? Applicable only for TLS enabled diskless sync */
int rdb_del_sync_files; /* Remove RDB files used only for SYNC if
the instance does not use persistence. */
time_t lastsave; /* Unix time of last successful save */
Expand Down Expand Up @@ -2112,6 +2118,8 @@ struct valkeyServer {
* when it receives an error on the replication stream */
int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to
* persist writes to AOF. */
int repl_meet_disable_crc_cond; /* Set to true only when replica meets all conditions for disabling CRC */

/* The following two fields is where we store primary PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the server->primary client structure. */
Expand Down
45 changes: 45 additions & 0 deletions tests/integration/disable-sync-crc.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
start_server {tags {"repl tls"} overrides {save {}}} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
foreach master_disable_crc {yes no} {
foreach replica_disable_crc {yes no} {
foreach mds {no yes} {
foreach sdl {disabled on-empty-db swapdb flush-before-load} {
test "CRC disabled sync - master:$master_disable_crc, replica:$replica_disable_crc, tls:$::tls, repl_diskless_sync:$mds, repl_diskless_load:$sdl" {
$master config set disable-sync-crc $master_disable_crc
$master config set repl-diskless-sync $mds
start_server {overrides {save {}}} {
set replica [srv 0 client]

$replica config set disable-sync-crc $replica_disable_crc
$replica config set repl-diskless-load $sdl

$replica replicaof $master_host $master_port

wait_for_condition 50 100 {
[string match {*master_link_status:up*} [$replica info replication]]
} else {
fail "Replication not started"
}
set is_master_crc_disabled [string match {*total_crc_disabled_syncs_started:1*} [$master info stats]]
set is_replica_crc_disabled [string match {*total_crc_disabled_syncs_started:1*} [$replica info stats]]

if {$replica_disable_crc eq "no" || $sdl eq "disabled" || $mds eq "no" || !$::tls} {
assert_equal 0 $is_master_crc_disabled "Master should not have CRC disabled"
assert_equal 0 $is_replica_crc_disabled "Replica should not have CRC disabled"
} else {
if {$replica_disable_crc eq "no"} {
assert_equal 0 $is_master_crc_disabled "Master should not have CRC disabled"
} else {
assert_equal 1 $is_master_crc_disabled "Master should have CRC disabled"
}
assert_equal 1 $is_replica_crc_disabled "Replica should have CRC disabled"
}
}
}
}
}
}
}
}

0 comments on commit b13cec9

Please sign in to comment.