diff --git a/plugins/extractors/bigquery/README.md b/plugins/extractors/bigquery/README.md index 64c88aa8..34a2380e 100644 --- a/plugins/extractors/bigquery/README.md +++ b/plugins/extractors/bigquery/README.md @@ -8,6 +8,7 @@ source: config: project_id: google-project-id table_pattern: gofood.fact_ + max_preview_rows: 3 exclude: datasets: - dataset_a @@ -17,7 +18,7 @@ source: max_page_size: 100 profile_column: true build_view_lineage: true - # Only one of service_account_base64 / service_account_json is needed. + # Only one of service_account_base64 / service_account_json is needed. # If both are present, service_account_base64 takes precedence service_account_base64: _________BASE64_ENCODED_SERVICE_ACCOUNT_________________ service_account_json: @@ -30,7 +31,7 @@ source: "auth_uri": "https://accounts.google.com/o/oauth2/auth", "token_uri": "https://oauth2.googleapis.com/token", "auth_provider_x509_cert_url": "xxxxxxx", - "client_x509_cert_url": "xxxxxxx" + "client_x509_cert_url": "xxxxxxx", } collect_table_usage: false usage_period_in_day: 7 @@ -41,61 +42,61 @@ source: ## Inputs -| Key | Value | Example | Description | | -| :-- | :---- | :------ | :---------- | :-- | -| `project_id` | `string` | `my-project` | BigQuery Project ID | *required* | -| `service_account_base64` | `string` | `____BASE64_ENCODED_SERVICE_ACCOUNT____` | Service Account in base64 encoded string. Takes precedence over `service_account_json` value | *optional* | -| `service_account_json` | `string` | `{"private_key": .., "private_id": ...}` | Service Account in JSON string | *optional* | -| `table_pattern` | `string` | `gofood.fact_` | Regex pattern to filter which bigquery table to scan (whitelist) | *optional* | -| `max_page_size` | `int` | `100` | max page size hint used for fetching datasets/tables/rows from bigquery | *optional* | -| `include_column_profile` | `bool` | `true` | true if you want to profile the column value such min, max, med, avg, top, and freq | *optional* | -| `max_preview_rows` | `int` | `30` | max number of preview rows to fetch, `0` will skip preview fetching. Default to `30`. | *optional* | -| `mix_values` | `bool` | `false` | true if you want to mix the column values with the preview rows. Default to `false`. | *optional* | -| `collect_table_usage` | `boolean` | `false` | toggle feature to collect table usage, `true` will enable collecting table usage. Default to `false`. | *optional* | -| `usage_period_in_day` | `int` | `7` | collecting log from `(now - usage_period_in_day)` until `now`. only matter if `collect_table_usage` is true. Default to `7`. | *optional* | -| `usage_project_ids` | `[]string` | `[google-project-id, other-google-project-id]` | collecting log from defined GCP Project IDs. Default to BigQuery Project ID. | *optional* | - -### *Notes* +| Key | Value | Example | Description | | +| :----------------------- | :--------- | :--------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------- | :--------- | +| `project_id` | `string` | `my-project` | BigQuery Project ID | _required_ | +| `service_account_base64` | `string` | `____BASE64_ENCODED_SERVICE_ACCOUNT____` | Service Account in base64 encoded string. Takes precedence over `service_account_json` value | _optional_ | +| `service_account_json` | `string` | `{"private_key": .., "private_id": ...}` | Service Account in JSON string | _optional_ | +| `table_pattern` | `string` | `gofood.fact_` | Regex pattern to filter which bigquery table to scan (whitelist) | _optional_ | +| `max_page_size` | `int` | `100` | max page size hint used for fetching datasets/tables/rows from bigquery | _optional_ | +| `include_column_profile` | `bool` | `true` | true if you want to profile the column value such min, max, med, avg, top, and freq | _optional_ | +| `max_preview_rows` | `int` | `30` | max number of preview rows to fetch, `0` will skip preview fetching, `-1` will restrict adding preview_rows key in asset data . Default to `30`. | _optional_ | +| `mix_values` | `bool` | `false` | true if you want to mix the column values with the preview rows. Default to `false`. | _optional_ | +| `collect_table_usage` | `boolean` | `false` | toggle feature to collect table usage, `true` will enable collecting table usage. Default to `false`. | _optional_ | +| `usage_period_in_day` | `int` | `7` | collecting log from `(now - usage_period_in_day)` until `now`. only matter if `collect_table_usage` is true. Default to `7`. | _optional_ | +| `usage_project_ids` | `[]string` | `[google-project-id, other-google-project-id]` | collecting log from defined GCP Project IDs. Default to BigQuery Project ID. | _optional_ | + +### _Notes_ - Leaving `service_account_json` and `service_account_base64` blank will default to [Google's default authentication][google-default-auth]. It is recommended if Meteor instance runs inside the same Google Cloud environment as the BigQuery project. -- Service account needs to have `bigquery.privateLogsViewer` role to be able to collect bigquery audit logs +- Service account needs to have `bigquery.privateLogsViewer` role to be able to collect bigquery audit logs. +- Setting `max_preview_rows` to `-1` will restrict adding preview_rows key in asset data ## Outputs -| Field | Sample Value | Description | -|:-------------------------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------------------------------------------------------| -| `resource.urn` | `project_id.dataset_name.table_name` | | -| `resource.name` | `table_name` | | -| `resource.service` | `bigquery` | | -| `description` | `table description` | | -| `profile.total_rows` | `2100` | | -| `profile.usage_count` | `15` | | -| `profile.joins` | [][Join](#Join) | | -| `profile.filters` | [`"WHERE t.param_3 = 'the_param' AND t.column_1 = \"xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx\""`,`"WHERE event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"`] | | -| `schema` | [][Column](#column) | | -| `properties.partition_data` | `"partition_data": {"partition_field": "data_date", "require_partition_filter": false, "time_partition": {"partition_by": "DAY","partition_expire": 0 } }` | partition related data for time and range partitioning. | -| `properties.clustering_fields` | `['created_at', 'updated_at']` | list of fields on which the table is clustered | -| `properties.partition_field` | `created_at` | returns the field on which table is time partitioned | +| Field | Sample Value | Description | +| :----------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | :------------------------------------------------------ | +| `resource.urn` | `project_id.dataset_name.table_name` | | +| `resource.name` | `table_name` | | +| `resource.service` | `bigquery` | | +| `description` | `table description` | | +| `profile.total_rows` | `2100` | | +| `profile.usage_count` | `15` | | +| `profile.joins` | [][Join](#Join) | | +| `profile.filters` | [`"WHERE t.param_3 = 'the_param' AND t.column_1 = \"xxxxxx-xxxx-xxxx-xxxx-xxxxxxxxx\""`,`"WHERE event_timestamp >= TIMESTAMP(\"2021-10-29\", \"UTC\") AND event_timestamp < TIMESTAMP(\"2021-11-22T02:01:06Z\")"`] | | +| `schema` | [][Column](#column) | | +| `properties.partition_data` | `"partition_data": {"partition_field": "data_date", "require_partition_filter": false, "time_partition": {"partition_by": "DAY","partition_expire": 0 } }` | partition related data for time and range partitioning. | +| `properties.clustering_fields` | `['created_at', 'updated_at']` | list of fields on which the table is clustered | +| `properties.partition_field` | `created_at` | returns the field on which table is time partitioned | ### Partition Data -| Field | Sample Value | Description | -|:------------------------------------------|:-------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `partition_field` | `created_at` | field on which the table is partitioned either by TimePartitioning or RangePartitioning. In case field is empty for TimePartitioning _PARTITIONTIME is returned instead of empty. | -| `require_partition_filter` | `true` | boolean value which denotes if every query on the bigquery table must include at least one predicate that only references the partitioning column | -| `time_partition.partition_by` | `HOUR` | returns partition type HOUR/DAY/MONTH/YEAR | -| `time_partition.partition_expire_seconds` | `0` | time in which data will expire from this partition. If 0 it will not expire. | -| `range_partition.interval` | `10` | width of a interval range | -| `range_partition.start` | `0` | start value for partition inclusive of this value | -| `range_partition.end` | `100` | end value for partition exclusive of this value | - +| Field | Sample Value | Description | +| :---------------------------------------- | :----------- | :--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `partition_field` | `created_at` | field on which the table is partitioned either by TimePartitioning or RangePartitioning. In case field is empty for TimePartitioning \_PARTITIONTIME is returned instead of empty. | +| `require_partition_filter` | `true` | boolean value which denotes if every query on the bigquery table must include at least one predicate that only references the partitioning column | +| `time_partition.partition_by` | `HOUR` | returns partition type HOUR/DAY/MONTH/YEAR | +| `time_partition.partition_expire_seconds` | `0` | time in which data will expire from this partition. If 0 it will not expire. | +| `range_partition.interval` | `10` | width of a interval range | +| `range_partition.start` | `0` | start value for partition inclusive of this value | +| `range_partition.end` | `100` | end value for partition exclusive of this value | ### Column | Field | Sample Value | -|:--------------|:---------------------------------------| +| :------------ | :------------------------------------- | | `name` | `total_price` | | `description` | `item's total price` | | `data_type` | `decimal` | @@ -106,14 +107,14 @@ source: ### Join | Field | Sample Value | -|:-------------|:--------------------------------------------------------------------------------------------------------------------------------------------------------| +| :----------- | :------------------------------------------------------------------------------------------------------------------------------------------------------ | | `urn` | `project_id.dataset_name.table_name` | | `count` | `3` | | `conditions` | [`"ON target.column_1 = source.column_1 and target.param_name = source.param_name"`,`"ON DATE(target.event_timestamp) = DATE(source.event_timestamp)"`] | ## Contributing -Refer to the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-extractor) for information on +Refer to the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-extractor) for information on contributing to this module. [google-default-auth]: https://cloud.google.com/docs/authentication/production#automatically diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go index d3826b73..6eba8254 100755 --- a/plugins/extractors/bigquery/bigquery.go +++ b/plugins/extractors/bigquery/bigquery.go @@ -41,20 +41,21 @@ var summary string type Config struct { ProjectID string `json:"project_id" yaml:"project_id" mapstructure:"project_id" validate:"required"` // ServiceAccountBase64 takes precedence over ServiceAccountJSON field - ServiceAccountBase64 string `json:"service_account_base64" yaml:"service_account_base64" mapstructure:"service_account_base64"` - ServiceAccountJSON string `json:"service_account_json" yaml:"service_account_json" mapstructure:"service_account_json"` - MaxPageSize int `json:"max_page_size" yaml:"max_page_size" mapstructure:"max_page_size"` - DatasetPageSize int `json:"dataset_page_size" mapstructure:"dataset_page_size"` - TablePageSize int `json:"table_page_size" mapstructure:"table_page_size"` - TablePattern string `json:"table_pattern" yaml:"table_pattern" mapstructure:"table_pattern"` - Exclude Exclude `json:"exclude" yaml:"exclude" mapstructure:"exclude"` - IncludeColumnProfile bool `json:"include_column_profile" yaml:"include_column_profile" mapstructure:"include_column_profile"` - MaxPreviewRows int `json:"max_preview_rows" yaml:"max_preview_rows" mapstructure:"max_preview_rows" default:"30"` - MixValues bool `json:"mix_values" mapstructure:"mix_values" default:"false"` - IsCollectTableUsage bool `json:"collect_table_usage" yaml:"collect_table_usage" mapstructure:"collect_table_usage" default:"false"` - UsagePeriodInDay int64 `json:"usage_period_in_day" yaml:"usage_period_in_day" mapstructure:"usage_period_in_day" default:"7"` - UsageProjectIDs []string `json:"usage_project_ids" yaml:"usage_project_ids" mapstructure:"usage_project_ids"` - BuildViewLineage bool `json:"build_view_lineage" yaml:"build_view_lineage" mapstructure:"build_view_lineage" default:"false"` + ServiceAccountBase64 string `mapstructure:"service_account_base64"` + ServiceAccountJSON string `mapstructure:"service_account_json"` + MaxPageSize int `mapstructure:"max_page_size"` + DatasetPageSize int `mapstructure:"dataset_page_size"` + TablePageSize int `mapstructure:"table_page_size"` + TablePattern string `mapstructure:"table_pattern"` + Exclude Exclude `mapstructure:"exclude"` + IncludeColumnProfile bool `mapstructure:"include_column_profile"` + // MaxPreviewRows can also be set to -1 to restrict adding preview_rows key in asset data + MaxPreviewRows int `mapstructure:"max_preview_rows" default:"30"` + MixValues bool `mapstructure:"mix_values" default:"false"` + IsCollectTableUsage bool `mapstructure:"collect_table_usage" default:"false"` + UsagePeriodInDay int64 `mapstructure:"usage_period_in_day" default:"7"` + UsageProjectIDs []string `mapstructure:"usage_project_ids"` + BuildViewLineage bool `mapstructure:"build_view_lineage" default:"false"` } type Exclude struct { @@ -443,15 +444,19 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu } } } - table, err := anypb.New(&v1beta2.Table{ - Columns: e.buildColumns(ctx, md.Schema, md), - PreviewFields: previewFields, - PreviewRows: previewRows, - Profile: tableProfile, - Attributes: utils.TryParseMapToProto(attributesData), - CreateTime: timestamppb.New(md.CreationTime), - UpdateTime: timestamppb.New(md.LastModifiedTime), - }) + tableData := &v1beta2.Table{ + Columns: e.buildColumns(ctx, md.Schema, md), + Profile: tableProfile, + Attributes: utils.TryParseMapToProto(attributesData), + CreateTime: timestamppb.New(md.CreationTime), + UpdateTime: timestamppb.New(md.LastModifiedTime), + } + maxPreviewRows := e.config.MaxPreviewRows + if maxPreviewRows != -1 { + tableData.PreviewFields = previewFields + tableData.PreviewRows = previewRows + } + table, err := anypb.New(tableData) if err != nil { e.logger.Warn("error creating Any struct", "error", err) } @@ -513,7 +518,7 @@ func (e *Extractor) buildColumn(ctx context.Context, field *bigquery.FieldSchema func (e *Extractor) buildPreview(ctx context.Context, t *bigquery.Table, md *bigquery.TableMetadata) (fields []string, rows *structpb.ListValue, err error) { maxPreviewRows := e.config.MaxPreviewRows - if maxPreviewRows == 0 { + if maxPreviewRows <= 0 { return nil, nil, nil } diff --git a/plugins/sinks/compass/README.md b/plugins/sinks/compass/README.md index 13ae2a2b..e858fa79 100644 --- a/plugins/sinks/compass/README.md +++ b/plugins/sinks/compass/README.md @@ -15,8 +15,13 @@ sinks: labels: myCustom: $attributes.myCustomField sampleLabel: $labels.sampleLabelField + remove_unset_fields_in_data: false ``` +### _Notes_ + +- Setting `remove_unset_fields_in_data` to `true` will not populate fields in final data which are not set initially in source. Defaults to `false`. + ## Contributing Refer to the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-sink) for information on contributing to this module. diff --git a/plugins/sinks/compass/sink.go b/plugins/sinks/compass/sink.go index 8ff1213f..02a60746 100644 --- a/plugins/sinks/compass/sink.go +++ b/plugins/sinks/compass/sink.go @@ -31,6 +31,8 @@ type Config struct { Host string `json:"host" yaml:"host" mapstructure:"host" validate:"required"` Headers map[string]string `json:"headers" yaml:"headers" mapstructure:"headers"` Labels map[string]string `json:"labels" yaml:"labels" mapstructure:"labels"` + // RemoveUnsetFieldsInData if set to true do not populate fields in final sink data which are unset in initial data. + RemoveUnsetFieldsInData bool `mapstructure:"remove_unset_fields_in_data"` } var info = plugins.Info{ @@ -193,7 +195,7 @@ func (s *Sink) buildCompassData(anyData *anypb.Any) (map[string]interface{}, err data, err := protojson.MarshalOptions{ UseProtoNames: true, - EmitUnpopulated: true, + EmitUnpopulated: !s.config.RemoveUnsetFieldsInData, }.Marshal(anyData) if err != nil { return nil, fmt.Errorf("marshaling asset data: %w", err)