Skip to content

Commit

Permalink
Refactor syncWithPrimary - move state machine logic into helper funct…
Browse files Browse the repository at this point in the history
…ions

Signed-off-by: Nitai Caro <[email protected]>
  • Loading branch information
Nitai Caro committed Dec 23, 2024
1 parent 3b04925 commit 4f0cb27
Showing 1 changed file with 190 additions and 132 deletions.
322 changes: 190 additions & 132 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ void replicationSteadyStateInit(void);
void dualChannelSetupMainConnForPsync(connection *conn);
void dualChannelSyncHandleRdbLoadCompletion(void);
static void dualChannelFullSyncWithPrimary(connection *conn);
void syncWithPrimary(connection *conn);

/* We take a global flag to remember if this instance generated an RDB
* because of replication, so that we can remove the RDB file in case
Expand Down Expand Up @@ -3339,6 +3340,171 @@ void dualChannelSetupMainConnForPsync(connection *conn) {
sdsfree(err);
}


int syncWithPrimaryHandleConnectingState(connection *conn, sds *err) {
serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
* registered and we can wait for the PONG reply. */
connSetReadHandler(conn, syncWithPrimary);
connSetWriteHandler(conn, NULL);
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
*err = sendCommand(conn, "PING", NULL);
if (*err) return C_ERR;
return C_OK;
}

int syncWithPrimaryHandleReceivePingReplyState(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);

/* The primary did not reply */
if (*err == NULL) return C_ERR;

/* We accept only two replies as valid, a positive +PONG reply
* (we just check for "+") or an authentication error.
* Note that older versions of Redis OSS replied with "operation not
* permitted" instead of using a proper error code, so we test
* both. */
if (*err[0] != '+' && strncmp(*err, "-NOAUTH", 7) != 0 && strncmp(*err, "-NOPERM", 7) != 0 &&
strncmp(*err, "-ERR operation not permitted", 28) != 0) {
serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", *err);
return C_ERR;
} else {
serverLog(LL_NOTICE, "Primary replied to PING, replication can continue...");
}
return C_OK;
}

int syncWithPrimaryHandleSendHandshakeState(connection *conn, sds *err) {
/* AUTH with the primary if required. */
if (server.primary_auth) {
char *args[3] = {"AUTH", NULL, NULL};
size_t lens[3] = {4, 0, 0};
int argc = 1;
if (server.primary_user) {
args[argc] = server.primary_user;
lens[argc] = strlen(server.primary_user);
argc++;
}
args[argc] = server.primary_auth;
lens[argc] = sdslen(server.primary_auth);
argc++;
*err = sendCommandArgv(conn, argc, args, lens);
if (*err) return C_ERR;
}

/* Set the replica port, so that primary's INFO command can list the
* replica listening port correctly. */
{
sds portstr = getReplicaPortString();
*err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL);
sdsfree(portstr);
if (*err) return C_ERR;
}

/* Set the replica ip, so that primary's INFO command can list the
* replica IP address port correctly in case of port forwarding or NAT.
* Skip REPLCONF ip-address if there is no replica-announce-ip option set. */
if (server.replica_announce_ip) {
*err = sendCommand(conn, "REPLCONF", "ip-address", server.replica_announce_ip, NULL);
if (*err) return C_ERR;
}

/* Inform the primary of our (replica) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The primary will ignore capabilities it does not understand. */
*err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2",
server.dual_channel_replication ? "capa" : NULL,
server.dual_channel_replication ? "dual-channel" : NULL, NULL);
if (*err) return C_ERR;

/* Inform the primary of our (replica) version. */
*err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL);
if (*err) return C_ERR;
return C_OK;
}

int syncWithPrimaryHandleReceiveAuthReplyState(connection *conn, sds *err) {
if (server.primary_auth) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
if (*err[0] == '-') {
serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", *err);
return C_ERR;
}
}
return C_OK;
}

int syncWithPrimaryHandleReceivePortReplyState(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
/* Ignore the error if any, not all the Redis OSS versions support
* REPLCONF listening-port. */
if (*err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF listening-port: %s",
*err);
}
return C_OK;
}

int syncWithPrimaryHandleReceiveIPReplyState(connection *conn, sds *err) {
if (server.replica_announce_ip) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
/* Ignore the error if any, not all the Redis OSS versions support
* REPLCONF ip-address. */
if (*err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF ip-address: %s",
*err);
}
}
return C_OK;
}

int syncWithPrimaryHandleReceiveCapaReplyState(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
/* Ignore the error if any, not all the Redis OSS versions support
* REPLCONF capa. */
if (*err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF capa: %s",
*err);
}
return C_OK;
}

int syncWithPrimaryHandleReceiveVersionReplyState(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
/* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */
if (*err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF VERSION: %s",
*err);
}
return C_OK;
}

int syncWithPrimaryHandleSendPsyncState(connection *conn, sds *err) {
if (replicaSendPsyncCommand(conn) == PSYNC_WRITE_ERROR) {
*err = sdsnew("Write error sending the PSYNC command.");
abortFailover("Write error to failover target");
return C_ERR;
}
return C_OK;
}

