From 268c2a267767ec5a58ef53864b618ad12e7a9aab Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Fri, 27 Sep 2024 17:06:51 -0300 Subject: [PATCH 1/6] Added getVersion for loops and callbacks --- .../WorkflowOutboundCallsInterceptor.java | 2 + .../internal/sync/SyncWorkflowContext.java | 42 ++++++++++ .../internal/sync/WorkflowInternal.java | 5 ++ .../java/io/temporal/workflow/Workflow.java | 84 +++++++++++++++++++ 4 files changed, 133 insertions(+) diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java index 14005dc18..b1602cdb4 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptor.java @@ -595,6 +595,8 @@ R mutableSideEffect( int getVersion(String changeId, int minSupported, int maxSupported); + int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported); + void continueAsNew(ContinueAsNewInput input); void registerQuery(RegisterQueryInput input); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java index 9e1bfc1b4..727c7e176 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/SyncWorkflowContext.java @@ -123,6 +123,8 @@ final class SyncWorkflowContext implements WorkflowContext, WorkflowOutboundCall private Map runningUpdateHandlers = new HashMap<>(); // Map of all running signal handlers. Key is the event Id of the signal event. private Map runningSignalHandlers = new HashMap<>(); + // Current versions for the getVersion call that supports iterationId. + private final Map currentVersions = new HashMap<>(); public SyncWorkflowContext( @Nonnull String namespace, @@ -968,6 +970,46 @@ public int getVersion(String changeId, int minSupported, int maxSupported) { } } + @Override + public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) { + Integer currentVersion = currentVersions.get(seriesId); + // When replaying check if there is a marker (by calling getVersion) for each iteration. + if (isReplaying()) { + int iVersion = getVersion(seriesId + "/" + iterationId, minSupported, maxSupported); + if (currentVersion != null) { + if (iVersion < currentVersion) { + throw new IllegalArgumentException( + "getVersion for changeId '" + + seriesId + + "/" + + iterationId + + "' returned " + + iVersion + + " which is smaller than previously found version of " + + currentVersion); + } + if (iVersion != DEFAULT_VERSION) { + currentVersions.put(seriesId, iVersion); + return iVersion; + } + return currentVersion; + } + return iVersion; + } else { + // When not replaying, only insert a marker (by calling getVersion) if the maxSupported is + // larger than the already recorded one. + if (currentVersion == null || (currentVersion != null && maxSupported > currentVersion)) { + int iVersion = getVersion(seriesId + "/" + iterationId, minSupported, maxSupported); + if (iVersion != maxSupported) { + throw new RuntimeException("getVersion returned wrong version: " + iVersion); + } + currentVersions.put(seriesId, iVersion); + return iVersion; + } + return currentVersion; + } + } + @Override public void registerQuery(RegisterQueryInput request) { queryDispatcher.registerQueryHandlers(request); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index 8990c9ab4..c526c8c7e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -519,6 +519,11 @@ public static int getVersion(String changeId, int minSupported, int maxSupported return getWorkflowOutboundInterceptor().getVersion(changeId, minSupported, maxSupported); } + public static int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) { + assertNotReadOnly("get version"); + return getWorkflowOutboundInterceptor().getVersion(seriesId, iterationId, minSupported, maxSupported); + } + public static Promise promiseAllOf(Iterable> promises) { return new AllOfPromise(promises); } diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 5925b82d3..627f9e3bd 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -989,6 +989,90 @@ public static int getVersion(String changeId, int minSupported, int maxSupported return WorkflowInternal.getVersion(changeId, minSupported, maxSupported); } + /** + * Used to perform workflow safe code changes in code that is called repeatedly like body of a + * loop or a signal handler. + * + *

Consider the following example: + * + *

+   * for (int i=0; i<100; i++) {
+   *     if (getVersion("fix1", DEFAULT_VERSION, 1) == DEFAULT_VERSION) {
+   *         // OLD CODE
+   *     } else {
+   *         // NEW CODE
+   *     }
+   * }
+   * 
+ * + * If the change is introduced after the loop started executing all iterations are going to use + * the default version, event if most of them happen after the change. This happens because the + * getVersion call returns the same version for all calls that share a changeId. The same issue + * arises when changing code in callbacks like signal or update handlers. + * + *

