Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Execute launchplans declared locally and automatically adjust input params based on fixed and default values #3115

Merged
merged 3 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ coverage.xml

# Version file is auto-generated by setuptools_scm
flytekit/_version.py
testing
18 changes: 17 additions & 1 deletion flytekit/clis/sdk_in_container/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1052,9 +1052,25 @@ def _create_command(
r = run_level_params.remote_instance()
flyte_ctx = r.context

final_inputs_with_defaults = loaded_entity.python_interface.inputs_with_defaults
if isinstance(loaded_entity, LaunchPlan):
# For LaunchPlans it is essential to handle fixed inputs and default inputs in a special way
# Fixed inputs are inputs that are always passed to the launch plan and cannot be overridden
# Default inputs are inputs that are optional and have a default value
# The final inputs to the launch plan are a combination of the fixed inputs and the default inputs
all_inputs = loaded_entity.python_interface.inputs_with_defaults
default_inputs = loaded_entity.saved_inputs
pmap = loaded_entity.parameters
final_inputs_with_defaults = {}
for name, _ in pmap.parameters.items():
_type, v = all_inputs[name]
if name in default_inputs:
v = default_inputs[name]
final_inputs_with_defaults[name] = _type, v
Comment on lines +1055 to +1069
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider extracting launch plan input logic

The launch plan input handling logic could be extracted into a separate helper method to improve code readability and maintainability. Consider moving the logic for handling fixed and default inputs into a dedicated function.

Code suggestion
Check the AI-generated fix before applying
Suggested change
final_inputs_with_defaults = loaded_entity.python_interface.inputs_with_defaults
if isinstance(loaded_entity, LaunchPlan):
# For LaunchPlans it is essential to handle fixed inputs and default inputs in a special way
# Fixed inputs are inputs that are always passed to the launch plan and cannot be overridden
# Default inputs are inputs that are optional and have a default value
# The final inputs to the launch plan are a combination of the fixed inputs and the default inputs
all_inputs = loaded_entity.python_interface.inputs_with_defaults
default_inputs = loaded_entity.saved_inputs
pmap = loaded_entity.parameters
final_inputs_with_defaults = {}
for name, _ in pmap.parameters.items():
_type, v = all_inputs[name]
if name in default_inputs:
v = default_inputs[name]
final_inputs_with_defaults[name] = _type, v
final_inputs_with_defaults = self._get_launch_plan_inputs(loaded_entity) if isinstance(loaded_entity, LaunchPlan) else loaded_entity.python_interface.inputs_with_defaults
def _get_launch_plan_inputs(self, launch_plan):
# For LaunchPlans it is essential to handle fixed inputs and default inputs in a special way
# Fixed inputs are inputs that are always passed to the launch plan and cannot be overridden
# Default inputs are inputs that are optional and have a default value
# The final inputs to the launch plan are a combination of the fixed inputs and the default inputs
all_inputs = launch_plan.python_interface.inputs_with_defaults
default_inputs = launch_plan.saved_inputs
pmap = launch_plan.parameters
final_inputs = {}
for name, _ in pmap.parameters.items():
_type, v = all_inputs[name]
if name in default_inputs:
v = default_inputs[name]
final_inputs[name] = _type, v
return final_inputs

Code Review Run #58cf8a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged


# Add options for each of the workflow inputs
params = []
for input_name, input_type_val in loaded_entity.python_interface.inputs_with_defaults.items():
for input_name, input_type_val in final_inputs_with_defaults.items():
literal_var = loaded_entity.interface.inputs.get(input_name)
python_type, default_val = input_type_val
required = type(None) not in get_args(python_type) and default_val is None
Expand Down
88 changes: 44 additions & 44 deletions tests/flytekit/unit/cli/pyflyte/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from flytekit.clis.sdk_in_container.run import (
RunLevelParams,
get_entities_in_file,
run_command,
run_command, WorkflowCommand,
)
from flytekit.configuration import Config, Image, ImageConfig
from flytekit.core.task import task
Expand All @@ -28,8 +28,7 @@
from flytekit.remote import FlyteRemote
from typing import Iterator, List
from flytekit.types.iterator import JSON
from flytekit import workflow

from flytekit import workflow, LaunchPlan

pytest.importorskip("pandas")

Expand Down Expand Up @@ -205,7 +204,7 @@ def test_pyflyte_run_cli(workflow_file):
"--s",
json.dumps({"x": {"i": 1, "a": ["h", "e"]}}),
"--t",
json.dumps({"i": [{"i":1,"a":["h","e"]}]}),
json.dumps({"i": [{"i": 1, "a": ["h", "e"]}]}),
],
catch_exceptions=False,
)
Expand Down Expand Up @@ -293,15 +292,16 @@ def test_all_types_with_yaml_input():

result = runner.invoke(
pyflyte.main,
["run", os.path.join(DIR_NAME, "workflow.py"), "my_wf", "--inputs-file", os.path.join(os.path.dirname(os.path.realpath(__file__)), "my_wf_input.yaml")],
["run", os.path.join(DIR_NAME, "workflow.py"), "my_wf", "--inputs-file",
os.path.join(os.path.dirname(os.path.realpath(__file__)), "my_wf_input.yaml")],
catch_exceptions=False,
)
assert result.exit_code == 0, result.stdout


def test_all_types_with_pipe_input(monkeypatch):
runner = CliRunner()
input= str(json.load(open(os.path.join(os.path.dirname(os.path.realpath(__file__)), "my_wf_input.json"),"r")))
input = str(json.load(open(os.path.join(os.path.dirname(os.path.realpath(__file__)), "my_wf_input.json"), "r")))
monkeypatch.setattr("sys.stdin", io.StringIO(input))
result = runner.invoke(
pyflyte.main,
Expand All @@ -321,18 +321,18 @@ def test_all_types_with_pipe_input(monkeypatch):
"pipe_input, option_input",
[
(
str(
json.load(
open(
os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"my_wf_input.json",
),
"r",
str(
json.load(
open(
os.path.join(
os.path.dirname(os.path.realpath(__file__)),
"my_wf_input.json",
),
"r",
)
Comment on lines +324 to +332
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider using pathlib for file operations

Consider simplifying the nested function calls by using pathlib.Path for file operations. This could make the code more readable and maintainable.

Code suggestion
Check the AI-generated fix before applying
 -                str(
 -                    json.load(
 -                        open(
 -                            os.path.join(
 -                                os.path.dirname(os.path.realpath(__file__)),
 -                                "my_wf_input.json",
 -                            ),
 -                            "r",
 -                        )
 -                    )
 -                ),
 +                str(json.loads(pathlib.Path(__file__).parent.joinpath("my_wf_input.json").read_text())),

Code Review Run #58cf8a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

)
)
),
"GREEN",
),
"GREEN",
)
],
)
Expand Down Expand Up @@ -579,11 +579,11 @@ def test_list_default_arguments(wf_path):
reason="Github macos-latest image does not have docker installed as per https://github.com/orgs/community/discussions/25777",
)
def test_pyflyte_run_run(
mock_image,
image_string,
leaf_configuration_file_name,
final_image_config,
mock_image_spec_builder,
mock_image,
image_string,
leaf_configuration_file_name,
final_image_config,
mock_image_spec_builder,
):
mock_image.return_value = "cr.flyte.org/flyteorg/flytekit:py3.9-latest"
ImageBuildEngine.register("test", mock_image_spec_builder)
Expand All @@ -597,10 +597,10 @@ def tk(): ...
image_config = ImageConfig.validate_image(None, "", image_tuple)

pp = (
pathlib.Path(__file__).parent.parent.parent
/ "configuration"
/ "configs"
/ leaf_configuration_file_name
pathlib.Path(__file__).parent.parent.parent
/ "configuration"
/ "configs"
/ leaf_configuration_file_name
)

obj = RunLevelParams(
Expand Down Expand Up @@ -641,7 +641,7 @@ def jsons():

@mock.patch("flytekit.configuration.default_images.DefaultImages.default_image")
def test_pyflyte_run_with_iterator_json_type(
mock_image, mock_image_spec_builder, caplog
mock_image, mock_image_spec_builder, caplog
):
mock_image.return_value = "cr.flyte.org/flyteorg/flytekit:py3.9-latest"
ImageBuildEngine.register(
Expand Down Expand Up @@ -679,10 +679,10 @@ def tk_simple_iterator(x: Iterator[int] = iter([1, 2, 3])) -> Iterator[int]:
image_config = ImageConfig.validate_image(None, "", image_tuple)

pp = (
pathlib.Path(__file__).parent.parent.parent
/ "configuration"
/ "configs"
/ "no_images.yaml"
pathlib.Path(__file__).parent.parent.parent
/ "configuration"
/ "configs"
/ "no_images.yaml"
)

obj = RunLevelParams(
Expand Down Expand Up @@ -796,9 +796,9 @@ def test_pyflyte_run_with_none(a_val, workflow_file):
[
(["--env", "MY_ENV_VAR=hello"], '["MY_ENV_VAR"]', "hello"),
(
["--env", "MY_ENV_VAR=hello", "--env", "ABC=42"],
'["MY_ENV_VAR","ABC"]',
"hello,42",
["--env", "MY_ENV_VAR=hello", "--env", "ABC=42"],
'["MY_ENV_VAR","ABC"]',
"hello,42",
),
],
)
Expand All @@ -813,16 +813,16 @@ def test_pyflyte_run_with_none(a_val, workflow_file):
def test_envvar_local_execution(envs, envs_argument, expected_output, workflow_file):
runner = CliRunner()
args = (
[
"run",
]
+ envs
+ [
workflow_file,
"wf_with_env_vars",
"--env_vars",
]
+ [envs_argument]
[
"run",
]
+ envs
+ [
workflow_file,
"wf_with_env_vars",
"--env_vars",
]
+ [envs_argument]
)
result = runner.invoke(
pyflyte.main,
Expand Down
72 changes: 72 additions & 0 deletions tests/flytekit/unit/cli/pyflyte/test_run_lps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import click

from flytekit import task, workflow, LaunchPlan
from flytekit.clis.sdk_in_container.run import WorkflowCommand, RunLevelParams

import mock
import pytest


@task
def two_inputs(x: int, y: str) -> str:
return f"{x},{y}"

@workflow
def two_inputs_wf(x: int, y: str) -> str:
return two_inputs(x, y)

lp_fixed_y_default_x = LaunchPlan.get_or_create(
workflow=two_inputs_wf,
name="fixed-default-inputs",
fixed_inputs={"y": "hello"},
default_inputs={"x": 1}
)

lp_fixed_y = LaunchPlan.get_or_create(
workflow=two_inputs_wf,
name="fixed-y",
fixed_inputs={"y": "hello"},
)

lp_fixed_x = LaunchPlan.get_or_create(
workflow=two_inputs_wf,
name="fixed-x",
fixed_inputs={"x": 1},
)

lp_fixed_all = LaunchPlan.get_or_create(
workflow=two_inputs_wf,
name="fixed-all",
fixed_inputs={"x": 1, "y": "test"},
)

lp_default_x = LaunchPlan.get_or_create(
name="default-inputs",
workflow=two_inputs_wf,
default_inputs={"x": 1}
)

lp_simple = LaunchPlan.get_or_create(
workflow=two_inputs_wf,
name="no-fixed-default",
)

@pytest.mark.parametrize("lp_execs", [
(lp_fixed_y_default_x, {"x": 1}),
(lp_fixed_y, {"x": None}),
(lp_fixed_x, {"y": None}),
(lp_fixed_all, {}),
(lp_default_x, {"y": None, "x": 1}),
(lp_simple, {"x": None, "y": None}),
])
def test_workflowcommand_create_command(lp_execs):
cmd = WorkflowCommand("testfile.py")
rp = RunLevelParams()
ctx = click.Context(cmd, obj=rp)
lp, exp_opts = lp_execs
opts = cmd._create_command(ctx, "test_entity", rp, lp, "launch plan").params
for o in opts:
if "input" in o.name:
continue
assert o.name in exp_opts
assert o.default == exp_opts[o.name]
Loading