Skip to content

Commit

Permalink
Merge tag 'v10.0.0' into update/v10.0.0
Browse files Browse the repository at this point in the history
 * Update README.md
  • Loading branch information
Robi9 committed Jan 21, 2025
2 parents 348acc0 + 10c87e2 commit 1685d62
Show file tree
Hide file tree
Showing 16 changed files with 279 additions and 164 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: CI
on: [push, pull_request]
env:
go-version: "1.22.x"
go-version: "1.23.x"
jobs:
test:
name: Test
Expand Down
28 changes: 28 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,31 @@
v10.0.0 (2025-01-07)
-------------------------
* Update README.md

v9.3.4 (2024-12-16)
-------------------------
* Update to latest gocommon and tweak default cloudwatch namespace

v9.3.3 (2024-12-13)
-------------------------
* Clean up old analytics use

v9.3.2 (2024-12-13)
-------------------------
* Update to latest gocommon

v9.3.1 (2024-12-13)
-------------------------
* Remove librato, use latest gocommon

v9.3.0 (2024-12-13)
-------------------------
* Send metrics to cloudwatch

v9.2.1 (2024-10-08)
-------------------------
* Don't include status groups in contact indexing

v9.2.0 (2024-07-17)
-------------------------
* Test against postgresql 15
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.22
FROM golang:1.23

WORKDIR /usr/src/app

Expand Down
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,19 @@ We recommend running it with no changes to the configuration and no parameters,
environment variables to configure it. You can use `% rp-indexer --help` to see a list of the
environment variables and parameters and for more details on each option.

### RapidPro

For use with RapidPro, you will want to configure these settings:

* `INDEXER_DB`: a URL connection string for your RapidPro database or read replica
* `INDEXER_ELASTIC_URL`: the URL for your ElasticSearch endpoint

### AWS services:

* `INDEXER_AWS_ACCESS_KEY_ID`: AWS access key id used to authenticate to AWS
* `INDEXER_AWS_SECRET_ACCESS_KEY` AWS secret access key used to authenticate to AWS
* `INDEXER_AWS_REGION`: AWS region (ex: `eu-west-1`)

Recommended settings for error reporting:
### Logging and error reporting:

* `INDEXER_SENTRY_DSN`: DSN to use when logging errors to Sentry
* `INDEXER_LOG_LEVEL`: logging level to use (default is `info`)

## Development

Expand Down
36 changes: 23 additions & 13 deletions cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"github.com/getsentry/sentry-go"
_ "github.com/lib/pq"
"github.com/nyaruka/ezconf"
"github.com/nyaruka/gocommon/aws/cwatch"
indexer "github.com/nyaruka/rp-indexer/v9"
"github.com/nyaruka/rp-indexer/v9/indexers"
"github.com/nyaruka/rp-indexer/v9/runtime"
slogmulti "github.com/samber/slog-multi"
slogsentry "github.com/samber/slog-sentry"
)
Expand All @@ -25,7 +27,7 @@ var (
)

func main() {
cfg := indexer.NewDefaultConfig()
cfg := runtime.NewDefaultConfig()
loader := ezconf.NewLoader(cfg, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"})
loader.MustLoad()

Expand All @@ -36,15 +38,14 @@ func main() {
os.Exit(1)
}

rt := &runtime.Runtime{Config: cfg}

// configure our logger
logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: level})
slog.SetDefault(slog.New(logHandler))

logger := slog.With("comp", "main")
logger.Info("starting indexer", "version", version, "released", date)

