Skip to content

Commit

Permalink
Added proto registry support
Browse files Browse the repository at this point in the history
  • Loading branch information
jubeless committed Dec 5, 2023
1 parent e851074 commit bf490db
Show file tree
Hide file tree
Showing 12 changed files with 334 additions and 85 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
.idea
/build
/dist
/dist
.envrc
.env
12 changes: 9 additions & 3 deletions cmd/tools/firehose/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/spf13/cobra"
"github.com/streamingfast/cli/sflags"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/cmd/tools/print"
"github.com/streamingfast/firehose-core/types"
"github.com/streamingfast/jsonpb"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"go.uber.org/zap"
)
Expand All @@ -24,7 +24,7 @@ func NewToolsFirehoseClientCmd[B firecore.Block](chain *firecore.Chain[B], logge

addFirehoseStreamClientFlagsToSet(cmd.Flags(), chain)

cmd.Flags().StringSlice("proto-paths", []string{"~/.proto"}, "Paths to proto files to use for dynamic decoding of blocks")
cmd.Flags().StringSlice("proto-paths", []string{""}, "Paths to proto files to use for dynamic decoding of blocks")
cmd.Flags().Bool("final-blocks-only", false, "Only ask for final blocks")
cmd.Flags().Bool("print-cursor-only", false, "Skip block decoding, only print the step cursor (useful for performance testing)")

Expand Down Expand Up @@ -89,6 +89,11 @@ func getFirehoseClientE[B firecore.Block](chain *firecore.Chain[B], rootLog *zap
}()
}

jencoder, err := print.SetupJsonEncoder(cmd)
if err != nil {
return fmt.Errorf("unable to create json encoder: %w", err)
}

