Skip to content

Commit

Permalink
fix fetching of transactions with a hard limit to 200, adding strict …
Browse files Browse the repository at this point in the history
…decoder for results when fetching the latest ledger, the ledgers and the transactions
  • Loading branch information
Eduard-Voiculescu committed Jan 23, 2025
1 parent 23f4caf commit 8f7f7c2
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 64 deletions.
2 changes: 1 addition & 1 deletion cmd/firestellar/tool_fix_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func toolFixBlockRunE(cmd *cobra.Command, args []string) error {

convertedTransactions, err := convertOldTransactions(stellarBlock.Transactions)
if err != nil {
return fmt.Errorf("converting old transactions: %w", err)
return fmt.Errorf("converting old transactions at %d: %w", currentBlock.Number, err)
}

fixedStellarBlock := &pbstellarv1.Block{
Expand Down
75 changes: 12 additions & 63 deletions rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ func (c *Client) GetLatestLedger() (*types.GetLatestLedgerResult, error) {
}

var response types.GetLatestLedgerResponse
err = json.Unmarshal(body, &response)
decoder := json.NewDecoder(bytes.NewBuffer(body))
decoder.DisallowUnknownFields() // Fail on unknown fields
err = decoder.Decode(&response)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
return nil, fmt.Errorf("original body: %s failed to unmarshal JSON: %w", string(body), err)
}

return &response.Result, nil
Expand All @@ -62,9 +64,11 @@ func (c *Client) GetLedgers(startLedgerNum uint64) ([]types.Ledger, error) {
}

var response types.GetLedgersResponse
err = json.Unmarshal(body, &response)
decoder := json.NewDecoder(bytes.NewBuffer(body))
decoder.DisallowUnknownFields() // Fail on unknown fields
err = decoder.Decode(&response)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
return nil, fmt.Errorf("original body: %s failed to unmarshal JSON: %w", string(body), err)
}

return response.Result.Ledgers, nil
Expand Down Expand Up @@ -116,72 +120,17 @@ func (c *Client) getTransactions(ledgerNum uint64, limit int, cursor string) (st
}

var transactions types.GetTransactionResponse
err = json.Unmarshal(body, &transactions)
decoder := json.NewDecoder(bytes.NewBuffer(body))
decoder.DisallowUnknownFields() // Fail on unknown fields
err = decoder.Decode(&transactions)
if err != nil {
return cursor, nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
return cursor, nil, fmt.Errorf("original body: %s failed to unmarshal JSON: %w", string(body), err)
}

cursor = transactions.Result.Cursor
return cursor, transactions.Result.Transactions, nil
}

// TODO: handle the cursor in the same way as the cursor in GetTransactions
// need the feedback from Syd on the events
// so the Stellar team actually uses the ResultMetaXdr from the transaction to get the events
// so this means that we will probably remove the getEvents call and use the ResultMetaXdr instead

// GetEvents returns the events for a given ledger
func (c *Client) GetEvents(ledgerNum uint64, limit int, lastCursor string) (string, []types.Event, error) {
events := make([]types.Event, 0)

for {
currentCursor, fetchedEvents, err := c.getEvents(ledgerNum, limit, lastCursor)
if err != nil {
return lastCursor, nil, fmt.Errorf("failed to get events: %w", err)
}

allEventsFetched := len(fetchedEvents) == 0 || currentCursor == ""

for _, f := range fetchedEvents {
if f.Ledger != ledgerNum {
allEventsFetched = true
break
}
events = append(events, f)
}

if allEventsFetched {
break
}
lastCursor = currentCursor
}

return lastCursor, events, nil
}

func (c *Client) getEvents(ledgerNum uint64, limit int, cursor string) (string, []types.Event, error) {
payload := types.NewEventsRequest(ledgerNum, types.NewPagination(limit, cursor))

rpcBody, err := json.Marshal(payload)
if err != nil {
return cursor, nil, fmt.Errorf("failed to marshal JSON: %w", err)
}

body, err := c.makeRquest(rpcBody)
if err != nil {
return cursor, nil, fmt.Errorf("failed to get events: %w", err)
}

var response types.GetEventsResponse
err = json.Unmarshal(body, &response)
if err != nil {
return cursor, nil, fmt.Errorf("failed to unmarshal JSON: %w", err)
}

cursor = response.Result.Cursor
return cursor, response.Result.Events, nil
}

func (c *Client) makeRquest(reqBody []byte) ([]byte, error) {
req, err := http.NewRequest("POST", c.rpcEndpoint, bytes.NewBuffer(reqBody))
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions rpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,11 @@ func Test_GetTransactions(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, transactions)
}

func Test_GetTransactionsWithLimitTooHigh(t *testing.T) {
c := NewClient("https://mainnet.sorobanrpc.com", nil)
ledger, err := c.GetLatestLedger()
require.NoError(t, err)
_, err = c.GetTransactions(uint64(ledger.Sequence), 2000, "")
require.Error(t, err)
}
5 changes: 5 additions & 0 deletions rpc/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ func (f *Fetcher) Fetch(ctx context.Context, client *Client, requestBlockNum uin

numOfTransactions := len(ledgerMetadata.V1.TxProcessing)
f.logger.Debug("fetching transactions", zap.Uint64("block_num", requestBlockNum), zap.Int("num_of_transactions", numOfTransactions))
if numOfTransactions > 200 {
// There is a hard limit on the number of transactions
// to fetch. The RPC providers tipically set the maximum limit to 200.
numOfTransactions = 200
}
transactions, err := client.GetTransactions(requestBlockNum, numOfTransactions, f.lastBlockInfo.cursor)
if err != nil {
return nil, false, fmt.Errorf("fetching transactions: %w", err)
Expand Down

0 comments on commit 8f7f7c2

Please sign in to comment.