Skip to content

Commit

Permalink
*: support RPC server
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho committed Feb 24, 2022
1 parent 8e2b9da commit 8651abd
Show file tree
Hide file tree
Showing 29 changed files with 5,766 additions and 63 deletions.
36 changes: 36 additions & 0 deletions .github/workflows/build-test-release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Build + test + release

on:
push:
pull_request:

permissions:
contents: write

jobs:
build_test_release:
runs-on: ubuntu-latest
steps:
- name: Git checkout
uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.17
# - name: Run unit tests
# shell: bash
# run: scripts/tests.unit.sh
- name: Run e2e tests
shell: bash
run: scripts/tests.e2e.sh 1.7.4 1.7.5
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v2
with:
distribution: goreleaser
version: latest
args: release --rm-dist
env:
# https://docs.github.com/en/actions/security-guides/automatic-token-authentication#about-the-github_token-secret
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
17 changes: 17 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,20 @@

.netrc
*.swp

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
# vendor/

# goreleaser
dist/

# https://docs.buf.build/installation
*.lock

bin/
23 changes: 23 additions & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# ref. https://goreleaser.com/customization/build/
builds:
- id: avalanche-network-runner
main: ./cmd/avalanche-network-runner
binary: avalanche-network-runner
flags:
- -v
ignore:
- goos: darwin
goarch: 386
- goos: linux
goarch: 386
- goos: windows
goarch: 386
- goos: freebsd
goarch: 386

release:
# Repo in which the release will be created.
# Default is extracted from the origin remote URL or empty if its private hosted.
github:
owner: ava-labs
name: avalanche-network-runner
2 changes: 2 additions & 0 deletions LICENSE.header
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
See the file LICENSE for licensing terms.
12 changes: 12 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: v1
plugins:
- name: go
out: .
opt: paths=source_relative
- name: go-grpc
out: .
opt: paths=source_relative
# https://grpc-ecosystem.github.io/grpc-gateway/docs/tutorials/introduction/
- name: grpc-gateway
out: .
opt: paths=source_relative
11 changes: 11 additions & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
version: v1
deps:
- buf.build/googleapis/googleapis
breaking:
use:
- FILE
lint:
use:
- DEFAULT
except:
- PACKAGE_VERSION_SUFFIX
259 changes: 259 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,259 @@
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

// Package client implements client.
package client

import (
"context"
"errors"
"io"
"strings"
"sync"
"time"

"github.com/ava-labs/avalanche-network-runner/pkg/color"
"github.com/ava-labs/avalanche-network-runner/pkg/logutil"
"github.com/ava-labs/avalanche-network-runner/rpcpb"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
)

type Config struct {
LogLevel string
Endpoint string
DialTimeout time.Duration
}

type Client interface {
Ping(ctx context.Context) (*rpcpb.PingResponse, error)
Start(ctx context.Context, execPath string, opts ...OpOption) (*rpcpb.StartResponse, error)
Health(ctx context.Context) (*rpcpb.HealthResponse, error)
URIs(ctx context.Context) ([]string, error)
Status(ctx context.Context) (*rpcpb.StatusResponse, error)
StreamStatus(ctx context.Context, pushInterval time.Duration) (<-chan *rpcpb.ClusterInfo, error)
RemoveNode(ctx context.Context, name string) (*rpcpb.RemoveNodeResponse, error)
RestartNode(ctx context.Context, name string, execPath string, opts ...OpOption) (*rpcpb.RestartNodeResponse, error)
Stop(ctx context.Context) (*rpcpb.StopResponse, error)
Close() error
}

type client struct {
cfg Config

conn *grpc.ClientConn

pingc rpcpb.PingServiceClient
controlc rpcpb.ControlServiceClient

closed chan struct{}
closeOnce sync.Once
}

