diff --git a/java/rocksjni/flink_compactionfilterjni.cc b/java/rocksjni/flink_compactionfilterjni.cc index 1b74654682..f090ce461d 100644 --- a/java/rocksjni/flink_compactionfilterjni.cc +++ b/java/rocksjni/flink_compactionfilterjni.cc @@ -183,6 +183,15 @@ void Java_org_rocksdb_FlinkCompactionFilter_disposeFlinkCompactionFilterConfigHo delete config_holder; } +jlong Java_org_rocksdb_FlinkCompactionFilter_createFlinkValueExtractorFactory( + JNIEnv* env, jclass /* jcls */, jlong config_holder_handle) { + using namespace TERARKDB_NAMESPACE::flink; + auto config_holder = + *(reinterpret_cast*>( + config_holder_handle)); + return reinterpret_cast(new FlinkValueExtractorFactory(config_holder)); +} + /* * Class: org_rocksdb_FlinkCompactionFilter * Method: createNewFlinkCompactionFilter0 diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index a579139a90..d5578ab2af 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -3305,6 +3305,20 @@ void Java_org_rocksdb_ColumnFamilyOptions_setMergeOperator( mergeOperatorHandle)); } +/* + * Class: org_rocksdb_ColumnFamilyOptions + * Method: setCompactionFilterHandle + * Signature: (JJ)V + */ +void Java_org_rocksdb_ColumnFamilyOptions_setValueExtractorFactoryHandle( + JNIEnv* /*env*/, jobject /*jobj*/, jlong jopt_handle, + jlong jvaluemeta_extractor_factory_handle) { + reinterpret_cast(jopt_handle) + ->value_meta_extractor_factory = + reinterpret_cast( + jvaluemeta_extractor_factory_handle); +} + /* * Class: org_rocksdb_ColumnFamilyOptions * Method: setCompactionFilterHandle diff --git a/java/rocksjni/value_extractor.cc b/java/rocksjni/value_extractor.cc new file mode 100644 index 0000000000..bcf9f12406 --- /dev/null +++ b/java/rocksjni/value_extractor.cc @@ -0,0 +1,29 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// This file implements the "bridge" between Java and C++ for +// TERARKDB_NAMESPACE::CompactionFilter. + +#include "rocksdb/value_extractor.h" + +#include + +#include "include/org_rocksdb_AbstractValueExtractorFactory.h" + +// + +/* + * Class: org_rocksdb_AbstractCompactionFilter + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_AbstractValueExtractorFactory_disposeInternal( + JNIEnv* /*env*/, jobject /*jobj*/, jlong handle) { + auto* vef = + reinterpret_cast(handle); + assert(vef != nullptr); + delete vef; +} +// diff --git a/java/src/main/java/org/rocksdb/AbstractValueExtractoFactory.java b/java/src/main/java/org/rocksdb/AbstractValueExtractoFactory.java new file mode 100644 index 0000000000..fe6853f733 --- /dev/null +++ b/java/src/main/java/org/rocksdb/AbstractValueExtractoFactory.java @@ -0,0 +1,40 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +/** + * Each compaction will create a new {@link AbstractCompactionFilter} + * allowing the application to know about different compactions + * + * @param The concrete type of the compaction filter + */ +public abstract class AbstractValueExtractorFactory> + extends RocksObject { + + public AbstractValueExtractorFactory(final long nativeHandle) { + super(nativeHandle); + } + + + /** + * A name which identifies this compaction filter + * + * The name will be printed to the LOG file on start up for diagnosis + */ + public abstract String name(); + + /** + * We override {@link RocksCallbackObject#disposeInternal()} + * as disposing of a TERARKDB_NAMESPACE::AbstractCompactionFilterFactory requires + * a slightly different approach as it is a std::shared_ptr + */ + @Override + protected void disposeInternal() { + disposeInternal(nativeHandle_); + } + + private native void disposeInternal(final long handle); +} diff --git a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java index 3eb0ac7b38..72795da444 100644 --- a/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java +++ b/java/src/main/java/org/rocksdb/ColumnFamilyOptions.java @@ -48,6 +48,7 @@ public ColumnFamilyOptions(ColumnFamilyOptions other) { this.comparator_ = other.comparator_; this.compactionFilter_ = other.compactionFilter_; this.compactionFilterFactory_ = other.compactionFilterFactory_; + this.valueExtractorFactory_ = other.valueExtractorFactory_; this.compactionOptionsUniversal_ = other.compactionOptionsUniversal_; this.compressionOptions_ = other.compressionOptions_; } @@ -225,6 +226,15 @@ public ColumnFamilyOptions setCompactionFilterFactory(final AbstractCompactionFi return this; } + + public ColumnFamilyOptions setValueExtractorFactory(final AbstractValueExtractorFactory> valueExtractorFactory) { + assert (isOwningHandle()); + setValueExtractorFactoryHandle(nativeHandle_, valueExtractorFactory.nativeHandle_); + valueExtractorFactory_ = valueExtractorFactory; + return this; + } + + @Override public ColumnFamilyOptions setWriteBufferSize(final long writeBufferSize) { assert(isOwningHandle()); @@ -949,6 +959,8 @@ private native void setForceConsistencyChecks(final long handle, private AbstractCompactionFilter> compactionFilter_; AbstractCompactionFilterFactory> compactionFilterFactory_; + AbstractValueExtractorFactory> + valueExtractorFactory_; private CompactionOptionsUniversal compactionOptionsUniversal_; private CompressionOptions compressionOptions_; diff --git a/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java b/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java index 577b7547a5..ffa760e7eb 100644 --- a/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java +++ b/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java @@ -26,7 +26,7 @@ public FlinkCompactionFilter(ConfigHolder configHolder, TimeProvider timeProvide public FlinkCompactionFilter(ConfigHolder configHolder, TimeProvider timeProvider, Logger logger) { super(createNewFlinkCompactionFilter0(configHolder.nativeHandle_, timeProvider, logger == null ? 0 : logger.nativeHandle_)); } - + private native static long createFlinkValueExtractorFactory(long configHolderHandle); private native static long createNewFlinkCompactionFilter0(long configHolderHandle, TimeProvider timeProvider, long loggerHandle); private native static long createNewFlinkCompactionFilterConfigHolder(); private native static void disposeFlinkCompactionFilterConfigHolder(long configHolderHandle); @@ -118,6 +118,18 @@ public interface TimeProvider { long currentTimestamp(); } + public static class FlinkValueExtractorFactory + extends AbstractValueExtractorFactory { + + public FlinkValueExtractor(ConfigHolder configHolder) { + super(createFlinkValueExtractorFactory(configHolder)); + } + @Override + public String name() { + return "FlinkValueExtractorFactory"; + } + } + public static class FlinkCompactionFilterFactory extends AbstractCompactionFilterFactory { private final ConfigHolder configHolder; private final TimeProvider timeProvider; diff --git a/utilities/flink/flink_compaction_filter.cc b/utilities/flink/flink_compaction_filter.cc index 62cef7f5c1..5c53352c7a 100644 --- a/utilities/flink/flink_compaction_filter.cc +++ b/utilities/flink/flink_compaction_filter.cc @@ -50,7 +50,6 @@ FlinkCompactionFilter::ConfigHolder::~ConfigHolder() { delete config; } } - // at the moment Flink configures filters (can be already created) only once // when user creates state // otherwise it can lead to ListElementFilter leak in Config @@ -112,17 +111,43 @@ inline void FlinkCompactionFilter::InitConfigIfNotYet() const { config_cached_ == &DISABLED_CONFIG ? config_holder_->GetConfig() : config_cached_; } +Status FlinkCompactionFilter::Extract(const Slice& key, const Slice& value, + std::string* output) const try { + const StateType state_type = config_cached_->state_type_; + const bool tooShortValue = + value.size() < config_cached_->timestamp_offset_ + TIMESTAMP_BYTE_SIZE; + if (state_type != StateType::List && !tooShortValue) { + output->assign(value.data() + config_cached_->timestamp_offset_, + TIMESTAMP_BYTE_SIZE); + } + return Status::OK(); +} catch (const std::exception& e) { + return Status::Corruption(e.what()); +} CompactionFilter::Decision FlinkCompactionFilter::FilterV2( int /*level*/, const Slice& key, ValueType value_type, - const Slice& /*existing_value_meta*/, const LazyBuffer& existing_lazy_value, + const Slice& value_meta, const LazyBuffer& existing_lazy_value, LazyBuffer* new_value, std::string* /*skip_until*/) const { - auto s = existing_lazy_value.fetch(); - if (!s.ok()) { - new_value->reset(std::move(s)); - return CompactionFilter::Decision::kKeep; + const StateType state_type = config_cached_->state_type_; + const bool value_or_merge = + value_type == ValueType::kValue || value_type == ValueType::kMergeOperand; + const bool value_state = + state_type == StateType::Value && value_type == ValueType::kValue; + const bool list_entry = state_type == StateType::List && value_or_merge; + const bool toDecide = value_state || list_entry; + const bool list_filter = list_entry && list_element_filter_; + Status s; + bool no_meta = list_filter || value_meta == nullptr; + if (no_meta) { + s = existing_lazy_value.fetch(); + if (!s.ok()) { + new_value->reset(std::move(s)); + return CompactionFilter::Decision::kKeep; + } } - const Slice& existing_value = existing_lazy_value.slice(); + const Slice& existing_value = + no_meta ? existing_lazy_value.slice() : value_meta; InitConfigIfNotYet(); CreateListElementFilterIfNull(); @@ -143,15 +168,6 @@ CompactionFilter::Decision FlinkCompactionFilter::FilterV2( existing_value.size() < config_cached_->timestamp_offset_ + TIMESTAMP_BYTE_SIZE; - const StateType state_type = config_cached_->state_type_; - const bool value_or_merge = - value_type == ValueType::kValue || value_type == ValueType::kMergeOperand; - const bool value_state = - state_type == StateType::Value && value_type == ValueType::kValue; - const bool list_entry = state_type == StateType::List && value_or_merge; - const bool toDecide = value_state || list_entry; - const bool list_filter = list_entry && list_element_filter_; - Decision decision = Decision::kKeep; if (!tooShortValue && toDecide) { decision = list_filter ? ListDecide(existing_value, new_value) diff --git a/utilities/flink/flink_compaction_filter.h b/utilities/flink/flink_compaction_filter.h index b554726a10..71c13424cc 100644 --- a/utilities/flink/flink_compaction_filter.h +++ b/utilities/flink/flink_compaction_filter.h @@ -16,6 +16,7 @@ #include "rocksdb/lazy_buffer.h" #include "rocksdb/slice.h" #include "rocksdb/terark_namespace.h" +#include "rocksdb/value_extractor.h" namespace TERARKDB_NAMESPACE { namespace flink { @@ -32,7 +33,7 @@ static const std::size_t JAVA_MAX_SIZE = static_cast(0x7fffffff); * Note: this compaction filter is a special implementation, designed for usage * only in Apache Flink project. */ -class FlinkCompactionFilter : public CompactionFilter { +class FlinkCompactionFilter : public CompactionFilter, public ValueExtractor { public: enum StateType { // WARNING!!! Do not change the order of enum entries as it is important for @@ -142,6 +143,9 @@ class FlinkCompactionFilter : public CompactionFilter { const LazyBuffer& existing_value, LazyBuffer* new_value, std::string* skip_until) const override; + Status Extract(const Slice& key, const Slice& value, + std::string* output) const override; + bool IgnoreSnapshots() const override { return true; } private: @@ -192,5 +196,22 @@ static const FlinkCompactionFilter::Config DISABLED_CONFIG = std::numeric_limits::max(), std::numeric_limits::max(), nullptr}; +class FlinkValueExtractorFactory : public ValueExtractorFactory { + public: + const char* Name() const override { + return "flink.ValueTimeStampExtractorFactory"; + } + explicit FlinkValueExtractorFactory( + std::shared_ptr config_holder) + : config_holder_(std::move(config_holder)) {} + std::unique_ptr CreateValueExtractor( + const Context& context) const { + return std::unique_ptr( + new FlinkCompactionFilter(config_holder_, nullptr, nullptr)); + }; + private: + std::shared_ptr config_holder_; +}; + } // namespace flink } // namespace TERARKDB_NAMESPACE \ No newline at end of file