SNOW-1831140 E2e tests of Iceberg JSON ingestion #1013

Merged
@@ -337,7 +337,7 @@ public void insertRecord(SinkRecord kafkaSinkRecord, boolean isFirstRowPerPartit
// Simply skip inserting into the buffer if the row should be ignored after channel reset
if (needToSkipCurrentBatch) {
LOGGER.info(
"Ignore adding offset:{} to buffer for channel:{} because we recently reset offset in"
"Ignore inserting offset:{} for channel:{} because we recently reset offset in"
Contributor Author: There is no buffer anymore so this log was misleading.
+ " Kafka. currentProcessedOffset:{}",
kafkaSinkRecord.kafkaOffset(),
this.getChannelNameFormatV1(),
3 changes: 2 additions & 1 deletion test/rest_request_template/iceberg_avro_aws.json
@@ -22,6 +22,7 @@
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true,
"snowflake.streaming.max.client.lag": "1"
"snowflake.streaming.max.client.lag": "1",
"snowflake.streaming.enable.single.buffer": true
}
}
3 changes: 2 additions & 1 deletion test/rest_request_template/iceberg_json_aws.json
@@ -21,6 +21,7 @@
"errors.deadletterqueue.topic.name": "DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true,
"snowflake.streaming.max.client.lag": "1"
"snowflake.streaming.max.client.lag": "1",
"snowflake.streaming.enable.single.buffer": true
}
}
@@ -23,6 +23,7 @@
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true,
"snowflake.enable.schematization": true,
"snowflake.streaming.max.client.lag": "1"
"snowflake.streaming.max.client.lag": "1",
"snowflake.streaming.enable.single.buffer": true
}
}
@@ -22,6 +22,7 @@
"errors.deadletterqueue.topic.replication.factor": 1,
"snowflake.streaming.iceberg.enabled": true,
"snowflake.enable.schematization": true,
"snowflake.streaming.max.client.lag": "1"
"snowflake.streaming.max.client.lag": "1",
"snowflake.streaming.enable.single.buffer": true
}
}
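
Taken together, these template changes pin snowflake.streaming.max.client.lag to 1 and turn on snowflake.streaming.enable.single.buffer, so rows are flushed almost immediately instead of waiting out the default lag window. A minimal sketch (not part of the PR) of how a test run could sanity-check one of these templates; the "config" nesting of the keys and the seconds interpretation of the lag value are assumptions:

import json

# Sketch only: the template path exists in this repo, but the "config" nesting is assumed.
with open("test/rest_request_template/iceberg_json_aws.json") as f:
    template = json.load(f)

config = template["config"]
# Flush roughly every second so the verify() polling loop converges quickly (assumed unit: seconds).
assert config["snowflake.streaming.max.client.lag"] == "1"
# Hand rows straight to the streaming SDK channel instead of an intermediate connector buffer.
assert config["snowflake.streaming.enable.single.buffer"] is True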
57 changes: 54 additions & 3 deletions test/test_suit/base_iceberg_test.py
@@ -1,11 +1,13 @@
from test_suit.base_e2e import BaseE2eTest
from test_suit.assertions import *
from test_suit.test_utils import RetryableError, NonRetryableError
+import json

class BaseIcebergTest(BaseE2eTest):

def __init__(self, driver, name_salt: str, config_file_name: str):
self.driver = driver
self.fileName = "iceberg_json_aws"
self.file_name = config_file_name
self.topic = config_file_name + name_salt

self.test_message_from_docs = {
@@ -88,7 +90,31 @@ def clean(self):
self.driver.drop_iceberg_table(self.topic)


-    def verify_iceberg_content(self, content: dict):
+    def _send_json_values(self, msg: dict, number_of_messages: int):
+        json_msg = json.dumps(msg)
+
+        key = [json.dumps({"number": str(e)}).encode("utf-8") for e in range(number_of_messages)]
+        value = [json_msg.encode("utf-8") for _ in range(number_of_messages)]
+
+        self.driver.sendBytesData(
+            topic=self.topic,
+            value=value,
+            key=key,
+            partition=0,
+            headers=self.test_headers,
+        )
+
+    def _assert_number_of_records_in_table(self, expected_number_of_records: int):
+        number_of_records = self.driver.select_number_of_records(self.topic)
+        if number_of_records == 0:
+            raise RetryableError()
+        elif number_of_records != expected_number_of_records:
+            raise NonRetryableError(
+                "Number of record in table is different from number of record sent"
+            )
+
+
+    def _verify_iceberg_content_from_docs(self, content: dict):
assert_equals(1, content['id'])
assert_equals_with_precision(36.6, content['body_temperature'])
assert_equals('Steve', content['name'])
@@ -102,11 +128,36 @@ def verify_iceberg_content(self, content: dict):
assert_equals(False, content['animals_possessed']['cats'])


-    def verify_iceberg_metadata(self, metadata: dict):
+    def _verify_iceberg_metadata(self, metadata: dict):
assert_equals(0, metadata['offset'])
assert_equals(0, metadata['partition'])
assert_starts_with('iceberg_', metadata['topic'])
assert_not_null(metadata['SnowflakeConnectorPushTime'])

assert_dict_contains('header1', 'value1', metadata['headers'])


+    def _select_schematized_record_with_offset(self, offset: int) -> dict:
+        record = (
+            self.driver.snowflake_conn.cursor()
+            .execute("select id, body_temperature, name, approved_coffee_types, animals_possessed, null_long, null_array, null_object, empty_array, some_object from {} limit 1 offset {}".format(self.topic, offset))
+            .fetchone()
+        )
+
+        return {
+            "id": record[0],
+            "body_temperature": record[1],
+            "name": record[2],
+            "approved_coffee_types": self.__none_or_json_loads(record[3]),
+            "animals_possessed": self.__none_or_json_loads(record[4]),
+            "null_long": record[5],
+            "null_array": self.__none_or_json_loads(record[6]),
+            "null_object": self.__none_or_json_loads(record[7]),
+            "empty_array": self.__none_or_json_loads(record[8]),
+            "some_object": self.__none_or_json_loads(record[9])
+        }
+
+
+    def __none_or_json_loads(self, value: str) -> dict:
+        return None if value is None else json.loads(value)
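
The double-underscore helper exists because the Python connector returns semi-structured OBJECT and ARRAY columns as JSON text. A quick illustration (not from the PR, with a made-up row value) of the behavior it wraps:

import json

raw = '{"cats": false, "dogs": true}'  # hypothetical value fetchone() yields for an OBJECT column
parsed = None if raw is None else json.loads(raw)  # what __none_or_json_loads does
assert parsed == {"cats": False, "dogs": True}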

4 changes: 2 additions & 2 deletions test/test_suit/iceberg_avro_aws.py
@@ -45,5 +45,5 @@ def verify(self, round):
.fetchone()
)

-        self.verify_iceberg_content(json.loads(first_record[0]))
-        self.verify_iceberg_metadata(json.loads(first_record[1]))
+        self._verify_iceberg_content_from_docs(json.loads(first_record[0]))
+        self._verify_iceberg_metadata(json.loads(first_record[1]))
28 changes: 4 additions & 24 deletions test/test_suit/iceberg_json_aws.py
@@ -16,37 +16,17 @@ def setup(self):


def send(self):
-        msg = json.dumps(self.test_message)
-
-        key = []
-        value = []
-        for e in range(100):
-            key.append(json.dumps({"number": str(e)}).encode("utf-8"))
-            value.append(msg.encode("utf-8"))
-
-        self.driver.sendBytesData(
-            topic=self.topic,
-            value=value,
-            key=key,
-            partition=0,
-            headers=self.test_headers,
-        )
+        self._send_json_values(self.test_message_from_docs, 100)


def verify(self, round):
-        number_of_records = self.driver.select_number_of_records(self.topic)
-        if number_of_records == 0:
-            raise RetryableError()
-        elif number_of_records != 100:
-            raise NonRetryableError(
-                "Number of record in table is different from number of record sent"
-            )
+        self._assert_number_of_records_in_table(100)

first_record = (
self.driver.snowflake_conn.cursor()
.execute("Select * from {} limit 1".format(self.topic))
.fetchone()
)

-        self.verify_iceberg_content(json.loads(first_record[0]))
-        self.verify_iceberg_metadata(json.loads(first_record[1]))
+        self._verify_iceberg_content_from_docs(json.loads(first_record[0]))
+        self._verify_iceberg_metadata(json.loads(first_record[1]))
1 change: 1 addition & 0 deletions test/test_suit/iceberg_schema_evolution_avro_aws.py
@@ -11,6 +11,7 @@ def setup(self):
table_name=self.topic,
external_volume="kafka_push_e2e_volume_aws", # volume created manually
)
+        self.driver.enable_schema_evolution_for_iceberg(self.topic)


def send(self):
Expand Down
16 changes: 14 additions & 2 deletions test/test_suit/iceberg_schema_evolution_json_aws.py
@@ -11,12 +11,24 @@ def setup(self):
table_name=self.topic,
external_volume="kafka_push_e2e_volume_aws", # volume created manually
)
+        self.driver.enable_schema_evolution_for_iceberg(self.topic)


def send(self):
-        pass
+        self._send_json_values(self.test_message_from_docs, 100)
+        self._send_json_values(self.test_message_for_schema_evolution_1, 100)
+        # TODO SNOW-1731264
+        # net.snowflake.ingest.utils.SFException: The given row cannot be converted to the internal format: Object of type java.util.LinkedHashMap cannot be ingested into Snowflake column NULL_OBJECT of type STRING, rowIndex:0. Allowed Java types: String, Number, boolean, char
+        # self._send_json_values(self.test_message_for_schema_evolution_2, 100)


def verify(self, round):
-        pass
+        self._assert_number_of_records_in_table(200)
+
+        actual_record_from_docs_dict = self._select_schematized_record_with_offset(1)
+        actual_record_for_schema_evolution_1 = self._select_schematized_record_with_offset(100)
+        # TODO SNOW-1731264
+        # actual_record_for_schema_evolution_2 = self._select_schematized_record_with_offset(200)
+
+        print(actual_record_from_docs_dict)
+        self._verify_iceberg_content_from_docs(actual_record_from_docs_dict)
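
For reference, a hypothetical fixture of the shape that would trip the SFException quoted in the TODOs above; the real test_message_for_schema_evolution_2 is defined elsewhere in the base class and may differ:

# Hypothetical payload, not the fixture from this PR: once NULL_OBJECT has been
# schematized as a STRING column, a nested object arrives as a LinkedHashMap on
# the Java side and is rejected by the ingest SDK.
test_message_for_schema_evolution_2_example = {
    "id": 1,
    "null_object": {"key": "value"},
}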
5 changes: 4 additions & 1 deletion test/test_verify.py
@@ -296,10 +296,13 @@ def cleanTableStagePipe(self, connectorName, topicName="", partitionNumber=1):

print(datetime.now().strftime("%H:%M:%S "), "=== Done ===", flush=True)

+    def enable_schema_evolution_for_iceberg(self, table: str):
+        self.snowflake_conn.cursor().execute("alter iceberg table {} set ENABLE_SCHEMA_EVOLUTION = true".format(table))

def create_empty_iceberg_table(self, table_name: str, external_volume: str):
sql = """
            CREATE ICEBERG TABLE IF NOT EXISTS {} (
-                record_content OBJECT()
+                record_content OBJECT(),
+                record_metadata OBJECT()
)
EXTERNAL_VOLUME = '{}'
CATALOG = 'SNOWFLAKE'