func New(cfg Config) (Client, error) {
lcfg := logutil.GetDefaultZapLoggerConfig()
lcfg.Level = zap.NewAtomicLevelAt(logutil.ConvertToZapLevel(cfg.LogLevel))
logger, err := lcfg.Build()
if err != nil {
return nil, err
}
_ = zap.ReplaceGlobals(logger)

color.Outf("{{blue}}dialing endpoint %q{{/}}\n", cfg.Endpoint)
ctx, cancel := context.WithTimeout(context.Background(), cfg.DialTimeout)
conn, err := grpc.DialContext(
ctx,
cfg.Endpoint,
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
cancel()
if err != nil {
return nil, err
}

return &client{
cfg: cfg,
conn: conn,
pingc: rpcpb.NewPingServiceClient(conn),
controlc: rpcpb.NewControlServiceClient(conn),
closed: make(chan struct{}),
}, nil
}

func (c *client) Ping(ctx context.Context) (*rpcpb.PingResponse, error) {
zap.L().Info("ping")

// ref. https://grpc-ecosystem.github.io/grpc-gateway/docs/tutorials/adding_annotations/
// curl -X POST -k http://localhost:8081/v1/ping -d ''
return c.pingc.Ping(ctx, &rpcpb.PingRequest{})
}

func (c *client) Start(ctx context.Context, execPath string, opts ...OpOption) (*rpcpb.StartResponse, error) {
ret := &Op{}
ret.applyOpts(opts)

zap.L().Info("start")
return c.controlc.Start(ctx, &rpcpb.StartRequest{
ExecPath: execPath,
WhitelistedSubnets: &ret.whitelistedSubnets,
LogLevel: &ret.logLevel,
})
}

func (c *client) Health(ctx context.Context) (*rpcpb.HealthResponse, error) {
zap.L().Info("health")
return c.controlc.Health(ctx, &rpcpb.HealthRequest{})
}

func (c *client) URIs(ctx context.Context) ([]string, error) {
zap.L().Info("uris")
resp, err := c.controlc.URIs(ctx, &rpcpb.URIsRequest{})
if err != nil {
return nil, err
}
return resp.Uris, nil
}

func (c *client) Status(ctx context.Context) (*rpcpb.StatusResponse, error) {
zap.L().Info("status")
return c.controlc.Status(ctx, &rpcpb.StatusRequest{})
}

func (c *client) StreamStatus(ctx context.Context, pushInterval time.Duration) (<-chan *rpcpb.ClusterInfo, error) {
stream, err := c.controlc.StreamStatus(ctx, &rpcpb.StreamStatusRequest{
PushInterval: int64(pushInterval),
})
if err != nil {
return nil, err
}

ch := make(chan *rpcpb.ClusterInfo, 1)
go func() {
defer func() {
zap.L().Debug("closing stream send", zap.Error(stream.CloseSend()))
close(ch)
}()
zap.L().Info("start receive routine")
for {
select {
case <-ctx.Done():
return
case <-c.closed:
return
default:
}

// receive data from stream
msg := new(rpcpb.StatusResponse)
err := stream.RecvMsg(msg)
if err == nil {
ch <- msg.GetClusterInfo()
continue
}

if errors.Is(err, io.EOF) {
zap.L().Debug("received EOF from client; returning to close the stream from server side")
return
}
if isClientCanceled(stream.Context().Err(), err) {
zap.L().Warn("failed to receive status request from gRPC stream due to client cancellation", zap.Error(err))
} else {
zap.L().Warn("failed to receive status request from gRPC stream", zap.Error(err))
}
return
}
}()
return ch, nil
}

func (c *client) Stop(ctx context.Context) (*rpcpb.StopResponse, error) {
zap.L().Info("stop")
return c.controlc.Stop(ctx, &rpcpb.StopRequest{})
}

func (c *client) RemoveNode(ctx context.Context, name string) (*rpcpb.RemoveNodeResponse, error) {
zap.L().Info("remove node", zap.String("name", name))
return c.controlc.RemoveNode(ctx, &rpcpb.RemoveNodeRequest{Name: name})
}

func (c *client) RestartNode(ctx context.Context, name string, execPath string, opts ...OpOption) (*rpcpb.RestartNodeResponse, error) {
ret := &Op{}
ret.applyOpts(opts)

zap.L().Info("restart node", zap.String("name", name))
return c.controlc.RestartNode(ctx, &rpcpb.RestartNodeRequest{
Name: name,
StartRequest: &rpcpb.StartRequest{
ExecPath: execPath,
WhitelistedSubnets: &ret.whitelistedSubnets,
},
})
}

func (c *client) Close() error {
c.closeOnce.Do(func() {
close(c.closed)
})
return c.conn.Close()
}

type Op struct {
whitelistedSubnets string
logLevel string
}

type OpOption func(*Op)

func (op *Op) applyOpts(opts []OpOption) {
for _, opt := range opts {
opt(op)
}
}

func WithWhitelistedSubnets(whitelistedSubnets string) OpOption {
return func(op *Op) {
op.whitelistedSubnets = whitelistedSubnets
}
}

func WithLogLevel(logLevel string) OpOption {
return func(op *Op) {
op.logLevel = logLevel
}
}

func isClientCanceled(ctxErr error, err error) bool {
if ctxErr != nil {
return true
}

ev, ok := status.FromError(err)
if !ok {
return false
}

switch ev.Code() {
case codes.Canceled, codes.DeadlineExceeded:
// client-side context cancel or deadline exceeded
// "rpc error: code = Canceled desc = context canceled"
// "rpc error: code = DeadlineExceeded desc = context deadline exceeded"
return true
case codes.Unavailable:
msg := ev.Message()
// client-side context cancel or deadline exceeded with TLS ("http2.errClientDisconnected")
// "rpc error: code = Unavailable desc = client disconnected"
if msg == "client disconnected" {
return true
}
// "grpc/transport.ClientTransport.CloseStream" on canceled streams
// "rpc error: code = Unavailable desc = stream error: stream ID 21; CANCEL")
if strings.HasPrefix(msg, "stream error: ") && strings.HasSuffix(msg, "; CANCEL") {
return true
}
}
return false
}
Loading

0 comments on commit 8651abd

Please sign in to comment.