Refactor Redpanda Migrator components #3026

Open

mihaitodor wants to merge 18 commits into main from mihaitodor-add-redpanda-migrator-offset-metadata
Conversation

@mihaitodor (Collaborator) commented on Nov 21, 2024:

I hijacked this PR to address several issues:

Added

  • New redpanda_migrator_offsets input.
  • Fields offset_topic, offset_group, offset_partition, offset_commit_timestamp and offset_metadata added to the redpanda_migrator_offsets output.
  • Field topic_lag_refresh_period added to the redpanda and redpanda_common inputs.
  • Metric redpanda_lag now emitted by the redpanda and redpanda_common inputs.
  • Metadata kafka_lag now emitted by the redpanda and redpanda_common inputs.

Fixed

  • The redpanda_migrator_bundle output now skips schema ID translation when translate_schema_ids: false and schema_registry is configured.
  • The redpanda_migrator output no longer rejects messages if it can't perform schema ID translation.
  • The redpanda_migrator input no longer converts the kafka key to string.

Changed

  • The kafka_key and max_in_flight fields of the redpanda_migrator_offsets output are now deprecated.
  • The batching field of the redpanda_migrator output is now deprecated.
  • The batch_size and multi_header fields of the redpanda_migrator input are now deprecated.
  • The redpanda_migrator_bundle input and output now set labels for their subcomponents.
  • The redpanda_migrator input no longer emits tombstone messages.

Redpanda Migrator offset metadata

One quick way to test this is via the following config. Note how I overwrite kafka_offset_metadata with foobar in a mapping processor.

input:
  redpanda_migrator_bundle:
    redpanda_migrator:
      seed_brokers: [ "localhost:9092" ]
      topics:
        - '^[^_]' # Skip internal topics which start with `_`
      regexp_topics: true
      consumer_group: migrator_bundle
      start_from_oldest: true
      replication_factor_override: true
      replication_factor: -1

    schema_registry:
      url: http://localhost:8081
      include_deleted: true
      subject_filter: ""

output:
  processors:
    - switch:
        - check: metadata("input_label") == "redpanda_migrator_offsets_input"
          processors:
            - mapping: |
                meta kafka_offset_metadata = "foobar"
  redpanda_migrator_bundle:
    redpanda_migrator:
      seed_brokers: [ "localhost:9093" ]
      max_in_flight: 1
      replication_factor_override: true
      replication_factor: -1

    schema_registry:
      url: http://localhost:8082

@mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch from 34421d0 to 081592f on November 21, 2024, 02:46
@mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch 12 times, most recently from a86bdbd to 72237c4 on December 12, 2024, 01:18
@mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch 5 times, most recently from d37239f to 784ff42 on December 16, 2024, 11:16
@mihaitodor changed the title from "Add Redpanda Migrator offset metadata" to "Refactor Redpanda Migrator components" on December 16, 2024
@mihaitodor marked this pull request as ready for review on December 16, 2024, 11:21
log: res.Logger(),
shutSig: shutdown.NewSignaller(),
clientOpts: optsFn,
topicLagGauge: res.Metrics().NewGauge("redpanda_lag", "topic", "partition"),
@mihaitodor (Collaborator, Author) commented on Dec 16, 2024:

When I added the redpanda_migrator input, I had both this gauge and the kafka_lag metadata field. I don't know if we want any of these available by default. Also, should this gauge name be somehow derived from the actual input type (redpanda, redpanda_common, redpanda_migrator, redpanda_migrator_offsets)? It does get the label of the input if set, so maybe that's sufficient.

Collaborator (reviewer):

I think the label is enough. Do we really want this lag metric for all these inputs? Probably I would assume...

@mihaitodor (Collaborator, Author):

I also think it's a bit overkill and I don't recall now which conversation led to this pattern. I also emit the kafka_lag metadata field with each message, so one could add a metric processor in the pipeline which creates a gauge for topics as needed. One downside of that approach is that, if messages stop flowing completely, the gauge wouldn't get any updates. I think the main idea was to make it easier for people to discover this metric, but it's not clear what the perf impact might be if we consume from thousands of topics, each having multiple partitions. Should I remove it? (cc @Jeffail)

Collaborator (reviewer):

I like having the metric emitted here: it's relatively cheap, and extracting it from metadata is awkward enough that no one is going to do it willingly.

@mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch from 784ff42 to 642fd09 on December 16, 2024, 11:43
@mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch 6 times, most recently from 34c5d16 to 5749553 on December 31, 2024, 14:13
@mihaitodor requested review from Jeffail and removed the request for Jeffail on December 31, 2024, 14:50
@mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch from 5749553 to bcfeae2 on December 31, 2024, 15:07
@@ -76,18 +105,25 @@ type FranzReaderOrdered struct {

// NewFranzReaderOrderedFromConfig attempts to instantiate a new
// FranzReaderOrdered reader from a parsed config.
-func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error)) (*FranzReaderOrdered, error) {
+func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, clientOptsFn clientOptsFn, recordToMessageFn recordToMessageFn, preflightHookFn preflightHookFn, closeHookFn closeHookFn) (*FranzReaderOrdered, error) {
@mihaitodor (Collaborator, Author):

This constructor got a bit messy to use... It can be hard to tell which of these funcs is set to nil at the call site, and one can easily mix them up. I'm thinking of maybe introducing functional options for it, or a struct which contains all the parameters. WDYT?

Collaborator (reviewer):

Yeah I think I'd go with something like a FranzReaderHooks type, where you can do stuff like NewFranzReaderHooks().WithRecordToMessageFn(foo).WithCloseHookFn(bar).
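
A rough sketch of what that builder could look like, under the assumption that the hook signatures match the ones quoted elsewhere in this PR (the type name comes from the comment above; field names and import paths are illustrative, not the actual implementation):

package kafka

import (
	"github.com/redpanda-data/benthos/v4/public/service"
	"github.com/twmb/franz-go/pkg/kgo"
)

// FranzReaderHooks bundles the optional hooks so the constructor only needs
// one extra argument and nil checks live in a single place.
type FranzReaderHooks struct {
	recordToMessageFn func(record *kgo.Record) (*service.Message, error)
	closeHookFn       func(res *service.Resources)
}

// NewFranzReaderHooks returns an empty hook set; callers chain only the
// hooks they actually need.
func NewFranzReaderHooks() *FranzReaderHooks { return &FranzReaderHooks{} }

// WithRecordToMessageFn sets the record conversion hook and returns the
// receiver for chaining.
func (h *FranzReaderHooks) WithRecordToMessageFn(fn func(record *kgo.Record) (*service.Message, error)) *FranzReaderHooks {
	h.recordToMessageFn = fn
	return h
}

// WithCloseHookFn sets the close hook and returns the receiver for chaining.
func (h *FranzReaderHooks) WithCloseHookFn(fn func(res *service.Resources)) *FranzReaderHooks {
	h.closeHookFn = fn
	return h
}

A call site would then read something like NewFranzReaderOrderedFromConfig(conf, res, optsFn, NewFranzReaderHooks().WithRecordToMessageFn(foo).WithCloseHookFn(bar)).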

- New `redpanda_migrator_offsets` input
- Fields `offset_topic`, `offset_group`, `offset_partition`, `offset_commit_timestamp` and `offset_metadata` added to the `redpanda_migrator_offsets` output

Signed-off-by: Mihai Todor <[email protected]>
This is required in order to pull in twmb/franz-go#838

This is needed because the `redpanda_migrator` input needs to
create all the matched topics during the first call to
`ReadBatch()`.

Signed-off-by: Mihai Todor <[email protected]>
Signed-off-by: Mihai Todor <[email protected]>
Signed-off-by: Mihai Todor <[email protected]>
@mihaitodor force-pushed the mihaitodor-add-redpanda-migrator-offset-metadata branch from bcfeae2 to e0838a5 on January 3, 2025, 18:29
@@ -114,8 +121,13 @@ func (f *FranzReaderOrdered) recordsToBatch(records []*kgo.Record) *batchWithRec
var length uint64
var batch service.MessageBatch
for _, r := range records {
	record, err := f.recordToMessageFn(r)
	if err != nil {
		f.log.Debugf("Failed to convert kafka record to message: %s", err)
Collaborator (reviewer):

This is dropping records - that seems bad right?

@mihaitodor (Collaborator, Author):

We're intentionally dropping tombstone records. Otherwise, kafka.FranzRecordToMessageV1() doesn't return any errors, but yeah, it's easy to misuse. Alternatively, I could set an error on the message, but I'm not aware of any other input which does that and, if the idea is to drop those messages anyway, it would be wasteful to need an extra processor for that. Not sure what's the best approach here.

Collaborator (reviewer):

Maybe we should change the signature of the record-to-message function so that it's clearer that messages are dropped in this case: have it return (*service.Message, bool), where nil, false is an explicit instruction to discard the record as if it didn't exist.
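
A minimal sketch of that suggestion, with the loop from the diff above adapted to the new signature (the standalone helper and the import paths are assumptions for illustration):

package kafka

import (
	"github.com/redpanda-data/benthos/v4/public/service"
	"github.com/twmb/franz-go/pkg/kgo"
)

// recordToMessageFn converts a Kafka record into a message. Returning false
// is an explicit instruction to discard the record (e.g. a tombstone) as if
// it never existed, rather than signalling a conversion error.
type recordToMessageFn func(record *kgo.Record) (*service.Message, bool)

// recordsToBatch shows how the loop would read with the proposed signature:
// intentional drops are no longer logged as failures.
func recordsToBatch(records []*kgo.Record, fn recordToMessageFn) service.MessageBatch {
	var batch service.MessageBatch
	for _, r := range records {
		msg, ok := fn(r)
		if !ok {
			continue // explicit discard, e.g. tombstone records
		}
		batch = append(batch, msg)
	}
	return batch
}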

@@ -76,18 +79,22 @@ type FranzReaderOrdered struct {

// NewFranzReaderOrderedFromConfig attempts to instantiate a new
// FranzReaderOrdered reader from a parsed config.
-func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error)) (*FranzReaderOrdered, error) {
+func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error), recordToMessageFn RecordToMessageFn) (*FranzReaderOrdered, error) {
Collaborator (reviewer):

Can we expand the docs to explain what recordToMessageFn is and that it can be nil? Also, is there no way to do this transform after the fact, i.e. a mapping on the V1 message type?

@mihaitodor (Collaborator, Author) commented on Jan 4, 2025:

> Can we expand the docs to explain what recordToMessageFn is and that it can be nil?

Good idea, sorted! I am still wondering whether I should change the signature of this constructor to make it easier to use. Maybe a separate refactor...

> Also, is there no way to do this transform after the fact, i.e. a mapping on the V1 message type?

For the redpanda_migrator input, I'd lose the ability to drop tombstone messages directly here, so I'd need to use a mapping processor as I explained in the other comment. I guess that's fine even if it adds a bit of perf overhead, but it may be a bit annoying for someone who wants to use the redpanda_migrator input directly instead of the redpanda_migrator_bundle.
For the redpanda_migrator_offsets input it gets a bit awkward, because then we'd need a custom processor which can decode both the record.Key and record.Value using franz-go APIs. We'd also lose the ability to filter out messages which contain consumer group updates about topics we don't care about, so I'd have to move the topic filter to the processor instead. It would be confusing for people who don't know they always need to attach this processor to the redpanda_migrator_offsets input.

Collaborator (reviewer):

Here is an example of what I mean by changing this via a mapping: https://gist.github.com/rockwotj/48bddf4557a5210d6fa415a1cc736090

Basically just wrap the FranzReader in another struct that implements service.BatchReader, and in its ReadBatch function do whatever filtering/transforming you need to do. No hooks required.
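
A minimal sketch of that wrapper, assuming FranzReaderOrdered exposes the usual Connect/ReadBatch/Close methods of a batch input; the empty-content tombstone check is a simplification for illustration:

package kafka

import (
	"context"

	"github.com/redpanda-data/benthos/v4/public/service"
)

// migratorReader decorates FranzReaderOrdered and performs the migrator
// specific filtering inside ReadBatch, so no hooks need to be threaded
// through the underlying constructor.
type migratorReader struct {
	inner *FranzReaderOrdered
}

func (r *migratorReader) Connect(ctx context.Context) error { return r.inner.Connect(ctx) }

func (r *migratorReader) ReadBatch(ctx context.Context) (service.MessageBatch, service.AckFunc, error) {
	batch, ack, err := r.inner.ReadBatch(ctx)
	if err != nil {
		return nil, nil, err
	}
	filtered := make(service.MessageBatch, 0, len(batch))
	for _, msg := range batch {
		// Drop tombstones, approximated here as messages with empty content.
		if b, err := msg.AsBytes(); err == nil && len(b) == 0 {
			continue
		}
		filtered = append(filtered, msg)
	}
	return filtered, ack, nil
}

func (r *migratorReader) Close(ctx context.Context) error { return r.inner.Close(ctx) }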

}

// Consume messages from the `__consumer_offsets` topic
d.Topics = []string{`__consumer_offsets`}
Collaborator (reviewer):

What's the plan for Redpanda Serverless, which doesn't support this?

@mihaitodor (Collaborator, Author):

I have a ticket for that, and for now it's still something which needs to be discussed. I don't know what we can do there, but neither the redpanda_migrator_offsets input nor the output currently works with RP Serverless.

seed_brokers: [ "127.0.0.1:9092" ]
topics: [ "__consumer_offsets" ]
topics: [ "foobar" ]
Collaborator (reviewer):

Why is this foobar?

@mihaitodor (Collaborator, Author):

That's the topic filter: since the redpanda_migrator_offsets input reads messages from __consumer_offsets, we need to drop updates about topics we're not interested in.

Collaborator (reviewer):

But we don't want a filter by default? Or should this be configurable? I don't understand why this is hardcoded the way it is.

@@ -80,7 +99,7 @@ type FranzReaderOrdered struct {

// NewFranzReaderOrderedFromConfig attempts to instantiate a new
// FranzReaderOrdered reader from a parsed config.
-func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error), recordToMessageFn RecordToMessageFn) (*FranzReaderOrdered, error) {
+func NewFranzReaderOrderedFromConfig(conf *service.ParsedConfig, res *service.Resources, optsFn func() ([]kgo.Opt, error), recordToMessageFn recordToMessageFn, preflightHookFn preflightHookFn, closeHookFn func(res *service.Resources)) (*FranzReaderOrdered, error) {
Collaborator (reviewer):

I'm not really loving all these extra hooks into this method... It would be great to wrap this method/reader instead (à la the decorator pattern) - can we do that? I think the net result would be cleaner.

@mihaitodor (Collaborator, Author):

Yep, I hate it too (see above), but I wasn't sure how to go about it without making this PR huge.

Which of the following would you prefer:

  • Have exported fields on the reader and writer and set them after calling the NewFranzReaderOrderedFromConfig() and NewFranzWriterFromConfig() constructors
  • Use the functional options pattern
  • Pass a struct which contains all these parameters to the constructors (see the sketch below)
  • Something else

I guess you're thinking of the first approach, which I hadn't considered initially, and the functional options approach feels like overkill. Embedding the FranzReaderOrdered and FranzWriter structs in new structs and overriding the relevant methods would be nice, but then I'd have to introduce some more exported helper methods... Let me know if you'd like me to explore this approach.
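
For reference, a sketch of the parameter-struct option; the hook signatures shown in this PR are copied where available and otherwise assumed:

package kafka

import (
	"github.com/redpanda-data/benthos/v4/public/service"
	"github.com/twmb/franz-go/pkg/kgo"
)

// FranzReaderOrderedOpts bundles the optional knobs so the constructor keeps
// a short, stable signature and call sites only set the fields they need.
// Nil fields would be treated as no-ops by the reader.
type FranzReaderOrderedOpts struct {
	ClientOptsFn      func() ([]kgo.Opt, error)
	RecordToMessageFn func(record *kgo.Record) (*service.Message, error)
	PreflightHookFn   func(client *kgo.Client) error // shape assumed, not shown in the PR
	CloseHookFn       func(res *service.Resources)
}

The constructor would then take (conf, res, opts FranzReaderOrderedOpts) instead of a growing list of positional hook arguments.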

Collaborator (reviewer):

This is what I was thinking as a diff you can apply on top of this PR: https://gist.github.com/rockwotj/48bddf4557a5210d6fa415a1cc736090. It requires exporting the kgo.Client, but I think that's an okay thing. I haven't seen any hooks yet that fundamentally cannot be implemented in a wrapper layer (decorator pattern).

}

// NewFranzWriterFromConfig uses a parsed config to extract customisation for
// writing data to a Kafka broker. A closure function must be provided that is
// responsible for granting access to a connected client.
-func NewFranzWriterFromConfig(conf *service.ParsedConfig, accessClientFn func(FranzSharedClientUseFn) error, yieldClientFn func(context.Context) error) (*FranzWriter, error) {
+func NewFranzWriterFromConfig(conf *service.ParsedConfig, accessClientFn accessClientFn, yieldClientFn yieldClientFn, onConnectHookFn func(client *kgo.Client), onWriteHookFn onWriteHookFn) (*FranzWriter, error) {
Collaborator (reviewer):

Same here with these hooks: can we wrap the writer that is returned instead?

Signed-off-by: Mihai Todor <[email protected]>
@mihaitodor requested a review from rockwotj on January 5, 2025, 21:23
Comment on lines +149 to +156
matchesTopic := func(topic string) bool {
	if len(topicPatterns) > 0 {
		return slices.ContainsFunc(topicPatterns, func(tp *regexp.Regexp) bool {
			return tp.MatchString(topic)
		})
	}
	return slices.ContainsFunc(topics, func(t string) bool {
		return t == topic
	})
}
Collaborator (reviewer):

Good! Now I think this can be lifted out of this closure so it's declared before NewFranzReaderOrderedFromConfig is called. It helps remove clutter from what this function is doing.
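
A sketch of what lifting it out might look like; the helper name is hypothetical:

package kafka

import (
	"regexp"
	"slices"
)

// topicMatcher builds the match predicate up front, before
// NewFranzReaderOrderedFromConfig is called, keeping the closure passed to
// the reader focused on record conversion.
func topicMatcher(topicPatterns []*regexp.Regexp, topics []string) func(topic string) bool {
	if len(topicPatterns) > 0 {
		return func(topic string) bool {
			return slices.ContainsFunc(topicPatterns, func(tp *regexp.Regexp) bool {
				return tp.MatchString(topic)
			})
		}
	}
	return func(topic string) bool {
		return slices.Contains(topics, topic)
	}
}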
