Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Quotient remove #127

Merged
merged 5 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 134 additions & 24 deletions probables/quotientfilter/quotientfilter.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
""" BloomFilter and BloomFiter on Disk, python implementation
""" Quotient Filter, python implementation
License: MIT
Author: Tyler Barrus ([email protected])
"""

import sys
from array import array
from typing import Iterator, List, Optional
from typing import Iterator, List, Optional, TextIO

from probables.exceptions import QuotientFilterError
from probables.hashes import KeyT, SimpleHashT, fnv_1a_32
Expand Down Expand Up @@ -39,6 +40,7 @@ class QuotientFilter:
"_filter",
"_max_load_factor",
"_auto_resize",
"__mod_size",
)

def __init__(
Expand All @@ -54,6 +56,7 @@ def __set_params(self, quotient: int, auto_expand: bool, hash_function: Optional
self._q: int = quotient
self._r: int = 32 - quotient
self._size: int = 1 << self._q # same as 2**q
self.__mod_size: int = self._size - 1
self._elements_added: int = 0
self._auto_resize: bool = auto_expand
self._hash_func: SimpleHashT = fnv_1a_32 if hash_function is None else hash_function # type: ignore
Expand Down Expand Up @@ -161,6 +164,25 @@ def add_alt(self, _hash: int) -> None:
if self._contained_at_loc(key_quotient, key_remainder) == -1:
self._add(key_quotient, key_remainder)

def remove(self, key: KeyT) -> None:
"""Remove key from the quotient filter

Args:
key (str|bytes): The element to remove
"""
_hash = self._hash_func(key, 0)
self.remove_alt(_hash)

def remove_alt(self, _hash: int) -> None:
"""Remove key from the quotient filter

Args:
_hash (int): The element to remove
"""
key_quotient = _hash >> self._r
key_remainder = _hash & ((1 << self._r) - 1)
self._remove_element(key_quotient, key_remainder)

def check(self, key: KeyT) -> bool:
"""Check to see if key is likely in the quotient filter

Expand Down Expand Up @@ -266,14 +288,15 @@ def merge(self, second: "QuotientFilter") -> None:
self.add_alt(_h)

def _shift_insert(self, q: int, r: int, orig_idx: int, insert_idx: int, flag: int):
"""Insert the element q and r by shifting elements as needed"""
if self._is_empty_element(insert_idx):
self._filter[insert_idx] = r
self._is_occupied[q] = 1
self._is_continuation[insert_idx] = 1 if insert_idx != orig_idx else 0
self._is_shifted[insert_idx] = 1 if insert_idx != q else 0

else:
next_idx = (insert_idx + 1) & (self._size - 1)
next_idx = (insert_idx + 1) & self.__mod_size

while True:
was_empty = self._is_empty_element(next_idx)
Expand All @@ -291,17 +314,18 @@ def _shift_insert(self, q: int, r: int, orig_idx: int, insert_idx: int, flag: in
if was_empty:
break

next_idx = (next_idx + 1) & (self._size - 1)
next_idx = (next_idx + 1) & self.__mod_size

self._filter[insert_idx] = r
self._is_occupied[q] = 1
self._is_continuation[insert_idx] = 1 if insert_idx != orig_idx else 0
self._is_shifted[insert_idx] = 1 if insert_idx != q else 0

if flag == 1:
self._is_continuation[(insert_idx + 1) & (self._size - 1)] = 1
self._is_continuation[(insert_idx + 1) & self.__mod_size] = 1

def _get_start_index(self, quotient: int) -> int:
"""Get the starting index for the quotient"""
if self._is_empty_element(quotient):
return quotient

Expand All @@ -313,7 +337,7 @@ def _get_start_index(self, quotient: int) -> int:
cnts += 1

if self._is_shifted[j] == 1:
j = (j - 1) & (self._size - 1)
j = (j - 1) & self.__mod_size
else:
break

Expand All @@ -323,11 +347,12 @@ def _get_start_index(self, quotient: int) -> int:
break
cnts -= 1

j = (j + 1) & (self._size - 1)
j = (j + 1) & self.__mod_size

return j

def _add(self, q: int, r: int):
"""Add an quotient into the filter"""
if self._size == self._elements_added:
raise QuotientFilterError("Unable to insert the element due to insufficient space")
if self._is_empty_element(q):
Expand All @@ -350,7 +375,7 @@ def _add(self, q: int, r: int):
)

while starts == 0 and f != 0 and r > self._filter[start_idx]:
start_idx = (start_idx + 1) & (self._size - 1)
start_idx = (start_idx + 1) & self.__mod_size

if self._is_continuation[start_idx] == 0:
starts += 1
Expand All @@ -367,6 +392,76 @@ def _add(self, q: int, r: int):
self._shift_insert(q, r, orig_start_idx, start_idx, 1)
self._elements_added += 1

def _remove_element(self, q: int, r: int) -> None:
idx = self._contained_at_loc(q, r)

# element not in the filter, exit
if idx == -1:
return

next_idx = (idx + 1) & self.__mod_size

