
The consumers don't resume automatically after rebalancing which causes a rebalancing loop. #1720

Open
maksym-opanasenko-ft opened this issue Nov 4, 2024 · 1 comment

Comments

@maksym-opanasenko-ft

Describe the bug
The consumers don't resume automatically after rebalancing which causes a rebalancing loop.

To Reproduce
The repository to reproduce (https://github.com/maksym-opanasenko/kafkajs-constant-rebalancing/blob/main/ingex.js)

Steps to reproduce

  1. Create a topic and fill it with more than 1000 messages
  2. Clone the repo and install node modules
  3. Start a single instance of the consumer using npm run start:default
  4. After the consumer has started printing the Processing message... logs, start the second consumer (in a different terminal window) using npm run start:default so the broker starts rebalancing

Expected behavior
A consumer resumes processing messages after rebalancing

Observed behavior

  1. The "The group is rebalancing" error is logged by the kafkajs logger:
    {"level":"ERROR","timestamp":"2024-11-04T17:33:27.661Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 3)","broker":"kafka:9092","clientId":"pro-content-enricher","error":"The group is rebalancing, so a rejoin is needed","correlationId":49,"size":10}
  2. After some time the "The coordinator is not aware of this member" error is logged:
    {"level":"ERROR","timestamp":"2024-11-04T17:33:43.956Z","logger":"kafkajs","message":"[Connection] Response JoinGroup(key: 11, version: 5)","broker":"kafka:9092","clientId":"pro-content-enricher","error":"The coordinator is not aware of this member","correlationId":321,"size":81}
  3. After some time the consumer reconnects, and the "Consumer has joined the group" message is logged. The rejoin triggers another rebalance, which affects the second consumer (in the second terminal window). After some time the second consumer goes through steps 1-3 itself, which in turn triggers a rebalance for the first one.
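
For reference, the two error messages above match the descriptions of the Kafka protocol errors REBALANCE_IN_PROGRESS (code 27) and UNKNOWN_MEMBER_ID (code 25). The sketch below shows how application code could recognize them. It assumes the error object exposes a `type` string, as KafkaJS protocol errors appear to; `needsRejoin` is a hypothetical helper, not part of KafkaJS.

```javascript
// Assumption: errors carry a `type` string such as 'REBALANCE_IN_PROGRESS'.
// `needsRejoin` is a hypothetical helper name, not a KafkaJS API.
const REJOIN_ERROR_TYPES = new Set([
  'REBALANCE_IN_PROGRESS', // "The group is rebalancing, so a rejoin is needed"
  'UNKNOWN_MEMBER_ID',     // "The coordinator is not aware of this member"
]);

// Returns true when the error means the consumer has to rejoin the group.
function needsRejoin(error) {
  return Boolean(error) && REJOIN_ERROR_TYPES.has(error.type);
}
```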

Summary: When multiple consumers are running, they keep triggering rebalances for each other, which results in a rebalancing loop!

Environment:

Important
When running the code from the indexPatched file: https://github.com/maksym-opanasenko/kafkajs-constant-rebalancing/blob/main/indexPatched.js, the consumers are able to recover after rebalancing.

@Squarix

Squarix commented Dec 13, 2024

I see your workaround has a custom heartbeat implementation.
What happens if you rethrow the error when you catch it inside the heartbeat?
I mean, does the problem still exist if we notify kafkajs about the error and let it handle the error properly at the library level?

It could be something like this:

class CustomKafkaConsumer {
  // KafkaJS passes `message` and `heartbeat` in the eachMessage payload.
  async eachMessage({ message, heartbeat }) {
    let heartbeatTimerId;
    let heartbeatResolve;
    // Rejects as soon as a heartbeat fails, so the error reaches kafkajs.
    const heartbeatPromise = new Promise((resolve, reject) => {
      heartbeatResolve = resolve;
      heartbeatTimerId = setInterval(async () => {
        try {
          await heartbeat();
        } catch (error) {
          console.error('Error sending heartbeat:', error);
          reject(error);
        }
      }, this.kafkaConfig.heartbeatInterval);
    });

    console.log(`Processing message: ${message.key.toString()}`);
    // Simulated 5-second processing, raced against a heartbeat failure.
    await Promise.race([
      new Promise(resolve => setTimeout(resolve, 5000)),
      heartbeatPromise,
    ]).finally(() => {
      clearInterval(heartbeatTimerId);
      heartbeatResolve();
    });
  }
}
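
The same idea can be pulled out into a small reusable helper. This is only a sketch: `runWithHeartbeat` is a hypothetical name, and it assumes `heartbeat` is an async function like the one KafkaJS passes to `eachMessage`. It runs the work, calls `heartbeat` on a fixed interval, and rejects as soon as a heartbeat fails so the caller (and therefore kafkajs) sees the error.

```javascript
// Hypothetical helper, not part of KafkaJS.
// work:       function returning a promise for the message processing
// heartbeat:  async function that sends a heartbeat (assumed to reject on failure)
// intervalMs: how often to send a heartbeat while the work is running
function runWithHeartbeat(work, heartbeat, intervalMs) {
  let timerId;
  // This promise never resolves; it only rejects when a heartbeat fails.
  const heartbeatFailure = new Promise((_, reject) => {
    timerId = setInterval(() => {
      Promise.resolve().then(heartbeat).catch(reject);
    }, intervalMs);
  });
  // Whichever settles first wins; the timer is always cleared afterwards.
  return Promise.race([Promise.resolve().then(work), heartbeatFailure])
    .finally(() => clearInterval(timerId));
}
```

Inside a consumer it might be used as `eachMessage: ({ message, heartbeat }) => runWithHeartbeat(() => handle(message), heartbeat, 3000)`, where `handle` and the 3000 ms interval are placeholders. Note that the work itself is not cancelled when a heartbeat fails; only the returned promise rejects early.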
