Skip to content

Commit

Permalink
feat: parallel execution and tx receipts
Browse files Browse the repository at this point in the history
  • Loading branch information
praetoriansentry committed Oct 18, 2022
1 parent 7510761 commit 4662edd
Showing 1 changed file with 150 additions and 19 deletions.
169 changes: 150 additions & 19 deletions cmd/dumpblocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"net/url"
"strconv"
"sync"
"time"

ethrpc "github.com/ethereum/go-ethereum/rpc"
Expand All @@ -35,6 +36,7 @@ type (
Start int64
End int64
BatchSize uint64
Threads *uint
}
)

Expand All @@ -51,37 +53,76 @@ var dumpblocksCmd = &cobra.Command{
if err != nil {
return err
}
if *inputDumpblocks.Threads == 0 {
*inputDumpblocks.Threads = 1
}

var wg sync.WaitGroup
log.Info().Uint("thread", *inputDumpblocks.Threads).Msg("thread count")
var pool = make(chan bool, *inputDumpblocks.Threads)
// TODO Support parallel execution
// TODO Support retries when there is a failure
start := inputDumpblocks.Start
end := inputDumpblocks.End
failCount := 0

for start < end {
rangeStart := start
rangeEnd := rangeStart + int64(inputDumpblocks.BatchSize)
log.Info().Int64("start", rangeStart).Int64("end", rangeEnd).Msg("getting range")

if rangeEnd > end {
rangeEnd = end
}
blocks, err := getBlockRange(ctx, rangeStart, rangeEnd, ec, inputDumpblocks.URL)
if err != nil {
failCount = failCount + 1
if failCount > 5 {
return fmt.Errorf("Failed to get blockrange(%d - %d) after %d attempts", rangeStart, rangeEnd, failCount)

pool <- true
wg.Add(1)
log.Info().Int64("start", rangeStart).Int64("end", rangeEnd).Msg("getting range")
go func() {
defer wg.Done()
for {
failCount := 0
blocks, err := getBlockRange(ctx, rangeStart, rangeEnd, ec, inputDumpblocks.URL)
if err != nil {
failCount = failCount + 1
if failCount > 5 {
log.Error().Int64("rangeStart", rangeStart).Int64("rangeEnd", rangeEnd).Msg("unable to fetch blocks")
break
}
time.Sleep(5 * time.Second)
continue
}

failCount = 0
receipts, err := getReceipts(ctx, blocks, ec, inputDumpblocks.URL)
if err != nil {
failCount = failCount + 1
if failCount > 5 {
log.Error().Int64("rangeStart", rangeStart).Int64("rangeEnd", rangeEnd).Msg("unable to fetch receipts")
break
}
time.Sleep(5 * time.Second)
continue
}

err = writeResponses(blocks)
if err != nil {
log.Error().Err(err).Msg("error writing blocks")
}
err = writeResponses(receipts)
if err != nil {
log.Error().Err(err).Msg("error writing receipts")
}

break
}
time.Sleep(5 * time.Second)
continue
}
err = writeBlocks(blocks)
if err != nil {
return err
}
failCount = 0
<-pool
}()
start = rangeEnd

}

log.Info().Msg("finished requesting data starting to wait")
wg.Wait()
log.Info().Msg("done")

return nil
},
Args: func(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -110,14 +151,18 @@ var dumpblocksCmd = &cobra.Command{
inputDumpblocks.URL = args[0]
inputDumpblocks.Start = start
inputDumpblocks.End = end
inputDumpblocks.BatchSize = 500
// Realistically, this probably shouldn't be bigger than 999. Most providers seem to cap at 1000.
inputDumpblocks.BatchSize = 150

return nil
},
}

// init attaches dumpblocksCmd to rootCmd and declares the persistent
// "concurrency" flag (short form "c", default 1). The pointer returned by the
// flag set is stored in inputDumpblocks.Threads.
func init() {
	rootCmd.AddCommand(dumpblocksCmd)

	persistent := dumpblocksCmd.PersistentFlags()
	inputDumpblocks.Threads = persistent.UintP("concurrency", "c", 1, "how many go routines to leverage")
}

func getBlockRange(ctx context.Context, from, to int64, c *ethrpc.Client, url string) ([]*json.RawMessage, error) {
Expand All @@ -134,21 +179,107 @@ func getBlockRange(ctx context.Context, from, to int64, c *ethrpc.Client, url st
}
err := c.BatchCallContext(ctx, blms)
if err != nil {
log.Error().Err(err).Msg("rpc issue fetching blocks")
return nil, err
}
blocks := make([]*json.RawMessage, 0)

for _, b := range blms {
if b.Error != nil {
return nil, err
return nil, b.Error
}
blocks = append(blocks, b.Result.(*json.RawMessage))

}
return blocks, nil
}

func writeBlocks(blocks []*json.RawMessage) error {
// simpleRPCTransaction is a minimal view of a transaction object inside a
// block response; only the "hash" field is decoded.
type simpleRPCTransaction struct {
	Hash string `json:"hash"`
}

// simpleRPCBlock is a minimal view of a block response; only the
// "transactions" array is decoded, each entry as a simpleRPCTransaction.
type simpleRPCBlock struct {
	Transactions []simpleRPCTransaction `json:"transactions"`
}

// getReceipts fetches the transaction receipt for every transaction found in
// rawBlocks and returns the raw JSON receipts in the same order as the
// transactions appear in the blocks.
//
// Each element of rawBlocks is decoded just far enough (via simpleRPCBlock) to
// collect its transaction hashes. The receipts are then requested with
// "eth_getTransactionReceipt" in chunks of at most inputDumpblocks.BatchSize
// elements. When no block contains a transaction, (nil, nil) is returned.
//
// An error is returned if a block cannot be decoded, if an RPC call fails, or
// if any individual batch element reports an error. The url parameter is
// currently unused.
func getReceipts(ctx context.Context, rawBlocks []*json.RawMessage, c *ethrpc.Client, url string) ([]*json.RawMessage, error) {
	txHashes := make([]string, 0)
	for _, rb := range rawBlocks {
		var sb simpleRPCBlock
		if err := json.Unmarshal(*rb, &sb); err != nil {
			return nil, err
		}
		for _, tx := range sb.Transactions {
			txHashes = append(txHashes, tx.Hash)
		}
	}
	if len(txHashes) == 0 {
		return nil, nil
	}

	blms := make([]ethrpc.BatchElem, 0, len(txHashes))
	for _, tx := range txHashes {
		blms = append(blms, ethrpc.BatchElem{
			Method: "eth_getTransactionReceipt",
			Args:   []interface{}{tx},
			Result: new(json.RawMessage),
		})
	}

	total := uint64(len(blms))
	// A zero batch size would never advance, and one larger than the number of
	// requests is pointless; in both cases send everything in a single chunk.
	batchSize := inputDumpblocks.BatchSize
	if batchSize == 0 || batchSize > total {
		batchSize = total
	}

	for start := uint64(0); start < total; start += batchSize {
		// end is exclusive, so the final chunk must stop at total (not
		// total-1) or the last receipt would never be requested.
		end := start + batchSize
		if end > total {
			end = total
		}

		// json: cannot unmarshal object into Go value of type []rpc.jsonrpcMessage
		// This error was observed when BatchCallContext is invoked with a single
		// element, e.g.:
		//   polycli dumpblocks -c 1 http://127.0.0.1:9209/ 34457958 34458108
		// To work around it, a chunk holding exactly one request is sent as a
		// plain call instead of a batch.
		if end-start == 1 {
			elem := blms[start]
			if err := c.CallContext(ctx, elem.Result, elem.Method, elem.Args...); err != nil {
				log.Error().Err(err).Uint64("start", start).Uint64("end", end).Msg("rpc issue fetching single receipt")
				return nil, err
			}
			continue
		}

		if err := c.BatchCallContext(ctx, blms[start:end]); err != nil {
			// Log the first hash of the failing chunk to make the failure reproducible.
			log.Error().Err(err).Str("randtx", txHashes[start]).Uint64("start", start).Uint64("end", end).Msg("rpc issue fetching receipts")
			return nil, err
		}
	}

	receipts := make([]*json.RawMessage, 0, len(blms))
	for _, b := range blms {
		if b.Error != nil {
			log.Error().Err(b.Error).Msg("block resp err")
			return nil, b.Error
		}
		receipts = append(receipts, b.Result.(*json.RawMessage))
	}
	log.Info().Int("hashes", len(txHashes)).Int("receipts", len(receipts)).Msg("fetched tx receipts")
	return receipts, nil
}

func writeResponses(blocks []*json.RawMessage) error {
for _, b := range blocks {
fmt.Println(string(*b))
}
Expand Down

0 comments on commit 4662edd

Please sign in to comment.