Skip to content

Commit

Permalink
Implement: cluster repl noone (#10)
Browse files Browse the repository at this point in the history
* Implement cluster replicate no one

* Fix arg arity

* Fix arity of cluster replicate

* Fix crash
  • Loading branch information
John Sully authored and skolosov-snap committed Feb 5, 2025
1 parent 8d8ce19 commit 91589d1
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 30 deletions.
83 changes: 54 additions & 29 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -7029,48 +7029,73 @@ int clusterCommandSpecial(client *c) {
clusterDelNode(n);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "replicate") && c->argc == 3) {
} else if (!strcasecmp(c->argv[1]->ptr, "replicate") && (c->argc == 3 || c->argc == 4)) {
/* CLUSTER REPLICATE <NODE ID> */
/* Lookup the specified node in our table. */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
if (!n) {
addReplyErrorFormat(c, "Unknown node %s", (char *)c->argv[2]->ptr);
return 1;
clusterNode *n = NULL;
if (c->argc == 4) {
if (0 != strcasecmp(c->argv[2]->ptr,"NO") || 0 != strcasecmp(c->argv[3]->ptr,"ONE")) {
addReplySubcommandSyntaxError(c);
return 1;
}
} else {
n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
if (!n) {
addReplyErrorFormat(c, "Unknown node %s", (char *)c->argv[2]->ptr);
return 1;
}
}

/* I can't replicate myself. */
if (n == myself) {
addReplyError(c, "Can't replicate myself");
return 1;
}
if (n == NULL) {
if (nodeIsPrimary(myself)) {
addReply(c,shared.ok);
return 1;
}
serverLog(LL_NOTICE,"Stop replication and turning myself into empty primary.");
clusterSetNodeAsPrimary(myself);
if (server.primary != NULL) {
replicationUnsetPrimary();
}
int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
emptyData(-1,empty_db_flags, NULL);
/* Reset manual failover state. */
resetManualFailover();
} else {
/* I can't replicate myself. */
if (n == myself) {
addReplyError(c, "Can't replicate myself");
return 1;
}

/* Can't replicate a replica. */
if (nodeIsReplica(n)) {
addReplyError(c, "I can only replicate a master, not a replica.");
return 1;
}
/* Can't replicate a replica. */
if (nodeIsReplica(n)) {
addReplyError(c, "I can only replicate a master, not a replica.");
return 1;
}

/* If the instance is currently a primary, it should have no assigned
* slots nor keys to accept to replicate some other node.
* Replicas can switch to another primary without issues. */
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || kvstoreSize(server.db[0].keys) != 0)) {
addReplyError(c, "To set a master the node must be empty and "
"without assigned slots.");
return 1;
}
/* If the instance is currently a primary, it should have no assigned
* slots nor keys to accept to replicate some other node.
* Replicas can switch to another primary without issues. */
if (clusterNodeIsPrimary(myself) && (myself->numslots != 0 || kvstoreSize(server.db[0].keys) != 0)) {
addReplyError(c, "To set a master the node must be empty and "
"without assigned slots.");
return 1;
}

/* If `n` is already my primary, there is no need to re-establish the
* replication connection. */
if (myself->replicaof == n) {
addReply(c, shared.ok);
return 1;
/* If `n` is already my primary, there is no need to re-establish the
* replication connection. */
if (myself->replicaof == n) {
addReply(c, shared.ok);
return 1;
}
}

/* Set the primary.
* If the instance is a primary, it is an empty primary.
* If the instance is a replica, it had a totally different replication history.
* In these both cases, myself as a replica has to do a full sync. */
clusterSetPrimary(n, 1, 1);
if (n != NULL)
clusterSetPrimary(n, 1, 1);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_BROADCAST_ALL);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "count-failure-reports") && c->argc == 3) {
Expand Down
2 changes: 1 addition & 1 deletion src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("myshardid","Returns the shard ID of a node.","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_MYSHARDID_History,0,CLUSTER_MYSHARDID_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_MYSHARDID_Keyspecs,0,NULL,0)},
{MAKE_CMD("nodes","Returns the cluster configuration for a node.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_NODES_History,0,CLUSTER_NODES_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_NODES_Keyspecs,0,NULL,0)},
{MAKE_CMD("replicas","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","5.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_REPLICAS_History,0,CLUSTER_REPLICAS_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_REPLICAS_Keyspecs,0,NULL,1),.args=CLUSTER_REPLICAS_Args},
{MAKE_CMD("replicate","Configure a node as replica of a primary node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_REPLICATE_History,0,CLUSTER_REPLICATE_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_REPLICATE_Keyspecs,0,NULL,1),.args=CLUSTER_REPLICATE_Args},
{MAKE_CMD("replicate","Configure a node as replica of a primary node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_REPLICATE_History,0,CLUSTER_REPLICATE_Tips,0,clusterCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_REPLICATE_Keyspecs,0,NULL,1),.args=CLUSTER_REPLICATE_Args},
{MAKE_CMD("reset","Resets a node.","O(N) where N is the number of known nodes. The command may execute a FLUSHALL as a side effect.","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_RESET_History,0,CLUSTER_RESET_Tips,0,clusterCommand,-2,CMD_ADMIN|CMD_STALE|CMD_NOSCRIPT,0,CLUSTER_RESET_Keyspecs,0,NULL,1),.args=CLUSTER_RESET_Args},
{MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)},
{MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args},
Expand Down

0 comments on commit 91589d1

Please sign in to comment.