Skip to content

Commit

Permalink
feat(java-emitter): improvements to builder API-s, moving spark-linea…
Browse files Browse the repository at this point in the history
…ge under metadata-integration/java (datahub-project#3819)
  • Loading branch information
swaroopjagadish authored Jan 4, 2022
1 parent 40b6734 commit 3e234a9
Show file tree
Hide file tree
Showing 39 changed files with 278 additions and 283 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ jobs:
# running build first without datahub-web-react:yarnBuild and then with it is 100% stable
# datahub-frontend:unzipAssets depends on datahub-web-react:yarnBuild but gradle does not know about it
run: |
./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build -x datahub-web-react:yarnBuild -x datahub-frontend:unzipAssets
./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build
./gradlew :metadata-integration:java:spark-lineage:build
./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build -x datahub-web-react:yarnBuild -x datahub-frontend:unzipAssets -x :metadata-integration:java:spark-lineage:test
./gradlew build -x :metadata-ingestion:build -x :metadata-ingestion:check -x docs-website:build -x :metadata-integration:java:spark-lineage:test
- uses: actions/upload-artifact@v2
if: always()
with:
Expand Down
2 changes: 1 addition & 1 deletion docs-website/sidebars.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ module.exports = {
"docs/lineage/airflow",
"docker/airflow/local_airflow",
"docs/lineage/sample_code",
"spark-lineage/README",
"metadata-integration/java/spark-lineage/README",
],
},
{
Expand Down
2 changes: 2 additions & 0 deletions lombok.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
config.stopBubbling = true
lombok.addLombokGeneratedAnnotation = true
4 changes: 2 additions & 2 deletions metadata-integration/java/as-a-library.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ RestEmitter emitter = RestEmitter.createWithDefaults();

MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper.builder()
.entityType("dataset")
.changeType(ChangeType.UPSERT)
.aspect(new DatasetProperties().setDescription("This is the canonical User profile dataset"))
.entityUrn("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-project.my-dataset.user-table,PROD)")
.upsert()
.aspect(new DatasetProperties().setDescription("This is the canonical User profile dataset"))
.build();

// Blocking call using future
Expand Down
6 changes: 6 additions & 0 deletions metadata-integration/java/datahub-client/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
apply plugin: 'java'
apply plugin: 'com.github.johnrengelman.shadow'
apply plugin: 'jacoco'

dependencies {

Expand All @@ -13,8 +14,13 @@ dependencies {
testCompile externalDependency.mockServerClient
}

jacocoTestReport {
dependsOn test // tests are required to run before generating the report
}

test {
useJUnit()
finalizedBy jacocoTestReport
}

shadowJar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,93 +6,138 @@
import com.linkedin.mxe.MetadataChangeProposal;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import lombok.Builder;
import lombok.Data;
import lombok.NonNull;
import lombok.AllArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;


/**
* A class that makes it easy to create new {@link MetadataChangeProposal} events.
* @param <T> the type of the aspect carried by this wrapper
*/
@Data
@Builder
@Value
@Slf4j
@AllArgsConstructor
public class MetadataChangeProposalWrapper<T extends DataTemplate> {

@NonNull
String entityType;
@NonNull
String entityUrn;
@Builder.Default
ChangeType changeType = ChangeType.UPSERT;
ChangeType changeType;
T aspect;
String aspectName;

/**
* Validates that this class is well formed.
* Mutates the instance to auto-fill the aspect name when it is missing and can be inferred from the aspect.
* @throws EventValidationException if the event is not valid
*/
protected static void validate(MetadataChangeProposalWrapper mcpw) throws EventValidationException {
try {
Urn.createFromString(mcpw.entityUrn);
} catch (URISyntaxException uie) {
throw new EventValidationException("Failed to parse a valid entity urn", uie);
/**
* First step of the staged builder: supplies the entity type.
* The returned step accepts the entity urn.
*/
public interface EntityTypeStepBuilder {
EntityUrnStepBuilder entityType(String entityType);
}

/**
* Second step of the staged builder: supplies the entity urn, either as a
* string or as a {@link Urn}. Both overloads return the change-type step.
*/
public interface EntityUrnStepBuilder {
ChangeStepBuilder entityUrn(String entityUrn);

ChangeStepBuilder entityUrn(Urn entityUrn);
}

/**
* Third step of the staged builder: selects the change type.
* {@code upsert()} is the only choice this interface offers; it returns the aspect step.
*/
public interface ChangeStepBuilder {
AspectStepBuilder upsert();
}

/**
* Fourth step of the staged builder: supplies the aspect payload and
* returns the final {@link Build} step.
*/
public interface AspectStepBuilder {
Build aspect(DataTemplate aspect);
}

/**
* Final step of the staged builder.
* {@code build()} produces the wrapper; {@code aspectName(...)} sets the aspect
* name explicitly and returns this same step so {@code build()} can follow.
*/
public interface Build {
MetadataChangeProposalWrapper build();

Build aspectName(String aspectName);
}

public static class MetadataChangeProposalWrapperBuilder
implements EntityUrnStepBuilder, EntityTypeStepBuilder, ChangeStepBuilder, AspectStepBuilder, Build {

private String entityUrn;
private String entityType;
private ChangeType changeType;
private String aspectName;
private DataTemplate aspect;

/**
* Records the entity type for the event being built.
*
* @param entityType the entity type; must not be null
* @return this builder, typed as the entity-urn step
* @throws NullPointerException if {@code entityType} is null
*/
@Override
public EntityUrnStepBuilder entityType(String entityType) {
  // requireNonNull returns its argument, so the check and the store happen in one statement.
  this.entityType = Objects.requireNonNull(entityType, "entityType cannot be null");
  return this;
}

/**
* Records the entity urn supplied as a string, after checking that it parses.
*
* @param entityUrn the urn in string form; must not be null
* @return this builder, typed as the change-type step
* @throws NullPointerException if {@code entityUrn} is null
* @throws EventValidationException if the string cannot be parsed as a urn
*/
@Override
public ChangeStepBuilder entityUrn(String entityUrn) {
  final String candidate = Objects.requireNonNull(entityUrn, "entityUrn cannot be null");
  try {
    // Parsed purely for validation; the resulting Urn is discarded and the raw string is kept.
    Urn.createFromString(candidate);
    this.entityUrn = candidate;
  } catch (URISyntaxException parseFailure) {
    throw new EventValidationException("Failed to parse a valid entity urn", parseFailure);
  }
  return this;
}

if (mcpw.getAspect() != null && mcpw.getAspectName() == null) {
/**
* Records the entity urn supplied as a {@link Urn}, stored in its string form.
*
* @param entityUrn the urn; must not be null
* @return this builder, typed as the change-type step
* @throws NullPointerException if {@code entityUrn} is null
*/
@Override
public ChangeStepBuilder entityUrn(Urn entityUrn) {
  // The null check runs first; toString() is only reached for a non-null urn.
  this.entityUrn = Objects.requireNonNull(entityUrn, "entityUrn cannot be null").toString();
  return this;
}

/**
* Marks the event being built as an upsert by setting the change type to
* {@code ChangeType.UPSERT}.
*
* @return this builder, typed as the aspect step
*/
@Override
public AspectStepBuilder upsert() {
this.changeType = ChangeType.UPSERT;
return this;
}

@Override
public Build aspect(DataTemplate aspect) {
this.aspect = aspect;
// Try to guess the aspect name from the aspect
Map<String, Object> schemaProps = mcpw.getAspect().schema().getProperties();
Map<String, Object> schemaProps = aspect.schema().getProperties();
if (schemaProps != null && schemaProps.containsKey("Aspect")) {
Object aspectProps = schemaProps.get("Aspect");
if (aspectProps != null && aspectProps instanceof Map) {
Map aspectMap = (Map) aspectProps;
String aspectName = (String) aspectMap.get("name");
mcpw.setAspectName(aspectName);
this.aspectName = aspectName;
log.debug("Inferring aspectName as {}", aspectName);
}
}
if (mcpw.getAspectName() == null) {
throw new EventValidationException("Aspect name was null and could not be inferred.");
if (this.aspectName == null) {
log.warn("Could not infer aspect name from aspect");
}
return this;
}
if (mcpw.getChangeType() != ChangeType.UPSERT) {
throw new EventValidationException("Change type other than UPSERT is not supported at this time. Supplied " + mcpw.getChangeType());
}
if (mcpw.getChangeType() == ChangeType.UPSERT && mcpw.getAspect() == null) {
throw new EventValidationException("Aspect cannot be null if ChangeType is UPSERT");
}

}


public static MetadataChangeProposalWrapper create(Consumer<ValidatingMCPWBuilder> builderConsumer) {
return new ValidatingMCPWBuilder().with(builderConsumer).build();
}


public static MetadataChangeProposalWrapperBuilder builder() {
return new ValidatingMCPWBuilder();
}

public static class ValidatingMCPWBuilder extends MetadataChangeProposalWrapperBuilder {

@Override
public MetadataChangeProposalWrapper build() {
MetadataChangeProposalWrapper mcpw = super.build();
validate(mcpw);
return mcpw;
try {
Objects.requireNonNull(this.aspectName,
"aspectName could not be inferred from provided aspect and was not explicitly provided as an override");
return new MetadataChangeProposalWrapper(entityType, entityUrn, changeType, aspect, aspectName);
} catch (Exception e) {
throw new EventValidationException("Failed to create a metadata change proposal event", e);
}
}

public ValidatingMCPWBuilder with(Consumer<ValidatingMCPWBuilder> builderConsumer) {
builderConsumer.accept(this);
/**
* Sets the aspect name explicitly, overwriting whatever value the builder
* currently holds for it.
*
* @param aspectName the aspect name to use
* @return this builder, still at the final step
*/
@Override
public Build aspectName(String aspectName) {
this.aspectName = aspectName;
return this;
}
}


/**
* Creates a wrapper in one call: a fresh builder is handed to the consumer,
* which is expected to drive it through its steps, and {@code build()} is
* then invoked on that same builder.
*
* @param builderConsumer callback that receives the builder at its first step
* @return the wrapper produced by the builder
*/
public static MetadataChangeProposalWrapper create(Consumer<EntityTypeStepBuilder> builderConsumer) {
  final MetadataChangeProposalWrapperBuilder stagedBuilder = new MetadataChangeProposalWrapperBuilder();
  builderConsumer.accept(stagedBuilder);
  return stagedBuilder.build();
}

/**
* Entry point of the staged builder.
*
* @return a new builder exposed at its first step, which accepts the entity type
*/
public static EntityTypeStepBuilder builder() {
return new MetadataChangeProposalWrapperBuilder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.events.metadata.ChangeType;
import datahub.client.Callback;
import datahub.client.MetadataWriteResponse;
import datahub.event.MetadataChangeProposalWrapper;
Expand Down Expand Up @@ -97,9 +96,9 @@ public void testExceptions() throws URISyntaxException, IOException, ExecutionEx
RestEmitter emitter = RestEmitter.create($ -> $.asyncHttpClientBuilder(mockHttpClientFactory));

MetadataChangeProposalWrapper mcp = MetadataChangeProposalWrapper.create(b -> b.entityType("dataset")
.changeType(ChangeType.UPSERT)
.aspect(new DatasetProperties().setDescription("Test Dataset"))
.entityUrn("urn:li:dataset:(urn:li:dataPlatform:hive,foo.bar,PROD)"));
.entityUrn("urn:li:dataset:(urn:li:dataPlatform:hive,foo.bar,PROD)")
.upsert()
.aspect(new DatasetProperties().setDescription("Test Dataset")));

Future<HttpResponse> mockFuture = Mockito.mock(Future.class);
Mockito.when(mockClient.execute(Mockito.any(), Mockito.any())).thenReturn(mockFuture);
Expand All @@ -117,7 +116,7 @@ public void testExtraHeaders() throws Exception {
RestEmitter emitter = RestEmitter.create(b -> b.asyncHttpClientBuilder(mockHttpClientFactory)
.extraHeaders(Collections.singletonMap("Test-Header", "Test-Value")));
MetadataChangeProposalWrapper mcpw = MetadataChangeProposalWrapper.create(
b -> b.entityType("dataset").entityUrn("urn:li:dataset:foo").aspect(new DatasetProperties()));
b -> b.entityType("dataset").entityUrn("urn:li:dataset:foo").upsert().aspect(new DatasetProperties()));
Future<HttpResponse> mockFuture = Mockito.mock(Future.class);
Mockito.when(mockClient.execute(Mockito.any(), Mockito.any())).thenReturn(mockFuture);
emitter.emit(mcpw, null);
Expand Down Expand Up @@ -210,9 +209,9 @@ public void multithreadedTestExecutors() throws Exception {
private MetadataChangeProposalWrapper getMetadataChangeProposalWrapper(String description, String entityUrn) {
return MetadataChangeProposalWrapper.builder()
.entityType("dataset")
.changeType(ChangeType.UPSERT)
.aspect(new DatasetProperties().setDescription(description))
.entityUrn(entityUrn)
.upsert()
.aspect(new DatasetProperties().setDescription(description))
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ public void testPartialMCPW() throws URISyntaxException, IOException, EventValid
MetadataChangeProposalWrapper metadataChangeProposalWrapper = MetadataChangeProposalWrapper.builder()
.entityType("dataset")
.entityUrn("urn:li:foo")
.upsert()
.aspect(new DatasetProperties().setDescription("A test dataset"))
.build();
MetadataChangeProposalWrapper.validate(metadataChangeProposalWrapper);
EventFormatter eventFormatter = new EventFormatter();
MetadataChangeProposal mcp = eventFormatter.convert(metadataChangeProposalWrapper);
Assert.assertEquals(mcp.getAspect().getContentType(), "application/json");
Expand Down
Loading

0 comments on commit 3e234a9

Please sign in to comment.