Skip to content

Commit

Permalink
Change source for BigQuery table ETag
Browse files Browse the repository at this point in the history
  • Loading branch information
grzr committed Jan 16, 2025
1 parent 44e02f9 commit 6fdc9c0
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 20 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
module github.com/ottogroup/penelope

require (
cloud.google.com/go v0.116.0
cloud.google.com/go/bigquery v1.64.0
cloud.google.com/go/iam v1.2.2
cloud.google.com/go/logging v1.12.0
Expand Down Expand Up @@ -31,6 +30,7 @@ require (

require (
cel.dev/expr v0.18.0 // indirect
cloud.google.com/go v0.116.0 // indirect
cloud.google.com/go/auth v0.10.1 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.5 // indirect
cloud.google.com/go/compute/metadata v0.5.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/http/mock/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ Content-Type: application/json; charset=UTF-8
{"kind":"bigquery#getQueryResultsResponse","etag":"P0Ea2Yx1PihRPFrsH/Q3fA==","schema":{"fields":[{"name":"total","type":"INTEGER","mode":"NULLABLE"},{"name":"p","type":"TIMESTAMP","mode":"NULLABLE"}]},"jobReference":{"projectId":"local-ability-backup","jobID":"PYRwoNSDhUNuqUtVbxtkjbSvmx7","location":"EU"},"totalRows":"76","totalBytesProcessed":"0","jobComplete":true,"cacheHit":false}`

getMetadataTablePartitionsQueryResponse = `HTTP/2.0 200 OK
Content-Length: 1462
Content-Length: 2857
Content-Type: application/json; charset=UTF-8
{"kind": "bigquery#getQueryResultsResponse", "etag": "P0Ea2Yx1PihRPFrsH/Q3fA==", "schema": {"fields": [{"name": "total", "fullname": "total", "type": "INTEGER", "mode": "NULLABLE", "description": "", "fields": [], "hasPermission": true, "dataPolicies": [], "maxLength": "0", "precision": "0", "scale": "0", "roundingMode": "ROUNDING_MODE_UNSPECIFIED", "collation": "", "defaultValueExpression": "", "isMeasure": false, "rangeElementType": {"type": ""}, "foreignTypeDefinition": "", "dataGovernanceTags": [], "identityColumnInfo": {"generatedMode": "GENERATED_MODE_UNSPECIFIED", "start": "", "increment": ""}}, {"name": "partition_id", "fullname": "partition_id", "type": "STRING", "mode": "NULLABLE", "description": "", "fields": [], "hasPermission": true, "dataPolicies": [], "maxLength": "0", "precision": "0", "scale": "0", "roundingMode": "ROUNDING_MODE_UNSPECIFIED", "collation": "", "defaultValueExpression": "", "isMeasure": false, "rangeElementType": {"type": ""}, "foreignTypeDefinition": "", "dataGovernanceTags": [], "identityColumnInfo": {"generatedMode": "GENERATED_MODE_UNSPECIFIED", "start": "", "increment": ""}}]}, "rows": [{"f": [{"v": "22498"}, {"v": "20190110"}]}, {"f": [{"v": "65"}, {"v": "20200229"}]}], "jobReference": {"projectId": "local-ability-backup", "jobID": "PYRwoNSDhUNuqUtVbxtkjbSvmx7", "location": "EU"}, "totalRows": "2", "totalBytesProcessed": "0", "jobComplete": true, "cacheHit": false}`
{"kind":"bigquery#getQueryResultsResponse","etag":"P0Ea2Yx1PihRPFrsH/Q3fA==","schema":{"fields":[{"name":"total_rows","fullname":"total_rows","type":"INTEGER","mode":"NULLABLE","description":"","fields":[],"hasPermission":true,"dataPolicies":[],"maxLength":"0","precision":"0","scale":"0","roundingMode":"ROUNDING_MODE_UNSPECIFIED","collation":"","defaultValueExpression":"","isMeasure":false,"rangeElementType":{"type":""},"foreignTypeDefinition":"","dataGovernanceTags":[],"identityColumnInfo":{"generatedMode":"GENERATED_MODE_UNSPECIFIED","start":"","increment":""}},{"name":"total_logical_bytes","fullname":"total_logical_bytes","type":"INTEGER","mode":"NULLABLE","description":"","fields":[],"hasPermission":true,"dataPolicies":[],"maxLength":"0","precision":"0","scale":"0","roundingMode":"ROUNDING_MODE_UNSPECIFIED","collation":"","defaultValueExpression":"","isMeasure":false,"rangeElementType":{"type":""},"foreignTypeDefinition":"","dataGovernanceTags":[],"identityColumnInfo":{"generatedMode":"GENERATED_MODE_UNSPECIFIED","start":"","increment":""}},{"name":"partition_id","fullname":"partition_id","type":"STRING","mode":"NULLABLE","description":"","fields":[],"hasPermission":true,"dataPolicies":[],"maxLength":"0","precision":"0","scale":"0","roundingMode":"ROUNDING_MODE_UNSPECIFIED","collation":"","defaultValueExpression":"","isMeasure":false,"rangeElementType":{"type":""},"foreignTypeDefinition":"","dataGovernanceTags":[],"identityColumnInfo":{"generatedMode":"GENERATED_MODE_UNSPECIFIED","start":"","increment":""}},{"name":"table_name","fullname":"table_name","type":"STRING","mode":"NULLABLE","description":"","fields":[],"hasPermission":true,"dataPolicies":[],"maxLength":"0","precision":"0","scale":"0","roundingMode":"ROUNDING_MODE_UNSPECIFIED","collation":"","defaultValueExpression":"","isMeasure":false,"rangeElementType":{"type":""},"foreignTypeDefinition":"","dataGovernanceTags":[],"identityColumnInfo":{"generatedMode":"GENERATED_MODE_UNSPECIFIED","start":"","increment":""}},{"name":"last_modified_time","fullname":"last_modified_time","type":"TIMESTAMP","mode":"NULLABLE","description":"","fields":[],"hasPermission":true,"dataPolicies":[],"maxLength":"0","precision":"0","scale":"0","roundingMode":"ROUNDING_MODE_UNSPECIFIED","collation":"","defaultValueExpression":"","isMeasure":false,"rangeElementType":{"type":""},"foreignTypeDefinition":"","dataGovernanceTags":[],"identityColumnInfo":{"generatedMode":"GENERATED_MODE_UNSPECIFIED","start":"","increment":""}}]},"rows":[{"f":[{"v":"1"},{"v":"3"},{"v":"20250116"},{"v":"b1"},{"v":"1737011737327000"}]},{"f":[{"v":"2"},{"v":"5"},{"v":"20250115"},{"v":"b1"},{"v":"1737011737322000"}]}],"jobReference":{"projectId":"local-ability-backup","jobID":"PYRwoNSDhUNuqUtVbxtkjbSvmx7","location":"EU"},"totalRows":"1","totalBytesProcessed":"0","jobComplete":true,"cacheHit":false}`

getExtractJobResultOkResponse = `HTTP/2.0 200 OK
Content-Length: 1191
Expand Down
52 changes: 35 additions & 17 deletions pkg/service/bigquery/bigquery_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bigquery

import (
"context"
"crypto/sha1"
"errors"
"fmt"
"net/http"
Expand All @@ -24,12 +25,30 @@ type Table struct {
LastModifiedTime time.Time
}

func newTableEntry(name string, tableMetadata *bq.TableMetadata) *Table {
func newTableEntry(name string, t tablePartition) *Table {
// Generate a short SHA from the last modified time as ETag from BigQuery API call changes
h := sha1.New()
h.Write([]byte(t.LastModifiedTime.String()))
shortSHA := fmt.Sprintf("%x", h.Sum(nil))[:8]

return &Table{
Name: name,
Checksum: shortSHA,
SizeInBytes: float64(t.TotalLogicalBytes),
LastModifiedTime: t.LastModifiedTime,
}
}
func newTableEntryFromMetadata(name string, t *bq.TableMetadata) *Table {
// Generate a short SHA from the last modified time as ETag from BigQuery API call changes
h := sha1.New()
h.Write([]byte(t.LastModifiedTime.String()))
shortSHA := fmt.Sprintf("%x", h.Sum(nil))[:8]

return &Table{
Name: name,
Checksum: tableMetadata.ETag,
SizeInBytes: float64(tableMetadata.NumBytes),
LastModifiedTime: tableMetadata.LastModifiedTime,
Checksum: shortSHA,
SizeInBytes: float64(t.NumBytes),
LastModifiedTime: t.LastModifiedTime,
}
}

Expand Down Expand Up @@ -166,7 +185,7 @@ func (d *defaultBigQueryClient) GetTable(ctxIn context.Context, project string,
if err != nil {
return &Table{}, err
}
return newTableEntry(table, tableMetadata), nil
return newTableEntryFromMetadata(table, tableMetadata), nil
}

// GetTablesInDataset list all tables in a dataset
Expand All @@ -179,10 +198,10 @@ func (d *defaultBigQueryClient) GetTablesInDataset(ctxIn context.Context, projec
tableIt := d.client.DatasetInProject(project, dataset).Tables(ctx)
for {
oTable, err := tableIt.Next()
if err == iterator.Done {
if errors.Is(err, iterator.Done) {
break
}
if err != iterator.Done && err != nil {
if err != nil {
return nil, err
}
if oTable == nil {
Expand All @@ -194,7 +213,7 @@ func (d *defaultBigQueryClient) GetTablesInDataset(ctxIn context.Context, projec
return []*Table{}, err
}
if tableMetadata.Type == bq.RegularTable {
tables = append(tables, newTableEntry(oTable.TableID, tableMetadata))
tables = append(tables, newTableEntryFromMetadata(oTable.TableID, tableMetadata))
}
}
return tables, nil
Expand All @@ -213,8 +232,11 @@ func (d *defaultBigQueryClient) HasTablePartitions(ctxIn context.Context, projec
}

type tablePartition struct {
Total int64 `bigquery:"total"`
PartitionID string `bigquery:"partition_id"`
TotalLogicalBytes int64 `bigquery:"total_logical_bytes"`
TotalRows int64 `bigquery:"total_rows"`
PartitionID string `bigquery:"partition_id"`
TableName string `bigquery:"table_name"`
LastModifiedTime time.Time `bigquery:"last_modified_time"`
}

// GetTablePartitions list all partitions in table
Expand All @@ -232,7 +254,7 @@ func (d *defaultBigQueryClient) GetTablePartitions(ctxIn context.Context, projec
}

var partitions []*Table
q := fmt.Sprintf("SELECT total_rows AS total, partition_id FROM `%s.%s.INFORMATION_SCHEMA.PARTITIONS` WHERE TABLE_NAME = '%s'",
q := fmt.Sprintf("SELECT total_rows, total_logical_bytes, partition_id, table_name, last_modified_time FROM `%s.%s.INFORMATION_SCHEMA.PARTITIONS` WHERE TABLE_NAME = '%s'",
project, dataset, table,
)

Expand All @@ -253,7 +275,7 @@ func (d *defaultBigQueryClient) GetTablePartitions(ctxIn context.Context, projec
} else if err != nil {
return nil, err
}
if s.Total == 0 {
if s.TotalRows == 0 {
continue
}

Expand All @@ -267,11 +289,7 @@ func (d *defaultBigQueryClient) GetTablePartitions(ctxIn context.Context, projec
continue
}
partitionTable := fmt.Sprintf("%s$%s", table, partition)
tableMetadata, err := d.client.DatasetInProject(project, dataset).Table(partitionTable).Metadata(ctx)
if err != nil {
return []*Table{}, err
}
partitions = append(partitions, newTableEntry(partitionTable, tableMetadata))
partitions = append(partitions, newTableEntry(partitionTable, s))
partitionMetadataCollected[partition] = true
}
return partitions, nil
Expand Down

0 comments on commit 6fdc9c0

Please sign in to comment.