Skip to content

Commit

Permalink
feature:自适应重试控制策略 (#270)
Browse files Browse the repository at this point in the history
* feature:自适应重试控制策略

* feature:自适应重试控制策略

* feature:自适应重试控制策略

* feature:自适应重试控制策略

* feature:自适应重试控制策略

* feature:自适应重试控制策略
  • Loading branch information
Stone-afk authored Jan 3, 2025
1 parent 8abadae commit 97490cd
Show file tree
Hide file tree
Showing 7 changed files with 289 additions and 2 deletions.
90 changes: 90 additions & 0 deletions retry/adaptive.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"math/bits"
"sync/atomic"
"time"
)

var _ Strategy = (*AdaptiveTimeoutRetryStrategy)(nil)

type AdaptiveTimeoutRetryStrategy struct {
strategy Strategy // 基础重试策略
threshold int // 超时比率阈值 (单位:比特数量)
ringBuffer []uint64 // 比特环(滑动窗口存储超时信息)
reqCount uint64 // 请求数量
bufferLen int // 滑动窗口长度
bitCnt uint64 // 比特位总数
}

func (s *AdaptiveTimeoutRetryStrategy) Next() (time.Duration, bool) {
failCount := s.getFailed()
if failCount >= s.threshold {
return 0, false
}
return s.strategy.Next()
}

func (s *AdaptiveTimeoutRetryStrategy) Report(err error) Strategy {
if err == nil {
s.markSuccess()
} else {
s.markFail()
}
return s
}

func (s *AdaptiveTimeoutRetryStrategy) markSuccess() {
count := atomic.AddUint64(&s.reqCount, 1)
count = count % s.bitCnt
// 对2^x进行取模或者整除运算时可以用位运算代替除法和取模
// count / 64 可以转换成 count >> 6。 位运算会更高效。
idx := count >> 6
// count % 64 可以转换成 count & 63
bitPos := count & 63
old := atomic.LoadUint64(&s.ringBuffer[idx])
atomic.StoreUint64(&s.ringBuffer[idx], old&^(uint64(1)<<bitPos))
}

func (s *AdaptiveTimeoutRetryStrategy) markFail() {
count := atomic.AddUint64(&s.reqCount, 1)
count = count % s.bitCnt
idx := count >> 6
bitPos := count & 63
old := atomic.LoadUint64(&s.ringBuffer[idx])
// (uint64(1)<<bitPos) 将目标位设置为1
atomic.StoreUint64(&s.ringBuffer[idx], old|(uint64(1)<<bitPos))
}

func (s *AdaptiveTimeoutRetryStrategy) getFailed() int {
var failCount int
for i := 0; i < len(s.ringBuffer); i++ {
v := atomic.LoadUint64(&s.ringBuffer[i])
failCount += bits.OnesCount64(v)
}
return failCount
}

func NewAdaptiveTimeoutRetryStrategy(strategy Strategy, bufferLen, threshold int) *AdaptiveTimeoutRetryStrategy {
return &AdaptiveTimeoutRetryStrategy{
strategy: strategy,
threshold: threshold,
bufferLen: bufferLen,
ringBuffer: make([]uint64, bufferLen),
bitCnt: uint64(64) * uint64(bufferLen),
}
}
172 changes: 172 additions & 0 deletions retry/adaptive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2021 ecodeclub
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package retry

import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestNewAdaptiveTimeoutRetryStrategy_New(t *testing.T) {
testCases := []struct {
name string
threshold int
ringBufferLen int
strategy Strategy
want *AdaptiveTimeoutRetryStrategy
wantErr error
}{
{
name: "valid strategy and threshold",
strategy: &MockStrategy{}, // 假设有一个 MockStrategy 用于测试
threshold: 50,
ringBufferLen: 16,
want: NewAdaptiveTimeoutRetryStrategy(&MockStrategy{}, 16, 50),
wantErr: nil,
},
}

for _, tt := range testCases {
t.Run(tt.name, func(t *testing.T) {
s := NewAdaptiveTimeoutRetryStrategy(tt.strategy, tt.ringBufferLen, tt.threshold)
assert.Equal(t, tt.want, s)
})
}
}

