diff --git a/src/v/datalake/coordinator/iceberg_file_committer.cc b/src/v/datalake/coordinator/iceberg_file_committer.cc
index 7f8d8522d2c24..38becc77738b4 100644
--- a/src/v/datalake/coordinator/iceberg_file_committer.cc
+++ b/src/v/datalake/coordinator/iceberg_file_committer.cc
@@ -28,6 +28,7 @@ namespace datalake::coordinator {
 
 namespace {
+
 file_committer::errc
 log_and_convert_catalog_errc(iceberg::catalog::errc e, std::string_view msg) {
     switch (e) {
@@ -95,6 +96,261 @@ get_iceberg_committed_offset(const iceberg::table_metadata& table) {
     }
     return std::nullopt;
 }
+
+checked<iceberg::struct_value, file_committer::errc> build_partition_key_struct(
+  const model::topic& topic,
+  const iceberg::table_metadata& table,
+  const data_file& f,
+  const iceberg::schema& schema) {
+    auto pspec_id = iceberg::partition_spec::id_t{f.partition_spec_id};
+    auto partition_spec = table.get_partition_spec(pspec_id);
+    if (!partition_spec) {
+        vlog(
+          datalake_log.error,
+          "can't find partition spec {} in table for topic {}",
+          pspec_id,
+          topic);
+        return file_committer::errc::failed;
+    }
+
+    if (partition_spec->fields.size() != f.partition_key.size()) {
+        vlog(
+          datalake_log.error,
+          "[topic: {}, file: {}] partition key size {} doesn't "
+          "match spec size {} (spec id: {})",
+          topic,
+          f.remote_path,
+          f.partition_key.size(),
+          partition_spec->fields.size(),
+          pspec_id);
+        return file_committer::errc::failed;
+    }
+
+    auto key_type = iceberg::partition_key_type::create(
+      *partition_spec, schema);
+
+    iceberg::struct_value pk;
+
+    for (size_t i = 0; i < partition_spec->fields.size(); ++i) {
+        const auto& field_type = key_type.type.fields.at(i);
+        const auto& field_bytes = f.partition_key.at(i);
+        if (field_bytes) {
+            try {
+                pk.fields.push_back(iceberg::value_from_bytes(
+                  field_type->type, field_bytes.value()));
+            } catch (const std::invalid_argument& e) {
+                vlog(
+                  datalake_log.error,
+                  "[topic: {}, file: {}] failed to parse "
+                  "partition key field {} (type: {}): {}",
+                  topic,
+                  f.remote_path,
+                  i,
+                  field_type->type,
+                  e);
+                return file_committer::errc::failed;
+            }
+        } else {
+            pk.fields.push_back(std::nullopt);
+        }
+    }
+
+    return pk;
+}
+
+checked<iceberg::struct_value, file_committer::errc> build_partition_key_struct(
+  const model::topic& topic,
+  const iceberg::table_metadata& table,
+  const data_file& f) {
+    if (f.table_schema_id < 0) {
+        // File created by a legacy Redpanda version that only
+        // supported hourly partitioning; the partition key value is
+        // in the hour_deprecated field.
+        iceberg::struct_value pk;
+        pk.fields.emplace_back(iceberg::int_value(f.hour_deprecated));
+        return pk;
+    } else {
+        auto schema = table.get_schema(
+          iceberg::schema::id_t{f.table_schema_id});
+        if (!schema) {
+            vlog(
+              datalake_log.error,
+              "can't find schema {} in table for topic {}",
+              f.table_schema_id,
+              topic);
+            return file_committer::errc::failed;
+        }
+
+        return build_partition_key_struct(topic, table, f, *schema);
+    }
+}
+
+checked<iceberg::partition_key, file_committer::errc> build_partition_key(
+  const model::topic& topic,
+  const iceberg::table_metadata& table,
+  const data_file& f) {
+    auto pk_res = build_partition_key_struct(topic, table, f);
+    if (pk_res.has_error()) {
+        return pk_res.error();
+    }
+    return iceberg::partition_key{
+      std::make_unique<iceberg::struct_value>(std::move(pk_res.value()))};
+}
+
+/// A table_commit_builder accumulates files to commit to an Iceberg table and
+/// manages the commit process.
+///
+/// A file_committer will usually create one or more `table_commit_builder`
+/// instances per topic, and will call process_pending_entry for each pending
+/// entry in the topic's state. Once all pending entries have been processed,
+/// the file_committer must call commit() to commit the accumulated files to
+/// the Iceberg table.
+class table_commit_builder {
+public:
+    static checked<table_commit_builder, file_committer::errc> create(
+      iceberg::table_identifier table_id, iceberg::table_metadata&& table) {
+        auto meta_res = get_iceberg_committed_offset(table);
+        if (meta_res.has_error()) {
+            vlog(
+              datalake_log.warn,
+              "Error getting snapshot property '{}' for table {}: {}",
+              commit_meta_prop,
+              table_id,
+              meta_res.error());
+            return file_committer::errc::failed;
+        }
+
+        return table_commit_builder(
+          std::move(table_id), std::move(table), meta_res.value());
+    }
+
+public:
+    checked<std::nullopt_t, file_committer::errc> process_pending_entry(
+      const model::topic& topic,
+      model::revision_id topic_revision,
+      const iceberg::manifest_io& io,
+      const model::offset added_pending_at,
+      const chunked_vector<data_file>& files) {
+        if (should_skip_entry(added_pending_at)) {
+            // This entry was committed to the Iceberg table already. The
+            // caller intentionally collects the pending commit so we can
+            // replicate the fact that it was committed previously, but we
+            // don't construct a data_file to send to Iceberg as it is
+            // already committed.
+            vlog(
+              datalake_log.debug,
+              "Skipping entry for topic {} revision {} added at "
+              "coordinator offset {} because table {} has data including "
+              "coordinator offset {}",
+              topic,
+              topic_revision,
+              added_pending_at,
+              table_id_,
+              table_commit_offset_);
+        } else {
+            for (const auto& f : files) {
+                auto pk = build_partition_key(topic, table_, f);
+                if (pk.has_error()) {
+                    return pk.error();
+                }
+
+                // TODO: pass schema_id and pspec_id to merge_append_action
+                // (currently it assumes that the files were serialized with the
+                // current schema and a single partition spec).
+                icb_files_.push_back({
+                  .content_type = iceberg::data_file_content_type::data,
+                  .file_path = io.to_uri(std::filesystem::path(f.remote_path)),
+                  .file_format = iceberg::data_file_format::parquet,
+                  .partition = std::move(pk.value()),
+                  .record_count = f.row_count,
+                  .file_size_bytes = f.file_size_bytes,
+                });
+            }
+        }
+
+        new_committed_offset_ = std::max(
+          new_committed_offset_,
+          std::make_optional(added_pending_at));
+
+        return std::nullopt;
+    }
+
+    ss::future<checked<std::nullopt_t, file_committer::errc>> commit(
+      const model::topic& topic,
+      model::revision_id topic_revision,
+      iceberg::catalog& catalog,
+      iceberg::manifest_io& io) && {
+        if (icb_files_.empty()) {
+            // No new files to commit.
+            vlog(
+              datalake_log.debug,
+              "All committed files were deduplicated for topic {} revision {}, "
+              "returning without updating Iceberg catalog",
+              topic,
+              topic_revision);
+            co_return std::nullopt;
+        }
+
+        vassert(
+          new_committed_offset_.has_value(),
+          "New Iceberg files implies new commit metadata");
+        const auto commit_meta = commit_offset_metadata{
+          .offset = *new_committed_offset_,
+        };
+
+        vlog(
+          datalake_log.debug,
+          "Adding {} files to Iceberg table {}",
+          icb_files_.size(),
+          table_id_);
+        iceberg::transaction txn(std::move(table_));
+        auto icb_append_res = co_await txn.merge_append(
+          io,
+          std::move(icb_files_),
+          {{commit_meta_prop, to_json_str(commit_meta)}});
+        if (icb_append_res.has_error()) {
+            co_return log_and_convert_action_errc(
+              icb_append_res.error(),
+              fmt::format(
+                "Iceberg merge append failed for table {}", table_id_));
+        }
+        auto icb_commit_res = co_await catalog.commit_txn(
+          table_id_, std::move(txn));
+        if (icb_commit_res.has_error()) {
+            co_return log_and_convert_catalog_errc(
+              icb_commit_res.error(),
+              fmt::format(
+                "Iceberg transaction did not commit to table {}", table_id_));
+        }
+
+        co_return std::nullopt;
+    }
+
+private:
+    table_commit_builder(
+      iceberg::table_identifier table_id,
+      iceberg::table_metadata&& table,
+      std::optional<model::offset> table_commit_offset)
+      : table_id_(std::move(table_id))
+      , table_(std::move(table))
+      , table_commit_offset_(table_commit_offset) {}
+
+private:
+    bool should_skip_entry(model::offset added_pending_at) const {
+        return table_commit_offset_.has_value()
+               && added_pending_at <= *table_commit_offset_;
+    }
+
+private:
+    iceberg::table_identifier table_id_;
+    iceberg::table_metadata table_;
+    std::optional<model::offset> table_commit_offset_;
+
+    // Accumulated state.
+    chunked_vector<iceberg::data_file> icb_files_;
+    std::optional<model::offset> new_committed_offset_;
+};
+
 } // namespace
 
 ss::future<
@@ -111,28 +367,22 @@ iceberg_file_committer::commit_topic_files_to_catalog(
     }
     auto topic_revision = tp_it->second.revision;
 
-    auto table_id = table_id_provider::table_id(topic);
-    auto table_res = co_await load_table(table_id);
-    if (table_res.has_error()) {
+    auto main_table_id = table_id_provider::table_id(topic);
+    auto main_table_res = co_await load_table(main_table_id);
+    if (main_table_res.has_error()) {
         vlog(
          datalake_log.warn,
          "Error loading table {} for committing from topic {}",
-         table_id,
+         main_table_id,
          topic);
-        co_return table_res.error();
+        co_return main_table_res.error();
    }
-    auto& table = table_res.value();
-    auto meta_res = get_iceberg_committed_offset(table);
-    if (meta_res.has_error()) {
-        vlog(
-          datalake_log.warn,
-          "Error getting snapshot property '{}' for table {}: {}",
-          commit_meta_prop,
-          table_id,
-          meta_res.error());
-        co_return errc::failed;
+    auto main_table_commit_builder_res = table_commit_builder::create(
+      std::move(main_table_id), std::move(main_table_res.value()));
+    if (main_table_commit_builder_res.has_error()) {
+        co_return main_table_commit_builder_res.error();
    }
-    auto iceberg_commit_meta_opt = meta_res.value();
+    auto& main_table_commit_builder = main_table_commit_builder_res.value();
 
     // update the iterator after a scheduling point
     tp_it = state.topic_to_state.find(topic);
@@ -150,120 +400,15 @@
     }
 
     chunked_hash_map<model::partition_id, kafka::offset> pending_commits;
-    chunked_vector<iceberg::data_file> icb_files;
-    std::optional<model::offset> new_committed_offset;
     const auto& tp_state = tp_it->second;
     for (const auto& [pid, p_state] : tp_state.pid_to_pending_files) {
         for (const auto& e : p_state.pending_entries) {
             pending_commits[pid] = e.data.last_offset;
-            if (
-              iceberg_commit_meta_opt.has_value()
-              && e.added_pending_at <= *iceberg_commit_meta_opt) {
-                // This entry was committed to the Iceberg table already.
-                // Intentionally collect the pending commit above so we can
-                // replicate the fact that it was committed previously, but
-                // don't construct a data_file to send to Iceberg as it is
-                // already committed.
-                vlog(
-                  datalake_log.debug,
-                  "Skipping entry for topic {} revision {} added at "
-                  "coordinator offset {} because table {} has data including "
-                  "coordinator offset {}",
-                  topic,
-                  topic_revision,
-                  e.added_pending_at,
-                  table_id,
-                  *iceberg_commit_meta_opt);
-                continue;
-            }
-            new_committed_offset = std::max(
-              new_committed_offset,
-              std::make_optional(e.added_pending_at));
-            for (const auto& f : e.data.files) {
-                auto pk = std::make_unique<iceberg::struct_value>();
-                if (f.table_schema_id >= 0) {
-                    auto schema_id = iceberg::schema::id_t{f.table_schema_id};
-                    auto schema = table.get_schema(schema_id);
-                    if (!schema) {
-                        vlog(
-                          datalake_log.error,
-                          "can't find schema {} in table for topic {}",
-                          schema_id,
-                          topic);
-                        co_return errc::failed;
-                    }
-
-                    auto pspec_id = iceberg::partition_spec::id_t{
-                      f.partition_spec_id};
-                    auto partition_spec = table.get_partition_spec(pspec_id);
-                    if (!partition_spec) {
-                        vlog(
-                          datalake_log.error,
-                          "can't find partition spec {} in table for topic {}",
-                          pspec_id,
-                          topic);
-                        co_return errc::failed;
-                    }
-
-                    if (
-                      partition_spec->fields.size() != f.partition_key.size()) {
-                        vlog(
-                          datalake_log.error,
-                          "[topic: {}, file: {}] partition key size {} doesn't "
-                          "match spec size {} (spec id: {})",
-                          topic,
-                          f.remote_path,
-                          f.partition_key.size(),
-                          partition_spec->fields.size(),
-                          pspec_id);
-                        co_return errc::failed;
-                    }
-
-                    auto key_type = iceberg::partition_key_type::create(
-                      *partition_spec, *schema);
-
-                    for (size_t i = 0; i < partition_spec->fields.size(); ++i) {
-                        const auto& field_type = key_type.type.fields.at(i);
-                        const auto& field_bytes = f.partition_key.at(i);
-                        if (field_bytes) {
-                            try {
-                                pk->fields.push_back(iceberg::value_from_bytes(
-                                  field_type->type, field_bytes.value()));
-                            } catch (const std::invalid_argument& e) {
-                                vlog(
-                                  datalake_log.error,
-                                  "[topic: {}, file: {}] failed to parse "
-                                  "partition key field {} (type: {}): {}",
-                                  topic,
-                                  f.remote_path,
-                                  i,
-                                  field_type->type,
-                                  e);
-                                co_return errc::failed;
-                            }
-                        } else {
-                            pk->fields.push_back(std::nullopt);
-                        }
-                    }
-                } else {
-                    // File created by a legacy Redpanda version that only
-                    // supported hourly partitioning, the partition key value is
-                    // in the hour_deprecated field.
-                    pk->fields.emplace_back(
-                      iceberg::int_value{f.hour_deprecated});
-                }
-                // TODO: pass schema_id and pspec_id to merge_append_action
-                // (currently it assumes that the files were serialized with the
-                // current schema and a single partition spec).
-                icb_files.push_back({
-                  .content_type = iceberg::data_file_content_type::data,
-                  .file_path = io_.to_uri(std::filesystem::path(f.remote_path)),
-                  .file_format = iceberg::data_file_format::parquet,
-                  .partition = iceberg::partition_key{std::move(pk)},
-                  .record_count = f.row_count,
-                  .file_size_bytes = f.file_size_bytes,
-                });
+            auto res = main_table_commit_builder.process_pending_entry(
+              topic, topic_revision, io_, e.added_pending_at, e.data.files);
+            if (res.has_error()) {
+                co_return res.error();
             }
         }
     }
@@ -283,51 +428,20 @@
         if (update_res.has_error()) {
             vlog(
               datalake_log.warn,
-              "Could not build STM update for committing to {}: {}",
-              table_id,
+              "Could not build STM update for committing topic {} revision {}: "
+              "{}",
+              topic,
+              topic_revision,
               update_res.error());
             co_return errc::failed;
         }
         updates.emplace_back(std::move(update_res.value()));
     }
-    if (icb_files.empty()) {
-        // All files are deduplicated.
-        vlog(
-          datalake_log.debug,
-          "All committed files were deduplicated for topic {} revision {}, "
-          "returning without updating Iceberg catalog",
-          topic,
-          topic_revision);
-        co_return updates;
-    }
-    vassert(
-      new_committed_offset.has_value(),
-      "New Iceberg files implies new commit metadata");
-    const auto commit_meta = commit_offset_metadata{
-      .offset = *new_committed_offset,
-    };
-    vlog(
-      datalake_log.debug,
-      "Adding {} files to Iceberg table {}",
-      icb_files.size(),
-      table_id);
-    iceberg::transaction txn(std::move(table));
-    auto icb_append_res = co_await txn.merge_append(
-      io_,
-      std::move(icb_files),
-      {{commit_meta_prop, to_json_str(commit_meta)}});
-    if (icb_append_res.has_error()) {
-        co_return log_and_convert_action_errc(
-          icb_append_res.error(),
-          fmt::format("Iceberg merge append failed for table {}", table_id));
-    }
-    auto icb_commit_res = co_await catalog_.commit_txn(
-      table_id, std::move(txn));
-    if (icb_commit_res.has_error()) {
-        co_return log_and_convert_catalog_errc(
-          icb_commit_res.error(),
-          fmt::format(
-            "Iceberg transaction did not commit to table {}", table_id));
+
+    auto commit_res = co_await std::move(main_table_commit_builder)
+                        .commit(topic, topic_revision, catalog_, io_);
+    if (commit_res.has_error()) {
+        co_return commit_res.error();
     }
     co_return updates;
 }
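The class comment above describes the intended lifecycle of table_commit_builder. Condensed into a sketch, the call sequence used by commit_topic_files_to_catalog looks roughly like this; the surrounding coroutine and the `entries` range are illustrative and not part of the diff:

    // Sketch only, assuming a pending-entry range named `entries` is in scope.
    auto builder_res = table_commit_builder::create(
      table_id_provider::table_id(topic), std::move(table));
    if (builder_res.has_error()) {
        co_return builder_res.error();
    }
    auto& builder = builder_res.value();
    for (const auto& e : entries) {
        auto res = builder.process_pending_entry(
          topic, topic_revision, io_, e.added_pending_at, e.data.files);
        if (res.has_error()) {
            co_return res.error();
        }
    }
    // commit() is rvalue-qualified: the builder (and the table metadata it
    // owns) is consumed by the final commit.
    auto commit_res = co_await std::move(builder).commit(
      topic, topic_revision, catalog_, io_);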
diff --git a/src/v/datalake/translation_task.cc b/src/v/datalake/translation_task.cc
index 4503f531325e4..089ac5e7bc38a 100644
--- a/src/v/datalake/translation_task.cc
+++ b/src/v/datalake/translation_task.cc
@@ -34,6 +34,29 @@ translation_task::errc map_error_code(cloud_data_io::errc errc) {
     }
 }
 
+ss::future<checked<remote_path, translation_task::errc>> execute_single_upload(
+  cloud_data_io& _cloud_io,
+  const local_file_metadata& lf_meta,
+  const remote_path& remote_path_prefix,
+  retry_chain_node& parent_rcn,
+  lazy_abort_source& lazy_as) {
+    auto remote_path = calculate_remote_path(lf_meta.path, remote_path_prefix);
+    auto result = co_await _cloud_io.upload_data_file(
+      lf_meta, remote_path, parent_rcn, lazy_as);
+    if (result.has_error()) {
+        vlog(
+          datalake_log.warn,
+          "error uploading file {} to {} - {}",
+          lf_meta,
+          remote_path,
+          result.error());
+
+        co_return map_error_code(result.error());
+    }
+
+    co_return remote_path;
+}
+
 ss::future<checked<std::nullopt_t, translation_task::errc>>
 delete_local_data_files(
   const chunked_vector& files) {
@@ -55,71 +78,23 @@ delete_local_data_files(
     });
 }
 
-} // namespace
-translation_task::translation_task(
-  cloud_data_io& cloud_io,
-  schema_manager& schema_mgr,
-  type_resolver& type_resolver,
-  record_translator& record_translator,
-  table_creator& table_creator)
-  : _cloud_io(&cloud_io)
-  , _schema_mgr(&schema_mgr)
-  , _type_resolver(&type_resolver)
-  , _record_translator(&record_translator)
-  , _table_creator(&table_creator) {}
-
 ss::future<
-  checked<coordinator::translated_offset_range, translation_task::errc>>
-translation_task::translate(
-  const model::ntp& ntp,
-  model::revision_id topic_revision,
-  std::unique_ptr writer_factory,
-  custom_partitioning_enabled is_custom_partitioning_enabled,
-  model::record_batch_reader reader,
+  checked<chunked_vector<coordinator::data_file>, translation_task::errc>>
+upload_files(
+  cloud_data_io& _cloud_io,
+  const chunked_vector& files,
+  translation_task::custom_partitioning_enabled is_custom_partitioning_enabled,
   const remote_path& remote_path_prefix,
   retry_chain_node& rcn,
   lazy_abort_source& lazy_as) {
-    record_multiplexer mux(
-      ntp,
-      topic_revision,
-      std::move(writer_factory),
-      *_schema_mgr,
-      *_type_resolver,
-      *_record_translator,
-      *_table_creator,
-      lazy_as);
-    // Write local files
-    auto mux_result = co_await std::move(reader).consume(
-      std::move(mux), _read_timeout + model::timeout_clock::now());
-
-    if (mux_result.has_error()) {
-        vlog(
-          datalake_log.warn,
-          "Error writing data files - {}",
-          mux_result.error());
-        co_return errc::file_io_error;
-    }
-    auto write_result = std::move(mux_result).value();
-    if (datalake_log.is_enabled(seastar::log_level::trace)) {
-        vlog(
-          datalake_log.trace,
-          "translation result base offset: {}, last offset: {}, data files: {}",
-          write_result.start_offset,
-          write_result.last_offset,
-          fmt::join(write_result.data_files, ", "));
-    }
-
-    coordinator::translated_offset_range ret{
-      .start_offset = write_result.start_offset,
-      .last_offset = write_result.last_offset,
-    };
-    ret.files.reserve(write_result.data_files.size());
+    chunked_vector<coordinator::data_file> ret;
+    ret.reserve(files.size());
 
-    std::optional<errc> upload_error;
-    // TODO: parallelize uploads
-    for (auto& file : write_result.data_files) {
+    std::optional<translation_task::errc> upload_error;
+    for (auto& file : files) {
         auto r = co_await execute_single_upload(
-          file.local_file, remote_path_prefix, rcn, lazy_as);
+          _cloud_io, file.local_file, remote_path_prefix, rcn, lazy_as);
+
         if (r.has_error()) {
             vlog(
               datalake_log.warn,
@@ -162,11 +137,10 @@ translation_task::translate(
             uploaded.hour_deprecated = get_hour(file.partition_key);
         }
 
-        ret.files.push_back(std::move(uploaded));
+        ret.push_back(std::move(uploaded));
     }
 
-    auto delete_result = co_await delete_local_data_files(
-      write_result.data_files);
+    auto delete_result = co_await delete_local_data_files(files);
     // for now we simply ignore the local deletion failures
     if (delete_result.has_error()) {
         vlog(
@@ -179,12 +153,12 @@ translation_task::translate(
         // in this case we delete any successfully uploaded remote files before
         // returning a result
         chunked_vector<remote_path> files_to_delete;
-        for (auto& data_file : ret.files) {
+        for (auto& data_file : ret) {
            files_to_delete.emplace_back(data_file.remote_path);
        }
         // TODO: add mechanism for cleaning up orphaned files that may be left
         // behind when delete operation failed or was aborted.
-        auto remote_del_result = co_await _cloud_io->delete_data_files(
+        auto remote_del_result = co_await _cloud_io.delete_data_files(
           std::move(files_to_delete), rcn);
         if (remote_del_result.has_error()) {
             vlog(
@@ -197,27 +171,78 @@ translation_task::translate(
     co_return ret;
 }
 
-ss::future<checked<remote_path, translation_task::errc>>
-translation_task::execute_single_upload(
-  const local_file_metadata& lf_meta,
+
+} // namespace
+translation_task::translation_task(
+  cloud_data_io& cloud_io,
+  schema_manager& schema_mgr,
+  type_resolver& type_resolver,
+  record_translator& record_translator,
+  table_creator& table_creator)
+  : _cloud_io(&cloud_io)
+  , _schema_mgr(&schema_mgr)
+  , _type_resolver(&type_resolver)
+  , _record_translator(&record_translator)
+  , _table_creator(&table_creator) {}
+
+ss::future<
+  checked<coordinator::translated_offset_range, translation_task::errc>>
+translation_task::translate(
+  const model::ntp& ntp,
+  model::revision_id topic_revision,
+  std::unique_ptr writer_factory,
+  custom_partitioning_enabled is_custom_partitioning_enabled,
+  model::record_batch_reader reader,
   const remote_path& remote_path_prefix,
-  retry_chain_node& parent_rcn,
+  retry_chain_node& rcn,
   lazy_abort_source& lazy_as) {
-    auto remote_path = calculate_remote_path(lf_meta.path, remote_path_prefix);
-    auto result = co_await _cloud_io->upload_data_file(
-      lf_meta, remote_path, parent_rcn, lazy_as);
-    if (result.has_error()) {
+    record_multiplexer mux(
+      ntp,
+      topic_revision,
+      std::move(writer_factory),
+      *_schema_mgr,
+      *_type_resolver,
+      *_record_translator,
+      *_table_creator,
+      lazy_as);
+    // Write local files
+    auto mux_result = co_await std::move(reader).consume(
+      std::move(mux), _read_timeout + model::timeout_clock::now());
+
+    if (mux_result.has_error()) {
         vlog(
           datalake_log.warn,
-          "error uploading file {} to {} - {}",
-          lf_meta,
-          remote_path,
-          result.error());
+          "Error writing data files - {}",
+          mux_result.error());
+        co_return errc::file_io_error;
+    }
+    auto write_result = std::move(mux_result).value();
+    if (datalake_log.is_enabled(seastar::log_level::trace)) {
+        vlog(
+          datalake_log.trace,
+          "translation result base offset: {}, last offset: {}, data files: {}",
+          write_result.start_offset,
+          write_result.last_offset,
+          fmt::join(write_result.data_files, ", "));
+    }
 
-        co_return map_error_code(result.error());
+    coordinator::translated_offset_range ret{
+      .start_offset = write_result.start_offset,
+      .last_offset = write_result.last_offset,
+    };
+    auto upload_res = co_await upload_files(
+      *_cloud_io,
+      write_result.data_files,
+      is_custom_partitioning_enabled,
+      remote_path_prefix,
+      rcn,
+      lazy_as);
+    if (upload_res.has_error()) {
+        co_return upload_res.error();
     }
+    ret.files = std::move(upload_res.value());
 
-    co_return remote_path;
+    co_return ret;
 }
 
 std::ostream& operator<<(std::ostream& o, translation_task::errc ec) {
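For context on how the unchanged public surface composes after this refactor, a caller of translation_task::translate still receives a checked result. A rough sketch of a hypothetical call site; only translate()'s signature and return type are taken from the diff, the caller-side names are illustrative:

    // Sketch only: a hypothetical caller driving one translation round.
    translation_task task(
      cloud_io, schema_mgr, type_resolver, record_translator, table_creator);
    auto res = co_await task.translate(
      ntp,
      topic_revision,
      std::move(writer_factory),
      is_custom_partitioning_enabled,
      std::move(reader),
      remote_path_prefix,
      rcn,
      lazy_as);
    if (res.has_error()) {
        // translation_task::errc, e.g. file_io_error or a mapped upload error.
        co_return res.error();
    }
    // res.value() is a coordinator::translated_offset_range whose `files`
    // vector was filled in by the new upload_files() helper.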
diff --git a/src/v/datalake/translation_task.h b/src/v/datalake/translation_task.h
index 82996377fa80d..203bc7fc4994d 100644
--- a/src/v/datalake/translation_task.h
+++ b/src/v/datalake/translation_task.h
@@ -57,12 +57,6 @@ class translation_task {
 private:
     friend std::ostream& operator<<(std::ostream&, errc);
 
-    ss::future<checked<remote_path, errc>> execute_single_upload(
-      const local_file_metadata& lf_meta,
-      const remote_path& remote_path_prefix,
-      retry_chain_node& parent_rcn,
-      lazy_abort_source& lazy_as);
-
     ss::future<> delete_remote_files(
       chunked_vector<remote_path>, retry_chain_node& parent_rcn);
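Net effect on translation_task.cc's file-scope layout, as an outline only (parameter lists elided; see the hunks above for the exact signatures):

    namespace {
    translation_task::errc map_error_code(cloud_data_io::errc);   // unchanged
    ss::future<checked<remote_path, translation_task::errc>>
    execute_single_upload(...);                   // moved out of the class
    ss::future<...> delete_local_data_files(...); // unchanged
    ss::future<...> upload_files(...);            // new: holds the old upload loop
    } // namespace

    translation_task::translation_task(...);      // unchanged, re-emitted after
                                                  // the enlarged namespace
    ss::future<...> translation_task::translate(...); // now delegates to upload_files()
    std::ostream& operator<<(std::ostream&, translation_task::errc); // unchanged

which is why translation_task.h above no longer declares execute_single_upload as a private member.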