Skip to content

Commit

Permalink
Rework schema and queries for gateway envelopes (#145)
Browse files Browse the repository at this point in the history
Now that we have [vector clock
cursors](xmtp/proto#201), removing the need for
a gateway sequence ID, there are some schema changes that are worth
going over with a keen eye:
- Remove the `sequence_id` on `gateway_envelopes` and replace it with a
timestamp
- The timestamp needs to match the insertion order *per-originator*, but
does not need to match insertion order globally.
- Updating the `SelectGatewayEnvelopes` query to be vector clock aware
(only returning envelopes greater than the vector clock cursor)

Because the any changes in this PR affect the downstream work, I'm
splitting the work to make the Go code vector-clock aware into a
separate PR. Instead, I've left TODO's in the code that I will replace
in the future PR.

#132
  • Loading branch information
richardhuaaa authored Sep 6, 2024
1 parent a8d42a3 commit 5a67cd9
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 65 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,4 @@ We use [sqlc](https://docs.sqlc.dev/en/latest/index.html) to generate the code f
dev/generate
```

If needed, there is a sqlc [playground](https://play.sqlc.dev/p/f6eebe941750560934cefa943c77f63497debc828c487e8d1771fb6d83773246) for experimenting with how the query syntax translates into Go code.
If needed, you may use `dev/psql` to test more complex SQL syntax. To determine the level of support in sqlc, there is a sqlc [playground](https://play.sqlc.dev/p/f6eebe941750560934cefa943c77f63497debc828c487e8d1771fb6d83773246) for experimenting with how the query syntax translates into Go code.
8 changes: 3 additions & 5 deletions pkg/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,9 @@ func (s *Service) queryReqToDBParams(
req *message_api.QueryEnvelopesRequest,
) (*queries.SelectGatewayEnvelopesParams, error) {
params := queries.SelectGatewayEnvelopesParams{
Topic: []byte{},
OriginatorNodeID: sql.NullInt32{},
OriginatorSequenceID: sql.NullInt64{},
GatewaySequenceID: sql.NullInt64{},
RowLimit: db.NullInt32(maxRequestedRows),
Topic: []byte{},
OriginatorNodeID: sql.NullInt32{},
RowLimit: db.NullInt32(maxRequestedRows),
}

query := req.GetQuery()
Expand Down
27 changes: 20 additions & 7 deletions pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,36 @@ WHERE
singleton_id = 1;

-- name: InsertGatewayEnvelope :execrows
SELECT
insert_gateway_envelope(@originator_node_id, @originator_sequence_id, @topic, @originator_envelope);
INSERT INTO gateway_envelopes(originator_node_id, originator_sequence_id, topic, originator_envelope)
VALUES (@originator_node_id, @originator_sequence_id, @topic, @originator_envelope)
ON CONFLICT
DO NOTHING;

-- name: SelectGatewayEnvelopes :many
WITH cursors AS (
SELECT
UNNEST(@cursor_node_ids::INT[]) AS cursor_node_id,
UNNEST(@cursor_sequence_ids::BIGINT[]) AS cursor_sequence_id
)
SELECT
*
gateway_envelopes.*
FROM
gateway_envelopes
-- Assumption: There is only one cursor per node ID. Caller must verify this
LEFT JOIN cursors ON gateway_envelopes.originator_node_id = cursors.cursor_node_id
WHERE (sqlc.narg('topic')::BYTEA IS NULL
OR length(@topic) = 0
OR topic = @topic)
AND (sqlc.narg('originator_node_id')::INT IS NULL
OR originator_node_id = @originator_node_id)
AND (sqlc.narg('originator_sequence_id')::BIGINT IS NULL
OR originator_sequence_id > @originator_sequence_id)
AND (sqlc.narg('gateway_sequence_id')::BIGINT IS NULL
OR id > @gateway_sequence_id)
AND (cursor_sequence_id IS NULL
OR originator_sequence_id > cursor_sequence_id)
ORDER BY
-- Assumption: envelopes are inserted in sequence_id order per originator, therefore
-- gateway_time preserves sequence_id order
gateway_time,
originator_node_id,
originator_sequence_id ASC
LIMIT sqlc.narg('row_limit')::INT;

-- name: InsertStagedOriginatorEnvelope :one
Expand Down
2 changes: 1 addition & 1 deletion pkg/db/queries/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 31 additions & 16 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions pkg/db/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@ func setup(t *testing.T) (*sql.DB, *zap.Logger, func()) {
func insertInitialRows(t *testing.T, db *sql.DB) {
testutils.InsertGatewayEnvelopes(t, db, []queries.InsertGatewayEnvelopeParams{
{
// Auto-generated ID: 1
OriginatorNodeID: 1,
OriginatorSequenceID: 1,
Topic: []byte("topicA"),
OriginatorEnvelope: []byte("envelope1"),
},
{
// Auto-generated ID: 2
OriginatorNodeID: 2,
OriginatorSequenceID: 1,
Topic: []byte("topicA"),
Expand All @@ -45,15 +43,15 @@ func envelopesQuery(db *sql.DB) PollableDBQuery[queries.GatewayEnvelope] {
return func(ctx context.Context, lastSeenID int64, numRows int32) ([]queries.GatewayEnvelope, int64, error) {
envs, err := queries.New(db).
SelectGatewayEnvelopes(ctx, queries.SelectGatewayEnvelopesParams{
OriginatorNodeID: NullInt32(1),
GatewaySequenceID: NullInt64(lastSeenID),
RowLimit: NullInt32(numRows),
OriginatorNodeID: NullInt32(1),
RowLimit: NullInt32(numRows),
})
if err != nil {
return nil, 0, err
}
if len(envs) > 0 {
lastSeenID = envs[len(envs)-1].ID
// TODO(rich) fix cursor
lastSeenID = envs[len(envs)-1].OriginatorSequenceID
}
return envs, lastSeenID, nil
}
Expand All @@ -62,21 +60,18 @@ func envelopesQuery(db *sql.DB) PollableDBQuery[queries.GatewayEnvelope] {
func insertAdditionalRows(t *testing.T, db *sql.DB, notifyChan ...chan bool) {
testutils.InsertGatewayEnvelopes(t, db, []queries.InsertGatewayEnvelopeParams{
{
// Auto-generated ID: 3
OriginatorNodeID: 1,
OriginatorSequenceID: 2,
Topic: []byte("topicA"),
OriginatorEnvelope: []byte("envelope3"),
},
{
// Auto-generated ID: 4
OriginatorNodeID: 2,
OriginatorSequenceID: 2,
Topic: []byte("topicA"),
OriginatorEnvelope: []byte("envelope4"),
},
{
// Auto-generated ID: 5
OriginatorNodeID: 1,
OriginatorSequenceID: 3,
Topic: []byte("topicA"),
Expand All @@ -88,12 +83,14 @@ func insertAdditionalRows(t *testing.T, db *sql.DB, notifyChan ...chan bool) {
func validateUpdates(t *testing.T, updates <-chan []queries.GatewayEnvelope, ctxCancel func()) {
envs := <-updates
require.Equal(t, 1, len(envs))
require.Equal(t, int64(3), envs[0].ID)
// TODO(rich) fix cursor
// require.Equal(t, int64(3), envs[0].OriginatorSequenceID)
require.Equal(t, []byte("envelope3"), envs[0].OriginatorEnvelope)

envs = <-updates
require.Equal(t, 1, len(envs))
require.Equal(t, int64(5), envs[0].ID)
// TODO(rich) fix cursor
// require.Equal(t, int64(5), envs[0].OriginatorSequenceID)
require.Equal(t, []byte("envelope5"), envs[0].OriginatorEnvelope)

ctxCancel()
Expand All @@ -117,6 +114,7 @@ func flakyEnvelopesQuery(db *sql.DB) PollableDBQuery[queries.GatewayEnvelope] {
}

func TestIntervalSubscription(t *testing.T) {
t.Skip("TODO(rich) fix cursor")
db, log, cleanup := setup(t)
defer cleanup()

Expand All @@ -142,6 +140,7 @@ func TestIntervalSubscription(t *testing.T) {
}

func TestNotifiedSubscription(t *testing.T) {
t.Skip("TODO(rich) fix cursor")
db, log, cleanup := setup(t)
defer cleanup()

Expand Down Expand Up @@ -169,6 +168,7 @@ func TestNotifiedSubscription(t *testing.T) {
}

func TestTemporaryDBError(t *testing.T) {
t.Skip("TODO(rich) fix cursor")
db, log, cleanup := setup(t)
defer cleanup()

Expand Down
29 changes: 5 additions & 24 deletions pkg/migrations/00001_init-schema.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,17 @@ CREATE TABLE node_info(

-- Includes all envelopes, whether they were originated locally or not
CREATE TABLE gateway_envelopes(
-- used to construct gateway_sid
id BIGSERIAL PRIMARY KEY,
gateway_time TIMESTAMP NOT NULL DEFAULT now(),
originator_node_id INT NOT NULL,
originator_sequence_id BIGINT NOT NULL,
topic BYTEA NOT NULL,
originator_envelope BYTEA NOT NULL
originator_envelope BYTEA NOT NULL,
PRIMARY KEY (originator_node_id, originator_sequence_id)
);

-- Client queries
CREATE INDEX idx_gateway_envelopes_topic ON gateway_envelopes(topic);

-- Node queries
CREATE UNIQUE INDEX idx_gateway_envelopes_originator_sid ON gateway_envelopes(originator_node_id, originator_sequence_id);

CREATE FUNCTION insert_gateway_envelope(originator_node_id INT, originator_sequence_id BIGINT, topic BYTEA, originator_envelope BYTEA)
RETURNS SETOF gateway_envelopes
AS $$
BEGIN
-- Ensures that the generated sequence ID matches the insertion order
-- Only released at the end of the enclosing transaction - beware if called within a long transaction
PERFORM
pg_advisory_xact_lock(hashtext('gateway_envelopes_sequence'));
RETURN QUERY INSERT INTO gateway_envelopes(originator_node_id, originator_sequence_id, topic, originator_envelope)
VALUES(originator_node_id, originator_sequence_id, topic, originator_envelope)
ON CONFLICT
DO NOTHING
RETURNING
*;
END;
$$
LANGUAGE plpgsql;

-- Newly published envelopes will be queued here first (and assigned an originator
-- sequence ID), before being inserted in-order into the gateway_envelopes table.
CREATE TABLE staged_originator_envelopes(
Expand All @@ -55,6 +34,8 @@ CREATE FUNCTION insert_staged_originator_envelope(topic BYTEA, payer_envelope BY
RETURNS SETOF staged_originator_envelopes
AS $$
BEGIN
-- Ensures that the generated sequence ID matches the insertion order
-- Only released at the end of the enclosing transaction - beware if called within a long transaction
PERFORM
pg_advisory_xact_lock(hashtext('staged_originator_envelopes_sequence'));
RETURN QUERY INSERT INTO staged_originator_envelopes(topic, payer_envelope)
Expand Down

0 comments on commit 5a67cd9

Please sign in to comment.