Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix-16990] Fix WorkflowExecutionGraph#isTaskFinish is not working correctly #16995

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,27 @@ public interface IWorkflowExecutionGraph {
*/
boolean isTaskExecutionRunnableActive(final ITaskExecutionRunnable taskExecutionRunnable);

/**
* Whether the given task is inactive.
* <p> A task is inactive means the task has been `executed`.
*/
boolean isTaskExecutionRunnableInActive(final ITaskExecutionRunnable taskExecutionRunnable);

/**
* Whether the given task is killed.
*/
boolean isTaskExecutionRunnableKilled(final ITaskExecutionRunnable taskExecutionRunnable);

/**
* Whether the given task is failure.
*/
boolean isTaskExecutionRunnableFailed(final ITaskExecutionRunnable taskExecutionRunnable);

/**
* Whether the given task is paused.
*/
boolean isTaskExecutionRunnablePaused(final ITaskExecutionRunnable taskExecutionRunnable);

/**
* Get the active TaskExecutionRunnable list.
* <p> The active TaskExecutionRunnable means the task is handling in the workflow execution graph.
Expand Down Expand Up @@ -176,6 +192,7 @@ public interface IWorkflowExecutionGraph {
* Check whether the given task is the end of the task chain.
* <p> If the given task has no successor, then it is the end of the task chain.
* <p> If the given task is killed or paused, then it is the end of the task chain.
* <p> If the given task is failure, and all its successors are condition task then it is not end of a task chain.
*/
boolean isEndOfTaskChain(final ITaskExecutionRunnable taskExecutionRunnable);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class WorkflowExecutionGraph implements IWorkflowExecutionGraph {

private final Set<String> activeTaskExecutionRunnable;

private final Set<String> inActiveTaskExecutionRunnable;

public WorkflowExecutionGraph() {
this.failureTaskChains = new HashSet<>();
this.pausedTaskChains = new HashSet<>();
Expand All @@ -60,6 +62,7 @@ public WorkflowExecutionGraph() {
this.successors = new HashMap<>();
this.totalTaskExecuteRunnableMap = new HashMap<>();
this.activeTaskExecutionRunnable = new HashSet<>();
this.inActiveTaskExecutionRunnable = new HashSet<>();
}

@Override
Expand Down Expand Up @@ -143,11 +146,26 @@ public boolean isTaskExecutionRunnableActive(final ITaskExecutionRunnable taskEx
return activeTaskExecutionRunnable.contains(taskExecutionRunnable.getName());
}

@Override
public boolean isTaskExecutionRunnableInActive(ITaskExecutionRunnable taskExecutionRunnable) {
return inActiveTaskExecutionRunnable.contains(taskExecutionRunnable.getName());
}

@Override
public boolean isTaskExecutionRunnableKilled(final ITaskExecutionRunnable taskExecutionRunnable) {
return killedTaskChains.contains(taskExecutionRunnable.getName());
}

@Override
public boolean isTaskExecutionRunnableFailed(ITaskExecutionRunnable taskExecutionRunnable) {
return failureTaskChains.contains(taskExecutionRunnable.getName());
}

@Override
public boolean isTaskExecutionRunnablePaused(ITaskExecutionRunnable taskExecutionRunnable) {
return pausedTaskChains.contains(taskExecutionRunnable.getName());
}

@Override
public List<ITaskExecutionRunnable> getActiveTaskExecutionRunnable() {
return activeTaskExecutionRunnable
Expand All @@ -165,10 +183,10 @@ public List<ITaskExecutionRunnable> getAllTaskExecutionRunnable() {
public boolean isTriggerConditionMet(final ITaskExecutionRunnable taskExecutionRunnable) {
return getPredecessors(taskExecutionRunnable.getName())
.stream()
.allMatch(predecessor -> isTaskFinish(predecessor)
&& !isTaskFailure(predecessor)
&& !isTaskPaused(predecessor)
&& !isTaskKilled(predecessor));
.allMatch(predecessor -> isTaskExecutionRunnableInActive(predecessor)
&& !isTaskExecutionRunnableFailed(predecessor)
&& !isTaskExecutionRunnablePaused(predecessor)
&& !isTaskExecutionRunnableKilled(predecessor));
}

@Override
Expand Down Expand Up @@ -209,6 +227,7 @@ public void markTaskExecutionRunnableActive(final ITaskExecutionRunnable taskExe
@Override
public void markTaskExecutionRunnableInActive(final ITaskExecutionRunnable taskExecutionRunnable) {
activeTaskExecutionRunnable.remove(taskExecutionRunnable.getName());
inActiveTaskExecutionRunnable.add(taskExecutionRunnable.getName());
}

@Override
Expand Down Expand Up @@ -242,8 +261,9 @@ public void markTaskSkipped(final String taskName) {
@Override
public boolean isEndOfTaskChain(final ITaskExecutionRunnable taskExecutionRunnable) {
return successors.get(taskExecutionRunnable.getName()).isEmpty()
|| killedTaskChains.contains(taskExecutionRunnable.getName())
|| pausedTaskChains.contains(taskExecutionRunnable.getName());
|| isTaskExecutionRunnableKilled(taskExecutionRunnable)
|| isTaskExecutionRunnablePaused(taskExecutionRunnable)
|| isTaskExecutionRunnableFailed(taskExecutionRunnable);
}

@Override
Expand Down Expand Up @@ -291,22 +311,6 @@ public boolean isAllSuccessorsAreConditionTask(final ITaskExecutionRunnable task
|| TaskTypeUtils.isConditionTask(taskExecutionRunnable.getTaskInstance().getTaskType()));
}

private boolean isTaskFinish(final ITaskExecutionRunnable taskExecutionRunnable) {
return !activeTaskExecutionRunnable.contains(taskExecutionRunnable.getName());
}

private boolean isTaskFailure(final ITaskExecutionRunnable taskExecutionRunnable) {
return failureTaskChains.contains(taskExecutionRunnable.getName());
}

private boolean isTaskPaused(final ITaskExecutionRunnable taskExecutionRunnable) {
return pausedTaskChains.contains(taskExecutionRunnable.getName());
}

private boolean isTaskKilled(final ITaskExecutionRunnable taskExecutionRunnable) {
return killedTaskChains.contains(taskExecutionRunnable.getName());
}

private void assertTaskExecutionRunnableState(final ITaskExecutionRunnable taskExecutionRunnable,
final TaskExecutionStatus taskExecutionStatus) {
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ protected void pauseActiveTask(final IWorkflowExecutionRunnable workflowExecutio
}
}

protected void onTaskFinish(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable) {
protected void tryToTriggerSuccessorsAfterTaskFinish(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final ITaskExecutionRunnable taskExecutionRunnable) {
final IWorkflowExecutionGraph workflowExecutionGraph = workflowExecutionRunnable.getWorkflowExecutionGraph();
if (workflowExecutionGraph.isEndOfTaskChain(taskExecutionRunnable)) {
emitWorkflowFinishedEventIfApplicable(workflowExecutionRunnable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void startEventAction(final IWorkflowExecutionRunnable workflowExecutionR
public void topologyLogicalTransitionEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent workflowTopologyLogicalTransitionWithTaskFinishEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
super.onTaskFinish(workflowExecutionRunnable,
super.tryToTriggerSuccessorsAfterTaskFinish(workflowExecutionRunnable,
workflowTopologyLogicalTransitionWithTaskFinishEvent.getTaskExecutionRunnable());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void startEventAction(final IWorkflowExecutionRunnable workflowExecutionR
public void topologyLogicalTransitionEventAction(final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent workflowTopologyLogicalTransitionWithTaskFinishEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
super.onTaskFinish(workflowExecutionRunnable,
super.tryToTriggerSuccessorsAfterTaskFinish(workflowExecutionRunnable,
workflowTopologyLogicalTransitionWithTaskFinishEvent.getTaskExecutionRunnable());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public void topologyLogicalTransitionEventAction(
final IWorkflowExecutionRunnable workflowExecutionRunnable,
final WorkflowTopologyLogicalTransitionWithTaskFinishLifecycleEvent workflowTopologyLogicalTransitionWithTaskFinishEvent) {
throwExceptionIfStateIsNotMatch(workflowExecutionRunnable);
super.onTaskFinish(workflowExecutionRunnable,
super.tryToTriggerSuccessorsAfterTaskFinish(workflowExecutionRunnable,
workflowTopologyLogicalTransitionWithTaskFinishEvent.getTaskExecutionRunnable());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,90 @@ public void testStartWorkflow_with_subWorkflowTask_dryRunSuccess() {
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one fake task(A) with multiple predecessors run success")
void testStartWorkflow_with_oneTaskWithMultiplePredecessors_runSuccess() {
final String yaml = "/it/start/workflow_with_one_fake_task_with_multiple_predecessors_success.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(parentWorkflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.SUCCESS);

Assertions
.assertThat(repository.queryTaskInstance(workflowInstanceId))
.hasSize(4)
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
})
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("B");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
})
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("C");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
})
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("D");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
});
});
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one fake task(A) with multiple predecessors run failed")
void testStartWorkflow_with_oneTaskWithMultiplePredecessors_runFailed() {
final String yaml = "/it/start/workflow_with_one_fake_task_with_multiple_predecessors_failed.yaml";
final WorkflowTestCaseContext context = workflowTestCaseContextFactory.initializeContextFromYaml(yaml);
final WorkflowDefinition parentWorkflow = context.getOneWorkflow();

final WorkflowOperator.WorkflowTriggerDTO workflowTriggerDTO = WorkflowOperator.WorkflowTriggerDTO.builder()
.workflowDefinition(parentWorkflow)
.runWorkflowCommandParam(new RunWorkflowCommandParam())
.build();
final Integer workflowInstanceId = workflowOperator.manualTriggerWorkflow(workflowTriggerDTO);

await()
.atMost(Duration.ofMinutes(1))
.untilAsserted(() -> {
Assertions
.assertThat(repository.queryWorkflowInstance(workflowInstanceId))
.matches(
workflowInstance -> workflowInstance.getState() == WorkflowExecutionStatus.FAILURE);

Assertions
.assertThat(repository.queryTaskInstance(workflowInstanceId))
.hasSize(3)
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("A");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
})
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("B");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.FAILURE);
})
.anySatisfy(taskInstance -> {
assertThat(taskInstance.getName()).isEqualTo("C");
assertThat(taskInstance.getState()).isEqualTo(TaskExecutionStatus.SUCCESS);
});
});
masterContainer.assertAllResourceReleased();
}

@Test
@DisplayName("Test start a workflow with one sub workflow task(A) failed")
public void testStartWorkflow_with_subWorkflowTask_failed() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# A(success) -> B(failed) -> D(success)
# C(success) -> D(success)
project:
name: MasterIntegrationTest
code: 1
description: This is a fake project
userId: 1
userName: admin
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00

workflows:
- name: workflow_with_one_fake_task_with_multiple_predecessors_failed
code: 1
version: 1
projectCode: 1
description: This is a fake workflow with one task which has multiple predecessors
releaseState: ONLINE
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
userId: 1
executionType: PARALLEL

tasks:
- name: A
code: 1
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 2"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
- name: B
code: 2
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"xx"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
- name: C
code: 3
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo success"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH
- name: D
code: 4
version: 1
projectCode: 1
userId: 1
taskType: LogicFakeTask
taskParams: '{"localParams":null,"varPool":[],"shellScript":"echo success"}'
workerGroup: default
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH

taskRelations:
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 1
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 1
preTaskVersion: 1
postTaskCode: 2
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 2
preTaskVersion: 1
postTaskCode: 4
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 0
preTaskVersion: 0
postTaskCode: 3
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
- projectCode: 1
workflowDefinitionCode: 1
workflowDefinitionVersion: 1
preTaskCode: 3
preTaskVersion: 1
postTaskCode: 4
postTaskVersion: 1
createTime: 2024-08-12 00:00:00
updateTime: 2024-08-12 00:00:00
Loading
Loading