Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

datalake: add redpanda.iceberg.invalid.record.action topic property #24898

Draft
wants to merge 3 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/v/cloud_storage/tests/topic_manifest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ SEASTAR_THREAD_TEST_CASE(test_topic_manifest_serde_feature_table) {
false,
tristate<std::chrono::milliseconds>{},
std::nullopt,
std::nullopt,
};

auto random_initial_revision_id
Expand Down
10 changes: 7 additions & 3 deletions src/v/cluster/topic_properties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
"remote_label: {}, iceberg_mode: {}, "
"leaders_preference: {}, "
"delete_retention_ms: {}, "
"iceberg_delete: {}",
// Adjacent string literals are concatenated, so every field except the last
// needs its own trailing ", " separator.
"iceberg_delete: {}, "
"iceberg_invalid_record_action: {}",
properties.compression,
properties.cleanup_policy_bitflags,
properties.compaction_strategy,
Expand Down Expand Up @@ -79,7 +80,8 @@ std::ostream& operator<<(std::ostream& o, const topic_properties& properties) {
properties.iceberg_mode,
properties.leaders_preference,
properties.delete_retention_ms,
properties.iceberg_delete);
properties.iceberg_delete,
properties.iceberg_invalid_record_action);

if (config::shard_local_cfg().development_enable_cloud_topics()) {
fmt::print(
Expand Down Expand Up @@ -125,7 +127,8 @@ bool topic_properties::has_overrides() const {
|| flush_bytes.has_value() || remote_label.has_value()
|| (iceberg_mode != storage::ntp_config::default_iceberg_mode)
|| leaders_preference.has_value() || delete_retention_ms.is_engaged()
|| iceberg_delete.has_value();
|| iceberg_delete.has_value()
|| iceberg_invalid_record_action.has_value();

if (config::shard_local_cfg().development_enable_cloud_topics()) {
return overrides
Expand Down Expand Up @@ -261,6 +264,7 @@ adl<cluster::topic_properties>::from(iobuf_parser& parser) {
false,
tristate<std::chrono::milliseconds>{disable_tristate},
std::nullopt,
std::nullopt,
};
}

Expand Down
15 changes: 11 additions & 4 deletions src/v/cluster/topic_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace cluster {
*/
struct topic_properties
: serde::
envelope<topic_properties, serde::version<10>, serde::compat_version<0>> {
envelope<topic_properties, serde::version<11>, serde::compat_version<0>> {
topic_properties() noexcept = default;
topic_properties(
std::optional<model::compression> compression,
Expand Down Expand Up @@ -76,7 +76,9 @@ struct topic_properties
std::optional<config::leaders_preference> leaders_preference,
bool cloud_topic_enabled,
tristate<std::chrono::milliseconds> delete_retention_ms,
std::optional<bool> iceberg_delete)
std::optional<bool> iceberg_delete,
std::optional<model::iceberg_invalid_record_action>
iceberg_invalid_record_action)
: compression(compression)
, cleanup_policy_bitflags(cleanup_policy_bitflags)
, compaction_strategy(compaction_strategy)
Expand Down Expand Up @@ -118,7 +120,8 @@ struct topic_properties
, leaders_preference(std::move(leaders_preference))
, cloud_topic_enabled(cloud_topic_enabled)
, delete_retention_ms(delete_retention_ms)
, iceberg_delete(iceberg_delete) {}
, iceberg_delete(iceberg_delete)
, iceberg_invalid_record_action(iceberg_invalid_record_action) {}

std::optional<model::compression> compression;
std::optional<model::cleanup_policy_bitflags> cleanup_policy_bitflags;
Expand Down Expand Up @@ -195,6 +198,9 @@ struct topic_properties
// Should we delete the corresponding iceberg table when deleting the topic.
std::optional<bool> iceberg_delete;

std::optional<model::iceberg_invalid_record_action>
iceberg_invalid_record_action;

bool is_compacted() const;
bool has_overrides() const;
bool requires_remote_erase() const;
Expand Down Expand Up @@ -241,7 +247,8 @@ struct topic_properties
leaders_preference,
cloud_topic_enabled,
delete_retention_ms,
iceberg_delete);
iceberg_delete,
iceberg_invalid_record_action);
}

friend bool operator==(const topic_properties&, const topic_properties&)
Expand Down
3 changes: 3 additions & 0 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,9 @@ topic_properties topic_table::update_topic_properties(
updated_properties.delete_retention_ms, overrides.delete_retention_ms);
incremental_update(
updated_properties.iceberg_delete, overrides.iceberg_delete);
incremental_update(
updated_properties.iceberg_invalid_record_action,
overrides.iceberg_invalid_record_action);
return updated_properties;
}

Expand Down
6 changes: 4 additions & 2 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
"initial_retention_local_target_bytes: {}, "
"initial_retention_local_target_ms: {}, write_caching: {}, flush_ms: {}, "
"flush_bytes: {}, iceberg_enabled: {}, leaders_preference: {}, "
"remote_read: {}, remote_write: {}, iceberg_delete: {}",
"remote_read: {}, remote_write: {}, iceberg_delete: {}, "
"iceberg_invalid_record_action: {}",
i.compression,
i.cleanup_policy_bitflags,
i.compaction_strategy,
Expand Down Expand Up @@ -417,7 +418,8 @@ std::ostream& operator<<(std::ostream& o, const incremental_topic_updates& i) {
i.leaders_preference,
i.remote_read,
i.remote_write,
i.iceberg_delete);
i.iceberg_delete,
i.iceberg_invalid_record_action);
return o;
}

