diff --git a/api/read-only-replacements/MarquezApp.java b/api/read-only-replacements/MarquezApp.java
new file mode 100644
index 0000000000..f53ed444ab
--- /dev/null
+++ b/api/read-only-replacements/MarquezApp.java
@@ -0,0 +1,277 @@
+/*
+ * Copyright 2018-2023 contributors to the Marquez project
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package marquez;
+
+import com.codahale.metrics.jdbi3.InstrumentedSqlLogger;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import io.dropwizard.Application;
+import io.dropwizard.assets.AssetsBundle;
+import io.dropwizard.configuration.EnvironmentVariableSubstitutor;
+import io.dropwizard.configuration.SubstitutingSourceProvider;
+import io.dropwizard.db.DataSourceFactory;
+import io.dropwizard.db.ManagedDataSource;
+import io.dropwizard.jdbi3.JdbiFactory;
+import io.dropwizard.setup.Bootstrap;
+import io.dropwizard.setup.Environment;
+import io.prometheus.client.CollectorRegistry;
+import io.prometheus.client.dropwizard.DropwizardExports;
+import io.prometheus.client.exporter.MetricsServlet;
+import io.prometheus.client.hotspot.DefaultExports;
+import io.sentry.Sentry;
+import java.util.EnumSet;
+import javax.servlet.DispatcherType;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import marquez.api.filter.JobRedirectFilter;
+import marquez.api.filter.exclusions.Exclusions;
+import marquez.api.filter.exclusions.ExclusionsConfig;
+import marquez.cli.DbMigrateCommand;
+import marquez.cli.DbRetentionCommand;
+import marquez.cli.MetadataCommand;
+import marquez.cli.SeedCommand;
+import marquez.common.Utils;
+import marquez.db.DbMigration;
+import marquez.jobs.DbRetentionJob;
+import marquez.jobs.MaterializeViewRefresherJob;
+import marquez.logging.DelegatingSqlLogger;
+import marquez.logging.LabelledSqlLogger;
+import marquez.logging.LoggingMdcFilter;
+import marquez.service.DatabaseMetrics;
+import marquez.tracing.SentryConfig;
+import marquez.tracing.TracingContainerResponseFilter;
+import marquez.tracing.TracingSQLLogger;
+import marquez.tracing.TracingServletFilter;
+import org.flywaydb.core.api.FlywayException;
+import org.jdbi.v3.core.Jdbi;
+import org.jdbi.v3.core.statement.SqlLogger;
+import org.jdbi.v3.jackson2.Jackson2Config;
+import org.jdbi.v3.jackson2.Jackson2Plugin;
+import org.jdbi.v3.postgres.PostgresPlugin;
+import org.jdbi.v3.sqlobject.SqlObjectPlugin;
+
+/**
+ * Dropwizard application that wires up and serves the Marquez HTTP API.
+ *
+ * <p>In this copy the startup database migration and the scheduled retention and materialized-view
+ * refresh jobs are commented out, so {@code run} only builds the data source, resources, servlets
+ * and filters. NOTE(review): presumably this is the read-only variant implied by the {@code
+ * read-only-replacements} directory - confirm.
+ */
+@Slf4j
+public final class MarquezApp extends Application<MarquezConfig> {
+  private static final String APP_NAME = "MarquezApp";
+  private static final String DB_SOURCE_NAME = APP_NAME + "-source";
+  private static final String DB_POSTGRES = "postgresql";
+  private static final boolean ERROR_ON_UNDEFINED = false;
+
+  // Monitoring
+  private static final String PROMETHEUS = "prometheus";
+  private static final String PROMETHEUS_V2 = "prometheus_v2";
+  private static final String PROMETHEUS_ENDPOINT = "/metrics";
+  private static final String PROMETHEUS_ENDPOINT_V2 = "/v2beta/metrics";
+
+  /**
+   * Starts the application.
+   *
+   * @param args command-line arguments passed on to {@code run}
+   * @throws Exception if the application fails to start
+   */
+  public static void main(final String[] args) throws Exception {
+    new MarquezApp().run(args);
+  }
+
+  @Override
+  public String getName() {
+    return APP_NAME;
+  }
+
+  /**
+   * Registers the Prometheus exporters, environment-variable substitution for the configuration,
+   * the CLI commands, the object mapper settings and the GraphQL playground assets.
+   *
+   * @param bootstrap the bootstrap to configure
+   */
+  @Override
+  public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
+    // Enable metric collection for prometheus.
+    CollectorRegistry.defaultRegistry.register(
+        new DropwizardExports(bootstrap.getMetricRegistry()));
+    DatabaseMetrics.registry.register(new DropwizardExports(bootstrap.getMetricRegistry()));
+    DefaultExports.initialize(); // Add metrics for CPU, JVM memory, etc.
+    DefaultExports.register(DatabaseMetrics.registry);
+
+    // Enable variable substitution with environment variables.
+    bootstrap.setConfigurationSourceProvider(
+        new SubstitutingSourceProvider(
+            bootstrap.getConfigurationSourceProvider(),
+            new EnvironmentVariableSubstitutor(ERROR_ON_UNDEFINED)));
+
+    // Add CLI commands
+    bootstrap.addCommand(new DbMigrateCommand());
+    bootstrap.addCommand(new DbRetentionCommand());
+    bootstrap.addCommand(new MetadataCommand());
+    bootstrap.addCommand(new SeedCommand());
+
+    bootstrap.getObjectMapper().disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+    Utils.addZonedDateTimeMixin(bootstrap.getObjectMapper());
+
+    // Add graphql playground
+    bootstrap.addBundle(
+        new AssetsBundle(
+            "/assets",
+            "/graphql-playground",
+            "graphql-playground/index.htm",
+            "graphql-playground"));
+  }
+
+  /**
+   * Builds the data source, optional Sentry tracing, the {@link Jdbi} instance and the {@link
+   * MarquezContext}, then registers resources, servlets and filters with the environment.
+   *
+   * @param config the application configuration
+   * @param env the Dropwizard environment to register components with
+   */
+  @Override
+  public void run(@NonNull MarquezConfig config, @NonNull Environment env) {
+    final DataSourceFactory sourceFactory = config.getDataSourceFactory();
+    final ManagedDataSource source = sourceFactory.build(env.metrics(), DB_SOURCE_NAME);
+
+    log.info("Running startup actions...");
+
+    // NOTE(review): startup migration is commented out in this copy - confirm this is intended.
+    // try {
+    //   DbMigration.migrateDbOrError(config.getFlywayFactory(), source, config.isMigrateOnStartup());
+    // } catch (FlywayException errorOnDbMigrate) {
+    //   log.info("Stopping app...");
+    //   // Propagate throwable up the stack.
+    //   onFatalError(errorOnDbMigrate); // Signal app termination.
+    // }
+
+    if (isSentryEnabled(config)) {
+      Sentry.init(
+          options -> {
+            options.setTracesSampleRate(config.getSentry().getTracesSampleRate());
+            options.setEnvironment(config.getSentry().getEnvironment());
+            options.setDsn(config.getSentry().getDsn());
+            options.setDebug(config.getSentry().isDebug());
+          });
+
+      env.servlets()
+          .addFilter("tracing-filter", new TracingServletFilter())
+          .addMappingForUrlPatterns(EnumSet.of(DispatcherType.REQUEST), true, "/*");
+      env.jersey().register(new TracingContainerResponseFilter());
+    }
+
+    final Jdbi jdbi = newJdbi(config, env, source);
+    final MarquezContext marquezContext =
+        MarquezContext.builder()
+            .jdbi(jdbi)
+            .searchConfig(config.getSearchConfig())
+            .tags(config.getTags())
+            .build();
+
+    registerResources(config, env, marquezContext);
+    registerServlets(env);
+    registerFilters(env, marquezContext);
+
+    // // Add scheduled jobs to lifecycle.
+    // if (config.hasDbRetentionPolicy()) {
+    //   // Add job to apply retention policy to database.
+    //   env.lifecycle().manage(new DbRetentionJob(jdbi, config.getDbRetention()));
+    // }
+
+    // // Add job to refresh materialized views.
+    // env.lifecycle().manage(new MaterializeViewRefresherJob(jdbi));
+
+    // Hand the configured exclusions to the shared Exclusions holder.
+    ExclusionsConfig exclusions = config.getExclude();
+    Exclusions.use(exclusions);
+  }
+
+  /**
+   * Returns whether Sentry tracing should be set up.
+   *
+   * @param config the application configuration
+   * @return {@code true} when a Sentry section is configured with a non-null DSN other than
+   *     {@link SentryConfig#DEFAULT_DSN}
+   */
+  private boolean isSentryEnabled(MarquezConfig config) {
+    // A missing DSN is treated as "disabled" instead of failing with a NullPointerException.
+    return config.getSentry() != null
+        && config.getSentry().getDsn() != null
+        && !config.getSentry().getDsn().equals(SentryConfig.DEFAULT_DSN);
+  }
+
+  /**
+   * Returns a new {@link Jdbi} object with the SQL-object, Postgres and Jackson plugins installed
+   * and its SQL logger and Jackson mapper configured.
+   *
+   * @param config the application configuration
+   * @param env the Dropwizard environment supplying the metric registry
+   * @param source the managed data source handed to the {@link JdbiFactory}
+   * @return the configured {@link Jdbi} instance
+   */
+  private Jdbi newJdbi(
+      @NonNull MarquezConfig config, @NonNull Environment env, @NonNull ManagedDataSource source) {
+    final JdbiFactory factory = new JdbiFactory();
+    final Jdbi jdbi =
+        factory
+            .build(env, config.getDataSourceFactory(), source, DB_POSTGRES)
+            .installPlugin(new SqlObjectPlugin())
+            .installPlugin(new PostgresPlugin())
+            .installPlugin(new Jackson2Plugin());
+    SqlLogger sqlLogger =
+        new DelegatingSqlLogger(new LabelledSqlLogger(), new InstrumentedSqlLogger(env.metrics()));
+    if (isSentryEnabled(config)) {
+      sqlLogger = new TracingSQLLogger(sqlLogger);
+    }
+    jdbi.setSqlLogger(sqlLogger);
+    jdbi.getConfig(Jackson2Config.class).setMapper(Utils.getMapper());
+    return jdbi;
+  }
+
+  /**
+   * Registers the GraphQL servlet (when enabled in the configuration) and every resource exposed
+   * by the context with Jersey.
+   *
+   * @param config the application configuration
+   * @param env the Dropwizard environment to register with
+   * @param context the context providing the GraphQL servlet and the resources
+   */
+  public void registerResources(
+      @NonNull MarquezConfig config, @NonNull Environment env, MarquezContext context) {
+
+    if (config.getGraphql().isEnabled()) {
+      env.servlets()
+          .addServlet("api/v1-beta/graphql", context.getGraphqlServlet())
+          .addMapping("/api/v1-beta/graphql", "/api/v1/schema.json");
+    }
+
+    log.debug("Registering resources...");
+    for (final Object resource : context.getResources()) {
+      env.jersey().register(resource);
+    }
+  }
+
+  /** Maps the two Prometheus metrics servlets to their endpoints. */
+  private void registerServlets(@NonNull Environment env) {
+    log.debug("Registering servlets...");
+
+    // Expose metrics for monitoring.
+    env.servlets().addServlet(PROMETHEUS, new MetricsServlet()).addMapping(PROMETHEUS_ENDPOINT);
+    env.servlets()
+        .addServlet(PROMETHEUS_V2, new MetricsServlet(DatabaseMetrics.registry))
+        .addMapping(PROMETHEUS_ENDPOINT_V2);
+  }
+
+  /** Registers the MDC logging filter and the job redirect filter with Jersey. */
+  private void registerFilters(@NonNull Environment env, MarquezContext marquezContext) {
+    env.jersey().getResourceConfig().register(new LoggingMdcFilter());
+    env.jersey()
+        .getResourceConfig()
+        .register(new JobRedirectFilter(marquezContext.getJobService()));
+  }
+}
diff --git a/api/read-only-replacements/MarquezContext.java b/api/read-only-replacements/MarquezContext.java
new file mode 100644
index 0000000000..765c2e00c1
--- /dev/null
+++ b/api/read-only-replacements/MarquezContext.java
@@ -0,0 +1,286 @@
+/*
+ * Copyright 2018-2023 contributors to the Marquez project
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package marquez;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import graphql.kickstart.servlet.GraphQLHttpServlet;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Getter;
+import lombok.NonNull;
+import marquez.api.ColumnLineageResource;
+import marquez.api.DatasetResource;
+import marquez.api.JobResource;
+import marquez.api.NamespaceResource;
+import marquez.api.OpenLineageResource;
+import marquez.api.SearchResource;
+import marquez.api.SourceResource;
+import marquez.api.StatsResource;
+import marquez.api.TagResource;
+import marquez.api.exceptions.JdbiExceptionExceptionMapper;
+import marquez.api.exceptions.JsonProcessingExceptionMapper;
+import marquez.db.BaseDao;
+import marquez.db.ColumnLineageDao;
+import marquez.db.DatasetDao;
+import marquez.db.DatasetFieldDao;
+import marquez.db.DatasetVersionDao;
+import marquez.db.JobDao;
+import marquez.db.JobFacetsDao;
+import marquez.db.JobVersionDao;
+import marquez.db.LineageDao;
+import marquez.db.NamespaceDao;
+import marquez.db.OpenLineageDao;
+import marquez.db.RunArgsDao;
+import marquez.db.RunDao;
+import marquez.db.RunFacetsDao;
+import marquez.db.RunStateDao;
+import marquez.db.SearchDao;
+import marquez.db.SourceDao;
+import marquez.db.StatsDao;
+import marquez.db.TagDao;
+import marquez.graphql.GraphqlSchemaBuilder;
+import marquez.graphql.MarquezGraphqlServletBuilder;
+import marquez.search.SearchConfig;
+import marquez.service.ColumnLineageService;
+import marquez.service.DatasetFieldService;
+import marquez.service.DatasetService;
+import marquez.service.DatasetVersionService;
+import marquez.service.JobService;
+import marquez.service.LineageService;
+import marquez.service.NamespaceService;
+import marquez.service.OpenLineageService;
+import marquez.service.RunService;
+import marquez.service.RunTransitionListener;
+import marquez.service.SearchService;
+import marquez.service.ServiceFactory;
+import marquez.service.SourceService;
+import marquez.service.StatsService;
+import marquez.service.TagService;
+import marquez.service.models.Tag;
+import org.jdbi.v3.core.Jdbi;
+
+/**
+ * Builds and holds the DAOs, services, REST resources, exception mappers and GraphQL servlet used
+ * by the application; everything is created once in the constructor from a {@link Jdbi}.
+ *
+ * <p>All fields are exposed through the class-level {@link Getter}. Instances are created with
+ * {@link #builder()}.
+ */
+@Getter
+public final class MarquezContext {
+  private final NamespaceDao namespaceDao;
+  private final SourceDao sourceDao;
+  private final DatasetDao datasetDao;
+  private final DatasetFieldDao datasetFieldDao;
+  private final DatasetVersionDao datasetVersionDao;
+  private final JobDao jobDao;
+  private final JobVersionDao jobVersionDao;
+  private final JobFacetsDao jobFacetsDao;
+  private final RunDao runDao;
+  private final RunArgsDao runArgsDao;
+  private final RunFacetsDao runFacetsDao;
+  private final RunStateDao runStateDao;
+  private final TagDao tagDao;
+  private final OpenLineageDao openLineageDao;
+  private final LineageDao lineageDao;
+  private final ColumnLineageDao columnLineageDao;
+  private final SearchDao searchDao;
+  private final StatsDao statsDao;
+  private final List<RunTransitionListener> runTransitionListeners;
+
+  private final NamespaceService namespaceService;
+  private final SourceService sourceService;
+  private final DatasetService datasetService;
+  private final JobService jobService;
+  private final TagService tagService;
+  private final RunService runService;
+  private final OpenLineageService openLineageService;
+  private final LineageService lineageService;
+  private final ColumnLineageService columnLineageService;
+  private final SearchService searchService;
+  private final StatsService statsService;
+  private final NamespaceResource namespaceResource;
+  private final SourceResource sourceResource;
+  private final DatasetResource datasetResource;
+  private final ColumnLineageResource columnLineageResource;
+  private final JobResource jobResource;
+  private final TagResource tagResource;
+  private final OpenLineageResource openLineageResource;
+  private final marquez.api.v2beta.SearchResource v2BetasearchResource;
+  private final SearchResource searchResource;
+  private final StatsResource opsResource;
+  private final ImmutableList<Object> resources;
+  private final JdbiExceptionExceptionMapper jdbiException;
+  private final JsonProcessingExceptionMapper jsonException;
+  private final GraphQLHttpServlet graphqlServlet;
+  private final SearchConfig searchConfig;
+
+  /**
+   * Creates every DAO, service and resource from the given {@link Jdbi}.
+   *
+   * @param jdbi the database access entry point used to create on-demand DAOs
+   * @param searchConfig the search configuration handed to {@link SearchService}
+   * @param tags the configured tags; currently unused because the tag initialization call below
+   *     is commented out
+   * @param runTransitionListeners listeners handed to {@link RunService}; {@code null} is treated
+   *     as an empty list
+   */
+  private MarquezContext(
+      @NonNull final Jdbi jdbi,
+      @NonNull final SearchConfig searchConfig,
+      @NonNull final ImmutableSet<Tag> tags,
+      List<RunTransitionListener> runTransitionListeners) {
+    if (runTransitionListeners == null) {
+      runTransitionListeners = new ArrayList<>();
+    }
+    this.searchConfig = searchConfig;
+
+    // Services that take a BaseDao share the NamespaceDao proxy instead of creating a second one.
+    this.namespaceDao = jdbi.onDemand(NamespaceDao.class);
+    final BaseDao baseDao = this.namespaceDao;
+    this.sourceDao = jdbi.onDemand(SourceDao.class);
+    this.datasetDao = jdbi.onDemand(DatasetDao.class);
+    this.datasetFieldDao = jdbi.onDemand(DatasetFieldDao.class);
+    this.datasetVersionDao = jdbi.onDemand(DatasetVersionDao.class);
+    this.jobDao = jdbi.onDemand(JobDao.class);
+    this.jobVersionDao = jdbi.onDemand(JobVersionDao.class);
+    this.jobFacetsDao = jdbi.onDemand(JobFacetsDao.class);
+    this.runDao = jdbi.onDemand(RunDao.class);
+    this.runArgsDao = jdbi.onDemand(RunArgsDao.class);
+    this.runFacetsDao = jdbi.onDemand(RunFacetsDao.class);
+    this.runStateDao = jdbi.onDemand(RunStateDao.class);
+    this.tagDao = jdbi.onDemand(TagDao.class);
+    this.openLineageDao = jdbi.onDemand(OpenLineageDao.class);
+    this.lineageDao = jdbi.onDemand(LineageDao.class);
+    this.columnLineageDao = jdbi.onDemand(ColumnLineageDao.class);
+    this.searchDao = jdbi.onDemand(SearchDao.class);
+    this.statsDao = jdbi.onDemand(StatsDao.class);
+    this.runTransitionListeners = runTransitionListeners;
+
+    this.namespaceService = new NamespaceService(baseDao);
+    this.sourceService = new SourceService(baseDao);
+    this.runService = new RunService(baseDao, runTransitionListeners);
+    this.datasetService = new DatasetService(datasetDao, runService);
+
+    this.jobService = new JobService(baseDao, runService);
+    this.tagService = new TagService(baseDao);
+    // this.tagService.init(tags);
+    this.openLineageService = new OpenLineageService(baseDao, runService);
+    this.lineageService = new LineageService(lineageDao, jobDao, runDao);
+    this.columnLineageService = new ColumnLineageService(columnLineageDao, datasetFieldDao);
+    this.searchService = new SearchService(searchConfig);
+    this.statsService = new StatsService(statsDao);
+    this.jdbiException = new JdbiExceptionExceptionMapper();
+    this.jsonException = new JsonProcessingExceptionMapper();
+    final ServiceFactory serviceFactory =
+        ServiceFactory.builder()
+            .datasetService(datasetService)
+            .jobService(jobService)
+            .runService(runService)
+            .namespaceService(namespaceService)
+            .tagService(tagService)
+            .openLineageService(openLineageService)
+            .searchService(searchService)
+            .sourceService(sourceService)
+            .lineageService(lineageService)
+            .columnLineageService(columnLineageService)
+            .datasetFieldService(new DatasetFieldService(baseDao))
+            .datasetVersionService(new DatasetVersionService(baseDao))
+            .statsService(statsService)
+            .build();
+    this.namespaceResource = new NamespaceResource(serviceFactory);
+    this.sourceResource = new SourceResource(serviceFactory);
+    this.datasetResource = new DatasetResource(serviceFactory);
+    this.columnLineageResource = new ColumnLineageResource(serviceFactory);
+    this.jobResource = new JobResource(serviceFactory, jobVersionDao, jobFacetsDao, runFacetsDao);
+    this.tagResource = new TagResource(serviceFactory);
+    this.openLineageResource = new OpenLineageResource(serviceFactory, openLineageDao);
+    this.searchResource = new SearchResource(searchDao);
+    this.opsResource = new StatsResource(serviceFactory);
+    this.v2BetasearchResource = new marquez.api.v2beta.SearchResource(serviceFactory);
+
+    this.resources =
+        ImmutableList.of(
+            namespaceResource,
+            sourceResource,
+            datasetResource,
+            columnLineageResource,
+            jobResource,
+            tagResource,
+            jdbiException,
+            jsonException,
+            openLineageResource,
+            searchResource,
+            v2BetasearchResource,
+            opsResource);
+
+    final MarquezGraphqlServletBuilder servlet = new MarquezGraphqlServletBuilder();
+    this.graphqlServlet = servlet.getServlet(new GraphqlSchemaBuilder(jdbi));
+  }
+
+  /** Returns a new {@link Builder} with empty tags and no run transition listeners. */
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /**
+   * Collects the inputs for a {@link MarquezContext}. A {@link Jdbi} and a {@link SearchConfig}
+   * must be set before {@link #build()}; tags and run transition listeners default to empty.
+   */
+  public static class Builder {
+
+    private Jdbi jdbi;
+    private SearchConfig searchConfig;
+    private ImmutableSet<Tag> tags;
+    private List<RunTransitionListener> runTransitionListeners;
+
+    Builder() {
+      this.tags = ImmutableSet.of();
+      this.runTransitionListeners = new ArrayList<>();
+    }
+
+    /** Sets the {@link Jdbi} instance the context creates its DAOs from. */
+    public Builder jdbi(@NonNull Jdbi jdbi) {
+      this.jdbi = jdbi;
+      return this;
+    }
+
+    /** Sets the search configuration passed to the context. */
+    public Builder searchConfig(@NonNull SearchConfig searchConfig) {
+      this.searchConfig = searchConfig;
+      return this;
+    }
+
+    /** Replaces the tags passed to the context. */
+    public Builder tags(@NonNull ImmutableSet<Tag> tags) {
+      this.tags = tags;
+      return this;
+    }
+
+    /** Adds a single run transition listener to those already collected. */
+    public Builder runTransitionListener(@NonNull RunTransitionListener runTransitionListener) {
+      return runTransitionListeners(Lists.newArrayList(runTransitionListener));
+    }
+
+    /** Adds all given run transition listeners to those already collected. */
+    public Builder runTransitionListeners(
+        @NonNull List<RunTransitionListener> runTransitionListeners) {
+      this.runTransitionListeners.addAll(runTransitionListeners);
+      return this;
+    }
+
+    /**
+     * Builds the context from the collected inputs.
+     *
+     * @return a new {@link MarquezContext}
+     */
+    public MarquezContext build() {
+      return new MarquezContext(jdbi, searchConfig, tags, runTransitionListeners);
+    }
+  }
+}
diff --git a/api/read-only-replacements/NamespaceService.java b/api/read-only-replacements/NamespaceService.java
new file mode 100644
index 0000000000..180a3fd050
--- /dev/null
+++ b/api/read-only-replacements/NamespaceService.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2018-2023 contributors to the Marquez project
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package marquez.service;
+
+import io.prometheus.client.Counter;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import marquez.common.models.NamespaceName;
+import marquez.common.models.OwnerName;
+import marquez.db.BaseDao;
+import marquez.service.models.Namespace;
+import marquez.service.models.NamespaceMeta;
+
+/**
+ * Namespace operations layered on the namespace DAO obtained from a {@link BaseDao}.
+ *
+ * <p>In this copy the constructor does not upsert the default namespace: its call to {@code
+ * init()} is commented out.
+ */
+@Slf4j
+public class NamespaceService extends DelegatingDaos.DelegatingNamespaceDao {
+  private static final Counter namespaces =
+      Counter.build()
+          .namespace("marquez")
+          .name("namespace_total")
+          .help("Namespace creation invocations")
+          .register();
+
+  /**
+   * Creates the service on top of the namespace DAO created by {@code baseDao}.
+   *
+   * @param baseDao the DAO used to create the delegate namespace DAO
+   */
+  public NamespaceService(@NonNull final BaseDao baseDao) {
+    super(baseDao.createNamespaceDao());
+    // init();
+  }
+
+  /** Upserts the {@link NamespaceName#DEFAULT} namespace metadata. Currently not called. */
+  private void init() {
+    final NamespaceMeta meta =
+        new NamespaceMeta(
+            OwnerName.ANONYMOUS,
+            "The default global namespace for dataset, job, and run metadata "
+                + "not belonging to a user-specified namespace.");
+    upsertNamespaceMeta(NamespaceName.DEFAULT, meta);
+  }
+
+  /**
+   * Increments the namespace creation counter and upserts the namespace.
+   *
+   * @param name the namespace name
+   * @param meta the namespace metadata
+   * @return the result of the upsert
+   */
+  public Namespace createOrUpdate(@NonNull NamespaceName name, @NonNull NamespaceMeta meta) {
+    namespaces.inc();
+    return upsertNamespaceMeta(name, meta);
+  }
+}