Skip to content

Commit

Permalink
[HUDI-8881] trigger failure earlier in Flink Hudi sink
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangZhenQiu committed Jan 23, 2025
1 parent baf141a commit 3112505
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,
"The coordinator can only handle WriteMetaEvent");
WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;
// throw exception as early as possible
detectWriteFailure(event);

if (event.isEndInput()) {
// handle end input event synchronously
Expand Down Expand Up @@ -473,6 +475,17 @@ private void handleEndInputEvent(WriteMetadataEvent event) {
}
}

private void detectWriteFailure(WriteMetadataEvent event) throws HoodieException {
// It will early detect any write failures in each of task and rollback the instant
// to prevent data loss caused by commit failure after a checkpoint is finished successfully.
long totalErrorRecords = event.getWriteStatuses().stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L);
if (totalErrorRecords > 0 && ! this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) {
// Rolls back instant
writeClient.rollback(event.getInstantTime());
throw new HoodieException(String.format("Write failure happened for Instant [%s] and rolled back !", instant));
}
}

private void scheduleTableServices(Boolean committed) {
// if compaction is on, schedule the compaction
if (tableState.scheduleCompaction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.event.WriteMetadataEvent;
Expand Down Expand Up @@ -74,6 +75,7 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
Expand Down Expand Up @@ -287,6 +289,22 @@ public void testRecommitWithLazyFailedWritesCleanPolicy() {
assertThat("Recommits the instant with lazy failed writes clean policy", TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()), is(instant));
}

@Test
public void testWriteFailureDetection() throws Exception {
// reset
reset();
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
coordinator = new StreamWriteOperatorCoordinator(conf, context);
coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context));
assertThrows(HoodieException.class, () -> {
WriteMetadataEvent event = createOperatorEvent(0, coordinator.getInstant(), "par1", true, 0.1);
event.getWriteStatuses().get(0).markFailure("", "par1", new IOException("Write failure"));
coordinator.handleEventFromOperator(0, event);
});
}

@Test
public void testHiveSyncInvoked() throws Exception {
// reset
Expand Down

0 comments on commit 3112505

Please sign in to comment.