Skip to content
This repository has been archived by the owner on Jun 16, 2023. It is now read-only.

修复jstorm代码merge丢失的jstorm-kafka代码 #441

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class KafkaConsumer {
private LinkedList<Host> brokerList;
private int brokerIndex;
private Broker leaderBroker;
private short fetchResponseCode = 0;

public KafkaConsumer(KafkaSpoutConfig config) {
this.config = config;
Expand Down Expand Up @@ -82,14 +83,16 @@ public ByteBufferMessageSet fetchMessages(int partition, long offset) throws IOE
}
}
if (fetchResponse.hasError()) {
short code = fetchResponse.errorCode(topic, partition);
if (code == ErrorMapping.OffsetOutOfRangeCode() && config.resetOffsetIfOutOfRange) {
long startOffset = getOffset(topic, partition, config.startOffsetTime);
offset = startOffset;
fetchResponseCode = fetchResponse.errorCode(topic, partition);
if (fetchResponseCode == ErrorMapping.OffsetOutOfRangeCode()) {
}
// if (code == ErrorMapping.OffsetOutOfRangeCode() && config.resetOffsetIfOutOfRange) {
// long startOffset = getOffset(topic, partition, config.startOffsetTime);
// offset = startOffset;
// }
if(leaderBroker != null) {
LOG.error("fetch data from kafka topic[" + config.topic + "] host[" + leaderBroker.host() + ":" + leaderBroker.port() + "] partition["
+ partition + "] error:" + code);
+ partition + "] error:" + fetchResponseCode);
}else {

}
Expand All @@ -99,6 +102,12 @@ public ByteBufferMessageSet fetchMessages(int partition, long offset) throws IOE
return msgs;
}
}

/**
 * Returns the error code recorded by the most recent failed fetch and
 * clears it, so each error is observed by the caller at most once.
 *
 * @return the last fetch error code, or 0 if no error has occurred since
 *         the previous call
 */
public short getAndResetFetchResponseCode(){
    final short lastCode = fetchResponseCode;
    // Reset so a stale error is not reported twice.
    fetchResponseCode = 0;
    return lastCode;
}

private SimpleConsumer findLeaderConsumer(int partition) {
try {
Expand Down Expand Up @@ -238,4 +247,4 @@ public void setLeaderBroker(Broker leaderBroker) {
this.leaderBroker = leaderBroker;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,27 @@ public void deactivate() {
@Override
public void nextTuple() {
Collection<PartitionConsumer> partitionConsumers = coordinator.getPartitionConsumers();
boolean isAllSleeping = true;
for(PartitionConsumer consumer: partitionConsumers) {
EmitState state = consumer.emit(collector);
LOG.debug("====== partition "+ consumer.getPartition() + " emit message state is "+state);
if(!consumer.isSleepingConsumer() ){
isAllSleeping = false;
EmitState state = consumer.emit(collector);
LOG.debug("====== partition "+ consumer.getPartition() + " emit message state is "+state);
}
// if(state != EmitState.EMIT_MORE) {
// currentPartitionIndex = (currentPartitionIndex+1) % consumerSize;
// }
// if(state != EmitState.EMIT_NONE) {
// break;
// }
}
if(isAllSleeping){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long now = System.currentTimeMillis();
if((now - lastUpdateMs) > config.offsetUpdateIntervalMs) {
commitState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import java.nio.ByteBuffer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
Expand All @@ -18,6 +19,7 @@
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageAndOffset;
import kafka.common.ErrorMapping;
import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.utils.Utils;
Expand Down Expand Up @@ -48,6 +50,7 @@ static enum EmitState {
private long lastCommittedOffset;
private ZkState zkState;
private Map stormConf;
private long consumerSleepEndTime = 0;

public PartitionConsumer(Map conf, KafkaSpoutConfig config, int partition, ZkState offsetState) {
this.stormConf = conf;
Expand All @@ -68,16 +71,17 @@ public PartitionConsumer(Map conf, KafkaSpoutConfig config, int partition, ZkSta
}

try {
if (config.fromBeginning) {
emittingOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.EarliestTime());
} else {
if (jsonOffset == null) {
lastCommittedOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.LatestTime());
if (jsonOffset == null) {
if (config.fromBeginning) {
emittingOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.EarliestTime());
} else {
lastCommittedOffset = jsonOffset;
lastCommittedOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.LatestTime());
}
emittingOffset = lastCommittedOffset;
} else {
lastCommittedOffset = jsonOffset;
}
emittingOffset = lastCommittedOffset;

} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
Expand Down Expand Up @@ -125,10 +129,18 @@ private void fillMessages() {
msgs = consumer.fetchMessages(partition, emittingOffset + 1);

if (msgs == null) {
LOG.error("fetch null message from offset {}", emittingOffset);
short fetchResponseCode = consumer.getAndResetFetchResponseCode();
if (fetchResponseCode == ErrorMapping.OffsetOutOfRangeCode()) {
this.emittingOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.LatestTime());
LOG.warn("reset kafka offset {}", emittingOffset);
}else{
this.consumerSleepEndTime = System.currentTimeMillis() + 100;
LOG.warn("sleep until {}", consumerSleepEndTime);
}
LOG.warn("fetch null message from offset {}", emittingOffset);
return;
}

int count = 0;
for (MessageAndOffset msg : msgs) {
count += 1;
Expand All @@ -144,6 +156,10 @@ private void fillMessages() {
LOG.error(e.getMessage(),e);
}
}

/**
 * Tells whether this consumer is still inside its back-off window
 * (set after an empty/failed fetch) and should be skipped for now.
 *
 * @return true while the current time has not yet reached
 *         {@code consumerSleepEndTime}
 */
public boolean isSleepingConsumer(){
    final long now = System.currentTimeMillis();
    return now < this.consumerSleepEndTime;
}

public void commitState() {
try {
Expand Down Expand Up @@ -224,4 +240,4 @@ public KafkaConsumer getConsumer() {
/**
 * Replaces the {@link KafkaConsumer} this partition consumer delegates
 * fetches to (useful for wiring and testing).
 *
 * @param consumer the consumer to use for subsequent fetches
 */
public void setConsumer(KafkaConsumer consumer) {
this.consumer = consumer;
}
}
}