Skip to content

Commit

Permalink
add output module hash to incoming-request log on tier1
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Mar 1, 2024
1 parent 340f069 commit 47ddf07
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
9 changes: 6 additions & 3 deletions docs/release-notes/change-log.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased
## v1.3.6

This release brings important server-side improvements regarding performance, especially while processing over historical blocks in production-mode.

### Backend (through firehose-core)

* Performance: prevent reprocessing jobs when there is only a mapper in production mode and everything is already cached
* Performance: prevent "UpdateStats" from running too often and affecting performance
* Performance: prevent "UpdateStats" from running too often and stalling other operations when running with a high parallel jobs count
* Performance: fixed bug in scheduler ramp-up function sometimes waiting before raising the number of workers
* Added support for authentication using api keys. The env variable can be specified with `--substreams-api-key-envvar` and defaults to `SUBSTREAMS_API_KEY`.
* Fixed bug in scheduler ramp-up function sometimes waiting before raising the number of workers
* Added the output module's hash to the "incoming request"
* Added `trace_id` in grpc authentication calls
* Bumped connect-go library to new "connectrpc.com/connect" location

Expand Down
23 changes: 13 additions & 10 deletions service/tier1.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,17 @@ func (s *Tier1Service) Blocks(
if request.Modules == nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("missing modules in request"))
}

if err := outputmodules.ValidateTier1Request(request, s.blockType); err != nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}

outputGraph, err := outputmodules.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules)
if err != nil {
return bsstream.NewErrInvalidArg(err.Error())
}
outputModuleHash := outputGraph.ModuleHashes().Get(request.OutputModule)

moduleNames := make([]string, len(request.Modules.Modules))
for i := 0; i < len(moduleNames); i++ {
moduleNames[i] = request.Modules.Modules[i].Name
Expand All @@ -209,6 +220,7 @@ func (s *Tier1Service) Blocks(
zap.String("cursor", request.StartCursor),
zap.Strings("modules", moduleNames),
zap.String("output_module", request.OutputModule),
zap.String("output_module_hash", outputModuleHash),
}
fields = append(fields, zap.Bool("production_mode", request.ProductionMode))
if auth := dauth.FromContext(ctx); auth != nil {
Expand All @@ -229,17 +241,8 @@ func (s *Tier1Service) Blocks(
metrics.ActiveSubstreams.Inc()
defer metrics.ActiveSubstreams.Dec()

if err := outputmodules.ValidateTier1Request(request, s.blockType); err != nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("validate request: %w", err))
}

outputGraph, err := outputmodules.NewOutputModuleGraph(request.OutputModule, request.ProductionMode, request.Modules)
if err != nil {
return bsstream.NewErrInvalidArg(err.Error())
}

requestID := fmt.Sprintf("%s:%d:%d:%s:%t:%t:%s",
outputGraph.ModuleHashes().Get(request.OutputModule),
outputModuleHash,
request.StartBlockNum,
request.StopBlockNum,
request.StartCursor,
Expand Down

0 comments on commit 47ddf07

Please sign in to comment.