Skip to content

Commit

Permalink
Initial addition of working block-meta app, not supported nor docum…
Browse files Browse the repository at this point in the history
…ented for now
  • Loading branch information
maoueh committed Jan 26, 2024
1 parent 04dcb69 commit 7115763
Show file tree
Hide file tree
Showing 28 changed files with 923 additions and 162 deletions.
82 changes: 82 additions & 0 deletions block-meta/app/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package app

import (
"context"
"fmt"

"github.com/streamingfast/dmetrics"
"github.com/streamingfast/dstore"
blockmeta "github.com/streamingfast/firehose-core/block-meta"
"github.com/streamingfast/firehose-core/block-meta/metrics"
"github.com/streamingfast/logging"
"github.com/streamingfast/shutter"
"go.uber.org/zap"
)

type Config struct {
StartBlockResolver func(ctx context.Context) (uint64, error)
EndBlock uint64
MergedBlocksStoreURL string
BlockMetaStoreURL string
GRPCListenAddr string
}

type App struct {
*shutter.Shutter
config *Config
logger *zap.Logger
tracer logging.Tracer
}

func New(config *Config, logger *zap.Logger, tracer logging.Tracer) *App {
return &App{
Shutter: shutter.New(),
config: config,
logger: logger,
tracer: tracer,
}
}

func (a *App) Run() error {
mergedBlocksStore, err := dstore.NewDBinStore(a.config.MergedBlocksStoreURL)
if err != nil {
return err
}

ctx, cancel := context.WithCancel(context.Background())
a.OnTerminating(func(error) {
cancel()
})

startBlock, err := a.config.StartBlockResolver(ctx)
if err != nil {
return err
}

store, err := blockmeta.NewStore(a.config.BlockMetaStoreURL, a.logger, a.tracer)
if err != nil {
return fmt.Errorf("unable to create block meta store: %w", err)
}

indexer := blockmeta.NewIndexer(
zlog,
a.config.GRPCListenAddr,
startBlock,
a.config.EndBlock,
mergedBlocksStore,
store,
)

dmetrics.Register(metrics.MetricSet)

a.OnTerminating(indexer.Shutdown)
indexer.OnTerminated(a.Shutdown)

go indexer.Launch()

zlog.Info("block meta indexer running",
zap.Uint64("start_block", startBlock),
zap.Uint64("end_block", a.config.EndBlock),
)
return nil
}
7 changes: 7 additions & 0 deletions block-meta/app/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package app

import (
"github.com/streamingfast/logging"
)

var zlog, tracer = logging.PackageLogger("block-meta", "github.com/streamingfast/firehose-core/block-meta/app")
5 changes: 5 additions & 0 deletions block-meta/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package blockmeta

import "errors"

var ErrBlockNotFound = errors.New("block not found")
28 changes: 28 additions & 0 deletions block-meta/healthz.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package blockmeta

import (
"context"

pbhealth "google.golang.org/grpc/health/grpc_health_v1"
)

// Check is basic GRPC Healthcheck
func (app *Indexer) Check(ctx context.Context, in *pbhealth.HealthCheckRequest) (*pbhealth.HealthCheckResponse, error) {
status := pbhealth.HealthCheckResponse_SERVING
return &pbhealth.HealthCheckResponse{
Status: status,
}, nil
}

// Watch is basic GRPC Healthcheck as a stream
func (app *Indexer) Watch(req *pbhealth.HealthCheckRequest, stream pbhealth.Health_WatchServer) error {
err := stream.Send(&pbhealth.HealthCheckResponse{
Status: pbhealth.HealthCheckResponse_SERVING,
})
if err != nil {
return err
}

<-stream.Context().Done()
return nil
}
135 changes: 135 additions & 0 deletions block-meta/indexer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package blockmeta

import (
"context"
"errors"
"fmt"
"net/http"
"time"

"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/bstream/stream"
"github.com/streamingfast/dgrpc/server"
"github.com/streamingfast/dgrpc/server/factory"
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
"github.com/streamingfast/firehose-core/block-meta/metrics"
pbfirehose "github.com/streamingfast/pbgo/sf/firehose/v2"
"github.com/streamingfast/shutter"
"go.uber.org/zap"
)

type Indexer struct {
*shutter.Shutter
logger *zap.Logger

grpcListenAddr string
startBlockNum uint64
stopBlockNum uint64
mergedBlocksStore dstore.Store
blockMetaStore *Store

seenFirstBlock bool
}