Expand Down
7 changes: 5 additions & 2 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ struct property_update<tristate<T>>
struct incremental_topic_updates
: serde::envelope<
incremental_topic_updates,
serde::version<7>,
serde::version<8>,
serde::compat_version<0>> {
static constexpr int8_t version_with_data_policy = -1;
static constexpr int8_t version_with_shadow_indexing = -3;
Expand Down Expand Up @@ -641,6 +641,8 @@ struct incremental_topic_updates
leaders_preference;
property_update<tristate<std::chrono::milliseconds>> delete_retention_ms;
property_update<std::optional<bool>> iceberg_delete;
property_update<std::optional<model::iceberg_invalid_record_action>>
iceberg_invalid_record_action;

// To allow us to better control use of the deprecated shadow_indexing
// field, use getters and setters instead.
Expand Down Expand Up @@ -680,7 +682,8 @@ struct incremental_topic_updates
remote_read,
remote_write,
delete_retention_ms,
iceberg_delete);
iceberg_delete,
iceberg_invalid_record_action);
}

friend std::ostream&
Expand Down
8 changes: 7 additions & 1 deletion src/v/compat/cluster_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "cluster/errc.h"
#include "cluster/types.h"
#include "compat/model_generator.h"
#include "model/metadata.h"
#include "model/tests/randoms.h"
#include "random/generators.h"
#include "test_utils/randoms.h"
Expand Down Expand Up @@ -655,7 +656,12 @@ struct instance_generator<cluster::topic_properties> {
std::nullopt,
false,
tristate<std::chrono::milliseconds>{disable_tristate},
std::nullopt};
std::nullopt,
tests::random_optional([] {
return random_generators::random_choice(
{model::iceberg_invalid_record_action::drop,
model::iceberg_invalid_record_action::dlq_table});
})};
}

