Skip to content

Commit

Permalink
PL-354 - fix key equality (confluentinc#638)
Browse files Browse the repository at this point in the history
fix: array key equality - enables byte array key comparison to work correctly in Key ordered mode
  • Loading branch information
rkolesnev authored Sep 11, 2023
1 parent d337fe5 commit 075b4f9
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 6 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ NOTE:: Dependency version bumps are not listed here.
ifndef::github_name[]
toc::[]
endif::[]
== 0.5.2.8

=== Fixes

* fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579)

== 0.5.2.7

=== Fixes
Expand Down
6 changes: 6 additions & 0 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1514,6 +1514,12 @@ NOTE:: Dependency version bumps are not listed here.
ifndef::github_name[]
toc::[]
endif::[]
== 0.5.2.8

=== Fixes

* fix: Fix equality and hash code for ShardKey with array key (#638), resolves (#579)

== 0.5.2.7

=== Fixes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
Expand All @@ -10,6 +10,9 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Objects;

/**
* Simple value class for processing {@link ShardKey}s to make the various key systems type safe and extendable.
*
Expand Down Expand Up @@ -41,7 +44,6 @@ public static ShardKey ofTopicPartition(final ConsumerRecord<?, ?> rec) {
}

@Value
@RequiredArgsConstructor
@EqualsAndHashCode(callSuper = true)
public static class KeyOrderedKey extends ShardKey {

Expand All @@ -54,13 +56,95 @@ public static class KeyOrderedKey extends ShardKey {
/**
* The key of the record being referenced. Nullable if record is produced with a null key.
*/
Object key;
KeyWithEquals key;

public KeyOrderedKey(final ConsumerRecord<?, ?> rec) {
this(new TopicPartition(rec.topic(), rec.partition()), rec.key());
}

public KeyOrderedKey(final TopicPartition topicPartition, final Object key) {
if (key instanceof KeyWithEquals) {
this.key = (KeyWithEquals) key;
} else {
this.key = new KeyWithEquals(key);
}
this.topicName = topicPartition;
}
}

@Value
@RequiredArgsConstructor
public static class KeyWithEquals {
Object key;

@Override
public boolean equals(Object o) {
if (o == this) return true;
if (!(o instanceof KeyWithEquals)) return false;
KeyWithEquals other = (KeyWithEquals) o;
if (other.key == null && this.key == null) return true;
if (other.key == null || this.key == null) return false;
return Objects.deepEquals(this.key, other.key);
}

@Override
public int hashCode() {

final int PRIME = 59;
int result = 1;
result = (result * PRIME);
if (key == null) {
result = result + 43;
return result;
}
if (isArray(key)) {
result = result + arrayHashCode(key);
} else {
result = result + key.hashCode();
}
return result;
}


private int arrayHashCode(Object t) {
if (t instanceof Object[]) {
return Arrays.deepHashCode((Object[]) t);
} else {
return primitiveArrayHashCode(t, t.getClass().getComponentType());
}
}

/**
* Copy of {@link Arrays#primitiveArrayHashCode} logic
*
* @param a
* @param cl
* @return
*/
private int primitiveArrayHashCode(Object a, Class<?> cl) {
return
(cl == byte.class) ? Arrays.hashCode((byte[]) a) :
(cl == int.class) ? Arrays.hashCode((int[]) a) :
(cl == long.class) ? Arrays.hashCode((long[]) a) :
(cl == char.class) ? Arrays.hashCode((char[]) a) :
(cl == short.class) ? Arrays.hashCode((short[]) a) :
(cl == boolean.class) ? Arrays.hashCode((boolean[]) a) :
(cl == double.class) ? Arrays.hashCode((double[]) a) :
// If new primitive types are ever added, this method must be
// expanded or we will fail here with ClassCastException.
Arrays.hashCode((float[]) a);
}

private boolean isArray(Object obj) {
return obj instanceof Object[] || obj instanceof boolean[] ||
obj instanceof byte[] || obj instanceof short[] ||
obj instanceof char[] || obj instanceof int[] ||
obj instanceof long[] || obj instanceof float[] ||
obj instanceof double[];
}
}


