From 095d3dfa43c3880e40f48d1818be6533622a063d Mon Sep 17 00:00:00 2001
From: zackcam <zackcam@amazon.com>
Date: Mon, 13 Jan 2025 18:33:47 +0000
Subject: [PATCH] Adding a bloom command keyspace test

Signed-off-by: zackcam <zackcam@amazon.com>
---
 tests/test_bloom_basic.py            | 23 +++++----
 tests/test_bloom_keyspace.py         | 70 ++++++++++++++++++++++++++++
 tests/test_bloom_save_and_restore.py |  2 +-
 tests/valkey_bloom_test_case.py      |  1 +
 4 files changed, 87 insertions(+), 9 deletions(-)
 create mode 100644 tests/test_bloom_keyspace.py

diff --git a/tests/test_bloom_basic.py b/tests/test_bloom_basic.py
index 9fd2975..9508bfc 100644
--- a/tests/test_bloom_basic.py
+++ b/tests/test_bloom_basic.py
@@ -81,9 +81,9 @@ def test_too_large_bloom_obj(self):
         # is rejected.
         assert client.execute_command('CONFIG SET bf.bloom-memory-usage-limit 1000') == b'OK'
         cmds = [
-            'BF.INSERT filter items new_item1',
-            'BF.ADD filter new_item1',
-            'BF.MADD filter new_item1 new_item2',
+            'BF.INSERT filter ITEMS',
+            'BF.ADD filter',
+            'BF.MADD filter',
         ]
         # Fill a filter to capacity.
         assert client.execute_command('BF.RESERVE filter 0.001 100 EXPANSION 10') == b'OK'
@@ -94,12 +94,19 @@ def test_too_large_bloom_obj(self):
         assert client.execute_command('BF.INFO filter FILTERS') == 1
         assert client.execute_command('BF.INFO filter EXPANSION') == 10
         # Validate that scale out is rejected with appropriate error.
+        new_item_idx = 0
         for cmd in cmds:
-            if "BF.ADD" in cmd:
-                self.verify_error_response(self.client, cmd, obj_exceeds_size_err)
-            else:
-                response = client.execute_command(cmd)
-                assert obj_exceeds_size_err in str(response[0])
+            response = ""
+            while obj_exceeds_size_err not in response:
+                item = f"new_item{new_item_idx}"
+                new_item_idx += 1
+                if "BF.ADD" in cmd:
+                    response = self.verify_error_response(self.client,f"{cmd} {item}", obj_exceeds_size_err)
+                else:
+                    response = str(client.execute_command(f"{cmd} {item}"))
+                if "1" in response:
+                    assert False, f"{cmd} returned a value of 1 when it should have thrown an {obj_exceeds_size_err}"
+            new_item_idx -= 1
 
     def test_large_allocation_when_below_maxmemory(self):
         two_megabytes = 2 * 1024 * 1024
