Flink: support value-extractor #220 #222

Open · wants to merge 2 commits into dev.1.4
9 changes: 9 additions & 0 deletions java/rocksjni/flink_compactionfilterjni.cc
@@ -183,6 +183,15 @@ void Java_org_rocksdb_FlinkCompactionFilter_disposeFlinkCompactionFilterConfigHo
delete config_holder;
}

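/*
* Class: org_rocksdb_FlinkCompactionFilter
* Method: createFlinkValueExtractorFactory
* Signature: (J)J
*/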
jlong Java_org_rocksdb_FlinkCompactionFilter_createFlinkValueExtractorFactory(
JNIEnv* env, jclass /* jcls */, jlong config_holder_handle) {
using namespace TERARKDB_NAMESPACE::flink;
auto config_holder =
*(reinterpret_cast<std::shared_ptr<FlinkCompactionFilter::ConfigHolder>*>(
config_holder_handle));
return reinterpret_cast<jlong>(new FlinkValueExtractorFactory(config_holder));
}

/*
* Class: org_rocksdb_FlinkCompactionFilter
* Method: createNewFlinkCompactionFilter0
14 changes: 14 additions & 0 deletions java/rocksjni/options.cc
@@ -3305,6 +3305,20 @@ void Java_org_rocksdb_ColumnFamilyOptions_setMergeOperator(
mergeOperatorHandle));
}

/*
* Class: org_rocksdb_ColumnFamilyOptions
* Method: setValueExtractorFactoryHandle
* Signature: (JJ)V
*/
void Java_org_rocksdb_ColumnFamilyOptions_setValueExtractorFactoryHandle(
JNIEnv* /*env*/, jobject /*jobj*/, jlong jopt_handle,
jlong jvaluemeta_extractor_factory_handle) {
reinterpret_cast<TERARKDB_NAMESPACE::ColumnFamilyOptions*>(jopt_handle)
->value_meta_extractor_factory =
reinterpret_cast<TERARKDB_NAMESPACE::ValueExtractorFactory*>(
jvaluemeta_extractor_factory_handle);
}

/*
* Class: org_rocksdb_ColumnFamilyOptions
* Method: setCompactionFilterHandle
29 changes: 29 additions & 0 deletions java/rocksjni/value_extractor.cc
@@ -0,0 +1,29 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
// This file implements the "bridge" between Java and C++ for
// TERARKDB_NAMESPACE::ValueExtractorFactory.

#include "rocksdb/value_extractor.h"

#include <jni.h>

#include "include/org_rocksdb_AbstractValueExtractorFactory.h"

// <editor-fold desc="org.rocksdb.AbstractValueExtractorFactory">

/*
* Class: org_rocksdb_AbstractValueExtractorFactory
* Method: disposeInternal
* Signature: (J)V
*/
void Java_org_rocksdb_AbstractValueExtractorFactory_disposeInternal(
JNIEnv* /*env*/, jobject /*jobj*/, jlong handle) {
auto* vef =
reinterpret_cast<TERARKDB_NAMESPACE::ValueExtractorFactory*>(handle);
assert(vef != nullptr);
delete vef;
}
// </editor-fold>
40 changes: 40 additions & 0 deletions java/src/main/java/org/rocksdb/AbstractValueExtractorFactory.java
@@ -0,0 +1,40 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

package org.rocksdb;

/**
* A factory for native value extractors. A value extractor derives value
* metadata (for Flink, the TTL timestamp) from a stored value so that
* compaction can consult the metadata without loading the whole value.
*
* @param <T> The concrete slice type handled by the extractor
*/
public abstract class AbstractValueExtractorFactory<T extends AbstractSlice<?>>
extends RocksObject {

public AbstractValueExtractorFactory(final long nativeHandle) {
super(nativeHandle);
}


/**
* A name which identifies this value extractor factory
*
* The name will be printed to the LOG file on start up for diagnosis
*/
public abstract String name();

/**
* We override {@code disposeInternal()}
* as disposing of a TERARKDB_NAMESPACE::ValueExtractorFactory is done by
* deleting the raw pointer behind the native handle
*/
@Override
protected void disposeInternal() {
disposeInternal(nativeHandle_);
}

private native void disposeInternal(final long handle);
}
12 changes: 12 additions & 0 deletions java/src/main/java/org/rocksdb/ColumnFamilyOptions.java
@@ -48,6 +48,7 @@ public ColumnFamilyOptions(ColumnFamilyOptions other) {
this.comparator_ = other.comparator_;
this.compactionFilter_ = other.compactionFilter_;
this.compactionFilterFactory_ = other.compactionFilterFactory_;
this.valueExtractorFactory_ = other.valueExtractorFactory_;
this.compactionOptionsUniversal_ = other.compactionOptionsUniversal_;
this.compressionOptions_ = other.compressionOptions_;
}
@@ -225,6 +226,15 @@ public ColumnFamilyOptions setCompactionFilterFactory(final AbstractCompactionFi
return this;
}


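/**
* Set the value extractor factory. The factory creates native value
* extractors that derive value metadata (for Flink, the TTL timestamp)
* for entries of this column family.
*
* @param valueExtractorFactory the factory that creates value extractors
* @return the instance of the current object
*/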
public ColumnFamilyOptions setValueExtractorFactory(final AbstractValueExtractorFactory<? extends AbstractSlice<?>> valueExtractorFactory) {
assert (isOwningHandle());
setValueExtractorFactoryHandle(nativeHandle_, valueExtractorFactory.nativeHandle_);
valueExtractorFactory_ = valueExtractorFactory;
return this;
}

@Override
public ColumnFamilyOptions setWriteBufferSize(final long writeBufferSize) {
assert(isOwningHandle());
@@ -949,6 +959,8 @@ private native void setForceConsistencyChecks(final long handle,
private AbstractCompactionFilter<? extends AbstractSlice<?>> compactionFilter_;
AbstractCompactionFilterFactory<? extends AbstractCompactionFilter<?>>
compactionFilterFactory_;
AbstractValueExtractorFactory<? extends AbstractSlice<?>>
valueExtractorFactory_;
private CompactionOptionsUniversal compactionOptionsUniversal_;
private CompressionOptions compressionOptions_;

14 changes: 13 additions & 1 deletion java/src/main/java/org/rocksdb/FlinkCompactionFilter.java
@@ -26,7 +26,7 @@ public FlinkCompactionFilter(ConfigHolder configHolder, TimeProvider timeProvide
public FlinkCompactionFilter(ConfigHolder configHolder, TimeProvider timeProvider, Logger logger) {
super(createNewFlinkCompactionFilter0(configHolder.nativeHandle_, timeProvider, logger == null ? 0 : logger.nativeHandle_));
}

private native static long createFlinkValueExtractorFactory(long configHolderHandle);
private native static long createNewFlinkCompactionFilter0(long configHolderHandle, TimeProvider timeProvider, long loggerHandle);
private native static long createNewFlinkCompactionFilterConfigHolder();
private native static void disposeFlinkCompactionFilterConfigHolder(long configHolderHandle);
@@ -118,6 +118,18 @@ public interface TimeProvider {
long currentTimestamp();
}

public static class FlinkValueExtractorFactory
extends AbstractValueExtractorFactory<Slice> {

public FlinkValueExtractorFactory(ConfigHolder configHolder) {
super(createFlinkValueExtractorFactory(configHolder.nativeHandle_));
}

@Override
public String name() {
return "FlinkValueExtractorFactory";
}
}

public static class FlinkCompactionFilterFactory extends AbstractCompactionFilterFactory<FlinkCompactionFilter> {
private final ConfigHolder configHolder;
private final TimeProvider timeProvider;
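
For reference, a minimal end-to-end wiring sketch of the new Java API (illustration only, not part of the diff). It assumes ConfigHolder exposes a public no-arg constructor backed by createNewFlinkCompactionFilterConfigHolder(), as the existing FlinkCompactionFilterFactory usage suggests, and that these RocksObject subclasses are AutoCloseable:

import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.FlinkCompactionFilter.ConfigHolder;
import org.rocksdb.FlinkCompactionFilter.FlinkValueExtractorFactory;
import org.rocksdb.RocksDB;

public class ValueExtractorWiringSketch {
  public static void main(String[] args) {
    RocksDB.loadLibrary();
    try (ColumnFamilyOptions cfOptions = new ColumnFamilyOptions();
         ConfigHolder configHolder = new ConfigHolder();
         FlinkValueExtractorFactory extractorFactory =
             new FlinkValueExtractorFactory(configHolder)) {
      // ColumnFamilyOptions keeps a Java reference (valueExtractorFactory_) and
      // passes the native pointer down via setValueExtractorFactoryHandle().
      cfOptions.setValueExtractorFactory(extractorFactory);
      // ... open the column family with cfOptions and use it here; the holder
      // and factory must stay open for as long as the column family uses them.
    }
  }
}
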
48 changes: 32 additions & 16 deletions utilities/flink/flink_compaction_filter.cc
@@ -50,7 +50,6 @@ FlinkCompactionFilter::ConfigHolder::~ConfigHolder() {
delete config;
}
}

// at the moment Flink configures filters (can be already created) only once
// when user creates state
// otherwise it can lead to ListElementFilter leak in Config
@@ -112,17 +111,43 @@ inline void FlinkCompactionFilter::InitConfigIfNotYet() const {
config_cached_ == &DISABLED_CONFIG ? config_holder_->GetConfig()
: config_cached_;
}
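// Extracts the value metadata: for non-List state, the TIMESTAMP_BYTE_SIZE
// bytes at timestamp_offset_ are copied into *output; List state and values
// shorter than timestamp_offset_ + TIMESTAMP_BYTE_SIZE leave *output untouched.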
Status FlinkCompactionFilter::Extract(const Slice& key, const Slice& value,
std::string* output) const try {
const StateType state_type = config_cached_->state_type_;
const bool tooShortValue =
value.size() < config_cached_->timestamp_offset_ + TIMESTAMP_BYTE_SIZE;
if (state_type != StateType::List && !tooShortValue) {
output->assign(value.data() + config_cached_->timestamp_offset_,
TIMESTAMP_BYTE_SIZE);
}
return Status::OK();
} catch (const std::exception& e) {
return Status::Corruption(e.what());
}

CompactionFilter::Decision FlinkCompactionFilter::FilterV2(
int /*level*/, const Slice& key, ValueType value_type,
const Slice& /*existing_value_meta*/, const LazyBuffer& existing_lazy_value,
const Slice& value_meta, const LazyBuffer& existing_lazy_value,
LazyBuffer* new_value, std::string* /*skip_until*/) const {
auto s = existing_lazy_value.fetch();
if (!s.ok()) {
new_value->reset(std::move(s));
return CompactionFilter::Decision::kKeep;
const StateType state_type = config_cached_->state_type_;
const bool value_or_merge =
value_type == ValueType::kValue || value_type == ValueType::kMergeOperand;
const bool value_state =
state_type == StateType::Value && value_type == ValueType::kValue;
const bool list_entry = state_type == StateType::List && value_or_merge;
const bool toDecide = value_state || list_entry;
const bool list_filter = list_entry && list_element_filter_;
Status s;
bool no_meta = list_filter || value_meta == nullptr;
if (no_meta) {
s = existing_lazy_value.fetch();
if (!s.ok()) {
new_value->reset(std::move(s));
return CompactionFilter::Decision::kKeep;
}
}
const Slice& existing_value = existing_lazy_value.slice();
const Slice& existing_value =
no_meta ? existing_lazy_value.slice() : value_meta;

InitConfigIfNotYet();
CreateListElementFilterIfNull();
@@ -143,15 +168,6 @@ CompactionFilter::Decision FlinkCompactionFilter::FilterV2(
existing_value.size() <
config_cached_->timestamp_offset_ + TIMESTAMP_BYTE_SIZE;

const StateType state_type = config_cached_->state_type_;
const bool value_or_merge =
value_type == ValueType::kValue || value_type == ValueType::kMergeOperand;
const bool value_state =
state_type == StateType::Value && value_type == ValueType::kValue;
const bool list_entry = state_type == StateType::List && value_or_merge;
const bool toDecide = value_state || list_entry;
const bool list_filter = list_entry && list_element_filter_;

Decision decision = Decision::kKeep;
if (!tooShortValue && toDecide) {
decision = list_filter ? ListDecide(existing_value, new_value)
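
As a rough illustration of what the new Extract() copies for non-List state (illustration only; it assumes Flink's usual value layout of an 8-byte big-endian timestamp at timestamp_offset, i.e. TIMESTAMP_BYTE_SIZE == 8):

import java.nio.ByteBuffer;

public class TimestampLayoutSketch {
  public static void main(String[] args) {
    final int timestampOffset = 0;  // hypothetical offset configured for this state
    final long timestamp = 1_700_000_000_000L;
    final byte[] userPayload = {1, 2, 3};

    // Build a value the way Flink lays it out: [.. offset ..][8-byte timestamp][payload]
    ByteBuffer value = ByteBuffer.allocate(timestampOffset + Long.BYTES + userPayload.length);
    value.position(timestampOffset);
    value.putLong(timestamp);       // ByteBuffer writes big-endian by default
    value.put(userPayload);

    // Extract() copies exactly these TIMESTAMP_BYTE_SIZE bytes into *output.
    byte[] extracted = new byte[Long.BYTES];
    System.arraycopy(value.array(), timestampOffset, extracted, 0, Long.BYTES);

    System.out.println(ByteBuffer.wrap(extracted).getLong() == timestamp);  // prints true
  }
}
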
23 changes: 22 additions & 1 deletion utilities/flink/flink_compaction_filter.h
@@ -16,6 +16,7 @@
#include "rocksdb/lazy_buffer.h"
#include "rocksdb/slice.h"
#include "rocksdb/terark_namespace.h"
#include "rocksdb/value_extractor.h"

namespace TERARKDB_NAMESPACE {
namespace flink {
@@ -32,7 +33,7 @@ static const std::size_t JAVA_MAX_SIZE = static_cast<std::size_t>(0x7fffffff);
* Note: this compaction filter is a special implementation, designed for usage
* only in Apache Flink project.
*/
class FlinkCompactionFilter : public CompactionFilter {
class FlinkCompactionFilter : public CompactionFilter, public ValueExtractor {
public:
enum StateType {
// WARNING!!! Do not change the order of enum entries as it is important for
@@ -142,6 +143,9 @@ class FlinkCompactionFilter : public CompactionFilter {
const LazyBuffer& existing_value, LazyBuffer* new_value,
std::string* skip_until) const override;

Status Extract(const Slice& key, const Slice& value,
std::string* output) const override;

bool IgnoreSnapshots() const override { return true; }

private:
@@ -192,5 +196,22 @@ static const FlinkCompactionFilter::Config DISABLED_CONFIG =
std::numeric_limits<int64_t>::max(),
std::numeric_limits<int64_t>::max(), nullptr};

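// Creates ValueExtractor instances that share the Flink ConfigHolder; each
// extractor is a FlinkCompactionFilter used only through the ValueExtractor
// interface (its Extract() method).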
class FlinkValueExtractorFactory : public ValueExtractorFactory {
public:
const char* Name() const override {
return "flink.ValueTimeStampExtractorFactory";
}
explicit FlinkValueExtractorFactory(
std::shared_ptr<FlinkCompactionFilter::ConfigHolder> config_holder)
: config_holder_(std::move(config_holder)) {}
std::unique_ptr<ValueExtractor> CreateValueExtractor(
const Context& /*context*/) const override {
return std::unique_ptr<ValueExtractor>(
new FlinkCompactionFilter(config_holder_, nullptr, nullptr));
}
private:
std::shared_ptr<FlinkCompactionFilter::ConfigHolder> config_holder_;
};

} // namespace flink
} // namespace TERARKDB_NAMESPACE