From d04b0f1adb718574c2b2ba5687264c9620d7fed5 Mon Sep 17 00:00:00 2001 From: Zhenqiu Huang Date: Fri, 17 Jan 2025 11:04:51 -0800 Subject: [PATCH] [HUDI-8881] trigger failure earlier in Flink Hudi sink --- .../sink/StreamWriteOperatorCoordinator.java | 17 ++++++++++++- .../TestStreamWriteOperatorCoordinator.java | 24 ++++++++++++++----- 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index 64b580a20ad27..22046ad90f17c 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -284,6 +284,8 @@ public void handleEventFromOperator(int i, OperatorEvent operatorEvent) { ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent, "The coordinator can only handle WriteMetaEvent"); WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent; + // throw exception as early as possible + filterWriteFailure(event); if (event.isEndInput()) { // handle end input event synchronously @@ -456,7 +458,7 @@ private void handleBootstrapEvent(WriteMetadataEvent event) { } } - private void handleEndInputEvent(WriteMetadataEvent event) { + private void handleEndInputEvent(WriteMetadataEvent event) throws HoodieException { addEventToBuffer(event); if (allEventsReceived()) { // start to commit the instant. @@ -473,6 +475,19 @@ private void handleEndInputEvent(WriteMetadataEvent event) { } } + private void filterWriteFailure(WriteMetadataEvent event) throws HoodieException { + // It will early detect any write failures in each of task and rollback the instant + // to prevent data loss caused by commit failure after a checkpoint is finished successfully. 
+ if (!event.isBootstrap()) { + long totalErrorRecords = event.getWriteStatuses().stream().map(WriteStatus::getTotalErrorRecords).reduce(Long::sum).orElse(0L); + if (totalErrorRecords > 0 && !this.conf.getBoolean(FlinkOptions.IGNORE_FAILED)) { + // Rolls back instant + writeClient.rollback(event.getInstantTime()); + throw new HoodieException(String.format("Write failure happened for Instant [%s] and rolled back!", event.getInstantTime())); + } + } + } + + private void scheduleTableServices(Boolean committed) { // if compaction is on, schedule the compaction if (tableState.scheduleCompaction) { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index c9a5d4c9ff2f5..1df244a429355 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -32,6 +32,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.hadoop.fs.HadoopFSUtils; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.sink.event.WriteMetadataEvent; @@ -69,12 +70,7 @@ import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.CoreMatchers.startsWith; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertNull; -import static 
org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.*; /** * Test cases for StreamingSinkOperatorCoordinator. @@ -287,6 +283,22 @@ public void testRecommitWithLazyFailedWritesCleanPolicy() { assertThat("Recommits the instant with lazy failed writes clean policy", TestUtils.getLastCompleteInstant(tempFile.getAbsolutePath()), is(instant)); } + @Test + public void testWriteFailureDetection() throws Exception { + // reset + reset(); + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1); + coordinator = new StreamWriteOperatorCoordinator(conf, context); + coordinator.start(); + coordinator.setExecutor(new MockCoordinatorExecutor(context)); + assertThrows(HoodieException.class, () -> { + WriteMetadataEvent event = createOperatorEvent(0, coordinator.getInstant(), "par1", true, 0.1); + event.getWriteStatuses().get(0).markFailure("", "par1", new IOException("Write failure")); + coordinator.handleEventFromOperator(0, event); + }); + } + @Test public void testHiveSyncInvoked() throws Exception { // reset