From c28ddc7f91570360990ee6d19d4073d25283749e Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Sun, 24 Mar 2024 14:12:36 +0300 Subject: [PATCH 1/2] simplify lastUsage code --- internal/conn/conn.go | 17 +++++---------- internal/conn/grpc_client_stream.go | 6 ++--- internal/conn/last_usage.go | 34 +++++++++++++++-------------- internal/conn/last_usage_test.go | 17 ++++++++------- 4 files changed, 36 insertions(+), 38 deletions(-) diff --git a/internal/conn/conn.go b/internal/conn/conn.go index 33aef1d05..aa465eae7 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -7,7 +7,6 @@ import ( "sync/atomic" "time" - "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" @@ -327,7 +326,7 @@ func (c *conn) Invoke( return c.wrapError(err) } - defer c.lastUsage.Lock()() + defer c.lastUsage.SharedLock()() ctx, traceID, err := meta.TraceID(ctx) if err != nil { @@ -412,7 +411,7 @@ func (c *conn) NewStream( return nil, c.wrapError(err) } - defer c.lastUsage.Lock()() + defer c.lastUsage.SharedLock()() ctx, traceID, err := meta.TraceID(ctx) if err != nil { @@ -487,15 +486,11 @@ func withOnTransportError(onTransportError func(ctx context.Context, cc Conn, ca } func newConn(e endpoint.Endpoint, config Config, opts ...option) *conn { - clock := clockwork.NewRealClock() c := &conn{ - endpoint: e, - config: config, - done: make(chan struct{}), - lastUsage: &lastUsage{ - t: clock.Now(), - clock: clock, - }, + endpoint: e, + config: config, + done: make(chan struct{}), + lastUsage: newLastUsage(nil), } c.state.Store(uint32(Created)) for _, opt := range opts { diff --git a/internal/conn/grpc_client_stream.go b/internal/conn/grpc_client_stream.go index 8af9e3b15..8b92e9e13 100644 --- a/internal/conn/grpc_client_stream.go +++ b/internal/conn/grpc_client_stream.go @@ -30,7 +30,7 @@ func (s *grpcClientStream) CloseSend() (err error) { onDone(err) }() - defer s.c.lastUsage.Lock()() + defer s.c.lastUsage.SharedLock()() err = s.ClientStream.CloseSend() @@ -61,7 +61,7 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { onDone(err) }() - defer s.c.lastUsage.Lock()() + defer s.c.lastUsage.SharedLock()() err = s.ClientStream.SendMsg(m) @@ -100,7 +100,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { onDone(err) }() - defer s.c.lastUsage.Lock()() + defer s.c.lastUsage.SharedLock()() defer func() { if err != nil { diff --git a/internal/conn/last_usage.go b/internal/conn/last_usage.go index c04a30374..038420e26 100644 --- a/internal/conn/last_usage.go +++ b/internal/conn/last_usage.go @@ -6,40 +6,42 @@ import ( "time" "github.com/jonboulle/clockwork" - - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" ) type lastUsage struct { locks atomic.Int64 - mu xsync.RWMutex - t time.Time + t atomic.Pointer[time.Time] clock clockwork.Clock } -func (l *lastUsage) Get() time.Time { - if l.locks.CompareAndSwap(0, 1) { - defer func() { - l.locks.Add(-1) - }() +func newLastUsage(clock clockwork.Clock) *lastUsage { + if clock == nil { + clock = clockwork.NewRealClock() + } + now := clock.Now() + usage := &lastUsage{ + clock: clock, + } + usage.t.Store(&now) - l.mu.RLock() - defer l.mu.RUnlock() + return usage +} - return l.t +func (l *lastUsage) Get() time.Time { + if l.locks.Load() == 0 { + return *l.t.Load() } return l.clock.Now() } -func (l *lastUsage) Lock() (releaseFunc func()) { +func (l *lastUsage) SharedLock() (releaseFunc func()) { l.locks.Add(1) return sync.OnceFunc(func() { if l.locks.Add(-1) == 0 { - l.mu.WithLock(func() { - l.t = l.clock.Now() - }) + now := l.clock.Now() + l.t.Store(&now) } }) } diff --git a/internal/conn/last_usage_test.go b/internal/conn/last_usage_test.go index f39abfb6c..b640e9a37 100644 --- a/internal/conn/last_usage_test.go +++ b/internal/conn/last_usage_test.go @@ -13,12 +13,12 @@ func Test_lastUsage_Lock(t *testing.T) { start := time.Unix(0, 0) clock := clockwork.NewFakeClockAt(start) lu := &lastUsage{ - t: start, clock: clock, } + lu.t.Store(&start) t1 := lu.Get() require.Equal(t, start, t1) - f := lu.Lock() + f := lu.SharedLock() clock.Advance(time.Hour) t2 := lu.Get() require.Equal(t, start.Add(time.Hour), t2) @@ -34,19 +34,19 @@ func Test_lastUsage_Lock(t *testing.T) { start := time.Unix(0, 0) clock := clockwork.NewFakeClockAt(start) lu := &lastUsage{ - t: start, clock: clock, } + lu.t.Store(&start) t1 := lu.Get() require.Equal(t, start, t1) - f1 := lu.Lock() + f1 := lu.SharedLock() clock.Advance(time.Hour) t2 := lu.Get() require.Equal(t, start.Add(time.Hour), t2) - f2 := lu.Lock() + f2 := lu.SharedLock() clock.Advance(time.Hour) f1() - f3 := lu.Lock() + f3 := lu.SharedLock() clock.Advance(time.Hour) t3 := lu.Get() require.Equal(t, start.Add(3*time.Hour), t3) @@ -72,9 +72,10 @@ func Test_lastUsage_Lock(t *testing.T) { start := time.Unix(0, 0) clock := clockwork.NewFakeClockAt(start) lu := &lastUsage{ - t: start, clock: clock, } + lu.t.Store(&start) + func() { t1 := lu.Get() require.Equal(t, start, t1) @@ -82,7 +83,7 @@ func Test_lastUsage_Lock(t *testing.T) { t2 := lu.Get() require.Equal(t, start, t2) clock.Advance(time.Hour) - defer lu.Lock()() + defer lu.SharedLock()() t3 := lu.Get() require.Equal(t, start.Add(2*time.Hour), t3) clock.Advance(time.Hour) From e079f4f773d7d5ca267312feea983c28a68761f0 Mon Sep 17 00:00:00 2001 From: Aleksey Myasnikov <79263256394@ya.ru> Date: Sun, 24 Mar 2024 16:44:31 +0300 Subject: [PATCH 2/2] SharedLock -> Touch --- internal/conn/conn.go | 4 ++-- internal/conn/grpc_client_stream.go | 6 +++--- internal/conn/last_usage.go | 2 +- internal/conn/last_usage_test.go | 10 +++++----- 4 files changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/conn/conn.go b/internal/conn/conn.go index aa465eae7..ba03b6647 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -326,7 +326,7 @@ func (c *conn) Invoke( return c.wrapError(err) } - defer c.lastUsage.SharedLock()() + defer c.lastUsage.Touch()() ctx, traceID, err := meta.TraceID(ctx) if err != nil { @@ -411,7 +411,7 @@ func (c *conn) NewStream( return nil, c.wrapError(err) } - defer c.lastUsage.SharedLock()() + defer c.lastUsage.Touch()() ctx, traceID, err := meta.TraceID(ctx) if err != nil { diff --git a/internal/conn/grpc_client_stream.go b/internal/conn/grpc_client_stream.go index 8b92e9e13..0e1b15903 100644 --- a/internal/conn/grpc_client_stream.go +++ b/internal/conn/grpc_client_stream.go @@ -30,7 +30,7 @@ func (s *grpcClientStream) CloseSend() (err error) { onDone(err) }() - defer s.c.lastUsage.SharedLock()() + defer s.c.lastUsage.Touch()() err = s.ClientStream.CloseSend() @@ -61,7 +61,7 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { onDone(err) }() - defer s.c.lastUsage.SharedLock()() + defer s.c.lastUsage.Touch()() err = s.ClientStream.SendMsg(m) @@ -100,7 +100,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { onDone(err) }() - defer s.c.lastUsage.SharedLock()() + defer s.c.lastUsage.Touch()() defer func() { if err != nil { diff --git a/internal/conn/last_usage.go b/internal/conn/last_usage.go index 038420e26..0020a3274 100644 --- a/internal/conn/last_usage.go +++ b/internal/conn/last_usage.go @@ -35,7 +35,7 @@ func (l *lastUsage) Get() time.Time { return l.clock.Now() } -func (l *lastUsage) SharedLock() (releaseFunc func()) { +func (l *lastUsage) Touch() (releaseFunc func()) { l.locks.Add(1) return sync.OnceFunc(func() { diff --git a/internal/conn/last_usage_test.go b/internal/conn/last_usage_test.go index b640e9a37..1e3fad0a2 100644 --- a/internal/conn/last_usage_test.go +++ b/internal/conn/last_usage_test.go @@ -18,7 +18,7 @@ func Test_lastUsage_Lock(t *testing.T) { lu.t.Store(&start) t1 := lu.Get() require.Equal(t, start, t1) - f := lu.SharedLock() + f := lu.Touch() clock.Advance(time.Hour) t2 := lu.Get() require.Equal(t, start.Add(time.Hour), t2) @@ -39,14 +39,14 @@ func Test_lastUsage_Lock(t *testing.T) { lu.t.Store(&start) t1 := lu.Get() require.Equal(t, start, t1) - f1 := lu.SharedLock() + f1 := lu.Touch() clock.Advance(time.Hour) t2 := lu.Get() require.Equal(t, start.Add(time.Hour), t2) - f2 := lu.SharedLock() + f2 := lu.Touch() clock.Advance(time.Hour) f1() - f3 := lu.SharedLock() + f3 := lu.Touch() clock.Advance(time.Hour) t3 := lu.Get() require.Equal(t, start.Add(3*time.Hour), t3) @@ -83,7 +83,7 @@ func Test_lastUsage_Lock(t *testing.T) { t2 := lu.Get() require.Equal(t, start, t2) clock.Advance(time.Hour) - defer lu.SharedLock()() + defer lu.Touch()() t3 := lu.Get() require.Equal(t, start.Add(2*time.Hour), t3) clock.Advance(time.Hour)