Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sd): default namespace is graphite, reduce weight if overload #253

Merged
merged 3 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 18 additions & 14 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,12 @@ type Common struct {
MemoryReturnInterval time.Duration `toml:"memory-return-interval" json:"memory-return-interval" comment:"daemon will return the freed memory to the OS when it>0"`
HeadersToLog []string `toml:"headers-to-log" json:"headers-to-log" comment:"additional request headers to log"`

BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service discovery base weight (on idle)"`
SDType SDType `toml:"service-discovery-type" json:"service-discovery-type" comment:"service discovery type"`
SD string `toml:"service-discovery" json:"service-discovery" comment:"service discovery address (consul)"`
SDNamespace string `toml:"service-discovery-ns" json:"service-discovery-ns" comment:"service discovery namespace (graphite by default)"`
SDDc []string `toml:"service-discovery-ds" json:"service-discovery-ds" comment:"service discovery datacenters (first - is primary, in other register as backup)"`
BaseWeight int `toml:"base_weight" json:"base_weight" comment:"service discovery base weight (on idle)"`
SDType SDType `toml:"service-discovery-type" json:"service-discovery-type" comment:"service discovery type"`
SD string `toml:"service-discovery" json:"service-discovery" comment:"service discovery address (consul)"`
SDNamespace string `toml:"service-discovery-ns" json:"service-discovery-ns" comment:"service discovery namespace (graphite by default)"`
SDDc []string `toml:"service-discovery-ds" json:"service-discovery-ds" comment:"service discovery datacenters (first - is primary, in other register as backup)"`
SDExpire time.Duration `toml:"service-discovery-expire" json:"service-discovery-expire" comment:"service discovery expire duration for cleanup (minimum is 24h, if enabled)"`

FindCacheConfig CacheConfig `toml:"find-cache" json:"find-cache" comment:"find/tags cache config"`

Expand Down Expand Up @@ -278,11 +279,11 @@ type Carbonlink struct {

// Prometheus configuration
type Prometheus struct {
Listen string `toml:"listen" json:"listen" comment:"listen addr for prometheus ui and api"`
ExternalURLRaw string `toml:"external-url" json:"external-url" comment:"allows to set URL for redirect manually"`
ExternalURL *url.URL `toml:"-" json:"-"`
PageTitle string `toml:"page-title" json:"page-title"`
LookbackDelta time.Duration `toml:"lookback-delta" json:"lookback-delta"`
Listen string `toml:"listen" json:"listen" comment:"listen addr for prometheus ui and api"`
ExternalURLRaw string `toml:"external-url" json:"external-url" comment:"allows to set URL for redirect manually"`
ExternalURL *url.URL `toml:"-" json:"-"`
PageTitle string `toml:"page-title" json:"page-title"`
LookbackDelta time.Duration `toml:"lookback-delta" json:"lookback-delta"`
RemoteReadConcurrencyLimit int `toml:"remote-read-concurrency-limit" json:"remote-read-concurrency-limit" comment:"concurrently handled remote read requests"`
}

Expand Down Expand Up @@ -393,10 +394,10 @@ func New() *Config {
TotalTimeout: 500 * time.Millisecond,
},
Prometheus: Prometheus{
ExternalURLRaw: "",
PageTitle: "Prometheus Time Series Collection and Processing Server",
Listen: ":9092",
LookbackDelta: 5 * time.Minute,
ExternalURLRaw: "",
PageTitle: "Prometheus Time Series Collection and Processing Server",
Listen: ":9092",
LookbackDelta: 5 * time.Minute,
RemoteReadConcurrencyLimit: 10,
},
Debug: Debug{
Expand Down Expand Up @@ -722,6 +723,9 @@ func (c *Config) NeedLoadAvgColect() bool {
if c.Common.SDNamespace == "" {
c.Common.SDNamespace = "graphite"
}
if c.Common.SDExpire < 24*time.Hour {
c.Common.SDExpire = 24 * time.Hour
}
return true
}
if c.ClickHouse.RenderAdaptiveQueries > 0 {
Expand Down
2 changes: 2 additions & 0 deletions doc/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ Only one tag used as filter for index field Tag1, see graphite_tagged table [str
service-discovery-ns = ""
# service discovery datacenters (first - is primary, in other register as backup)
service-discovery-ds = []
# service discovery expire duration for cleanup (minimum is 24h, if enabled)
service-discovery-expire = "0s"

# find/tags cache config
[common.find-cache]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/lomik/zapwriter v0.0.0-20210624082824-c1161d1eb463
github.com/msaf1980/go-expirecache v0.0.2
github.com/msaf1980/go-metrics v0.0.14
github.com/msaf1980/go-stringutils v0.1.4
github.com/msaf1980/go-stringutils v0.1.6
github.com/msaf1980/go-syncutils v0.0.3
github.com/msaf1980/go-timeutils v0.0.3
github.com/pelletier/go-toml v1.9.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -585,8 +585,8 @@ github.com/msaf1980/go-metrics v0.0.11/go.mod h1:8VcR8MdyvIJpcVLOVFKbhb27+60tXy0
github.com/msaf1980/go-metrics v0.0.14 h1:gD0kCG5MDbon33Nkz49yW6kz3yu0DHzDN0SxjGTWlTA=
github.com/msaf1980/go-metrics v0.0.14/go.mod h1:8VcR8MdyvIJpcVLOVFKbhb27+60tXy0M+zq7Ag8a6Pw=
github.com/msaf1980/go-stringutils v0.1.2/go.mod h1:AxmV/6JuQUAtZJg5XmYATB5ZwCWgtpruVHY03dswRf8=
github.com/msaf1980/go-stringutils v0.1.4 h1:UwsIT0hplHVucqbknk3CoNqKkmIuSHhsbBldXxyld5U=
github.com/msaf1980/go-stringutils v0.1.4/go.mod h1:AxmV/6JuQUAtZJg5XmYATB5ZwCWgtpruVHY03dswRf8=
github.com/msaf1980/go-stringutils v0.1.6 h1:qri8o+4XLJCJYemHcvJY6xJhrGTmllUoPwayKEj4NSg=
github.com/msaf1980/go-stringutils v0.1.6/go.mod h1:xpicaTIpLAVzL0gUQkciB1zjypDGKsOCI25cKQbRQYA=
github.com/msaf1980/go-syncutils v0.0.3 h1:bd6+yTSB8/CmpG7M6j1gq5sJMyPqecjJcBf19s2Y6u4=
github.com/msaf1980/go-syncutils v0.0.3/go.mod h1:zoZwQNkDATcfKq5lQPK6dmJT7Z01COxw/vd8bcJyC9w=
github.com/msaf1980/go-timeutils v0.0.3 h1:c0NIpJBcU6KoMeMCPdnbGFcaP4sm7VCwoW1cdgsmUkU=
Expand Down
96 changes: 63 additions & 33 deletions graphite-clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os/signal"
"runtime"
"runtime/debug"
"strings"
"sync"
"syscall"
"time"
Expand All @@ -33,7 +34,6 @@ import (
"github.com/lomik/graphite-clickhouse/prometheus"
"github.com/lomik/graphite-clickhouse/render"
"github.com/lomik/graphite-clickhouse/sd"
"github.com/lomik/graphite-clickhouse/sd/nginx"
"github.com/lomik/graphite-clickhouse/tagger"
)

Expand Down Expand Up @@ -108,7 +108,10 @@ func main() {
)

sdList := flag.Bool("sd-list", false, "List registered nodes in SD")
sdClean := flag.Bool("sd-clean", false, "Cleanup registered nodes in SD")
sdDelete := flag.Bool("sd-delete", false, "Delete registered nodes for this hostname in SD")
sdEvict := flag.String("sd-evict", "", "Delete registered nodes for hostname in SD")
sdClean := flag.Bool("sd-clean", false, "Cleanup expired registered nodes in SD")
sdExpired := flag.Bool("sd-expired", false, "List expired registered nodes in SD")

printVersion := flag.Bool("version", false, "Print version")
verbose := flag.Bool("verbose", false, "Verbose (print config on startup)")
Expand Down Expand Up @@ -137,35 +140,61 @@ func main() {
return
}

if *sdList || *sdClean {
if *sdEvict != "" {
if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
var sd sd.SD
var s sd.SD
logger := zapwriter.Default()
switch cfg.Common.SDType {
case config.SDNginx:
sd = nginx.New(cfg.Common.SD, cfg.Common.SDNamespace, "", logger)
default:
panic(fmt.Errorf("service discovery type %q can be registered", cfg.Common.SDType.String()))
if s, err = sd.New(&cfg.Common, *sdEvict, logger); err != nil {
fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
os.Exit(1)
}
ts := time.Now().Unix() - 3600
if nodes, err := sd.Nodes(); err == nil {
for _, node := range nodes {
if *sdClean && node.Flags > 0 {
if ts > node.Flags {
fmt.Printf("%s: %s (%s), deleted\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
// sd.Delete(node.Key, node.Value)
} else {
fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
} else {
fmt.Printf("%s: %s (%s)\n", node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
err = s.Clear("", "")
}
return
} else if *sdList || *sdDelete || *sdExpired || *sdClean {
if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
var s sd.SD
logger := zapwriter.Default()
if s, err = sd.New(&cfg.Common, "", logger); err != nil {
fmt.Fprintf(os.Stderr, "service discovery type %q can be registered", cfg.Common.SDType.String())
os.Exit(1)
}

// sdList := flag.Bool("sd-list", false, "List registered nodes in SD")
// sdDelete := flag.Bool("sd-delete", false, "Delete registered nodes for this hostname in SD")
// sdEvict := flag.String("sd-evict", "", "Delete registered nodes for hostname in SD")
// sdClean := flag.Bool("sd-clean", false, "Cleanup expired registered nodes in SD")

if *sdDelete {
hostname, _ := os.Hostname()
hostname, _, _ = strings.Cut(hostname, ".")
if err = s.Clear("", ""); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
} else if *sdExpired {
if err = sd.Cleanup(&cfg.Common, s, true); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
} else if *sdClean {
if err = sd.Cleanup(&cfg.Common, s, false); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
} else {
log.Fatal(err)
if nodes, err := s.Nodes(); err == nil {
for _, node := range nodes {
fmt.Printf("%s/%s: %s (%s)\n", s.Namespace(), node.Key, node.Value, time.Unix(node.Flags, 0).UTC().Format(time.RFC3339Nano))
}
} else {
fmt.Fprintln(os.Stderr, err.Error())
os.Exit(1)
}
}
} else {
fmt.Fprintln(os.Stderr, "SD not enabled")
os.Exit(1)
}
return
}
Expand Down Expand Up @@ -288,29 +317,30 @@ func main() {
}
}()

if cfg.Common.SD != "" && cfg.NeedLoadAvgColect() {
go func() {
time.Sleep(time.Millisecond * 100)
sdLogger := localManager.Logger("service discovery")
sd.Register(&cfg.Common, sdLogger)
}()
}

go func() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGTERM, syscall.SIGINT)
<-stop
logger.Info("stoping graphite-clickhouse")
if cfg.Common.SDType != config.SDNone {
if cfg.Common.SD != "" {
// unregister SD
sd.Stop()
time.Sleep(10 * time.Second)
}
// initiating the shutdown
ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
srv.Shutdown(ctx)
cancel()
}()

if cfg.Common.SD != "" {
go func() {
time.Sleep(time.Millisecond * 100)
sdLogger := localManager.Logger("service discovery")
sd.Register(cfg, sdLogger)
}()
}

exitWait.Wait()

logger.Info("stop graphite-clickhouse")
Expand Down
10 changes: 8 additions & 2 deletions load_avg/load_avg.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,21 @@ func Store(f float64) {
}

// Weight computes a service-discovery weight from a base weight n and a
// normalized load average l. Higher load yields a lower weight; the result
// is always at least 1 so a node is never fully evicted by weighting.
//
//   - n <= 0 or l >= 2.0        -> 1 (overloaded or misconfigured base)
//   - l == 0 (after rounding)   -> 2*n (idle bonus)
//   - 1.0 < l < 2.0             -> load is amplified 4x before the log,
//     so weight drops sharply once the node is past full load
func Weight(n int, l float64) int64 {
	if n <= 0 || l >= 2.0 {
		return 1
	}
	// (1 / normalized_load_avg - 1)
	// Round load to one decimal place to stabilize the output.
	l = math.Round(10*l) / 10
	if l == 0 {
		return 2 * int64(n)
	}
	if l > 1.0 {
		// Penalize overload: shift the log10 curve so weights fall fast above 1.0.
		l *= 4
	}
	l = math.Log10(l)
	w := int64(n) - int64(float64(n)*l)
	if w <= 0 {
		return 1
	}
	return w
}
26 changes: 14 additions & 12 deletions load_avg/load_avg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ func TestWeight(t *testing.T) {
{n: 100, l: 0.5, want: 130},
{n: 100, l: 0.9, want: 104},
{n: 100, l: 1, want: 100},
{n: 100, l: 1.1, want: 96},
{n: 100, l: 2, want: 70},
{n: 100, l: 4, want: 40},
{n: 100, l: 9, want: 5},
{n: 100, l: 10, want: 0},
{n: 100, l: 20, want: 0},
{n: 100, l: 1.1, want: 36},
{n: 100, l: 1.9, want: 12},
{n: 100, l: 2, want: 1},
{n: 100, l: 9, want: 1},
{n: 100, l: 10, want: 1},
{n: 100, l: 20, want: 1},
// n : 1000
{n: 1000, l: 0, want: 2000},
{n: 1000, l: 0.1, want: 1999},
Expand All @@ -33,12 +33,14 @@ func TestWeight(t *testing.T) {
{n: 1000, l: 0.5, want: 1301},
{n: 1000, l: 0.9, want: 1045},
{n: 1000, l: 1, want: 1000},
{n: 1000, l: 1.1, want: 959},
{n: 1000, l: 2, want: 699},
{n: 1000, l: 4, want: 398},
{n: 1000, l: 9, want: 46},
{n: 1000, l: 10, want: 0},
{n: 1000, l: 20, want: 0},
{n: 1000, l: 1.1, want: 357},
{n: 1000, l: 1.9, want: 120},
{n: 1000, l: 2, want: 1},
{n: 1000, l: 3, want: 1},
{n: 1000, l: 4, want: 1},
{n: 1000, l: 9, want: 1},
{n: 1000, l: 10, want: 1},
{n: 1000, l: 20, want: 1},
}
for _, tt := range tests {
t.Run(fmt.Sprintf("%d#%f", tt.n, tt.l), func(t *testing.T) {
Expand Down
Loading
Loading