From 67fa361b146521eacd034cf9b56f53aba7081e30 Mon Sep 17 00:00:00 2001 From: Tal Shachar Date: Mon, 23 Dec 2024 15:20:10 +0000 Subject: [PATCH 1/8] CRC removal during diskless full sync with TLS enabled Signed-off-by: Tal Shachar --- src/config.c | 1 + src/rdb.c | 22 +++++++++++-- src/replication.c | 28 +++++++++++++--- src/rio.c | 1 + src/rio.h | 1 + src/server.c | 3 ++ src/server.h | 16 ++++++--- tests/integration/disable-sync-crc.tcl | 45 ++++++++++++++++++++++++++ 8 files changed, 106 insertions(+), 11 deletions(-) create mode 100644 tests/integration/disable-sync-crc.tcl diff --git a/src/config.c b/src/config.c index e1cee3f95b..98fb0c63ef 100644 --- a/src/config.c +++ b/src/config.c @@ -3156,6 +3156,7 @@ static int applyClientMaxMemoryUsage(const char **err) { standardConfig static_configs[] = { /* Bool configs */ createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL), + createBoolConfig("disable-sync-crc", NULL, MODIFIABLE_CONFIG, server.disable_sync_crc, 0, NULL, NULL), createBoolConfig("daemonize", NULL, IMMUTABLE_CONFIG, server.daemonize, 0, NULL, NULL), createBoolConfig("always-show-logo", NULL, IMMUTABLE_CONFIG, server.always_show_logo, 0, NULL, NULL), createBoolConfig("protected-mode", NULL, MODIFIABLE_CONFIG, server.protected_mode, 1, NULL, NULL), diff --git a/src/rdb.c b/src/rdb.c index 5fb77a2897..fa325102c7 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3010,6 +3010,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin int error; long long empty_keys_skipped = 0; + if (rdb->flags & RIO_FLAG_DISABLE_CRC) server.stat_total_crc_disabled_syncs_stated++; rdb->update_cksum = rdbLoadProgressCallback; rdb->max_processing_chunk = server.loading_process_events_interval_bytes; if (rioRead(rdb, buf, 9) == 0) goto eoferr; @@ -3354,7 +3355,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if (rioRead(rdb, &cksum, 8) == 0) goto eoferr; if (server.rdb_checksum && 
!server.skip_checksum_validation) { memrev64ifbe(&cksum); - if (cksum == 0) { + if (cksum == 0 || (rdb->flags & RIO_FLAG_DISABLE_CRC) != 0) { serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: no check performed."); } else if (cksum != expected) { serverLog(LL_WARNING, @@ -3545,8 +3546,13 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { safe_to_exit_pipe = pipefds[0]; /* read end */ server.rdb_child_exit_pipe = pipefds[1]; /* write end */ } + /* + * For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req: + * Check replica capabilities, if every replica supports disabled CRC, run with CRC disabled, otherwise, use CRC. + */ + int disable_sync_crc_capa = server.disable_sync_crc; /* Collect the connections of the replicas we want to transfer - * the RDB to, which are i WAIT_BGSAVE_START state. */ + * the RDB to, which are in WAIT_BGSAVE_START state. */ int connsnum = 0; connection **conns = zmalloc(sizeof(connection *) * listLength(server.replicas)); server.rdb_pipe_conns = NULL; @@ -3578,6 +3584,11 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset()); } + + // do not disable CRC on the primary if TLS is disabled or if the replica doesn't support it + if (!connIsTLS(replica->conn) || (replica->replica_capa & REPLICA_CAPA_DISABLE_SYNC_CRC) == 0) + disable_sync_crc_capa = 0; + } /* Create the child process. 
*/ @@ -3601,6 +3612,12 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } serverSetCpuAffinity(server.bgsave_cpulist); + if (disable_sync_crc_capa == 1) { + serverLog(LL_NOTICE, "CRC checksum is disabled for this RDB transfer"); + // mark rdb object to skip CRC checksum calculations + rdb.flags |= RIO_FLAG_DISABLE_CRC; + } + retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi); if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR; @@ -3666,6 +3683,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } } if (!dual_channel) close(safe_to_exit_pipe); + if (disable_sync_crc_capa) server.stat_total_crc_disabled_syncs_stated++; return (childpid == -1) ? C_ERR : C_OK; } return C_OK; /* Unreached. */ diff --git a/src/replication.c b/src/replication.c index 3a207a1d0f..fd70215dd9 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1244,11 +1244,12 @@ void syncCommand(client *c) { * the primary can accurately lists replicas and their listening ports in the * INFO output. * - * - capa + * - capa * What is the capabilities of this instance. * eof: supports EOF-style RDB transfer for diskless replication. * psync2: supports PSYNC v2, so understands +CONTINUE . * dual-channel: supports full sync using rdb channel. + * disable_sync_crc: supports disabling CRC during TLS enabled diskless sync. * * - ack [fack ] * Replica informs the primary the amount of replication stream that it @@ -1314,7 +1315,8 @@ void replconfCommand(client *c) { /* If dual-channel is disable on this primary, treat this command as unrecognized * replconf option. */ c->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL; - } + } else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_DISABLE_SYNC_CRC_STR)) + c->replica_capa |= REPLICA_CAPA_DISABLE_SYNC_CRC; } else if (!strcasecmp(c->argv[j]->ptr, "ack")) { /* REPLCONF ACK is used by replica to inform the primary the amount * of replication stream that it processed so far. 
It is an @@ -2084,6 +2086,12 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving %lld bytes from primary %s", (long long)server.repl_transfer_size, use_diskless_load ? "to parser" : "to disk"); } + + // Set a flag to determin later whether or not the replica will skip CRC calculations for this sync - + // Disable CRC on replica if: (1) TLS is enabled; (2) replica disable_sync_crc is enabled; (3) diskelss sync enabled on both replica and primary. + // Otherwise, CRC should be enabled/disabled as per server.rdb_checksum + if (connIsTLS(conn) && server.disable_sync_crc && use_diskless_load && usemark) + server.repl_meet_disable_crc_cond = 1; return; } @@ -2251,6 +2259,7 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); + if (server.repl_meet_disable_crc_cond == 1) rdb.flags |= RIO_FLAG_DISABLE_CRC; int loadingFailed = 0; rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; @@ -2493,6 +2502,7 @@ char *sendCommand(connection *conn, ...) { while (1) { arg = va_arg(ap, char *); if (arg == NULL) break; + if (strcmp(arg, "") == 0) continue; cmdargs = sdscatprintf(cmdargs, "$%zu\r\n%s\r\n", strlen(arg), arg); argslen++; } @@ -3513,11 +3523,19 @@ void syncWithPrimary(connection *conn) { * * EOF: supports EOF-style RDB transfer for diskless replication. * PSYNC2: supports PSYNC v2, so understands +CONTINUE . - * + * DISABLE-SYNC-CRC: supports disabling CRC calculations during full sync. + * Inform the primary of this capa only during diskless sync with TLS enabled. + * In disk-based sync, or non-TLS, there is more concern for data corruprion + * so we keep this extra layer of detection. + * * The primary will ignore capabilities it does not understand. 
*/ + server.repl_meet_disable_crc_cond = 0; // reset this value before sync starts + int send_disable_crc_capa = (connIsTLS(conn) && server.disable_sync_crc && useDisklessLoad()); err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", - server.dual_channel_replication ? "capa" : NULL, - server.dual_channel_replication ? "dual-channel" : NULL, NULL); + send_disable_crc_capa ? "capa" : "", + send_disable_crc_capa ? REPLICA_CAPA_DISABLE_SYNC_CRC_STR : "", + server.dual_channel_replication ? "capa" : "", + server.dual_channel_replication ? "dual-channel" : "", NULL); if (err) goto write_error; /* Inform the primary of our (replica) version. */ diff --git a/src/rio.c b/src/rio.c index f2bf1fdb3c..2620669f6e 100644 --- a/src/rio.c +++ b/src/rio.c @@ -425,6 +425,7 @@ void rioFreeFd(rio *r) { /* This function can be installed both in memory and file streams when checksum * computation is needed. */ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { + if ((r->flags & RIO_FLAG_DISABLE_CRC) != 0) return; // skip CRC64 calculations r->cksum = crc64(r->cksum, buf, len); } diff --git a/src/rio.h b/src/rio.h index ee0f27aa7e..ad2772d55d 100644 --- a/src/rio.h +++ b/src/rio.h @@ -39,6 +39,7 @@ #define RIO_FLAG_READ_ERROR (1 << 0) #define RIO_FLAG_WRITE_ERROR (1 << 1) +#define RIO_FLAG_DISABLE_CRC (1 << 2) #define RIO_TYPE_FILE (1 << 0) #define RIO_TYPE_BUFFER (1 << 1) diff --git a/src/server.c b/src/server.c index 3cdec9fa9b..b8cd0ae942 100644 --- a/src/server.c +++ b/src/server.c @@ -2218,6 +2218,7 @@ void initServerConfig(void) { server.fsynced_reploff_pending = 0; server.rdb_client_id = -1; server.loading_process_events_interval_ms = LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT; + server.repl_meet_disable_crc_cond = 0; /* Replication partial resync backlog */ server.repl_backlog = NULL; @@ -2638,6 +2639,7 @@ void resetServerStats(void) { server.stat_fork_rate = 0; server.stat_total_forks = 0; server.stat_rejected_conn = 0; + 
server.stat_total_crc_disabled_syncs_stated = 0; server.stat_sync_full = 0; server.stat_sync_partial_ok = 0; server.stat_sync_partial_err = 0; @@ -5878,6 +5880,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "instantaneous_input_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION) / 1024, "instantaneous_output_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION) / 1024, "rejected_connections:%lld\r\n", server.stat_rejected_conn, + "total_crc_disabled_syncs_stated:%ld\r\n", server.stat_total_crc_disabled_syncs_stated, "sync_full:%lld\r\n", server.stat_sync_full, "sync_partial_ok:%lld\r\n", server.stat_sync_partial_ok, "sync_partial_err:%lld\r\n", server.stat_sync_partial_err, diff --git a/src/server.h b/src/server.h index 841db70614..3bbc222225 100644 --- a/src/server.h +++ b/src/server.h @@ -433,11 +433,15 @@ typedef enum { * a replica that only wants RDB without replication buffer */ #define REPLICA_STATE_BG_RDB_LOAD 11 /* Main channel of a replica which uses dual channel replication. */ -/* Replica capabilities. */ +/* Replica capability flags */ #define REPLICA_CAPA_NONE 0 -#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */ -#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */ -#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */ +#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */ +#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */ +#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */ +#define REPLICA_CAPA_DISABLE_SYNC_CRC (1 << 3) /* Disable CRC checks for sync requests. 
*/ + +/* Replica capability strings */ +#define REPLICA_CAPA_DISABLE_SYNC_CRC_STR "disable-sync-crc" /* Disable CRC calculations during full sync */ /* Replica requirements */ #define REPLICA_REQ_NONE 0 @@ -1838,6 +1842,7 @@ struct valkeyServer { double stat_fork_rate; /* Fork rate in GB/sec. */ long long stat_total_forks; /* Total count of fork. */ long long stat_rejected_conn; /* Clients rejected because of maxclients */ + size_t stat_total_crc_disabled_syncs_stated; /* Total number of full syncs stated with CRC checksum disabled */ // AMZN long long stat_sync_full; /* Number of full resyncs with replicas. */ long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */ long long stat_sync_partial_err; /* Number of unaccepted PSYNC requests. */ @@ -1986,6 +1991,7 @@ struct valkeyServer { char *rdb_filename; /* Name of RDB file */ int rdb_compression; /* Use compression in RDB? */ int rdb_checksum; /* Use RDB checksum? */ + int disable_sync_crc; /* Use RDB checksum during sync? Applicable only for TLS enabled diskless sync */ int rdb_del_sync_files; /* Remove RDB files used only for SYNC if the instance does not use persistence. */ time_t lastsave; /* Unix time of last successful save */ @@ -2112,6 +2118,8 @@ struct valkeyServer { * when it receives an error on the replication stream */ int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to * persist writes to AOF. */ + int repl_meet_disable_crc_cond; /* Set to true only when replica meets all conditions for disabling CRC */ + /* The following two fields is where we store primary PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into * the server->primary client structure. 
*/ diff --git a/tests/integration/disable-sync-crc.tcl b/tests/integration/disable-sync-crc.tcl new file mode 100644 index 0000000000..8bd9309861 --- /dev/null +++ b/tests/integration/disable-sync-crc.tcl @@ -0,0 +1,45 @@ +start_server {tags {"repl tls"} overrides {save {}}} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + foreach master_disable_crc {yes no} { + foreach replica_disable_crc {yes no} { + foreach mds {no yes} { + foreach sdl {disabled on-empty-db swapdb flush-before-load} { + test "CRC disabled sync - master:$master_disable_crc, replica:$replica_disable_crc, tls:$::tls, repl_diskless_sync:$mds, repl_diskless_load:$sdl" { + $master config set disable-sync-crc $master_disable_crc + $master config set repl-diskless-sync $mds + start_server {overrides {save {}}} { + set replica [srv 0 client] + + $replica config set disable-sync-crc $replica_disable_crc + $replica config set repl-diskless-load $sdl + + $replica replicaof $master_host $master_port + + wait_for_condition 50 100 { + [string match {*master_link_status:up*} [$replica info replication]] + } else { + fail "Replication not started" + } + set is_master_crc_disabled [string match {*total_crc_disabled_syncs_started:1*} [$master info stats]] + set is_replica_crc_disabled [string match {*total_crc_disabled_syncs_started:1*} [$replica info stats]] + + if {$replica_disable_crc eq "no" || $sdl eq "disabled" || $mds eq "no" || !$::tls} { + assert_equal 0 $is_master_crc_disabled "Master should not have CRC disabled" + assert_equal 0 $is_replica_crc_disabled "Replica should not have CRC disabled" + } else { + if {$replica_disable_crc eq "no"} { + assert_equal 0 $is_master_crc_disabled "Master should not have CRC disabled" + } else { + assert_equal 1 $is_master_crc_disabled "Master should have CRC disabled" + } + assert_equal 1 $is_replica_crc_disabled "Replica should have CRC disabled" + } + } + } + } + } + } + } +} From 2e5314eec06af5a844e3aea7d9172d026bae9a26 
Mon Sep 17 00:00:00 2001 From: Tal Shachar Date: Sun, 29 Dec 2024 16:21:37 +0000 Subject: [PATCH 2/8] Addressing comments on first commit: changed disable_sync_crc naming to bypass_crc, encapsulated condition checks for skipping CRC, and changed connIsTLS condition to connIntegrityChecked in ConnectionType. Some changes in the test as well Signed-off-by: Tal Shachar --- src/config.c | 1 - src/connection.h | 7 ++++ src/rdb.c | 20 ++++++------ src/rdma.c | 3 ++ src/replication.c | 37 +++++++++++---------- src/rio.c | 2 +- src/rio.h | 2 +- src/server.c | 6 ++-- src/server.h | 9 +++--- src/socket.c | 3 ++ src/tls.c | 7 ++++ src/unix.c | 3 ++ tests/integration/bypass-crc.tcl | 37 +++++++++++++++++++++ tests/integration/disable-sync-crc.tcl | 45 -------------------------- 14 files changed, 97 insertions(+), 85 deletions(-) create mode 100644 tests/integration/bypass-crc.tcl delete mode 100644 tests/integration/disable-sync-crc.tcl diff --git a/src/config.c b/src/config.c index 98fb0c63ef..e1cee3f95b 100644 --- a/src/config.c +++ b/src/config.c @@ -3156,7 +3156,6 @@ static int applyClientMaxMemoryUsage(const char **err) { standardConfig static_configs[] = { /* Bool configs */ createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL), - createBoolConfig("disable-sync-crc", NULL, MODIFIABLE_CONFIG, server.disable_sync_crc, 0, NULL, NULL), createBoolConfig("daemonize", NULL, IMMUTABLE_CONFIG, server.daemonize, 0, NULL, NULL), createBoolConfig("always-show-logo", NULL, IMMUTABLE_CONFIG, server.always_show_logo, 0, NULL, NULL), createBoolConfig("protected-mode", NULL, MODIFIABLE_CONFIG, server.protected_mode, 1, NULL, NULL), diff --git a/src/connection.h b/src/connection.h index fd7e0910cf..2716cb6401 100644 --- a/src/connection.h +++ b/src/connection.h @@ -121,6 +121,9 @@ typedef struct ConnectionType { /* TLS specified methods */ sds (*get_peer_cert)(struct connection *conn); + + /* Miselenious */ + int (*connIntegrityChecked)(void); // 
return 1 iff connection type has built-in integrity checks } ConnectionType; struct connection { @@ -483,4 +486,8 @@ static inline void connSetPostponeUpdateState(connection *conn, int on) { } } +static inline int connIsIntegrityChecked(connection *conn) { + return conn->type->connIntegrityChecked && conn->type->connIntegrityChecked(); +} + #endif /* __REDIS_CONNECTION_H */ diff --git a/src/rdb.c b/src/rdb.c index fa325102c7..636574420f 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3010,7 +3010,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin int error; long long empty_keys_skipped = 0; - if (rdb->flags & RIO_FLAG_DISABLE_CRC) server.stat_total_crc_disabled_syncs_stated++; + if (rdb->flags & RIO_FLAG_BYPASS_CRC) server.stat_total_sync_bypass_crc++; rdb->update_cksum = rdbLoadProgressCallback; rdb->max_processing_chunk = server.loading_process_events_interval_bytes; if (rioRead(rdb, buf, 9) == 0) goto eoferr; @@ -3355,7 +3355,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if (rioRead(rdb, &cksum, 8) == 0) goto eoferr; if (server.rdb_checksum && !server.skip_checksum_validation) { memrev64ifbe(&cksum); - if (cksum == 0 || (rdb->flags & RIO_FLAG_DISABLE_CRC) != 0) { + if (cksum == 0 || (rdb->flags & RIO_FLAG_BYPASS_CRC)) { serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: no check performed."); } else if (cksum != expected) { serverLog(LL_WARNING, @@ -3548,9 +3548,9 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } /* * For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req: - * Check replica capabilities, if every replica supports disabled CRC, run with CRC disabled, otherwise, use CRC. + * Check replica capabilities, if every replica supports bypassing CRC, primary should also bypass CRC, otherwise, use CRC. 
*/ - int disable_sync_crc_capa = server.disable_sync_crc; + int bypass_crc_capa = server.bypass_crc; /* Collect the connections of the replicas we want to transfer * the RDB to, which are in WAIT_BGSAVE_START state. */ int connsnum = 0; @@ -3585,9 +3585,9 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset()); } - // do not disable CRC on the primary if TLS is disabled or if the replica doesn't support it - if (!connIsTLS(replica->conn) || (replica->replica_capa & REPLICA_CAPA_DISABLE_SYNC_CRC) == 0) - disable_sync_crc_capa = 0; + // do not bypass CRC on the primary if TLS is disabled or if the replica doesn't support it + if (!connIsIntegrityChecked(replica->conn) || !(replica->replica_capa & REPLICA_CAPA_BYPASS_CRC)) + bypass_crc_capa = 0; } @@ -3612,10 +3612,10 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } serverSetCpuAffinity(server.bgsave_cpulist); - if (disable_sync_crc_capa == 1) { + if (bypass_crc_capa) { serverLog(LL_NOTICE, "CRC checksum is disabled for this RDB transfer"); // mark rdb object to skip CRC checksum calculations - rdb.flags |= RIO_FLAG_DISABLE_CRC; + rdb.flags |= RIO_FLAG_BYPASS_CRC; } retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi); @@ -3683,7 +3683,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } } if (!dual_channel) close(safe_to_exit_pipe); - if (disable_sync_crc_capa) server.stat_total_crc_disabled_syncs_stated++; + if (bypass_crc_capa) server.stat_total_sync_bypass_crc++; return (childpid == -1) ? C_ERR : C_OK; } return C_OK; /* Unreached. 
*/ diff --git a/src/rdma.c b/src/rdma.c index 7fe65ad2d2..d2aca8b708 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -1817,6 +1817,9 @@ static ConnectionType CT_RDMA = { .process_pending_data = rdmaProcessPendingData, .postpone_update_state = postPoneUpdateRdmaState, .update_state = updateRdmaState, + + /* Miselenious */ + .connIntegrityChecked = NULL, }; ConnectionType *connectionTypeRdma(void) { diff --git a/src/replication.c b/src/replication.c index fd70215dd9..98d956c478 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1244,12 +1244,12 @@ void syncCommand(client *c) { * the primary can accurately lists replicas and their listening ports in the * INFO output. * - * - capa + * - capa * What is the capabilities of this instance. * eof: supports EOF-style RDB transfer for diskless replication. * psync2: supports PSYNC v2, so understands +CONTINUE . * dual-channel: supports full sync using rdb channel. - * disable_sync_crc: supports disabling CRC during TLS enabled diskless sync. + * bypass-crc: supports skipping CRC calculations during TLS enabled diskless sync. * * - ack [fack ] * Replica informs the primary the amount of replication stream that it @@ -1315,8 +1315,8 @@ void replconfCommand(client *c) { /* If dual-channel is disable on this primary, treat this command as unrecognized * replconf option. */ c->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL; - } else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_DISABLE_SYNC_CRC_STR)) - c->replica_capa |= REPLICA_CAPA_DISABLE_SYNC_CRC; + } else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_BYPASS_CRC_STR)) + c->replica_capa |= REPLICA_CAPA_BYPASS_CRC; } else if (!strcasecmp(c->argv[j]->ptr, "ack")) { /* REPLCONF ACK is used by replica to inform the primary the amount * of replication stream that it processed so far. 
It is an @@ -1974,6 +1974,11 @@ static int useDisklessLoad(void) { return enabled; } +/* Returns 1 if the replica can skip CRC calculations during full sync */ +int replicationBypassCRC(connection *conn, int is_replica_diskless_load, int is_primary_diskless_sync) { + return server.bypass_crc && is_replica_diskless_load && is_primary_diskless_sync && connIsIntegrityChecked(conn); +} + /* Helper function for readSyncBulkPayload() to initialize tempDb * before socket-loading the new db from primary. The tempDb may be populated * by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */ @@ -2087,11 +2092,6 @@ void readSyncBulkPayload(connection *conn) { (long long)server.repl_transfer_size, use_diskless_load ? "to parser" : "to disk"); } - // Set a flag to determin later whether or not the replica will skip CRC calculations for this sync - - // Disable CRC on replica if: (1) TLS is enabled; (2) replica disable_sync_crc is enabled; (3) diskelss sync enabled on both replica and primary. - // Otherwise, CRC should be enabled/disabled as per server.rdb_checksum - if (connIsTLS(conn) && server.disable_sync_crc && use_diskless_load && usemark) - server.repl_meet_disable_crc_cond = 1; return; } @@ -2259,7 +2259,7 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); - if (server.repl_meet_disable_crc_cond == 1) rdb.flags |= RIO_FLAG_DISABLE_CRC; + if (replicationBypassCRC(conn, use_diskless_load, usemark)) rdb.flags |= RIO_FLAG_BYPASS_CRC; int loadingFailed = 0; rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; @@ -3523,17 +3523,16 @@ void syncWithPrimary(connection *conn) { * * EOF: supports EOF-style RDB transfer for diskless replication. * PSYNC2: supports PSYNC v2, so understands +CONTINUE . - * DISABLE-SYNC-CRC: supports disabling CRC calculations during full sync. 
- * Inform the primary of this capa only during diskless sync with TLS enabled. - * In disk-based sync, or non-TLS, there is more concern for data corruprion - * so we keep this extra layer of detection. - * + * BYPASS-CRC: supports skipping CRC calculations during full sync. + * Inform the primary of this capa only during diskless sync with TLS enabled. + * In disk-based sync, or non-TLS, there is more concern for data corruprion + * so we keep this extra layer of detection. + * * The primary will ignore capabilities it does not understand. */ - server.repl_meet_disable_crc_cond = 0; // reset this value before sync starts - int send_disable_crc_capa = (connIsTLS(conn) && server.disable_sync_crc && useDisklessLoad()); + int send_bypass_crc_capa = replicationBypassCRC(conn, useDisklessLoad(), 1); err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", - send_disable_crc_capa ? "capa" : "", - send_disable_crc_capa ? REPLICA_CAPA_DISABLE_SYNC_CRC_STR : "", + send_bypass_crc_capa ? "capa" : "", + send_bypass_crc_capa ? REPLICA_CAPA_BYPASS_CRC_STR : "", server.dual_channel_replication ? "capa" : "", server.dual_channel_replication ? "dual-channel" : "", NULL); if (err) goto write_error; diff --git a/src/rio.c b/src/rio.c index 2620669f6e..a1638d589b 100644 --- a/src/rio.c +++ b/src/rio.c @@ -425,7 +425,7 @@ void rioFreeFd(rio *r) { /* This function can be installed both in memory and file streams when checksum * computation is needed. 
*/ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { - if ((r->flags & RIO_FLAG_DISABLE_CRC) != 0) return; // skip CRC64 calculations + if ((r->flags & RIO_FLAG_BYPASS_CRC) != 0) return; // skip CRC64 calculations r->cksum = crc64(r->cksum, buf, len); } diff --git a/src/rio.h b/src/rio.h index ad2772d55d..8d78855d59 100644 --- a/src/rio.h +++ b/src/rio.h @@ -39,7 +39,7 @@ #define RIO_FLAG_READ_ERROR (1 << 0) #define RIO_FLAG_WRITE_ERROR (1 << 1) -#define RIO_FLAG_DISABLE_CRC (1 << 2) +#define RIO_FLAG_BYPASS_CRC (1 << 2) #define RIO_TYPE_FILE (1 << 0) #define RIO_TYPE_BUFFER (1 << 1) diff --git a/src/server.c b/src/server.c index b8cd0ae942..a87dad9ea2 100644 --- a/src/server.c +++ b/src/server.c @@ -2218,7 +2218,7 @@ void initServerConfig(void) { server.fsynced_reploff_pending = 0; server.rdb_client_id = -1; server.loading_process_events_interval_ms = LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT; - server.repl_meet_disable_crc_cond = 0; + server.bypass_crc = 1; /* Replication partial resync backlog */ server.repl_backlog = NULL; @@ -2639,7 +2639,7 @@ void resetServerStats(void) { server.stat_fork_rate = 0; server.stat_total_forks = 0; server.stat_rejected_conn = 0; - server.stat_total_crc_disabled_syncs_stated = 0; + server.stat_total_sync_bypass_crc = 0; server.stat_sync_full = 0; server.stat_sync_partial_ok = 0; server.stat_sync_partial_err = 0; @@ -5880,7 +5880,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) { "instantaneous_input_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION) / 1024, "instantaneous_output_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION) / 1024, "rejected_connections:%lld\r\n", server.stat_rejected_conn, - "total_crc_disabled_syncs_stated:%ld\r\n", server.stat_total_crc_disabled_syncs_stated, + "total_sync_bypass_crc:%ld\r\n", server.stat_total_sync_bypass_crc, "sync_full:%lld\r\n", server.stat_sync_full, 
"sync_partial_ok:%lld\r\n", server.stat_sync_partial_ok, "sync_partial_err:%lld\r\n", server.stat_sync_partial_err, diff --git a/src/server.h b/src/server.h index 3bbc222225..f6e3029afd 100644 --- a/src/server.h +++ b/src/server.h @@ -438,10 +438,10 @@ typedef enum { #define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */ #define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */ #define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */ -#define REPLICA_CAPA_DISABLE_SYNC_CRC (1 << 3) /* Disable CRC checks for sync requests. */ +#define REPLICA_CAPA_BYPASS_CRC (1 << 3) /* Supports bypassing CRC checks for sync requests. */ /* Replica capability strings */ -#define REPLICA_CAPA_DISABLE_SYNC_CRC_STR "disable-sync-crc" /* Disable CRC calculations during full sync */ +#define REPLICA_CAPA_BYPASS_CRC_STR "bypass-crc" /* Supports bypassing CRC checks for sync requests. */ /* Replica requirements */ #define REPLICA_REQ_NONE 0 @@ -1842,7 +1842,7 @@ struct valkeyServer { double stat_fork_rate; /* Fork rate in GB/sec. */ long long stat_total_forks; /* Total count of fork. */ long long stat_rejected_conn; /* Clients rejected because of maxclients */ - size_t stat_total_crc_disabled_syncs_stated; /* Total number of full syncs stated with CRC checksum disabled */ // AMZN + size_t stat_total_sync_bypass_crc; /* Total number of full syncs stated with CRC checksum bypassed */ long long stat_sync_full; /* Number of full resyncs with replicas. */ long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */ long long stat_sync_partial_err; /* Number of unaccepted PSYNC requests. */ @@ -1991,7 +1991,7 @@ struct valkeyServer { char *rdb_filename; /* Name of RDB file */ int rdb_compression; /* Use compression in RDB? */ int rdb_checksum; /* Use RDB checksum? */ - int disable_sync_crc; /* Use RDB checksum during sync? Applicable only for TLS enabled diskless sync */ + int bypass_crc; /* Skip RDB checksum? 
Applicable only for TLS enabled diskless full sync */ int rdb_del_sync_files; /* Remove RDB files used only for SYNC if the instance does not use persistence. */ time_t lastsave; /* Unix time of last successful save */ @@ -2118,7 +2118,6 @@ struct valkeyServer { * when it receives an error on the replication stream */ int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to * persist writes to AOF. */ - int repl_meet_disable_crc_cond; /* Set to true only when replica meets all conditions for disabling CRC */ /* The following two fields is where we store primary PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into diff --git a/src/socket.c b/src/socket.c index d89e6c8767..2cb5bc918f 100644 --- a/src/socket.c +++ b/src/socket.c @@ -437,6 +437,9 @@ static ConnectionType CT_Socket = { .process_pending_data = NULL, .postpone_update_state = NULL, .update_state = NULL, + + /* Miselenious */ + .connIntegrityChecked = NULL, }; int connBlock(connection *conn) { diff --git a/src/tls.c b/src/tls.c index 11e6143561..3dc42f209f 100644 --- a/src/tls.c +++ b/src/tls.c @@ -814,6 +814,10 @@ static int connTLSListen(connListener *listener) { return listenToPort(listener); } +static int connTLSIsIntegrityChecked(void) { + return 1; +} + static void connTLSCloseListener(connListener *listener) { connectionTypeTcp()->closeListener(listener); } @@ -1186,6 +1190,9 @@ static ConnectionType CT_TLS = { /* TLS specified methods */ .get_peer_cert = connTLSGetPeerCert, + + /* Miselenious */ + .connIntegrityChecked = connTLSIsIntegrityChecked, }; int RedisRegisterConnectionTypeTLS(void) { diff --git a/src/unix.c b/src/unix.c index 86df05bd52..76ab8f23bd 100644 --- a/src/unix.c +++ b/src/unix.c @@ -207,6 +207,9 @@ static ConnectionType CT_Unix = { .process_pending_data = NULL, .postpone_update_state = NULL, .update_state = NULL, + + /* Miselenious */ + .connIntegrityChecked = NULL, }; int RedisRegisterConnectionTypeUnix(void) { 
diff --git a/tests/integration/bypass-crc.tcl b/tests/integration/bypass-crc.tcl new file mode 100644 index 0000000000..d07c2e6667 --- /dev/null +++ b/tests/integration/bypass-crc.tcl @@ -0,0 +1,37 @@ +start_server {tags {"repl tls"} overrides {save {}}} { + set primary [srv 0 client] + set primary_host [srv 0 host] + set primary_port [srv 0 port] + set primary_bypassed_crc_counter 0 + foreach mds {no yes} { + foreach sdl {disabled on-empty-db swapdb flush-before-load} { + test "Bypass CRC sync - tls:$::tls, repl_diskless_sync:$mds, repl_diskless_load:$sdl" { + $primary config set repl-diskless-sync $mds + start_server {overrides {save {}}} { + set replica [srv 0 client] + $replica config set repl-diskless-load $sdl + $replica replicaof $primary_host $primary_port + + wait_for_condition 50 100 { + [string match {*master_link_status:up*} [$replica info replication]] + } else { + fail "Replication not started" + } + + set replica_bypassing_crc_count [string match {*total_sync_bypass_crc:1*} [$replica info stats]] + set stats [regexp -inline {total_sync_bypass_crc:(\d+)} [$primary info stats]] + set primary_bypass_crc_count [lindex $stats 1] + + if {$sdl eq "disabled" || $mds eq "no" || !$::tls} { + assert_equal $primary_bypassed_crc_counter $primary_bypass_crc_count "Primary should not bypass CRC in this scenario" + assert_equal 0 $replica_bypassing_crc_count "Replica should not bypass CRC in this scenario" + } else { + incr primary_bypassed_crc_counter + assert_equal $primary_bypassed_crc_counter $primary_bypass_crc_count "Primary should bypass CRC in this scenario" + assert_equal 1 $replica_bypassing_crc_count "Replica should bypass CRC in this scenario" + } + } + } + } + } +} diff --git a/tests/integration/disable-sync-crc.tcl b/tests/integration/disable-sync-crc.tcl deleted file mode 100644 index 8bd9309861..0000000000 --- a/tests/integration/disable-sync-crc.tcl +++ /dev/null @@ -1,45 +0,0 @@ -start_server {tags {"repl tls"} overrides {save {}}} { - set master 
[srv 0 client] - set master_host [srv 0 host] - set master_port [srv 0 port] - foreach master_disable_crc {yes no} { - foreach replica_disable_crc {yes no} { - foreach mds {no yes} { - foreach sdl {disabled on-empty-db swapdb flush-before-load} { - test "CRC disabled sync - master:$master_disable_crc, replica:$replica_disable_crc, tls:$::tls, repl_diskless_sync:$mds, repl_diskless_load:$sdl" { - $master config set disable-sync-crc $master_disable_crc - $master config set repl-diskless-sync $mds - start_server {overrides {save {}}} { - set replica [srv 0 client] - - $replica config set disable-sync-crc $replica_disable_crc - $replica config set repl-diskless-load $sdl - - $replica replicaof $master_host $master_port - - wait_for_condition 50 100 { - [string match {*master_link_status:up*} [$replica info replication]] - } else { - fail "Replication not started" - } - set is_master_crc_disabled [string match {*total_crc_disabled_syncs_started:1*} [$master info stats]] - set is_replica_crc_disabled [string match {*total_crc_disabled_syncs_started:1*} [$replica info stats]] - - if {$replica_disable_crc eq "no" || $sdl eq "disabled" || $mds eq "no" || !$::tls} { - assert_equal 0 $is_master_crc_disabled "Master should not have CRC disabled" - assert_equal 0 $is_replica_crc_disabled "Replica should not have CRC disabled" - } else { - if {$replica_disable_crc eq "no"} { - assert_equal 0 $is_master_crc_disabled "Master should not have CRC disabled" - } else { - assert_equal 1 $is_master_crc_disabled "Master should have CRC disabled" - } - assert_equal 1 $is_replica_crc_disabled "Replica should have CRC disabled" - } - } - } - } - } - } - } -} From dd180dd9a08b53a78b6ec0b17d208f4a0c598d31 Mon Sep 17 00:00:00 2001 From: Tal Shachar Date: Thu, 2 Jan 2025 12:50:34 +0000 Subject: [PATCH 3/8] Removed bypass_crc flag from server, renaming and comments changes Signed-off-by: Tal Shachar --- src/connection.h | 2 +- src/rdb.c | 4 ++-- src/replication.c | 27 +++++++++++++++++---------- 
src/server.c | 1 - src/server.h | 1 - 5 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/connection.h b/src/connection.h index 2716cb6401..6202a47328 100644 --- a/src/connection.h +++ b/src/connection.h @@ -123,7 +123,7 @@ typedef struct ConnectionType { sds (*get_peer_cert)(struct connection *conn); /* Miselenious */ - int (*connIntegrityChecked)(void); // return 1 iff connection type has built-in integrity checks + int (*connIntegrityChecked)(void); // return 1 if connection type has built-in integrity checks } ConnectionType; struct connection { diff --git a/src/rdb.c b/src/rdb.c index 663009403f..1f60cd5303 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3563,7 +3563,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { * For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req: * Check replica capabilities, if every replica supports bypassing CRC, primary should also bypass CRC, otherwise, use CRC. */ - int bypass_crc_capa = server.bypass_crc; + int bypass_crc_capa = 1; /* Collect the connections of the replicas we want to transfer * the RDB to, which are in WAIT_BGSAVE_START state. */ int connsnum = 0; @@ -3598,7 +3598,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset()); } - // do not bypass CRC on the primary if TLS is disabled or if the replica doesn't support it + // do not bypass CRC on the primary if connection doesn't have integrity check or if the replica doesn't support it if (!connIsIntegrityChecked(replica->conn) || !(replica->replica_capa & REPLICA_CAPA_BYPASS_CRC)) bypass_crc_capa = 0; diff --git a/src/replication.c b/src/replication.c index ce33fa8536..6fe4aab5f5 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1250,7 +1250,8 @@ void syncCommand(client *c) { * eof: supports EOF-style RDB transfer for diskless replication. * psync2: supports PSYNC v2, so understands +CONTINUE . 
* dual-channel: supports full sync using rdb channel. - * bypass-crc: supports skipping CRC calculations during TLS enabled diskless sync. + * bypass-crc: supports skipping CRC calculations during diskless sync using + * a connection that has integrity checks (such as TLS). * * - ack [fack ] * Replica informs the primary the amount of replication stream that it @@ -1976,8 +1977,8 @@ static int useDisklessLoad(void) { } /* Returns 1 if the replica can skip CRC calculations during full sync */ -int replicationBypassCRC(connection *conn, int is_replica_diskless_load, int is_primary_diskless_sync) { - return server.bypass_crc && is_replica_diskless_load && is_primary_diskless_sync && connIsIntegrityChecked(conn); +int replicationSupportBypassCRC(connection *conn, int is_replica_diskless_load, int is_primary_diskless_sync) { + return is_replica_diskless_load && is_primary_diskless_sync && connIsIntegrityChecked(conn); } /* Helper function for readSyncBulkPayload() to initialize tempDb @@ -2092,7 +2093,6 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: receiving %lld bytes from primary %s", (long long)server.repl_transfer_size, use_diskless_load ? "to parser" : "to disk"); } - return; } @@ -2260,8 +2260,14 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); - if (replicationBypassCRC(conn, use_diskless_load, usemark)) rdb.flags |= RIO_FLAG_BYPASS_CRC; - + if (replicationSupportBypassCRC(conn, use_diskless_load, usemark)) { + /* We can bypass CRC checks when data is transmitted through a verified stream. + * The usemark flag indicates that the primary is streaming the data directly without + * writing it to storage. + * Similarly, the use_diskless_load flag indicates that the + * replica will load the payload directly into memory without first writing it to disk. 
*/ + rdb.flags |= RIO_FLAG_BYPASS_CRC; + } int loadingFailed = 0; rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) { @@ -3522,12 +3528,13 @@ void syncWithPrimary(connection *conn) { * EOF: supports EOF-style RDB transfer for diskless replication. * PSYNC2: supports PSYNC v2, so understands +CONTINUE . * BYPASS-CRC: supports skipping CRC calculations during full sync. - * Inform the primary of this capa only during diskless sync with TLS enabled. - * In disk-based sync, or non-TLS, there is more concern for data corruprion - * so we keep this extra layer of detection. + * Inform the primary of this capa only during diskless sync using a + * connection that has integrity checks (such as TLS). + * In disk-based sync, or non-integrity-checked connection, there is more + * concern for data corruprion so we keep this extra layer of detection. * * The primary will ignore capabilities it does not understand. */ - int send_bypass_crc_capa = replicationBypassCRC(conn, useDisklessLoad(), 1); + int send_bypass_crc_capa = replicationSupportBypassCRC(conn, useDisklessLoad(), 1); err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2", send_bypass_crc_capa ? "capa" : "", send_bypass_crc_capa ? 
REPLICA_CAPA_BYPASS_CRC_STR : "", diff --git a/src/server.c b/src/server.c index 6e07506c08..d66403b6aa 100644 --- a/src/server.c +++ b/src/server.c @@ -2218,7 +2218,6 @@ void initServerConfig(void) { server.fsynced_reploff_pending = 0; server.rdb_client_id = -1; server.loading_process_events_interval_ms = LOADING_PROCESS_EVENTS_INTERVAL_DEFAULT; - server.bypass_crc = 1; server.loading_rio = NULL; /* Replication partial resync backlog */ diff --git a/src/server.h b/src/server.h index 28a5183e5d..81909ae5c8 100644 --- a/src/server.h +++ b/src/server.h @@ -1993,7 +1993,6 @@ struct valkeyServer { char *rdb_filename; /* Name of RDB file */ int rdb_compression; /* Use compression in RDB? */ int rdb_checksum; /* Use RDB checksum? */ - int bypass_crc; /* Skip RDB checksum? Applicable only for TLS enabled diskless full sync */ int rdb_del_sync_files; /* Remove RDB files used only for SYNC if the instance does not use persistence. */ time_t lastsave; /* Unix time of last successful save */ From 6126177862ca33e3919ee3ee64248ab75f0c2fac Mon Sep 17 00:00:00 2001 From: ranshid <88133677+ranshid@users.noreply.github.com> Date: Thu, 2 Jan 2025 18:23:12 +0200 Subject: [PATCH 4/8] Update src/rdb.c Signed-off-by: ranshid <88133677+ranshid@users.noreply.github.com> --- src/rdb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rdb.c b/src/rdb.c index 1f60cd5303..06f38a9837 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3563,7 +3563,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { * For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req: * Check replica capabilities, if every replica supports bypassing CRC, primary should also bypass CRC, otherwise, use CRC. */ - int bypass_crc_capa = 1; + int bypass_crc = 1; /* Collect the connections of the replicas we want to transfer * the RDB to, which are in WAIT_BGSAVE_START state. 
*/ int connsnum = 0; From 5c343915488950bbdac0ea3f2049b74082a5f30a Mon Sep 17 00:00:00 2001 From: ranshid <88133677+ranshid@users.noreply.github.com> Date: Thu, 2 Jan 2025 18:23:26 +0200 Subject: [PATCH 5/8] Update src/rdb.c Signed-off-by: ranshid <88133677+ranshid@users.noreply.github.com> --- src/rdb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rdb.c b/src/rdb.c index 06f38a9837..275b656a93 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3600,7 +3600,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { // do not bypass CRC on the primary if connection doesn't have integrity check or if the replica doesn't support it if (!connIsIntegrityChecked(replica->conn) || !(replica->replica_capa & REPLICA_CAPA_BYPASS_CRC)) - bypass_crc_capa = 0; + bypass_crc = 0; } From e481f73db934da6a3bb3221cef1732c113196ca9 Mon Sep 17 00:00:00 2001 From: ranshid <88133677+ranshid@users.noreply.github.com> Date: Thu, 2 Jan 2025 18:23:33 +0200 Subject: [PATCH 6/8] Update src/rdb.c Signed-off-by: ranshid <88133677+ranshid@users.noreply.github.com> --- src/rdb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rdb.c b/src/rdb.c index 275b656a93..c170ef212b 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3625,7 +3625,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } serverSetCpuAffinity(server.bgsave_cpulist); - if (bypass_crc_capa) { + if (bypass_crc) { serverLog(LL_NOTICE, "CRC checksum is disabled for this RDB transfer"); // mark rdb object to skip CRC checksum calculations rdb.flags |= RIO_FLAG_BYPASS_CRC; From 14eeb6f59a1f158ee6b2172e1b3286d828b353c6 Mon Sep 17 00:00:00 2001 From: ranshid <88133677+ranshid@users.noreply.github.com> Date: Thu, 2 Jan 2025 18:23:42 +0200 Subject: [PATCH 7/8] Update src/rdb.c Signed-off-by: ranshid <88133677+ranshid@users.noreply.github.com> --- src/rdb.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rdb.c b/src/rdb.c index c170ef212b..ede4fb215d 100644 --- a/src/rdb.c +++ 
b/src/rdb.c @@ -3696,7 +3696,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } } if (!dual_channel) close(safe_to_exit_pipe); - if (bypass_crc_capa) server.stat_total_sync_bypass_crc++; + if (bypass_crc) server.stat_total_sync_bypass_crc++; return (childpid == -1) ? C_ERR : C_OK; } return C_OK; /* Unreached. */ From 266cd65c294089df5ca6034e99291d9f36881512 Mon Sep 17 00:00:00 2001 From: Ran Shidlansik Date: Thu, 2 Jan 2025 18:48:40 +0200 Subject: [PATCH 8/8] fix format issues Signed-off-by: Ran Shidlansik --- src/connection.h | 2 +- src/rdb.c | 3 +-- src/replication.c | 14 +++++++------- src/server.h | 8 ++++---- 4 files changed, 13 insertions(+), 14 deletions(-) diff --git a/src/connection.h b/src/connection.h index 6202a47328..4b513ffccb 100644 --- a/src/connection.h +++ b/src/connection.h @@ -123,7 +123,7 @@ typedef struct ConnectionType { sds (*get_peer_cert)(struct connection *conn); /* Miselenious */ - int (*connIntegrityChecked)(void); // return 1 if connection type has built-in integrity checks + int (*connIntegrityChecked)(void); // return 1 if connection type has built-in integrity checks } ConnectionType; struct connection { diff --git a/src/rdb.c b/src/rdb.c index ede4fb215d..537bf89a4f 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3599,9 +3599,8 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { } // do not bypass CRC on the primary if connection doesn't have integrity check or if the replica doesn't support it - if (!connIsIntegrityChecked(replica->conn) || !(replica->replica_capa & REPLICA_CAPA_BYPASS_CRC)) + if (!connIsIntegrityChecked(replica->conn) || !(replica->replica_capa & REPLICA_CAPA_BYPASS_CRC)) bypass_crc = 0; - } /* Create the child process. */ diff --git a/src/replication.c b/src/replication.c index 6fe4aab5f5..c9a5b5c3cd 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1250,7 +1250,7 @@ void syncCommand(client *c) { * eof: supports EOF-style RDB transfer for diskless replication. 
* psync2: supports PSYNC v2, so understands +CONTINUE . * dual-channel: supports full sync using rdb channel. - * bypass-crc: supports skipping CRC calculations during diskless sync using + * bypass-crc: supports skipping CRC calculations during diskless sync using * a connection that has integrity checks (such as TLS). * * - ack [fack ] @@ -2261,10 +2261,10 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); if (replicationSupportBypassCRC(conn, use_diskless_load, usemark)) { - /* We can bypass CRC checks when data is transmitted through a verified stream. - * The usemark flag indicates that the primary is streaming the data directly without - * writing it to storage. - * Similarly, the use_diskless_load flag indicates that the + /* We can bypass CRC checks when data is transmitted through a verified stream. + * The usemark flag indicates that the primary is streaming the data directly without + * writing it to storage. + * Similarly, the use_diskless_load flag indicates that the * replica will load the payload directly into memory without first writing it to disk. */ rdb.flags |= RIO_FLAG_BYPASS_CRC; } @@ -3528,9 +3528,9 @@ void syncWithPrimary(connection *conn) { * EOF: supports EOF-style RDB transfer for diskless replication. * PSYNC2: supports PSYNC v2, so understands +CONTINUE . * BYPASS-CRC: supports skipping CRC calculations during full sync. - * Inform the primary of this capa only during diskless sync using a + * Inform the primary of this capa only during diskless sync using a * connection that has integrity checks (such as TLS). - * In disk-based sync, or non-integrity-checked connection, there is more + * In disk-based sync, or non-integrity-checked connection, there is more * concern for data corruprion so we keep this extra layer of detection. * * The primary will ignore capabilities it does not understand. 
*/ diff --git a/src/server.h b/src/server.h index 81909ae5c8..ba6cfcb940 100644 --- a/src/server.h +++ b/src/server.h @@ -437,10 +437,10 @@ typedef enum { /* Replica capability flags */ #define REPLICA_CAPA_NONE 0 -#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */ -#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */ -#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */ -#define REPLICA_CAPA_BYPASS_CRC (1 << 3) /* Supports bypassing CRC checks for sync requests. */ +#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */ +#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */ +#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */ +#define REPLICA_CAPA_BYPASS_CRC (1 << 3) /* Supports bypassing CRC checks for sync requests. */ /* Replica capability strings */ #define REPLICA_CAPA_BYPASS_CRC_STR "bypass-crc" /* Supports bypassing CRC checks for sync requests. */