func TestAdaptiveTimeoutRetryStrategy_Next(t *testing.T) {
baseStrategy := &MockStrategy{}
strategy := NewAdaptiveTimeoutRetryStrategy(baseStrategy, 16, 50)

tests := []struct {
name string
wantDelay time.Duration
wantOk bool
}{
{
name: "error below threshold",
wantDelay: 1 * time.Second,
wantOk: true,
},
{
name: "error above threshold",
wantDelay: 1 * time.Second,
wantOk: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
delay, ok := strategy.Next()
assert.Equal(t, tt.wantDelay, delay)
assert.Equal(t, tt.wantOk, ok)
})
}
}

// 测试场景
// 阈值是50
// 2000个请求 有1500个成功的 有500个失败的 最后统计500个失败的有50个可以执行 有450个不能执行 1500成功的都能执行
func TestAdaptiveTimeoutRetryStrategy_Next_Concurrent(t *testing.T) {
// 创建一个基础策略
baseStrategy := &MockStrategy{}

// 创建升级版自适应策略,设置阈值为50
strategy := NewAdaptiveTimeoutRetryStrategy(baseStrategy, 16, 50)

var wg sync.WaitGroup
var successCount, errCount int64
mockErr := errors.New("mock error")

// 并发执行2000个请求
for i := 0; i < 2000; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
// 前1500个请求成功,后500个失败
var err error
if index >= 1500 {
err = mockErr
}
strategy.Report(err)
_, allowed := strategy.Next()
if err != nil {
// 失败请求的统计
if allowed {
atomic.AddInt64(&successCount, 1)
} else {
atomic.AddInt64(&errCount, 1)
}
}
}(i)
}

// 等待所有goroutine完成
wg.Wait()

// 验证结果:期望大约50个失败请求可以执行,450个被拒绝
// 由于是环形缓冲区和并发执行,可能会有一些误差,这里使用一个合理的范围进行判断
finalSuccessCount := int(atomic.LoadInt64(&successCount))
finalErrCount := int(atomic.LoadInt64(&errCount))
if finalSuccessCount < 45 || finalSuccessCount > 55 {
t.Errorf("期望大约50个失败请求被允许执行,实际允许执行的失败请求数量为: %d", finalSuccessCount)
}

if finalErrCount < 445 || finalErrCount > 455 {
t.Errorf("期望大约450个失败请求被拒绝执行,实际被拒绝的失败请求数量为: %d", finalErrCount)
}
}

func ExampleAdaptiveTimeoutRetryStrategy_Next() {
baseStrategy, err := NewExponentialBackoffRetryStrategy(time.Second, time.Second*5, 10)
if err != nil {
fmt.Println(err)
return
}
strategy := NewAdaptiveTimeoutRetryStrategy(baseStrategy, 16, 50)
interval, ok := strategy.Next()
for ok {
fmt.Println(interval)
interval, ok = strategy.Next()
}
// Output:
// 1s
// 2s
// 4s
// 5s
// 5s
// 5s
// 5s
// 5s
// 5s
// 5s
}

type MockStrategy struct {
}

func (m MockStrategy) Next() (time.Duration, bool) {
return 1 * time.Second, true
}

func (m MockStrategy) Report(err error) Strategy {
return m
}
6 changes: 6 additions & 0 deletions retry/exponential.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/ecodeclub/ekit/internal/errs"
)

var _ Strategy = (*ExponentialBackoffRetryStrategy)(nil)

// ExponentialBackoffRetryStrategy 指数退避重试
type ExponentialBackoffRetryStrategy struct {
// 初始重试间隔
Expand Down Expand Up @@ -49,6 +51,10 @@ func NewExponentialBackoffRetryStrategy(initialInterval, maxInterval time.Durati
}, nil
}

