Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use sync/atomic instead of Uber's implementation #256

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions controller/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"time"

"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/apache/kvrocks-controller/consts"
"github.com/apache/kvrocks-controller/store"
Expand Down Expand Up @@ -66,7 +65,7 @@ func (s *MockClusterStore) GetCluster(ctx context.Context, ns, cluster string) (
}

func (s *MockClusterStore) UpdateCluster(ctx context.Context, ns string, cluster *store.Cluster) error {
cluster.Version.Inc()
cluster.Version.Add(1)
return s.SetCluster(ctx, ns, cluster)
}

Expand Down Expand Up @@ -104,8 +103,7 @@ func TestCluster_FailureCount(t *testing.T) {
mockNode3.Sequence = 101

clusterInfo := &store.Cluster{
Name: clusterName,
Version: *atomic.NewInt64(1),
Name: clusterName,
Shards: []*store.Shard{{
Nodes: []store.Node{
mockNode0, mockNode1, mockNode2, mockNode3,
Expand All @@ -115,7 +113,10 @@ func TestCluster_FailureCount(t *testing.T) {
TargetShardIndex: -1,
}},
}
clusterInfo.Version.Store(1)

require.NoError(t, s.CreateCluster(ctx, ns, clusterInfo))

cluster := &ClusterChecker{
clusterStore: s,
namespace: ns,
Expand Down
6 changes: 3 additions & 3 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/apache/kvrocks-controller/config"
Expand Down Expand Up @@ -66,7 +66,7 @@ func New(s *store.ClusterStore, config *config.ControllerConfig) (*Controller, e
}

func (c *Controller) Start(ctx context.Context) error {
if !c.state.CAS(stateInit, stateRunning) {
if !c.state.CompareAndSwap(stateInit, stateRunning) {
return nil
}

Expand Down Expand Up @@ -234,7 +234,7 @@ func (c *Controller) updateCluster(namespace, clusterName string) {
}

func (c *Controller) Close() {
if !c.state.CAS(stateRunning, stateClosed) {
if !c.state.CompareAndSwap(stateRunning, stateClosed) {
return
}

Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ require (
go.etcd.io/etcd/client/v3 v3.5.17
go.etcd.io/etcd/raft/v3 v3.5.17
go.etcd.io/etcd/server/v3 v3.5.17
go.uber.org/atomic v1.11.0
go.uber.org/zap v1.27.0
gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0
)
Expand Down
8 changes: 0 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ github.com/go-playground/validator/v10 v10.23.0 h1:/PwmTwZhS0dPkav3cdK9kV1FsAmrL
github.com/go-playground/validator/v10 v10.23.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-resty/resty/v2 v2.16.2 h1:CpRqTjIzq/rweXUt9+GxzzQdlkqMdt8Lm/fuK/CAbAg=
github.com/go-resty/resty/v2 v2.16.2/go.mod h1:0fHAoK7JoBy/Ch36N8VFeMsK7xQOHhvWaC3iOktwmIU=
github.com/go-resty/resty/v2 v2.16.3 h1:zacNT7lt4b8M/io2Ahj6yPypL7bqx9n1iprfQuodV+E=
github.com/go-resty/resty/v2 v2.16.3/go.mod h1:hkJtXbA2iKHzJheXYvQ8snQES5ZLGKMwQ07xAwp/fiA=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
Expand Down Expand Up @@ -184,16 +182,13 @@ github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVc
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
Expand Down Expand Up @@ -325,8 +320,6 @@ go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4Jjx
go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8=
go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys=
go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand Down Expand Up @@ -384,7 +377,6 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Expand Down
2 changes: 1 addition & 1 deletion server/api/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestClusterBasics(t *testing.T) {
after, err := handler.s.GetCluster(ctx, ns, "test-cluster")
require.NoError(t, err)

require.EqualValues(t, before.Version.Inc(), after.Version.Load())
require.EqualValues(t, before.Version.Add(1), after.Version.Load())
require.Len(t, after.Shards[0].SlotRanges, 2)
require.EqualValues(t, store.SlotRange{Start: 0, Stop: 2}, after.Shards[0].SlotRanges[0])
require.EqualValues(t, store.SlotRange{Start: 4, Stop: 8191}, after.Shards[0].SlotRanges[1])
Expand Down
14 changes: 8 additions & 6 deletions server/api/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/apache/kvrocks-controller/consts"
"github.com/apache/kvrocks-controller/server/middleware"
Expand All @@ -48,11 +47,14 @@ func TestShardBasics(t *testing.T) {
shard := store.NewShard()
shard.SlotRanges = []store.SlotRange{{Start: 0, Stop: 16383}}
shard.Nodes = []store.Node{store.NewClusterNode("127.0.0.1:1234", "")}
err := handler.s.CreateCluster(context.Background(), ns, &store.Cluster{
Name: clusterName,
Version: *atomic.NewInt64(1),
Shards: []*store.Shard{shard},
})

clusterInfo := &store.Cluster{
Name: clusterName,
Shards: []*store.Shard{shard},
}
clusterInfo.Version.Store(1)

err := handler.s.CreateCluster(context.Background(), ns, clusterInfo)
require.NoError(t, err)

runCreate := func(t *testing.T, expectedStatusCode int) {
Expand Down
50 changes: 42 additions & 8 deletions store/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ package store

import (
"context"
"encoding/json"
"errors"
"fmt"
"sort"
"strconv"
"strings"

"go.uber.org/atomic"
"sync/atomic"

"github.com/apache/kvrocks-controller/consts"
)

type Cluster struct {
Name string `json:"name"`
Version atomic.Int64 `json:"version"`
Version atomic.Int64 `json:"-"`
Shards []*Shard `json:"shards"`
}

Expand Down Expand Up @@ -232,7 +232,7 @@ func (cluster *Cluster) MigrateSlot(ctx context.Context, slot int, targetShardId
}

func (cluster *Cluster) SetSlot(ctx context.Context, slot int, targetNodeID string) error {
version := cluster.Version.Inc()
version := cluster.Version.Add(1)
for i := 0; i < len(cluster.Shards); i++ {
for _, node := range cluster.Shards[i].Nodes {
clusterNode, ok := node.(*ClusterNode)
Expand Down Expand Up @@ -309,8 +309,42 @@ func ParseCluster(clusterStr string) (*Cluster, error) {
masterNode := shards[i].Nodes[0]
shards[i].Nodes = append(shards[i].Nodes, slaveNodes[masterNode.ID()]...)
}
return &Cluster{
Version: *atomic.NewInt64(clusterVer),
Shards: shards,
}, nil

clusterInfo := &Cluster{
Shards: shards,
}
clusterInfo.Version.Store(clusterVer)

return clusterInfo, nil
}

// MarshalJSON is a custom function since the atomic.Int64 type does not directly implement JSON marshaling.
func (cluster *Cluster) MarshalJSON() ([]byte, error) {
type Alias Cluster // to avoid recursion

return json.Marshal(&struct {
Version int64 `json:"version"`
*Alias
}{
Version: cluster.Version.Load(),
Alias: (*Alias)(cluster),
})
}

// UnmarshalJSON is a custom function since the atomic.Int64 type does not directly implement JSON unmarshaling.
func (cluster *Cluster) UnmarshalJSON(data []byte) error {
type Alias Cluster

aux := &struct {
Version int64 `json:"version"`
*Alias
}{
Alias: (*Alias)(cluster),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}

cluster.Version.Store(aux.Version)
return nil
}
3 changes: 2 additions & 1 deletion store/engine/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@
* under the License.
*
*/

package etcd

import (
"context"
"errors"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/apache/kvrocks-controller/consts"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/pkg/transport"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/apache/kvrocks-controller/logger"
Expand Down
7 changes: 3 additions & 4 deletions store/engine/raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"net/url"
"os"
"sync"
"sync/atomic"
"time"

"github.com/apache/kvrocks-controller/logger"
Expand All @@ -39,8 +40,6 @@ import (
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp"
stats "go.etcd.io/etcd/server/v3/etcdserver/api/v2stats"

"go.uber.org/atomic"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -131,7 +130,7 @@ func (n *Node) SetSnapshotThreshold(threshold uint64) {

func (n *Node) run() error {
// The node is already running
if !n.isRunning.CAS(false, true) {
if !n.isRunning.CompareAndSwap(false, true) {
return nil
}
n.shutdown = make(chan struct{})
Expand Down Expand Up @@ -526,7 +525,7 @@ func (n *Node) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
}

func (n *Node) Close() error {
if !n.isRunning.CAS(true, false) {
if !n.isRunning.CompareAndSwap(true, false) {
return nil
}
close(n.shutdown)
Expand Down
10 changes: 5 additions & 5 deletions store/engine/zookeeper/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,27 @@
* under the License.
*
*/

package zookeeper

import (
"context"
"errors"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/store/engine"
"github.com/go-zookeeper/zk"
"go.uber.org/atomic"
)

const (
sessionTTL = 6 * time.Second
sessionTTL = 6 * time.Second
defaultElectPath = "/kvrocks/controller/leader"
)

const defaultElectPath = "/kvrocks/controller/leader"

type Config struct {
Addrs []string `yaml:"addrs"`
Scheme string `yaml:"scheme"`
Expand Down Expand Up @@ -143,7 +143,7 @@ func (e *Zookeeper) Exists(ctx context.Context, key string) (bool, error) {
return exists, nil
}

// If the key exists, it will be set; if not, it will be created.
// Set sets the value for the key. If the key exists, it will be set; if not, it will be created.
func (e *Zookeeper) Set(ctx context.Context, key string, value []byte) error {
exist, _ := e.Exists(ctx, key)
if exist {
Expand Down
2 changes: 1 addition & 1 deletion store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (s *ClusterStore) UpdateCluster(ctx context.Context, ns string, clusterInfo
return fmt.Errorf("the cluster has been updated by others")
}

clusterInfo.Version.Inc()
clusterInfo.Version.Add(1)
clusterBytes, err := json.Marshal(clusterInfo)
if err != nil {
return fmt.Errorf("cluster: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestClusterStore(t *testing.T) {
gotCluster, err := store.GetCluster(ctx, ns, "cluster0")
require.NoError(t, err)
require.Equal(t, cluster0.Name, gotCluster.Name)
require.Equal(t, cluster0.Version, gotCluster.Version)
require.Equal(t, cluster0.Version.Load(), gotCluster.Version.Load())

gotClusters, err := store.ListCluster(ctx, ns)
require.NoError(t, err)
Expand Down
Loading