Skip to content

Commit

Permalink
Add Per-Thread Failure Handlers on WAIT_FOR_THREADS Node (#657) (#662)
Browse files Browse the repository at this point in the history
Features:
* Adds ability to specify failure handlers which run as the child of the child threads on the wait for threads node.

Fixes:
* Fix external bootstraper
* Adds unit test for wait for threads with exception handler on java sdk
* Fixes issue where certain Failure's didn't end up on their respective NodeRun's.
* Fixes issue with server not rejecting RunWf requests that don't specify required input variables
* Adds e2e tests to verify proper handling of variable errors
* Adds 8 e2e tests for the new handling of failures on a per-thread basis in wait for threads nodes
* Removes two legacy e2e tests which are covered in the new tests
---------

Co-authored-by: Eduwer Camacaro <[email protected]>
Co-authored-by: Saúl Piña <[email protected]>
  • Loading branch information
3 people authored Feb 12, 2024
1 parent ebe21c1 commit 6f3cfe8
Show file tree
Hide file tree
Showing 68 changed files with 4,267 additions and 2,380 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import {
LHStatus,
MetadataStatus,
VariableType,
WaitForThreadsPolicy
} from '../../../../../../../littlehorse-public-api/common_enums'
import type {
TaskNode,
Expand Down Expand Up @@ -2795,14 +2794,16 @@ describe('Layouting graph from LH Nodes', () => {
],
'failureHandlers': [],
'waitForThreads': {
'threads': [
{
'threadRunNumber': {
'variableName': '2-spawned-thread-START_THREAD'
'threads': {
threads: [
{
'threadRunNumber': {
'variableName': '2-spawned-thread-START_THREAD'
}
}
}
],
'policy': WaitForThreadsPolicy.STOP_ON_FAILURE
]
},
perThreadFailureHandlers: []
}
},
'4-parent-task-2-TASK': {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { faker } from '@faker-js/faker'
import type { NodeRun } from '../../../littlehorse-public-api/node_run'
import { WaitForThreadsRun_WaitingThreadStatus, type NodeRun } from '../../../littlehorse-public-api/node_run'
import type { ThreadRun } from '../../../littlehorse-public-api/wf_run'
import { ThreadType } from '../../../littlehorse-public-api/wf_run'
import { LHStatus, WaitForThreadsPolicy } from '../../../littlehorse-public-api/common_enums'
import { LHStatus } from '../../../littlehorse-public-api/common_enums'
import ThreadRunsHandler from './ThreadRunsHandler'

describe('handle Thread Runs related logic', () => {
Expand Down Expand Up @@ -86,15 +86,14 @@ describe('handle Thread Runs related logic', () => {
{
'threadStatus': LHStatus.RUNNING,
'threadRunNumber': 1,
'alreadyHandled': false
'waitingStatus': WaitForThreadsRun_WaitingThreadStatus.THREAD_IN_PROGRESS
},
{
'threadStatus': LHStatus.RUNNING,
'threadRunNumber': 2,
'alreadyHandled': false
'waitingStatus': WaitForThreadsRun_WaitingThreadStatus.THREAD_IN_PROGRESS
}
],
'policy': WaitForThreadsPolicy.STOP_ON_FAILURE
]
},
'failureHandlerIds': []
}
Expand Down Expand Up @@ -150,10 +149,9 @@ describe('handle Thread Runs related logic', () => {
{
'threadStatus': LHStatus.RUNNING,
'threadRunNumber': 1,
'alreadyHandled': false
'waitingStatus': WaitForThreadsRun_WaitingThreadStatus.THREAD_IN_PROGRESS
}
],
'policy': WaitForThreadsPolicy.STOP_ON_FAILURE
]
},
'failureHandlerIds': []
}
Expand Down
37 changes: 0 additions & 37 deletions dashboard/apps/web/littlehorse-public-api/common_enums.ts
Original file line number Diff line number Diff line change
Expand Up @@ -516,40 +516,3 @@ export function lHErrorTypeToNumber(object: LHErrorType): number {
return -1;
}
}

export enum WaitForThreadsPolicy {
STOP_ON_FAILURE = "STOP_ON_FAILURE",
UNRECOGNIZED = "UNRECOGNIZED",
}

export function waitForThreadsPolicyFromJSON(object: any): WaitForThreadsPolicy {
switch (object) {
case 0:
case "STOP_ON_FAILURE":
return WaitForThreadsPolicy.STOP_ON_FAILURE;
case -1:
case "UNRECOGNIZED":
default:
return WaitForThreadsPolicy.UNRECOGNIZED;
}
}

export function waitForThreadsPolicyToJSON(object: WaitForThreadsPolicy): string {
switch (object) {
case WaitForThreadsPolicy.STOP_ON_FAILURE:
return "STOP_ON_FAILURE";
case WaitForThreadsPolicy.UNRECOGNIZED:
default:
return "UNRECOGNIZED";
}
}

export function waitForThreadsPolicyToNumber(object: WaitForThreadsPolicy): number {
switch (object) {
case WaitForThreadsPolicy.STOP_ON_FAILURE:
return 0;
case WaitForThreadsPolicy.UNRECOGNIZED:
default:
return -1;
}
}
159 changes: 120 additions & 39 deletions dashboard/apps/web/littlehorse-public-api/node_run.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
/* eslint-disable */
import * as _m0 from "protobufjs/minimal";
import {
LHStatus,
lHStatusFromJSON,
lHStatusToJSON,
lHStatusToNumber,
WaitForThreadsPolicy,
waitForThreadsPolicyFromJSON,
waitForThreadsPolicyToJSON,
waitForThreadsPolicyToNumber,
} from "./common_enums";
import { LHStatus, lHStatusFromJSON, lHStatusToJSON, lHStatusToNumber } from "./common_enums";
import { Timestamp } from "./google/protobuf/timestamp";
import { ExternalEventDefId, ExternalEventId, NodeRunId, TaskRunId, UserTaskRunId, WfSpecId } from "./object_id";
import { VariableValue } from "./variable";
Expand Down Expand Up @@ -157,11 +148,82 @@ export interface StartMultipleThreadsRun {
export interface WaitForThreadsRun {
/** The threads that are being waited for. */
threads: WaitForThreadsRun_WaitForThread[];
}

/** The status of a single ThreadRun that we are waiting for. */
export enum WaitForThreadsRun_WaitingThreadStatus {
/** THREAD_IN_PROGRESS - The ThreadRun is in progress (i.e. not COMPLETED nor EXCEPTION nor ERROR) */
THREAD_IN_PROGRESS = "THREAD_IN_PROGRESS",
/**
* The policy to use when handling failures for Threads. Currently, only
* one policy exists.
* THREAD_HANDLING_FAILURE - The ThreadRun failed with some failure, and the FailureHandler is running
* for that Failure.
*/
policy: WaitForThreadsPolicy;
THREAD_HANDLING_FAILURE = "THREAD_HANDLING_FAILURE",
/**
* THREAD_COMPLETED_OR_FAILURE_HANDLED - We can mark this ThreadRun as "already waited for", meaning that either:
* 1. It completed successfully, OR
* 2. It failed, and the Failure Handler successfully completed
*/
THREAD_COMPLETED_OR_FAILURE_HANDLED = "THREAD_COMPLETED_OR_FAILURE_HANDLED",
/**
* THREAD_UNSUCCESSFUL - The ThreadRun did not complete successfully, and there wasn't a successful
* run of a Failure Handler for the Failure that was thrown.
*/
THREAD_UNSUCCESSFUL = "THREAD_UNSUCCESSFUL",
UNRECOGNIZED = "UNRECOGNIZED",
}

export function waitForThreadsRun_WaitingThreadStatusFromJSON(object: any): WaitForThreadsRun_WaitingThreadStatus {
switch (object) {
case 0:
case "THREAD_IN_PROGRESS":
return WaitForThreadsRun_WaitingThreadStatus.THREAD_IN_PROGRESS;
case 1:
case "THREAD_HANDLING_FAILURE":
return WaitForThreadsRun_WaitingThreadStatus.THREAD_HANDLING_FAILURE;
case 2:
case "THREAD_COMPLETED_OR_FAILURE_HANDLED":
return WaitForThreadsRun_WaitingThreadStatus.THREAD_COMPLETED_OR_FAILURE_HANDLED;
case 3:
case "THREAD_UNSUCCESSFUL":
return WaitForThreadsRun_WaitingThreadStatus.THREAD_UNSUCCESSFUL;
case -1:
case "UNRECOGNIZED":
default:
return WaitForThreadsRun_WaitingThreadStatus.UNRECOGNIZED;
}
}

export function waitForThreadsRun_WaitingThreadStatusToJSON(object: WaitForThreadsRun_WaitingThreadStatus): string {
switch (object) {
case WaitForThreadsRun_WaitingThreadStatus.THREAD_IN_PROGRESS:
return "THREAD_IN_PROGRESS";
case WaitForThreadsRun_WaitingThreadStatus.THREAD_HANDLING_FAILURE:
return "THREAD_HANDLING_FAILURE";
case WaitForThreadsRun_WaitingThreadStatus.THREAD_COMPLETED_OR_FAILURE_HANDLED:
return "THREAD_COMPLETED_OR_FAILURE_HANDLED";
case WaitForThreadsRun_WaitingThreadStatus.THREAD_UNSUCCESSFUL:
return "THREAD_UNSUCCESSFUL";
case WaitForThreadsRun_WaitingThreadStatus.UNRECOGNIZED:
default:
return "UNRECOGNIZED";
}
}

export function waitForThreadsRun_WaitingThreadStatusToNumber(object: WaitForThreadsRun_WaitingThreadStatus): number {
switch (object) {
case WaitForThreadsRun_WaitingThreadStatus.THREAD_IN_PROGRESS:
return 0;
case WaitForThreadsRun_WaitingThreadStatus.THREAD_HANDLING_FAILURE:
return 1;
case WaitForThreadsRun_WaitingThreadStatus.THREAD_COMPLETED_OR_FAILURE_HANDLED:
return 2;
case WaitForThreadsRun_WaitingThreadStatus.THREAD_UNSUCCESSFUL:
return 3;
case WaitForThreadsRun_WaitingThreadStatus.UNRECOGNIZED:
default:
return -1;
}
}

/** A 'WaitForThread' structure defines a thread that is being waited for. */
Expand All @@ -177,8 +239,17 @@ export interface WaitForThreadsRun_WaitForThread {
threadStatus: LHStatus;
/** The number of the ThreadRun being waited for. */
threadRunNumber: number;
/** INTERNAL: flag used by scheduler internally. */
alreadyHandled: boolean;
/**
* The "waiting status" of this specific thread: whether it's still running,
* already done, handling a failure, or completely failed.
*/
waitingStatus: WaitForThreadsRun_WaitingThreadStatus;
/**
* If there is a failure on the ThreadRun, and we have a failure handler defined
* for it, then we will start a failure handler for this threadrun. This field
* is the id of that threadRun.
*/
failureHandlerThreadRunId?: number | undefined;
}

/** The sub-node structure for an EXTERNAL_EVENT NodeRun. */
Expand Down Expand Up @@ -949,17 +1020,14 @@ export const StartMultipleThreadsRun = {
};

function createBaseWaitForThreadsRun(): WaitForThreadsRun {
return { threads: [], policy: WaitForThreadsPolicy.STOP_ON_FAILURE };
return { threads: [] };
}

export const WaitForThreadsRun = {
encode(message: WaitForThreadsRun, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
for (const v of message.threads) {
WaitForThreadsRun_WaitForThread.encode(v!, writer.uint32(10).fork()).ldelim();
}
if (message.policy !== WaitForThreadsPolicy.STOP_ON_FAILURE) {
writer.uint32(16).int32(waitForThreadsPolicyToNumber(message.policy));
}
return writer;
},

Expand All @@ -977,13 +1045,6 @@ export const WaitForThreadsRun = {

message.threads.push(WaitForThreadsRun_WaitForThread.decode(reader, reader.uint32()));
continue;
case 2:
if (tag !== 16) {
break;
}

message.policy = waitForThreadsPolicyFromJSON(reader.int32());
continue;
}
if ((tag & 7) === 4 || tag === 0) {
break;
Expand All @@ -998,7 +1059,6 @@ export const WaitForThreadsRun = {
threads: globalThis.Array.isArray(object?.threads)
? object.threads.map((e: any) => WaitForThreadsRun_WaitForThread.fromJSON(e))
: [],
policy: isSet(object.policy) ? waitForThreadsPolicyFromJSON(object.policy) : WaitForThreadsPolicy.STOP_ON_FAILURE,
};
},

Expand All @@ -1007,9 +1067,6 @@ export const WaitForThreadsRun = {
if (message.threads?.length) {
obj.threads = message.threads.map((e) => WaitForThreadsRun_WaitForThread.toJSON(e));
}
if (message.policy !== WaitForThreadsPolicy.STOP_ON_FAILURE) {
obj.policy = waitForThreadsPolicyToJSON(message.policy);
}
return obj;
},

Expand All @@ -1019,13 +1076,18 @@ export const WaitForThreadsRun = {
fromPartial<I extends Exact<DeepPartial<WaitForThreadsRun>, I>>(object: I): WaitForThreadsRun {
const message = createBaseWaitForThreadsRun();
message.threads = object.threads?.map((e) => WaitForThreadsRun_WaitForThread.fromPartial(e)) || [];
message.policy = object.policy ?? WaitForThreadsPolicy.STOP_ON_FAILURE;
return message;
},
};

function createBaseWaitForThreadsRun_WaitForThread(): WaitForThreadsRun_WaitForThread {
return { threadEndTime: undefined, threadStatus: LHStatus.STARTING, threadRunNumber: 0, alreadyHandled: false };
return {
threadEndTime: undefined,
threadStatus: LHStatus.STARTING,
threadRunNumber: 0,
waitingStatus: WaitForThreadsRun_WaitingThreadStatus.THREAD_IN_PROGRESS,
failureHandlerThreadRunId: undefined,
};
}

export const WaitForThreadsRun_WaitForThread = {
Expand All @@ -1039,8 +1101,11 @@ export const WaitForThreadsRun_WaitForThread = {
if (message.threadRunNumber !== 0) {
writer.uint32(24).int32(message.threadRunNumber);
}
if (message.alreadyHandled === true) {
writer.uint32(40).bool(message.alreadyHandled);
if (message.waitingStatus !== WaitForThreadsRun_WaitingThreadStatus.THREAD_IN_PROGRESS) {
writer.uint32(32).int32(waitForThreadsRun_WaitingThreadStatusToNumber(message.waitingStatus));
}
if (message.failureHandlerThreadRunId !== undefined) {
writer.uint32(40).int32(message.failureHandlerThreadRunId);
}
return writer;
},
Expand Down Expand Up @@ -1073,12 +1138,19 @@ export const WaitForThreadsRun_WaitForThread = {

message.threadRunNumber = reader.int32();
continue;
case 4:
if (tag !== 32) {
break;
}

message.waitingStatus = waitForThreadsRun_WaitingThreadStatusFromJSON(reader.int32());
continue;
case 5:
if (tag !== 40) {
break;
}

message.alreadyHandled = reader.bool();
message.failureHandlerThreadRunId = reader.int32();
continue;
}
if ((tag & 7) === 4 || tag === 0) {
Expand All @@ -1094,7 +1166,12 @@ export const WaitForThreadsRun_WaitForThread = {
threadEndTime: isSet(object.threadEndTime) ? globalThis.String(object.threadEndTime) : undefined,
threadStatus: isSet(object.threadStatus) ? lHStatusFromJSON(object.threadStatus) : LHStatus.STARTING,
threadRunNumber: isSet(object.threadRunNumber) ? globalThis.Number(object.threadRunNumber) : 0,
alreadyHandled: isSet(object.alreadyHandled) ? globalThis.Boolean(object.alreadyHandled) : false,
waitingStatus: isSet(object.waitingStatus)
? waitForThreadsRun_WaitingThreadStatusFromJSON(object.waitingStatus)
: WaitForThreadsRun_WaitingThreadStatus.THREAD_IN_PROGRESS,
failureHandlerThreadRunId: isSet(object.failureHandlerThreadRunId)
? globalThis.Number(object.failureHandlerThreadRunId)
: undefined,
};
},

Expand All @@ -1109,8 +1186,11 @@ export const WaitForThreadsRun_WaitForThread = {
if (message.threadRunNumber !== 0) {
obj.threadRunNumber = Math.round(message.threadRunNumber);
}
if (message.alreadyHandled === true) {
obj.alreadyHandled = message.alreadyHandled;
if (message.waitingStatus !== WaitForThreadsRun_WaitingThreadStatus.THREAD_IN_PROGRESS) {
obj.waitingStatus = waitForThreadsRun_WaitingThreadStatusToJSON(message.waitingStatus);
}
if (message.failureHandlerThreadRunId !== undefined) {
obj.failureHandlerThreadRunId = Math.round(message.failureHandlerThreadRunId);
}
return obj;
},
Expand All @@ -1125,7 +1205,8 @@ export const WaitForThreadsRun_WaitForThread = {
message.threadEndTime = object.threadEndTime ?? undefined;
message.threadStatus = object.threadStatus ?? LHStatus.STARTING;
message.threadRunNumber = object.threadRunNumber ?? 0;
message.alreadyHandled = object.alreadyHandled ?? false;
message.waitingStatus = object.waitingStatus ?? WaitForThreadsRun_WaitingThreadStatus.THREAD_IN_PROGRESS;
message.failureHandlerThreadRunId = object.failureHandlerThreadRunId ?? undefined;
return message;
},
};
Expand Down
Loading

0 comments on commit 6f3cfe8

Please sign in to comment.