Skip to content

Commit

Permalink
Fix Update-with-Start grpc deadline (#1798)
Browse files Browse the repository at this point in the history
  • Loading branch information
stephanos authored Feb 3, 2025
1 parent 0262297 commit 349283d
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 17 deletions.
38 changes: 22 additions & 16 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,11 +740,12 @@ func (wc *WorkflowClient) DescribeWorkflowExecution(ctx context.Context, workflo

// QueryWorkflow queries a given workflow execution
// workflowID and queryType are required, other parameters are optional.
// - workflow ID of the workflow.
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
// - taskQueue can be default(empty string). If empty string then it will pick the taskQueue of the running execution of that workflow ID.
// - queryType is the type of the query.
// - args... are the optional query parameters.
// - workflow ID of the workflow.
// - runID can be default(empty string). if empty string then it will pick the running execution of that workflow ID.
// - taskQueue can be default(empty string). If empty string then it will pick the taskQueue of the running execution of that workflow ID.
// - queryType is the type of the query.
// - args... are the optional query parameters.
//
// The errors it can return:
// - serviceerror.InvalidArgument
// - serviceerror.Internal
Expand Down Expand Up @@ -943,8 +944,9 @@ func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request

// DescribeTaskQueue returns information about the target taskqueue, right now this API returns the
// pollers which polled this taskqueue in last few minutes.
// - taskqueue name of taskqueue
// - taskqueueType type of taskqueue, can be workflow or activity
// - taskqueue name of taskqueue
// - taskqueueType type of taskqueue, can be workflow or activity
//
// The errors it can return:
// - serviceerror.InvalidArgument
// - serviceerror.Internal
Expand Down Expand Up @@ -1798,13 +1800,6 @@ func (w *workflowClientInterceptor) UpdateWithStartWorkflow(
updateReq.WorkflowExecution.WorkflowId = startReq.WorkflowId
}

grpcCtx, cancel := newGRPCContext(
ctx,
grpcMetricsHandler(w.client.metricsHandler.WithTags(
metrics.RPCTags(startOp.input.WorkflowType, metrics.NoneTagValue, startOp.input.Options.TaskQueue))),
defaultGrpcRetryParameters(ctx))
defer cancel()

iterFn := func(fnCtx context.Context, fnRunID string) HistoryEventIterator {
metricsHandler := w.client.metricsHandler.WithTags(metrics.RPCTags(startOp.input.WorkflowType,
metrics.NoneTagValue, startOp.input.Options.TaskQueue))
Expand All @@ -1825,10 +1820,14 @@ func (w *workflowClientInterceptor) UpdateWithStartWorkflow(
}, nil)
}

updateResp, err := w.updateWithStartWorkflow(grpcCtx, startReq, updateReq, onStart)
metricsHandler := w.client.metricsHandler.WithTags(metrics.RPCTags(startOp.input.WorkflowType,
metrics.NoneTagValue, startOp.input.Options.TaskQueue))

updateResp, err := w.updateWithStartWorkflow(ctx, startReq, updateReq, onStart, metricsHandler)
if err != nil {
return nil, err
}

handle, err := w.updateHandleFromResponse(ctx, updateReq.WaitPolicy.LifecycleStage, updateResp)
if err != nil {
return nil, err
Expand All @@ -1845,6 +1844,7 @@ func (w *workflowClientInterceptor) updateWithStartWorkflow(
startRequest *workflowservice.StartWorkflowExecutionRequest,
updateRequest *workflowservice.UpdateWorkflowExecutionRequest,
onStart func(*workflowservice.StartWorkflowExecutionResponse),
rpcMetricsHandler metrics.Handler,
) (*workflowservice.UpdateWorkflowExecutionResponse, error) {
startOp := &workflowservice.ExecuteMultiOperationRequest_Operation{
Operation: &workflowservice.ExecuteMultiOperationRequest_Operation_StartWorkflow{
Expand All @@ -1868,7 +1868,12 @@ func (w *workflowClientInterceptor) updateWithStartWorkflow(
seenStart := false
for {
multiResp, err := func() (*workflowservice.ExecuteMultiOperationResponse, error) {
grpcCtx, cancel := newGRPCContext(ctx, grpcTimeout(pollUpdateTimeout), grpcLongPoll(true), defaultGrpcRetryParameters(ctx))
grpcCtx, cancel := newGRPCContext(
ctx,
grpcTimeout(pollUpdateTimeout),
grpcLongPoll(true),
grpcMetricsHandler(rpcMetricsHandler),
defaultGrpcRetryParameters(ctx))
defer cancel()

multiResp, err := w.client.workflowService.ExecuteMultiOperation(grpcCtx, &multiRequest)
Expand Down Expand Up @@ -2198,6 +2203,7 @@ func (w *workflowClientInterceptor) UpdateWorkflow(
resp, err = func() (*workflowservice.UpdateWorkflowExecutionResponse, error) {
grpcCtx, cancel := newGRPCContext(ctx, grpcTimeout(pollUpdateTimeout), grpcLongPoll(true), defaultGrpcRetryParameters(ctx))
defer cancel()

return w.client.workflowService.UpdateWorkflowExecution(grpcCtx, req)
}()
if err != nil {
Expand Down
35 changes: 34 additions & 1 deletion internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,8 +1031,41 @@ func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_Retry() {
s.NoError(err)
}

func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_OperationNotExecuted() {
func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_DefaultTimeout() {
var actualDeadline time.Time
expectedDeadline := time.Now().Add(pollUpdateTimeout)
s.workflowServiceClient.EXPECT().
ExecuteMultiOperation(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(
ctx context.Context,
_ *workflowservice.ExecuteMultiOperationRequest,
_ ...grpc.CallOption,
) (*workflowservice.ExecuteMultiOperationResponse, error) {
actualDeadline, _ = ctx.Deadline()
return nil, errors.New("intentional error")
})

_, _ = s.workflowClient.UpdateWithStartWorkflow(
context.Background(),
UpdateWithStartWorkflowOptions{
UpdateOptions: UpdateWorkflowOptions{
UpdateName: "update",
WaitForStage: WorkflowUpdateStageCompleted,
},
StartWorkflowOperation: s.workflowClient.NewWithStartWorkflowOperation(
StartWorkflowOptions{
ID: workflowID,
WorkflowIDConflictPolicy: enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
TaskQueue: taskqueue,
}, workflowType,
),
},
)

require.WithinDuration(s.T(), expectedDeadline, actualDeadline, 2*time.Second)
}

func (s *workflowRunSuite) TestExecuteWorkflowWithUpdate_OperationNotExecuted() {
startOp := s.workflowClient.NewWithStartWorkflowOperation(
StartWorkflowOptions{
ID: workflowID,
Expand Down

0 comments on commit 349283d

Please sign in to comment.