func NewIndexer(logger *zap.Logger, grpcListenAddr string, startBlockNum, stopBlockNum uint64, mergedBlocksStore dstore.Store, blockMetaStore *Store) *Indexer {
return &Indexer{
Shutter: shutter.New(),

grpcListenAddr: grpcListenAddr,
startBlockNum: startBlockNum,
stopBlockNum: stopBlockNum,
mergedBlocksStore: mergedBlocksStore,
blockMetaStore: blockMetaStore,
logger: logger,
}
}

func (app *Indexer) Launch() {
server := factory.ServerFromOptions(
server.WithPlainTextServer(),
server.WithLogger(app.logger),
server.WithHealthCheck(server.HealthCheckOverHTTP|server.HealthCheckOverGRPC, app.healthCheck),
)

app.OnTerminating(func(_ error) {
server.Shutdown(5 * time.Second)
})

server.OnTerminated(func(err error) {
if err != nil && !errors.Is(err, http.ErrServerClosed) {
app.logger.Error("gRPC server unexpected failure", zap.Error(err))
}
app.Shutdown(err)
})

go server.Launch(app.grpcListenAddr)

// Blocking call of the indexer
err := app.launch()
if errors.Is(err, stream.ErrStopBlockReached) {
app.logger.Info("block meta reached stop block", zap.Uint64("stop_block_num", app.stopBlockNum))
err = nil
}

app.logger.Info("block meta exited", zap.Error(err))
app.Shutdown(err)
}

func (app *Indexer) launch() error {
startBlockNum := app.startBlockNum
stopBlockNum := app.stopBlockNum

streamFactory := firecore.NewStreamFactory(
app.mergedBlocksStore,
nil,
nil,
nil,
)
ctx := context.Background()

req := &pbfirehose.Request{
StartBlockNum: int64(startBlockNum),
StopBlockNum: stopBlockNum,
FinalBlocksOnly: true,
}

handlerFunc := func(block *pbbstream.Block, _ interface{}) error {
app.logger.Debug("handling block", zap.Uint64("block_num", block.Number))
app.seenFirstBlock = true

metrics.HeadBlockNumber.SetUint64(block.Number)
metrics.HeadBlockTimeDrift.SetBlockTime(block.Time())
metrics.AppReadiness.SetReady()

app.logger.Debug("updated head block metrics", zap.Uint64("block_num", block.Number), zap.Time("block_time", block.Time()))

writeCtx, cancelWrite := context.WithTimeout(ctx, 15*time.Second)
defer cancelWrite()

err := app.blockMetaStore.WriteBlockMeta(writeCtx, block.ToBlocKMeta())
if err != nil {
return fmt.Errorf("writing block meta: %w", err)
}

return nil
}

stream, err := streamFactory.New(
ctx,
bstream.HandlerFunc(handlerFunc),
req,
app.logger,
)

if err != nil {
return fmt.Errorf("getting firehose stream: %w", err)
}

return stream.Run(ctx)
}

func (app *Indexer) healthCheck(ctx context.Context) (isReady bool, out interface{}, err error) {
return app.seenFirstBlock, nil, nil
}
9 changes: 9 additions & 0 deletions block-meta/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package metrics

import "github.com/streamingfast/dmetrics"

var MetricSet = dmetrics.NewSet()

var HeadBlockTimeDrift = MetricSet.NewHeadTimeDrift("block-meta")
var HeadBlockNumber = MetricSet.NewHeadBlockNumber("block-meta")
var AppReadiness = MetricSet.NewAppReadiness("block-meta")
23 changes: 23 additions & 0 deletions block-meta/start_block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package blockmeta

import (
"context"
"fmt"

"github.com/streamingfast/logging"
"go.uber.org/zap"
)

func GetStartBlock(ctx context.Context, blockMetaStoreURL string, logger *zap.Logger, tracer logging.Tracer) (uint64, error) {
store, err := NewStore(blockMetaStoreURL, logger, tracer)
if err != nil {
return 0, fmt.Errorf("unable to create block meta store: %w", err)
}

startBlock, err := store.GetLastWrittenBlockNum(ctx)
if err != nil {
return 0, fmt.Errorf("unable to get start block from store: %w", err)
}

return startBlock, nil
}
Loading

0 comments on commit 7115763

Please sign in to comment.