diff --git a/src/v/cloud_storage/tests/topic_manifest_test.cc b/src/v/cloud_storage/tests/topic_manifest_test.cc index 515a1c6a1afbf..746b08fbc177b 100644 --- a/src/v/cloud_storage/tests/topic_manifest_test.cc +++ b/src/v/cloud_storage/tests/topic_manifest_test.cc @@ -484,6 +484,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) { false, tristate{}, std::nullopt, + std::nullopt, }; auto random_initial_revision_id diff --git a/src/v/cluster/topic_properties.cc b/src/v/cluster/topic_properties.cc index c6f7e67607531..bd90bdca2384d 100644 --- a/src/v/cluster/topic_properties.cc +++ b/src/v/cluster/topic_properties.cc @@ -42,7 +42,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { "remote_label: {}, iceberg_mode: {}, " "leaders_preference: {}, " "delete_retention_ms: {}, " - "iceberg_delete: {}", + "iceberg_delete: {}, " + "iceberg_invalid_record_action: {}", properties.compression, properties.cleanup_policy_bitflags, properties.compaction_strategy, @@ -79,7 +80,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) { properties.iceberg_mode, properties.leaders_preference, properties.delete_retention_ms, - properties.iceberg_delete); + properties.iceberg_delete, + properties.iceberg_invalid_record_action); if (config::shard_local_cfg().development_enable_cloud_topics()) { fmt::print( @@ -125,7 +127,8 @@ bool topic_properties::has_overrides() const { || flush_bytes.has_value() || remote_label.has_value() || (iceberg_mode != storage::ntp_config::default_iceberg_mode) || leaders_preference.has_value() || delete_retention_ms.is_engaged() - || iceberg_delete.has_value(); + || iceberg_delete.has_value() + || iceberg_invalid_record_action.has_value(); if (config::shard_local_cfg().development_enable_cloud_topics()) { return overrides @@ -261,6 +264,7 @@ adl::from(iobuf_parser& parser) { false, tristate{disable_tristate}, std::nullopt, + std::nullopt, }; } diff --git a/src/v/cluster/topic_properties.h b/src/v/cluster/topic_properties.h index 93797c69ad494..0e77df244c58b 100644 --- a/src/v/cluster/topic_properties.h +++ b/src/v/cluster/topic_properties.h @@ -33,7 +33,7 @@ namespace cluster { */ struct topic_properties : serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { topic_properties() noexcept = default; topic_properties( std::optional compression, @@ -76,7 +76,9 @@ struct topic_properties std::optional leaders_preference, bool cloud_topic_enabled, tristate delete_retention_ms, - std::optional iceberg_delete) + std::optional iceberg_delete, + std::optional + iceberg_invalid_record_action) : compression(compression) , cleanup_policy_bitflags(cleanup_policy_bitflags) , compaction_strategy(compaction_strategy) @@ -118,7 +120,8 @@ struct topic_properties , leaders_preference(std::move(leaders_preference)) , cloud_topic_enabled(cloud_topic_enabled) , delete_retention_ms(delete_retention_ms) - , iceberg_delete(iceberg_delete) {} + , iceberg_delete(iceberg_delete) + , iceberg_invalid_record_action(iceberg_invalid_record_action) {} std::optional compression; std::optional cleanup_policy_bitflags; @@ -195,6 +198,9 @@ struct topic_properties // Should we delete the corresponding iceberg table when deleting the topic. std::optional iceberg_delete; + std::optional + iceberg_invalid_record_action; + bool is_compacted() const; bool has_overrides() const; bool requires_remote_erase() const; @@ -241,7 +247,8 @@ struct topic_properties leaders_preference, cloud_topic_enabled, delete_retention_ms, - iceberg_delete); + iceberg_delete, + iceberg_invalid_record_action); } friend bool operator==(const topic_properties&, const topic_properties&) diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 462bfba208f61..8e74c949c1d53 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -1101,6 +1101,9 @@ topic_properties topic_table::update_topic_properties( updated_properties.delete_retention_ms, overrides.delete_retention_ms); incremental_update( updated_properties.iceberg_delete, overrides.iceberg_delete); + incremental_update( + updated_properties.iceberg_invalid_record_action, + overrides.iceberg_invalid_record_action); return updated_properties; } diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index f99c0df161268..6d2429cc34d2b 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -386,7 +386,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { "initial_retention_local_target_bytes: {}, " "initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, " "flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}, " - "remote_read: {}, remote_write: {}, iceberg_delete: {}", + "remote_read: {}, remote_write: {}, iceberg_delete: {}, " + "iceberg_invalid_record_action: {}", i.compression, i.cleanup_policy_bitflags, i.compaction_strategy, @@ -417,7 +418,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) { i.leaders_preference, i.remote_read, i.remote_write, - i.iceberg_delete); + i.iceberg_delete, + i.iceberg_invalid_record_action); return o; } diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index ef851afedc352..0dc3a556abbc1 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -568,7 +568,7 @@ struct property_update> struct incremental_topic_updates : serde::envelope< incremental_topic_updates, - serde::version<7>, + serde::version<8>, serde::compat_version<0>> { static constexpr int8_t version_with_data_policy = -1; static constexpr int8_t version_with_shadow_indexing = -3; @@ -641,6 +641,8 @@ struct incremental_topic_updates leaders_preference; property_update> delete_retention_ms; property_update> iceberg_delete; + property_update> + iceberg_invalid_record_action; // To allow us to better control use of the deprecated shadow_indexing // field, use getters and setters instead. @@ -680,7 +682,8 @@ struct incremental_topic_updates remote_read, remote_write, delete_retention_ms, - iceberg_delete); + iceberg_delete, + iceberg_invalid_record_action); } friend std::ostream& diff --git a/src/v/compat/cluster_compat.h b/src/v/compat/cluster_compat.h index 648a2d261ef85..af1732d57eb01 100644 --- a/src/v/compat/cluster_compat.h +++ b/src/v/compat/cluster_compat.h @@ -358,6 +358,10 @@ struct compat_check { json_write(flush_ms); json_write(flush_bytes); json_write(remote_topic_namespace_override); + json::write_exceptional_member_type( + wr, + "iceberg_invalid_record_action", + obj.iceberg_invalid_record_action); } static cluster::topic_properties from_json(json::Value& rd) { @@ -394,6 +398,7 @@ struct compat_check { json_read(flush_ms); json_read(flush_bytes); json_read(remote_topic_namespace_override); + json_read(iceberg_invalid_record_action); return obj; } @@ -425,6 +430,7 @@ struct compat_check { obj.flush_bytes = std::nullopt; obj.flush_ms = std::nullopt; obj.remote_topic_namespace_override = std::nullopt; + obj.iceberg_invalid_record_action = std::nullopt; if (reply != obj) { throw compat_error(fmt::format( @@ -510,6 +516,8 @@ struct compat_check { obj.properties.mpx_virtual_cluster_id = std::nullopt; + obj.properties.iceberg_invalid_record_action = std::nullopt; + // ADL will always squash is_migrated to false obj.is_migrated = false; diff --git a/src/v/compat/cluster_generator.h b/src/v/compat/cluster_generator.h index 08b598bf489e4..9ea11e22ccb16 100644 --- a/src/v/compat/cluster_generator.h +++ b/src/v/compat/cluster_generator.h @@ -13,6 +13,7 @@ #include "cluster/errc.h" #include "cluster/types.h" #include "compat/model_generator.h" +#include "model/metadata.h" #include "model/tests/randoms.h" #include "random/generators.h" #include "test_utils/randoms.h" @@ -655,7 +656,12 @@ struct instance_generator { std::nullopt, false, tristate{disable_tristate}, - std::nullopt}; + std::nullopt, + tests::random_optional([] { + return random_generators::random_choice( + {model::iceberg_invalid_record_action::drop, + model::iceberg_invalid_record_action::dlq_table}); + })}; } static std::vector limits() { return {}; } diff --git a/src/v/compat/cluster_json.h b/src/v/compat/cluster_json.h index 2044c41e3dfc1..c00dcdf0c46a6 100644 --- a/src/v/compat/cluster_json.h +++ b/src/v/compat/cluster_json.h @@ -628,6 +628,8 @@ inline void rjson_serialize( write_member(w, "flush_ms", tps.flush_ms); write_member(w, "iceberg_mode", tps.iceberg_mode); write_member(w, "delete_retention_ms", tps.delete_retention_ms); + write_exceptional_member_type( + w, "iceberg_invalid_record_action", tps.iceberg_invalid_record_action); w.EndObject(); } @@ -700,6 +702,8 @@ inline void read_value(const json::Value& rd, cluster::topic_properties& obj) { read_member(rd, "flush_ms", obj.flush_ms); read_member(rd, "iceberg_mode", obj.iceberg_mode); read_member(rd, "delete_retention_ms", obj.delete_retention_ms); + read_member( + rd, "iceberg_invalid_record_action", obj.iceberg_invalid_record_action); } inline void rjson_serialize( diff --git a/src/v/compat/model_json.h b/src/v/compat/model_json.h index 114cc961ae61a..c6274c36d84b1 100644 --- a/src/v/compat/model_json.h +++ b/src/v/compat/model_json.h @@ -11,6 +11,7 @@ #pragma once #include "compat/json.h" +#include "model/metadata.h" #include "model/record.h" namespace json { @@ -293,6 +294,14 @@ inline void rjson_serialize_exceptional_type( rjson_serialize(w, static_cast(m)); } +inline void rjson_serialize_exceptional_type( + json::Writer& w, + const model::iceberg_invalid_record_action& m) { + using underlying_t + = std::underlying_type_t; + rjson_serialize(w, static_cast(m)); +} + inline void rjson_serialize( json::Writer& wr, const model::record_batch_attributes& b) { diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index b9a1ab1809155..d7fe2721be980 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -3793,6 +3793,17 @@ configuration::configuration() "the topic.", {.needs_restart = needs_restart::no, .visibility = visibility::user}, true) + , iceberg_invalid_record_action( + *this, + "iceberg_invalid_record_action", + "Default value for the redpanda.iceberg.invalid.record.action topic " + "property.", + {.needs_restart = needs_restart::no, .visibility = visibility::user}, + model::iceberg_invalid_record_action::dlq_table, + { + model::iceberg_invalid_record_action::drop, + model::iceberg_invalid_record_action::dlq_table, + }) , development_enable_cloud_topics( *this, "development_enable_cloud_topics", diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index cea4325929f85..1a5bc598f9b0c 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -719,6 +719,8 @@ struct configuration final : public config_store { property> iceberg_rest_catalog_prefix; property iceberg_delete; + enum_property + iceberg_invalid_record_action; configuration(); diff --git a/src/v/config/convert.h b/src/v/config/convert.h index 68590d1cf1c9d..7e14c905668e0 100644 --- a/src/v/config/convert.h +++ b/src/v/config/convert.h @@ -674,4 +674,17 @@ struct convert { } }; +template<> +struct convert { + using type = model::iceberg_invalid_record_action; + + static Node encode(const type& rhs) { return Node(fmt::format("{}", rhs)); } + + static bool decode(const Node& node, type& rhs) { + auto value = node.as(); + rhs = boost::lexical_cast(value); + return true; + } +}; + } // namespace YAML diff --git a/src/v/config/property.h b/src/v/config/property.h index a118e2c6da465..9d06c9e6486d6 100644 --- a/src/v/config/property.h +++ b/src/v/config/property.h @@ -18,6 +18,7 @@ #include "features/enterprise_feature_messages.h" #include "json/stringbuffer.h" #include "json/writer.h" +#include "model/metadata.h" #include "pandaproxy/schema_registry/schema_id_validation.h" #include "utils/to_string.h" @@ -686,6 +687,10 @@ consteval std::string_view property_type_name() { return "leaders_preference"; } else if constexpr (std::is_same_v) { return "string"; + } else if constexpr (std::is_same_v< + type, + model::iceberg_invalid_record_action>) { + return "string"; } else { static_assert( base::unsupported_type::value, "Type name not defined"); diff --git a/src/v/config/rjson_serialization.cc b/src/v/config/rjson_serialization.cc index ce9cb1bdead60..a3005e6395c30 100644 --- a/src/v/config/rjson_serialization.cc +++ b/src/v/config/rjson_serialization.cc @@ -11,6 +11,7 @@ #include "config/tls_config.h" #include "config/types.h" +#include "model/metadata.h" namespace json { @@ -265,4 +266,11 @@ void rjson_serialize( json::Writer& w, config::datalake_catalog_type ct) { stringize(w, ct); } + +void rjson_serialize( + json::Writer& w, + const model::iceberg_invalid_record_action& v) { + stringize(w, v); +} + } // namespace json diff --git a/src/v/config/rjson_serialization.h b/src/v/config/rjson_serialization.h index 2f7e249dc0737..43dd60670ab72 100644 --- a/src/v/config/rjson_serialization.h +++ b/src/v/config/rjson_serialization.h @@ -22,6 +22,7 @@ #include "json/json.h" #include "json/stringbuffer.h" #include "json/writer.h" +#include "model/metadata.h" #include "pandaproxy/schema_registry/schema_id_validation.h" #include @@ -137,4 +138,8 @@ void rjson_serialize( void rjson_serialize( json::Writer&, config::datalake_catalog_type); +void rjson_serialize( + json::Writer&, + const model::iceberg_invalid_record_action&); + } // namespace json diff --git a/src/v/config/tests/BUILD b/src/v/config/tests/BUILD index aaf5518ea3bf0..174d5cddb1619 100644 --- a/src/v/config/tests/BUILD +++ b/src/v/config/tests/BUILD @@ -236,3 +236,17 @@ redpanda_cc_gtest( "@googletest//:gtest", ], ) + +redpanda_cc_gtest( + name = "configuration_test", + timeout = "short", + srcs = [ + "configuration_test.cc", + ], + cpu = 1, + deps = [ + "//src/v/config", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + ], +) diff --git a/src/v/config/tests/CMakeLists.txt b/src/v/config/tests/CMakeLists.txt index c906da93ebd92..f5c0781595638 100644 --- a/src/v/config/tests/CMakeLists.txt +++ b/src/v/config/tests/CMakeLists.txt @@ -32,3 +32,13 @@ rp_test( v::config v::gtest_main ) + +rp_test( + UNIT_TEST + GTEST + BINARY_NAME test_configuration_gtest + SOURCES configuration_test.cc + LIBRARIES + v::config + v::gtest_main +) diff --git a/src/v/config/tests/configuration_test.cc b/src/v/config/tests/configuration_test.cc new file mode 100644 index 0000000000000..cfd8f8bb428d1 --- /dev/null +++ b/src/v/config/tests/configuration_test.cc @@ -0,0 +1,39 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "config/base_property.h" +#include "config/configuration.h" + +#include + +ss::logger lg("config test"); // NOLINT + +namespace config { + +// Test that configuration can be round-tripped through YAML. This mirrors the +// config export/import workflow and enables quick iteration on the +// encoding/decoding/etc. +TEST(ConfigurationTest, Roundtrip) { + auto& cfg = config::shard_local_cfg(); + YAML::Node root_out = to_yaml(cfg, redact_secrets::no); + + lg.debug("Configuration as YAML: {}", root_out); + + try { + cfg.read_yaml(root_out); + YAML::Node root_in = to_yaml(cfg, redact_secrets::no); + + // Compare the two YAML strings. + EXPECT_EQ(fmt::format("{}", root_out), fmt::format("{}", root_in)); + } catch (const std::exception& e) { + FAIL() << e.what(); + } +} + +}; // namespace config diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc index 3dddacada1401..556047f5d637d 100644 --- a/src/v/kafka/server/handlers/alter_configs.cc +++ b/src/v/kafka/server/handlers/alter_configs.cc @@ -83,7 +83,7 @@ create_topic_properties_update( std::apply(apply_op(op_t::none), update.custom_properties.serde_fields()); static_assert( - std::tuple_size_v == 32, + std::tuple_size_v == 33, "If you added a property, please decide on it's default alter config " "policy, and handle the update in the loop below"); static_assert( @@ -370,7 +370,13 @@ create_topic_properties_update( kafka::config_resource_operation::set); continue; } - + if (cfg.name == topic_property_iceberg_invalid_record_action) { + parse_and_set_optional( + update.properties.iceberg_invalid_record_action, + cfg.value, + kafka::config_resource_operation::set); + continue; + } } catch (const validation_error& e) { return make_error_alter_config_resource_response< alter_configs_resource_response>( diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index dadf54560768e..ad1980096538b 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -13,8 +13,10 @@ #include "cluster/metadata_cache.h" #include "cluster/types.h" +#include "config/configuration.h" #include "config/node_config.h" #include "kafka/server/handlers/topics/types.h" +#include "model/metadata.h" #include #include @@ -103,7 +105,8 @@ consteval describe_configs_type property_config_type() { std::is_same_v || std::is_same_v || std::is_same_v || - std::is_same_v || std::is_same_v; + std::is_same_v || std::is_same_v || + std::is_same_v; constexpr auto is_long_type = is_long || // Long type since seconds is atleast a 35-bit signed integral @@ -992,6 +995,19 @@ config_response_container_t make_topic_configs( "topic."), &describe_as_string); + add_topic_config_if_requested( + config_keys, + result, + config::shard_local_cfg().iceberg_invalid_record_action.name(), + config::shard_local_cfg().iceberg_invalid_record_action(), + topic_property_iceberg_invalid_record_action, + topic_properties.iceberg_invalid_record_action, + include_synonyms, + maybe_make_documentation( + include_documentation, + "Action to take when an invalid record is encountered."), + &describe_as_string); + return result; } diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index 56db192b3ad2c..969407d67beae 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ b/src/v/kafka/server/handlers/create_topics.cc @@ -76,7 +76,8 @@ bool is_supported(std::string_view name) { topic_property_iceberg_mode, topic_property_leaders_preference, topic_property_delete_retention_ms, - topic_property_iceberg_delete}); + topic_property_iceberg_delete, + topic_property_iceberg_invalid_record_action}); if (std::any_of( supported_configs.begin(), @@ -118,6 +119,7 @@ using validators = make_validator_types< vcluster_id_validator, write_caching_configs_validator, iceberg_config_validator, + iceberg_invalid_record_action_validator, cloud_topic_config_validator, delete_retention_ms_validator>; diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc index 30f866fe71e9a..0a52b7210440a 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.cc +++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc @@ -374,6 +374,13 @@ create_topic_properties_update( update.properties.iceberg_delete, cfg.value, op); continue; } + if (cfg.name == topic_property_iceberg_invalid_record_action) { + parse_and_set_optional( + update.properties.iceberg_invalid_record_action, + cfg.value, + op); + continue; + } } catch (const validation_error& e) { vlog( diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index 3e1d42941d011..ecea31bb0f102 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -17,6 +17,7 @@ #include "kafka/server/handlers/configs/config_utils.h" #include "model/compression.h" #include "model/fundamental.h" +#include "model/metadata.h" #include "model/namespace.h" #include "model/timestamp.h" #include "pandaproxy/schema_registry/subject_name_strategy.h" @@ -272,6 +273,10 @@ to_cluster_type(const creatable_topic& t) { cfg.properties.iceberg_delete = get_bool_value( config_entries, topic_property_iceberg_delete); + cfg.properties.iceberg_invalid_record_action + = get_enum_value( + config_entries, topic_property_iceberg_invalid_record_action); + schema_id_validation_config_parser schema_id_validation_config_parser{ cfg.properties}; diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index ab3304353791c..dd3cc06aa7c64 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -108,6 +108,9 @@ inline constexpr std::string_view topic_property_cloud_topic_enabled inline constexpr std::string_view topic_property_iceberg_delete = "redpanda.iceberg.delete"; +inline constexpr std::string_view topic_property_iceberg_invalid_record_action + = "redpanda.iceberg.invalid.record.action"; + // Kafka topic properties that is not relevant for Redpanda // Or cannot be altered with kafka alter handler inline constexpr std::array allowlist_topic_noop_confs = { diff --git a/src/v/kafka/server/handlers/topics/validators.h b/src/v/kafka/server/handlers/topics/validators.h index 79b26d4a14bb5..3797ee206dc7b 100644 --- a/src/v/kafka/server/handlers/topics/validators.h +++ b/src/v/kafka/server/handlers/topics/validators.h @@ -301,6 +301,32 @@ struct iceberg_config_validator { } }; +struct iceberg_invalid_record_action_validator { + static constexpr const char* error_message = "Invalid property value."; + + static constexpr error_code ec = error_code::invalid_config; + + static bool is_valid(const creatable_topic& c) { + auto it = std::find_if( + c.configs.begin(), + c.configs.end(), + [](const createable_topic_config& cfg) { + return cfg.name == topic_property_iceberg_invalid_record_action; + }); + if (it == c.configs.end() || !it->value.has_value()) { + return true; + } + try { + std::ignore + = boost::lexical_cast( + it->value.value()); + } catch (const boost::bad_lexical_cast&) { + return false; + } + return true; + } +}; + /* * it's an error to set the cloud topic property if cloud topics development * feature hasn't been enabled. diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc index 0017e2a53b8df..16c10391e681d 100644 --- a/src/v/kafka/server/tests/alter_config_test.cc +++ b/src/v/kafka/server/tests/alter_config_test.cc @@ -750,6 +750,7 @@ FIXTURE_TEST( "redpanda.leaders.preference", "delete.retention.ms", "redpanda.iceberg.delete", + "redpanda.iceberg.invalid.record.action", }; // All properties_request diff --git a/src/v/model/metadata.h b/src/v/model/metadata.h index ef67f09b67f0f..0954da490658f 100644 --- a/src/v/model/metadata.h +++ b/src/v/model/metadata.h @@ -607,6 +607,17 @@ enum class iceberg_mode : uint8_t { std::ostream& operator<<(std::ostream&, const iceberg_mode&); std::istream& operator>>(std::istream&, iceberg_mode&); +// How to handle invalid records during Iceberg translation. +enum class iceberg_invalid_record_action : uint8_t { + // Drop invalid records. + drop = 0, + // Write invalid records to a dead letter queue table. + dlq_table = 1, +}; + +std::ostream& operator<<(std::ostream&, const iceberg_invalid_record_action&); +std::istream& operator>>(std::istream&, iceberg_invalid_record_action&); + } // namespace model template<> diff --git a/src/v/model/model.cc b/src/v/model/model.cc index 34ec7d03c0e2b..a32b2af4aa792 100644 --- a/src/v/model/model.cc +++ b/src/v/model/model.cc @@ -608,6 +608,30 @@ std::istream& operator>>(std::istream& is, iceberg_mode& mode) { return is; } +std::ostream& +operator<<(std::ostream& os, const iceberg_invalid_record_action& a) { + switch (a) { + case iceberg_invalid_record_action::drop: + return os << "drop"; + case iceberg_invalid_record_action::dlq_table: + return os << "dlq_table"; + } +} + +std::istream& operator>>(std::istream& is, iceberg_invalid_record_action& a) { + using enum iceberg_invalid_record_action; + ss::sstring s; + is >> s; + try { + a = string_switch(s) + .match("drop", drop) + .match("dlq_table", dlq_table); + } catch (const std::runtime_error&) { + is.setstate(std::ios::failbit); + } + return is; +} + std::ostream& operator<<(std::ostream& os, const fips_mode_flag& f) { return os << to_string_view(f); } diff --git a/tests/rptest/clients/types.py b/tests/rptest/clients/types.py index 63f880a3bd3b5..28df6143314a0 100644 --- a/tests/rptest/clients/types.py +++ b/tests/rptest/clients/types.py @@ -41,6 +41,7 @@ class TopicSpec: PROPERTY_FLUSH_BYTES = "flush.bytes" PROPERTY_ICEBERG_MODE = "redpanda.iceberg.mode" PROPERTY_DELETE_RETENTION_MS = "delete.retention.ms" + PROPERTY_ICEBERG_INVALID_RECORD_ACTION = "redpanda.iceberg.invalid.record.action" class CompressionTypes(str, Enum): """ diff --git a/tests/rptest/tests/cluster_config_test.py b/tests/rptest/tests/cluster_config_test.py index 16aba8a3c0f60..0dbc12f5c4781 100644 --- a/tests/rptest/tests/cluster_config_test.py +++ b/tests/rptest/tests/cluster_config_test.py @@ -708,6 +708,10 @@ def test_valid_settings(self): valid_value = random.choice( [e for e in p['enum_values'] if e != initial_value]) + if name == "iceberg_invalid_record_action": + valid_value = random.choice( + [e for e in p['enum_values'] if e != initial_value]) + updates[name] = valid_value patch_result = self.admin.patch_cluster_config(upsert=updates, diff --git a/tests/rptest/tests/datalake/datalake_dlq_test.py b/tests/rptest/tests/datalake/datalake_dlq_test.py index f0e41155a5cb3..c90b4d7dda46c 100644 --- a/tests/rptest/tests/datalake/datalake_dlq_test.py +++ b/tests/rptest/tests/datalake/datalake_dlq_test.py @@ -7,10 +7,12 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0 +from enum import Enum from typing import Optional from ducktape.mark import matrix +from rptest.clients.rpk import RpkException, RpkTool from rptest.clients.serde_client_utils import SchemaType, SerdeClientType from rptest.clients.types import TopicSpec from rptest.services.cluster import cluster @@ -24,6 +26,94 @@ from rptest.tests.datalake.query_engine_base import QueryEngineBase, QueryEngineType from rptest.tests.datalake.utils import supported_storage_types from rptest.tests.redpanda_test import RedpandaTest +from rptest.util import expect_exception + + +class IcebergInvalidRecordAction(str, Enum): + DROP = "drop" + DLQ_TABLE = "dlq_table" + + def __str__(self): + return self.value + + +class DatalakeDLQPropertiesTest(RedpandaTest): + def __init__(self, test_context): + super(DatalakeDLQPropertiesTest, + self).__init__(test_context=test_context, num_brokers=1) + + self.rpk = RpkTool(self.redpanda) + + def set_cluster_config(self, key: str, value): + self.rpk.cluster_config_set(key, value) + + def set_topic_properties(self, key: str, value): + self.rpk.alter_topic_config(self.topic_name, key, value) + + def validate_topic_configs(self, action: IcebergInvalidRecordAction): + configs = self.rpk.describe_topic_configs(self.topic_name) + assert configs[ + TopicSpec.PROPERTY_ICEBERG_INVALID_RECORD_ACTION][0] == str( + action + ), f"Expected {action} but got {configs[TopicSpec.PROPERTY_ICEBERG_INVALID_RECORD_ACTION]}" + + @cluster(num_nodes=1) + def test_properties(self): + action_conf = "iceberg_invalid_record_action" + + self.admin = self.redpanda._admin + + # Topic with custom properties at creation. + topic = TopicSpec() + self.topic_name = topic.name + self.rpk.create_topic( + topic=topic.name, + config={ + TopicSpec.PROPERTY_ICEBERG_INVALID_RECORD_ACTION: "drop", + }) + + self.validate_topic_configs(IcebergInvalidRecordAction.DROP) + + # New topic with defaults + topic = TopicSpec() + self.topic_name = topic.name + self.rpk.create_topic(topic=topic.name, partitions=1) + + # Validate cluster defaults + self.validate_topic_configs(IcebergInvalidRecordAction.DLQ_TABLE) + + # Changing cluster level configs + self.set_cluster_config(action_conf, IcebergInvalidRecordAction.DROP) + self.validate_topic_configs(IcebergInvalidRecordAction.DROP) + + # Change topic property + self.set_topic_properties( + TopicSpec.PROPERTY_ICEBERG_INVALID_RECORD_ACTION, + IcebergInvalidRecordAction.DLQ_TABLE) + self.validate_topic_configs(IcebergInvalidRecordAction.DLQ_TABLE) + + @cluster(num_nodes=1) + def test_create_bad_properties(self): + topic = TopicSpec() + + with expect_exception( + RpkException, lambda e: "Invalid property value." in e.msg and + "INVALID_CONFIG" in e.msg): + self.rpk.create_topic( + topic=topic.name, + config={ + TopicSpec.PROPERTY_ICEBERG_INVALID_RECORD_ACTION: "asd", + }) + + # Create the topic with default property and alter to invalid value + self.rpk.create_topic(topic=topic.name) + with expect_exception( + RpkException, lambda e: + "unable to parse property redpanda.iceberg.invalid.record.action value" + in e.msg and "INVALID_CONFIG" in e.msg): + self.rpk.alter_topic_config( + topic.name, TopicSpec.PROPERTY_ICEBERG_INVALID_RECORD_ACTION, + "asd") class DatalakeDLQTest(RedpandaTest): diff --git a/tests/rptest/tests/describe_topics_test.py b/tests/rptest/tests/describe_topics_test.py index 2ce2b80f2d7e1..66c8e85bd9824 100644 --- a/tests/rptest/tests/describe_topics_test.py +++ b/tests/rptest/tests/describe_topics_test.py @@ -301,6 +301,12 @@ def test_describe_topics_with_documentation_and_types(self): doc_string= "If true, delete the corresponding Iceberg table when deleting the topic." ), + "redpanda.iceberg.invalid.record.action": + ConfigProperty( + config_type="STRING", + value="dlq_table", + doc_string= + "Action to take when an invalid record is encountered."), } tp_spec = TopicSpec() @@ -339,11 +345,11 @@ def test_describe_topics_with_documentation_and_types(self): self.logger.debug( f"name: {name}, type: {config_type}, value: {value}, src: {source_type}" ) - assert name in properties + assert name in properties, f"{name} not in {properties.keys()}" prop = properties[name] - assert config_type == prop.config_type - assert value == prop.value - assert source_type == prop.source_type + assert config_type == prop.config_type, f"{config_type=} != {prop.config_type=}" + assert value == prop.value, f"{value=} != {prop.value=}" + assert source_type == prop.source_type, f"{source_type=} != {prop.source_type=}" # The first empty line is where the table ends and the doc section begins assert last_pos is not None, "Something went wrong with property match" diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 687f6832f83ab..e6cff067d6964 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -144,6 +144,11 @@ def read_topic_properties_serde(rdr: Reader, version): 'delete_retention_ms': rdr.read_tristate(Reader.read_int64), 'iceberg_delete': rdr.read_optional(Reader.read_bool), } + if version >= 11: + topic_properties |= { + 'iceberg_invalid_record_action': + rdr.read_optional(Reader.read_serde_enum), + } return topic_properties