diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index 1101bff59..cc1a2dae7 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -740,11 +740,12 @@ func (wc *WorkflowClient) DescribeWorkflowExecution(ctx context.Context, workflo // QueryWorkflow queries a given workflow execution // workflowID and queryType are required, other parameters are optional. -// - workflow ID of the workflow. -// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. -// - taskQueue can be default(empty string). If empty string then it will pick the taskQueue of the running execution of that workflow ID. -// - queryType is the type of the query. -// - args... are the optional query parameters. +// - workflow ID of the workflow. +// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID. +// - taskQueue can be default(empty string). If empty string then it will pick the taskQueue of the running execution of that workflow ID. +// - queryType is the type of the query. +// - args... are the optional query parameters. +// // The errors it can return: // - serviceerror.InvalidArgument // - serviceerror.Internal @@ -943,8 +944,9 @@ func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request // DescribeTaskQueue returns information about the target taskqueue, right now this API returns the // pollers which polled this taskqueue in last few minutes. -// - taskqueue name of taskqueue -// - taskqueueType type of taskqueue, can be workflow or activity +// - taskqueue name of taskqueue +// - taskqueueType type of taskqueue, can be workflow or activity +// // The errors it can return: // - serviceerror.InvalidArgument // - serviceerror.Internal @@ -1798,13 +1800,6 @@ func (w *workflowClientInterceptor) UpdateWithStartWorkflow( updateReq.WorkflowExecution.WorkflowId = startReq.WorkflowId } - grpcCtx, cancel := newGRPCContext( - ctx, - grpcMetricsHandler(w.client.metricsHandler.WithTags( - metrics.RPCTags(startOp.input.WorkflowType, metrics.NoneTagValue, startOp.input.Options.TaskQueue))), - defaultGrpcRetryParameters(ctx)) - defer cancel() - iterFn := func(fnCtx context.Context, fnRunID string) HistoryEventIterator { metricsHandler := w.client.metricsHandler.WithTags(metrics.RPCTags(startOp.input.WorkflowType, metrics.NoneTagValue, startOp.input.Options.TaskQueue)) @@ -1825,10 +1820,14 @@ func (w *workflowClientInterceptor) UpdateWithStartWorkflow( }, nil) } - updateResp, err := w.updateWithStartWorkflow(grpcCtx, startReq, updateReq, onStart) + metricsHandler := w.client.metricsHandler.WithTags(metrics.RPCTags(startOp.input.WorkflowType, + metrics.NoneTagValue, startOp.input.Options.TaskQueue)) + + updateResp, err := w.updateWithStartWorkflow(ctx, startReq, updateReq, onStart, metricsHandler) if err != nil { return nil, err } + handle, err := w.updateHandleFromResponse(ctx, updateReq.WaitPolicy.LifecycleStage, updateResp) if err != nil { return nil, err @@ -1845,6 +1844,7 @@ func (w *workflowClientInterceptor) updateWithStartWorkflow( startRequest *workflowservice.StartWorkflowExecutionRequest, updateRequest *workflowservice.UpdateWorkflowExecutionRequest, onStart func(*workflowservice.StartWorkflowExecutionResponse), + rpcMetricsHandler metrics.Handler, ) (*workflowservice.UpdateWorkflowExecutionResponse, error) { startOp := &workflowservice.ExecuteMultiOperationRequest_Operation{ Operation: &workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow{ @@ -1868,7 +1868,12 @@ func (w *workflowClientInterceptor) updateWithStartWorkflow( seenStart := false for { multiResp, err := func() (*workflowservice.ExecuteMultiOperationResponse, error) { - grpcCtx, cancel := newGRPCContext(ctx, grpcTimeout(pollUpdateTimeout), grpcLongPoll(true), defaultGrpcRetryParameters(ctx)) + grpcCtx, cancel := newGRPCContext( + ctx, + grpcTimeout(pollUpdateTimeout), + grpcLongPoll(true), + grpcMetricsHandler(rpcMetricsHandler), + defaultGrpcRetryParameters(ctx)) defer cancel() multiResp, err := w.client.workflowService.ExecuteMultiOperation(grpcCtx, &multiRequest) @@ -2198,6 +2203,7 @@ func (w *workflowClientInterceptor) UpdateWorkflow( resp, err = func() (*workflowservice.UpdateWorkflowExecutionResponse, error) { grpcCtx, cancel := newGRPCContext(ctx, grpcTimeout(pollUpdateTimeout), grpcLongPoll(true), defaultGrpcRetryParameters(ctx)) defer cancel() + return w.client.workflowService.UpdateWorkflowExecution(grpcCtx, req) }() if err != nil { diff --git a/internal/internal_workflow_client_test.go b/internal/internal_workflow_client_test.go index 9ae7d1f69..54bc4bc76 100644 --- a/internal/internal_workflow_client_test.go +++ b/internal/internal_workflow_client_test.go @@ -1031,8 +1031,41 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Retry() { s.NoError(err) } -func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_OperationNotExecuted() { +func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_DefaultTimeout() { + var actualDeadline time.Time + expectedDeadline := time.Now().Add(pollUpdateTimeout) + s.workflowServiceClient.EXPECT(). + ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func( + ctx context.Context, + _ *workflowservice.ExecuteMultiOperationRequest, + _ ...grpc.CallOption, + ) (*workflowservice.ExecuteMultiOperationResponse, error) { + actualDeadline, _ = ctx.Deadline() + return nil, errors.New("intentional error") + }) + _, _ = s.workflowClient.UpdateWithStartWorkflow( + context.Background(), + UpdateWithStartWorkflowOptions{ + UpdateOptions: UpdateWorkflowOptions{ + UpdateName: "update", + WaitForStage: WorkflowUpdateStageCompleted, + }, + StartWorkflowOperation: s.workflowClient.NewWithStartWorkflowOperation( + StartWorkflowOptions{ + ID: workflowID, + WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL, + TaskQueue: taskqueue, + }, workflowType, + ), + }, + ) + + require.WithinDuration(s.T(), expectedDeadline, actualDeadline, 2*time.Second) +} + +func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_OperationNotExecuted() { startOp := s.workflowClient.NewWithStartWorkflowOperation( StartWorkflowOptions{ ID: workflowID,