for {
response, err := stream.Recv()
if err != nil {
Expand All @@ -110,10 +115,11 @@ func getFirehoseClientE[B firecore.Block](chain *firecore.Chain[B], rootLog *zap

// async process the response
go func() {
line, err := jsonpb.MarshalToString(response)
line, err := jencoder.MarshalToString(response)
if err != nil {
rootLog.Error("marshalling to string", zap.Error(err))
}

resp.ch <- line
}()
}
Expand Down
40 changes: 16 additions & 24 deletions cmd/tools/print/tools_print.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import (
"os"
"strconv"

"github.com/streamingfast/firehose-core/types"

"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
Expand All @@ -30,7 +28,7 @@ import (
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/jsonencoder"
"github.com/streamingfast/firehose-core/protoregistry"
"google.golang.org/protobuf/proto"
"github.com/streamingfast/firehose-core/types"
)

func NewToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B]) *cobra.Command {
Expand All @@ -55,7 +53,7 @@ func NewToolsPrintCmd[B firecore.Block](chain *firecore.Chain[B]) *cobra.Command
toolsPrintCmd.AddCommand(toolsPrintMergedBlocksCmd)

toolsPrintCmd.PersistentFlags().StringP("output", "o", "text", "Output mode for block printing, either 'text', 'json' or 'jsonl'")
toolsPrintCmd.PersistentFlags().StringSlice("proto-paths", []string{"~/.proto"}, "Paths to proto files to use for dynamic decoding of blocks")
toolsPrintCmd.PersistentFlags().StringSlice("proto-paths", []string{""}, "Paths to proto files to use for dynamic decoding of blocks")
toolsPrintCmd.PersistentFlags().Bool("transactions", false, "When in 'text' output mode, also print transactions summary")

toolsPrintOneBlockCmd.RunE = createToolsPrintOneBlockE(chain)
Expand Down Expand Up @@ -101,7 +99,7 @@ func createToolsPrintMergedBlocksE[B firecore.Block](chain *firecore.Chain[B]) f
return err
}

jencoder, err := setupJsonEncoder(cmd)
jencoder, err := SetupJsonEncoder(cmd)
if err != nil {
return fmt.Errorf("unable to create json encoder: %w", err)
}
Expand Down Expand Up @@ -143,7 +141,7 @@ func createToolsPrintOneBlockE[B firecore.Block](chain *firecore.Chain[B]) firec

printTransactions := sflags.MustGetBool(cmd, "transactions")

jencoder, err := setupJsonEncoder(cmd)
jencoder, err := SetupJsonEncoder(cmd)
if err != nil {
return fmt.Errorf("unable to create json encoder: %w", err)
}
Expand Down Expand Up @@ -227,18 +225,12 @@ func displayBlock[B firecore.Block](pbBlock *pbbstream.Block, chain *firecore.Ch
return nil
}

isLegacyBlock := pbBlock.Payload == nil
if !chain.CoreBinaryEnabled {
// since we are running via the chain specific binary (i.e. fireeth) we can use a BlockFactory
marshallableBlock := chain.BlockFactory()
if isLegacyBlock {
if err := proto.Unmarshal(pbBlock.GetPayloadBuffer(), marshallableBlock); err != nil {
return fmt.Errorf("unmarshal legacy block payload to protocol block: %w", err)
}
} else {
if err := pbBlock.Payload.UnmarshalTo(marshallableBlock); err != nil {
return fmt.Errorf("pbBlock payload unmarshal: %w", err)
}

if err := pbBlock.Payload.UnmarshalTo(marshallableBlock); err != nil {
return fmt.Errorf("pbBlock payload unmarshal: %w", err)
}

err := encoder.Marshal(marshallableBlock)
Expand All @@ -247,12 +239,8 @@ func displayBlock[B firecore.Block](pbBlock *pbbstream.Block, chain *firecore.Ch
}
return nil
}
// since we are running directly the firecore binary we will *NOT* use the BlockFactory

if isLegacyBlock {
return encoder.MarshalLegacy(pbBlock.GetPayloadKind(), pbBlock.GetPayloadBuffer())
}

// since we are running directly the firecore binary we will *NOT* use the BlockFactory
return encoder.Marshal(pbBlock.Payload)
}

Expand All @@ -279,13 +267,17 @@ func PrintBStreamBlock(b *pbbstream.Block, printTransactions bool, out io.Writer
return nil
}

func setupJsonEncoder(cmd *cobra.Command) (*jsonencoder.Encoder, error) {
protoPaths := sflags.MustGetStringSlice(cmd, "proto-paths")
func SetupJsonEncoder(cmd *cobra.Command) (*jsonencoder.Encoder, error) {
pbregistry := protoregistry.New()
if err := pbregistry.RegisterFiles(protoPaths); err != nil {
return nil, fmt.Errorf("unable to create dynamic printer: %w", err)
protoPaths := sflags.MustGetStringSlice(cmd, "proto-paths")
if len(protoPaths) > 0 {
if err := pbregistry.RegisterFiles(protoPaths); err != nil {
return nil, fmt.Errorf("unable to create dynamic printer: %w", err)
}
}

pbregistry.Extends(protoregistry.WellKnownRegistry)

options := []jsonencoder.Option{
jsonencoder.WithBytesAsHex(),
}
Expand Down
8 changes: 6 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,19 @@ module github.com/streamingfast/firehose-core
go 1.21

require (
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.12.0-20230822193137-310c9c4845dd.1
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.31.0-20230822193137-310c9c4845dd.2
github.com/ShinyTrinkets/overseer v0.3.0
github.com/dustin/go-humanize v1.0.1
github.com/go-json-experiment/json v0.0.0-20231013223334-54c864be5b8d
github.com/iancoleman/strcase v0.2.0
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
github.com/mostynb/go-grpc-compression v1.1.17
github.com/prometheus/client_golang v1.16.0
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/streamingfast/bstream v0.0.2-0.20231205163051-ade2f311eca3
github.com/streamingfast/bstream v0.0.2-0.20231205185208-7e21cc7e64bc
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80
github.com/streamingfast/dauth v0.0.0-20230929180355-921f9c9be330
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
Expand All @@ -37,6 +40,7 @@ require (
)

require (
connectrpc.com/connect v1.12.0 // indirect
github.com/bufbuild/protocompile v0.4.0 // indirect
github.com/google/s2a-go v0.1.4 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect
Expand Down Expand Up @@ -71,7 +75,7 @@ require (
github.com/bits-and-blooms/bitset v1.3.1 // indirect
github.com/blendle/zapdriver v1.3.2-0.20200203083823-9200777f8a3d // indirect
github.com/bobg/go-generics/v2 v2.1.1 // indirect
github.com/bufbuild/connect-go v1.10.0 // indirect
github.com/bufbuild/connect-go v1.10.0
github.com/bufbuild/connect-grpchealth-go v1.1.1 // indirect
github.com/bufbuild/connect-grpcreflect-go v1.0.0 // indirect
github.com/bufbuild/connect-opentelemetry-go v0.3.0 // indirect
Expand Down
14 changes: 14 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.12.0-20230822193137-310c9c4845dd.1 h1:xnX/gxrGjg0FKB/YLHwCLRPdGoVOblm3s9MZa1oNFIw=
buf.build/gen/go/bufbuild/reflect/connectrpc/go v1.12.0-20230822193137-310c9c4845dd.1/go.mod h1:ru4ObfnijLo+YjfhJFd5Xjljz+d8M+QD+ZZLn4zz6lw=
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.31.0-20230822193137-310c9c4845dd.2 h1:RF8bm8mWobc2HVWCrr5PUlCQcpfsrzL/dcydKLmVC7Y=
buf.build/gen/go/bufbuild/reflect/protocolbuffers/go v1.31.0-20230822193137-310c9c4845dd.2/go.mod h1:3JED1QGgFgqC45IIPkydCq6dIcQKfG6/Ghf0RfKr2Ok=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -66,6 +70,8 @@ cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7Biccwk
cloud.google.com/go/trace v1.0.0/go.mod h1:4iErSByzxkyHWzzlAj63/Gmjz0NH1ASqhJguHpGcr6A=
cloud.google.com/go/trace v1.10.1 h1:EwGdOLCNfYOOPtgqo+D2sDLZmRCEO1AagRTJCU6ztdg=
cloud.google.com/go/trace v1.10.1/go.mod h1:gbtL94KE5AJLH3y+WVpfWILmqgc6dXcqgNXdOPAQTYk=
connectrpc.com/connect v1.12.0 h1:HwKdOY0lGhhoHdsza+hW55aqHEC64pYpObRNoAgn70g=
connectrpc.com/connect v1.12.0/go.mod h1:3AGaO6RRGMx5IKFfqbe3hvK1NqLosFNP2BxDYTPmNPo=
contrib.go.opencensus.io/exporter/stackdriver v0.12.6/go.mod h1:8x999/OcIPy5ivx/wDiV7Gx4D+VUPODf0mWRGRc5kSk=
contrib.go.opencensus.io/exporter/stackdriver v0.13.10 h1:a9+GZPUe+ONKUwULjlEOucMMG0qfSCCenlji0Nhqbys=
contrib.go.opencensus.io/exporter/stackdriver v0.13.10/go.mod h1:I5htMbyta491eUxufwwZPQdcKvvgzMB4O9ni41YnIM8=
Expand Down Expand Up @@ -383,6 +389,8 @@ github.com/hashicorp/golang-lru v0.5.3/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uG
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0=
github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
Expand Down Expand Up @@ -572,6 +580,12 @@ github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jH
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
github.com/streamingfast/bstream v0.0.2-0.20231205163051-ade2f311eca3 h1:u8orpRssS8rYceziOQ/mbBQHlYh5w06oOtTXK90/yMc=
github.com/streamingfast/bstream v0.0.2-0.20231205163051-ade2f311eca3/go.mod h1:08GVb+DXyz6jVNIsbf+2zlaC81UeEGu5o1h49KrSR3Y=
github.com/streamingfast/bstream v0.0.2-0.20231205174934-869fb7d64fd2 h1:TQPPxjBXflVinpSSbYaiMuNgw1HB1YnMcFiR52M8EVo=
github.com/streamingfast/bstream v0.0.2-0.20231205174934-869fb7d64fd2/go.mod h1:08GVb+DXyz6jVNIsbf+2zlaC81UeEGu5o1h49KrSR3Y=
github.com/streamingfast/bstream v0.0.2-0.20231205175345-609448673b00 h1:U/8aQZOpOzLTVcuEVdbEVffVu00ixotkTe8DRhEXxao=
github.com/streamingfast/bstream v0.0.2-0.20231205175345-609448673b00/go.mod h1:08GVb+DXyz6jVNIsbf+2zlaC81UeEGu5o1h49KrSR3Y=
github.com/streamingfast/bstream v0.0.2-0.20231205185208-7e21cc7e64bc h1:ioohiLa+d59fqToa2OhbUx418YMrqt2bLT+m+fmjOG8=
github.com/streamingfast/bstream v0.0.2-0.20231205185208-7e21cc7e64bc/go.mod h1:08GVb+DXyz6jVNIsbf+2zlaC81UeEGu5o1h49KrSR3Y=
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80 h1:UxJUTcEVkdZy8N77E3exz0iNlgQuxl4m220GPvzdZ2s=
github.com/streamingfast/cli v0.0.4-0.20230825151644-8cc84512cd80/go.mod h1:QxjVH73Lkqk+mP8bndvhMuQDUINfkgsYhdCH/5TJFKI=
github.com/streamingfast/dauth v0.0.0-20230929180355-921f9c9be330 h1:49JYZkn8ALGe+LhcACZyX3L9B8tIxRZ3F3l+OxmNMhY=
Expand Down
25 changes: 11 additions & 14 deletions jsonencoder/encoder.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
package jsonencoder

import (
"fmt"
"bytes"
"os"

"github.com/go-json-experiment/json"
"github.com/go-json-experiment/json/jsontext"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/firehose-core/protoregistry"
)

type Encoder struct {
e *jsontext.Encoder
files *protoregistry.Files
marshallers []*json.Marshalers
protoRegistry *protoregistry.Registry
marshallers []*json.Marshalers
}

func New(files *protoregistry.Files, opts ...Option) *Encoder {
func New(files *protoregistry.Registry, opts ...Option) *Encoder {
e := &Encoder{
e: jsontext.NewEncoder(os.Stdout),
files: files,
protoRegistry: files,
}

e.marshallers = []*json.Marshalers{
Expand All @@ -33,14 +30,14 @@ func New(files *protoregistry.Files, opts ...Option) *Encoder {
}

func (e *Encoder) Marshal(in any) error {
return json.MarshalEncode(e.e, in, json.WithMarshalers(json.NewMarshalers(e.marshallers...)))
return json.MarshalEncode(jsontext.NewEncoder(os.Stdout), in, json.WithMarshalers(json.NewMarshalers(e.marshallers...)))
}

func (e *Encoder) MarshalLegacy(protocol pbbstream.Protocol, value []byte) error {
msg, err := e.files.UnmarshallLegacy(protocol, value)
if err != nil {
return fmt.Errorf("unmarshalling proto any: %w", err)
func (e *Encoder) MarshalToString(in any) (string, error) {
buf := bytes.NewBuffer(nil)
if err := json.MarshalEncode(jsontext.NewEncoder(buf), in, json.WithMarshalers(json.NewMarshalers(e.marshallers...))); err != nil {
return "", err
}
return buf.String(), nil

return json.MarshalEncode(e.e, msg, json.WithMarshalers(json.NewMarshalers(e.marshallers...)))
}
2 changes: 1 addition & 1 deletion jsonencoder/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func (e *Encoder) protoAny(encoder *jsontext.Encoder, t *anypb.Any, options json.Options) error {
msg, err := e.files.Unmarshall(t.TypeUrl, t.Value)
msg, err := e.protoRegistry.Unmarshall(t.TypeUrl, t.Value)
if err != nil {
return fmt.Errorf("unmarshalling proto any: %w", err)
}
Expand Down
Loading

0 comments on commit bf490db

Please sign in to comment.