Skip to content

Commit

Permalink
Publish PipelineDP4j, JVM version of PipelineDP
Browse files Browse the repository at this point in the history
PipelineDP4j is an end-to-end differential privacy solution for JVM that supports various frameworks for distributed data processing such as Apache Spark and Apache Beam. It is the JVM implementation of PipelineDP (https://pipelinedp.io/), and is conceptually similar to Privacy on Beam.

Change-Id: I84a0da8fbfd55bbbb37a7013e66c22e1f222e566
GitOrigin-RevId: f574c7e2a8601092d073377ef4afc07187697132
  • Loading branch information
Differential Privacy Team authored and RamSaw committed Oct 29, 2024
1 parent 0e99a6f commit 3328603
Show file tree
Hide file tree
Showing 105 changed files with 15,058 additions and 0 deletions.
18 changes: 18 additions & 0 deletions examples/pipelinedp4j/.bazelrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Compile Java sources in this workspace at Java language level 11.
build --java_language_version=11
# Run Java targets on the JDK 11 toolchain that Bazel downloads (remotejdk_11)
# rather than on whatever JDK happens to be installed locally.
build --java_runtime_version=remotejdk_11
1 change: 1 addition & 0 deletions examples/pipelinedp4j/.bazelversion
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
7.3.1
34 changes: 34 additions & 0 deletions examples/pipelinedp4j/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Runnable example that computes differentially private movie metrics on Apache Beam.
# The three source files are compiled together into one binary.
java_binary(
name = "BeamExample",
srcs = [
"BeamExample.java",
"MovieMetrics.java",
"MovieView.java",
],
main_class = "com.google.privacy.differentialprivacy.pipelinedp4j.examples.BeamExample",
# Listed under runtime_deps, so the Beam direct runner is only on the runtime classpath
# and is not available to the sources at compile time.
runtime_deps = [
"@maven//:org_apache_beam_beam_runners_direct_java",
],
deps = [
# NOTE(review): the external repository name is spelled "pipielinedp4j" (extra "i").
# Presumably it has to match the name declared in the workspace file, which is not
# visible here - confirm before renaming.
"@com_google_privacy_differentialprivacy_pipielinedp4j//main/com/google/privacy/differentialprivacy/pipelinedp4j/api",
"@maven//:com_google_guava_guava",
"@maven//:info_picocli_picocli",
"@maven//:org_apache_beam_beam_sdks_java_core",
"@maven//:org_apache_beam_beam_sdks_java_extensions_avro",
"@maven//:org_jetbrains_kotlin_kotlin_stdlib",
],
)
205 changes: 205 additions & 0 deletions examples/pipelinedp4j/BeamExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* An end-to-end example of how to compute DP metrics on a Netflix dataset using the library.
*
* <p>See README for details including how to run the example.
*/
package com.google.privacy.differentialprivacy.pipelinedp4j.examples;

import static java.lang.Math.round;
import static java.util.stream.Collectors.toCollection;

import com.google.privacy.differentialprivacy.pipelinedp4j.api.NoiseKind;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.QueryBuilder;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.QueryPerGroupResult;
import com.google.privacy.differentialprivacy.pipelinedp4j.api.TotalBudget;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.stream.IntStream;
import kotlin.jvm.functions.Function1;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;

/**
 * An end-to-end example of how to compute DP metrics on a Netflix dataset using the library on
 * Beam.
 *
 * <p>The example reads movie views from a local text file, computes a differentially private
 * count of views and sum of ratings per movie, and writes one line per movie to a local output
 * file.
 *
 * <p>See README for details including how to run the example.
 */
@Command(
    name = "BeamExample",
    version = {"BeamExample 1.0"},
    mixinStandardHelpOptions = true)
public class BeamExample implements Runnable {
  /** First movie id (inclusive) of the groups that are treated as publicly known. */
  private static final int FIRST_PUBLIC_MOVIE_ID = 4500;

  /** Last movie id (inclusive) of the groups that are treated as publicly known. */
  private static final int LAST_PUBLIC_MOVIE_ID = 4509;

  /** Name of the output column that holds the anonymized number of views. */
  private static final String NUMBER_OF_VIEWS_COLUMN = "numberOfViews";

  /** Name of the output column that holds the anonymized sum of ratings. */
  private static final String SUM_OF_RATINGS_COLUMN = "sumOfRatings";

  @Option(
      names = "--use-public-groups",
      description =
          "If true we will assume in the example that movie ids are publicly known and are from "
              + FIRST_PUBLIC_MOVIE_ID
              + " to "
              + LAST_PUBLIC_MOVIE_ID
              + ". Default is false, i.e. we will choose movie ids in a differentially"
              + " private way.",
      defaultValue = "false")
  private boolean usePublicGroups = false;

  @Option(
      names = "--local-input-file-path",
      description =
          "Input file. For using as input file you can download data from"
              + " https://www.kaggle.com/datasets/netflix-inc/netflix-prize-data. Use only part of"
              + " it to speed up the calculations.",
      required = true)
  private String localInputFilePath;

  @Option(
      names = "--local-output-file-path",
      description = "Output file.",
      defaultValue = "/tmp/anonymized_output.txt")
  private String localOutputFilePath;

  /**
   * Parses the command line, runs the example and exits the JVM with the exit code reported by
   * picocli.
   *
   * @param args command line arguments, see the {@code @Option} fields of this class
   */
  public static void main(String[] args) {
    int exitCode = new CommandLine(new BeamExample()).execute(args);
    System.exit(exitCode);
  }

  /** Builds the Beam pipeline for the DP query, runs it and waits until it has finished. */
  @Override
  public void run() {
    System.out.println("Starting calculations...");

    var pipeline = initBeam();
    // Read the input data, these are movie views that contain movie id, user id and rating.
    PCollection<MovieView> data = readData(pipeline);

    // Define the query: the privacy unit is a user, the groups are movies.
    var query =
        QueryBuilder.from(data, new UserIdExtractor())
            .groupBy(
                /* groupKeyExtractor= */ new MovieIdExtractor(),
                /* maxGroupsContributed= */ 3,
                /* maxContributionsPerGroup= */ 1,
                usePublicGroups ? publiclyKnownMovieIds(pipeline) : null)
            .count(/* outputColumnName= */ NUMBER_OF_VIEWS_COLUMN)
            .sum(
                new RatingExtractor(),
                /* minTotalValuePerPrivacyUnitInGroup= */ 1.0,
                /* maxTotalValuePerPrivacyUnitInGroup= */ 5.0,
                /* outputColumnName= */ SUM_OF_RATINGS_COLUMN,
                /* budget= */ null)
            .build();
    // Run the query with DP parameters.
    PCollection<QueryPerGroupResult> result =
        query.run(new TotalBudget(/* epsilon= */ 1.1, /* delta= */ 1e-10), NoiseKind.LAPLACE);

    // Convert the result to better representation, i.e. to MovieMetrics.
    var movieMetricsCoder = AvroCoder.of(MovieMetrics.class);
    SerializableFunction<QueryPerGroupResult, MovieMetrics> mapToMovieMetricsFn =
        BeamExample::toMovieMetrics;
    // We now have our anonymized metrics of movie views.
    PCollection<MovieMetrics> anonymizedMovieMetrics =
        result
            .apply(
                "Map query result to MovieMetrics",
                MapElements.into(movieMetricsCoder.getEncodedTypeDescriptor())
                    .via(mapToMovieMetricsFn))
            .setCoder(movieMetricsCoder);

    // Save the result to a file.
    writeOutput(anonymizedMovieMetrics);

    // Run the scheduled calculations in the pipeline.
    pipeline.run().waitUntilFinish();
    System.out.println("Finished calculations.");
  }

  // Data extractors. They always have to implement Function1 and Serializable interfaces. If it
  // doesn't implement Serializable interface, it will fail on Beam. If it doesn't implement
  // Function1, it will fail at compile time due to types mismatch. Do not use lambdas for data
  // extractors as they won't be serializable.

  /** Extracts the privacy unit (the user id) from a movie view. */
  static class UserIdExtractor implements Function1<MovieView, String>, Serializable {
    @Override
    public String invoke(MovieView movieView) {
      return movieView.getUserId();
    }
  }

  /** Extracts the group key (the movie id) from a movie view. */
  static class MovieIdExtractor implements Function1<MovieView, String>, Serializable {
    @Override
    public String invoke(MovieView movieView) {
      return movieView.getMovieId();
    }
  }

  /** Extracts the value that is summed per group (the rating) from a movie view. */
  static class RatingExtractor implements Function1<MovieView, Double>, Serializable {
    @Override
    public Double invoke(MovieView movieView) {
      return movieView.getRating();
    }
  }

  /** Creates a Beam pipeline with default pipeline options. */
  private static Pipeline initBeam() {
    var options = PipelineOptionsFactory.create();
    return Pipeline.create(options);
  }

  /**
   * Reads the input file line by line and parses every line into a {@link MovieView}.
   *
   * @param pipeline the pipeline the read and parse steps are added to
   * @return the parsed movie views, encoded with an Avro coder
   */
  private PCollection<MovieView> readData(Pipeline pipeline) {
    PCollection<String> inputPCollection =
        pipeline.apply("Read input", TextIO.read().from(localInputFilePath));
    var coder = AvroCoder.of(MovieView.class);
    SerializableFunction<String, MovieView> parseFunction = MovieView::parseView;
    return inputPCollection
        .apply("Parse input", MapElements.into(coder.getEncodedTypeDescriptor()).via(parseFunction))
        .setCoder(coder);
  }

  /**
   * Converts one per-group query result into {@link MovieMetrics}.
   *
   * <p>The noisy aggregation results are rounded to the nearest {@code long}.
   *
   * @param perGroupResult result of the DP query for a single movie
   * @return the metrics of that movie
   */
  private static MovieMetrics toMovieMetrics(QueryPerGroupResult perGroupResult) {
    String movieId = perGroupResult.getGroupKey();
    var aggregationResults = perGroupResult.getAggregationResults();
    long numberOfViews = round(aggregationResults.get(NUMBER_OF_VIEWS_COLUMN));
    long sumOfRatings = round(aggregationResults.get(SUM_OF_RATINGS_COLUMN));
    return new MovieMetrics(movieId, numberOfViews, sumOfRatings);
  }

  /**
   * Creates the collection of publicly known group keys.
   *
   * <p>Movie ids (which are group keys for this dataset) are integers from 1 to ~17000. The public
   * groups are the ids from {@link #FIRST_PUBLIC_MOVIE_ID} to {@link #LAST_PUBLIC_MOVIE_ID}, both
   * inclusive.
   *
   * @param pipeline the pipeline the collection is created in
   * @return the public movie ids as strings
   */
  private static PCollection<String> publiclyKnownMovieIds(Pipeline pipeline) {
    var publicGroupsAsJavaList =
        IntStream.rangeClosed(FIRST_PUBLIC_MOVIE_ID, LAST_PUBLIC_MOVIE_ID)
            .mapToObj(Integer::toString)
            .collect(toCollection(ArrayList::new));
    return pipeline.apply("Create public groups", Create.of(publicGroupsAsJavaList));
  }

  /**
   * Writes the string representation of every {@link MovieMetrics} as one line of the output
   * file.
   *
   * @param result the anonymized metrics to write
   */
  private void writeOutput(PCollection<MovieMetrics> result) {
    SerializableFunction<MovieMetrics, String> toStringFunction = MovieMetrics::toString;
    var lines =
        result.apply(
            "Map MovieMetrics to string",
            MapElements.into(StringUtf8Coder.of().getEncodedTypeDescriptor())
                .via(toStringFunction));
    lines.apply("Write output to file", TextIO.write().withoutSharding().to(localOutputFilePath));
  }
}
45 changes: 45 additions & 0 deletions examples/pipelinedp4j/MovieMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.privacy.differentialprivacy.pipelinedp4j.examples;

/**
 * Anonymized metrics of one movie from the Netflix dataset.
 *
 * <p>Instances hold the output of the DP metrics query for a single movie id: the number of
 * views and the sum of ratings.
 */
final class MovieMetrics {
  private final String movieId;
  private final long numberOfViews;
  private final long sumOfRatings;

  /**
   * Creates the metrics of a single movie.
   *
   * @param movieId id of the movie the metrics belong to
   * @param numberOfViews number of views of the movie
   * @param sumOfRatings sum of the ratings of the movie
   */
  MovieMetrics(String movieId, long numberOfViews, long sumOfRatings) {
    this.movieId = movieId;
    this.numberOfViews = numberOfViews;
    this.sumOfRatings = sumOfRatings;
  }

  // Serialization needs a constructor without arguments; it fills in placeholder values.
  private MovieMetrics() {
    this("", 0, 0);
  }

  /** Returns the three fields as comma-separated {@code name=value} pairs. */
  @Override
  public String toString() {
    String movieIdPart = "movieId=" + movieId;
    String numberOfViewsPart = "numberOfViews=" + numberOfViews;
    String sumOfRatingsPart = "sumOfRatings=" + sumOfRatings;
    return String.join(", ", movieIdPart, numberOfViewsPart, sumOfRatingsPart);
  }
}
55 changes: 55 additions & 0 deletions examples/pipelinedp4j/MovieView.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.privacy.differentialprivacy.pipelinedp4j.examples;

import com.google.common.base.Splitter;
import java.util.List;

/**
 * Represents a single movie view from the Netflix dataset.
 *
 * <p>A view consists of the id of the user who watched the movie, the id of the movie and the
 * rating the user gave to it.
 */
final class MovieView {
  /** Number of leading comma-separated fields {@link #parseView} reads from a line. */
  private static final int REQUIRED_FIELD_COUNT = 3;

  private final String userId;
  private final String movieId;
  private final Double rating;

  /**
   * Creates a movie view.
   *
   * @param userId id of the user who viewed the movie
   * @param movieId id of the viewed movie
   * @param rating rating given by the user
   */
  MovieView(String userId, String movieId, Double rating) {
    this.userId = userId;
    this.movieId = movieId;
    this.rating = rating;
  }

  // 0-arg constructor is necessary for serialization to work.
  private MovieView() {
    this("", "", 0.0);
  }

  String getUserId() {
    return userId;
  }

  String getMovieId() {
    return movieId;
  }

  Double getRating() {
    return rating;
  }

  /**
   * Parses one comma-separated input line into a movie view.
   *
   * <p>The first field is the movie id, the second the user id and the third the rating. Any
   * further fields are ignored.
   *
   * @param s the input line
   * @return the parsed movie view
   * @throws IllegalArgumentException if the line has fewer than three fields
   * @throws NumberFormatException if the third field is not a parsable {@code double}
   */
  static MovieView parseView(String s) {
    List<String> fields = Splitter.on(',').splitToList(s);
    if (fields.size() < REQUIRED_FIELD_COUNT) {
      // Report only the field count: the line itself is raw user data and must not leak into
      // logs or error messages.
      throw new IllegalArgumentException(
          String.format(
              "Malformed input line: expected at least %d comma-separated fields"
                  + " (movieId,userId,rating) but found %d",
              REQUIRED_FIELD_COUNT, fields.size()));
    }
    String movieId = fields.get(0);
    String userId = fields.get(1);
    double rating = Double.parseDouble(fields.get(2));
    return new MovieView(userId, movieId, rating);
  }
}
Loading

0 comments on commit 3328603

Please sign in to comment.