Skip to content

Commit

Permalink
flink support value-extractor #220
Browse files Browse the repository at this point in the history
  • Loading branch information
yapple committed Mar 13, 2022
1 parent fbe63b6 commit fa75757
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 17 deletions.
48 changes: 32 additions & 16 deletions utilities/flink/flink_compaction_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ FlinkCompactionFilter::ConfigHolder::~ConfigHolder() {
delete config;
}
}

// at the moment Flink configures filters (can be already created) only once
// when user creates state
// otherwise it can lead to ListElementFilter leak in Config
Expand Down Expand Up @@ -112,17 +111,43 @@ inline void FlinkCompactionFilter::InitConfigIfNotYet() const {
config_cached_ == &DISABLED_CONFIG ? config_holder_->GetConfig()
: config_cached_;
}
Status FlinkCompactionFilter::Extract(const Slice& key, const Slice& value,
std::string* output) const try {
const StateType state_type = config_cached_->state_type_;
const bool tooShortValue =
value.size() < config_cached_->timestamp_offset_ + TIMESTAMP_BYTE_SIZE;
if (state_type != StateType::List && !tooShortValue) {
output->assign(value.data() + config_cached_->timestamp_offset_,
TIMESTAMP_BYTE_SIZE);
}
return Status::OK();
} catch (const std::exception& e) {
return Status::Corruption(e.what());
}

CompactionFilter::Decision FlinkCompactionFilter::FilterV2(
int /*level*/, const Slice& key, ValueType value_type,
const Slice& /*existing_value_meta*/, const LazyBuffer& existing_lazy_value,
const Slice& value_meta, const LazyBuffer& existing_lazy_value,
LazyBuffer* new_value, std::string* /*skip_until*/) const {
auto s = existing_lazy_value.fetch();
if (!s.ok()) {
new_value->reset(std::move(s));
return CompactionFilter::Decision::kKeep;
const StateType state_type = config_cached_->state_type_;
const bool value_or_merge =
value_type == ValueType::kValue || value_type == ValueType::kMergeOperand;
const bool value_state =
state_type == StateType::Value && value_type == ValueType::kValue;
const bool list_entry = state_type == StateType::List && value_or_merge;
const bool toDecide = value_state || list_entry;
const bool list_filter = list_entry && list_element_filter_;
Status s;
bool no_meta = list_filter || value_meta == nullptr;
if (no_meta) {
s = existing_lazy_value.fetch();
if (!s.ok()) {
new_value->reset(std::move(s));
return CompactionFilter::Decision::kKeep;
}
}
const Slice& existing_value = existing_lazy_value.slice();
const Slice& existing_value =
no_meta ? existing_lazy_value.slice() : value_meta;

InitConfigIfNotYet();
CreateListElementFilterIfNull();
Expand All @@ -143,15 +168,6 @@ CompactionFilter::Decision FlinkCompactionFilter::FilterV2(
existing_value.size() <
config_cached_->timestamp_offset_ + TIMESTAMP_BYTE_SIZE;

const StateType state_type = config_cached_->state_type_;
const bool value_or_merge =
value_type == ValueType::kValue || value_type == ValueType::kMergeOperand;
const bool value_state =
state_type == StateType::Value && value_type == ValueType::kValue;
const bool list_entry = state_type == StateType::List && value_or_merge;
const bool toDecide = value_state || list_entry;
const bool list_filter = list_entry && list_element_filter_;

Decision decision = Decision::kKeep;
if (!tooShortValue && toDecide) {
decision = list_filter ? ListDecide(existing_value, new_value)
Expand Down
6 changes: 5 additions & 1 deletion utilities/flink/flink_compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "rocksdb/lazy_buffer.h"
#include "rocksdb/slice.h"
#include "rocksdb/terark_namespace.h"
#include "rocksdb/value_extractor.h"

namespace TERARKDB_NAMESPACE {
namespace flink {
Expand All @@ -32,7 +33,7 @@ static const std::size_t JAVA_MAX_SIZE = static_cast<std::size_t>(0x7fffffff);
* Note: this compaction filter is a special implementation, designed for usage
* only in Apache Flink project.
*/
class FlinkCompactionFilter : public CompactionFilter {
class FlinkCompactionFilter : public CompactionFilter, ValueExtractor {
public:
enum StateType {
// WARNING!!! Do not change the order of enum entries as it is important for
Expand Down Expand Up @@ -142,6 +143,9 @@ class FlinkCompactionFilter : public CompactionFilter {
const LazyBuffer& existing_value, LazyBuffer* new_value,
std::string* skip_until) const override;

Status Extract(const Slice& key, const Slice& value,
std::string* output) const override;

bool IgnoreSnapshots() const override { return true; }

private:
Expand Down

0 comments on commit fa75757

Please sign in to comment.