Skip to content

Commit

Permalink
go/archive: fix runtime queries on archive node
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrus committed Apr 4, 2024
1 parent 5d44df5 commit 30cb4df
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 7 deletions.
4 changes: 4 additions & 0 deletions .changelog/5622.bugfix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/archive: fix runtime queries on archive nodes

Fixes storage worker initialization on archive nodes which was causing runtime
queries to fail.
61 changes: 60 additions & 1 deletion go/consensus/cometbft/full/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
cmtcore "github.com/cometbft/cometbft/rpc/core"
"github.com/cometbft/cometbft/state"
"github.com/cometbft/cometbft/store"
cmttypes "github.com/cometbft/cometbft/types"
"github.com/spf13/viper"

"github.com/oasisprotocol/oasis-core/go/common/identity"
Expand All @@ -37,6 +38,7 @@ type archiveService struct {
*commonNode

abciClient abcicli.Client
eb *cmttypes.EventBus

quitCh chan struct{}

Expand All @@ -49,6 +51,10 @@ func (srv *archiveService) Start() error {
return fmt.Errorf("cometbft: service already started")
}

if err := srv.eb.Start(); err != nil {
return err
}

if err := srv.commonNode.start(); err != nil {
return err
}
Expand All @@ -70,6 +76,15 @@ func (srv *archiveService) Start() error {
}
}()

// Start command dispatchers for all the service clients.
srv.serviceClientsWg.Add(len(srv.serviceClients))
for _, svc := range srv.serviceClients {
go func() {
defer srv.serviceClientsWg.Done()
srv.serviceClientWorker(srv.ctx, svc)
}()
}

srv.commonNode.finishStart()

return nil
Expand Down Expand Up @@ -213,6 +228,7 @@ func NewArchive(
return nil, err
}

srv.eb = cmttypes.NewEventBus()
// Setup minimal CometBFT environment needed to support consensus queries.
cmtcore.SetEnvironment(&cmtcore.Environment{
ProxyAppQuery: cmtproxy.NewAppConnQuery(srv.abciClient, nil),
Expand All @@ -224,7 +240,7 @@ func NewArchive(
GenDoc: tmGenDoc,
Logger: logger,
Config: *cmtConfig.RPC,
EventBus: nil,
EventBus: srv.eb,
P2PPeers: nil,
P2PTransport: nil,
PubKey: nil,
Expand All @@ -236,3 +252,46 @@ func NewArchive(

return srv, srv.initialize()
}

// serviceClientWorker handles command dispatching.
func (srv *archiveService) serviceClientWorker(ctx context.Context, svc api.ServiceClient) {
sd := svc.ServiceDescriptor()
if sd == nil {
// Some services don't actually need a worker.
return
}

// Archive only handles commands.
cmdCh := sd.Commands()
if cmdCh == nil {
// Services without commands do not need a worker.
return
}

logger := srv.Logger.With("service", sd.Name())
logger.Info("starting command dispatcher")

// Fetch and remember the latest block. This won't change on an archive node.
latestBlock, err := srv.commonNode.GetBlock(ctx, consensusAPI.HeightLatest)
if err != nil {
logger.Error("failed to fetch latest block",
"err", err,
)
return
}

// Service client event loop.
for {
select {
case <-ctx.Done():
return
case cmd := <-cmdCh:
if err := svc.DeliverCommand(ctx, latestBlock.Height, cmd); err != nil {
logger.Error("failed to deliver command to service client",
"err", err,
)
continue
}
}
}
}
32 changes: 26 additions & 6 deletions go/oasis-test-runner/scenario/e2e/runtime/archive_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/env"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/oasis"
"github.com/oasisprotocol/oasis-core/go/oasis-test-runner/scenario"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/runtime/client/api"
)

Expand Down Expand Up @@ -74,12 +75,8 @@ func (sc *archiveAPI) testArchiveAPI(ctx context.Context, archiveCtrl *oasis.Con
if err != nil {
return err
}
switch {
case runtime && isReady:
return fmt.Errorf("runtime archive node reports ready to accept runtime work")
case !runtime && !isReady:
return fmt.Errorf("consensus archive node reports as not ready")
default:
if !isReady {
return fmt.Errorf("archive node reports as not ready")
}

sc.Logger.Info("testing GetStatus")
Expand Down Expand Up @@ -282,6 +279,29 @@ func (sc *archiveAPI) testArchiveAPI(ctx context.Context, archiveCtrl *oasis.Con
return fmt.Errorf("runtime WatchBlocks: %w", err)
}
defer sub.Close()

// Temporary configure the archive as the client controller.
clientCtrl := sc.Net.ClientController()
sc.Net.SetClientController(archiveCtrl)
defer func() {
sc.Net.SetClientController(clientCtrl)
}()

// Test runtime client query.
sc.Logger.Info("testing runtime client query")
rsp, err := sc.submitKeyValueRuntimeGetQuery(
ctx,
KeyValueRuntimeID,
"my_key",
roothash.RoundLatest,
)
if err != nil {
return fmt.Errorf("failed to query runtime: %w", err)
}
if rsp != "my_value" {
return fmt.Errorf("response does not have expected value (got: '%v', expected: '%v')", rsp, "my_value")
}

return nil
}

Expand Down
6 changes: 6 additions & 0 deletions go/runtime/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/pubsub"
"github.com/oasisprotocol/oasis-core/go/config"
roothash "github.com/oasisprotocol/oasis-core/go/roothash/api"
"github.com/oasisprotocol/oasis-core/go/roothash/api/block"
)
Expand Down Expand Up @@ -176,6 +177,11 @@ func (h *runtimeHistory) ConsensusCheckpoint(height int64) error {
}

func (h *runtimeHistory) StorageSyncCheckpoint(round uint64) error {
if config.GlobalConfig.Mode == config.ModeArchive {
// If we are in archive mode, ignore storage sync checkpoints.
return nil
}

if !h.haveLocalStorageWorker {
panic("received storage sync checkpoint when local storage worker is disabled")
}
Expand Down

0 comments on commit 30cb4df

Please sign in to comment.