Skip to content

Commit

Permalink
kafka set failed state when produce or fetch failed (#1685)
Browse files Browse the repository at this point in the history
  • Loading branch information
kedixa authored Jan 13, 2025
1 parent 3aeb5a0 commit 8cb8935
Showing 1 changed file with 8 additions and 31 deletions.
39 changes: 8 additions & 31 deletions src/factory/KafkaTaskImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,30 +521,18 @@ bool __ComplexKafkaTask::process_fetch()
this->get_resp()->get_toppar_list()->rewind();
while ((toppar = this->get_resp()->get_toppar_list()->get_next()) != NULL)
{
if (toppar->get_error() == KAFKA_OFFSET_OUT_OF_RANGE)
int toppar_error = toppar->get_error();

if (toppar_error == KAFKA_OFFSET_OUT_OF_RANGE)
{
toppar->set_offset(KAFKA_OFFSET_OVERFLOW);
toppar->set_low_watermark(KAFKA_OFFSET_UNINIT);
toppar->set_high_watermark(KAFKA_OFFSET_UNINIT);
ret = true;
}

switch (toppar->get_error())
else if (toppar_error)
{
case KAFKA_UNKNOWN_TOPIC_OR_PARTITION:
case KAFKA_LEADER_NOT_AVAILABLE:
case KAFKA_NOT_LEADER_FOR_PARTITION:
case KAFKA_BROKER_NOT_AVAILABLE:
case KAFKA_REPLICA_NOT_AVAILABLE:
case KAFKA_KAFKA_STORAGE_ERROR:
case KAFKA_FENCED_LEADER_EPOCH:
this->get_req()->set_api_type(Kafka_Metadata);
return true;
case 0:
case KAFKA_OFFSET_OUT_OF_RANGE:
break;
default:
ctx_ = toppar->get_error();
ctx_ = toppar_error;
this->error = WFT_ERR_KAFKA_FETCH_FAILED;
this->state = WFT_STATE_TASK_ERROR;
return false;
Expand Down Expand Up @@ -580,21 +568,10 @@ bool __ComplexKafkaTask::process_produce()
return true;
}

switch (toppar->get_error())
if (toppar->get_error())
{
case KAFKA_UNKNOWN_TOPIC_OR_PARTITION:
case KAFKA_LEADER_NOT_AVAILABLE:
case KAFKA_NOT_LEADER_FOR_PARTITION:
case KAFKA_BROKER_NOT_AVAILABLE:
case KAFKA_REPLICA_NOT_AVAILABLE:
case KAFKA_KAFKA_STORAGE_ERROR:
case KAFKA_FENCED_LEADER_EPOCH:
this->get_req()->set_api_type(Kafka_Metadata);
return true;
case 0:
break;
default:
this->error = toppar->get_error();
ctx_ = toppar->get_error();
this->error = WFT_ERR_KAFKA_PRODUCE_FAILED;
this->state = WFT_STATE_TASK_ERROR;
return false;
}
Expand Down

0 comments on commit 8cb8935

Please sign in to comment.