Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CRC removal during diskless full sync with TLS enabled. #1479

Open
wants to merge 11 commits into
base: unstable
Choose a base branch
from
7 changes: 7 additions & 0 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,9 @@ typedef struct ConnectionType {

/* TLS specified methods */
sds (*get_peer_cert)(struct connection *conn);

/* Miselenious */
int (*connIntegrityChecked)(void); // return 1 if connection type has built-in integrity checks
} ConnectionType;

struct connection {
Expand Down Expand Up @@ -483,4 +486,8 @@ static inline void connSetPostponeUpdateState(connection *conn, int on) {
}
}

static inline int connIsIntegrityChecked(connection *conn) {
return conn->type->connIntegrityChecked && conn->type->connIntegrityChecked();
}

#endif /* __REDIS_CONNECTION_H */
22 changes: 20 additions & 2 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -3023,6 +3023,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
int error;
long long empty_keys_skipped = 0;

if (rdb->flags & RIO_FLAG_BYPASS_CRC) server.stat_total_sync_bypass_crc++;
rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
if (rioRead(rdb, buf, 9) == 0) goto eoferr;
Expand Down Expand Up @@ -3367,7 +3368,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if (rioRead(rdb, &cksum, 8) == 0) goto eoferr;
if (server.rdb_checksum && !server.skip_checksum_validation) {
memrev64ifbe(&cksum);
if (cksum == 0) {
if (cksum == 0 || (rdb->flags & RIO_FLAG_BYPASS_CRC)) {
serverLog(LL_NOTICE, "RDB file was saved with checksum disabled: no check performed.");
} else if (cksum != expected) {
serverLog(LL_WARNING,
Expand Down Expand Up @@ -3558,8 +3559,13 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
safe_to_exit_pipe = pipefds[0]; /* read end */
server.rdb_child_exit_pipe = pipefds[1]; /* write end */
}
/*
* For replicas with repl_state == REPLICA_STATE_WAIT_BGSAVE_END and replica_req == req:
* Check replica capabilities, if every replica supports bypassing CRC, primary should also bypass CRC, otherwise, use CRC.
*/
int bypass_crc = 1;
/* Collect the connections of the replicas we want to transfer
* the RDB to, which are i WAIT_BGSAVE_START state. */
* the RDB to, which are in WAIT_BGSAVE_START state. */
int connsnum = 0;
connection **conns = zmalloc(sizeof(connection *) * listLength(server.replicas));
server.rdb_pipe_conns = NULL;
Expand Down Expand Up @@ -3591,6 +3597,11 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
replicationSetupReplicaForFullResync(replica, getPsyncInitialOffset());
}

// do not bypass CRC on the primary if connection doesn't have integrity check or if the replica doesn't support it
if (!connIsIntegrityChecked(replica->conn) || !(replica->replica_capa & REPLICA_CAPA_BYPASS_CRC))
bypass_crc = 0;

}

/* Create the child process. */
Expand All @@ -3614,6 +3625,12 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
serverSetCpuAffinity(server.bgsave_cpulist);

if (bypass_crc) {
serverLog(LL_NOTICE, "CRC checksum is disabled for this RDB transfer");
// mark rdb object to skip CRC checksum calculations
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// mark rdb object to skip CRC checksum calculations

This comment is just describing the next line, which seems very self explanatory to me.

rdb.flags |= RIO_FLAG_BYPASS_CRC;
}

retval = rdbSaveRioWithEOFMark(req, &rdb, NULL, rsi);
if (retval == C_OK && rioFlush(&rdb) == 0) retval = C_ERR;

Expand Down Expand Up @@ -3679,6 +3696,7 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
}
}
if (!dual_channel) close(safe_to_exit_pipe);
if (bypass_crc) server.stat_total_sync_bypass_crc++;
return (childpid == -1) ? C_ERR : C_OK;
}
return C_OK; /* Unreached. */
Expand Down
3 changes: 3 additions & 0 deletions src/rdma.c
Original file line number Diff line number Diff line change
Expand Up @@ -1817,6 +1817,9 @@ static ConnectionType CT_RDMA = {
.process_pending_data = rdmaProcessPendingData,
.postpone_update_state = postPoneUpdateRdmaState,
.update_state = updateRdmaState,

/* Miselenious */
.connIntegrityChecked = NULL,
};

