Skip to content

Commit

Permalink
sentinel use info sentinel command to run faster
Browse files Browse the repository at this point in the history
Signed-off-by: wei.kukey <[email protected]>
  • Loading branch information
kukey committed Jan 6, 2025
1 parent 33b8241 commit 2a5d3f6
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 38 deletions.
3 changes: 3 additions & 0 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -3373,6 +3373,9 @@ standardConfig static_configs[] = {
createSpecialConfig("replicaof", "slaveof", IMMUTABLE_CONFIG | MULTI_ARG_CONFIG, setConfigReplicaOfOption, getConfigReplicaOfOption, rewriteConfigReplicaOfOption, NULL),
createSpecialConfig("latency-tracking-info-percentiles", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigLatencyTrackingInfoPercentilesOutputOption, getConfigLatencyTrackingInfoPercentilesOutputOption, rewriteConfigLatencyTrackingInfoPercentilesOutputOption, NULL),

/* Capabalities */
createBoolConfig("info-simple-for-sentinel", NULL, IMMUTABLE_CONFIG, server.info_simple_for_sentinel, 1, NULL, NULL),

/* NULL Terminator, this is dropped when we convert to the runtime array. */
{NULL},
};
Expand Down
22 changes: 19 additions & 3 deletions src/sentinel.c
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ typedef struct sentinelValkeyInstance {
* are set to NULL no script is executed. */
char *notification_script;
char *client_reconfig_script;
sds info; /* cached INFO output */
sds info; /* cached INFO output */
int info_simple; /* Instance support info simple for sentinel. */
} sentinelValkeyInstance;

/* Main state. */
Expand Down Expand Up @@ -1346,6 +1347,9 @@ sentinelValkeyInstance *createSentinelValkeyInstance(char *name,
ri->role_reported_time = mstime();
ri->replica_conf_change_time = mstime();

/* capability */
ri->info_simple = 0;

/* Add into the right table. */
dictAdd(table, ri->name, ri);
return ri;
Expand Down Expand Up @@ -2358,6 +2362,8 @@ void sentinelReconnectInstance(sentinelValkeyInstance *ri) {

/* Send a PING ASAP when reconnecting. */
sentinelSendPing(ri);

ri->info_simple = 0;
}
}
/* Pub / Sub */
Expand Down Expand Up @@ -2429,6 +2435,11 @@ void sentinelRefreshInstanceInfo(sentinelValkeyInstance *ri, const char *info) {
sentinelValkeyInstance *replica;
sds l = lines[j];

/* capability for info simple */
if (sdslen(l) >= 26 && !memcmp(l, "info_simple_for_sentinel:", 25)) {
ri->info_simple = atoi(l + 25);
}

/* run_id:<40 hex chars>*/
if (sdslen(l) >= 47 && !memcmp(l, "run_id:", 7)) {
if (ri->runid == NULL) {
Expand Down Expand Up @@ -2997,8 +3008,13 @@ void sentinelSendPeriodicCommands(sentinelValkeyInstance *ri) {

/* Send INFO to primaries and replicas, not sentinels. */
if ((ri->flags & SRI_SENTINEL) == 0 && (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) {
retval = redisAsyncCommand(ri->link->cc, sentinelInfoReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri, "INFO"));
if (ri->info_simple) {
retval = redisAsyncCommand(ri->link->cc, sentinelInfoReplyCallback, ri, "%s %s",
sentinelInstanceMapCommand(ri, "INFO"), "SENTINEL");
} else {
retval = redisAsyncCommand(ri->link->cc, sentinelInfoReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri, "INFO"));
}
if (retval == C_OK) ri->link->pending_commands++;
}

Expand Down
99 changes: 64 additions & 35 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -5557,6 +5557,41 @@ void totalNumberOfStatefulKeys(unsigned long *blocking_keys,
if (watched_keys) *watched_keys = wkeys;
}

static sds genValKeyReplicasInfo(sds info) {
if (listLength(server.replicas)) {
int replica_id = 0;
listNode *ln;
listIter li;

listRewind(server.replicas, &li);
while ((ln = listNext(&li))) {
client *replica = listNodeValue(ln);
char ip[NET_IP_STR_LEN], *replica_ip = replica->replica_addr;
int port;
long lag = 0;

if (!replica_ip) {
if (connAddrPeerName(replica->conn, ip, sizeof(ip), &port) == -1) continue;
replica_ip = ip;
}
const char *state = replstateToString(replica->repl_state);
if (state[0] == '\0') continue;
if (replica->repl_state == REPLICA_STATE_ONLINE) lag = time(NULL) - replica->repl_ack_time;

info = sdscatprintf(info,
"slave%d:ip=%s,port=%d,state=%s,"
"offset=%lld,lag=%ld,type=%s\r\n",
replica_id, replica_ip, replica->replica_listening_port, state,
replica->repl_ack_off, lag,
replica->flag.repl_rdb_channel ? "rdb-channel"
: replica->repl_state == REPLICA_STATE_BG_RDB_LOAD ? "main-channel"
: "replica");
replica_id++;
}
}
return info;
}

/* Create the string returned by the INFO command. This is decoupled
* by the INFO command itself as we need to report the same information
* on memory corruption problems. */
Expand All @@ -5567,6 +5602,29 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
int sections = 0;
if (everything) all_sections = 1;

if (dictFind(section_dict, "sentinel") != NULL) {
info = sdscatprintf(info, FMTARGS(
"run_id:%s\r\n", server.runid,
"role:%s\r\n", server.primary_host == NULL ? "master" : "slave"));
info = genValKeyReplicasInfo(info);
if (server.primary_host) {
long long replica_repl_offset = 1;
if (server.primary) {
replica_repl_offset = server.primary->reploff;
} else if (server.cached_primary) {
replica_repl_offset = server.cached_primary->reploff;
}

info = sdscatprintf(info, FMTARGS(
"master_host:%s\r\n", server.primary_host,
"master_port:%d\r\n", server.primary_port,
"master_link_status:%s\r\n", (server.repl_state == REPL_STATE_CONNECTED) ? "up" : "down",
"slave_priority:%d\r\n", server.replica_priority,
"slave_repl_offset:%lld\r\n", replica_repl_offset,
"replica_announced:%d\r\n", server.replica_announced));
}
}

/* Server */
if (all_sections || (dictFind(section_dict, "server") != NULL)) {
static int call_uname = 1;
Expand Down Expand Up @@ -5631,7 +5689,8 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"executable:%s\r\n", server.executable ? server.executable : "",
"config_file:%s\r\n", server.configfile ? server.configfile : "",
"io_threads_active:%i\r\n", server.active_io_threads_num > 1,
"availability_zone:%s\r\n", server.availability_zone));
"availability_zone:%s\r\n", server.availability_zone,
"info_simple_for_sentinel:%i\r\n", server.info_simple_for_sentinel));

