Skip to content

Commit

Permalink
Merge pull request #13 from xucong053/keep-alive
Browse files Browse the repository at this point in the history
fix: add heartbeat feature to keep the plugin alive
  • Loading branch information
debugtalk authored Aug 21, 2024
2 parents a0040bd + daefe7e commit 66d9835
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 45 deletions.
4 changes: 4 additions & 0 deletions go_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,7 @@ func (p *goPlugin) Quit() error {
// no need to quit for go plugin
return nil
}

func (p *goPlugin) StartHeartbeat() {

}
138 changes: 93 additions & 45 deletions hashicorp_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"os"
"os/exec"
"sync"
"time"

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/go-plugin"
"github.com/pkg/errors"
Expand Down Expand Up @@ -39,16 +42,87 @@ func newHashicorpPlugin(path string, option *pluginOption) (*hashicorpPlugin, er
option: option,
}

// cmd
// plugin type, grpc or rpc
p.rpcType = rpcType(os.Getenv(fungo.PluginTypeEnvName))
if p.rpcType != rpcTypeRPC {
p.rpcType = rpcTypeGRPC // default
}
// logger
logger = logger.ResetNamed(fmt.Sprintf("hc-%v-%v", p.rpcType, p.option.langType))

// 失败则继续尝试,连续三次失败则返回错误
err := p.startPlugin()
if err == nil {
return p, err
}
logger.Info("load hashicorp go plugin success", "path", path)

return nil, err
}

func (p *hashicorpPlugin) Type() string {
return fmt.Sprintf("hashicorp-%s-%v", p.rpcType, p.option.langType)
}

func (p *hashicorpPlugin) Path() string {
return p.path
}

func (p *hashicorpPlugin) Has(funcName string) bool {
logger.Debug("check if plugin has function", "funcName", funcName)
flag, ok := p.cachedFunctions.Load(funcName)
if ok {
return flag.(bool)
}

funcNames, err := p.funcCaller.GetNames()
if err != nil {
return false
}

for _, name := range funcNames {
if name == funcName {
p.cachedFunctions.Store(funcName, true) // cache as exists
return true
}
}

p.cachedFunctions.Store(funcName, false) // cache as not exists
return false
}

func (p *hashicorpPlugin) Call(funcName string, args ...interface{}) (interface{}, error) {
return p.funcCaller.Call(funcName, args...)
}

func (p *hashicorpPlugin) StartHeartbeat() {
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
var err error

for range ticker.C {
// Check the client connection status
logger.Info("heartbreak......")
if p.client.Exited() {
logger.Error(fmt.Sprintf("plugin exited, restarting..."))
err = p.startPlugin()
if err != nil {
break
}
}
}
}

func (p *hashicorpPlugin) startPlugin() error {
var cmd *exec.Cmd
if p.option.langType == langTypePython {
// hashicorp python plugin
cmd = exec.Command(p.option.python3, path)
cmd = exec.Command(p.option.python3, p.path)
// hashicorp python plugin only supports gRPC
p.rpcType = rpcTypeGRPC
} else {
// hashicorp go plugin
cmd = exec.Command(path)
cmd = exec.Command(p.path)
// hashicorp go plugin supports grpc and rpc
p.rpcType = rpcType(os.Getenv(fungo.PluginTypeEnvName))
if p.rpcType != rpcTypeRPC {
Expand All @@ -57,11 +131,21 @@ func newHashicorpPlugin(path string, option *pluginOption) (*hashicorpPlugin, er
}
cmd.Env = append(os.Environ(), fmt.Sprintf("%s=%s", fungo.PluginTypeEnvName, p.rpcType))

// logger
logger = logger.ResetNamed(fmt.Sprintf("hc-%v-%v", p.rpcType, p.option.langType))
var err error
maxRetryCount := 3
for i := 0; i < maxRetryCount; i++ {
err = p.tryStartPlugin(cmd, logger)
if err == nil {
return nil
}
time.Sleep(time.Second * time.Duration(i*i)) // sleep temporarily before next try
}
logger.Error("failed to start plugin after max retries")
return errors.Wrap(err, "failed to start plugin after max retries")
}

func (p *hashicorpPlugin) tryStartPlugin(cmd *exec.Cmd, logger hclog.Logger) error {
// launch the plugin process
logger.Info("launch the plugin process")
p.client = plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: fungo.HandshakeConfig,
Plugins: map[string]plugin.Plugin{
Expand All @@ -79,58 +163,22 @@ func newHashicorpPlugin(path string, option *pluginOption) (*hashicorpPlugin, er
// Connect via RPC/gRPC
rpcClient, err := p.client.Client()
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("connect %s plugin failed", p.rpcType))
return errors.Wrap(err, fmt.Sprintf("connect %s plugin failed", p.rpcType))
}

// Request the plugin
raw, err := rpcClient.Dispense(p.rpcType.String())
if err != nil {
return nil, errors.Wrap(err, fmt.Sprintf("request %s plugin failed", p.rpcType))
return errors.Wrap(err, fmt.Sprintf("request %s plugin failed", p.rpcType))
}

// We should have a Function now! This feels like a normal interface
// implementation but is in fact over an RPC connection.
p.funcCaller = raw.(fungo.IFuncCaller)

p.cachedFunctions = sync.Map{}
logger.Info("load hashicorp go plugin success", "path", path)

return p, nil
}

func (p *hashicorpPlugin) Type() string {
return fmt.Sprintf("hashicorp-%s-%v", p.rpcType, p.option.langType)
}

func (p *hashicorpPlugin) Path() string {
return p.path
}

func (p *hashicorpPlugin) Has(funcName string) bool {
logger.Debug("check if plugin has function", "funcName", funcName)
flag, ok := p.cachedFunctions.Load(funcName)
if ok {
return flag.(bool)
}

funcNames, err := p.funcCaller.GetNames()
if err != nil {
return false
}

for _, name := range funcNames {
if name == funcName {
p.cachedFunctions.Store(funcName, true) // cache as exists
return true
}
}

p.cachedFunctions.Store(funcName, false) // cache as not exists
return false
}

func (p *hashicorpPlugin) Call(funcName string, args ...interface{}) (interface{}, error) {
return p.funcCaller.Call(funcName, args...)
return nil
}

func (p *hashicorpPlugin) Quit() error {
Expand Down
1 change: 1 addition & 0 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type IPlugin interface {
Has(funcName string) bool // check if plugin has function
Call(funcName string, args ...interface{}) (interface{}, error) // call function
Quit() error // quit plugin
StartHeartbeat() // heartbeat to keep the plugin alive
}

type langType string
Expand Down

0 comments on commit 66d9835

Please sign in to comment.