Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tekton-kfptask): Update kfptask to publish completed dag status #1426

Merged
merged 3 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tekton-catalog/tekton-kfptask/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ require (
github.com/kubeflow/pipelines v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/api v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20231027040853-58ce09e07d03
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20231027040853-58ce09e07d03
github.com/tektoncd/pipeline v0.53.2
go.uber.org/zap v1.26.0
google.golang.org/protobuf v1.31.0
k8s.io/api v0.27.1
k8s.io/apimachinery v0.27.3
k8s.io/client-go v0.27.2
Expand Down Expand Up @@ -64,7 +66,6 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20230810215105-e1f0c010f800 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand Down Expand Up @@ -101,7 +102,6 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
3 changes: 2 additions & 1 deletion tekton-catalog/tekton-kfptask/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 23 additions & 2 deletions tekton-catalog/tekton-kfptask/pkg/reconciler/kfptask/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/kubeflow/pipelines/backend/src/v2/driver"
"github.com/kubeflow/pipelines/backend/src/v2/metadata"
"github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform"
pb "github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/pod"
tektonv1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1"
Expand All @@ -43,6 +44,7 @@ import (
listeners "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1"
listenersv1beta1 "github.com/tektoncd/pipeline/pkg/client/listers/pipeline/v1beta1"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/structpb"
k8score "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -462,6 +464,24 @@ func v1ParamsConversion(ctx context.Context, v1beta1Params tektonv1beta1.Params)
return v1Params
}

func DAGPublisher(ctx context.Context, opts driver.Options, mlmd *metadata.Client) (err error) {
defer func() {
if err != nil {
err = fmt.Errorf("failed to publish driver DAG execution %s: %w", fmt.Sprint(opts.DAGExecutionID), err)
}
}()
var outputParameters map[string]*structpb.Value
status := pb.Execution_COMPLETE
execution, err := mlmd.GetExecution(ctx, opts.DAGExecutionID)
if err != nil {
return fmt.Errorf("failed to get execution: %w", err)
}
if err = mlmd.PublishExecution(ctx, execution, outputParameters, nil, status); err != nil {
return fmt.Errorf("failed to publish: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to construct a new error here, because of the defer

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The defer statement is the high level debugging for this function, the fmt error is just to help print out more specific error statement.

Copy link
Member

@yhwang yhwang Dec 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, this error gets overridden by the deffer. there is no meaning to construct a new error here.

I got it. I thought there is only one type of error here. NVM

}
return nil
}

func execDriver(ctx context.Context, options *driverOptions) (*[]tektonv1beta1.CustomRunResult, bool, string, string, string, error) {
var execution *driver.Execution
var err error
Expand All @@ -479,8 +499,9 @@ func execDriver(ctx context.Context, options *driverOptions) (*[]tektonv1beta1.C
case "DAG":
execution, err = driver.DAG(ctx, options.options, options.mlmdClient)
case "DAG_PUB":
// no-op for now
return &[]tektonv1beta1.CustomRunResult{}, taskRunDecision, executionID, executorInput, podSpecPatch, nil
// current DAG_PUB only scheduled when the dag execution is completed
err = DAGPublisher(ctx, options.options, options.mlmdClient)
return &[]tektonv1beta1.CustomRunResult{}, taskRunDecision, executionID, executorInput, podSpecPatch, err
default:
err = fmt.Errorf("unknown driverType %s", options.driverType)
}
Expand Down
Loading