diff --git a/internal/conn/conn.go b/internal/conn/conn.go index 33aef1d05..ba03b6647 100644 --- a/internal/conn/conn.go +++ b/internal/conn/conn.go @@ -7,7 +7,6 @@ import ( "sync/atomic" "time" - "github.com/jonboulle/clockwork" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb" "google.golang.org/grpc" "google.golang.org/grpc/connectivity" @@ -327,7 +326,7 @@ func (c *conn) Invoke( return c.wrapError(err) } - defer c.lastUsage.Lock()() + defer c.lastUsage.Touch()() ctx, traceID, err := meta.TraceID(ctx) if err != nil { @@ -412,7 +411,7 @@ func (c *conn) NewStream( return nil, c.wrapError(err) } - defer c.lastUsage.Lock()() + defer c.lastUsage.Touch()() ctx, traceID, err := meta.TraceID(ctx) if err != nil { @@ -487,15 +486,11 @@ func withOnTransportError(onTransportError func(ctx context.Context, cc Conn, ca } func newConn(e endpoint.Endpoint, config Config, opts ...option) *conn { - clock := clockwork.NewRealClock() c := &conn{ - endpoint: e, - config: config, - done: make(chan struct{}), - lastUsage: &lastUsage{ - t: clock.Now(), - clock: clock, - }, + endpoint: e, + config: config, + done: make(chan struct{}), + lastUsage: newLastUsage(nil), } c.state.Store(uint32(Created)) for _, opt := range opts { diff --git a/internal/conn/grpc_client_stream.go b/internal/conn/grpc_client_stream.go index 8af9e3b15..0e1b15903 100644 --- a/internal/conn/grpc_client_stream.go +++ b/internal/conn/grpc_client_stream.go @@ -30,7 +30,7 @@ func (s *grpcClientStream) CloseSend() (err error) { onDone(err) }() - defer s.c.lastUsage.Lock()() + defer s.c.lastUsage.Touch()() err = s.ClientStream.CloseSend() @@ -61,7 +61,7 @@ func (s *grpcClientStream) SendMsg(m interface{}) (err error) { onDone(err) }() - defer s.c.lastUsage.Lock()() + defer s.c.lastUsage.Touch()() err = s.ClientStream.SendMsg(m) @@ -100,7 +100,7 @@ func (s *grpcClientStream) RecvMsg(m interface{}) (err error) { onDone(err) }() - defer s.c.lastUsage.Lock()() + defer s.c.lastUsage.Touch()() defer func() { if err != nil { diff --git a/internal/conn/last_usage.go b/internal/conn/last_usage.go index c04a30374..0020a3274 100644 --- a/internal/conn/last_usage.go +++ b/internal/conn/last_usage.go @@ -6,40 +6,42 @@ import ( "time" "github.com/jonboulle/clockwork" - - "github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync" ) type lastUsage struct { locks atomic.Int64 - mu xsync.RWMutex - t time.Time + t atomic.Pointer[time.Time] clock clockwork.Clock } -func (l *lastUsage) Get() time.Time { - if l.locks.CompareAndSwap(0, 1) { - defer func() { - l.locks.Add(-1) - }() +func newLastUsage(clock clockwork.Clock) *lastUsage { + if clock == nil { + clock = clockwork.NewRealClock() + } + now := clock.Now() + usage := &lastUsage{ + clock: clock, + } + usage.t.Store(&now) - l.mu.RLock() - defer l.mu.RUnlock() + return usage +} - return l.t +func (l *lastUsage) Get() time.Time { + if l.locks.Load() == 0 { + return *l.t.Load() } return l.clock.Now() } -func (l *lastUsage) Lock() (releaseFunc func()) { +func (l *lastUsage) Touch() (releaseFunc func()) { l.locks.Add(1) return sync.OnceFunc(func() { if l.locks.Add(-1) == 0 { - l.mu.WithLock(func() { - l.t = l.clock.Now() - }) + now := l.clock.Now() + l.t.Store(&now) } }) } diff --git a/internal/conn/last_usage_test.go b/internal/conn/last_usage_test.go index f39abfb6c..1e3fad0a2 100644 --- a/internal/conn/last_usage_test.go +++ b/internal/conn/last_usage_test.go @@ -13,12 +13,12 @@ func Test_lastUsage_Lock(t *testing.T) { start := time.Unix(0, 0) clock := clockwork.NewFakeClockAt(start) lu := &lastUsage{ - t: start, clock: clock, } + lu.t.Store(&start) t1 := lu.Get() require.Equal(t, start, t1) - f := lu.Lock() + f := lu.Touch() clock.Advance(time.Hour) t2 := lu.Get() require.Equal(t, start.Add(time.Hour), t2) @@ -34,19 +34,19 @@ func Test_lastUsage_Lock(t *testing.T) { start := time.Unix(0, 0) clock := clockwork.NewFakeClockAt(start) lu := &lastUsage{ - t: start, clock: clock, } + lu.t.Store(&start) t1 := lu.Get() require.Equal(t, start, t1) - f1 := lu.Lock() + f1 := lu.Touch() clock.Advance(time.Hour) t2 := lu.Get() require.Equal(t, start.Add(time.Hour), t2) - f2 := lu.Lock() + f2 := lu.Touch() clock.Advance(time.Hour) f1() - f3 := lu.Lock() + f3 := lu.Touch() clock.Advance(time.Hour) t3 := lu.Get() require.Equal(t, start.Add(3*time.Hour), t3) @@ -72,9 +72,10 @@ func Test_lastUsage_Lock(t *testing.T) { start := time.Unix(0, 0) clock := clockwork.NewFakeClockAt(start) lu := &lastUsage{ - t: start, clock: clock, } + lu.t.Store(&start) + func() { t1 := lu.Get() require.Equal(t, start, t1) @@ -82,7 +83,7 @@ func Test_lastUsage_Lock(t *testing.T) { t2 := lu.Get() require.Equal(t, start, t2) clock.Advance(time.Hour) - defer lu.Lock()() + defer lu.Touch()() t3 := lu.Get() require.Equal(t, start.Add(2*time.Hour), t3) clock.Advance(time.Hour)