Update gcp operators #1818

Open · wants to merge 5 commits into master
77 changes: 77 additions & 0 deletions digdag-docs/src/operators/bq.md
@@ -3,6 +3,8 @@
**bq>** operator runs a query on Google BigQuery.

_export:
  gcp:
    project: my_project_id
  bq:
    dataset: my_dataset

@@ -56,6 +58,17 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag.
location: asia-northeast1
```

* **gcp.project**: NAME

Specifies the default Google Cloud project to use in the query and in the `destination_table` parameter. If unset, the operator falls back to the `gcp.project` secret and then to the project of the service account credential.

Examples:

```yaml
gcp:
  project: my_project_id
```

* **dataset**: NAME

Specifies the default dataset to use in the query and in the `destination_table` parameter.
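Examples (names illustrative; a dataset in another project can be referenced with the same `project:dataset` syntax accepted by `destination_table`):

```yaml
bq:
  dataset: my_dataset
```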
@@ -167,6 +180,70 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag.
Describes user-defined function resources used in the query. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.userDefinedFunctionResources).
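Examples (a sketch; `gs://my_bucket/my_udf.js` is a hypothetical resource URI, and the field name follows the BigQuery REST representation):

```yaml
user_defined_function_resources:
  - resourceUri: gs://my_bucket/my_udf.js
```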


* **clustering**: OBJECT

Clustering specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Clustering).

Examples:

```yaml
clustering:
  fields:
    - field1
```

* **encryption_configuration**: OBJECT

Custom encryption configuration. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/EncryptionConfiguration).

Examples:

```yaml
encryption_configuration:
  kmsKeyName: key_name
```

* **maximum_bytes_billed**: LONG

Limits the bytes billed for this job. Queries that will have bytes billed beyond this limit will fail (without incurring a charge). If unspecified, this will be set to your project default. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery).
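Examples (an illustrative cap of roughly 1 GB):

```yaml
maximum_bytes_billed: 1000000000
```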

* **schema_update_options**: LIST

Allows the schema of the destination table to be updated as a side effect of the query job. Schema update options are supported in two cases: when writeDisposition is WRITE_APPEND, and when writeDisposition is WRITE_TRUNCATE and the destination table is a partition of a table, specified by partition decorators. For normal tables, WRITE_TRUNCATE always overwrites the schema. One or more of the following values may be specified:
- ALLOW_FIELD_ADDITION: allow adding a nullable field to the schema.
- ALLOW_FIELD_RELAXATION: allow relaxing a required field in the original schema to nullable.

For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery).
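Examples:

```yaml
schema_update_options:
  - ALLOW_FIELD_ADDITION
  - ALLOW_FIELD_RELAXATION
```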

* **range_partitioning**: OBJECT

Range partitioning specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#RangePartitioning).

Examples:

```yaml
range_partitioning:
  field: id
  range:
    start: 0
    interval: 10
    end: 100
```

* **time_partitioning**: OBJECT

Time-based partitioning specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TimePartitioning).

Examples:

```yaml
time_partitioning:
  type: DAY
  field: date
  requirePartitionFilter: true
```

## Output parameters

* **bq.last_job_id**
101 changes: 100 additions & 1 deletion digdag-docs/src/operators/bq_load.md
@@ -90,10 +90,17 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag.
location: asia-northeast1
```

-* **project**: NAME
+* **gcp.project**: NAME

The project that the table is located in or should be created in. It can also be specified directly in the table reference or in the dataset parameter. If unset, the operator falls back to the `gcp.project` secret and then to the project of the service account credential.

Examples:

```yaml
gcp:
  project: my_project_id
```

* **source_format**: CSV | NEWLINE_DELIMITED_JSON | AVRO | DATASTORE_BACKUP

The format of the files to be imported. *Default*: `CSV`.
@@ -283,6 +290,98 @@ When you set those parameters, use [digdag secrets command](https://docs.digdag.
# schema: path/to/schema.yml
```

* **clustering**: OBJECT

Clustering specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Clustering).

Examples:

```yaml
clustering:
  fields:
    - field1
```

* **decimal_target_types**: LIST

Defines the list of possible SQL data types to which the source decimal values are converted. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#decimaltargettype).
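Examples:

```yaml
decimal_target_types:
  - NUMERIC
  - BIGNUMERIC
  - STRING
```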

* **encryption_configuration**: OBJECT

Custom encryption configuration. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/EncryptionConfiguration).

Examples:

```yaml
encryption_configuration:
  kmsKeyName: key_name
```

* **hive_partitioning_options**: OBJECT

Options for configuring hive partitioning detection. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#hivepartitioningoptions).

Examples:

```yaml
hive_partitioning_options:
  mode: AUTO
  sourceUriPrefix: gs://my_bucket/path
```

* **json_extension**: STRING

If `source_format` is set to `NEWLINE_DELIMITED_JSON`, indicates whether the input should be processed as a JSON variant such as GeoJSON; for newline-delimited GeoJSON, set this to `GEOJSON`. Omit this field for any other `source_format`.
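Examples:

```yaml
source_format: NEWLINE_DELIMITED_JSON
json_extension: GEOJSON
```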

* **null_marker**: STRING

Specifies a string that represents a null value in a CSV file. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload).
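Examples (treating `\N`, a common CSV export convention, as NULL):

```yaml
null_marker: "\\N"
```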

* **parquet_options**: OBJECT

Parquet options for load jobs and external tables. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#parquetoptions).

Examples:

```yaml
parquet_options:
  enableListInference: true
  enumAsString: true
```

* **use_avro_logical_types**: BOOLEAN

If sourceFormat is set to "AVRO", indicates whether to interpret logical types as the corresponding BigQuery data type (for example, TIMESTAMP), instead of using the raw type (for example, INTEGER). For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload).

* **range_partitioning**: OBJECT

Range partitioning specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#RangePartitioning).

Examples:

```yaml
range_partitioning:
  field: id
  range:
    start: 0
    interval: 10
    end: 100
```

* **time_partitioning**: OBJECT

Time-based partitioning specification for the destination table. For more information see [BigQuery documentation](https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TimePartitioning).

Examples:

```yaml
time_partitioning:
  type: DAY
  field: date
  requirePartitionFilter: true
```

## Output parameters

* **bq.last_job_id**
2 changes: 1 addition & 1 deletion digdag-standards/build.gradle
@@ -35,7 +35,7 @@ dependencies {
compile "com.amazonaws:aws-java-sdk-dynamodb:${project.ext.awsJavaSdkVersion}"

// bigquery
compile ('com.google.apis:google-api-services-bigquery:v2-rev325-1.22.0') { exclude group: 'com.google.guava', module: 'guava-jdk5' }
compile('com.google.apis:google-api-services-bigquery:v2-rev20230520-2.0.0') { exclude group: 'com.google.guava', module: 'guava-jdk5' }

// gcs
compile ('com.google.apis:google-api-services-storage:v1-rev20190910-1.30.3') {
@@ -2,6 +2,8 @@

import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.RangePartitioning;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.common.base.Optional;
import io.digdag.client.config.Config;
import io.digdag.client.config.ConfigFactory;
@@ -43,4 +45,33 @@ private TaskResult result(Job job)
    }

    protected abstract JobConfiguration jobConfiguration(String projectId);

    // Builds the BigQuery RangePartitioning model from a `range_partitioning` config block.
    protected RangePartitioning rangePartitioning(Config params) {
        Config rangeParams = params.getNested("range");
        RangePartitioning.Range range = new RangePartitioning.Range();
        range.setStart(rangeParams.get("start", Long.class))
                .setEnd(rangeParams.get("end", Long.class))
                .setInterval(rangeParams.get("interval", Long.class));

        RangePartitioning rPart = new RangePartitioning();
        rPart.setField(params.get("field", String.class))
                .setRange(range);

        return rPart;
    }

    // Builds the BigQuery TimePartitioning model from a `time_partitioning` config block.
    protected TimePartitioning timePartitioning(Config params) {
        TimePartitioning tPart = new TimePartitioning();
        // required field
        tPart.setType(params.get("type", String.class));

        // optional fields; transform() applies the setter only when the value is present
        params.getOptional("field", String.class).transform(tPart::setField);
        params.getOptional("requirePartitionFilter", Boolean.class).transform(tPart::setRequirePartitionFilter);
        if (params.has("expirationMs")) {
            tPart.setExpirationMs(params.get("expirationMs", Long.class));
        }

        return tPart;
    }
}
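These helpers read keys named after the BigQuery REST fields; a workflow sketch exercising all of them (values illustrative) would look like:

```yaml
range_partitioning:
  field: id
  range:
    start: 0
    interval: 10
    end: 100
time_partitioning:
  type: DAY
  field: date                    # optional
  requirePartitionFilter: true   # optional
  expirationMs: 7776000000       # optional (90 days)
```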
@@ -29,7 +29,9 @@ public TaskResult runTask()

    private String projectId(GcpCredential credential)
    {
-        Optional<String> projectId = context.getSecrets().getSecretOptional("gcp.project")
+        Optional<String> projectId = request.getConfig().getNestedOrGetEmpty("gcp")
+                .getOptional("project", String.class)
+                .or(context.getSecrets().getSecretOptional("gcp.project"))
                .or(credential.projectId());
        if (!projectId.isPresent()) {
            throw new TaskExecutionException("Missing 'gcp.project' secret");
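With this change the project is resolved, in order, from the workflow's `gcp.project` config key, the `gcp.project` secret, and finally the project embedded in the service account credential. A sketch of the first two (names illustrative):

```yaml
# 1. Workflow configuration takes precedence:
_export:
  gcp:
    project: my_project_id

# 2. Fallback secret, set via the CLI, e.g.:
#    digdag secrets --local --set gcp.project=my_project_id
```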
@@ -9,7 +9,8 @@

class Bq
{
-    private static final Pattern TABLE_REFERENCE_PATTERN = Pattern.compile("^(?:(?<project>[^:]+):)?(?:(?<dataset>[^.]+)\\.)?(?<table>[a-zA-Z0-9_]{1,1024}(?:\\$[0-9]{8})?)$");
+    private static final Pattern TABLE_REFERENCE_PATTERN = Pattern.compile(
+            "^(?:(?<project>[^:]+):)?(?:(?<dataset>[^.]+)\\.)?(?<table>[a-zA-Z0-9_]{1,1024}(?:\\$[0-9]{4,10})?)$");

    static TableReference tableReference(String defaultProjectId, Optional<DatasetReference> defaultDataset, String s)
    {
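The widened `{4,10}` quantifier accepts partition decorators beyond the daily `$YYYYMMDD` form, such as yearly (`$2023`) and hourly (`$2023052013`) partitions. A hypothetical workflow snippet using a decorator:

```yaml
+load_to_partition:
  bq>: queries/my_query.sql
  destination_table: my_dataset.my_table$20230520
```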
@@ -1,11 +1,14 @@
package io.digdag.standards.operator.gcp;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.DatasetReference;
import com.google.api.services.bigquery.model.EncryptionConfiguration;
import com.google.api.services.bigquery.model.HivePartitioningOptions;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.ParquetOptions;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
@@ -99,6 +102,22 @@ protected JobConfiguration jobConfiguration(String projectId)
Optional.of(params.getListOrEmpty("projection_fields", String.class)).transform(cfg::setProjectionFields);
params.getOptional("autodetect", boolean.class).transform(cfg::setAutodetect);
Optional.of(params.getListOrEmpty("schema_update_options", String.class)).transform(cfg::setSchemaUpdateOptions);
params.getOptional("clustering", Clustering.class).transform(cfg::setClustering);
Optional.of(params.getListOrEmpty("decimal_target_types", String.class)).transform(cfg::setDecimalTargetTypes);
params.getOptional("encryption_configuration", EncryptionConfiguration.class).transform(cfg::setDestinationEncryptionConfiguration);
params.getOptional("hive_partitioning_options", HivePartitioningOptions.class).transform(cfg::setHivePartitioningOptions);
params.getOptional("json_extension", String.class).transform(cfg::setJsonExtension);
params.getOptional("null_marker", String.class).transform(cfg::setNullMarker);
params.getOptional("parquet_options", ParquetOptions.class).transform(cfg::setParquetOptions);
params.getOptional("use_avro_logical_types", boolean.class).transform(cfg::setUseAvroLogicalTypes);

if (params.has("range_partitioning")) {
cfg.setRangePartitioning(rangePartitioning(params.getNested("range_partitioning")));
}

if (params.has("time_partitioning")) {
cfg.setTimePartitioning(timePartitioning(params.getNested("time_partitioning")));
}

return new JobConfiguration()
.setLoad(cfg);
@@ -1,7 +1,9 @@
package io.digdag.standards.operator.gcp;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.DatasetReference;
import com.google.api.services.bigquery.model.EncryptionConfiguration;
import com.google.api.services.bigquery.model.ExternalDataConfiguration;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
@@ -86,6 +88,18 @@ protected JobConfiguration jobConfiguration(String projectId)

params.getOptional("destination_table", String.class)
.transform(s -> cfg.setDestinationTable(tableReference(projectId, defaultDataset, s)));
params.getOptional("clustering", Clustering.class).transform(cfg::setClustering);
params.getOptional("encryption_configuration", EncryptionConfiguration.class).transform(cfg::setDestinationEncryptionConfiguration);
params.getOptional("maximum_bytes_billed", Long.class).transform(cfg::setMaximumBytesBilled);
Optional.of(params.getListOrEmpty("schema_update_options", String.class)).transform(cfg::setSchemaUpdateOptions);

if (params.has("range_partitioning")) {
cfg.setRangePartitioning(rangePartitioning(params.getNested("range_partitioning")));
}

if (params.has("time_partitioning")) {
cfg.setTimePartitioning(timePartitioning(params.getNested("time_partitioning")));
}

return new JobConfiguration()
.setQuery(cfg);