@Value
@EqualsAndHashCode(callSuper = true)
public static class TopicPartitionKey extends ShardKey {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package io.confluent.parallelconsumer.state;

/*-
* Copyright (C) 2020-2022 Confluent, Inc.
* Copyright (C) 2020-2023 Confluent, Inc.
*/

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.stream.Stream;

import static io.confluent.parallelconsumer.ManagedTruth.assertThat;
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
Expand Down Expand Up @@ -43,12 +48,14 @@ void keyTest() {
String topicOne = "t1";
TopicPartition topicOneP0 = new TopicPartition("t1", 0);
String keyOne = "k1";

String keyOneAgain = "k1";

// same inputs, different key instances equal
var reck1 = new ConsumerRecord<>(topicOne, 0, 0, keyOne, "v");
var reck1Again = new ConsumerRecord<>(topicOne, 0, 0, keyOneAgain, "v");

ShardKey key1 = ShardKey.of(reck1, ordering);
ShardKey anotherInstanceWithSameInputs = ShardKey.of(reck1, ordering);
ShardKey anotherInstanceWithSameInputs = ShardKey.of(reck1Again, ordering);
assertThat(key1).isEqualTo(anotherInstanceWithSameInputs);

// same topic, same partition, different key
Expand All @@ -74,4 +81,75 @@ void keyTest() {
//assertThat("false").isEmpty();
}

private static Object keyObject = new Object();

static Stream<Arguments> keyEqualityParams() {
return Stream.of(
Arguments.of(keyObject, keyObject),
Arguments.of("key", "key"),
Arguments.of((byte) 1, (byte) 1),
Arguments.of(true, true),
Arguments.of((short) 1, (short) 1),
Arguments.of(1, 1),
Arguments.of(1L, 1L),
Arguments.of((float) 1.1, (float) 1.1),
Arguments.of(1.1, 1.1),
Arguments.of('a', 'a'),
Arguments.of(null, null),
Arguments.of(Boolean.TRUE, Boolean.TRUE),
Arguments.of(Short.valueOf((short) 1), Short.valueOf((short) 1)),
Arguments.of(Integer.valueOf(1), Integer.valueOf(1)),
Arguments.of(Long.valueOf(1L), Long.valueOf(1L)),
Arguments.of(Float.valueOf((float) 1.1), Float.valueOf((float) 1.1)),
Arguments.of(Double.valueOf(1.1), Double.valueOf(1.1)),
Arguments.of(Character.valueOf('a'), Character.valueOf('a')),
Arguments.of(null, null),

Arguments.of(new Object[]{keyObject, "key"}, new Object[]{keyObject, "key"}),
Arguments.of(new String[]{"key1", "key2"}, new String[]{"key1", "key2"}),
Arguments.of(new byte[]{1, 2}, new byte[]{1, 2}),
Arguments.of(new boolean[]{true, false}, new boolean[]{true, false}),
Arguments.of(new short[]{1, 2}, new short[]{1, 2}),
Arguments.of(new int[]{1, 2}, new int[]{1, 2}),
Arguments.of(new long[]{1, 2}, new long[]{1, 2}),
Arguments.of(new float[]{1, 2}, new float[]{1, 2}),
Arguments.of(new double[]{1, 2}, new double[]{1, 2}),
Arguments.of(new char[]{'1', '2'}, new char[]{'1', '2'}),
Arguments.of(new Object[]{null}, new Object[]{null})
);
}

/**
* Parametrized key equality test for different reference and primitive types including arrays.
*/
@ParameterizedTest
@MethodSource("keyEqualityParams")
void testKeyEquality(Object keyOne, Object keyTwo) {
String topic = "topic1";
var reck1 = new ConsumerRecord<>(topic, 0, 0, keyOne, "v");
var reck2 = new ConsumerRecord<>(topic, 0, 0, keyTwo, "v");

ShardKey shardKey1 = ShardKey.of(reck1, KEY);
ShardKey shardKey2 = ShardKey.of(reck2, KEY);
assertThat(shardKey1).isEqualTo(shardKey2);
}

/**
* Tests that equality works correctly for byte[] keys - based on array contents not ref.
*/
@Test
void keyTestByteArray() {
ParallelConsumerOptions.ProcessingOrder ordering = KEY;
String topicOne = "t1";
byte[] keyOne = "k1".getBytes();
byte[] keyOneAgain = "k1".getBytes();

// same inputs, different key instances equal
var reck1 = new ConsumerRecord<>(topicOne, 0, 0, keyOne, "v");
var reck1Again = new ConsumerRecord<>(topicOne, 0, 0, keyOneAgain, "v");

ShardKey key1 = ShardKey.of(reck1, ordering);
ShardKey anotherInstanceWithSameInputs = ShardKey.of(reck1Again, ordering);
assertThat(key1).isEqualTo(anotherInstanceWithSameInputs);
}
}

0 comments on commit 075b4f9

Please sign in to comment.