diff --git a/httpcore/__init__.py b/httpcore/__init__.py index 014213bae..88255f60b 100644 --- a/httpcore/__init__.py +++ b/httpcore/__init__.py @@ -29,6 +29,7 @@ ReadError, ReadTimeout, RemoteProtocolError, + ServerDisconnectedError, TimeoutException, UnsupportedProtocol, WriteError, @@ -114,6 +115,7 @@ def __init__(self, *args, **kwargs): # type: ignore "SOCKET_OPTION", # exceptions "ConnectionNotAvailable", + "ServerDisconnectedError", "ProxyError", "ProtocolError", "LocalProtocolError", diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 214dfc4be..8fa9eecf8 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -5,7 +5,11 @@ from .._backends.auto import AutoBackend from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend -from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol +from .._exceptions import ( + ConnectionNotAvailable, + ServerDisconnectedError, + UnsupportedProtocol, +) from .._models import Origin, Request, Response from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock from .connection import AsyncHTTPConnection @@ -196,7 +200,7 @@ async def handle_async_request(self, request: Request) -> Response: response = await connection.handle_async_request( pool_request.request ) - except ConnectionNotAvailable: + except (ConnectionNotAvailable, ServerDisconnectedError): # In some cases a connection may initially be available to # handle a request, but then become unavailable. # diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index 0493a923d..9a3cb98c1 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -21,6 +21,7 @@ ConnectionNotAvailable, LocalProtocolError, RemoteProtocolError, + ServerDisconnectedError, WriteError, map_exceptions, ) @@ -45,6 +46,7 @@ class HTTPConnectionState(enum.IntEnum): ACTIVE = 1 IDLE = 2 CLOSED = 3 + SERVER_DISCONNECTED = 4 class AsyncHTTP11Connection(AsyncConnectionInterface): @@ -59,7 +61,7 @@ def __init__( ) -> None: self._origin = origin self._network_stream = stream - self._keepalive_expiry: Optional[float] = keepalive_expiry + self._keepalive_expiry = keepalive_expiry self._expire_at: Optional[float] = None self._state = HTTPConnectionState.NEW self._state_lock = AsyncLock() @@ -77,6 +79,20 @@ async def handle_async_request(self, request: Request) -> Response: ) async with self._state_lock: + if self._state == HTTPConnectionState.SERVER_DISCONNECTED: + raise ServerDisconnectedError() + + # If the HTTP connection is idle but the socket is readable, then the + # only valid state is that the socket is about to return b"", indicating + # a server-initiated disconnect. + server_disconnected = ( + self._state == HTTPConnectionState.IDLE + and self._network_stream.get_extra_info("is_readable") + ) + if server_disconnected: + self._state = HTTPConnectionState.SERVER_DISCONNECTED + raise ServerDisconnectedError() + if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE): self._request_count += 1 self._state = HTTPConnectionState.ACTIVE @@ -279,18 +295,13 @@ def is_available(self) -> bool: return self._state == HTTPConnectionState.IDLE def has_expired(self) -> bool: - now = time.monotonic() - keepalive_expired = self._expire_at is not None and now > self._expire_at - - # If the HTTP connection is idle but the socket is readable, then the - # only valid state is that the socket is about to return b"", indicating - # a server-initiated disconnect. - server_disconnected = ( - self._state == HTTPConnectionState.IDLE - and self._network_stream.get_extra_info("is_readable") - ) + if self._state == HTTPConnectionState.SERVER_DISCONNECTED: + # Connection that is disconnected by the server is considered expired. + # Pool then cleans up this connection by closing it. + return True - return keepalive_expired or server_disconnected + now = time.monotonic() + return self._expire_at is not None and now > self._expire_at def is_idle(self) -> bool: return self._state == HTTPConnectionState.IDLE diff --git a/httpcore/_exceptions.py b/httpcore/_exceptions.py index 81e7fc61d..d1b152c94 100644 --- a/httpcore/_exceptions.py +++ b/httpcore/_exceptions.py @@ -19,6 +19,10 @@ class ConnectionNotAvailable(Exception): pass +class ServerDisconnectedError(Exception): + pass + + class ProxyError(Exception): pass diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 01bec59e8..b5ecc0338 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -5,7 +5,11 @@ from .._backends.sync import SyncBackend from .._backends.base import SOCKET_OPTION, NetworkBackend -from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol +from .._exceptions import ( + ConnectionNotAvailable, + ServerDisconnectedError, + UnsupportedProtocol, +) from .._models import Origin, Request, Response from .._synchronization import Event, ShieldCancellation, ThreadLock from .connection import HTTPConnection @@ -196,7 +200,7 @@ def handle_request(self, request: Request) -> Response: response = connection.handle_request( pool_request.request ) - except ConnectionNotAvailable: + except (ConnectionNotAvailable, ServerDisconnectedError): # In some cases a connection may initially be available to # handle a request, but then become unavailable. # diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index a74ff8e80..3aa977c83 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -21,6 +21,7 @@ ConnectionNotAvailable, LocalProtocolError, RemoteProtocolError, + ServerDisconnectedError, WriteError, map_exceptions, ) @@ -45,6 +46,7 @@ class HTTPConnectionState(enum.IntEnum): ACTIVE = 1 IDLE = 2 CLOSED = 3 + SERVER_DISCONNECTED = 4 class HTTP11Connection(ConnectionInterface): @@ -59,7 +61,7 @@ def __init__( ) -> None: self._origin = origin self._network_stream = stream - self._keepalive_expiry: Optional[float] = keepalive_expiry + self._keepalive_expiry = keepalive_expiry self._expire_at: Optional[float] = None self._state = HTTPConnectionState.NEW self._state_lock = Lock() @@ -77,6 +79,20 @@ def handle_request(self, request: Request) -> Response: ) with self._state_lock: + if self._state == HTTPConnectionState.SERVER_DISCONNECTED: + raise ServerDisconnectedError() + + # If the HTTP connection is idle but the socket is readable, then the + # only valid state is that the socket is about to return b"", indicating + # a server-initiated disconnect. + server_disconnected = ( + self._state == HTTPConnectionState.IDLE + and self._network_stream.get_extra_info("is_readable") + ) + if server_disconnected: + self._state = HTTPConnectionState.SERVER_DISCONNECTED + raise ServerDisconnectedError() + if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE): self._request_count += 1 self._state = HTTPConnectionState.ACTIVE @@ -279,18 +295,13 @@ def is_available(self) -> bool: return self._state == HTTPConnectionState.IDLE def has_expired(self) -> bool: - now = time.monotonic() - keepalive_expired = self._expire_at is not None and now > self._expire_at - - # If the HTTP connection is idle but the socket is readable, then the - # only valid state is that the socket is about to return b"", indicating - # a server-initiated disconnect. - server_disconnected = ( - self._state == HTTPConnectionState.IDLE - and self._network_stream.get_extra_info("is_readable") - ) + if self._state == HTTPConnectionState.SERVER_DISCONNECTED: + # Connection that is disconnected by the server is considered expired. + # Pool then cleans up this connection by closing it. + return True - return keepalive_expired or server_disconnected + now = time.monotonic() + return self._expire_at is not None and now > self._expire_at def is_idle(self) -> bool: return self._state == HTTPConnectionState.IDLE diff --git a/tests/_async/test_connection_pool.py b/tests/_async/test_connection_pool.py index 2fc272049..93ff9cda5 100644 --- a/tests/_async/test_connection_pool.py +++ b/tests/_async/test_connection_pool.py @@ -687,6 +687,82 @@ async def test_connection_pool_closed_while_request_in_flight(): await response.aread() +@pytest.mark.anyio +async def test_connection_pool_with_idle_broken_connection(): + """ + Pool gives a new connection when an idle connection gets readable (ie broken) while in the pool. + """ + + class MockStream(httpcore.AsyncMockStream): + def __init__(self, buffer: typing.List[bytes]): + super().__init__(buffer) + self.mock_is_readable = False + + def get_extra_info(self, info: str) -> typing.Any: + if info == "is_readable": + return self.mock_is_readable + return super().get_extra_info(info) # pragma: nocover + + streams = [ + MockStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 15\r\n", + b"\r\n", + b"Hello, world 1!", + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 15\r\n", + b"\r\n", + b"Hello, world 2!", + ] + ), + MockStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 29\r\n", + b"\r\n", + b"Hello, world from new stream!", + ] + ), + ] + + class MockBackend(httpcore.AsyncMockBackend): + async def connect_tcp( + self, *args: typing.Any, **kwargs: typing.Any + ) -> MockStream: + return streams.pop(0) + + async with httpcore.AsyncConnectionPool( + network_backend=MockBackend([]), max_connections=1 + ) as pool: + res = await pool.request("GET", "https://example.com/") + assert (await res.aread()) == b"Hello, world 1!" + + assert len(pool.connections) == 1 + conn = pool.connections[0] + + res = await pool.request("GET", "https://example.com/") + assert (await res.aread()) == b"Hello, world 2!" + + assert len(pool.connections) == 1 + assert conn is pool.connections[0], "Should reuse connection" + + # Simulate network breakage + assert conn.is_idle() + conn._connection._network_stream.mock_is_readable = True # type: ignore[attr-defined] + + res = await pool.request("GET", "https://example.com/") + assert (await res.aread()) == b"Hello, world from new stream!" + + assert len(pool.connections) == 1 + new_conn = pool.connections[0] + assert new_conn is not conn, "Should be a new connection" + assert not new_conn._connection._network_stream.mock_is_readable # type: ignore[attr-defined] + + @pytest.mark.anyio async def test_connection_pool_timeout(): """ diff --git a/tests/_async/test_http11.py b/tests/_async/test_http11.py index 94f2febf0..968051066 100644 --- a/tests/_async/test_http11.py +++ b/tests/_async/test_http11.py @@ -1,6 +1,9 @@ +import typing + import pytest import httpcore +from httpcore._exceptions import ServerDisconnectedError @pytest.mark.anyio @@ -167,6 +170,47 @@ async def test_http11_connection_handles_one_active_request(): await conn.request("GET", "https://example.com/") +@pytest.mark.anyio +async def test_http11_idle_connection_checks_readable_state(): + """ + Idle connection can not be readable when requesting. + """ + + class MockStream(httpcore.AsyncMockStream): + def __init__(self, buffer: typing.List[bytes]): + super().__init__(buffer) + self.mock_is_readable = False + + def get_extra_info(self, info: str) -> typing.Any: + if info == "is_readable": + return self.mock_is_readable + return super().get_extra_info(info) # pragma: nocover + + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = MockStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 13\r\n", + b"\r\n", + b"Hello, world!", + ] + ) + async with httpcore.AsyncHTTP11Connection(origin=origin, stream=stream) as conn: + await conn.request("GET", "https://example.com/") + + assert conn.is_idle() and not conn.has_expired() + stream.mock_is_readable = True # Simulate connection breakage + + with pytest.raises(ServerDisconnectedError): + await conn.request("GET", "https://example.com/") + assert conn.has_expired() and not conn.is_idle() + + with pytest.raises(ServerDisconnectedError): + await conn.request("GET", "https://example.com/") + assert conn.has_expired() and not conn.is_idle() + + @pytest.mark.anyio async def test_http11_connection_attempt_close(): """ diff --git a/tests/_sync/test_connection_pool.py b/tests/_sync/test_connection_pool.py index ee303e5cf..b183f20a3 100644 --- a/tests/_sync/test_connection_pool.py +++ b/tests/_sync/test_connection_pool.py @@ -688,6 +688,82 @@ def test_connection_pool_closed_while_request_in_flight(): +def test_connection_pool_with_idle_broken_connection(): + """ + Pool gives a new connection when an idle connection gets readable (ie broken) while in the pool. + """ + + class MockStream(httpcore.MockStream): + def __init__(self, buffer: typing.List[bytes]): + super().__init__(buffer) + self.mock_is_readable = False + + def get_extra_info(self, info: str) -> typing.Any: + if info == "is_readable": + return self.mock_is_readable + return super().get_extra_info(info) # pragma: nocover + + streams = [ + MockStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 15\r\n", + b"\r\n", + b"Hello, world 1!", + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 15\r\n", + b"\r\n", + b"Hello, world 2!", + ] + ), + MockStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 29\r\n", + b"\r\n", + b"Hello, world from new stream!", + ] + ), + ] + + class MockBackend(httpcore.MockBackend): + def connect_tcp( + self, *args: typing.Any, **kwargs: typing.Any + ) -> MockStream: + return streams.pop(0) + + with httpcore.ConnectionPool( + network_backend=MockBackend([]), max_connections=1 + ) as pool: + res = pool.request("GET", "https://example.com/") + assert (res.read()) == b"Hello, world 1!" + + assert len(pool.connections) == 1 + conn = pool.connections[0] + + res = pool.request("GET", "https://example.com/") + assert (res.read()) == b"Hello, world 2!" + + assert len(pool.connections) == 1 + assert conn is pool.connections[0], "Should reuse connection" + + # Simulate network breakage + assert conn.is_idle() + conn._connection._network_stream.mock_is_readable = True # type: ignore[attr-defined] + + res = pool.request("GET", "https://example.com/") + assert (res.read()) == b"Hello, world from new stream!" + + assert len(pool.connections) == 1 + new_conn = pool.connections[0] + assert new_conn is not conn, "Should be a new connection" + assert not new_conn._connection._network_stream.mock_is_readable # type: ignore[attr-defined] + + + def test_connection_pool_timeout(): """ Ensure that exceeding max_connections can cause a request to timeout. diff --git a/tests/_sync/test_http11.py b/tests/_sync/test_http11.py index f2fa28f4c..0a6096aa9 100644 --- a/tests/_sync/test_http11.py +++ b/tests/_sync/test_http11.py @@ -1,6 +1,9 @@ +import typing + import pytest import httpcore +from httpcore._exceptions import ServerDisconnectedError @@ -168,6 +171,47 @@ def test_http11_connection_handles_one_active_request(): +def test_http11_idle_connection_checks_readable_state(): + """ + Idle connection can not be readable when requesting. + """ + + class MockStream(httpcore.MockStream): + def __init__(self, buffer: typing.List[bytes]): + super().__init__(buffer) + self.mock_is_readable = False + + def get_extra_info(self, info: str) -> typing.Any: + if info == "is_readable": + return self.mock_is_readable + return super().get_extra_info(info) # pragma: nocover + + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = MockStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 13\r\n", + b"\r\n", + b"Hello, world!", + ] + ) + with httpcore.HTTP11Connection(origin=origin, stream=stream) as conn: + conn.request("GET", "https://example.com/") + + assert conn.is_idle() and not conn.has_expired() + stream.mock_is_readable = True # Simulate connection breakage + + with pytest.raises(ServerDisconnectedError): + conn.request("GET", "https://example.com/") + assert conn.has_expired() and not conn.is_idle() + + with pytest.raises(ServerDisconnectedError): + conn.request("GET", "https://example.com/") + assert conn.has_expired() and not conn.is_idle() + + + def test_http11_connection_attempt_close(): """ A connection can only be closed when it is idle.