diff --git a/ably/ably_test.go b/ably/ably_test.go index 13ed5165..608598b6 100644 --- a/ably/ably_test.go +++ b/ably/ably_test.go @@ -492,6 +492,10 @@ type interceptConn struct { active *activeIntercept } +func (c interceptConn) Unwrap() ably.Conn { + return c.Conn +} + func (c interceptConn) Receive(deadline time.Time) (*ably.ProtocolMessage, error) { msg, err := c.Conn.Receive(deadline) if err != nil { diff --git a/ably/realtime_channel_integration_test.go b/ably/realtime_channel_integration_test.go index c5a0ec0a..0deefa96 100644 --- a/ably/realtime_channel_integration_test.go +++ b/ably/realtime_channel_integration_test.go @@ -294,6 +294,31 @@ func TestRealtimeChannel_ShouldSetProvidedReadLimit(t *testing.T) { assert.Equal(t, int64(2048), client.Connection.ReadLimit()) } +func TestRealtimeChannel_SetDefaultReadLimitIfServerHasNoLimit(t *testing.T) { + + dial := func(proto string, url *url.URL, timeout time.Duration) (ably.Conn, error) { + return ably.DialWebsocket(proto, url, timeout) + } + wrappedDialWebsocket, interceptMsg := DialIntercept(dial) + + ctx, cancel := context.WithCancel(context.Background()) + msgCh := interceptMsg(ctx, ably.ActionConnected) + + app, client := ablytest.NewRealtime(ably.WithDial(wrappedDialWebsocket)) + defer safeclose(t, ablytest.FullRealtimeCloser(client), app) + connectedWaiter := ablytest.ConnWaiter(client, nil, ably.ConnectionEventConnected) + + connectedMsg := <-msgCh + connectedMsg.ConnectionDetails.MaxMessageSize = 0 // 0 represents limitless message size + cancel() // unblocks updated message to be processed + + err := ablytest.Wait(connectedWaiter, nil) + assert.Nil(t, err) + + // If server set limit is 0, value is set to default readlimit + assert.Equal(t, int64(65536), client.Connection.ReadLimit()) +} + func TestRealtimeChannel_ShouldReturnErrorIfReadLimitExceeded(t *testing.T) { app, client1 := ablytest.NewRealtime(ably.WithEchoMessages(false)) defer safeclose(t, ablytest.FullRealtimeCloser(client1), app) diff --git a/ably/realtime_conn.go b/ably/realtime_conn.go index e82eca03..99cf7bc2 100644 --- a/ably/realtime_conn.go +++ b/ably/realtime_conn.go @@ -826,7 +826,7 @@ func (c *Connection) eventloop() { c.connStateTTL = connDetails.ConnectionStateTTL // Spec RSA7b3, RSA7b4, RSA12a c.auth.updateClientID(connDetails.ClientID) - if !c.isReadLimitSetExternally { + if !c.isReadLimitSetExternally && connDetails.MaxMessageSize > 0 { c.readLimit = connDetails.MaxMessageSize // set MaxMessageSize limit as per TO3l8 } } @@ -988,6 +988,10 @@ func (vc verboseConn) Close() error { return vc.conn.Close() } +func (vc verboseConn) Unwrap() conn { + return vc.conn +} + func (c *Connection) setState(state ConnectionState, err error, retryIn time.Duration) error { c.mtx.Lock() defer c.mtx.Unlock() diff --git a/ably/websocket.go b/ably/websocket.go index 9a9370a7..febfc6ae 100644 --- a/ably/websocket.go +++ b/ably/websocket.go @@ -114,12 +114,19 @@ func dialWebsocketTimeout(uri, origin string, timeout time.Duration, agents map[ return c, nil } -func setConnectionReadLimit(c conn, readLimit int64) error { - verboseConn, ok := c.(verboseConn) +func unwrapConn(c conn) conn { + u, ok := c.(interface { + Unwrap() conn + }) if !ok { - return errors.New("cannot set readlimit for connection, connection does not use verboseConn") + return c } - websocketConn, ok := verboseConn.conn.(*websocketConn) + return unwrapConn(u.Unwrap()) +} + +func setConnectionReadLimit(c conn, readLimit int64) error { + unwrappedConn := unwrapConn(c) + websocketConn, ok := unwrappedConn.(*websocketConn) if !ok { return errors.New("cannot set readlimit for connection, connection does not use nhooyr.io/websocket") }