# track if this is the only element in this run...
remove_orig_idx = False
if self._is_run_or_cluster_start(idx) and self._is_continuation[next_idx] == 0:
remove_orig_idx = True

# element is the end of a cluster and the next element is either the beginning of a cluster or empty
if self._is_empty_element(next_idx) or self._is_cluster_start(next_idx):
self._filter[idx] = 0
self._is_occupied.clear_bit(idx)
self._is_continuation.clear_bit(idx)
self._is_shifted.clear_bit(idx)

if remove_orig_idx:
self._is_occupied[q] = 0
return

# find the minimum idx for the cluster; will be needed to determine if elements are in cluster start positions.
min_idx = idx
while not self._is_cluster_start(min_idx):
min_idx = (min_idx - 1) & self.__mod_size

# this is an edge case for first move...
if self._is_run_or_cluster_start(idx) and self._is_continuation[next_idx] == 1:
self._filter[idx] = self._filter[next_idx]
self._is_continuation[idx] = 0
self._is_shifted[idx] = self._is_shifted[next_idx]

idx = next_idx
next_idx = (idx + 1) & self.__mod_size

while not self._is_cluster_start(next_idx) and not self._is_empty_element(next_idx):
self._filter[idx] = self._filter[next_idx]
self._is_continuation[idx] = self._is_continuation[next_idx]
self._is_shifted[idx] = self._is_shifted[next_idx]

idx = next_idx
next_idx = (idx + 1) & self.__mod_size
# clean out the last element
self._filter[idx] = 0
self._is_continuation[idx] = 0
self._is_shifted[idx] = 0
self._is_occupied[idx] = 0

if remove_orig_idx:
self._is_occupied[q] = 0

# now figure out if things are in the correct place....
cur_quot = -1
queue: List[int] = []
while min_idx != next_idx:
if self._is_occupied[min_idx] == 1:
queue.append(min_idx)
if self._is_run_start(min_idx) == 1:
cur_quot = queue.pop(0)

if cur_quot == min_idx:
self._is_continuation[min_idx] = 0
self._is_shifted[min_idx] = 0
self._is_occupied[min_idx] = 1
min_idx = (min_idx + 1) & self.__mod_size

def _contained_at_loc(self, q: int, r: int) -> int:
"""returns the index location of the element, or -1 if not present"""
if self._is_occupied[q] == 0:
Expand All @@ -385,34 +480,39 @@ def _contained_at_loc(self, q: int, r: int) -> int:
if self._filter[start_idx] == r:
return start_idx

start_idx = (start_idx + 1) & (self._size - 1)
start_idx = (start_idx + 1) & self.__mod_size

return -1

def _is_cluster_start(self, elt: int) -> bool:
"""Does this `elt` sit at the beginning of a cluster?"""
return self._is_occupied[elt] == 1 and self._is_continuation[elt] == 0 and self._is_shifted[elt] == 0

def _is_run_start(self, elt: int) -> bool:
"""Does `elt` sit at the beginning of a run?"""
return not self._is_continuation[elt] == 1 and (self._is_occupied[elt] == 1 or self._is_shifted[elt] == 1)

def _is_run_or_cluster_start(self, elt: int) -> bool:
if self._is_cluster_start(elt):
return True
if self._is_run_start(elt):
return True
return False

def _is_empty_element(self, elt: int) -> bool:
return (
self._is_occupied.check_bit(elt) + self._is_continuation.check_bit(elt) + self._is_shifted.check_bit(elt)
) == 0
"""Is this an empty element?"""
return (self._is_occupied[elt] + self._is_continuation[elt] + self._is_shifted[elt]) == 0

def print(self):
"""show the bits and the run/cluster/continuation/empty status"""
def print(self, file: TextIO = sys.stdout):
"""show the bits and the run/cluster/continuation/empty status, defaults to `sys.stdout`"""
print("idx\t--\tO-C-S\tStatus", file=file)
print("----------------------------------------", file=file)
for i in range(self._size):
# is_a = ""
if self._is_empty_element(i):
is_a = "Empty"
elif self._is_cluster_start(i):
is_a = "Cluster Start"
elif self._is_run_start(i):
is_a = "Run Start"
else:
is_a = "Continuation"
print(f"{i}\t--\t{self._is_occupied[i]}-{self._is_continuation[i]}-{self._is_shifted[i]}\t{is_a}")
print(
f"{i}\t--\t{self._is_occupied[i]}-{self._is_continuation[i]}-{self._is_shifted[i]}"
f"\t{self._element_is(i)}",
file=file,
)

def validate_metadata(self):
"""check for invalid bit settings"""
Expand All @@ -421,3 +521,13 @@ def validate_metadata(self):
print(f"Row failed: {i}")
if self._is_occupied[i] == 1 and self._is_continuation == 1 and self._is_shifted == 0:
print(f"Row failed: {i}")

def _element_is(self, idx):
is_a = "Continuation"
if self._is_empty_element(idx):
is_a = "Empty"
elif self._is_cluster_start(idx):
is_a = "Cluster Start"
elif self._is_run_start(idx):
is_a = "Run Start"
return is_a
Loading
Loading