diff --git a/src/rmt_core.c b/src/rmt_core.c index 940d178..470979d 100644 --- a/src/rmt_core.c +++ b/src/rmt_core.c @@ -2511,12 +2511,12 @@ void redis_migrate(rmtContext *ctx, int type) node_next_nodes_count ++; rnode = rnode->next; } - if (node_next_nodes_count != wdata->nodes_count && + /*if (node_next_nodes_count != wdata->nodes_count && srgroup->kind != GROUP_TYPE_RDBFILE) { log_error("Error: node_next_nodes_count %d != write_data->nodes_count %d.", node_next_nodes_count, wdata->nodes_count); goto done; - } + }*/ } if (threads_hold_nodes_count != node_count) { log_error("Error: write threads hold node count %s is wrong", diff --git a/src/rmt_redis.c b/src/rmt_redis.c index dfc58e6..4ad9fe8 100644 --- a/src/rmt_redis.c +++ b/src/rmt_redis.c @@ -521,6 +521,8 @@ int redis_group_init(rmtContext *ctx, redis_group *rgroup, rgroup->key_hash = NULL; rgroup->ncontinuum = 0; + rgroup->distribution = CONF_UNSET_DIST; + rgroup->ctx = ctx; if(source) { @@ -667,6 +669,7 @@ void redis_group_deinit(redis_group *rgroup) rgroup->ncontinuum = 0; rgroup->ctx = NULL; + rgroup->distribution = CONF_UNSET_DIST; } int redis_rdb_init(redis_rdb *rdb, const char *addr, int type) @@ -6552,11 +6555,13 @@ int redis_parse_rdb_file(redis_node *srnode, int mbuf_count_one_time) log_debug(LOG_DEBUG, "key: %s, value array length: %u", key, array_n(value)); - if (rdb->handler != NULL && - (srgroup->kind == GROUP_TYPE_SINGLE || srgroup->get_backend_node == NULL || - srgroup->get_backend_node(srgroup, key, sdslen(key)) == srnode) && - (ctx->filter == NULL || - stringmatchlen(ctx->filter, sdslen(ctx->filter), key, sdslen(key), 0))) { + if (rdb->handler != NULL + && (srgroup->kind == GROUP_TYPE_SINGLE + || srgroup->get_backend_node == NULL + || (srgroup->distribution != DIST_RANDOM && srgroup->get_backend_node(srgroup, key, sdslen(key)) == srnode) + || srgroup->distribution == DIST_RANDOM) + && (ctx->filter == NULL + || stringmatchlen(ctx->filter, sdslen(ctx->filter), key, sdslen(key), 0))) { ret = rdb->handler(srnode, key, data_type, value, expiretime_type, expiretime, trgroup); if (ret < 0) { @@ -7786,6 +7791,8 @@ redis_twem_init_from_conf(redis_group *rgroup, conf_pool *cp) parts = NULL; } + rgroup->distribution = cp->distribution; + switch(cp->distribution){ case DIST_KETAMA: ret = redis_twem_init_route_with_ketama(rgroup, &nodes, total_weight); @@ -7896,8 +7903,8 @@ redis_twem_random_dispatch(struct array *continuums, uint32_t ncontinuum, uint32 uint32_t redis_twem_backend_idx(redis_group *rgroup, uint8_t *key, uint32_t keylen) { - uint32_t idx, hash; - int distribution = DIST_KETAMA; + uint32_t idx = 0, hash; + int distribution = rgroup->distribution;//DIST_KETAMA; struct continuum *continuum; RMT_NOTUSED(rgroup); @@ -7930,8 +7937,8 @@ redis_twem_backend_idx(redis_group *rgroup, uint8_t *key, uint32_t keylen) redis_node * redis_twem_backend_node(redis_group *rgroup, uint8_t *key, uint32_t keylen) { - uint32_t idx, hash; - int distribution = DIST_KETAMA; + uint32_t idx = 0, hash; + int distribution = rgroup->distribution;//DIST_KETAMA; struct continuum *continuum; RMT_NOTUSED(rgroup); diff --git a/src/rmt_redis.h b/src/rmt_redis.h index a69bc43..088274a 100644 --- a/src/rmt_redis.h +++ b/src/rmt_redis.h @@ -140,6 +140,8 @@ typedef struct redis_group{ hash_t key_hash; uint32_t ncontinuum; /* # continuum points */ + + dist_type_t distribution; }redis_group; typedef struct redis_node{