The following solution works as it uses different changeId for each iteration: + * + *

+   * for (int i=0; i<100; i++) {
+   *     if (getVersion("fix1-" + i, DEFAULT_VERSION, 1) == DEFAULT_VERSION) {
+   *         // OLD CODE
+   *     } else {
+   *         // NEW CODE
+   *     }
+   * }
+   * 
+ * + * But such solution creates a marker event as well as it updates a search attribute for each + * iteration. So it is not practical for the large number of iterations. + * + *

This method provides an efficient alternative to the solution that used different seriesId + * for each iteration. It only inserts a marker when a version changes. + * + *

Here is how it could be used: + * + *

+   * for (int i=0; i<100; i++) {
+   *     if (getVersion("fix1", String.valueOf(i), DEFAULT_VERSION, 1) == DEFAULT_VERSION) {
+   *         // OLD CODE
+   *     } else {
+   *         // NEW CODE
+   *     }
+   * }
+   * 
+ * + * It is OK to add more branches later assuming that seriesId stays the same. During replay the + * iterationId should return the same values as it returned in the original execution. + * + *
+   * for (int i=0; i<100; i++) {
+   *     int v = getVersion("fix1", String.valueOf(i), DEFAULT_VERSION, 2);
+   *     if (v == DEFAULT_VERSION) {
+   *         // OLD CODE
+   *     } else if (v == 1) {
+   *         // CODE FOR THE FIRST CHANGE
+   *     } else {
+   *         // CODE FOR THE LAST CHANGE
+   *     }
+   * }
+   * 
+ * + * All calls that have the same seriesId and iterationId argument return the same value. But only + * if they follow each other. The moment a call with a different iterationId is made the version + * is going to change. + * + * @param seriesId identifier of a series of changes. + * @param iterationId identifier of each iteration over the changed code. + * @param minSupported min version supported for the change + * @param maxSupported max version supported for the change, this version is used as the current + * one during the original execution. + * @return {@code maxSupported} when is originally executed. Original version recorded in the + * history on replays. + */ + public static int getVersion( + String seriesId, String iterationId, int minSupported, int maxSupported) { + return WorkflowInternal.getVersion(seriesId, iterationId, minSupported, maxSupported); + } + /** * Get scope for reporting business metrics in workflow logic. This should be used instead of * creating new metrics scopes as it is able to dedupe metrics during replay. From d4c5aae9e84c36f2312fcf3fc6d5fbdef38fc0f2 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Fri, 27 Sep 2024 23:08:49 -0400 Subject: [PATCH 2/6] Fixed compilation issues --- .../WorkflowOutboundCallsInterceptorBase.java | 5 + .../internal/sync/WorkflowInternal.java | 6 +- .../versionTests/GetVersionSeriesTest.java | 117 ++++++++++++++++++ .../workflow/versionTests/GetVersionTest.java | 7 +- .../TestActivityEnvironmentInternal.java | 5 + .../internal/TracingWorkerInterceptor.java | 8 ++ 6 files changed, 143 insertions(+), 5 deletions(-) create mode 100644 temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionSeriesTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java index eb658ee62..8bda16142 100644 --- a/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java +++ b/temporal-sdk/src/main/java/io/temporal/common/interceptors/WorkflowOutboundCallsInterceptorBase.java @@ -106,6 +106,11 @@ public int getVersion(String changeId, int minSupported, int maxSupported) { return next.getVersion(changeId, minSupported, maxSupported); } + @Override + public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) { + return next.getVersion(seriesId, iterationId, minSupported, maxSupported); + } + @Override public void continueAsNew(ContinueAsNewInput input) { next.continueAsNew(input); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java index c526c8c7e..58bddabe1 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/sync/WorkflowInternal.java @@ -519,9 +519,11 @@ public static int getVersion(String changeId, int minSupported, int maxSupported return getWorkflowOutboundInterceptor().getVersion(changeId, minSupported, maxSupported); } - public static int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) { + public static int getVersion( + String seriesId, String iterationId, int minSupported, int maxSupported) { assertNotReadOnly("get version"); - return getWorkflowOutboundInterceptor().getVersion(seriesId, iterationId, minSupported, maxSupported); + return getWorkflowOutboundInterceptor() + .getVersion(seriesId, iterationId, minSupported, maxSupported); } public static Promise promiseAllOf(Iterable> promises) { diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionSeriesTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionSeriesTest.java new file mode 100644 index 000000000..0e29a884a --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionSeriesTest.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved. + * + * Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this material except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.temporal.workflow.versionTests; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import io.temporal.testing.WorkflowReplayer; +import io.temporal.testing.internal.SDKTestOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.testing.internal.TracingWorkerInterceptor; +import io.temporal.worker.WorkerOptions; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl; +import io.temporal.workflow.shared.TestActivities.VariousTestActivities; +import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; +import io.temporal.workflow.unsafe.WorkflowUnsafe; +import java.time.Duration; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; + +public class GetVersionSeriesTest { + + private static boolean hasReplayed; + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowTypes(TestGetVersionSeriesWorkflowImpl.class) + .setActivityImplementations(new TestActivitiesImpl()) + // Forcing a replay. Full history arrived from a normal queue causing a replay. + .setWorkerOptions( + WorkerOptions.newBuilder() + .setStickyQueueScheduleToStartTimeout(Duration.ZERO) + .build()) + .build(); + + @Test + public void testGetVersion() { + TestWorkflow1 workflowStub = + testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + assertTrue(hasReplayed); + assertEquals("foo", result); + testWorkflowRule + .getInterceptor(TracingWorkerInterceptor.class) + .setExpected( + "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, + "newThread workflow-method", + "getVersion", + "executeActivity Activity2", + "activity Activity2", + "getVersion", + "executeActivity customActivity1", + "activity customActivity1", + "executeActivity customActivity1", + "activity customActivity1", + "sleep PT1S", + "getVersion", + "executeActivity customActivity1", + "activity customActivity1"); + } + + @Test + @Ignore + public void testGetVersionReplay() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionSeriesHistory.json", TestGetVersionSeriesWorkflowImpl.class); + } + + public static class TestGetVersionSeriesWorkflowImpl implements TestWorkflow1 { + + @Override + public String execute(String taskQueue) { + VariousTestActivities testActivities = + Workflow.newActivityStub( + VariousTestActivities.class, + SDKTestOptions.newActivityOptionsForTaskQueue(taskQueue)); + + for (int i = 0; i < 10; i++) { + // Test adding a version check in non-replay code. + int maxSupported = i / 2 + 1; + int version = + Workflow.getVersion("s1", String.valueOf(i), Workflow.DEFAULT_VERSION, maxSupported); + assertEquals(version, maxSupported); + testActivities.activity2("activity2", 2); + } + + // Test adding a version check in replay code. + if (WorkflowUnsafe.isReplaying()) { + hasReplayed = true; + } + // Force replay + Workflow.sleep(1000); + return "foo"; + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionTest.java index e59464faa..97e160ee1 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionTest.java @@ -57,9 +57,6 @@ public class GetVersionTest { public void testGetVersion() { TestWorkflow1 workflowStub = testWorkflowRule.newWorkflowStubTimeoutOptions(TestWorkflow1.class); - String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); - assertTrue(hasReplayed); - assertEquals("activity22activity1activity1activity1", result); testWorkflowRule .getInterceptor(TracingWorkerInterceptor.class) .setExpected( @@ -77,6 +74,10 @@ public void testGetVersion() { "getVersion", "executeActivity customActivity1", "activity customActivity1"); + + String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); + assertTrue(hasReplayed); + assertEquals("activity22activity1activity1activity1", result); } @Test diff --git a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java index fd4afa070..9b8638825 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java +++ b/temporal-testing/src/main/java/io/temporal/testing/TestActivityEnvironmentInternal.java @@ -430,6 +430,11 @@ public int getVersion(String changeId, int minSupported, int maxSupported) { throw new UnsupportedOperationException("not implemented"); } + @Override + public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public void continueAsNew(ContinueAsNewInput input) { throw new UnsupportedOperationException("not implemented"); diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java index 3d30cf662..d15210614 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/TracingWorkerInterceptor.java @@ -276,6 +276,14 @@ public int getVersion(String changeId, int minSupported, int maxSupported) { return next.getVersion(changeId, minSupported, maxSupported); } + @Override + public int getVersion(String seriesId, String iterationId, int minSupported, int maxSupported) { + if (!WorkflowUnsafe.isReplaying()) { + trace.add("getVersionSeries"); + } + return next.getVersion(seriesId, iterationId, minSupported, maxSupported); + } + @Override public void continueAsNew(ContinueAsNewInput input) { if (!WorkflowUnsafe.isReplaying()) { From 75934635be06955f82ddc358728f0e526d468a40 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Fri, 27 Sep 2024 23:33:23 -0400 Subject: [PATCH 3/6] Fixed SDKTestWorkflowRule to call shutdown --- .../temporal/testing/internal/SDKTestWorkflowRule.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java b/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java index 97907f02b..7d1c18b55 100644 --- a/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java +++ b/temporal-testing/src/main/java/io/temporal/testing/internal/SDKTestWorkflowRule.java @@ -232,7 +232,14 @@ public SDKTestWorkflowRule build() { } public Statement apply(@Nonnull Statement base, Description description) { - Statement testWorkflowStatement = base; + Statement testWorkflowStatement = + new Statement() { + @Override + public void evaluate() throws Throwable { + base.evaluate(); + shutdown(); + } + }; Test annotation = description.getAnnotation(Test.class); boolean timeoutIsOverriddenOnTestAnnotation = annotation != null && annotation.timeout() > 0; From 6d1d46321399cdb5f68270b1ebca620770f438a4 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Sat, 28 Sep 2024 00:35:42 -0400 Subject: [PATCH 4/6] Fixed test broken after rule fix --- .../test/java/io/temporal/workflow/SagaTest.java | 1 + .../activityTests/ParallelLocalActivitiesTest.java | 1 - .../workflow/activityTests/TestLocalActivity.java | 4 ---- .../searchattributes/UpsertSearchAttributeTest.java | 8 ++++++-- .../UpsertTypedSearchAttributeTest.java | 13 ++++++++++--- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/SagaTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/SagaTest.java index 78a29a6df..c049337d2 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/SagaTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/SagaTest.java @@ -57,6 +57,7 @@ public void testSaga() { "activity customActivity1", "executeChildWorkflow TestNoArgsWorkflowFunc", "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, + "registerUpdateHandlers update", "newThread workflow-method", "executeActivity ThrowIO", "activity ThrowIO", diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ParallelLocalActivitiesTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ParallelLocalActivitiesTest.java index 2b27d42f5..17957dbf5 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ParallelLocalActivitiesTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/ParallelLocalActivitiesTest.java @@ -69,7 +69,6 @@ public void testParallelLocalActivities() { expected.add("newThread workflow-method"); for (int i = 0; i < TestParallelLocalActivitiesWorkflowImpl.COUNT; i++) { expected.add("executeLocalActivity SleepActivity"); - expected.add("currentTimeMillis"); } for (int i = 0; i < TestParallelLocalActivitiesWorkflowImpl.COUNT; i++) { expected.add("local activity SleepActivity"); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/TestLocalActivity.java b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/TestLocalActivity.java index 841944c0a..f69880fad 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/TestLocalActivity.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/activityTests/TestLocalActivity.java @@ -71,12 +71,10 @@ public void testLocalActivity() { "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, "newThread workflow-method", "executeLocalActivity ThrowIO", - "currentTimeMillis", "local activity ThrowIO", "local activity ThrowIO", "local activity ThrowIO", "executeLocalActivity Activity2", - "currentTimeMillis", "local activity Activity2", "executeActivity Activity2", "activity Activity2"); @@ -115,12 +113,10 @@ public void testLocalActivityNoInput() { "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, "newThread workflow-method", "executeLocalActivity ThrowIO", - "currentTimeMillis", "local activity ThrowIO", "local activity ThrowIO", "local activity ThrowIO", "executeLocalActivity Activity2", - "currentTimeMillis", "local activity Activity2", "executeActivity Activity2", "activity Activity2"); diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/searchattributes/UpsertSearchAttributeTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/searchattributes/UpsertSearchAttributeTest.java index 286552f70..f00a5a0b5 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/searchattributes/UpsertSearchAttributeTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/searchattributes/UpsertSearchAttributeTest.java @@ -69,8 +69,12 @@ public void testUpsertSearchAttributes() { "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, "newThread workflow-method", "upsertSearchAttributes", - "executeActivity Activity", - "activity Activity"); + "sleep PT0.1S", + "upsertSearchAttributes", + "sleep PT0.1S", + "upsertSearchAttributes", + "upsertSearchAttributes", + "sleep PT0.1S"); testWorkflowRule.assertHistoryEvent( execution.getWorkflowId(), EventType.EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES); } diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/searchattributes/UpsertTypedSearchAttributeTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/searchattributes/UpsertTypedSearchAttributeTest.java index 779037e4b..d41422ee7 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/searchattributes/UpsertTypedSearchAttributeTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/searchattributes/UpsertTypedSearchAttributeTest.java @@ -72,9 +72,16 @@ public void testUpsertSearchAttributes() { .setExpected( "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, "newThread workflow-method", - "upsertSearchAttributes", - "executeActivity Activity", - "activity Activity"); + "upsertTypedSearchAttributes", + "sleep PT0.1S", + "upsertTypedSearchAttributes", + "sleep PT0.1S", + "upsertTypedSearchAttributes", + "upsertTypedSearchAttributes", + "sleep PT0.1S", + "upsertTypedSearchAttributes", + "upsertTypedSearchAttributes", + "upsertTypedSearchAttributes"); testWorkflowRule.assertHistoryEvent( execution.getWorkflowId(), EventType.EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES); } From 7c5377d32af3aa997e5a2661de9900c9b751e9c4 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Sat, 28 Sep 2024 00:49:28 -0400 Subject: [PATCH 5/6] Fixed unit test --- .../versionTests/GetVersionSeriesTest.java | 41 ++++++------------- 1 file changed, 12 insertions(+), 29 deletions(-) diff --git a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionSeriesTest.java b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionSeriesTest.java index 0e29a884a..1efa9da5a 100644 --- a/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionSeriesTest.java +++ b/temporal-sdk/src/test/java/io/temporal/workflow/versionTests/GetVersionSeriesTest.java @@ -23,10 +23,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import io.temporal.testing.WorkflowReplayer; +import io.temporal.api.enums.v1.EventType; +import io.temporal.api.history.v1.HistoryEvent; +import io.temporal.client.WorkflowStub; import io.temporal.testing.internal.SDKTestOptions; import io.temporal.testing.internal.SDKTestWorkflowRule; -import io.temporal.testing.internal.TracingWorkerInterceptor; import io.temporal.worker.WorkerOptions; import io.temporal.workflow.Workflow; import io.temporal.workflow.shared.TestActivities.TestActivitiesImpl; @@ -34,7 +35,7 @@ import io.temporal.workflow.shared.TestWorkflows.TestWorkflow1; import io.temporal.workflow.unsafe.WorkflowUnsafe; import java.time.Duration; -import org.junit.Ignore; +import java.util.List; import org.junit.Rule; import org.junit.Test; @@ -61,30 +62,11 @@ public void testGetVersion() { String result = workflowStub.execute(testWorkflowRule.getTaskQueue()); assertTrue(hasReplayed); assertEquals("foo", result); - testWorkflowRule - .getInterceptor(TracingWorkerInterceptor.class) - .setExpected( - "interceptExecuteWorkflow " + SDKTestWorkflowRule.UUID_REGEXP, - "newThread workflow-method", - "getVersion", - "executeActivity Activity2", - "activity Activity2", - "getVersion", - "executeActivity customActivity1", - "activity customActivity1", - "executeActivity customActivity1", - "activity customActivity1", - "sleep PT1S", - "getVersion", - "executeActivity customActivity1", - "activity customActivity1"); - } - - @Test - @Ignore - public void testGetVersionReplay() throws Exception { - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testGetVersionSeriesHistory.json", TestGetVersionSeriesWorkflowImpl.class); + WorkflowStub untyped = WorkflowStub.fromTyped(workflowStub); + List markers = + testWorkflowRule.getHistoryEvents( + untyped.getExecution().getWorkflowId(), EventType.EVENT_TYPE_MARKER_RECORDED); + assertEquals(10, markers.size()); } public static class TestGetVersionSeriesWorkflowImpl implements TestWorkflow1 { @@ -96,11 +78,12 @@ public String execute(String taskQueue) { VariousTestActivities.class, SDKTestOptions.newActivityOptionsForTaskQueue(taskQueue)); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 20; i++) { // Test adding a version check in non-replay code. int maxSupported = i / 2 + 1; int version = - Workflow.getVersion("s1", String.valueOf(i), Workflow.DEFAULT_VERSION, maxSupported); + Workflow.getVersion( + "s1", String.valueOf(maxSupported), Workflow.DEFAULT_VERSION, maxSupported); assertEquals(version, maxSupported); testActivities.activity2("activity2", 2); } From 125cd021239a4ff8b752003e18c42f0bad1a5c0d Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Tue, 1 Oct 2024 15:10:58 -0700 Subject: [PATCH 6/6] Updated Javadoc --- .../java/io/temporal/workflow/Workflow.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java index 7a9475d4d..6ada58982 100644 --- a/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java +++ b/temporal-sdk/src/main/java/io/temporal/workflow/Workflow.java @@ -1016,12 +1016,14 @@ public static int getVersion(String changeId, int minSupported, int maxSupported * } * * - * If the change is introduced after the loop started executing all iterations are going to use - * the default version, event if most of them happen after the change. This happens because the - * getVersion call returns the same version for all calls that share a changeId. The same issue - * arises when changing code in callbacks like signal or update handlers. + * If the change is introduced after the loop starts executing, all iterations are going to use + * the default version, even if most of them happen after the change. This happens because the * + * getVersion call returns the same version for all calls that share a changeId. The same issue * + * arises when changing code in callbacks like signal or update handlers. Frequently, there is a + * need for a new version used for newer iterations (or signal handler invocations). * - *

The following solution works as it uses different changeId for each iteration: + *

The following solution supports updating the version of each iteration separately, as it + * uses a different changeId for each iteration: * *

    * for (int i=0; i<100; i++) {
@@ -1033,10 +1035,10 @@ public static int getVersion(String changeId, int minSupported, int maxSupported
    * }
    * 
* - * But such solution creates a marker event as well as it updates a search attribute for each - * iteration. So it is not practical for the large number of iterations. + * The drawback is a marker event as well as a search attribute update for each iteration. So, it + * is not practical for the large number of iterations. * - *

This method provides an efficient alternative to the solution that used different seriesId + *

This method provides an efficient alternative to the solution that uses a different changeId * for each iteration. It only inserts a marker when a version changes. * *

Here is how it could be used: @@ -1051,8 +1053,8 @@ public static int getVersion(String changeId, int minSupported, int maxSupported * } * * - * It is OK to add more branches later assuming that seriesId stays the same. During replay the - * iterationId should return the same values as it returned in the original execution. + * Adding more branches later is OK, assuming that the series stays the same. During replay, the + * iterationId should return the same values as in the original execution. * *

    * for (int i=0; i<100; i++) {
@@ -1067,9 +1069,9 @@ public static int getVersion(String changeId, int minSupported, int maxSupported
    * }
    * 
* - * All calls that have the same seriesId and iterationId argument return the same value. But only - * if they follow each other. The moment a call with a different iterationId is made the version - * is going to change. + * All calls with the same seriesId and iterationId argument return the same value. But only if + * they follow each other. The moment a call with a different iteration is made, the version + * changes. * * @param seriesId identifier of a series of changes. * @param iterationId identifier of each iteration over the changed code.