-
Notifications
You must be signed in to change notification settings - Fork 52
/
Copy pathtesting.go
70 lines (58 loc) · 2.27 KB
/
testing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package service
import (
"context"
"fmt"
"github.com/streamingfast/bstream"
"github.com/streamingfast/substreams/wasm"
"github.com/streamingfast/substreams/reqctx"
"github.com/streamingfast/bstream/stream"
"github.com/streamingfast/substreams"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2"
"github.com/streamingfast/substreams/pipeline/exec"
"github.com/streamingfast/substreams/service/config"
)
// TestNewService builds a Tier1Service wired for tests.
//
// The service uses the fixed block type "sf.substreams.v1.test.Block", the
// supplied runtimeConfig and streamFactoryFunc, the package logger, and no
// tracer. Its getRecentFinalBlock hook returns linearHandoffBlockNum when that
// value is non-zero; when it is zero the hook returns a "no live feed" error.
func TestNewService(runtimeConfig config.RuntimeConfig, linearHandoffBlockNum uint64, streamFactoryFunc StreamFactoryFunc) *Tier1Service {
	// A zero handoff block is treated as "there is no live feed".
	recentFinalBlock := func() (uint64, error) {
		if linearHandoffBlockNum == 0 {
			return 0, fmt.Errorf("no live feed")
		}
		return linearHandoffBlockNum, nil
	}

	svc := &Tier1Service{
		blockType:           "sf.substreams.v1.test.Block",
		runtimeConfig:       runtimeConfig,
		streamFactoryFunc:   streamFactoryFunc,
		getRecentFinalBlock: recentFinalBlock,
		logger:              zlog,
		tracer:              nil,
	}
	return svc
}
// TestBlocks runs the tier1 blocks flow for the given request.
//
// It first builds the output module graph from the request's output module,
// production mode and modules; a failure there is returned as an invalid
// argument error. On success it replaces the service's checkPendingShutdown
// hook with one that always reports false, then delegates to s.blocks.
//
// The isSubRequest parameter is not read by this method.
func (s *Tier1Service) TestBlocks(ctx context.Context, isSubRequest bool, request *pbsubstreamsrpc.Request, respFunc substreams.ResponseFunc) error {
	graph, err := exec.NewOutputModuleGraph(
		request.OutputModule,
		request.ProductionMode,
		request.Modules,
		bstream.GetProtocolFirstStreamableBlock,
	)
	if err != nil {
		return stream.NewErrInvalidArg(err.Error())
	}

	// The hook is only overridden once the graph has been built successfully.
	s.checkPendingShutdown = func() bool { return false }

	return s.blocks(ctx, request, graph, respFunc)
}
// TestNewServiceTier2 builds a Tier2Service wired for tests.
//
// The service carries the given moduleExecutionTracing flag, uses
// streamFactoryFunc as its stream factory override, logs through the package
// logger, and has no tracer.
func TestNewServiceTier2(moduleExecutionTracing bool, streamFactoryFunc StreamFactoryFunc) *Tier2Service {
	svc := &Tier2Service{
		streamFactoryFuncOverride: streamFactoryFunc,
		moduleExecutionTracing:    moduleExecutionTracing,
		logger:                    zlog,
		tracer:                    nil,
	}
	return svc
}
// TestProcessRange runs the tier2 processRange flow for the given request.
//
// The tier2 request parameters are read from ctx; if they are absent the
// method returns a "missing tier2 request parameters" error without touching
// the service. Otherwise it stores a pointer to those parameters on the
// service, installs a wasmExtensions hook that always yields an empty
// extension map and a nil error, and delegates to s.processRange.
func (s *Tier2Service) TestProcessRange(ctx context.Context, request *pbssinternal.ProcessRangeRequest, respFunc substreams.ResponseFunc) error {
	params, found := reqctx.GetTier2RequestParameters(ctx)
	if !found {
		return fmt.Errorf("missing tier2 request parameters")
	}
	s.tier2RequestParameters = &params

	// The hook ignores its argument and returns a fresh empty map each call.
	s.wasmExtensions = func(_ map[string]string) (map[string]map[string]wasm.WASMExtension, error) {
		return map[string]map[string]wasm.WASMExtension{}, nil
	}

	return s.processRange(ctx, request, respFunc)
}