From 3ca3da97c09dd0fc5bf4874ad84f6b346b56046c Mon Sep 17 00:00:00 2001 From: "wangyi.ywq" Date: Tue, 15 Mar 2022 00:36:15 +0800 Subject: [PATCH] support flink_value_extractor_factory jni #220 --- java/rocksjni/flink_compactionfilterjni.cc | 9 +++++ java/rocksjni/options.cc | 14 +++++++ java/rocksjni/value_extractor.cc | 29 ++++++++++++++ .../rocksdb/AbstractValueExtractoFactory.java | 40 +++++++++++++++++++ .../org/rocksdb/FlinkCompactionFilter.java | 14 ++++++- utilities/flink/flink_compaction_filter.h | 21 +++++++++- 6 files changed, 124 insertions(+), 3 deletions(-) create mode 100644 java/rocksjni/value_extractor.cc create mode 100644 java/src/main/java/org/rocksdb/AbstractValueExtractoFactory.java diff --git a/java/rocksjni/flink_compactionfilterjni.cc b/java/rocksjni/flink_compactionfilterjni.cc index 1b74654682..f090ce461d 100644 --- a/java/rocksjni/flink_compactionfilterjni.cc +++ b/java/rocksjni/flink_compactionfilterjni.cc @@ -183,6 +183,15 @@ void Java_org_rocksdb_FlinkCompactionFilter_disposeFlinkCompactionFilterConfigHo delete config_holder; } +jlong Java_org_rocksdb_FlinkCompactionFilter_createFlinkValueExtractorFactory( + JNIEnv* env, jclass /* jcls */, jlong config_holder_handle) { + using namespace TERARKDB_NAMESPACE::flink; + auto config_holder = + *(reinterpret_cast*>( + config_holder_handle)); + return reinterpret_cast(new FlinkValueExtractorFactory(config_holder)); +} + /* * Class: org_rocksdb_FlinkCompactionFilter * Method: createNewFlinkCompactionFilter0 diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index a579139a90..d5578ab2af 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -3305,6 +3305,20 @@ void Java_org_rocksdb_ColumnFamilyOptions_setMergeOperator( mergeOperatorHandle)); } +/* + * Class: org_rocksdb_ColumnFamilyOptions + * Method: setCompactionFilterHandle + * Signature: (JJ)V + */ +void Java_org_rocksdb_ColumnFamilyOptions_setValueExtractorFactoryHandle( + JNIEnv* /*env*/, jobject /*jobj*/, jlong jopt_handle, + jlong jvaluemeta_extractor_factory_handle) { + reinterpret_cast(jopt_handle) + ->value_meta_extractor_factory = + reinterpret_cast( + jvaluemeta_extractor_factory_handle); +} + /* * Class: org_rocksdb_ColumnFamilyOptions * Method: setCompactionFilterHandle diff --git a/java/rocksjni/value_extractor.cc b/java/rocksjni/value_extractor.cc new file mode 100644 index 0000000000..bcf9f12406 --- /dev/null +++ b/java/rocksjni/value_extractor.cc @@ -0,0 +1,29 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). +// +// This file implements the "bridge" between Java and C++ for +// TERARKDB_NAMESPACE::CompactionFilter. + +#include "rocksdb/value_extractor.h" + +#include + +#include "include/org_rocksdb_AbstractValueExtractorFactory.h" + +// + +/* + * Class: org_rocksdb_AbstractCompactionFilter + * Method: disposeInternal + * Signature: (J)V + */ +void Java_org_rocksdb_AbstractValueExtractorFactory_disposeInternal( + JNIEnv* /*env*/, jobject /*jobj*/, jlong handle) { + auto* vef = + reinterpret_cast(handle); + assert(vef != nullptr); + delete vef; +} +// diff --git a/java/src/main/java/org/rocksdb/AbstractValueExtractoFactory.java b/java/src/main/java/org/rocksdb/AbstractValueExtractoFactory.java new file mode 100644 index 0000000000..fe6853f733 --- /dev/null +++ b/java/src/main/java/org/rocksdb/AbstractValueExtractoFactory.java @@ -0,0 +1,40 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +package org.rocksdb; + +/** + * Each compaction will create a new {@link AbstractCompactionFilter} + * allowing the application to know about different compactions + * + * @param The concrete type of the compaction filter + */ +public abstract class AbstractValueExtractorFactory> + extends RocksObject { + + public AbstractValueExtractorFactory(final long nativeHandle) { + super(nativeHandle); + } + + + /** + * A name which identifies this compaction filter + * + * The name will be printed to the LOG file on start up for diagnosis + */ + public abstract String name(); + + /** + * We override {@link RocksCallbackObject#disposeInternal()} + * as disposing of a TERARKDB_NAMESPACE::AbstractCompactionFilterFactory requires + * a slightly different approach as it is a std::shared_ptr + */ + @Override + protected void disposeInternal() { + disposeInternal(nativeHandle_); + } + + private native void disposeInternal(final long handle); +} diff --git a/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java b/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java index 577b7547a5..ffa760e7eb 100644 --- a/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java +++ b/java/src/main/java/org/rocksdb/FlinkCompactionFilter.java @@ -26,7 +26,7 @@ public FlinkCompactionFilter(ConfigHolder configHolder, TimeProvider timeProvide public FlinkCompactionFilter(ConfigHolder configHolder, TimeProvider timeProvider, Logger logger) { super(createNewFlinkCompactionFilter0(configHolder.nativeHandle_, timeProvider, logger == null ? 0 : logger.nativeHandle_)); } - + private native static long createFlinkValueExtractorFactory(long configHolderHandle); private native static long createNewFlinkCompactionFilter0(long configHolderHandle, TimeProvider timeProvider, long loggerHandle); private native static long createNewFlinkCompactionFilterConfigHolder(); private native static void disposeFlinkCompactionFilterConfigHolder(long configHolderHandle); @@ -118,6 +118,18 @@ public interface TimeProvider { long currentTimestamp(); } + public static class FlinkValueExtractorFactory + extends AbstractValueExtractorFactory { + + public FlinkValueExtractor(ConfigHolder configHolder) { + super(createFlinkValueExtractorFactory(configHolder)); + } + @Override + public String name() { + return "FlinkValueExtractorFactory"; + } + } + public static class FlinkCompactionFilterFactory extends AbstractCompactionFilterFactory { private final ConfigHolder configHolder; private final TimeProvider timeProvider; diff --git a/utilities/flink/flink_compaction_filter.h b/utilities/flink/flink_compaction_filter.h index 15ef3871d1..71c13424cc 100644 --- a/utilities/flink/flink_compaction_filter.h +++ b/utilities/flink/flink_compaction_filter.h @@ -33,7 +33,7 @@ static const std::size_t JAVA_MAX_SIZE = static_cast(0x7fffffff); * Note: this compaction filter is a special implementation, designed for usage * only in Apache Flink project. */ -class FlinkCompactionFilter : public CompactionFilter, ValueExtractor { +class FlinkCompactionFilter : public CompactionFilter, public ValueExtractor { public: enum StateType { // WARNING!!! Do not change the order of enum entries as it is important for @@ -144,7 +144,7 @@ class FlinkCompactionFilter : public CompactionFilter, ValueExtractor { std::string* skip_until) const override; Status Extract(const Slice& key, const Slice& value, - std::string* output) const override; + std::string* output) const override; bool IgnoreSnapshots() const override { return true; } @@ -196,5 +196,22 @@ static const FlinkCompactionFilter::Config DISABLED_CONFIG = std::numeric_limits::max(), std::numeric_limits::max(), nullptr}; +class FlinkValueExtractorFactory : public ValueExtractorFactory { + public: + const char* Name() const override { + return "flink.ValueTimeStampExtractorFactory"; + } + explicit FlinkValueExtractorFactory( + std::shared_ptr config_holder) + : config_holder_(std::move(config_holder)) {} + std::unique_ptr CreateValueExtractor( + const Context& context) const { + return std::unique_ptr( + new FlinkCompactionFilter(config_holder_, nullptr, nullptr)); + }; + private: + std::shared_ptr config_holder_; +}; + } // namespace flink } // namespace TERARKDB_NAMESPACE \ No newline at end of file