-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
114 lines (98 loc) · 2.42 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package main
import (
"context"
"log/slog"
"os"
"os/exec"
"strconv"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"zgo.at/slog_align"
"zgo.at/zli"
)
type (
xrefArgs struct {
CollectionID int32 `json:"collection_id" river:"unique"`
}
xrefWorker struct {
river.WorkerDefaults[xrefArgs]
Dev, Debug bool
}
)
func (xrefArgs) Kind() string { return "xref-compute" }
func (w *xrefWorker) Work(ctx context.Context, job *river.Job[xrefArgs]) error {
args := []string{"./xref.py"}
if w.Dev {
args = append(args, "-dev")
}
if w.Debug {
args = append(args, "-debug")
}
cmd := exec.Command("python3", append(args, strconv.Itoa(int(job.Args.CollectionID)))...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
if err != nil {
return err
}
return nil
}
const usage = `
Run xref.py for every job in the river "xref" queue.
Flags:
-dev Format logs as plain text, rather than JSON.
-debug Enable debug logs.
`
func main() {
f := zli.NewFlags(os.Args)
var (
dev = f.Bool(false, "dev")
debug = f.Bool(false, "debug")
)
err := f.Parse()
zli.F(err)
lvl := slog.LevelInfo
if debug.Bool() {
lvl = slog.LevelDebug
}
var lh slog.Handler = slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: lvl})
if dev.Bool() {
lh = slog_align.NewAlignedHandler(os.Stdout, &slog.HandlerOptions{Level: lvl})
}
log := slog.New(lh)
out, err := exec.Command("python3", "./xref.py", "0").CombinedOutput()
if err != nil {
log.Error("xref.py failed", "err", err, "out", string(out))
os.Exit(1)
}
workers := river.NewWorkers()
river.AddWorker(workers, &xrefWorker{Dev: dev.Bool(), Debug: debug.Bool()})
conn, ok := os.LookupEnv("ALFRED_DB_FTM")
if !ok {
conn = "postgres://aleph:[email protected]/aleph_ftm"
}
dbpool, err := pgxpool.New(context.Background(), conn)
if err != nil {
log.Error("connecting to PostgreSQL", "err", err, "conn", conn)
os.Exit(1)
}
rc, err := river.NewClient(riverpgxv5.New(dbpool), &river.Config{
Workers: workers,
MaxAttempts: 2,
JobTimeout: 8 * time.Hour,
Queues: map[string]river.QueueConfig{"xref": {MaxWorkers: 1}},
Logger: log,
})
if err != nil {
log.Error("starting river client", "err", err)
os.Exit(1)
}
err = rc.Start(context.Background())
if err != nil {
log.Error("starting river", "err", err)
os.Exit(1)
}
<-make(chan struct{})
}