diff --git a/tests/test_bloom_keyspace.py b/tests/test_bloom_keyspace.py
new file mode 100644
index 0000000..fd2dc5b
--- /dev/null
+++ b/tests/test_bloom_keyspace.py
@@ -0,0 +1,70 @@
+import logging, time
+from valkey_bloom_test_case import ValkeyBloomTestCaseBase
+from valkeytests.conftest import resource_port_tracker
+
+class TestKeyEventNotifications(ValkeyBloomTestCaseBase):
+    RESERVE_KEYSPACE_MESSAGE = {'type': 'pmessage', 'pattern': b'__key*__:*', 'channel': b'__keyspace@0__:intermediate_val', 'data': b'bloom.reserve'}
+    RESERVE_KEYEVENT_MESSAGE = {'type': 'pmessage', 'pattern': b'__key*__:*', 'channel': b'__keyevent@0__:bloom.reserve', 'data': b'intermediate_val'}
+    ADD_KEYSPACE_MESSAGE = {'type': 'pmessage', 'pattern': b'__key*__:*', 'channel': b'__keyspace@0__:intermediate_val', 'data': b'bloom.add'} 
+    ADD_KEYEVENT_MESSAGE = {'type': 'pmessage', 'pattern': b'__key*__:*', 'channel': b'__keyevent@0__:bloom.add', 'data': b'intermediate_val'}
+
+    def create_expected_message_list(self, reserve_expected, add_expected, key_name):
+        expected_messages = []
+        self.RESERVE_KEYSPACE_MESSAGE['channel'] = f"__keyspace@0__:{key_name}".encode('utf-8')
+        self.RESERVE_KEYEVENT_MESSAGE['data'] = f"{key_name}".encode('utf-8')
+        self.ADD_KEYSPACE_MESSAGE['channel'] = f"__keyspace@0__:{key_name}".encode('utf-8')
+        self.ADD_KEYEVENT_MESSAGE['data'] = f"{key_name}".encode('utf-8')
+        if reserve_expected:
+            expected_messages.append(self.RESERVE_KEYEVENT_MESSAGE)
+            expected_messages.append(self.RESERVE_KEYSPACE_MESSAGE)
+        if add_expected:
+            expected_messages.append(self.ADD_KEYSPACE_MESSAGE)
+            expected_messages.append(self.ADD_KEYEVENT_MESSAGE)
+        return expected_messages
+
+    def check_response(self, result_messages, expected_messages):
+        extra_message = self.keyspace_client_subscribe.get_message()
+        if extra_message:
+            assert False, f"Unexpected extra message returned: {extra_message}"
+        for message in expected_messages:
+            assert message in result_messages, f"{message} was not found in messages received"
+
+    def get_subscribe_client_messages(self, client, cmd, expected_message_count):
+        client.execute_command(cmd)
+        count = 0
+        messages = []
+        timeout = time.time() + 5
+        while expected_message_count != count:
+            message = self.keyspace_client_subscribe.get_message()
+            if message:
+                # Only for the first time we get messages we should skip the first message gotten
+                if count > 0 or "BF.ADD" not in cmd:
+                    messages.append(message)    
+                count = count + 1
+            if timeout < time.time():
+                assert False, f"The number of expected messages failed tor eturn in time, messages received so far {messages}"
+        return messages
+    
+    def test_keyspace_bloom_commands(self):
+        self.create_subscribe_clients()
+        # The first call to get messages will return message that shows we subscribed to messages so we expect one more message than we need to check for
+        # the first time we look at messages
+        bloom_commands = [
+            ('BF.ADD add_test key', True, True, 5),
+            ('BF.MADD madd_test key1 key2', True, True, 4),
+            ('BF.EXISTS exists_test key', False, False, 0),
+            ('BF.INSERT insert_test ITEMS key1 key2', True, True, 4),
+            ('BF.RESERVE reserve_test 0.01 1000', True, False, 2)
+        ]
+
+        for command, reserve_expected, add_expected, expected_message_count in bloom_commands:
+            expected_messages = self.create_expected_message_list(reserve_expected, add_expected, command.split()[1]) if reserve_expected else []
+            result_messages = self.get_subscribe_client_messages(self.keyspace_client, command, expected_message_count)
+            self.check_response(result_messages, expected_messages)
+
+    def create_subscribe_clients(self):
+        self.keyspace_client = self.server.get_new_client()
+        self.keyspace_client_subscribe = self.keyspace_client.pubsub()
+        self.keyspace_client_subscribe.psubscribe('__key*__:*')
+        self.keyspace_client.execute_command('CONFIG' ,'SET','notify-keyspace-events', 'KEA')
+    
\ No newline at end of file
diff --git a/tests/test_bloom_save_and_restore.py b/tests/test_bloom_save_and_restore.py
index b13e6c4..8937ba8 100644
--- a/tests/test_bloom_save_and_restore.py
+++ b/tests/test_bloom_save_and_restore.py
@@ -77,7 +77,7 @@ def test_restore_failed_large_bloom_filter(self):
         # Create a large bloom filter.
         # When we try to restore this on a server with the default max allowed filter size of 128MB, start up should fail.
         updated_max_size = 180 * 1024 * 1024
-        original_max_size = 64 * 1024 * 1024
+        original_max_size = int(client.execute_command('CONFIG GET bf.bloom-memory-usage-limit')[1])
         bf_add_result_1 = client.execute_command('CONFIG SET bf.bloom-memory-usage-limit ' + str(updated_max_size))
         client.execute_command('BF.RESERVE testSave 0.001 100000000')
         assert int(client.execute_command('BF.INFO testSave size')) > original_max_size
diff --git a/tests/valkey_bloom_test_case.py b/tests/valkey_bloom_test_case.py
index bcc1516..0581401 100644
--- a/tests/valkey_bloom_test_case.py
+++ b/tests/valkey_bloom_test_case.py
@@ -31,6 +31,7 @@ def verify_error_response(self, client, cmd, expected_err_reply):
         except ResponseError as e:
             assert_error_msg = f"Actual error message: '{str(e)}' is different from expected error message '{expected_err_reply}'"
             assert str(e) == expected_err_reply, assert_error_msg
+            return str(e)
 
     def verify_command_success_reply(self, client, cmd, expected_result):
         cmd_actual_result = client.execute_command(cmd)