Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Sdf Transport Support for v1.4.1-SNAPSHOT #1

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ ext {
jacksonDatabindVersion = "2.13.4.2"
junit5Version = '5.10.0'
lombokVersion = '1.18.30'
mockitoVersion = '5.2.0'
mockitoVersion = '4.11.0'
isReleaseVersion = !version.endsWith('SNAPSHOT')
}

Expand Down
2 changes: 1 addition & 1 deletion client/java/gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=1.4.1
version=1.4.1-SNAPSHOT
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
/* Copyright 2018-2023 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.transports;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.With;

@NoArgsConstructor
@AllArgsConstructor
@ToString
@With
public final class SdfConfig implements TransportConfig {
@Getter @Setter private String output;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
/* Copyright 2018-2023 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.transports;

import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.format.DateTimeFormatter;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;

/**
* Writes a new file for each Openlineage. Files are labeled with
* {run_id}_{yyyy_MM_dd_HH_mm_ss}.json
*/
@Slf4j
public class SdfTransport extends Transport {

Path output_dir;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy_MM_dd_HH_mm_ss");

public SdfTransport(@NonNull final SdfConfig sdfConfig) {
super(Type.SDF);
output_dir = Paths.get(sdfConfig.getOutput());
}

@Override
public void emit(OpenLineage.RunEvent runEvent) {
// if DEBUG loglevel is enabled, this will double-log even due to OpenLineageClient also logging
emit(
OpenLineageClientUtils.toJson(runEvent),
runEvent.getRun().getRunId().toString(),
runEvent.getEventTime().format(formatter));
}

public void emit(String eventAsJson, String runId, String suffix) {
Path path_to_write = output_dir.resolve(runId + "_" + suffix + ".json");
try {
Files.write(path_to_write, eventAsJson.getBytes(StandardCharsets.UTF_8));
log.info("emitted event: " + eventAsJson);
} catch (IOException | IllegalArgumentException e) {
log.error("Writing event to a file {} failed: {}", path_to_write.toString(), e);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we catch all exceptions also?

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
/* Copyright 2018-2023 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.transports;

public class SdfTransportBuilder implements TransportBuilder {

@Override
public TransportConfig getConfig() {
return new SdfConfig();
}

@Override
public Transport build(TransportConfig config) {
return new SdfTransport((SdfConfig) config);
}

@Override
public String getType() {
return "sdf";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ enum Type {
HTTP,
KAFKA,
KINESIS,
NOOP
NOOP,
SDF
};

@SuppressWarnings("PMD") // unused constructor type used for @NonNull validation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ io.openlineage.client.transports.HttpTransportBuilder
io.openlineage.client.transports.KafkaTransportBuilder
io.openlineage.client.transports.ConsoleTransportBuilder
io.openlineage.client.transports.FileTransportBuilder
io.openlineage.client.transports.KinesisTransportBuilder
io.openlineage.client.transports.KinesisTransportBuilder
io.openlineage.client.transports.SdfTransportBuilder
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
/* Copyright 2018-2023 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.client.transports;

import io.openlineage.client.OpenLineage;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.UUID;
import lombok.SneakyThrows;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class SdfTransportTest {

private static final String FILE_LOCATION_DIR = "/tmp/openlineage_sdf_transport_test";

SdfConfig sdfConfig;
Transport transport;

@BeforeEach
@SneakyThrows
public void beforeEach() {
FileUtils.deleteDirectory(new File(FILE_LOCATION_DIR));

sdfConfig = new SdfConfig();
sdfConfig.setOutput(FILE_LOCATION_DIR);
transport = new SdfTransport(sdfConfig);
}

@Test
@SneakyThrows
public void transportWritesToFiles() {
transport.emit(
new OpenLineage(URI.create("http://test.producer"))
.newRunEventBuilder()
.job(new OpenLineage.JobBuilder().name("test-job").namespace("test-ns").build())
.run(new OpenLineage.RunBuilder().runId(UUID.randomUUID()).build())
.eventTime(ZonedDateTime.now())
.build());
Path output_path = Paths.get(FILE_LOCATION_DIR);

try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(output_path)) {
Path firstFile = directoryStream.iterator().next();
if (Files.exists(firstFile)) {
List<String> lines = Files.readAllLines(firstFile);

// Print the contents of the first file
for (String line : lines) {
System.out.println(line);
}
} else {
System.out.println("No files found in the directory.");
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
50 changes: 50 additions & 0 deletions how-to-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Instructions for Building the OpenLineage Jar

Note: Please run these instructions within an Intellij IDE, which helps to configure / install all of the gradle build dependencies & jars automatically

To install a clean jar, start by cleaning up your local maven cache:

```bash
sudo rm -r ~/.m2/repository
```

Next, navigate to `client/java` and run:

```bash
./gradlew clean build publishToMavenLocal
```

Once that completes successfully, go ahead and navigate to `integration/sql/iface-java` and run:

```
cargo clean
./script/compile.sh
./script/build.sh
```


Lastly, navigate to `integration/spark`. Run:

```
./gradlew clean --refresh-dependencies
```


You may run into an error if the `gradle.properties` defined version has a `-SNAPSHOT` suffix. Remove it, run the above command, and before proceeding further,ensure that you have added it back. Your `gradle.properties` should look as follows:

```gradle.properties
jdk8.build=true
version={YOUR-VERSION-NUMBER}-SNAPSHOT
spark.version=3.5.0
org.gradle.jvmargs=-Xmx1G
```

Finally run:

```
./gradlew build -x test
```

*Note:* This currently excludes tests as some are failing

The jar should be available at: `intergation/spark/build/libs/openlineage-spark-1.4.1-SNAPSHOT.jar`
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.openlineage.client.OpenLineage.OutputDataset;
import io.openlineage.client.OpenLineage.OutputDatasetFacet;
import io.openlineage.client.OpenLineage.RunFacet;
import io.openlineage.spark.agent.facets.builder.AnalyzedLogicalPlanRunFacetBuilder;
import io.openlineage.spark.agent.facets.builder.CustomEnvironmentFacetBuilder;
import io.openlineage.spark.agent.facets.builder.DatabricksEnvironmentFacetBuilder;
import io.openlineage.spark.agent.facets.builder.DebugRunFacetBuilder;
Expand Down Expand Up @@ -189,6 +190,7 @@ public Collection<PartialFunction<Object, List<OutputDataset>>> createOutputData
.add(
new ErrorFacetBuilder(),
new LogicalPlanRunFacetBuilder(context),
new AnalyzedLogicalPlanRunFacetBuilder(context),
new DebugRunFacetBuilder(context),
new SparkVersionFacetBuilder(context),
new SparkPropertyFacetBuilder(context),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ void testOpenlineageDisableDisablesExecution() throws URISyntaxException {
@Test
void testSparkSQLEndGetsQueryExecutionFromEvent() {
LogicalPlan query = UnresolvedRelation$.MODULE$.apply(TableIdentifier.apply("tableName"));

when(sparkSession.sparkContext()).thenReturn(sparkContext);
when(sparkContext.appName()).thenReturn("appName");
when(sparkContext.getConf()).thenReturn(new SparkConf());
Expand Down
2 changes: 1 addition & 1 deletion integration/spark/gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
jdk8.build=true
version=1.4.1
version=1.4.1-SNAPSHOT
spark.version=3.5.0
org.gradle.jvmargs=-Xmx1G
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
{
"$schema": "http://json-schema.org/schema#",
"definitions": {
"Run": {
"properties": {
"facets": {
"properties": {
"spark.analyzedLogicalPlan": {
"$ref": "#/definitions/AnalyzedLogicalPlanRunFacet"
}
}
}
}
},
"AnalyzedLogicalPlanRunFacet": {
"type": "object",
"properties": {
"type": {
"type": "string"
},
"name": {
"type": "string"
},
"children": {
"type": "array",
"items": {
"anyOf": [
{
"$ref": "#/definitions/AnalyzedLogicalPlanRunFacet"
},
{
"$ref": "#/definitions/InsertIntoHadoopFsRelationCommand"
}
]
}
}
},
"additionalProperties": true
},
"InsertIntoHadoopFsRelationCommand": {
"type": "object",
"properties":{
"outputPath": {
"type": "string"
}
},
"additionalProperties": true
},
"LogicalRelation": {
"type": "object",
"properties":{
"relation": {
"anyOf": [
{
"$ref": "#/definitions/HadoopFsRelation"
},
{
"$ref": "#/definitions/JDBCRelation"
}
]
}
},
"additionalProperties": true
},
"HadoopFsRelation": {
"type": "object",
"properties":{
"relation": {
"type": "object",
"properties": {
"location": {
"type": "string"
}
}
}
},
"additionalProperties": true
},
"JDBCRelation": {
"type": "object",
"properties":{
"relation": {
"type": "object",
"properties": {
"jdbcOptions": {
"type": "object"
}
}
}
},
"additionalProperties": true
}

}
}
Loading