ConnectionType *connectionTypeRdma(void) {
Expand Down
34 changes: 29 additions & 5 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1245,11 +1245,13 @@ void syncCommand(client *c) {
* the primary can accurately lists replicas and their listening ports in the
* INFO output.
*
* - capa <eof|psync2|dual-channel>
* - capa <eof|psync2|dual-channel|bypass-crc>
* What is the capabilities of this instance.
* eof: supports EOF-style RDB transfer for diskless replication.
* psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
* dual-channel: supports full sync using rdb channel.
* bypass-crc: supports skipping CRC calculations during diskless sync using
* a connection that has integrity checks (such as TLS).
*
* - ack <offset> [fack <aofofs>]
* Replica informs the primary the amount of replication stream that it
Expand Down Expand Up @@ -1315,7 +1317,8 @@ void replconfCommand(client *c) {
/* If dual-channel is disable on this primary, treat this command as unrecognized
* replconf option. */
c->replica_capa |= REPLICA_CAPA_DUAL_CHANNEL;
}
} else if (!strcasecmp(c->argv[j + 1]->ptr, REPLICA_CAPA_BYPASS_CRC_STR))
c->replica_capa |= REPLICA_CAPA_BYPASS_CRC;
} else if (!strcasecmp(c->argv[j]->ptr, "ack")) {
/* REPLCONF ACK is used by replica to inform the primary the amount
* of replication stream that it processed so far. It is an
Expand Down Expand Up @@ -1973,6 +1976,11 @@ static int useDisklessLoad(void) {
return enabled;
}

/* Returns 1 if the replica can skip CRC calculations during full sync */
int replicationSupportBypassCRC(connection *conn, int is_replica_diskless_load, int is_primary_diskless_sync) {
return is_replica_diskless_load && is_primary_diskless_sync && connIsIntegrityChecked(conn);
}

/* Helper function for readSyncBulkPayload() to initialize tempDb
* before socket-loading the new db from primary. The tempDb may be populated
* by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */
Expand Down Expand Up @@ -2252,7 +2260,14 @@ void readSyncBulkPayload(connection *conn) {

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);

if (replicationSupportBypassCRC(conn, use_diskless_load, usemark)) {
/* We can bypass CRC checks when data is transmitted through a verified stream.
* The usemark flag indicates that the primary is streaming the data directly without
* writing it to storage.
* Similarly, the use_diskless_load flag indicates that the
* replica will load the payload directly into memory without first writing it to disk. */
rdb.flags |= RIO_FLAG_BYPASS_CRC;
}
int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, &rsi, &loadingCtx) != C_OK) {
Expand Down Expand Up @@ -2494,6 +2509,7 @@ char *sendCommand(connection *conn, ...) {
while (1) {
arg = va_arg(ap, char *);
if (arg == NULL) break;
if (strcmp(arg, "") == 0) continue;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few things things:

  1. Empty strings can be valid arguments, this might break some obscure functionality like if someone set a password like an empty string. (I don't think that is allowed today, but would like to be more resistant to weird edge cases. You can introduce an arbitrary static string that points to nothing to have the same behavior.
  2. If you continue with this path, you should use strncmp() where possible. You can actually just do if (arg[0] == '\0') as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO using some const indicator is not the correct way. Can we just use SendCommandArgv?

cmdargs = sdscatprintf(cmdargs, "$%zu\r\n%s\r\n", strlen(arg), arg);
argslen++;
}
Expand Down Expand Up @@ -3511,11 +3527,19 @@ void syncWithPrimary(connection *conn) {
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
* BYPASS-CRC: supports skipping CRC calculations during full sync.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BYPASS isn't the right word, since it implies we are doing something else instead. I think SKIP-CRC is what you want.

* Inform the primary of this capa only during diskless sync using a
* connection that has integrity checks (such as TLS).
* In disk-based sync, or non-integrity-checked connection, there is more
* concern for data corruprion so we keep this extra layer of detection.
*
* The primary will ignore capabilities it does not understand. */
int send_bypass_crc_capa = replicationSupportBypassCRC(conn, useDisklessLoad(), 1);
err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2",
server.dual_channel_replication ? "capa" : NULL,
server.dual_channel_replication ? "dual-channel" : NULL, NULL);
send_bypass_crc_capa ? "capa" : "",
send_bypass_crc_capa ? REPLICA_CAPA_BYPASS_CRC_STR : "",
server.dual_channel_replication ? "capa" : "",
server.dual_channel_replication ? "dual-channel" : "", NULL);
if (err) goto write_error;

/* Inform the primary of our (replica) version. */
Expand Down
1 change: 1 addition & 0 deletions src/rio.c
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ void rioFreeFd(rio *r) {
/* This function can be installed both in memory and file streams when checksum
* computation is needed. */
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) {
if ((r->flags & RIO_FLAG_BYPASS_CRC) != 0) return; // skip CRC64 calculations
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ((r->flags & RIO_FLAG_BYPASS_CRC) != 0) return; // skip CRC64 calculations
if ((r->flags & RIO_FLAG_SKIP_CHECKSUM) != 0) return; // skip checksum

r->cksum = crc64(r->cksum, buf, len);
}

Expand Down
1 change: 1 addition & 0 deletions src/rio.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#define RIO_FLAG_READ_ERROR (1 << 0)
#define RIO_FLAG_WRITE_ERROR (1 << 1)
#define RIO_FLAG_CLOSE_ASAP (1 << 2) /* Rio was closed asynchronously during the current rio operation. */
#define RIO_FLAG_BYPASS_CRC (1 << 3)

#define RIO_TYPE_FILE (1 << 0)
#define RIO_TYPE_BUFFER (1 << 1)
Expand Down
2 changes: 2 additions & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2639,6 +2639,7 @@ void resetServerStats(void) {
server.stat_fork_rate = 0;
server.stat_total_forks = 0;
server.stat_rejected_conn = 0;
server.stat_total_sync_bypass_crc = 0;
server.stat_sync_full = 0;
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
Expand Down Expand Up @@ -5879,6 +5880,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"instantaneous_input_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION) / 1024,
"instantaneous_output_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION) / 1024,
"rejected_connections:%lld\r\n", server.stat_rejected_conn,
"total_sync_bypass_crc:%ld\r\n", server.stat_total_sync_bypass_crc,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"total_sync_bypass_crc:%ld\r\n", server.stat_total_sync_bypass_crc,
"total_sync_bypass_crc:%ld\r\n", server.stat_total_sync_bypass_crc,

What is the use of this metric? What are end users supposed to do with this information?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This metric is merely for testing purposes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We generally prefer using log lines for testing purposes, since these info fields are part of our public facing contract and we can't change them.

"sync_full:%lld\r\n", server.stat_sync_full,
"sync_partial_ok:%lld\r\n", server.stat_sync_partial_ok,
"sync_partial_err:%lld\r\n", server.stat_sync_partial_err,
Expand Down
14 changes: 10 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -435,11 +435,15 @@ typedef enum {
* a replica that only wants RDB without replication buffer */
#define REPLICA_STATE_BG_RDB_LOAD 11 /* Main channel of a replica which uses dual channel replication. */

/* Replica capabilities. */
/* Replica capability flags */
#define REPLICA_CAPA_NONE 0
#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */
#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */
#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */
#define REPLICA_CAPA_EOF (1 << 0) /* Can parse the RDB EOF streaming format. */
#define REPLICA_CAPA_PSYNC2 (1 << 1) /* Supports PSYNC2 protocol. */
#define REPLICA_CAPA_DUAL_CHANNEL (1 << 2) /* Supports dual channel replication sync */
#define REPLICA_CAPA_BYPASS_CRC (1 << 3) /* Supports bypassing CRC checks for sync requests. */

/* Replica capability strings */
#define REPLICA_CAPA_BYPASS_CRC_STR "bypass-crc" /* Supports bypassing CRC checks for sync requests. */

/* Replica requirements */
#define REPLICA_REQ_NONE 0
Expand Down Expand Up @@ -1840,6 +1844,7 @@ struct valkeyServer {
double stat_fork_rate; /* Fork rate in GB/sec. */
long long stat_total_forks; /* Total count of fork. */
long long stat_rejected_conn; /* Clients rejected because of maxclients */
size_t stat_total_sync_bypass_crc; /* Total number of full syncs stated with CRC checksum bypassed */
long long stat_sync_full; /* Number of full resyncs with replicas. */
long long stat_sync_partial_ok; /* Number of accepted PSYNC requests. */
long long stat_sync_partial_err; /* Number of unaccepted PSYNC requests. */
Expand Down Expand Up @@ -2115,6 +2120,7 @@ struct valkeyServer {
* when it receives an error on the replication stream */
int repl_ignore_disk_write_error; /* Configures whether replicas panic when unable to
* persist writes to AOF. */

/* The following two fields is where we store primary PSYNC replid/offset
* while the PSYNC is in progress. At the end we'll copy the fields into
* the server->primary client structure. */
Expand Down
3 changes: 3 additions & 0 deletions src/socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,9 @@ static ConnectionType CT_Socket = {
.process_pending_data = NULL,
.postpone_update_state = NULL,
.update_state = NULL,

/* Miselenious */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/* Miselenious */
/* Miscellaneous */

.connIntegrityChecked = NULL,
};

int connBlock(connection *conn) {
Expand Down
7 changes: 7 additions & 0 deletions src/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,10 @@ static int connTLSListen(connListener *listener) {
return listenToPort(listener);
}

static int connTLSIsIntegrityChecked(void) {
return 1;
}

static void connTLSCloseListener(connListener *listener) {
connectionTypeTcp()->closeListener(listener);
}
Expand Down Expand Up @@ -1186,6 +1190,9 @@ static ConnectionType CT_TLS = {

/* TLS specified methods */
.get_peer_cert = connTLSGetPeerCert,

/* Miselenious */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/* Miselenious */
/* Miscellaneous */

.connIntegrityChecked = connTLSIsIntegrityChecked,
};

int RedisRegisterConnectionTypeTLS(void) {
Expand Down
3 changes: 3 additions & 0 deletions src/unix.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ static ConnectionType CT_Unix = {
.process_pending_data = NULL,
.postpone_update_state = NULL,
.update_state = NULL,

/* Miselenious */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/* Miselenious */
/* Miscellaneous */

.connIntegrityChecked = NULL,
};

int RedisRegisterConnectionTypeUnix(void) {
Expand Down
37 changes: 37 additions & 0 deletions tests/integration/bypass-crc.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
start_server {tags {"repl tls"} overrides {save {}}} {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only need to run these tests with TLS enabled, otherwise this condition is always false.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. Shouldn't we have tests to check that without TLS we do not skip CRC?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just have one test for that though? do we really need the 6 that we have?

set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]
set primary_bypassed_crc_counter 0
foreach mds {no yes} {
foreach sdl {disabled on-empty-db swapdb flush-before-load} {
Comment on lines +6 to +7
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what either mds or sdl stand for, can we use more normal names?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used the naming used in other tests I saw just to stay with format. Will change it

test "Bypass CRC sync - tls:$::tls, repl_diskless_sync:$mds, repl_diskless_load:$sdl" {
$primary config set repl-diskless-sync $mds
start_server {overrides {save {}}} {
set replica [srv 0 client]
$replica config set repl-diskless-load $sdl
$replica replicaof $primary_host $primary_port

wait_for_condition 50 100 {
[string match {*master_link_status:up*} [$replica info replication]]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
[string match {*master_link_status:up*} [$replica info replication]]
[s 0 master_link_status] eq {up}

} else {
fail "Replication not started"
}

set replica_bypassing_crc_count [string match {*total_sync_bypass_crc:1*} [$replica info stats]]
set stats [regexp -inline {total_sync_bypass_crc:(\d+)} [$primary info stats]]
set primary_bypass_crc_count [lindex $stats 1]
Comment on lines +21 to +23
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
set replica_bypassing_crc_count [string match {*total_sync_bypass_crc:1*} [$replica info stats]]
set stats [regexp -inline {total_sync_bypass_crc:(\d+)} [$primary info stats]]
set primary_bypass_crc_count [lindex $stats 1]
set replica_bypassing_crc_count [s 0 total_sync_bypass_crc]
set primary_bypass_crc_count [s 1 total_sync_bypass_crc]


if {$sdl eq "disabled" || $mds eq "no" || !$::tls} {
assert_equal $primary_bypassed_crc_counter $primary_bypass_crc_count "Primary should not bypass CRC in this scenario"
assert_equal 0 $replica_bypassing_crc_count "Replica should not bypass CRC in this scenario"
} else {
incr primary_bypassed_crc_counter
assert_equal $primary_bypassed_crc_counter $primary_bypass_crc_count "Primary should bypass CRC in this scenario"
assert_equal 1 $replica_bypassing_crc_count "Replica should bypass CRC in this scenario"
}
}
}
}
}
}
Loading