From 9d48321e6eef301221649177c50702b99b6fa33b Mon Sep 17 00:00:00 2001 From: Alexander Bogdanowicz Date: Tue, 24 Oct 2023 03:12:53 +0000 Subject: [PATCH 1/3] Add Sdf Transport Support for v1.4.0-SNAPSHOT --- client/java/build.gradle | 2 +- client/java/gradle.properties | 2 +- .../client/transports/SdfConfig.java | 21 ++++++ .../client/transports/SdfTransport.java | 52 ++++++++++++++ .../transports/SdfTransportBuilder.java | 24 +++++++ .../client/transports/Transport.java | 3 +- ...lineage.client.transports.TransportBuilder | 3 +- .../client/transports/SdfTransportTest.java | 69 +++++++++++++++++++ integration/spark/gradle.properties | 2 +- integration/sql/iface-java/gradle.properties | 2 +- 10 files changed, 174 insertions(+), 6 deletions(-) create mode 100644 client/java/src/main/java/io/openlineage/client/transports/SdfConfig.java create mode 100644 client/java/src/main/java/io/openlineage/client/transports/SdfTransport.java create mode 100644 client/java/src/main/java/io/openlineage/client/transports/SdfTransportBuilder.java create mode 100644 client/java/src/test/java/io/openlineage/client/transports/SdfTransportTest.java diff --git a/client/java/build.gradle b/client/java/build.gradle index 17cf1c6c08..165940d663 100644 --- a/client/java/build.gradle +++ b/client/java/build.gradle @@ -45,7 +45,7 @@ ext { jacksonDatabindVersion = "2.13.4.2" junit5Version = '5.10.0' lombokVersion = '1.18.30' - mockitoVersion = '5.2.0' + mockitoVersion = '4.11.0' isReleaseVersion = !version.endsWith('SNAPSHOT') } diff --git a/client/java/gradle.properties b/client/java/gradle.properties index 156e86deb9..3768476e12 100644 --- a/client/java/gradle.properties +++ b/client/java/gradle.properties @@ -1 +1 @@ -version=1.4.1 +version=1.4.1-SNAPSHOT diff --git a/client/java/src/main/java/io/openlineage/client/transports/SdfConfig.java b/client/java/src/main/java/io/openlineage/client/transports/SdfConfig.java new file mode 100644 index 0000000000..4c2ac19037 --- /dev/null +++ b/client/java/src/main/java/io/openlineage/client/transports/SdfConfig.java @@ -0,0 +1,21 @@ +/* +/* Copyright 2018-2023 contributors to the OpenLineage project +/* SPDX-License-Identifier: Apache-2.0 +*/ + +package io.openlineage.client.transports; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; +import lombok.With; + +@NoArgsConstructor +@AllArgsConstructor +@ToString +@With +public final class SdfConfig implements TransportConfig { + @Getter @Setter private String output; +} diff --git a/client/java/src/main/java/io/openlineage/client/transports/SdfTransport.java b/client/java/src/main/java/io/openlineage/client/transports/SdfTransport.java new file mode 100644 index 0000000000..f53c717619 --- /dev/null +++ b/client/java/src/main/java/io/openlineage/client/transports/SdfTransport.java @@ -0,0 +1,52 @@ +/* +/* Copyright 2018-2023 contributors to the OpenLineage project +/* SPDX-License-Identifier: Apache-2.0 +*/ + +package io.openlineage.client.transports; + +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineageClientUtils; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.format.DateTimeFormatter; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +/** + * Writes a new file for each Openlineage. 
Files are labeled with + * {run_id}_{yyyy_MM_dd_HH_mm_ss}.json + */ +@Slf4j +public class SdfTransport extends Transport { + + Path output_dir; + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss"); + + public SdfTransport(@NonNull final SdfConfig sdfConfig) { + super(Type.SDF); + output_dir = Paths.get(sdfConfig.getOutput()); + } + + @Override + public void emit(OpenLineage.RunEvent runEvent) { + // if DEBUG loglevel is enabled, this will double-log even due to OpenLineageClient also logging + emit( + OpenLineageClientUtils.toJson(runEvent), + runEvent.getRun().getRunId().toString(), + runEvent.getEventTime().format(formatter)); + } + + public void emit(String eventAsJson, String runId, String suffix) { + Path path_to_write = output_dir.resolve(runId + "_" + suffix + ".json"); + try { + Files.write(path_to_write, eventAsJson.getBytes(StandardCharsets.UTF_8)); + log.info("emitted event: " + eventAsJson); + } catch (IOException | IllegalArgumentException e) { + log.error("Writing event to a file {} failed: {}", path_to_write.toString(), e); + } + } +} diff --git a/client/java/src/main/java/io/openlineage/client/transports/SdfTransportBuilder.java b/client/java/src/main/java/io/openlineage/client/transports/SdfTransportBuilder.java new file mode 100644 index 0000000000..b3c15c79ae --- /dev/null +++ b/client/java/src/main/java/io/openlineage/client/transports/SdfTransportBuilder.java @@ -0,0 +1,24 @@ +/* +/* Copyright 2018-2023 contributors to the OpenLineage project +/* SPDX-License-Identifier: Apache-2.0 +*/ + +package io.openlineage.client.transports; + +public class SdfTransportBuilder implements TransportBuilder { + + @Override + public TransportConfig getConfig() { + return new SdfConfig(); + } + + @Override + public Transport build(TransportConfig config) { + return new SdfTransport((SdfConfig) config); + } + + @Override + public String getType() { + return "sdf"; + } +} diff --git a/client/java/src/main/java/io/openlineage/client/transports/Transport.java b/client/java/src/main/java/io/openlineage/client/transports/Transport.java index 208254c651..2fa22bee2e 100644 --- a/client/java/src/main/java/io/openlineage/client/transports/Transport.java +++ b/client/java/src/main/java/io/openlineage/client/transports/Transport.java @@ -17,7 +17,8 @@ enum Type { HTTP, KAFKA, KINESIS, - NOOP + NOOP, + SDF }; @SuppressWarnings("PMD") // unused constructor type used for @NonNull validation diff --git a/client/java/src/main/resources/META-INF/services/io.openlineage.client.transports.TransportBuilder b/client/java/src/main/resources/META-INF/services/io.openlineage.client.transports.TransportBuilder index 9bd0f556b3..0cbb7f734c 100644 --- a/client/java/src/main/resources/META-INF/services/io.openlineage.client.transports.TransportBuilder +++ b/client/java/src/main/resources/META-INF/services/io.openlineage.client.transports.TransportBuilder @@ -2,4 +2,5 @@ io.openlineage.client.transports.HttpTransportBuilder io.openlineage.client.transports.KafkaTransportBuilder io.openlineage.client.transports.ConsoleTransportBuilder io.openlineage.client.transports.FileTransportBuilder -io.openlineage.client.transports.KinesisTransportBuilder \ No newline at end of file +io.openlineage.client.transports.KinesisTransportBuilder +io.openlineage.client.transports.SdfTransportBuilder \ No newline at end of file diff --git a/client/java/src/test/java/io/openlineage/client/transports/SdfTransportTest.java b/client/java/src/test/java/io/openlineage/client/transports/SdfTransportTest.java new 
file mode 100644
index 0000000000..1f3e6d8847
--- /dev/null
+++ b/client/java/src/test/java/io/openlineage/client/transports/SdfTransportTest.java
@@ -0,0 +1,69 @@
+/*
+/* Copyright 2018-2023 contributors to the OpenLineage project
+/* SPDX-License-Identifier: Apache-2.0
+*/
+
+package io.openlineage.client.transports;
+
+import io.openlineage.client.OpenLineage;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.UUID;
+import lombok.SneakyThrows;
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+public class SdfTransportTest {
+
+  private static final String FILE_LOCATION_DIR = "/tmp/openlineage_sdf_transport_test";
+
+  SdfConfig sdfConfig;
+  Transport transport;
+
+  @BeforeEach
+  @SneakyThrows
+  public void beforeEach() {
+    FileUtils.deleteDirectory(new File(FILE_LOCATION_DIR));
+
+    sdfConfig = new SdfConfig();
+    sdfConfig.setOutput(FILE_LOCATION_DIR);
+    transport = new SdfTransport(sdfConfig);
+  }
+
+  @Test
+  @SneakyThrows
+  public void transportWritesToFiles() {
+    transport.emit(
+        new OpenLineage(URI.create("http://test.producer"))
+            .newRunEventBuilder()
+            .job(new OpenLineage.JobBuilder().name("test-job").namespace("test-ns").build())
+            .run(new OpenLineage.RunBuilder().runId(UUID.randomUUID()).build())
+            .eventTime(ZonedDateTime.now())
+            .build());
+    Path output_path = Paths.get(FILE_LOCATION_DIR);
+
+    try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(output_path)) {
+      Path firstFile = directoryStream.iterator().next();
+      if (Files.exists(firstFile)) {
+        List<String> lines = Files.readAllLines(firstFile);
+
+        // Print the contents of the first file
+        for (String line : lines) {
+          System.out.println(line);
+        }
+      } else {
+        System.out.println("No files found in the directory.");
+      }
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    }
+  }
+}
diff --git a/integration/spark/gradle.properties b/integration/spark/gradle.properties
index 72dd2340b8..4b19c5278d 100644
--- a/integration/spark/gradle.properties
+++ b/integration/spark/gradle.properties
@@ -1,4 +1,4 @@
 jdk8.build=true
-version=1.4.1
+version=1.4.1-SNAPSHOT
 spark.version=3.5.0
 org.gradle.jvmargs=-Xmx1G
\ No newline at end of file
diff --git a/integration/sql/iface-java/gradle.properties b/integration/sql/iface-java/gradle.properties
index 156e86deb9..3768476e12 100644
--- a/integration/sql/iface-java/gradle.properties
+++ b/integration/sql/iface-java/gradle.properties
@@ -1 +1 @@
-version=1.4.1
+version=1.4.1-SNAPSHOT

From d1ed85d31561e5038519ac6c3eefe9eda2381bd3 Mon Sep 17 00:00:00 2001
From: Alexander Bogdanowicz
Date: Tue, 14 Nov 2023 19:16:40 +0000
Subject: [PATCH 2/3] Commit current stage of code

---
 .../InternalEventHandlerFactory.java          |  2 +
 .../agent/OpenLineageSparkListenerTest.java   |  1 -
 .../spark/v1/analyzed-logical-plan-facet.json | 95 +++++++++++++++++++
 .../facets/AnalyzedLogicalPlanFacet.java      | 30 ++++++
 .../AnalyzedLogicalPlanRunFacetBuilder.java   | 55 +++++++++++
 5 files changed, 182 insertions(+), 1 deletion(-)
 create mode 100644 integration/spark/shared/facets/spark/v1/analyzed-logical-plan-facet.json
 create mode 100644 integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/AnalyzedLogicalPlanFacet.java
 create mode 100644
integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/AnalyzedLogicalPlanRunFacetBuilder.java diff --git a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/InternalEventHandlerFactory.java b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/InternalEventHandlerFactory.java index 0d85f8e530..3afb727eec 100644 --- a/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/InternalEventHandlerFactory.java +++ b/integration/spark/app/src/main/java/io/openlineage/spark/agent/lifecycle/InternalEventHandlerFactory.java @@ -14,6 +14,7 @@ import io.openlineage.client.OpenLineage.OutputDataset; import io.openlineage.client.OpenLineage.OutputDatasetFacet; import io.openlineage.client.OpenLineage.RunFacet; +import io.openlineage.spark.agent.facets.builder.AnalyzedLogicalPlanRunFacetBuilder; import io.openlineage.spark.agent.facets.builder.CustomEnvironmentFacetBuilder; import io.openlineage.spark.agent.facets.builder.DatabricksEnvironmentFacetBuilder; import io.openlineage.spark.agent.facets.builder.DebugRunFacetBuilder; @@ -189,6 +190,7 @@ public Collection>> createOutputData .add( new ErrorFacetBuilder(), new LogicalPlanRunFacetBuilder(context), + new AnalyzedLogicalPlanRunFacetBuilder(context), new DebugRunFacetBuilder(context), new SparkVersionFacetBuilder(context), new SparkPropertyFacetBuilder(context), diff --git a/integration/spark/app/src/test/java/io/openlineage/spark/agent/OpenLineageSparkListenerTest.java b/integration/spark/app/src/test/java/io/openlineage/spark/agent/OpenLineageSparkListenerTest.java index e7d1387ed7..85c86d205e 100644 --- a/integration/spark/app/src/test/java/io/openlineage/spark/agent/OpenLineageSparkListenerTest.java +++ b/integration/spark/app/src/test/java/io/openlineage/spark/agent/OpenLineageSparkListenerTest.java @@ -146,7 +146,6 @@ void testOpenlineageDisableDisablesExecution() throws URISyntaxException { @Test void testSparkSQLEndGetsQueryExecutionFromEvent() { LogicalPlan query = UnresolvedRelation$.MODULE$.apply(TableIdentifier.apply("tableName")); - when(sparkSession.sparkContext()).thenReturn(sparkContext); when(sparkContext.appName()).thenReturn("appName"); when(sparkContext.getConf()).thenReturn(new SparkConf()); diff --git a/integration/spark/shared/facets/spark/v1/analyzed-logical-plan-facet.json b/integration/spark/shared/facets/spark/v1/analyzed-logical-plan-facet.json new file mode 100644 index 0000000000..7b9166a718 --- /dev/null +++ b/integration/spark/shared/facets/spark/v1/analyzed-logical-plan-facet.json @@ -0,0 +1,95 @@ +{ + "$schema": "http://json-schema.org/schema#", + "definitions": { + "Run": { + "properties": { + "facets": { + "properties": { + "spark.analyzedLogicalPlan": { + "$ref": "#/definitions/AnalyzedLogicalPlanRunFacet" + } + } + } + } + }, + "AnalyzedLogicalPlanRunFacet": { + "type": "object", + "properties": { + "type": { + "type": "string" + }, + "name": { + "type": "string" + }, + "children": { + "type": "array", + "items": { + "anyOf": [ + { + "$ref": "#/definitions/AnalyzedLogicalPlanRunFacet" + }, + { + "$ref": "#/definitions/InsertIntoHadoopFsRelationCommand" + } + ] + } + } + }, + "additionalProperties": true + }, + "InsertIntoHadoopFsRelationCommand": { + "type": "object", + "properties":{ + "outputPath": { + "type": "string" + } + }, + "additionalProperties": true + }, + "LogicalRelation": { + "type": "object", + "properties":{ + "relation": { + "anyOf": [ + { + "$ref": "#/definitions/HadoopFsRelation" + }, + { + "$ref": 
"#/definitions/JDBCRelation" + } + ] + } + }, + "additionalProperties": true + }, + "HadoopFsRelation": { + "type": "object", + "properties":{ + "relation": { + "type": "object", + "properties": { + "location": { + "type": "string" + } + } + } + }, + "additionalProperties": true + }, + "JDBCRelation": { + "type": "object", + "properties":{ + "relation": { + "type": "object", + "properties": { + "jdbcOptions": { + "type": "object" + } + } + } + }, + "additionalProperties": true + } + + } +} \ No newline at end of file diff --git a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/AnalyzedLogicalPlanFacet.java b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/AnalyzedLogicalPlanFacet.java new file mode 100644 index 0000000000..a4680352b6 --- /dev/null +++ b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/AnalyzedLogicalPlanFacet.java @@ -0,0 +1,30 @@ +/* +/* Copyright 2018-2023 contributors to the OpenLineage project +/* SPDX-License-Identifier: Apache-2.0 +*/ + +package io.openlineage.spark.agent.facets; + +import com.fasterxml.jackson.annotation.JsonRawValue; +import io.openlineage.client.OpenLineage; +import io.openlineage.spark.agent.Versions; +import lombok.Builder; +import lombok.ToString; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; + +@ToString +public class AnalyzedLogicalPlanFacet extends OpenLineage.DefaultRunFacet { + private final LogicalPlan plan; + + @Builder + public AnalyzedLogicalPlanFacet(LogicalPlan plan) { + super(Versions.OPEN_LINEAGE_PRODUCER_URI); + this.plan = plan; + } + + @JsonRawValue + public String getPlan() { + return plan.toJSON(); + } + +} diff --git a/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/AnalyzedLogicalPlanRunFacetBuilder.java b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/AnalyzedLogicalPlanRunFacetBuilder.java new file mode 100644 index 0000000000..25cd189a44 --- /dev/null +++ b/integration/spark/shared/src/main/java/io/openlineage/spark/agent/facets/builder/AnalyzedLogicalPlanRunFacetBuilder.java @@ -0,0 +1,55 @@ +/* +/* Copyright 2018-2023 contributors to the OpenLineage project +/* SPDX-License-Identifier: Apache-2.0 +*/ + +package io.openlineage.spark.agent.facets.builder; + +import static io.openlineage.spark.agent.util.FacetUtils.isFacetDisabled; + +import io.openlineage.spark.agent.facets.AnalyzedLogicalPlanFacet; +import io.openlineage.spark.api.CustomFacetBuilder; +import io.openlineage.spark.api.OpenLineageContext; +import java.util.function.BiConsumer; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd; +import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart; + +/** + * {@link CustomFacetBuilder} that generates a {@link AnalyzedLogicalPlanFacet} for each {@link + * SparkListenerSQLExecutionStart}, {@link SparkListenerSQLExecutionEnd}, and {@link + * SparkListenerJobEnd} event if a {@link org.apache.spark.sql.execution.QueryExecution} is present. 
+ */
+public class AnalyzedLogicalPlanRunFacetBuilder
+    extends CustomFacetBuilder<Object, AnalyzedLogicalPlanFacet> {
+  private final OpenLineageContext openLineageContext;
+
+  public AnalyzedLogicalPlanRunFacetBuilder(OpenLineageContext openLineageContext) {
+    this.openLineageContext = openLineageContext;
+  }
+
+  @Override
+  public boolean isDefinedAt(Object x) {
+    if (isFacetDisabled(openLineageContext, "spark.analyzedLogicalPlan")) {
+      return false;
+    }
+    return (x instanceof SparkListenerSQLExecutionEnd
+            || x instanceof SparkListenerSQLExecutionStart
+            || x instanceof SparkListenerJobStart
+            || x instanceof SparkListenerJobEnd)
+        && openLineageContext.getQueryExecution().isPresent();
+  }
+
+  @Override
+  protected void build(
+      Object event, BiConsumer<String, ? super AnalyzedLogicalPlanFacet> consumer) {
+    openLineageContext
+        .getQueryExecution()
+        .ifPresent(
+            qe ->
+                consumer.accept(
+                    "spark.analyzedLogicalPlan",
+                    AnalyzedLogicalPlanFacet.builder().plan(qe.analyzed()).build()));
+  }
+}

From 3e26236bc4964d7750e0b3e97656c1fcdbb54380 Mon Sep 17 00:00:00 2001
From: Alexander Bogdanowicz
Date: Tue, 14 Nov 2023 19:52:44 +0000
Subject: [PATCH 3/3] Docs

---
 how-to-build.md | 50 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 50 insertions(+)
 create mode 100644 how-to-build.md

diff --git a/how-to-build.md b/how-to-build.md
new file mode 100644
index 0000000000..54a2adb74d
--- /dev/null
+++ b/how-to-build.md
@@ -0,0 +1,50 @@
+# Instructions for Building the OpenLineage Jar
+
+Note: Run these instructions from within an IntelliJ IDE, which configures and installs all of the Gradle build dependencies and jars automatically.
+
+To install a clean jar, start by clearing your local Maven cache:
+
+```bash
+sudo rm -r ~/.m2/repository
+```
+
+Next, navigate to `client/java` and run:
+
+```bash
+./gradlew clean build publishToMavenLocal
+```
+
+Once that completes successfully, navigate to `integration/sql/iface-java` and run:
+
+```
+cargo clean
+./script/compile.sh
+./script/build.sh
+```
+
+
+Next, navigate to `integration/spark` and run:
+
+```
+./gradlew clean --refresh-dependencies
+```
+
+
+You may run into an error if the version defined in `gradle.properties` has a `-SNAPSHOT` suffix. Remove the suffix, run the above command, and ensure that you have added it back before proceeding further. Your `gradle.properties` should look as follows:
+
+```gradle.properties
+jdk8.build=true
+version={YOUR-VERSION-NUMBER}-SNAPSHOT
+spark.version=3.5.0
+org.gradle.jvmargs=-Xmx1G
+```
+
+Finally, run:
+
+```
+./gradlew build -x test
+```
+
+*Note:* This build currently excludes tests, as some are failing.
+
+The jar should be available at: `integration/spark/build/libs/openlineage-spark-1.4.1-SNAPSHOT.jar`
\ No newline at end of file
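
For reference, the first patch registers its new transport under the type `sdf` (see `SdfTransportBuilder.getType()`), configured with a single `output` option naming the directory that receives one JSON file per run event. The snippet below is a minimal, hedged sketch of programmatic usage with the Java client; the `OpenLineageClient.builder().transport(...)` call and the example paths are assumptions based on the client's existing API, not something added by these patches, so verify them against the version you build.

```java
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClient;
import io.openlineage.client.transports.SdfConfig;
import io.openlineage.client.transports.SdfTransport;

import java.net.URI;
import java.time.ZonedDateTime;
import java.util.UUID;

public class SdfTransportUsageSketch {
  public static void main(String[] args) {
    // Assumption: this output directory already exists. SdfTransport catches
    // IOException and only logs the failure, so a missing directory fails silently.
    SdfConfig config = new SdfConfig();
    config.setOutput("/tmp/openlineage-events");

    OpenLineageClient client = OpenLineageClient.builder()
        .transport(new SdfTransport(config))
        .build();

    OpenLineage ol = new OpenLineage(URI.create("http://example.com/producer"));
    OpenLineage.RunEvent event = ol.newRunEventBuilder()
        .eventTime(ZonedDateTime.now())
        .run(ol.newRunBuilder().runId(UUID.randomUUID()).build())
        .job(ol.newJobBuilder().namespace("example-ns").name("example-job").build())
        .build();

    // Writes {runId}_{yyyy_MM_dd_HH_mm_ss}.json into the output directory.
    client.emit(event);
  }
}
```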
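
The Spark changes in the second patch key the new run facet as `spark.analyzedLogicalPlan` and skip building it when `FacetUtils.isFacetDisabled` reports it disabled. Below is a hedged sketch of how a job might opt out of the facet; the `spark.openlineage.facets.disabled` property and its bracketed list format follow the integration's existing facet-disabling convention and are assumptions to verify, not settings introduced by this patch.

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class DisableAnalyzedPlanFacetSketch {
  public static void main(String[] args) {
    // Assumption: property names follow the existing OpenLineage Spark conventions.
    SparkConf conf = new SparkConf()
        .setAppName("openlineage-facet-example")
        .setMaster("local[*]")
        .set("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
        .set("spark.openlineage.transport.type", "console")
        .set("spark.openlineage.namespace", "example-ns")
        // Disable the new facet so emitted events omit spark.analyzedLogicalPlan.
        .set("spark.openlineage.facets.disabled", "[spark.analyzedLogicalPlan]");

    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    // ... run the job; lineage events should be emitted without the facet ...
    spark.stop();
  }
}
```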