From fa7575782e83d2629c74c986c8fe508e0cd773c9 Mon Sep 17 00:00:00 2001 From: "wangyi.ywq" Date: Mon, 14 Mar 2022 01:17:14 +0800 Subject: [PATCH] flink support value-extractor #220 --- utilities/flink/flink_compaction_filter.cc | 48 ++++++++++++++-------- utilities/flink/flink_compaction_filter.h | 6 ++- 2 files changed, 37 insertions(+), 17 deletions(-) diff --git a/utilities/flink/flink_compaction_filter.cc b/utilities/flink/flink_compaction_filter.cc index 62cef7f5c1..5c53352c7a 100644 --- a/utilities/flink/flink_compaction_filter.cc +++ b/utilities/flink/flink_compaction_filter.cc @@ -50,7 +50,6 @@ FlinkCompactionFilter::ConfigHolder::~ConfigHolder() { delete config; } } - // at the moment Flink configures filters (can be already created) only once // when user creates state // otherwise it can lead to ListElementFilter leak in Config @@ -112,17 +111,43 @@ inline void FlinkCompactionFilter::InitConfigIfNotYet() const { config_cached_ == &DISABLED_CONFIG ? config_holder_->GetConfig() : config_cached_; } +Status FlinkCompactionFilter::Extract(const Slice& key, const Slice& value, + std::string* output) const try { + const StateType state_type = config_cached_->state_type_; + const bool tooShortValue = + value.size() < config_cached_->timestamp_offset_ + TIMESTAMP_BYTE_SIZE; + if (state_type != StateType::List && !tooShortValue) { + output->assign(value.data() + config_cached_->timestamp_offset_, + TIMESTAMP_BYTE_SIZE); + } + return Status::OK(); +} catch (const std::exception& e) { + return Status::Corruption(e.what()); +} CompactionFilter::Decision FlinkCompactionFilter::FilterV2( int /*level*/, const Slice& key, ValueType value_type, - const Slice& /*existing_value_meta*/, const LazyBuffer& existing_lazy_value, + const Slice& value_meta, const LazyBuffer& existing_lazy_value, LazyBuffer* new_value, std::string* /*skip_until*/) const { - auto s = existing_lazy_value.fetch(); - if (!s.ok()) { - new_value->reset(std::move(s)); - return CompactionFilter::Decision::kKeep; + const StateType state_type = config_cached_->state_type_; + const bool value_or_merge = + value_type == ValueType::kValue || value_type == ValueType::kMergeOperand; + const bool value_state = + state_type == StateType::Value && value_type == ValueType::kValue; + const bool list_entry = state_type == StateType::List && value_or_merge; + const bool toDecide = value_state || list_entry; + const bool list_filter = list_entry && list_element_filter_; + Status s; + bool no_meta = list_filter || value_meta == nullptr; + if (no_meta) { + s = existing_lazy_value.fetch(); + if (!s.ok()) { + new_value->reset(std::move(s)); + return CompactionFilter::Decision::kKeep; + } } - const Slice& existing_value = existing_lazy_value.slice(); + const Slice& existing_value = + no_meta ? existing_lazy_value.slice() : value_meta; InitConfigIfNotYet(); CreateListElementFilterIfNull(); @@ -143,15 +168,6 @@ CompactionFilter::Decision FlinkCompactionFilter::FilterV2( existing_value.size() < config_cached_->timestamp_offset_ + TIMESTAMP_BYTE_SIZE; - const StateType state_type = config_cached_->state_type_; - const bool value_or_merge = - value_type == ValueType::kValue || value_type == ValueType::kMergeOperand; - const bool value_state = - state_type == StateType::Value && value_type == ValueType::kValue; - const bool list_entry = state_type == StateType::List && value_or_merge; - const bool toDecide = value_state || list_entry; - const bool list_filter = list_entry && list_element_filter_; - Decision decision = Decision::kKeep; if (!tooShortValue && toDecide) { decision = list_filter ? ListDecide(existing_value, new_value) diff --git a/utilities/flink/flink_compaction_filter.h b/utilities/flink/flink_compaction_filter.h index b554726a10..15ef3871d1 100644 --- a/utilities/flink/flink_compaction_filter.h +++ b/utilities/flink/flink_compaction_filter.h @@ -16,6 +16,7 @@ #include "rocksdb/lazy_buffer.h" #include "rocksdb/slice.h" #include "rocksdb/terark_namespace.h" +#include "rocksdb/value_extractor.h" namespace TERARKDB_NAMESPACE { namespace flink { @@ -32,7 +33,7 @@ static const std::size_t JAVA_MAX_SIZE = static_cast(0x7fffffff); * Note: this compaction filter is a special implementation, designed for usage * only in Apache Flink project. */ -class FlinkCompactionFilter : public CompactionFilter { +class FlinkCompactionFilter : public CompactionFilter, ValueExtractor { public: enum StateType { // WARNING!!! Do not change the order of enum entries as it is important for @@ -142,6 +143,9 @@ class FlinkCompactionFilter : public CompactionFilter { const LazyBuffer& existing_value, LazyBuffer* new_value, std::string* skip_until) const override; + Status Extract(const Slice& key, const Slice& value, + std::string* output) const override; + bool IgnoreSnapshots() const override { return true; } private: