Merge pull request #439 from GoogleCloudPlatform/kfp-prebuilt-update
Update KFP prebuilt components pipeline
takumiohym authored Apr 17, 2024
2 parents 4a94c24 + 12c1b55 commit efbe9e4
Showing 5 changed files with 272 additions and 143 deletions.
First changed file: the KFP pipeline notebook (diffed JSON source below).
@@ -204,19 +204,21 @@
"import os\n",
"\n",
"from google.cloud.aiplatform import hyperparameter_tuning as hpt\n",
"from google_cloud_pipeline_components.aiplatform import (\n",
"from google_cloud_pipeline_components.types import artifact_types\n",
"from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp\n",
"from google_cloud_pipeline_components.v1.endpoint import (\n",
" EndpointCreateOp,\n",
" ModelDeployOp,\n",
" ModelUploadOp,\n",
")\n",
"from google_cloud_pipeline_components.experimental import (\n",
" hyperparameter_tuning_job\n",
")\n",
"from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp\n",
"from google_cloud_pipeline_components.v1.hyperparameter_tuning_job import (\n",
" HyperparameterTuningJobRunOp,\n",
" serialize_metrics,\n",
" serialize_parameters,\n",
")\n",
"from kfp.v2 import dsl\n",
"from google_cloud_pipeline_components.v1.model import ModelUploadOp\n",
"from kfp import dsl\n",
"\n",
"from retrieve_best_hptune_component import retrieve_best_hptune_result\n",
"\n",
"PIPELINE_ROOT = os.getenv(\"PIPELINE_ROOT\")\n",
"PROJECT_ID = os.getenv(\"PROJECT_ID\")\n",
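Because the removed and added import lines are interleaved in the hunk above, the resulting import block of the updated notebook is easier to read consolidated. The following is a reconstruction from the added lines only (module paths exactly as they appear in the diff; not an authoritative listing):

```python
import os

from google.cloud.aiplatform import hyperparameter_tuning as hpt
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.v1.endpoint import (
    EndpointCreateOp,
    ModelDeployOp,
)
from google_cloud_pipeline_components.v1.hyperparameter_tuning_job import (
    HyperparameterTuningJobRunOp,
    serialize_metrics,
    serialize_parameters,
)
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from kfp import dsl

from retrieve_best_hptune_component import retrieve_best_hptune_result
```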
@@ -244,7 +246,6 @@
" pipeline_root=PIPELINE_ROOT,\n",
")\n",
"def create_pipeline():\n",
"\n",
" worker_pool_specs = [\n",
" {\n",
" \"machine_spec\": {\n",
@@ -264,11 +265,9 @@
" }\n",
" ]\n",
"\n",
" metric_spec = hyperparameter_tuning_job.serialize_metrics(\n",
" {\"accuracy\": \"maximize\"}\n",
" )\n",
" metric_spec = serialize_metrics({\"accuracy\": \"maximize\"})\n",
"\n",
" parameter_spec = hyperparameter_tuning_job.serialize_parameters(\n",
" parameter_spec = serialize_parameters(\n",
" {\n",
" \"alpha\": hpt.DoubleParameterSpec(\n",
" min=1.0e-4, max=1.0e-1, scale=\"linear\"\n",
@@ -283,41 +282,27 @@
" # TODO\n",
" )\n",
"\n",
" trials_task = hyperparameter_tuning_job.GetTrialsOp(\n",
" gcp_resources=hp_tuning_task.outputs[\"gcp_resources\"]\n",
" )\n",
"\n",
" best_hyperparameters_task = (\n",
" hyperparameter_tuning_job.GetBestHyperparametersOp(\n",
" trials=trials_task.output, study_spec_metrics=metric_spec\n",
" )\n",
" )\n",
"\n",
" # Construct new worker_pool_specs and\n",
" # train new model based on best hyperparameters\n",
" worker_pool_specs_task = hyperparameter_tuning_job.GetWorkerPoolSpecsOp(\n",
" best_hyperparameters=best_hyperparameters_task.output,\n",
" worker_pool_specs=[\n",
" {\n",
" \"machine_spec\": {\"machine_type\": \"n1-standard-4\"},\n",
" \"replica_count\": 1,\n",
" \"container_spec\": {\n",
" \"image_uri\": TRAINING_CONTAINER_IMAGE_URI,\n",
" \"args\": [\n",
" f\"--training_dataset_path={TRAINING_FILE_PATH}\",\n",
" f\"--validation_dataset_path={VALIDATION_FILE_PATH}\",\n",
" \"--nohptune\",\n",
" ],\n",
" },\n",
" }\n",
" ],\n",
" best_retrieval_task = retrieve_best_hptune_result(\n",
" project=PROJECT_ID,\n",
" location=REGION,\n",
" gcp_resources=hp_tuning_task.outputs[\"gcp_resources\"],\n",
" container_uri=TRAINING_CONTAINER_IMAGE_URI,\n",
" training_file_path=TRAINING_FILE_PATH,\n",
" validation_file_path=VALIDATION_FILE_PATH,\n",
" )\n",
"\n",
" training_task = CustomTrainingJobOp( # pylint: disable=unused-variable\n",
" # TODO\n",
"\n",
" )\n",
"\n",
" importer_spec = dsl.importer(\n",
" artifact_uri=f\"{BASE_OUTPUT_DIR}/model\",\n",
" artifact_class=artifact_types.UnmanagedContainerModel,\n",
" metadata={\"containerSpec\": {\"imageUri\": SERVING_CONTAINER_IMAGE_URI}},\n",
" )\n",
" importer_spec.after(training_task)\n",
"\n",
" model_upload_task = ModelUploadOp( # pylint: disable=unused-variable\n",
" # TODO\n",
" )\n",
@@ -417,7 +402,7 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"We compile the pipeline from the Python file we generated into a JSON description using the following command:"
"We compile the pipeline from the Python file we generated into a YAML description using the following command:"
]
},
{
@@ -426,7 +411,7 @@
"metadata": {},
"outputs": [],
"source": [
"PIPELINE_JSON = \"covertype_kfp_pipeline.json\""
"PIPELINE_YAML = \"covertype_kfp_pipeline.yaml\""
]
},
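The KFP v2 SDK compiles pipeline definitions to IR YAML by default (the older `kfp.v2` namespace produced a JSON package), hence the switch of the compiled package path from `.json` to `.yaml` above.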
{
@@ -435,12 +420,12 @@
"source": [
"### Exercise\n",
"\n",
"Compile the `pipeline_vertex/pipeline.py` with the `dsl-compile-v2` command line:"
"Compile the `pipeline_vertex/pipeline.py` with the `kfp dsl compile` command line:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
@@ -454,11 +439,11 @@
"**Note:** You can also use the Python SDK to compile the pipeline:\n",
"\n",
"```python\n",
"from kfp.v2 import compiler\n",
"from kfp import compiler\n",
"\n",
"compiler.Compiler().compile(\n",
" pipeline_func=create_pipeline, \n",
" package_path=PIPELINE_JSON,\n",
" package_path=PIPELINE_YAML,\n",
")\n",
"\n",
"```"
@@ -477,7 +462,7 @@
"metadata": {},
"outputs": [],
"source": [
"!head {PIPELINE_JSON}"
"!head {PIPELINE_YAML}"
]
},
{
New file: the lightweight component module imported in the notebook as `retrieve_best_hptune_component`.
@@ -0,0 +1,88 @@
# Copyright 2021 Google LLC

# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at

# https://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS"
# BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Lightweight component tuning function."""
from typing import Dict, List, NamedTuple

from kfp.dsl import component


@component(
base_image="python:3.8",
packages_to_install=["google-cloud-aiplatform"],
)
def retrieve_best_hptune_result(
project: str,
location: str,
gcp_resources: str,
container_uri: str,
training_file_path: str,
validation_file_path: str,
) -> NamedTuple(
"Outputs",
[
("best_parameters", Dict),
("best_metrics", Dict),
("best_worker_pool_spec", List),
],
):

    # pylint: disable=import-outside-toplevel
    import json

    from google.cloud import aiplatform

    aiplatform.init(project=project, location=location)

    # Retrieve the hyperparameter tuning job result.
    # `gcp_resources` is a JSON document of the form
    # {"resources": [{"resourceUri": ...}, ...]}; the Vertex AI job ID is the
    # last path segment of the first resource URI.
    gcp_resources = json.loads(gcp_resources)
    job_id = gcp_resources["resources"][0]["resourceUri"].split("/")[-1]
    hp_job = aiplatform.HyperparameterTuningJob.get(job_id)

    # Retrieve the best trial
    metrics = [
        trial.final_measurement.metrics[0].value for trial in hp_job.trials
    ]
    goal = hp_job.to_dict()["studySpec"]["metrics"][0]["goal"]
    goal_f = min if goal == "MINIMIZE" else max  # MINIMIZE or MAXIMIZE
    best_result = goal_f(metrics)
    best_trial = hp_job.trials[metrics.index(best_result)]

    best_parameters = {
        param.parameter_id: param.value for param in best_trial.parameters
    }

    best_metrics = {
        m.metric_id: m.value for m in best_trial.final_measurement.metrics
    }

    # Construct worker_pool_spec
    best_worker_pool_spec = [
        {
            "machine_spec": {"machine_type": "n1-standard-4"},
            "replica_count": 1,
            "container_spec": {
                "image_uri": container_uri,
                "args": [
                    f"--training_dataset_path={training_file_path}",
                    f"--validation_dataset_path={validation_file_path}",
                    "--nohptune",
                ],
            },
        }
    ]

    # Append the winning hyperparameters as CLI args for the final training run
    for k, v in best_parameters.items():
        best_worker_pool_spec[0]["container_spec"]["args"].append(f"--{k}={v}")

    return best_parameters, best_metrics, best_worker_pool_spec