Skip to content

Commit

Permalink
Read your own group writes (#415)
Browse files Browse the repository at this point in the history
This does not solve all possible cases of read-your-own-writes, but it
makes things a little bit easier.

This guarantees that when a node confirms a payer write, future reads
will return the data.

This does not guarantee:
- the same for blockchain writes
- a different node will have the data

Relates to #358
  • Loading branch information
mkysel authored Jan 15, 2025
1 parent 93e176c commit 5a22840
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 13 deletions.
17 changes: 10 additions & 7 deletions pkg/api/message/publishWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package message
import (
"context"
"database/sql"
"sync/atomic"
"time"

"github.com/xmtp/xmtpd/pkg/db"
Expand All @@ -13,13 +14,14 @@ import (
)

type publishWorker struct {
ctx context.Context
log *zap.Logger
listener <-chan []queries.StagedOriginatorEnvelope
notifier chan<- bool
registrant *registrant.Registrant
store *sql.DB
subscription db.DBSubscription[queries.StagedOriginatorEnvelope, int64]
ctx context.Context
log *zap.Logger
listener <-chan []queries.StagedOriginatorEnvelope
notifier chan<- bool
registrant *registrant.Registrant
store *sql.DB
subscription db.DBSubscription[queries.StagedOriginatorEnvelope, int64]
lastProcessed atomic.Int64
}

func startPublishWorker(
Expand Down Expand Up @@ -92,6 +94,7 @@ func (p *publishWorker) start() {
// continue to the next envelope until this one is processed
time.Sleep(time.Second)
}
p.lastProcessed.Store(stagedEnv.ID)
}
}
}
Expand Down
34 changes: 34 additions & 0 deletions pkg/api/message/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,8 @@ func (s *Service) PublishPayerEnvelopes(
return nil, status.Errorf(codes.Internal, "could not sign envelope: %v", err)
}

s.waitForGatewayPublish(ctx, stagedEnv)

return &message_api.PublishPayerEnvelopesResponse{
OriginatorEnvelopes: []*envelopesProto.OriginatorEnvelope{originatorEnv},
}, nil
Expand Down Expand Up @@ -452,3 +454,35 @@ func (s *Service) validateClientInfo(clientEnv *envelopes.ClientEnvelope) error

return nil
}

func (s *Service) waitForGatewayPublish(
ctx context.Context,
stagedEnv queries.StagedOriginatorEnvelope,
) {
startTime := time.Now()
timeout := time.After(30 * time.Second)

select {
case <-timeout:
s.log.Warn("Timeout waiting for publisher",
zap.Int64("envelope_id", stagedEnv.ID),
zap.Int64("last_processed", s.publishWorker.lastProcessed.Load()))
return
case <-ctx.Done():
s.log.Warn("Context cancelled while waiting for publisher",
zap.Int64("envelope_id", stagedEnv.ID),
zap.Int64("last_processed", s.publishWorker.lastProcessed.Load()))
return
default:
// Check if the last processed ID has reached or exceeded the current ID
if s.publishWorker.lastProcessed.Load() >= stagedEnv.ID {
s.log.Debug(
"Finished waiting for publisher",
zap.Int64("envelope_id", stagedEnv.ID),
zap.Int64("wait_time", time.Since(startTime).Milliseconds()),
)
return
}
time.Sleep(10 * time.Millisecond)
}
}
95 changes: 89 additions & 6 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"database/sql"
"encoding/hex"
"fmt"
"net"
"testing"
"time"

Expand All @@ -28,8 +29,19 @@ import (

const server1NodeID = uint32(100)
const server2NodeID = uint32(200)
const server1Port = 1111
const server2Port = 2222

func getNextOpenPort() (int, error) {
// Listen on a random available port
listener, err := net.Listen("tcp", "localhost:0")
if err != nil {
return 0, fmt.Errorf("could not find open port: %w", err)
}
defer listener.Close()

// Extract the port number from the listener
addr := listener.Addr().(*net.TCPAddr)
return addr.Port, nil
}

func NewTestServer(
t *testing.T,
Expand Down Expand Up @@ -61,10 +73,10 @@ func NewTestServer(
Replication: config.ReplicationOptions{
Enable: true,
},
//TODO(mkysel): this is not fully mocked yet
//Payer: config.PayerOptions{
// Enable: true,
//},
Payer: config.PayerOptions{
Enable: true,
PrivateKey: hex.EncodeToString(crypto.FromECDSA(privateKey)),
},
}, registry, db, messagePublisher, fmt.Sprintf("localhost:%d", port), nil)
require.NoError(t, err)

Expand All @@ -81,6 +93,11 @@ func TestCreateServer(t *testing.T) {
privateKey2, err := crypto.GenerateKey()
require.NoError(t, err)

server1Port, err := getNextOpenPort()
require.NoError(t, err)
server2Port, err := getNextOpenPort()
require.NoError(t, err)

nodes := []r.Node{
{
NodeID: server1NodeID,
Expand Down Expand Up @@ -194,3 +211,69 @@ func TestCreateServer(t *testing.T) {
return true
}, 3000*time.Millisecond, 200*time.Millisecond)
}

func TestReadOwnWritesGuarantee(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dbs, dbCleanup := testutils.NewDBs(t, ctx, 1)
defer dbCleanup()
privateKey1, err := crypto.GenerateKey()
require.NoError(t, err)
server1Port, err := getNextOpenPort()
require.NoError(t, err)

nodes := []r.Node{
{
NodeID: server1NodeID,
SigningKey: &privateKey1.PublicKey,
HttpAddress: fmt.Sprintf("http://localhost:%d", server1Port),
IsHealthy: true,
IsValidConfig: true,
}}

registry := mocks.NewMockNodeRegistry(t)
registry.On("GetNodes").Return(nodes, nil)

nodesChan := make(chan []r.Node)
cancelOnNewFunc := func() {
close(nodesChan)
}
registry.On("OnNewNodes").
Return((<-chan []r.Node)(nodesChan), r.CancelSubscription(cancelOnNewFunc))

server1 := NewTestServer(t, server1Port, dbs[0], registry, privateKey1)

client1, cleanup1 := apiTestUtils.NewReplicationAPIClient(t, ctx, server1.Addr().String())
defer cleanup1()

targetTopic := topic.NewTopic(topic.TOPIC_KIND_GROUP_MESSAGES_V1, []byte{1, 2, 3}).
Bytes()

_, err = client1.PublishPayerEnvelopes(
ctx,
&message_api.PublishPayerEnvelopesRequest{
PayerEnvelopes: []*envelopes.PayerEnvelope{envelopeTestUtils.CreatePayerEnvelope(
t,
envelopeTestUtils.CreateClientEnvelope(&envelopes.AuthenticatedData{
TargetOriginator: server1NodeID,
TargetTopic: targetTopic,
LastSeen: &envelopes.Cursor{},
}),
)},
},
)
require.NoError(t, err)

// query the same server immediately after writing
// the server should return the write on the first attempt

q1, err := client1.QueryEnvelopes(ctx, &message_api.QueryEnvelopesRequest{
Query: &message_api.EnvelopesQuery{
OriginatorNodeIds: []uint32{server1NodeID},
LastSeen: &envelopes.Cursor{},
},
Limit: 10,
})
require.NoError(t, err)
require.Len(t, q1.Envelopes, 1)
}

0 comments on commit 5a22840

Please sign in to comment.