From 468729f0346318c427d37fc8985d19e604356c45 Mon Sep 17 00:00:00 2001 From: Tal Shachar Date: Mon, 23 Dec 2024 15:20:10 +0000 Subject: [PATCH] CRC removal during diskless full sync with TLS enabled Signed-off-by: Tal Shachar --- src/config.c | 1 + src/rdb.c | 22 +++++++++++-- src/replication.c | 28 +++++++++++++--- src/rio.c | 1 + src/rio.h | 1 + src/server.c | 3 ++ src/server.h | 16 ++++++--- tests/integration/disable-sync-crc.tcl | 45 ++++++++++++++++++++++++++ 8 files changed, 106 insertions(+), 11 deletions(-) create mode 100644 tests/integration/disable-sync-crc.tcl diff --git a/src/config.c b/src/config.c index e1cee3f95b..98fb0c63ef 100644 --- a/src/config.c +++ b/src/config.c @@ -3156,6 +3156,7 @@ static int applyClientMaxMemoryUsage(const char **err) { standardConfig static_configs[] = { /* Bool configs */ createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL), + createBoolConfig("disable-sync-crc", NULL, MODIFIABLE_CONFIG, server.disable_sync_crc, 0, NULL, NULL), createBoolConfig("daemonize", NULL, IMMUTABLE_CONFIG, server.daemonize, 0, NULL, NULL), createBoolConfig("always-show-logo", NULL, IMMUTABLE_CONFIG, server.always_show_logo, 0, NULL, NULL), createBoolConfig("protected-mode", NULL, MODIFIABLE_CONFIG, server.protected_mode, 1, NULL, NULL), diff --git a/src/rdb.c b/src/rdb.c index 5fb77a2897..fa325102c7 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3010,6 +3010,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin int error; long long empty_keys_skipped = 0; + if (rdb->flags & RIO_FLAG_DISABLE_CRC) server.stat_total_crc_disabled_syncs_stated++; rdb->update_cksum = rdbLoadProgressCallback; rdb->max_processing_chunk = server.loading_process_events_interval_bytes; if (rioRead(rdb, buf, 9) == 0) goto eoferr; @@ -3354,7 +3355,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if (rioRead(rdb, &cksum, 8) == 0) goto eoferr; if (server.rdb_checksum && 
!server.skip_checksum_validation) { memrev64ifbe(&cksum); - if (cksum == 0) { + if (cksum == 0 || (rdb->flags & RIO_FLAG_DISABLE_CRC) != 0) { serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: no check performed."); } else if (cksum != expected) { serverLog(LL_WARNING, @@ -3545,8 +3546,13 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { safe_to_exit_pipe = pipefds[0]; /* read end */ server.rdb_child_exit_pipe = pipefds[1]; /* write end */ } + /* + * For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req: + * Check replica capabilities, if every replica supports disabled CRC, run with CRC disabled, otherwise, use CRC. + */ + int disable_sync_crc_capa = server.disable_sync_crc; /* Collect the connections of the replicas we want to transfer - * the RDB to, which are i WAIT_BGSAVE_START state. */ + * the RDB to, which are in WAIT_BGSAVE_START state. */ int connsnum = 0; connection **conns = zmalloc(sizeof(connection *) * listLength(server.replicas)); server.rdb_pipe_conns = NULL; @@ -3578,6 +3584,11 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset()); } + + // do not disable CRC on the primary if TLS is disabled or if the replica doesn't support it + if (!connIsTLS(replica->conn) || (replica->replica_capa & REPLICA_CAPA_DISABLE_SYNC_CRC) == 0) + disable_sync_crc_capa = 0; + } /* Create the child process. 
*/ @@ -3601,6 +3612,12 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } serverSetCpuAffinity(server.bgsave_cpulist); + if (disable_sync_crc_capa == 1) { + serverLog(LL_NOTICE, "CRC checksum is disabled for this RDB transfer"); + // mark rdb object to skip CRC checksum calculations + rdb.flags |= RIO_FLAG_DISABLE_CRC; + } + retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi); if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR; @@ -3666,6 +3683,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } } if (!dual_channel) close(safe_to_exit_pipe); + if (disable_sync_crc_capa && childpid != -1) server.stat_total_crc_disabled_syncs_stated++; /* count only syncs whose child was actually forked */ return (childpid == -1) ? C_ERR : C_OK; } return C_OK; /* Unreached. */ diff --git a/src/replication.c b/src/replication.c index 3a207a1d0f..fd70215dd9 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1244,11 +1244,12 @@ void syncCommand(client *c) { * the primary can accurately lists replicas and their listening ports in the * INFO output. * - * - capa + * - capa * What is the capabilities of this instance. * eof: supports EOF-style RDB transfer for diskless replication. * psync2: supports PSYNC v2, so understands +CONTINUE . * dual-channel: supports full sync using rdb channel. + * disable-sync-crc: supports disabling CRC during TLS enabled diskless sync. * * - ack [fack ] * Replica informs the primary the amount of replication stream that it @@ -1314,7 +1315,8 @@ void replconfCommand(client *c) { /* If dual-channel is disable on this primary, treat this command as unrecognized * replconf option. */ c->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL; - } + } else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_DISABLE_SYNC_CRC_STR)) + c->replica_capa |= REPLICA_CAPA_DISABLE_SYNC_CRC; } else if (!strcasecmp(c->argv[j]->ptr, "ack")) { /* REPLCONF ACK is used by replica to inform the primary the amount * of replication stream that it processed so far. 
It is an @@ -2084,6 +2086,12 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving %lld bytes from primary %s", (long long)server.repl_transfer_size, use_diskless_load ? "to parser" : "to disk"); } + + // Set a flag to determine later whether or not the replica will skip CRC calculations for this sync - + // Disable CRC on replica if: (1) TLS is enabled; (2) replica disable_sync_crc is enabled; (3) diskless sync enabled on both replica and primary. + // Otherwise, CRC should be enabled/disabled as per server.rdb_checksum + if (connIsTLS(conn) && server.disable_sync_crc && use_diskless_load && usemark) + server.repl_meet_disable_crc_cond = 1; return; } @@ -2251,6 +2259,7 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); + if (server.repl_meet_disable_crc_cond == 1) rdb.flags |= RIO_FLAG_DISABLE_CRC; int loadingFailed = 0; rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; @@ -2493,6 +2502,7 @@ char *sendCommand(connection *conn, ...) { while (1) { arg = va_arg(ap, char *); if (arg == NULL) break; + if (strcmp(arg, "") == 0) continue; cmdargs = sdscatprintf(cmdargs, "$%zu\r\n%s\r\n", strlen(arg), arg); argslen++; } @@ -3513,11 +3523,19 @@ void syncWithPrimary(connection *conn) { * * EOF: supports EOF-style RDB transfer for diskless replication. * PSYNC2: supports PSYNC v2, so understands +CONTINUE . - * + * DISABLE-SYNC-CRC: supports disabling CRC calculations during full sync. + * Inform the primary of this capa only during diskless sync with TLS enabled. + * In disk-based sync, or non-TLS, there is more concern for data corruption + * so we keep this extra layer of detection. + * * The primary will ignore capabilities it does not understand. 
*/ + server.repl_meet_disable_crc_cond = 0; // reset this value before sync starts + int send_disable_crc_capa = (connIsTLS(conn) && server.disable_sync_crc && useDisklessLoad()); err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", - server.dual_channel_replication ? "capa" : NULL, - server.dual_channel_replication ? "dual-channel" : NULL, NULL); + send_disable_crc_capa ? "capa" : "", + send_disable_crc_capa ? REPLICA_CAPA_DISABLE_SYNC_CRC_STR : "", + server.dual_channel_replication ? "capa" : "", + server.dual_channel_replication ? "dual-channel" : "", NULL); if (err) goto write_error; /* Inform the primary of our (replica) version. */ diff --git a/src/rio.c b/src/rio.c index f2bf1fdb3c..2620669f6e 100644 --- a/src/rio.c +++ b/src/rio.c @@ -425,6 +425,7 @@ void rioFreeFd(rio *r) { /* This function can be installed both in memory and file streams when checksum * computation is needed. */ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { + if ((r->flags & RIO_FLAG_DISABLE_CRC) != 0) return; // skip CRC64 calculations r->cksum = crc64(r->cksum, buf, len); } diff --git a/src/rio.h b/src/rio.h index ee0f27aa7e..ad2772d55d 100644 --- a/src/rio.h +++ b/src/rio.h @@ -39,6 +39,7 @@ #define RIO_FLAG_READ_ERROR (1 << 0) #define RIO_FLAG_WRITE_ERROR (1 << 1) +#define RIO_FLAG_DISABLE_CRC (1 << 2) #define RIO_TYPE_FILE (1 << 0) #define RIO_TYPE_BUFFER (1 << 1) diff --git a/src/server.c b/src/server.c index 3cdec9fa9b..b8cd0ae942 100644 --- a/src/server.c +++ b/src/server.c @@ -2218,6 +2218,7 @@ void initServerConfig(void) { server.fsynced_reploff_pending = 0; server.rdb_client_id = -1; server.loading_process_events_interval_ms = LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT; + server.repl_meet_disable_crc_cond = 0; /* Replication partial resync backlog */ server.repl_backlog = NULL; @@ -2638,6 +2639,7 @@ void resetServerStats(void) { server.stat_fork_rate = 0; server.stat_total_forks = 0; server.stat_rejected_conn = 0; + 
server.stat_total_crc_disabled_syncs_stated = 0; server.stat_sync_full = 0; server.stat_sync_partial_ok = 0; server.stat_sync_partial_err = 0; @@ -5878,6 +5880,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "instantaneous_input_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION) / 1024, "instantaneous_output_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION) / 1024, "rejected_connections:%lld\r\n", server.stat_rejected_conn, + "total_crc_disabled_syncs_started:%zu\r\n", server.stat_total_crc_disabled_syncs_stated, "sync_full:%lld\r\n", server.stat_sync_full, "sync_partial_ok:%lld\r\n", server.stat_sync_partial_ok, "sync_partial_err:%lld\r\n", server.stat_sync_partial_err, diff --git a/src/server.h b/src/server.h index 841db70614..3bbc222225 100644 --- a/src/server.h +++ b/src/server.h @@ -433,11 +433,15 @@ typedef enum { * a replica that only wants RDB without replication buffer */ #define REPLICA_STATE_BG_RDB_LOAD 11 /* Main channel of a replica which uses dual channel replication. */ -/* Replica capabilities. */ +/* Replica capability flags */ #define REPLICA_CAPA_NONE 0 -#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */ -#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */ -#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */ +#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */ +#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */ +#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */ +#define REPLICA_CAPA_DISABLE_SYNC_CRC (1 << 3) /* Disable CRC checks for sync requests. 
*/ + +/* Replica capability strings */ +#define REPLICA_CAPA_DISABLE_SYNC_CRC_STR "disable-sync-crc" /* Disable CRC calculations during full sync */ /* Replica requirements */ #define REPLICA_REQ_NONE 0 @@ -1838,6 +1842,7 @@ struct valkeyServer { double stat_fork_rate; /* Fork rate in GB/sec. */ long long stat_total_forks; /* Total count of fork. */ long long stat_rejected_conn; /* Clients rejected because of maxclients */ + size_t stat_total_crc_disabled_syncs_stated; /* Total number of full syncs started with CRC checksum disabled */ long long stat_sync_full; /* Number of full resyncs with replicas. */ long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */ long long stat_sync_partial_err; /* Number of unaccepted PSYNC requests. */ @@ -1986,6 +1991,7 @@ struct valkeyServer { char *rdb_filename; /* Name of RDB file */ int rdb_compression; /* Use compression in RDB? */ int rdb_checksum; /* Use RDB checksum? */ + int disable_sync_crc; /* Skip RDB checksum (CRC64) during full sync? Applicable only for TLS enabled diskless sync */ int rdb_del_sync_files; /* Remove RDB files used only for SYNC if the instance does not use persistence. */ time_t lastsave; /* Unix time of last successful save */ @@ -2112,6 +2118,8 @@ struct valkeyServer { * when it receives an error on the replication stream */ int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to * persist writes to AOF. */ + int repl_meet_disable_crc_cond; /* Set to true only when replica meets all conditions for disabling CRC */ + /* The following two fields is where we store primary PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into * the server->primary client structure. 
*/ diff --git a/tests/integration/disable-sync-crc.tcl b/tests/integration/disable-sync-crc.tcl new file mode 100644 index 0000000000..8bd9309861 --- /dev/null +++ b/tests/integration/disable-sync-crc.tcl @@ -0,0 +1,45 @@ +start_server {tags {"repl tls"} overrides {save {}}} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + foreach master_disable_crc {yes no} { + foreach replica_disable_crc {yes no} { + foreach mds {no yes} { + foreach sdl {disabled on-empty-db swapdb flush-before-load} { + test "CRC disabled sync - master:$master_disable_crc, replica:$replica_disable_crc, tls:$::tls, repl_diskless_sync:$mds, repl_diskless_load:$sdl" { + $master config set disable-sync-crc $master_disable_crc + $master config set repl-diskless-sync $mds + $master config resetstat ;# the primary outlives every iteration; zero its counters so the ":1" match below is per-sync + start_server {overrides {save {}}} { + set replica [srv 0 client] + $replica config set disable-sync-crc $replica_disable_crc + $replica config set repl-diskless-load $sdl + + $replica replicaof $master_host $master_port + + wait_for_condition 50 100 { + [string match {*master_link_status:up*} [$replica info replication]] + } else { + fail "Replication not started" + } + set is_master_crc_disabled [string match {*total_crc_disabled_syncs_started:1*} [$master info stats]] + set is_replica_crc_disabled [string match {*total_crc_disabled_syncs_started:1*} [$replica info stats]] + + if {$replica_disable_crc eq "no" || $sdl eq "disabled" || $mds eq "no" || !$::tls} { + assert_equal 0 $is_master_crc_disabled "Master should not have CRC disabled" + assert_equal 0 $is_replica_crc_disabled "Replica should not have CRC disabled" + } else { + if {$master_disable_crc eq "no"} { + assert_equal 0 $is_master_crc_disabled "Master should not have CRC disabled" ;# primary skips CRC only when its own disable-sync-crc is on + } else { + assert_equal 1 $is_master_crc_disabled "Master should have CRC disabled" + } + assert_equal 1 $is_replica_crc_disabled "Replica should have CRC disabled" + } + } + } + } + } + } + } +}