Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace dict with new hashtable: hash datatype #1502

Merged
merged 3 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1922,7 +1922,7 @@ int rewriteSortedSetObject(rio *r, robj *key, robj *o) {
/* Write either the key or the value of the currently selected item of a hash.
* The 'hi' argument passes a valid hash iterator.
* The 'what' filed specifies if to write a key or a value and can be
* either OBJ_HASH_KEY or OBJ_HASH_VALUE.
* either OBJ_HASH_FIELD or OBJ_HASH_VALUE.
*
* The function returns 0 on error, non-zero on success. */
static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
Expand All @@ -1936,7 +1936,7 @@ static int rioWriteHashIteratorCursor(rio *r, hashTypeIterator *hi, int what) {
return rioWriteBulkString(r, (char *)vstr, vlen);
else
return rioWriteBulkLongLong(r, vll);
} else if (hi->encoding == OBJ_ENCODING_HT) {
} else if (hi->encoding == OBJ_ENCODING_HASHTABLE) {
sds value = hashTypeCurrentFromHashTable(hi, what);
return rioWriteBulkString(r, value, sdslen(value));
}
Expand All @@ -1963,7 +1963,7 @@ int rewriteHashObject(rio *r, robj *key, robj *o) {
}
}

if (!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_KEY) || !rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_VALUE)) {
if (!rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_FIELD) || !rioWriteHashIteratorCursor(r, &hi, OBJ_HASH_VALUE)) {
hashTypeResetIterator(&hi);
return 0;
}
Expand Down
68 changes: 18 additions & 50 deletions src/db.c
Original file line number Diff line number Diff line change
Expand Up @@ -979,39 +979,6 @@ void keysScanCallback(void *privdata, void *entry) {

/* This callback is used by scanGenericCommand in order to collect elements
* returned by the dictionary iterator into a list. */
void dictScanCallback(void *privdata, const dictEntry *de) {
scanData *data = (scanData *)privdata;
list *keys = data->keys;
robj *o = data->o;
sds val = NULL;
sds key = NULL;
data->sampled++;

/* This callback is only used for scanning elements within a key (hash
* fields, set elements, etc.) so o must be set here. */
serverAssert(o != NULL);

/* Filter element if it does not match the pattern. */
sds keysds = dictGetKey(de);
if (data->pattern) {
if (!stringmatchlen(data->pattern, sdslen(data->pattern), keysds, sdslen(keysds), 0)) {
return;
}
}

if (o->type == OBJ_HASH) {
key = keysds;
if (!data->only_keys) {
val = dictGetVal(de);
}
} else {
serverPanic("Type not handled in dict SCAN callback.");
}

listAddNodeTail(keys, key);
if (val) listAddNodeTail(keys, val);
}

void hashtableScanCallback(void *privdata, void *entry) {
scanData *data = (scanData *)privdata;
sds val = NULL;
Expand All @@ -1025,14 +992,20 @@ void hashtableScanCallback(void *privdata, void *entry) {
* fields, set elements, etc.) so o must be set here. */
serverAssert(o != NULL);

/* get key */
/* get key, value */
if (o->type == OBJ_SET) {
key = (sds)entry;
} else if (o->type == OBJ_ZSET) {
zskiplistNode *node = (zskiplistNode *)entry;
key = node->ele;
/* zset data is copied after filtering by key */
} else if (o->type == OBJ_HASH) {
key = hashTypeEntryGetField(entry);
if (!data->only_keys) {
val = hashTypeEntryGetValue(entry);
}
} else {
serverPanic("Type not handled in hashset SCAN callback.");
serverPanic("Type not handled in hashtable SCAN callback.");
}

/* Filter element if it does not match the pattern. */
Expand All @@ -1042,9 +1015,9 @@ void hashtableScanCallback(void *privdata, void *entry) {
}
}

if (o->type == OBJ_SET) {
/* no value, key used by reference */
} else if (o->type == OBJ_ZSET) {
/* zset data must be copied. Do this after filtering to avoid unneeded
* allocations. */
if (o->type == OBJ_ZSET) {
/* zset data is copied */
zskiplistNode *node = (zskiplistNode *)entry;
key = sdsdup(node->ele);
Expand All @@ -1053,8 +1026,6 @@ void hashtableScanCallback(void *privdata, void *entry) {
int len = ld2string(buf, sizeof(buf), node->score, LD_STR_AUTO);
val = sdsnewlen(buf, len);
}
} else {
serverPanic("Type not handled in hashset SCAN callback.");
}

listAddNodeTail(keys, key);
Expand Down Expand Up @@ -1193,20 +1164,19 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
* cursor to zero to signal the end of the iteration. */

/* Handle the case of kvstore, dict or hashtable. */
dict *dict_table = NULL;
hashtable *hashtable_table = NULL;
hashtable *ht = NULL;
int shallow_copied_list_items = 0;
if (o == NULL) {
shallow_copied_list_items = 1;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HASHTABLE) {
hashtable_table = o->ptr;
ht = o->ptr;
shallow_copied_list_items = 1;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
dict_table = o->ptr;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HASHTABLE) {
ht = o->ptr;
shallow_copied_list_items = 1;
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
hashtable_table = zs->ht;
ht = zs->ht;
/* scanning ZSET allocates temporary strings even though it's a dict */
shallow_copied_list_items = 0;
}
Expand All @@ -1220,7 +1190,7 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
}

/* For main hash table scan or scannable data structure. */
if (!o || dict_table || hashtable_table) {
if (!o || ht) {
/* We set the max number of iterations to ten times the specified
* COUNT, so if the hash table is in a pathological state (very
* sparsely populated) we avoid to block too much time at the cost
Expand Down Expand Up @@ -1260,10 +1230,8 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
* If cursor is empty, we should try exploring next non-empty slot. */
if (o == NULL) {
cursor = kvstoreScan(c->db->keys, cursor, onlydidx, keysScanCallback, NULL, &data);
} else if (dict_table) {
cursor = dictScan(dict_table, cursor, dictScanCallback, &data);
} else {
cursor = hashtableScan(hashtable_table, cursor, hashtableScanCallback, &data);
cursor = hashtableScan(ht, cursor, hashtableScanCallback, &data);
}
} while (cursor && maxiterations-- && data.sampled < count);
} else if (o->type == OBJ_SET) {
Expand Down
12 changes: 3 additions & 9 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ void xorObjectDigest(serverDb *db, robj *keyobj, unsigned char *digest, robj *o)
sds sdsele;

memset(eledigest, 0, 20);
sdsele = hashTypeCurrentObjectNewSds(&hi, OBJ_HASH_KEY);
sdsele = hashTypeCurrentObjectNewSds(&hi, OBJ_HASH_FIELD);
mixDigest(eledigest, sdsele, sdslen(sdsele));
sdsfree(sdsele);
sdsele = hashTypeCurrentObjectNewSds(&hi, OBJ_HASH_VALUE);
Expand Down Expand Up @@ -923,23 +923,17 @@ void debugCommand(client *c) {
robj *o = objectCommandLookupOrReply(c, c->argv[2], shared.nokeyerr);
if (o == NULL) return;

/* Get the dict reference from the object, if possible. */
dict *d = NULL;
/* Get the hashtable reference from the object, if possible. */
hashtable *ht = NULL;
switch (o->encoding) {
case OBJ_ENCODING_SKIPLIST: {
zset *zs = o->ptr;
ht = zs->ht;
} break;
case OBJ_ENCODING_HT: d = o->ptr; break;
case OBJ_ENCODING_HASHTABLE: ht = o->ptr; break;
}

if (d != NULL) {
char buf[4096];
dictGetStats(buf, sizeof(buf), d, full);
addReplyVerbatim(c, buf, strlen(buf), "txt");
} else if (ht != NULL) {
if (ht != NULL) {
char buf[4096];
hashtableGetStats(buf, sizeof(buf), ht, full);
addReplyVerbatim(c, buf, strlen(buf), "txt");
Expand Down
62 changes: 30 additions & 32 deletions src/defrag.c
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,6 @@ void activeDefragSdsHashtableCallback(void *privdata, void *entry_ref) {
if (new_sds != NULL) *sds_ref = new_sds;
}

void activeDefragSdsHashtable(hashtable *ht) {
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(ht, cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}

/* Defrag a list of ptr, sds or robj string values */
static void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
quicklistNode *newnode, *node = *node_ref;
Expand Down Expand Up @@ -481,26 +474,24 @@ static void scanHashtableCallbackCountScanned(void *privdata, void *elemref) {
server.stat_active_defrag_scanned++;
}

/* Used as dict scan callback when all the work is done in the dictDefragFunctions. */
static void scanCallbackCountScanned(void *privdata, const dictEntry *de) {
UNUSED(privdata);
UNUSED(de);
server.stat_active_defrag_scanned++;
}

static void scanLaterSet(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HASHTABLE) return;
hashtable *ht = ob->ptr;
*cursor = hashtableScanDefrag(ht, *cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}

static void activeDefragHashTypeEntry(void *privdata, void *element_ref) {
SoftlyRaining marked this conversation as resolved.
Show resolved Hide resolved
UNUSED(privdata);
hashTypeEntry **entry_ref = (hashTypeEntry **)element_ref;

hashTypeEntry *new_entry = hashTypeEntryDefrag(*entry_ref, activeDefragAlloc, activeDefragSds);
if (new_entry) *entry_ref = new_entry;
}

static void scanLaterHash(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT) return;
dict *d = ob->ptr;
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc,
.defragKey = (dictDefragAllocFunction *)activeDefragSds,
.defragVal = (dictDefragAllocFunction *)activeDefragSds};
*cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HASHTABLE) return;
hashtable *ht = ob->ptr;
*cursor = hashtableScanDefrag(ht, *cursor, activeDefragHashTypeEntry, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
}

static void defragQuicklist(robj *ob) {
Expand Down Expand Up @@ -538,15 +529,19 @@ static void defragZsetSkiplist(robj *ob) {
}

static void defragHash(robj *ob) {
dict *d, *newd;
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
d = ob->ptr;
if (dictSize(d) > server.active_defrag_max_scan_fields)
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HASHTABLE);
hashtable *ht = ob->ptr;
if (hashtableSize(ht) > server.active_defrag_max_scan_fields) {
defragLater(ob);
else
activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
/* defrag the dict struct and tables */
if ((newd = dictDefragTables(ob->ptr))) ob->ptr = newd;
} else {
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(ht, cursor, activeDefragHashTypeEntry, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}
/* defrag the hashtable struct and tables */
hashtable *new_hashtable = hashtableDefragTables(ht, activeDefragAlloc);
if (new_hashtable) ob->ptr = new_hashtable;
}

static void defragSet(robj *ob) {
Expand All @@ -555,11 +550,14 @@ static void defragSet(robj *ob) {
if (hashtableSize(ht) > server.active_defrag_max_scan_fields) {
defragLater(ob);
} else {
activeDefragSdsHashtable(ht);
unsigned long cursor = 0;
do {
cursor = hashtableScanDefrag(ht, cursor, activeDefragSdsHashtableCallback, NULL, activeDefragAlloc, HASHTABLE_SCAN_EMIT_REF);
} while (cursor != 0);
}
/* defrag the hashtable struct and tables */
hashtable *newHashtable = hashtableDefragTables(ht, activeDefragAlloc);
if (newHashtable) ob->ptr = newHashtable;
hashtable *new_hashtable = hashtableDefragTables(ht, activeDefragAlloc);
if (new_hashtable) ob->ptr = new_hashtable;
}

/* Defrag callback for radix tree iterator, called for each node,
Expand Down Expand Up @@ -776,7 +774,7 @@ static void defragKey(defragKeysCtx *ctx, robj **elemref) {
} else if (ob->type == OBJ_HASH) {
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl;
} else if (ob->encoding == OBJ_ENCODING_HT) {
} else if (ob->encoding == OBJ_ENCODING_HASHTABLE) {
defragHash(ob);
} else {
serverPanic("Unknown hash encoding");
Expand Down
6 changes: 3 additions & 3 deletions src/lazyfree.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) {
} else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = obj->ptr;
return zs->zsl->length;
} else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
dict *ht = obj->ptr;
return dictSize(ht);
} else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HASHTABLE) {
hashtable *ht = obj->ptr;
return hashtableSize(ht);
} else if (obj->type == OBJ_STREAM) {
size_t effort = 0;
stream *s = obj->ptr;
Expand Down
35 changes: 6 additions & 29 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -11090,25 +11090,6 @@ typedef struct {
ValkeyModuleScanKeyCB fn;
} ScanKeyCBData;

static void moduleScanKeyDictCallback(void *privdata, const dictEntry *de) {
ScanKeyCBData *data = privdata;
sds key = dictGetKey(de);
robj *o = data->key->value;
robj *field = createStringObject(key, sdslen(key));
robj *value = NULL;

if (o->type == OBJ_HASH) {
sds val = dictGetVal(de);
value = createStringObject(val, sdslen(val));
} else {
serverPanic("unexpected object type");
}

data->fn(data->key, field, value, data->user_data);
decrRefCount(field);
if (value) decrRefCount(value);
}

static void moduleScanKeyHashtableCallback(void *privdata, void *entry) {
ScanKeyCBData *data = privdata;
robj *o = data->key->value;
Expand All @@ -11122,6 +11103,10 @@ static void moduleScanKeyHashtableCallback(void *privdata, void *entry) {
zskiplistNode *node = (zskiplistNode *)entry;
key = node->ele;
value = createStringObjectFromLongDouble(node->score, 0);
} else if (o->type == OBJ_HASH) {
key = hashTypeEntryGetField(entry);
sds val = hashTypeEntryGetValue(entry);
value = createStringObject(val, sdslen(val));
} else {
serverPanic("unexpected object type");
}
Expand Down Expand Up @@ -11185,13 +11170,12 @@ int VM_ScanKey(ValkeyModuleKey *key, ValkeyModuleScanCursor *cursor, ValkeyModul
errno = EINVAL;
return 0;
}
dict *d = NULL;
hashtable *ht = NULL;
robj *o = key->value;
if (o->type == OBJ_SET) {
if (o->encoding == OBJ_ENCODING_HASHTABLE) ht = o->ptr;
} else if (o->type == OBJ_HASH) {
if (o->encoding == OBJ_ENCODING_HT) d = o->ptr;
if (o->encoding == OBJ_ENCODING_HASHTABLE) ht = o->ptr;
} else if (o->type == OBJ_ZSET) {
if (o->encoding == OBJ_ENCODING_SKIPLIST) ht = ((zset *)o->ptr)->ht;
} else {
Expand All @@ -11203,14 +11187,7 @@ int VM_ScanKey(ValkeyModuleKey *key, ValkeyModuleScanCursor *cursor, ValkeyModul
return 0;
}
int ret = 1;
if (d) {
ScanKeyCBData data = {key, privdata, fn};
cursor->cursor = dictScan(d, cursor->cursor, moduleScanKeyDictCallback, &data);
if (cursor->cursor == 0) {
cursor->done = 1;
ret = 0;
}
} else if (ht) {
if (ht) {
ScanKeyCBData data = {key, privdata, fn};
cursor->cursor = hashtableScan(ht, cursor->cursor, moduleScanKeyHashtableCallback, &data);
if (cursor->cursor == 0) {
Expand Down
Loading
Loading