Skip to content

Commit

Permalink
Adding a bloom command keyspace test
Browse files Browse the repository at this point in the history
Signed-off-by: zackcam <[email protected]>
  • Loading branch information
zackcam committed Jan 16, 2025
1 parent 1bb7d25 commit 095d3df
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 9 deletions.
23 changes: 15 additions & 8 deletions tests/test_bloom_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ def test_too_large_bloom_obj(self):
# is rejected.
assert client.execute_command('CONFIG SET bf.bloom-memory-usage-limit 1000') == b'OK'
cmds = [
'BF.INSERT filter items new_item1',
'BF.ADD filter new_item1',
'BF.MADD filter new_item1 new_item2',
'BF.INSERT filter ITEMS',
'BF.ADD filter',
'BF.MADD filter',
]
# Fill a filter to capacity.
assert client.execute_command('BF.RESERVE filter 0.001 100 EXPANSION 10') == b'OK'
Expand All @@ -94,12 +94,19 @@ def test_too_large_bloom_obj(self):
assert client.execute_command('BF.INFO filter FILTERS') == 1
assert client.execute_command('BF.INFO filter EXPANSION') == 10
# Validate that scale out is rejected with appropriate error.
new_item_idx = 0
for cmd in cmds:
if "BF.ADD" in cmd:
self.verify_error_response(self.client, cmd, obj_exceeds_size_err)
else:
response = client.execute_command(cmd)
assert obj_exceeds_size_err in str(response[0])
response = ""
while obj_exceeds_size_err not in response:
item = f"new_item{new_item_idx}"
new_item_idx += 1
if "BF.ADD" in cmd:
response = self.verify_error_response(self.client,f"{cmd} {item}", obj_exceeds_size_err)
else:
response = str(client.execute_command(f"{cmd} {item}"))
if "1" in response:
assert False, f"{cmd} returned a value of 1 when it should have thrown an {obj_exceeds_size_err}"
new_item_idx -= 1

def test_large_allocation_when_below_maxmemory(self):
two_megabytes = 2 * 1024 * 1024
Expand Down
70 changes: 70 additions & 0 deletions tests/test_bloom_keyspace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import logging, time
from valkey_bloom_test_case import ValkeyBloomTestCaseBase
from valkeytests.conftest import resource_port_tracker

class TestKeyEventNotifications(ValkeyBloomTestCaseBase):
RESERVE_KEYSPACE_MESSAGE = {'type': 'pmessage', 'pattern': b'__key*__:*', 'channel': b'__keyspace@0__:intermediate_val', 'data': b'bloom.reserve'}
RESERVE_KEYEVENT_MESSAGE = {'type': 'pmessage', 'pattern': b'__key*__:*', 'channel': b'__keyevent@0__:bloom.reserve', 'data': b'intermediate_val'}
ADD_KEYSPACE_MESSAGE = {'type': 'pmessage', 'pattern': b'__key*__:*', 'channel': b'__keyspace@0__:intermediate_val', 'data': b'bloom.add'}
ADD_KEYEVENT_MESSAGE = {'type': 'pmessage', 'pattern': b'__key*__:*', 'channel': b'__keyevent@0__:bloom.add', 'data': b'intermediate_val'}

def create_expected_message_list(self, reserve_expected, add_expected, key_name):
expected_messages = []
self.RESERVE_KEYSPACE_MESSAGE['channel'] = f"__keyspace@0__:{key_name}".encode('utf-8')
self.RESERVE_KEYEVENT_MESSAGE['data'] = f"{key_name}".encode('utf-8')
self.ADD_KEYSPACE_MESSAGE['channel'] = f"__keyspace@0__:{key_name}".encode('utf-8')
self.ADD_KEYEVENT_MESSAGE['data'] = f"{key_name}".encode('utf-8')
if reserve_expected:
expected_messages.append(self.RESERVE_KEYEVENT_MESSAGE)
expected_messages.append(self.RESERVE_KEYSPACE_MESSAGE)
if add_expected:
expected_messages.append(self.ADD_KEYSPACE_MESSAGE)
expected_messages.append(self.ADD_KEYEVENT_MESSAGE)
return expected_messages

def check_response(self, result_messages, expected_messages):
extra_message = self.keyspace_client_subscribe.get_message()
if extra_message:
assert False, f"Unexpected extra message returned: {extra_message}"
for message in expected_messages:
assert message in result_messages, f"{message} was not found in messages received"

def get_subscribe_client_messages(self, client, cmd, expected_message_count):
client.execute_command(cmd)
count = 0
messages = []
timeout = time.time() + 5
while expected_message_count != count:
message = self.keyspace_client_subscribe.get_message()
if message:
# Only for the first time we get messages we should skip the first message gotten
if count > 0 or "BF.ADD" not in cmd:
messages.append(message)
count = count + 1
if timeout < time.time():
assert False, f"The number of expected messages failed tor eturn in time, messages received so far {messages}"
return messages

def test_keyspace_bloom_commands(self):
self.create_subscribe_clients()
# The first call to get messages will return message that shows we subscribed to messages so we expect one more message than we need to check for
# the first time we look at messages
bloom_commands = [
('BF.ADD add_test key', True, True, 5),
('BF.MADD madd_test key1 key2', True, True, 4),
('BF.EXISTS exists_test key', False, False, 0),
('BF.INSERT insert_test ITEMS key1 key2', True, True, 4),
('BF.RESERVE reserve_test 0.01 1000', True, False, 2)
]

for command, reserve_expected, add_expected, expected_message_count in bloom_commands:
expected_messages = self.create_expected_message_list(reserve_expected, add_expected, command.split()[1]) if reserve_expected else []
result_messages = self.get_subscribe_client_messages(self.keyspace_client, command, expected_message_count)
self.check_response(result_messages, expected_messages)

def create_subscribe_clients(self):
self.keyspace_client = self.server.get_new_client()
self.keyspace_client_subscribe = self.keyspace_client.pubsub()
self.keyspace_client_subscribe.psubscribe('__key*__:*')
self.keyspace_client.execute_command('CONFIG' ,'SET','notify-keyspace-events', 'KEA')

2 changes: 1 addition & 1 deletion tests/test_bloom_save_and_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_restore_failed_large_bloom_filter(self):
# Create a large bloom filter.
# When we try to restore this on a server with the default max allowed filter size of 128MB, start up should fail.
updated_max_size = 180 * 1024 * 1024
original_max_size = 64 * 1024 * 1024
original_max_size = int(client.execute_command('CONFIG GET bf.bloom-memory-usage-limit')[1])
bf_add_result_1 = client.execute_command('CONFIG SET bf.bloom-memory-usage-limit ' + str(updated_max_size))
client.execute_command('BF.RESERVE testSave 0.001 100000000')
assert int(client.execute_command('BF.INFO testSave size')) > original_max_size
Expand Down
1 change: 1 addition & 0 deletions tests/valkey_bloom_test_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def verify_error_response(self, client, cmd, expected_err_reply):
except ResponseError as e:
assert_error_msg = f"Actual error message: '{str(e)}' is different from expected error message '{expected_err_reply}'"
assert str(e) == expected_err_reply, assert_error_msg
return str(e)

def verify_command_success_reply(self, client, cmd, expected_result):
cmd_actual_result = client.execute_command(cmd)
Expand Down

0 comments on commit 095d3df

Please sign in to comment.