From 849520ee3f7c6615f04231612c23d5b0bdfa9b36 Mon Sep 17 00:00:00 2001 From: Gosuke Miyashita Date: Thu, 17 Oct 2024 17:38:59 +0900 Subject: [PATCH] Write a query result to a table that has an expiration --- lib/bq/bq.go | 73 +++++++++++++++++++++++++++++++++++++++----- lib/config/config.go | 11 +++++-- lib/runner/runner.go | 21 +++++++++++-- 3 files changed, 93 insertions(+), 12 deletions(-) diff --git a/lib/bq/bq.go b/lib/bq/bq.go index f24e7b6..a2efa57 100644 --- a/lib/bq/bq.go +++ b/lib/bq/bq.go @@ -3,41 +3,70 @@ package bq import ( "context" "fmt" + "time" "cloud.google.com/go/bigquery" "github.com/aktsk/bqnotify/lib/config" "google.golang.org/api/iterator" ) +type Result struct { + Headers []string + Rows [][]string + DatasetID string + TableID string +} + // Query runs queries to BigQuery and return results -func Query(project string, query config.Query) ([]string, [][]string, error) { +func Query(project string, query config.Query) (*Result, error) { ctx := context.Background() client, err := bigquery.NewClient(ctx, project) if err != nil { - return nil, nil, err + return nil, err } defer client.Close() q := client.Query(query.SQL) + var datasetID string + var tableID string + + // Write the query result to the table specified in config.yaml + if query.ResultTable != nil { + datasetID = query.ResultTable.DatasetID + tableID = fmt.Sprintf("%s%s", query.ResultTable.TableIDPrefix, time.Now().Format("20060102150405")) + + // Create dataset if it does not exist + metadata, _ := client.Dataset(datasetID).Metadata(ctx) + if metadata == nil { + err := client.Dataset(datasetID).Create(ctx, &bigquery.DatasetMetadata{}) + if err != nil { + return nil, err + } + } + + // Set the destination table to write the query result + q.QueryConfig.Dst = client.Dataset(datasetID).Table(tableID) + } + job, err := q.Run(ctx) if err != nil { - return nil, nil, err + return nil, err } status, err := job.Wait(ctx) if err != nil { - return nil, nil, err + return nil, err } if err := status.Err(); err != nil { - return nil, nil, err + return nil, err } it, err := job.Read(ctx) if err != nil { - return nil, nil, err + return nil, err } headers := []string{} @@ -56,7 +85,7 @@ func Query(project string, query config.Query) ([]string, [][]string, error) { } if err != nil { - return nil, nil, err + return nil, err } values := []string{} @@ -66,5 +95,33 @@ func Query(project string, query config.Query) ([]string, [][]string, error) { rows = append(rows, values) } - return headers, rows, nil + // Set expiration time to the result table + if query.ResultTable != nil { + tableRef := client.Dataset(datasetID).Table(tableID) + + meta, err := tableRef.Metadata(ctx) + if err != nil { + return nil, err + } + + if query.ResultTable.ExpirationInDays == 0 { + query.ResultTable.ExpirationInDays = 30 + } + + update := bigquery.TableMetadataToUpdate{ + ExpirationTime: time.Now().Add(time.Duration(query.ResultTable.ExpirationInDays*24) * time.Hour), + } + + _, err = tableRef.Update(ctx, update, meta.ETag) + if err != nil { + return nil, err + } + } + + return &Result{ + Headers: headers, + Rows: rows, + DatasetID: datasetID, + TableID: tableID, + }, nil } diff --git a/lib/config/config.go b/lib/config/config.go index d9b0b4e..d7b53b7 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -17,8 +17,15 @@ type Config struct { } type Query struct { - SQL string - Slack *notify.Slack + SQL string + Slack *notify.Slack + ResultTable *ResultTable `yaml:"result_table"` +} + +type ResultTable struct { + DatasetID string `yaml:"dataset_id"` + TableIDPrefix string `yaml:"table_id_prefix"` + ExpirationInDays int `yaml:"expiration_in_days"` } // Parse parses config.yaml diff --git a/lib/runner/runner.go b/lib/runner/runner.go index 6d8eaf5..65e96f1 100644 --- a/lib/runner/runner.go +++ b/lib/runner/runner.go @@ -3,6 +3,7 @@ package runner import ( "bytes" "encoding/csv" + "fmt" "os" "github.com/aktsk/bqnotify/lib/bq" @@ -39,15 +40,18 @@ func Run(file string) error { } func run(project string, query config.Query) error { - headers, rows, err := bq.Query(project, query) + result, err := bq.Query(project, query) if err != nil { return err } + rows := result.Rows if len(rows) == 0 { return nil } + headers := result.Headers + // Human-readable table query.Slack.URL = os.Getenv("SLACK_WEBHOOK_URL") if query.Slack.URL != "" { @@ -90,7 +94,20 @@ func run(project string, query config.Query) error { return nil } - return query.Slack.Upload(&csvBuffer) + err = query.Slack.Upload(&csvBuffer) + if err != nil { + return err + } + } + + // Notify the Dataset ID and Table ID of the result table + if query.Slack.URL != "" && query.ResultTable != nil { + url := fmt.Sprintf("https://console.cloud.google.com/bigquery?p=%s&d=%s&t=%s&page=table", project, result.DatasetID, result.TableID) + message := fmt.Sprintf("This query result has been written to the following table:\n\nProject ID: %s\nDataset ID: %s\nTable ID: %s\n\n%s", project, result.DatasetID, result.TableID, url) + err = query.Slack.Notify(message) + if err != nil { + return err + } } return nil