/*
* Dual channel for full sync
*
Expand Down Expand Up @@ -3419,7 +3585,7 @@ void dualChannelSetupMainConnForPsync(connection *conn) {
* establish a connection with the primary. */
void syncWithPrimary(connection *conn) {
char tmpfile[256], *err = NULL;
int psync_result;
int psync_result, ret;

/* If this event fired after the user turned the instance into a primary
* with REPLICAOF NO ONE we must just return ASAP. */
Expand All @@ -3437,163 +3603,59 @@ void syncWithPrimary(connection *conn) {
switch (server.repl_state) {
/* Send a PING to check the primary is able to reply without errors. */
case REPL_STATE_CONNECTING:
serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
* registered and we can wait for the PONG reply. */
connSetReadHandler(conn, syncWithPrimary);
connSetWriteHandler(conn, NULL);
ret = syncWithPrimaryHandleConnectingState(conn, &err);
if (ret == C_ERR) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_PING_REPLY;
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
err = sendCommand(conn, "PING", NULL);
if (err) goto write_error;
goto ok;
/* Receive the PONG command. */
case REPL_STATE_RECEIVE_PING_REPLY:
err = receiveSynchronousResponse(conn);

/* The primary did not reply */
if (err == NULL) goto no_response_error;

/* We accept only two replies as valid, a positive +PONG reply
* (we just check for "+") or an authentication error.
* Note that older versions of Redis OSS replied with "operation not
* permitted" instead of using a proper error code, so we test
* both. */
if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 &&
strncmp(err, "-ERR operation not permitted", 28) != 0) {
serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", err);
ret = syncWithPrimaryHandleReceivePingReplyState(conn, &err);
if (ret == C_ERR) {
if (err == NULL) goto no_response_error;
goto error;
} else {
serverLog(LL_NOTICE, "Primary replied to PING, replication can continue...");
}
sdsfree(err);
if (err) sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_HANDSHAKE;
/* fall through */
case REPL_STATE_SEND_HANDSHAKE:
/* AUTH with the primary if required. */
if (server.primary_auth) {
char *args[3] = {"AUTH", NULL, NULL};
size_t lens[3] = {4, 0, 0};
int argc = 1;
if (server.primary_user) {
args[argc] = server.primary_user;
lens[argc] = strlen(server.primary_user);
argc++;
}
args[argc] = server.primary_auth;
lens[argc] = sdslen(server.primary_auth);
argc++;
err = sendCommandArgv(conn, argc, args, lens);
if (err) goto write_error;
}

/* Set the replica port, so that primary's INFO command can list the
* replica listening port correctly. */
{
sds portstr = getReplicaPortString();
err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL);
sdsfree(portstr);
if (err) goto write_error;
}

/* Set the replica ip, so that primary's INFO command can list the
* replica IP address port correctly in case of port forwarding or NAT.
* Skip REPLCONF ip-address if there is no replica-announce-ip option set. */
if (server.replica_announce_ip) {
err = sendCommand(conn, "REPLCONF", "ip-address", server.replica_announce_ip, NULL);
if (err) goto write_error;
}

/* Inform the primary of our (replica) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The primary will ignore capabilities it does not understand. */
err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2",
server.dual_channel_replication ? "capa" : NULL,
server.dual_channel_replication ? "dual-channel" : NULL, NULL);
if (err) goto write_error;

/* Inform the primary of our (replica) version. */
err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL);
if (err) goto write_error;

ret = syncWithPrimaryHandleSendHandshakeState(conn, &err);
if (ret == C_ERR) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
goto ok;
/* Receive AUTH reply. */
case REPL_STATE_RECEIVE_AUTH_REPLY:
if (server.primary_auth) {
err = receiveSynchronousResponse(conn);
ret = syncWithPrimaryHandleReceiveAuthReplyState(conn, &err);
if (ret == C_ERR) {
if (err == NULL) goto no_response_error;
if (err[0] == '-') {
serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", err);
goto error;
}
server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY;
goto ok;
goto error;
}
server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY;
if (server.primary_auth) goto ok;
/* fall through */
/* Receive REPLCONF listening-port reply. */
case REPL_STATE_RECEIVE_PORT_REPLY:
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any, not all the Redis OSS versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF listening-port: %s",
err);
}
ret = syncWithPrimaryHandleReceivePortReplyState(conn, &err);
if (ret == C_ERR) goto no_response_error;
server.repl_state = REPL_STATE_RECEIVE_IP_REPLY;
goto ok;
/* Receive REPLCONF ip-address reply. */
case REPL_STATE_RECEIVE_IP_REPLY:
if (server.replica_announce_ip) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any, not all the Redis OSS versions support
* REPLCONF ip-address. */
if (err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF ip-address: %s",
err);
}
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
goto ok;
}
ret = syncWithPrimaryHandleReceiveIPReplyState(conn, &err);
if (ret == C_ERR) goto no_response_error;
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
if (server.replica_announce_ip) goto ok;
/* fall through */
/* Receive CAPA reply. */
case REPL_STATE_RECEIVE_CAPA_REPLY:
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any, not all the Redis OSS versions support
* REPLCONF capa. */
if (err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF capa: %s",
err);
}
ret = syncWithPrimaryHandleReceiveCapaReplyState(conn, &err);
if (ret == C_ERR) goto no_response_error;
server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY;
goto ok;
/* Receive VERSION reply. */
case REPL_STATE_RECEIVE_VERSION_REPLY:
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */
if (err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF VERSION: %s",
err);
}
ret = syncWithPrimaryHandleReceiveVersionReplyState(conn, &err);
if (ret == C_ERR) goto no_response_error;
server.repl_state = REPL_STATE_SEND_PSYNC;
/* fall through */
/* Try a partial resynchronization. If we don't have a cached primary
Expand All @@ -3602,14 +3664,10 @@ void syncWithPrimary(connection *conn) {
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
case REPL_STATE_SEND_PSYNC:
if (replicaSendPsyncCommand(conn) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
abortFailover("Write error to failover target");
goto write_error;
}
ret = syncWithPrimaryHandleSendPsyncState(conn, &err);
if (ret == C_ERR) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
goto ok;

/* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */
default:
if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) {
Expand Down

0 comments on commit 4f0cb27

Please sign in to comment.