Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adhoc flowspec remove #4083

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.gobblin.service.modules.orchestration;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -39,6 +41,7 @@
import org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.DagTask;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExceptionUtils;
import org.apache.gobblin.util.ExecutorsUtils;


Expand Down Expand Up @@ -67,6 +70,8 @@ public class DagProcessingEngine extends AbstractIdleService {
public static final String DEFAULT_JOB_START_DEADLINE_TIME_MS = "defaultJobStartDeadlineTimeMillis";
@Getter static long defaultJobStartDeadlineTimeMillis;
public static final String DEFAULT_FLOW_FAILURE_OPTION = FailureOption.FINISH_ALL_POSSIBLE.name();
// TODO: Update to fetch this list from config once transient-exception handling is implemented and the retryable exceptions are defined
public static List<Class<? extends Exception>> retryableExceptions = Collections.EMPTY_LIST;

@Inject
public DagProcessingEngine(Config config, DagTaskStream dagTaskStream, DagProcFactory dagProcFactory,
Expand All @@ -85,6 +90,10 @@ private static void setDefaultJobStartDeadlineTimeMs(long deadlineTimeMs) {
defaultJobStartDeadlineTimeMillis = deadlineTimeMs;
}

/**
 * Returns whether {@code e} should be treated as transient (i.e. retryable) by the dag processing engine.
 *
 * <p>Delegates to {@code ExceptionUtils.isExceptionInstanceOf} against {@link #retryableExceptions}.
 * NOTE(review): {@code retryableExceptions} is currently hard-coded to an empty list (see the TODO at its
 * declaration), so this presently returns {@code false} for every exception — nothing is retried yet.
 *
 * @param e the exception observed while processing a dag task
 * @return true if {@code e} matches one of the configured retryable exception types
 */
public static boolean isTransientException(Exception e) {
return ExceptionUtils.isExceptionInstanceOf(e, retryableExceptions);
}

@Override
protected void startUp() {
Integer numThreads = ConfigUtils.getInt
Expand Down Expand Up @@ -149,6 +158,13 @@ public void run() {
dagTask.conclude();
log.info(dagProc.contextualizeStatus("concluded dagTask"));
} catch (Exception e) {
if(!DagProcessingEngine.isTransientException(e)){
log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried. Exception - {} ",
dagTask, e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
dagTask.conclude();
}
// TODO: Add an else block for transient exceptions and conclude the task only if the retry limit has not been breached
log.error("DagProcEngineThread: " + dagProc.contextualizeStatus("error"), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.util.ExceptionUtils;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.DagNodeId;
import org.apache.gobblin.service.modules.orchestration.DagActionStore;
Expand Down Expand Up @@ -92,20 +91,9 @@ public final void process(DagManagementStateStore dagManagementStateStore,
dagProcEngineMetrics.markDagActionsInitialize(getDagActionType(), false);
throw e;
}
try {
logContextualizedInfo("ready to process");
act(dagManagementStateStore, state, dagProcEngineMetrics);
logContextualizedInfo("processed");
} catch (Exception e) {
if (isNonTransientException(e)) {
log.error("Ignoring non transient exception. DagTask {} will conclude and will not be retried. Exception - {} ",
getDagTask(), e);
dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
} else {
throw e;
}
}
}

protected abstract T initialize(DagManagementStateStore dagManagementStateStore) throws IOException;
Expand All @@ -126,8 +114,4 @@ public String contextualizeStatus(String message) {
/**
 * Logs {@code message} at INFO level, prefixed with this proc's identifying context
 * (see {@code contextualizeStatus}).
 *
 * @param message the status message to contextualize and log
 */
public void logContextualizedInfo(String message) {
log.info(contextualizeStatus(message));
}

protected boolean isNonTransientException(Exception e) {
return ExceptionUtils.isExceptionInstanceOf(e, this.nonRetryableExceptions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public class DagProcUtils {
public static void submitNextNodes(DagManagementStateStore dagManagementStateStore, Dag<JobExecutionPlan> dag,
Dag.DagId dagId) throws IOException {
Set<Dag.DagNode<JobExecutionPlan>> nextNodes = DagUtils.getNext(dag);

if (nextNodes.size() == 1) {
Dag.DagNode<JobExecutionPlan> dagNode = nextNodes.iterator().next();
DagProcUtils.submitJobToExecutor(dagManagementStateStore, dagNode, dagId);
Expand All @@ -98,6 +97,7 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
Dag.DagId dagId) {
DagUtils.incrementJobAttempt(dagNode);
JobExecutionPlan jobExecutionPlan = DagUtils.getJobExecutionPlan(dagNode);

JobSpec jobSpec = DagUtils.getJobSpec(dagNode);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);

Expand Down Expand Up @@ -139,12 +139,15 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat
dagManagementStateStore.updateDagNode(dagNode);
sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode);
} catch (Exception e) {
TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
// Only mark the job as failed in case of non-transient exceptions
if(!DagProcessingEngine.isTransientException(e)){
TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED);
String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri;
log.error(message, e);
jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage());
if (jobFailedTimer != null) {
jobFailedTimer.stop(jobMetadata);
}
}
try {
// when there is no exception, quota will be released in job status monitor or re-evaluate dag proc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,8 @@
import org.apache.gobblin.testing.AssertWithBackoff;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;


@Slf4j
public class DagProcessingEngineTest {
Expand Down Expand Up @@ -196,9 +195,15 @@ public void dagProcessingTest()
10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " number of times. "
+ "Actual number of invocations " + Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(),
log, 1, 1000L);

// Currently we treat all exceptions as non-retryable, so the total exception count equals the non-retryable exception count
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(), expectedExceptions);
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedNonRetryableExceptions);
Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedExceptions);
}

/**
 * Verifies that while {@link DagProcessingEngine#retryableExceptions} is empty (the current
 * hard-coded state), every exception — including ones expected to become retryable later, such as
 * {@link AzkabanClientException} — is reported as non-transient.
 */
@Test
public void isNonTransientExceptionTest() {
  // assertFalse is clearer than assertTrue(!...) and gives a better failure message
  Assert.assertFalse(DagProcessingEngine.isTransientException(new RuntimeException("Simulating a non retryable exception!")));
  Assert.assertFalse(DagProcessingEngine.isTransientException(new AzkabanClientException("Simulating a retryable exception!")));
}

private enum ExceptionType {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package org.apache.gobblin.service.modules.orchestration.proc;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.modules.testng.PowerMockTestCase;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Tests for {@link DagProcUtils} job-submission helpers.
 *
 * <p>NOTE(review): the original class combined the JUnit4 runner ({@code @RunWith(PowerMockRunner.class)})
 * with TestNG annotations ({@code @BeforeClass}/{@code @Test} from org.testng). TestNG ignores JUnit's
 * {@code @RunWith}, so {@code @PrepareForTest} never took effect. Extending {@link PowerMockTestCase}
 * is PowerMock's documented TestNG integration.
 */
@PrepareForTest(EventSubmitter.class)
public class DagProcUtilsTest extends PowerMockTestCase {

  private DagManagementStateStore dagManagementStateStore;
  private SpecExecutor mockSpecExecutor;

  @BeforeClass
  public void setUp() {
    dagManagementStateStore = Mockito.mock(DagManagementStateStore.class);
    mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class));
  }

  /** A dag whose start consists of multiple parallel nodes is submitted without throwing. */
  @Test
  public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException {
    Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678);
    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
    for (JobExecutionPlan jobExecutionPlan : getJobExecutionPlans()) {
      dagNodeList.add(new Dag.DagNode<>(jobExecutionPlan));
    }
    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
    Mockito.doNothing().when(dagManagementStateStore)
        .addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
  }

  /** A single-node dag is submitted straight to its executor; running-job metrics are stubbed. */
  @Test
  public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException, ExecutionException, InterruptedException {
    Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680);
    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
    JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0);
    Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
    dagNodeList.add(dagNode);
    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
    Mockito.doNothing().when(dagManagementStateStore)
        .addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
    DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
    Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
    Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
  }

  /** A spec producer that throws on addSpec causes submitNextNodes to propagate a RuntimeException. */
  @Test(expectedExceptions = RuntimeException.class)
  public void testWhenSubmitToExecutorGivesRuntimeException() throws URISyntaxException, IOException, ExecutionException, InterruptedException {
    Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678);
    List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>();
    JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2);  // plan 2 is backed by mockSpecExecutor
    Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan);
    dagNodeList.add(dagNode);
    Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList);
    Mockito.doNothing().when(dagManagementStateStore)
        .addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any());
    SpecProducer<Spec> mockedSpecProducer = mockSpecExecutor.getProducer().get();
    Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class));
    DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class);
    Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics);
    Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode);
    DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId);
  }

  /**
   * Builds three single-job execution plans. Plans 0 and 1 use a fresh {@link InMemorySpecExecutor};
   * plan 2 uses the shared {@link MockedSpecExecutor} so tests can stub its producer.
   */
  private List<JobExecutionPlan> getJobExecutionPlans() throws URISyntaxException {
    Config flowConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName1")
        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build();
    Config flowConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName2")
        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build();
    Config flowConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName3")
        .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build();
    List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2, flowConfig3);

    Config jobConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1")
        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1").build();
    Config jobConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2")
        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName2").build();
    // Fixed copy-paste bug: this job was named "job1" although it belongs to flow 3 / edgeName3.
    Config jobConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job3")
        .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName3").build();
    List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2, jobConfig3);

    List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
      Config jobConfig = jobConfigs.get(i)
          .withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, ConfigValueFactory.fromAnyRef("testUri"));
      FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build();
      SpecExecutor specExecutor = (i == 2) ? mockSpecExecutor : new InMemorySpecExecutor(ConfigFactory.empty());
      jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig, specExecutor, 0L, ConfigFactory.empty()));
    }
    return jobExecutionPlans;
  }
}