Skip to content

Commit

Permalink
initialize topology specs directly without waitging for listener call…
Browse files Browse the repository at this point in the history
…backs
  • Loading branch information
arjun4084346 committed Apr 29, 2024
1 parent 4de95d6 commit 6d61f02
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,13 @@

import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang3.StringUtils;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.quartz.CronExpression;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,6 +46,7 @@
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
Expand All @@ -58,12 +57,14 @@
import org.apache.gobblin.runtime.job_catalog.FSJobCatalog;
import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;


// Provide base implementation for constructing multi-hops route.
Expand All @@ -73,8 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {
// Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is expected that any Spec change should be reflected
// to these data structures.
@Getter
@Setter
protected final Map<URI, TopologySpec> topologySpecMap;
protected final Map<URI, TopologySpec> topologySpecMap = Maps.newConcurrentMap();

protected final Config config;
protected final Logger log;
Expand All @@ -97,35 +97,13 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler {

private Optional<UserQuotaManager> userQuotaManager;

public BaseFlowToJobSpecCompiler(Config config){
this(config,true);
}

public BaseFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled){
this(config, Optional.<Logger>absent(), true);
}

public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log){
this(config, log,true);
}

public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled){
this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
if (instrumentationEnabled) {
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class);
this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER));
this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER));
this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER));
this.dataAuthorizationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER));
}
else {
this.metricContext = null;
this.flowCompilationSuccessFulMeter = Optional.absent();
this.flowCompilationFailedMeter = Optional.absent();
this.flowCompilationTimer = Optional.absent();
this.dataAuthorizationTimer = Optional.absent();
}

public BaseFlowToJobSpecCompiler(Config config, Collection<TopologySpec> topologySpecSet){
this.log = LoggerFactory.getLogger(getClass());
this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class);
this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER));
this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER));
this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER));
this.dataAuthorizationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER));
this.warmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false);
if (this.warmStandbyEnabled) {
userQuotaManager = Optional.of(GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class,
Expand All @@ -134,10 +112,12 @@ public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean in
userQuotaManager = Optional.absent();
}

this.topologySpecMap = Maps.newConcurrentMap();
topologySpecSet.forEach(this::onAddTopologySpec);


this.config = config;

/***
/*
* ETL-5996
* For multi-tenancy, the following needs to be added:
* 1. Change singular templateCatalog to Map<URI, JobCatalogWithTemplates> to support multiple templateCatalogs
Expand Down Expand Up @@ -219,8 +199,6 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) {
public AddSpecResponse onAddSpec(Spec addedSpec) {
if (addedSpec instanceof FlowSpec) {
return onAddFlowSpec((FlowSpec) addedSpec);
} else if (addedSpec instanceof TopologySpec) {
return onAddTopologySpec( (TopologySpec) addedSpec);
}
return new AddSpecResponse(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
package org.apache.gobblin.service.modules.flow;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;

Expand All @@ -50,20 +48,8 @@
@Alpha
public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler {

public IdentityFlowToJobSpecCompiler(Config config) {
super(config, true);
}

public IdentityFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled) {
super(config, Optional.<Logger>absent(), instrumentationEnabled);
}

public IdentityFlowToJobSpecCompiler(Config config, Optional<Logger> log) {
super(config, log, true);
}

public IdentityFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
super(config, log, instrumentationEnabled);
public IdentityFlowToJobSpecCompiler(Config config, Collection<TopologySpec> topologySpecSet) {
super(config, topologySpecSet);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.gobblin.service.modules.flow;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

Expand All @@ -28,6 +29,7 @@
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
Expand All @@ -45,8 +47,8 @@ public class MockedSpecCompiler extends IdentityFlowToJobSpecCompiler {
private static final int NUMBER_OF_JOBS = 3;
public static final String UNCOMPILABLE_FLOW = "uncompilableFlow";

public MockedSpecCompiler(Config config) {
super(config);
public MockedSpecCompiler(Config config, Collection<TopologySpec> topologySpecSet) {
super(config, topologySpecSet);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.lang.reflect.InvocationTargetException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -34,7 +36,6 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;

import com.google.common.base.Joiner;
import com.google.common.base.Optional;
Expand All @@ -57,6 +58,7 @@
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraph;
import org.apache.gobblin.service.modules.flowgraph.Dag;
Expand Down Expand Up @@ -98,26 +100,14 @@ public class MultiHopFlowCompiler extends BaseFlowToJobSpecCompiler {
// a map to hold aliases of data nodes, e.g. gobblin.service.datanode.aliases.map=node1-dev:node1,node1-stg:node1,node1-prod:node1
public static final String DATA_NODE_ID_TO_ALIAS_MAP = ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX + "datanode.aliases.map";

public MultiHopFlowCompiler(Config config) {
this(config, true);
}

public MultiHopFlowCompiler(Config config, boolean instrumentationEnabled) {
this(config, Optional.<Logger>absent(), instrumentationEnabled);
}

public MultiHopFlowCompiler(Config config, Optional<Logger> log) {
this(config, log, true);
}

public MultiHopFlowCompiler(Config config, AtomicReference<FlowGraph> flowGraph) {
super(config, Optional.absent(), true);
super(config, Collections.EMPTY_SET);
this.flowGraph = flowGraph;
this.dataMovementAuthorizer = new NoopDataMovementAuthorizer(config);
}

public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled) {
super(config, log, instrumentationEnabled);
public MultiHopFlowCompiler(Config config, Collection<TopologySpec> topologySpecSet) {
super(config, topologySpecSet);
try {
this.dataNodeAliasMap = config.hasPath(DATA_NODE_ID_TO_ALIAS_MAP)
? Splitter.on(",").withKeyValueSeparator(":").split(config.getString(DATA_NODE_ID_TO_ALIAS_MAP))
Expand Down Expand Up @@ -161,7 +151,7 @@ public MultiHopFlowCompiler(Config config, Optional<Logger> log, boolean instrum
try {
String flowGraphMonitorClassName = ConfigUtils.getString(this.config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWGRAPH_CLASS_KEY, GitFlowGraphMonitor.class.getCanonicalName());
this.flowGraphMonitor = (FlowGraphMonitor) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver<>(FlowGraphMonitor.class).resolve(
flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this, this.topologySpecMap, this.getInitComplete(), instrumentationEnabled);
flowGraphMonitorClassName)), gitFlowGraphConfig, flowTemplateCatalog, this, this.topologySpecMap, this.getInitComplete(), true);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.gobblin.service.modules.orchestration.TimingEventUtils;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
Expand Down Expand Up @@ -73,12 +74,12 @@ public class FlowCompilationValidationHelper {

@Inject
public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton sharedFlowMetricsSingleton,
UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator) {
UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator, TopologySpecFactory topologySpecFactory) {
try {
String specCompilerClassName = ConfigUtils.getString(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY,
ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS);
this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName(
new ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), config);
new ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), config, topologySpecFactory.getTopologies());
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException |
ClassNotFoundException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

Expand Down Expand Up @@ -86,13 +87,14 @@ public void setup() throws Exception {
// Initialize compiler with template catalog
Properties compilerWithTemplateCatalogProperties = new Properties();
compilerWithTemplateCatalogProperties.setProperty(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY, TEST_TEMPLATE_CATALOG_URI);
this.compilerWithTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties));
this.compilerWithTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(compilerWithTemplateCatalogProperties),
Collections.EMPTY_SET);

// Add a topology to compiler
this.compilerWithTemplateCalague.onAddSpec(initTopologySpec());

// Initialize compiler without template catalog
this.compilerWithoutTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(new Properties()));
this.compilerWithoutTemplateCalague = new IdentityFlowToJobSpecCompiler(ConfigUtils.propertiesToConfig(new Properties()), Collections.EMPTY_SET);

// Add a topology to compiler
this.compilerWithoutTemplateCalague.onAddSpec(initTopologySpec());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

Expand Down Expand Up @@ -53,6 +54,7 @@
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor;
import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler;
import org.apache.gobblin.service.modules.topology.TopologySpecFactory;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
Expand All @@ -61,6 +63,7 @@

import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;


Expand Down Expand Up @@ -92,6 +95,8 @@ public void setup() throws Exception {
cleanUpDir(FLOW_SPEC_STORE_DIR);

Properties orchestratorProperties = new Properties();
// Create Spec to play with
this.topologySpec = initTopologySpec();

Properties topologyProperties = new Properties();
topologyProperties.put("specStore.fs.dir", TOPOLOGY_SPEC_STORE_DIR);
Expand All @@ -114,6 +119,8 @@ public void setup() throws Exception {
FlowLaunchHandler mockFlowTriggerHandler = mock(FlowLaunchHandler.class);
DagManager mockDagManager = mock(DagManager.class);
doNothing().when(mockDagManager).setTopologySpecMap(anyMap());
TopologySpecFactory mockedTopologySpecFactory = mock(TopologySpecFactory.class);
doReturn(Collections.singleton(this.topologySpec)).when(mockedTopologySpecFactory).getTopologies();
Config config = ConfigBuilder.create()
.addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY,
MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName())
Expand All @@ -130,13 +137,12 @@ public void setup() throws Exception {
this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties),
this.topologyCatalog, mockDagManager, Optional.of(logger), mockStatusGenerator,
Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore),
new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator));
new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator,
mockedTopologySpecFactory));
this.topologyCatalog.addListener(orchestrator);
this.flowCatalog.addListener(orchestrator);
// Start application
this.serviceLauncher.start();
// Create Spec to play with
this.topologySpec = initTopologySpec();
this.flowSpec = initFlowSpec();
}

Expand Down Expand Up @@ -244,9 +250,10 @@ public void createTopologySpec() {
}
// Make sure TopologyCatalog is empty
Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition");
// Make sure TopologyCatalog Listener is empty
Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 0, "SpecCompiler should not know about any Topology "
+ "before addition");
Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 1, "SpecCompiler should know about any Topology "
+ " irrespective of what is there in the topology catalog");
// Make sure TopologyCatalog empty
Assert.assertTrue(this.topologyCatalog.getSize() == 0, "Topology catalog should contain 0 Spec before addition");

// Create and add Spec
this.topologyCatalog.put(topologySpec);
Expand All @@ -262,7 +269,7 @@ public void createTopologySpec() {
// Make sure TopologyCatalog has the added Topology
Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Spec after addition");
// Make sure TopologyCatalog Listener knows about added Topology
Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 1, "SpecCompiler should contain 1 Spec after addition");
Assert.assertTrue(this.topologyCatalog.getSize() == 1, "Topology catalog should contain 1 Spec after addition");
}

@Test (dependsOnMethods = "createTopologySpec")
Expand Down

0 comments on commit 6d61f02

Please sign in to comment.