-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for object change detection in filters #2
Conversation
@@ -25,6 +25,17 @@ | |||
* in any denormalized records. They are effectively treated as a tombstone. | |||
*/ | |||
public abstract class BaseFilter { | |||
|
|||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Docstrings should start with /**
.
There are some other cases of missing *
in this PR.
*/ | ||
public abstract boolean isFiltered(String entity, BaseRecord record); | ||
public FilterMode filter(String entity, BaseRecord record, BaseRecord oldRecord) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be good to keep that last parameter optional.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is optional in the sense that a class extending the BaseFilter
has the option of implementing either function.
protected Iterator<ConsumerRecord<byte[], byte[]>> iter; | ||
protected KafkaTopic<K, V> topic; | ||
|
||
// Information about the next valid record |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Might be good to describe what null
values mean.
/* | ||
* Internal helper to obtain and stage the next non-skipped record | ||
*/ | ||
private ConsumerRecord<byte[], byte[]> getAndStageNextRecord() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getAndStage
is a bit confusing to me. Maybe findAndStage
or something, implying that this is more than a simple getter.
} | ||
|
||
@Override | ||
public boolean hasNext() { | ||
return iter.hasNext(); | ||
return (getAndStageNextRecord() != null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
()
are superfluous
@@ -69,49 +76,125 @@ | |||
private KafkaTopicIterator(Iterator<ConsumerRecord<byte[], byte[]>> iter, KafkaTopic<K, V> topic) { | |||
this.iter = iter; | |||
this.topic = topic; | |||
this.nextRecord = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.resetStagedRecord()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Went over the changes, LGTM.
Naturally, I could not approve without a comment about a newline ;)
@@ -34,10 +45,26 @@ public void configure(Map<String, Object> config) { | |||
} | |||
|
|||
/** | |||
* Determines if the given record should be filtered based on its entity | |||
* DEPRECATED |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use this instead.
public void update(T value) { | ||
this.value = value; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing newline, looks like your editor needs to be configured.
168dcec
to
a0678a3
Compare
*/ | ||
public abstract boolean isFiltered(String entity, BaseRecord record); | ||
public FilterMode filter(String entity, BaseRecord record) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Continuing from @moritastic comment, this signature isn't deprecated. It's a breaking change since the return value changes. This PR already introduces a number of backwards compatibility changes. Lets just remove this and I'll make a note of it in the release
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree, 0b72cf1 removes the deprecated signature
NOTE: This PR is currently unstable and should not be merged. It suffers from issue #4.Update: Issue #4 is not exclusive to this PR, removing [UNSTABLE] tag.
Why this PR is being opened:
This PR fixes issue #5 by allowing filters to detect changes. Given a One-To-Many relationship with frequent updates to a single record in the first relation, may cause an excessive number of output records to be produced if a large number of records corresponding to it exists in the second relation. This PR introduces the capability for filters to detect changes in input records and decide the course of action that should be taken, one of:
UPDATE
,DELETE
, orSKIP
in an effort to combat this problem.Why existing functionality does not work:
The existing filter functionality only allows records to be treated as deleted (or tombstoned). This may cause output records to exclude necessary data.
What this PR changes:
BaseFilter
requires afilter
function which now returnsFilterMode
BaseFilter.filter()
additionally accepts the oldBaseRecord
as the last argumentDefaultFilter
class updated to be more explicit about previously implicit filtering logicNotes:
See Commit 5dbb83a
Filtering happens in theKafkaTopicIterator.next()
call. An important caveat is thathasNext()
may returnTrue
if records exist in the input topic, however, if those records are all skipped, the return value ofnext()
will beNULL
. In an upcoming commit, I will be changinghasNext()
to perform filtering as well to avoid this discrepancyCommit 25f3212 updates
BaseFilter
to support twofilter()
signatures, one which accepts the old record and one which doesn't. This allows users who only care about the new record in their filters to avoid having an ignored argument.Commit 415f0b0 corrects [known issue] Since
KafkaTopic.hasNext()
andKafkaTopic.next()
can now theoretically consume many input topic messages as they skip along, they should also now incrementrecords.consumed
andrecords.consumed.by.entity
. Previously, this was only done here. Without any changes, we will be under reporting these metrics.Commit c0d719f addresses PR feedback related to large number of arguments necessary for BaseTopic configuration. Created a
TopicConfig
class, an instance can be passed toBaseTopic.configure
instead.Commit 168dcec addresses a bug which fails to null the value field on deletes, see: https://github.com/jwplayer/southpaw/blob/master/src/main/java/com/jwplayer/southpaw/topic/KafkaTopic.java#L91 this is used by the engine to avoid generating denorm docs
0b72cf1 removes deprecated function signature from BaseFilter. This was initially intended to be backwards compatible with existing filters but isn't due to change in return value.