static std::vector<cluster::topic_properties> limits() { return {}; }
Expand Down
11 changes: 11 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3793,6 +3793,17 @@ configuration::configuration()
"the topic.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
true)
, iceberg_invalid_record_action(
*this,
"iceberg_invalid_record_action",
"Default value for the redpanda.iceberg.invalid.record.action topic "
"property.",
{.needs_restart = needs_restart::no, .visibility = visibility::user},
model::iceberg_invalid_record_action::dlq_table,
{
model::iceberg_invalid_record_action::drop,
model::iceberg_invalid_record_action::dlq_table,
})
, development_enable_cloud_topics(
*this,
"development_enable_cloud_topics",
Expand Down
2 changes: 2 additions & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,8 @@ struct configuration final : public config_store {
property<std::optional<ss::sstring>> iceberg_rest_catalog_prefix;

property<bool> iceberg_delete;
enum_property<model::iceberg_invalid_record_action>
iceberg_invalid_record_action;

configuration();

Expand Down
13 changes: 13 additions & 0 deletions src/v/config/convert.h
Original file line number Diff line number Diff line change
Expand Up @@ -674,4 +674,17 @@ struct convert<config::datalake_catalog_type> {
}
};

template<>
struct convert<model::iceberg_invalid_record_action> {
using type = model::iceberg_invalid_record_action;

static Node encode(const type& rhs) { return Node(fmt::format("{}", rhs)); }

static bool decode(const Node& node, type& rhs) {
auto value = node.as<std::string>();
rhs = boost::lexical_cast<type>(value);
return true;
}
};

} // namespace YAML
5 changes: 5 additions & 0 deletions src/v/config/property.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "features/enterprise_feature_messages.h"
#include "json/stringbuffer.h"
#include "json/writer.h"
#include "model/metadata.h"
#include "pandaproxy/schema_registry/schema_id_validation.h"
#include "utils/to_string.h"

Expand Down Expand Up @@ -686,6 +687,10 @@ consteval std::string_view property_type_name() {
return "leaders_preference";
} else if constexpr (std::is_same_v<type, config::datalake_catalog_type>) {
return "string";
} else if constexpr (std::is_same_v<
type,
model::iceberg_invalid_record_action>) {
return "string";
} else {
static_assert(
base::unsupported_type<T>::value, "Type name not defined");
Expand Down
10 changes: 8 additions & 2 deletions src/v/kafka/server/handlers/alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ create_topic_properties_update(
std::apply(apply_op(op_t::none), update.custom_properties.serde_fields());

static_assert(
std::tuple_size_v<decltype(update.properties.serde_fields())> == 32,
std::tuple_size_v<decltype(update.properties.serde_fields())> == 33,
"If you added a property, please decide on it's default alter config "
"policy, and handle the update in the loop below");
static_assert(
Expand Down Expand Up @@ -370,7 +370,13 @@ create_topic_properties_update(
kafka::config_resource_operation::set);
continue;
}

if (cfg.name == topic_property_iceberg_invalid_record_action) {
parse_and_set_optional(
update.properties.iceberg_invalid_record_action,
cfg.value,
kafka::config_resource_operation::set);
continue;
}
} catch (const validation_error& e) {
return make_error_alter_config_resource_response<
alter_configs_resource_response>(
Expand Down
18 changes: 17 additions & 1 deletion src/v/kafka/server/handlers/configs/config_response_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

#include "cluster/metadata_cache.h"
#include "cluster/types.h"
#include "config/configuration.h"
#include "config/node_config.h"
#include "kafka/server/handlers/topics/types.h"
#include "model/metadata.h"

#include <charconv>
#include <chrono>
Expand Down Expand Up @@ -103,7 +105,8 @@ consteval describe_configs_type property_config_type() {
std::is_same_v<T, pandaproxy::schema_registry::subject_name_strategy> ||
std::is_same_v<T, model::vcluster_id> ||
std::is_same_v<T, model::write_caching_mode> ||
std::is_same_v<T, config::leaders_preference> || std::is_same_v<T, model::iceberg_mode>;
std::is_same_v<T, config::leaders_preference> || std::is_same_v<T, model::iceberg_mode> ||
std::is_same_v<T, model::iceberg_invalid_record_action>;

constexpr auto is_long_type = is_long<T> ||
// Long type since seconds is atleast a 35-bit signed integral
Expand Down Expand Up @@ -992,6 +995,19 @@ config_response_container_t make_topic_configs(
"topic."),
&describe_as_string<bool>);

add_topic_config_if_requested(
config_keys,
result,
config::shard_local_cfg().iceberg_invalid_record_action.name(),
config::shard_local_cfg().iceberg_invalid_record_action(),
topic_property_iceberg_invalid_record_action,
topic_properties.iceberg_invalid_record_action,
include_synonyms,
maybe_make_documentation(
include_documentation,
"Action to take when an invalid record is encountered."),
&describe_as_string<model::iceberg_invalid_record_action>);

return result;
}

Expand Down
4 changes: 3 additions & 1 deletion src/v/kafka/server/handlers/create_topics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ bool is_supported(std::string_view name) {
topic_property_iceberg_mode,
topic_property_leaders_preference,
topic_property_delete_retention_ms,
topic_property_iceberg_delete});
topic_property_iceberg_delete,
topic_property_iceberg_invalid_record_action});

