From f97a5caffcee58a3f79010f5582923ad17c568c5 Mon Sep 17 00:00:00 2001 From: Israel Herraiz Date: Sat, 21 Dec 2024 16:06:48 +0100 Subject: [PATCH] Remove spaces in transform names --- .../google/cloud/pso/pipelines/TaxiSessionsPipeline.java | 8 ++++---- .../java/com/google/cloud/pso/transforms/Session.java | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/google/cloud/pso/pipelines/TaxiSessionsPipeline.java b/src/main/java/com/google/cloud/pso/pipelines/TaxiSessionsPipeline.java index d23e047..edebfcc 100644 --- a/src/main/java/com/google/cloud/pso/pipelines/TaxiSessionsPipeline.java +++ b/src/main/java/com/google/cloud/pso/pipelines/TaxiSessionsPipeline.java @@ -61,12 +61,12 @@ public static Pipeline createPipeline(TaxiSessionsOptions opts) { // triggers, etc). PCollection rides = pipeline.apply( - "Read rides", + "ReadRides", PubsubIO.readStrings() .fromSubscription(opts.getRideEventsSubscription()) .withTimestampAttribute("ts")); - PCollectionTuple parsed = rides.apply("Parse JSON", Parser.TaxiEventParser.parseJson()); + PCollectionTuple parsed = rides.apply("ParseJSON", Parser.TaxiEventParser.parseJson()); PCollection rideEvents = parsed.get(Parser.TaxiEventParser.TAXI_EVENT_TAG); PCollection parsingErrors = parsed.get(Parser.TaxiEventParser.ERROR_TAG); @@ -79,7 +79,7 @@ public static Pipeline createPipeline(TaxiSessionsOptions opts) { .build()); parsingErrors.apply( - "Append errors to BQ", + "AppendErrorsToBQ", BigQueryIO.write() .to(errorsTable) .useBeamSchema() @@ -100,7 +100,7 @@ public static Pipeline createPipeline(TaxiSessionsOptions opts) { (ValueInSingleWindow s) -> rideSessionTDestination(s, project, dataset); rideSessions.apply( - "Upsert sessions in BQ", + "UpsertSessionsBQ", BigQueryIO.write() .to(tableFunc) .useBeamSchema() diff --git a/src/main/java/com/google/cloud/pso/transforms/Session.java b/src/main/java/com/google/cloud/pso/transforms/Session.java index 985d83d..3e8b83f 100644 --- a/src/main/java/com/google/cloud/pso/transforms/Session.java +++ b/src/main/java/com/google/cloud/pso/transforms/Session.java @@ -72,7 +72,7 @@ public PCollection expand(PCollection rideEvents) { // Add keys to rideEvents PCollection> withKeys = rideEvents.apply( - "Add keys", WithKeys.of(RideEvent::getRideId).withKeyType(TypeDescriptors.strings())); + "AddKeys", WithKeys.of(RideEvent::getRideId).withKeyType(TypeDescriptors.strings())); // Use a late trigger only if late data wait is not zero Trigger sessionTrigger; @@ -99,9 +99,9 @@ public PCollection expand(PCollection rideEvents) { .withAllowedLateness(Duration.standardSeconds(lateDataWaitSeconds()))); PCollection> accumulated = - sessions.apply("Combine events", Combine.perKey(new SessionPropertiesCombinerFn())); + sessions.apply("CombineEvents", Combine.perKey(new SessionPropertiesCombinerFn())); - return accumulated.apply("Session properties", ParDo.of(new SessionPropertiesDoFn())); + return accumulated.apply("SessionProperties", ParDo.of(new SessionPropertiesDoFn())); } }