Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DB subscription #114

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,5 @@ We use [sqlc](https://docs.sqlc.dev/en/latest/index.html) to generate the code f
```sh
dev/generate
```

If needed, there is a sqlc [playground](https://play.sqlc.dev/p/f6eebe941750560934cefa943c77f63497debc828c487e8d1771fb6d83773246) for experimenting with how the query syntax translates into Go code.
2 changes: 1 addition & 1 deletion dev/test
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ set -e

ulimit -n 2048

go test ./... "$@"
go test -timeout 3s `go list ./... | grep -v -e 'pkg/abis' -e 'pkg/config' -e 'pkg/proto' -e 'pkg/mock' -e 'pkg/testing'` "$@"

if [ -n "${RACE:-}" ]; then
echo
Expand Down
17 changes: 16 additions & 1 deletion pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,22 @@ WHERE

-- name: InsertGatewayEnvelope :execrows
SELECT
insert_gateway_envelope(@originator_id, @sequence_id, @topic, @originator_envelope);
insert_gateway_envelope(@originator_id, @originator_sequence_id, @topic, @originator_envelope);

-- name: SelectGatewayEnvelopes :many
SELECT
*
FROM
gateway_envelopes
WHERE (sqlc.narg('topic')::BYTEA IS NULL
OR topic = @topic)
AND (sqlc.narg('originator_node_id')::INT IS NULL
OR originator_node_id = @originator_node_id)
AND (sqlc.narg('originator_sequence_id')::BIGINT IS NULL
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated to the PR, but who generates the sequence ID?

Copy link
Contributor Author

@richardhuaaa richardhuaaa Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great question - the originator does, this would be extracted from the originator_sid field of UnsignedOriginatorEnvelope proto.

As for how the originator allocates these ID's in the first place, it is created by the BIGSERIAL id field of the staged_originator_envelope table - so we're using a postgres sequence for this.

OR originator_sequence_id > @originator_sequence_id)
AND (sqlc.narg('gateway_sequence_id')::BIGINT IS NULL
OR id > @gateway_sequence_id)
LIMIT sqlc.narg('row_limit')::INT;

-- name: InsertStagedOriginatorEnvelope :one
SELECT
Expand Down
10 changes: 5 additions & 5 deletions pkg/db/queries/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 65 additions & 5 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 101 additions & 0 deletions pkg/db/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package db

import (
"context"
"fmt"
"time"

"go.uber.org/zap"
)

type PollableDBQuery[ValueType any] func(ctx context.Context, lastSeenID int64, numRows int32) (results []ValueType, lastID int64, err error)

// Poll whenever notified, or at an interval if not notified
type PollingOptions struct {
Interval time.Duration
Notifier <-chan bool
NumRows int32
}

type DBSubscription[ValueType any] struct {
ctx context.Context
log *zap.Logger
lastSeenID int64
options PollingOptions
query PollableDBQuery[ValueType]
updates chan<- []ValueType
}

func NewDBSubscription[ValueType any](
ctx context.Context,
log *zap.Logger,
query PollableDBQuery[ValueType],
lastSeenID int64,
options PollingOptions,
) *DBSubscription[ValueType] {
return &DBSubscription[ValueType]{
ctx: ctx,
log: log,
lastSeenID: lastSeenID,
options: options,
query: query,
updates: nil,
}
}

func (s *DBSubscription[ValueType]) Start() (<-chan []ValueType, error) {
if s.updates != nil {
return nil, fmt.Errorf("Already started")
}
if s.options.NumRows <= 0 || s.log == nil {
return nil, fmt.Errorf("Required params not provided")
}
updates := make(chan []ValueType)
s.updates = updates

go func() {
s.poll()

timer := time.NewTimer(s.options.Interval)
for {
timer.Reset(s.options.Interval)
select {
case <-s.ctx.Done():
s.log.Info("Context done; stopping subscription")
close(s.updates)
return
case <-s.options.Notifier:
neekolas marked this conversation as resolved.
Show resolved Hide resolved
s.poll()
case <-timer.C:
s.poll()
neekolas marked this conversation as resolved.
Show resolved Hide resolved
}
}
}()

return updates, nil
}

func (s *DBSubscription[ValueType]) poll() {
// Repeatedly query page by page until no more results
for {
results, lastID, err := s.query(s.ctx, s.lastSeenID, s.options.NumRows)
if err != nil {
s.log.Error(
"Error querying for DB subscription",
zap.Error(err),
zap.Int64("lastSeenID", s.lastSeenID),
zap.Int32("numRows", s.options.NumRows),
)
// Did not update lastSeenID; will retry on next poll
break
}
if len(results) == 0 {
break
}
s.lastSeenID = lastID
s.updates <- results
if int32(len(results)) < s.options.NumRows {
break
}
}
}
Loading
Loading