func (s *ExponentialBackoffRetryStrategy) Report(err error) Strategy {
return s
}

func (s *ExponentialBackoffRetryStrategy) Next() (time.Duration, bool) {
retries := atomic.AddInt32(&s.retries, 1)
if s.maxRetries <= 0 || retries <= s.maxRetries {
Expand Down
4 changes: 4 additions & 0 deletions retry/exponential_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package retry

import (
"context"
"fmt"
"testing"
"time"
Expand Down Expand Up @@ -82,12 +83,14 @@ func TestNewExponentialBackoffRetryStrategy_New(t *testing.T) {
func TestExponentialBackoffRetryStrategy_Next(t *testing.T) {
testCases := []struct {
name string
ctx context.Context
strategy *ExponentialBackoffRetryStrategy

wantIntervals []time.Duration
}{
{
name: "stop if retries reaches maxRetries",
ctx: context.Background(),
strategy: func() *ExponentialBackoffRetryStrategy {
s, err := NewExponentialBackoffRetryStrategy(1*time.Second, 10*time.Second, 3)
require.NoError(t, err)
Expand All @@ -98,6 +101,7 @@ func TestExponentialBackoffRetryStrategy_Next(t *testing.T) {
},
{
name: "initialInterval over maxInterval",
ctx: context.Background(),
strategy: func() *ExponentialBackoffRetryStrategy {
s, err := NewExponentialBackoffRetryStrategy(1*time.Second, 4*time.Second, 5)
require.NoError(t, err)
Expand Down
6 changes: 6 additions & 0 deletions retry/fixed_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/ecodeclub/ekit/internal/errs"
)

var _ Strategy = (*FixedIntervalRetryStrategy)(nil)

// FixedIntervalRetryStrategy 等间隔重试
type FixedIntervalRetryStrategy struct {
maxRetries int32 // 最大重试次数,如果是 0 或负数,表示无限重试
Expand All @@ -45,3 +47,7 @@ func (s *FixedIntervalRetryStrategy) Next() (time.Duration, bool) {
}
return 0, false
}

func (s *FixedIntervalRetryStrategy) Report(err error) Strategy {
return s
}
8 changes: 7 additions & 1 deletion retry/fixed_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package retry

import (
"context"
"fmt"
"testing"
"time"
Expand All @@ -30,23 +31,25 @@ func TestFixedIntervalRetryStrategy_Next(t *testing.T) {

testCases := []struct {
name string
ctx context.Context
s *FixedIntervalRetryStrategy
interval time.Duration

isContinue bool
}{
{
name: "init case, retries 0",
ctx: context.Background(),
s: &FixedIntervalRetryStrategy{
maxRetries: 3,
interval: time.Second,
},

interval: time.Second,
isContinue: true,
},
{
name: "retries equals to MaxRetries 3 after the increase",
ctx: context.Background(),
s: &FixedIntervalRetryStrategy{
maxRetries: 3,
interval: time.Second,
Expand All @@ -57,6 +60,7 @@ func TestFixedIntervalRetryStrategy_Next(t *testing.T) {
},
{
name: "retries over MaxRetries after the increase",
ctx: context.Background(),
s: &FixedIntervalRetryStrategy{
maxRetries: 3,
interval: time.Second,
Expand All @@ -67,6 +71,7 @@ func TestFixedIntervalRetryStrategy_Next(t *testing.T) {
},
{
name: "MaxRetries equals to 0",
ctx: context.Background(),
s: &FixedIntervalRetryStrategy{
maxRetries: 0,
interval: time.Second,
Expand All @@ -76,6 +81,7 @@ func TestFixedIntervalRetryStrategy_Next(t *testing.T) {
},
{
name: "negative MaxRetries",
ctx: context.Background(),
s: &FixedIntervalRetryStrategy{
maxRetries: -1,
interval: time.Second,
Expand Down
Loading

0 comments on commit 97490cd

Please sign in to comment.