// if we have a DSN entry, try to initialize it
if cfg.SentryDSN != "" {
if rt.Config.SentryDSN != "" {
err := sentry.Init(sentry.ClientOptions{
Dsn: cfg.SentryDSN,
EnableTracing: false,
Expand All @@ -55,7 +56,8 @@ func main() {
}

defer sentry.Flush(2 * time.Second)
logger = slog.New(

logger := slog.New(
slogmulti.Fanout(
logHandler,
slogsentry.Option{Level: slog.LevelError}.NewSentryHandler(),
Expand All @@ -65,24 +67,32 @@ func main() {
slog.SetDefault(logger)
}

db, err := sql.Open("postgres", cfg.DB)
log := slog.With("comp", "main")
log.Info("starting indexer", "version", version, "released", date)

rt.DB, err = sql.Open("postgres", cfg.DB)
if err != nil {
log.Error("unable to connect to database", "error", err)
}

rt.CW, err = cwatch.NewService(rt.Config.AWSAccessKeyID, rt.Config.AWSSecretAccessKey, rt.Config.AWSRegion, rt.Config.CloudwatchNamespace, rt.Config.DeploymentID)
if err != nil {
logger.Error("unable to connect to database")
log.Error("unable to create cloudwatch service", "error", err)
}

idxrs := []indexers.Indexer{
indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, cfg.ContactsShards, cfg.ContactsReplicas, 500),
indexers.NewContactIndexer(rt.Config.ElasticURL, rt.Config.ContactsIndex, rt.Config.ContactsShards, rt.Config.ContactsReplicas, 500),
}

if cfg.Rebuild {
if rt.Config.Rebuild {
// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
idxr := idxrs[0]
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
logger.Error("error during rebuilding", "error", err, "indexer", idxr.Name())
if _, err := idxr.Index(rt, true, rt.Config.Cleanup); err != nil {
log.Error("error during rebuilding", "error", err, "indexer", idxr.Name())
}
} else {
d := indexer.NewDaemon(cfg, db, idxrs, time.Duration(cfg.Poll)*time.Second)
d := indexer.NewDaemon(rt, idxrs)
d.Start()

handleSignals(d)
Expand Down
38 changes: 0 additions & 38 deletions config.go

This file was deleted.

48 changes: 21 additions & 27 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ package indexer

import (
"context"
"database/sql"
"fmt"
"log/slog"
"sync"
"time"

"github.com/nyaruka/gocommon/analytics"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/rp-indexer/v9/indexers"
"github.com/nyaruka/rp-indexer/v9/runtime"
)

type Daemon struct {
cfg *Config
db *sql.DB
rt *runtime.Runtime
wg *sync.WaitGroup
quit chan bool
indexers []indexers.Indexer
Expand All @@ -24,27 +24,19 @@ type Daemon struct {
}

// NewDaemon creates a new daemon to run the given indexers
func NewDaemon(cfg *Config, db *sql.DB, ixs []indexers.Indexer, poll time.Duration) *Daemon {
func NewDaemon(rt *runtime.Runtime, ixs []indexers.Indexer) *Daemon {
return &Daemon{
cfg: cfg,
db: db,
rt: rt,
wg: &sync.WaitGroup{},
quit: make(chan bool),
indexers: ixs,
poll: poll,
poll: time.Duration(rt.Config.Poll) * time.Second,
prevStats: make(map[indexers.Indexer]indexers.Stats, len(ixs)),
}
}

// Start starts this daemon
func (d *Daemon) Start() {
// if we have a librato token, configure it
if d.cfg.LibratoToken != "" {
analytics.RegisterBackend(analytics.NewLibrato(d.cfg.LibratoUsername, d.cfg.LibratoToken, d.cfg.InstanceName, time.Second, d.wg))
}

analytics.Start()

for _, i := range d.indexers {
d.startIndexer(i)
}
Expand All @@ -68,7 +60,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
case <-d.quit:
return
case <-time.After(d.poll):
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
_, err := indexer.Index(d.rt, d.rt.Config.Rebuild, d.rt.Config.Cleanup)
if err != nil {
log.Error("error during indexing", "error", err)
}
Expand All @@ -85,7 +77,7 @@ func (d *Daemon) startStatsReporter(interval time.Duration) {

go func() {
defer func() {
slog.Info("analytics exiting")
slog.Info("metrics reporter exiting")
d.wg.Done()
}()

Expand All @@ -107,7 +99,7 @@ func (d *Daemon) reportStats(includeLag bool) {
defer cancel()

log := slog.New(slog.Default().Handler())
metrics := make(map[string]float64, len(d.indexers)*2)
metrics := make([]types.MetricDatum, 0, len(d.indexers)*3)

for _, ix := range d.indexers {
stats := ix.Stats()
Expand All @@ -121,9 +113,13 @@ func (d *Daemon) reportStats(includeLag bool) {
rateInPeriod = float64(indexedInPeriod) / (float64(elapsedInPeriod) / float64(time.Second))
}

metrics[ix.Name()+"_indexed"] = float64(indexedInPeriod)
metrics[ix.Name()+"_deleted"] = float64(deletedInPeriod)
metrics[ix.Name()+"_rate"] = rateInPeriod
idxDim := cwatch.Dimension("Index", ix.Name())

metrics = append(metrics,
cwatch.Datum("RecordsIndexed", float64(indexedInPeriod), types.StandardUnitCount, idxDim),
cwatch.Datum("RecordsDeleted", float64(deletedInPeriod), types.StandardUnitCount, idxDim),
cwatch.Datum("IndexingRate", rateInPeriod, types.StandardUnitCountSecond, idxDim),
)

d.prevStats[ix] = stats

Expand All @@ -132,14 +128,13 @@ func (d *Daemon) reportStats(includeLag bool) {
if err != nil {
log.Error("error getting db last modified", "index", ix.Name(), "error", err)
} else {
metrics[ix.Name()+"_lag"] = lag.Seconds()
metrics = append(metrics, cwatch.Datum("IndexingLag", lag.Seconds(), types.StandardUnitSeconds, idxDim))
}
}
}

for k, v := range metrics {
analytics.Gauge("indexer."+k, v)
log = log.With(k, v)
if err := d.rt.CW.Send(ctx, metrics...); err != nil {
log.Error("error putting metrics", "error", err)
}

log.Info("stats reported")
Expand All @@ -151,7 +146,7 @@ func (d *Daemon) calculateLag(ctx context.Context, ix indexers.Indexer) (time.Du
return 0, fmt.Errorf("error getting ES last modified: %w", err)
}

dbLastModified, err := ix.GetDBLastModified(ctx, d.db)
dbLastModified, err := ix.GetDBLastModified(ctx, d.rt.DB)
if err != nil {
return 0, fmt.Errorf("error getting DB last modified: %w", err)
}
Expand All @@ -162,7 +157,6 @@ func (d *Daemon) calculateLag(ctx context.Context, ix indexers.Indexer) (time.Du
// Stop stops this daemon
func (d *Daemon) Stop() {
slog.Info("daemon stopping")
analytics.Stop()

close(d.quit)
d.wg.Wait()
Expand Down
34 changes: 24 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,37 +1,51 @@
module github.com/nyaruka/rp-indexer/v9

go 1.22
go 1.23

require (
github.com/getsentry/sentry-go v0.29.0
github.com/getsentry/sentry-go v0.28.1
github.com/lib/pq v1.10.9
github.com/nyaruka/ezconf v0.3.0
github.com/nyaruka/gocommon v1.55.8
github.com/samber/slog-multi v1.2.0
github.com/nyaruka/gocommon v1.60.4
github.com/samber/slog-multi v1.2.4
github.com/samber/slog-sentry v1.2.2
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
)

require golang.org/x/text v0.16.0 // indirect

require (
github.com/aws/aws-sdk-go-v2 v1.32.6 // indirect
github.com/aws/aws-sdk-go-v2/config v1.28.6 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.47 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.33.2 // indirect
github.com/aws/smithy-go v1.22.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/structs v1.1.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.4 // indirect
github.com/go-chi/chi/v5 v5.1.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
github.com/go-chi/chi/v5 v5.2.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/naoina/go-stringutil v0.1.0 // indirect
github.com/naoina/toml v0.1.1 // indirect
github.com/nyaruka/librato v1.1.1 // indirect
github.com/nyaruka/null/v2 v2.0.3 // indirect
github.com/nyaruka/phonenumbers v1.4.0 // indirect
github.com/nyaruka/phonenumbers v1.4.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/samber/lo v1.46.0 // indirect
github.com/samber/lo v1.47.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
golang.org/x/exp v0.0.0-20240716175740-e3f259677ff7 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 1685d62

Please sign in to comment.