/* Conditional properties */
if (isShutdownInitiated()) {
Expand Down Expand Up @@ -6001,37 +6060,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
info = sdscatprintf(info, "min_slaves_good_slaves:%d\r\n", server.repl_good_replicas_count);
}

if (listLength(server.replicas)) {
int replica_id = 0;
listNode *ln;
listIter li;

listRewind(server.replicas, &li);
while ((ln = listNext(&li))) {
client *replica = listNodeValue(ln);
char ip[NET_IP_STR_LEN], *replica_ip = replica->replica_addr;
int port;
long lag = 0;

if (!replica_ip) {
if (connAddrPeerName(replica->conn, ip, sizeof(ip), &port) == -1) continue;
replica_ip = ip;
}
const char *state = replstateToString(replica->repl_state);
if (state[0] == '\0') continue;
if (replica->repl_state == REPLICA_STATE_ONLINE) lag = time(NULL) - replica->repl_ack_time;

info = sdscatprintf(info,
"slave%d:ip=%s,port=%d,state=%s,"
"offset=%lld,lag=%ld,type=%s\r\n",
replica_id, replica_ip, replica->replica_listening_port, state,
replica->repl_ack_off, lag,
replica->flag.repl_rdb_channel ? "rdb-channel"
: replica->repl_state == REPLICA_STATE_BG_RDB_LOAD ? "main-channel"
: "replica");
replica_id++;
}
}
info = genValKeyReplicasInfo(info);
info = sdscatprintf(
info,
FMTARGS(
Expand Down Expand Up @@ -6562,8 +6591,8 @@ void dismissMemoryInChild(void) {
/* madvise(MADV_DONTNEED) may not work if Transparent Huge Pages is enabled. */
if (server.thp_enabled) return;

/* Currently we use zmadvise_dontneed only when we use jemalloc with Linux.
* so we avoid these pointless loops when they're not going to do anything. */
/* Currently we use zmadvise_dontneed only when we use jemalloc with Linux.
* so we avoid these pointless loops when they're not going to do anything. */
#if defined(USE_JEMALLOC) && defined(__linux__)
listIter li;
listNode *ln;
Expand Down Expand Up @@ -7008,7 +7037,7 @@ __attribute__((weak)) int main(int argc, char **argv) {
}
if (server.sentinel_mode) sentinelCheckConfigFile();

/* Do system checks */
/* Do system checks */
#ifdef __linux__
linuxMemoryWarnings();
sds err_msg = NULL;
Expand Down
3 changes: 3 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -2115,6 +2115,9 @@ struct valkeyServer {
/* Local environment */
char *locale_collate;
char *debug_context; /* A free-form string that has no impact on server except being included in a crash report. */

/* capabilities */
int info_simple_for_sentinel; /* server support simple info for sentinel. */
};

#define MAX_KEYS_BUFFER 256
Expand Down
1 change: 1 addition & 0 deletions tests/unit/introspection.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ start_server {tags {"introspection"}} {
rdma-rx-size
rdma-bind
rdma-port
info-simple-for-sentinel
}

if {!$::tls} {
Expand Down

0 comments on commit 2a5d3f6

Please sign in to comment.