if (std::any_of(
supported_configs.begin(),
Expand Down Expand Up @@ -118,6 +119,7 @@ using validators = make_validator_types<
vcluster_id_validator,
write_caching_configs_validator,
iceberg_config_validator,
iceberg_invalid_record_action_validator,
cloud_topic_config_validator,
delete_retention_ms_validator>;

Expand Down
7 changes: 7 additions & 0 deletions src/v/kafka/server/handlers/incremental_alter_configs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,13 @@ create_topic_properties_update(
update.properties.iceberg_delete, cfg.value, op);
continue;
}
if (cfg.name == topic_property_iceberg_invalid_record_action) {
parse_and_set_optional(
update.properties.iceberg_invalid_record_action,
cfg.value,
op);
continue;
}

} catch (const validation_error& e) {
vlog(
Expand Down
5 changes: 5 additions & 0 deletions src/v/kafka/server/handlers/topics/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "kafka/server/handlers/configs/config_utils.h"
#include "model/compression.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"
#include "model/timestamp.h"
#include "pandaproxy/schema_registry/subject_name_strategy.h"
Expand Down Expand Up @@ -272,6 +273,10 @@ to_cluster_type(const creatable_topic& t) {
cfg.properties.iceberg_delete = get_bool_value(
config_entries, topic_property_iceberg_delete);

cfg.properties.iceberg_invalid_record_action
= get_enum_value<model::iceberg_invalid_record_action>(
config_entries, topic_property_iceberg_invalid_record_action);

schema_id_validation_config_parser schema_id_validation_config_parser{
cfg.properties};

Expand Down
3 changes: 3 additions & 0 deletions src/v/kafka/server/handlers/topics/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ inline constexpr std::string_view topic_property_cloud_topic_enabled
inline constexpr std::string_view topic_property_iceberg_delete
= "redpanda.iceberg.delete";

inline constexpr std::string_view topic_property_iceberg_invalid_record_action
= "redpanda.iceberg.invalid.record.action";

// Kafka topic properties that are not relevant for Redpanda
// or cannot be altered with the Kafka alter handler
inline constexpr std::array<std::string_view, 20> allowlist_topic_noop_confs = {
Expand Down
26 changes: 26 additions & 0 deletions src/v/kafka/server/handlers/topics/validators.h
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,32 @@ struct iceberg_config_validator {
}
};

/*
 * Checks the redpanda.iceberg.invalid.record.action entry of a topic creation
 * request. The request passes when the entry is absent, carries no value, or
 * carries a value that boost::lexical_cast can turn into a
 * model::iceberg_invalid_record_action.
 */
struct iceberg_invalid_record_action_validator {
    static constexpr const char* error_message = "Invalid property value.";

    static constexpr error_code ec = error_code::invalid_config;

    static bool is_valid(const creatable_topic& c) {
        const auto names_action = [](const createable_topic_config& entry) {
            return entry.name == topic_property_iceberg_invalid_record_action;
        };
        const auto last = c.configs.end();
        const auto match = std::find_if(c.configs.begin(), last, names_action);

        // Nothing to validate unless the property is present with a value.
        const bool has_value = match != last && match->value.has_value();
        if (!has_value) {
            return true;
        }

        try {
            // Only the success of the parse matters; the result is discarded.
            std::ignore
              = boost::lexical_cast<model::iceberg_invalid_record_action>(
                match->value.value());
            return true;
        } catch (const boost::bad_lexical_cast&) {
            return false;
        }
    }
};

/*
* it's an error to set the cloud topic property if cloud topics development
* feature hasn't been enabled.
Expand Down
Loading
Loading