Skip to content

Commit

Permalink
Merge pull request #39 from aktsk/feature/write-query-result-to-table
Browse files Browse the repository at this point in the history
Write a query result to a table that has an expiration
  • Loading branch information
otofune authored Oct 18, 2024
2 parents 8126a4f + 849520e commit 5abb005
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 12 deletions.
73 changes: 65 additions & 8 deletions lib/bq/bq.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,70 @@ package bq
import (
"context"
"fmt"
"time"

"cloud.google.com/go/bigquery"
"github.com/aktsk/bqnotify/lib/config"
"google.golang.org/api/iterator"
)

type Result struct {
Headers []string
Rows [][]string
DatasetID string
TableID string
}

// Query runs queries to BigQuery and return results
func Query(project string, query config.Query) ([]string, [][]string, error) {
func Query(project string, query config.Query) (*Result, error) {
ctx := context.Background()

client, err := bigquery.NewClient(ctx, project)
if err != nil {
return nil, nil, err
return nil, err
}

defer client.Close()

q := client.Query(query.SQL)

var datasetID string
var tableID string

// Write the query result to the table specified in config.yaml
if query.ResultTable != nil {
datasetID = query.ResultTable.DatasetID
tableID = fmt.Sprintf("%s%s", query.ResultTable.TableIDPrefix, time.Now().Format("20060102150405"))

// Create dataset if it does not exist
metadata, _ := client.Dataset(datasetID).Metadata(ctx)
if metadata == nil {
err := client.Dataset(datasetID).Create(ctx, &bigquery.DatasetMetadata{})
if err != nil {
return nil, err
}
}

// Set the destination table to write the query result
q.QueryConfig.Dst = client.Dataset(datasetID).Table(tableID)
}

job, err := q.Run(ctx)
if err != nil {
return nil, nil, err
return nil, err
}

status, err := job.Wait(ctx)
if err != nil {
return nil, nil, err
return nil, err
}
if err := status.Err(); err != nil {
return nil, nil, err
return nil, err
}

it, err := job.Read(ctx)
if err != nil {
return nil, nil, err
return nil, err
}

headers := []string{}
Expand All @@ -56,7 +85,7 @@ func Query(project string, query config.Query) ([]string, [][]string, error) {
}

if err != nil {
return nil, nil, err
return nil, err
}

values := []string{}
Expand All @@ -66,5 +95,33 @@ func Query(project string, query config.Query) ([]string, [][]string, error) {
rows = append(rows, values)
}

return headers, rows, nil
// Set expiration time to the result table
if query.ResultTable != nil {
tableRef := client.Dataset(datasetID).Table(tableID)

meta, err := tableRef.Metadata(ctx)
if err != nil {
return nil, err
}

if query.ResultTable.ExpirationInDays == 0 {
query.ResultTable.ExpirationInDays = 30
}

update := bigquery.TableMetadataToUpdate{
ExpirationTime: time.Now().Add(time.Duration(query.ResultTable.ExpirationInDays*24) * time.Hour),
}

_, err = tableRef.Update(ctx, update, meta.ETag)
if err != nil {
return nil, err
}
}

return &Result{
Headers: headers,
Rows: rows,
DatasetID: datasetID,
TableID: tableID,
}, nil
}
11 changes: 9 additions & 2 deletions lib/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,15 @@ type Config struct {
}

type Query struct {
SQL string
Slack *notify.Slack
SQL string
Slack *notify.Slack
ResultTable *ResultTable `yaml:"result_table"`
}

type ResultTable struct {
DatasetID string `yaml:"dataset_id"`
TableIDPrefix string `yaml:"table_id_prefix"`
ExpirationInDays int `yaml:"expiration_in_days"`
}

// Parse parses config.yaml
Expand Down
21 changes: 19 additions & 2 deletions lib/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package runner
import (
"bytes"
"encoding/csv"
"fmt"
"os"

"github.com/aktsk/bqnotify/lib/bq"
Expand Down Expand Up @@ -39,15 +40,18 @@ func Run(file string) error {
}

func run(project string, query config.Query) error {
headers, rows, err := bq.Query(project, query)
result, err := bq.Query(project, query)
if err != nil {
return err
}

rows := result.Rows
if len(rows) == 0 {
return nil
}

headers := result.Headers

// Human-readable table
query.Slack.URL = os.Getenv("SLACK_WEBHOOK_URL")
if query.Slack.URL != "" {
Expand Down Expand Up @@ -90,7 +94,20 @@ func run(project string, query config.Query) error {
return nil
}

return query.Slack.Upload(&csvBuffer)
err = query.Slack.Upload(&csvBuffer)
if err != nil {
return err
}
}

// Notify the Dataset ID and Table ID of the result table
if query.Slack.URL != "" && query.ResultTable != nil {
url := fmt.Sprintf("https://console.cloud.google.com/bigquery?p=%s&d=%s&t=%s&page=table", project, result.DatasetID, result.TableID)
message := fmt.Sprintf("This query result has been written to the following table:\n\nProject ID: %s\nDataset ID: %s\nTable ID: %s\n\n%s", project, result.DatasetID, result.TableID, url)
err = query.Slack.Notify(message)
if err != nil {
return err
}
}

return nil
Expand Down

0 comments on commit 5abb005

Please sign in to comment.