Skip to content

Commit

Permalink
Remove spaces in transform names
Browse files Browse the repository at this point in the history
  • Loading branch information
iht committed Dec 21, 2024
1 parent 1b2a413 commit f97a5ca
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ public static Pipeline createPipeline(TaxiSessionsOptions opts) {
// triggers, etc).
PCollection<String> rides =
pipeline.apply(
"Read rides",
"ReadRides",
PubsubIO.readStrings()
.fromSubscription(opts.getRideEventsSubscription())
.withTimestampAttribute("ts"));

PCollectionTuple parsed = rides.apply("Parse JSON", Parser.TaxiEventParser.parseJson());
PCollectionTuple parsed = rides.apply("ParseJSON", Parser.TaxiEventParser.parseJson());
PCollection<RideEvent> rideEvents = parsed.get(Parser.TaxiEventParser.TAXI_EVENT_TAG);
PCollection<ParsingError> parsingErrors = parsed.get(Parser.TaxiEventParser.ERROR_TAG);

Expand All @@ -79,7 +79,7 @@ public static Pipeline createPipeline(TaxiSessionsOptions opts) {
.build());

parsingErrors.apply(
"Append errors to BQ",
"AppendErrorsToBQ",
BigQueryIO.<ParsingError>write()
.to(errorsTable)
.useBeamSchema()
Expand All @@ -100,7 +100,7 @@ public static Pipeline createPipeline(TaxiSessionsOptions opts) {
(ValueInSingleWindow<RideSession> s) -> rideSessionTDestination(s, project, dataset);

rideSessions.apply(
"Upsert sessions in BQ",
"UpsertSessionsBQ",
BigQueryIO.<RideSession>write()
.to(tableFunc)
.useBeamSchema()
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/google/cloud/pso/transforms/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public PCollection<RideSession> expand(PCollection<RideEvent> rideEvents) {
// Add keys to rideEvents
PCollection<KV<String, RideEvent>> withKeys =
rideEvents.apply(
"Add keys", WithKeys.of(RideEvent::getRideId).withKeyType(TypeDescriptors.strings()));
"AddKeys", WithKeys.of(RideEvent::getRideId).withKeyType(TypeDescriptors.strings()));

// Use a late trigger only if late data wait is not zero
Trigger sessionTrigger;
Expand All @@ -99,9 +99,9 @@ public PCollection<RideSession> expand(PCollection<RideEvent> rideEvents) {
.withAllowedLateness(Duration.standardSeconds(lateDataWaitSeconds())));

PCollection<KV<String, RideAccumulator>> accumulated =
sessions.apply("Combine events", Combine.perKey(new SessionPropertiesCombinerFn()));
sessions.apply("CombineEvents", Combine.perKey(new SessionPropertiesCombinerFn()));

return accumulated.apply("Session properties", ParDo.of(new SessionPropertiesDoFn()));
return accumulated.apply("SessionProperties", ParDo.of(new SessionPropertiesDoFn()));
}
}

Expand Down

0 comments on commit f97a5ca

Please sign in to comment.