diff --git a/httpcore/__init__.py b/httpcore/__init__.py index 014213ba..88255f60 100644 --- a/httpcore/__init__.py +++ b/httpcore/__init__.py @@ -29,6 +29,7 @@ ReadError, ReadTimeout, RemoteProtocolError, + ServerDisconnectedError, TimeoutException, UnsupportedProtocol, WriteError, @@ -114,6 +115,7 @@ def __init__(self, *args, **kwargs): # type: ignore "SOCKET_OPTION", # exceptions "ConnectionNotAvailable", + "ServerDisconnectedError", "ProxyError", "ProtocolError", "LocalProtocolError", diff --git a/httpcore/_async/connection_pool.py b/httpcore/_async/connection_pool.py index 214dfc4b..86baa016 100644 --- a/httpcore/_async/connection_pool.py +++ b/httpcore/_async/connection_pool.py @@ -240,7 +240,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: closing_connections = [] # First we handle cleaning up any connections that are closed, - # have expired their keep-alive, or surplus idle connections. + # have expired, or surplus idle connections. for connection in list(self._connections): if connection.is_closed(): # log: "removing closed connection" @@ -267,15 +267,12 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: for connection in self._connections if connection.can_handle_request(origin) and connection.is_available() ] - idle_connections = [ - connection for connection in self._connections if connection.is_idle() - ] # There are three cases for how we may be able to handle the request: # # 1. There is an existing connection that can handle the request. # 2. We can create a new connection to handle the request. - # 3. We can close an idle connection and then create a new connection + # 3. We can close an idle/expired connection and then create a new connection # to handle the request. if available_connections: # log: "reusing existing connection" @@ -286,15 +283,19 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]: connection = self.create_connection(origin) self._connections.append(connection) pool_request.assign_to_connection(connection) - elif idle_connections: - # log: "closing idle connection" - connection = idle_connections[0] - self._connections.remove(connection) - closing_connections.append(connection) - # log: "creating new connection" - connection = self.create_connection(origin) - self._connections.append(connection) - pool_request.assign_to_connection(connection) + else: + purged_connection = next( + (c for c in self._connections if c.is_idle() or c.has_expired()), + None, + ) + if purged_connection is not None: + # log: "closing idle connection" + self._connections.remove(purged_connection) + closing_connections.append(purged_connection) + # log: "creating new connection" + connection = self.create_connection(origin) + self._connections.append(connection) + pool_request.assign_to_connection(connection) return closing_connections diff --git a/httpcore/_async/http11.py b/httpcore/_async/http11.py index 0493a923..0e47623a 100644 --- a/httpcore/_async/http11.py +++ b/httpcore/_async/http11.py @@ -21,6 +21,7 @@ ConnectionNotAvailable, LocalProtocolError, RemoteProtocolError, + ServerDisconnectedError, WriteError, map_exceptions, ) @@ -45,6 +46,7 @@ class HTTPConnectionState(enum.IntEnum): ACTIVE = 1 IDLE = 2 CLOSED = 3 + SERVER_DISCONNECTED = 4 class AsyncHTTP11Connection(AsyncConnectionInterface): @@ -59,7 +61,7 @@ def __init__( ) -> None: self._origin = origin self._network_stream = stream - self._keepalive_expiry: Optional[float] = keepalive_expiry + self._keepalive_expiry = keepalive_expiry self._expire_at: Optional[float] = None self._state = HTTPConnectionState.NEW self._state_lock = AsyncLock() @@ -76,13 +78,7 @@ async def handle_async_request(self, request: Request) -> Response: f"to {self._origin}" ) - async with self._state_lock: - if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE): - self._request_count += 1 - self._state = HTTPConnectionState.ACTIVE - self._expire_at = None - else: - raise ConnectionNotAvailable() + await self._update_state() try: kwargs = {"request": request} @@ -142,6 +138,29 @@ async def handle_async_request(self, request: Request) -> Response: await self._response_closed() raise exc + async def _update_state(self) -> None: + async with self._state_lock: + # If the HTTP connection is idle but the socket is readable, then the + # only valid state is that the socket is about to return b"", indicating + # a server-initiated disconnect. + server_disconnected = ( + self._state == HTTPConnectionState.IDLE + and self._network_stream.get_extra_info("is_readable") + ) + if ( + server_disconnected + or self._state == HTTPConnectionState.SERVER_DISCONNECTED + ): + self._state = HTTPConnectionState.SERVER_DISCONNECTED + raise ServerDisconnectedError() + + if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE): + self._request_count += 1 + self._state = HTTPConnectionState.ACTIVE + self._expire_at = None + else: + raise ConnectionNotAvailable() + # Sending the request... async def _send_request_headers(self, request: Request) -> None: @@ -279,18 +298,13 @@ def is_available(self) -> bool: return self._state == HTTPConnectionState.IDLE def has_expired(self) -> bool: - now = time.monotonic() - keepalive_expired = self._expire_at is not None and now > self._expire_at - - # If the HTTP connection is idle but the socket is readable, then the - # only valid state is that the socket is about to return b"", indicating - # a server-initiated disconnect. - server_disconnected = ( - self._state == HTTPConnectionState.IDLE - and self._network_stream.get_extra_info("is_readable") - ) + if self._state == HTTPConnectionState.SERVER_DISCONNECTED: + # Connection that is disconnected by the server is considered expired. + # Pool then cleans up this connection by closing it. + return True - return keepalive_expired or server_disconnected + now = time.monotonic() + return self._expire_at is not None and now > self._expire_at def is_idle(self) -> bool: return self._state == HTTPConnectionState.IDLE diff --git a/httpcore/_async/http2.py b/httpcore/_async/http2.py index c201ee4c..09fa9d72 100644 --- a/httpcore/_async/http2.py +++ b/httpcore/_async/http2.py @@ -93,13 +93,7 @@ async def handle_async_request(self, request: Request) -> Response: f"to {self._origin}" ) - async with self._state_lock: - if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE): - self._request_count += 1 - self._expire_at = None - self._state = HTTPConnectionState.ACTIVE - else: - raise ConnectionNotAvailable() + await self._update_state() async with self._init_lock: if not self._sent_connection_init: @@ -184,6 +178,15 @@ async def handle_async_request(self, request: Request) -> Response: raise exc + async def _update_state(self) -> None: + async with self._state_lock: + if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE): + self._request_count += 1 + self._expire_at = None + self._state = HTTPConnectionState.ACTIVE + else: + raise ConnectionNotAvailable() + async def _send_connection_init(self, request: Request) -> None: """ The HTTP/2 connection requires some initial setup before we can start diff --git a/httpcore/_exceptions.py b/httpcore/_exceptions.py index 81e7fc61..88568256 100644 --- a/httpcore/_exceptions.py +++ b/httpcore/_exceptions.py @@ -16,7 +16,17 @@ def map_exceptions(map: ExceptionMapping) -> Iterator[None]: class ConnectionNotAvailable(Exception): - pass + """ + This error is handled by the connection pool. + Users should not see this error directly when using connection pool. + """ + + +class ServerDisconnectedError(ConnectionNotAvailable): + """ + This error is handled by the connection pool. + Users should not see this error directly when using connection pool. + """ class ProxyError(Exception): diff --git a/httpcore/_sync/connection_pool.py b/httpcore/_sync/connection_pool.py index 01bec59e..9f0f5fc6 100644 --- a/httpcore/_sync/connection_pool.py +++ b/httpcore/_sync/connection_pool.py @@ -240,7 +240,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: closing_connections = [] # First we handle cleaning up any connections that are closed, - # have expired their keep-alive, or surplus idle connections. + # have expired, or surplus idle connections. for connection in list(self._connections): if connection.is_closed(): # log: "removing closed connection" @@ -267,15 +267,12 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: for connection in self._connections if connection.can_handle_request(origin) and connection.is_available() ] - idle_connections = [ - connection for connection in self._connections if connection.is_idle() - ] # There are three cases for how we may be able to handle the request: # # 1. There is an existing connection that can handle the request. # 2. We can create a new connection to handle the request. - # 3. We can close an idle connection and then create a new connection + # 3. We can close an idle/expired connection and then create a new connection # to handle the request. if available_connections: # log: "reusing existing connection" @@ -286,15 +283,19 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]: connection = self.create_connection(origin) self._connections.append(connection) pool_request.assign_to_connection(connection) - elif idle_connections: - # log: "closing idle connection" - connection = idle_connections[0] - self._connections.remove(connection) - closing_connections.append(connection) - # log: "creating new connection" - connection = self.create_connection(origin) - self._connections.append(connection) - pool_request.assign_to_connection(connection) + else: + purged_connection = next( + (c for c in self._connections if c.is_idle() or c.has_expired()), + None, + ) + if purged_connection is not None: + # log: "closing idle connection" + self._connections.remove(purged_connection) + closing_connections.append(purged_connection) + # log: "creating new connection" + connection = self.create_connection(origin) + self._connections.append(connection) + pool_request.assign_to_connection(connection) return closing_connections diff --git a/httpcore/_sync/http11.py b/httpcore/_sync/http11.py index a74ff8e8..ad463a76 100644 --- a/httpcore/_sync/http11.py +++ b/httpcore/_sync/http11.py @@ -21,6 +21,7 @@ ConnectionNotAvailable, LocalProtocolError, RemoteProtocolError, + ServerDisconnectedError, WriteError, map_exceptions, ) @@ -45,6 +46,7 @@ class HTTPConnectionState(enum.IntEnum): ACTIVE = 1 IDLE = 2 CLOSED = 3 + SERVER_DISCONNECTED = 4 class HTTP11Connection(ConnectionInterface): @@ -59,7 +61,7 @@ def __init__( ) -> None: self._origin = origin self._network_stream = stream - self._keepalive_expiry: Optional[float] = keepalive_expiry + self._keepalive_expiry = keepalive_expiry self._expire_at: Optional[float] = None self._state = HTTPConnectionState.NEW self._state_lock = Lock() @@ -76,13 +78,7 @@ def handle_request(self, request: Request) -> Response: f"to {self._origin}" ) - with self._state_lock: - if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE): - self._request_count += 1 - self._state = HTTPConnectionState.ACTIVE - self._expire_at = None - else: - raise ConnectionNotAvailable() + self._update_state() try: kwargs = {"request": request} @@ -142,6 +138,29 @@ def handle_request(self, request: Request) -> Response: self._response_closed() raise exc + def _update_state(self) -> None: + with self._state_lock: + # If the HTTP connection is idle but the socket is readable, then the + # only valid state is that the socket is about to return b"", indicating + # a server-initiated disconnect. + server_disconnected = ( + self._state == HTTPConnectionState.IDLE + and self._network_stream.get_extra_info("is_readable") + ) + if ( + server_disconnected + or self._state == HTTPConnectionState.SERVER_DISCONNECTED + ): + self._state = HTTPConnectionState.SERVER_DISCONNECTED + raise ServerDisconnectedError() + + if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE): + self._request_count += 1 + self._state = HTTPConnectionState.ACTIVE + self._expire_at = None + else: + raise ConnectionNotAvailable() + # Sending the request... def _send_request_headers(self, request: Request) -> None: @@ -279,18 +298,13 @@ def is_available(self) -> bool: return self._state == HTTPConnectionState.IDLE def has_expired(self) -> bool: - now = time.monotonic() - keepalive_expired = self._expire_at is not None and now > self._expire_at - - # If the HTTP connection is idle but the socket is readable, then the - # only valid state is that the socket is about to return b"", indicating - # a server-initiated disconnect. - server_disconnected = ( - self._state == HTTPConnectionState.IDLE - and self._network_stream.get_extra_info("is_readable") - ) + if self._state == HTTPConnectionState.SERVER_DISCONNECTED: + # Connection that is disconnected by the server is considered expired. + # Pool then cleans up this connection by closing it. + return True - return keepalive_expired or server_disconnected + now = time.monotonic() + return self._expire_at is not None and now > self._expire_at def is_idle(self) -> bool: return self._state == HTTPConnectionState.IDLE diff --git a/httpcore/_sync/http2.py b/httpcore/_sync/http2.py index 1ee4bbb3..b8e1ad08 100644 --- a/httpcore/_sync/http2.py +++ b/httpcore/_sync/http2.py @@ -93,13 +93,7 @@ def handle_request(self, request: Request) -> Response: f"to {self._origin}" ) - with self._state_lock: - if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE): - self._request_count += 1 - self._expire_at = None - self._state = HTTPConnectionState.ACTIVE - else: - raise ConnectionNotAvailable() + self._update_state() with self._init_lock: if not self._sent_connection_init: @@ -184,6 +178,15 @@ def handle_request(self, request: Request) -> Response: raise exc + def _update_state(self) -> None: + with self._state_lock: + if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE): + self._request_count += 1 + self._expire_at = None + self._state = HTTPConnectionState.ACTIVE + else: + raise ConnectionNotAvailable() + def _send_connection_init(self, request: Request) -> None: """ The HTTP/2 connection requires some initial setup before we can start diff --git a/tests/_async/test_connection_pool.py b/tests/_async/test_connection_pool.py index 2fc27204..93ff9cda 100644 --- a/tests/_async/test_connection_pool.py +++ b/tests/_async/test_connection_pool.py @@ -687,6 +687,82 @@ async def test_connection_pool_closed_while_request_in_flight(): await response.aread() +@pytest.mark.anyio +async def test_connection_pool_with_idle_broken_connection(): + """ + Pool gives a new connection when an idle connection gets readable (ie broken) while in the pool. + """ + + class MockStream(httpcore.AsyncMockStream): + def __init__(self, buffer: typing.List[bytes]): + super().__init__(buffer) + self.mock_is_readable = False + + def get_extra_info(self, info: str) -> typing.Any: + if info == "is_readable": + return self.mock_is_readable + return super().get_extra_info(info) # pragma: nocover + + streams = [ + MockStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 15\r\n", + b"\r\n", + b"Hello, world 1!", + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 15\r\n", + b"\r\n", + b"Hello, world 2!", + ] + ), + MockStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 29\r\n", + b"\r\n", + b"Hello, world from new stream!", + ] + ), + ] + + class MockBackend(httpcore.AsyncMockBackend): + async def connect_tcp( + self, *args: typing.Any, **kwargs: typing.Any + ) -> MockStream: + return streams.pop(0) + + async with httpcore.AsyncConnectionPool( + network_backend=MockBackend([]), max_connections=1 + ) as pool: + res = await pool.request("GET", "https://example.com/") + assert (await res.aread()) == b"Hello, world 1!" + + assert len(pool.connections) == 1 + conn = pool.connections[0] + + res = await pool.request("GET", "https://example.com/") + assert (await res.aread()) == b"Hello, world 2!" + + assert len(pool.connections) == 1 + assert conn is pool.connections[0], "Should reuse connection" + + # Simulate network breakage + assert conn.is_idle() + conn._connection._network_stream.mock_is_readable = True # type: ignore[attr-defined] + + res = await pool.request("GET", "https://example.com/") + assert (await res.aread()) == b"Hello, world from new stream!" + + assert len(pool.connections) == 1 + new_conn = pool.connections[0] + assert new_conn is not conn, "Should be a new connection" + assert not new_conn._connection._network_stream.mock_is_readable # type: ignore[attr-defined] + + @pytest.mark.anyio async def test_connection_pool_timeout(): """ diff --git a/tests/_async/test_http11.py b/tests/_async/test_http11.py index 94f2febf..d8f9b10c 100644 --- a/tests/_async/test_http11.py +++ b/tests/_async/test_http11.py @@ -1,3 +1,5 @@ +import typing + import pytest import httpcore @@ -167,6 +169,43 @@ async def test_http11_connection_handles_one_active_request(): await conn.request("GET", "https://example.com/") +@pytest.mark.anyio +async def test_http11_idle_connection_checks_readable_state(): + """ + Idle connection can not be readable when requesting. + """ + + class MockStream(httpcore.AsyncMockStream): + def __init__(self, buffer: typing.List[bytes]): + super().__init__(buffer) + self.mock_is_readable = False + + def get_extra_info(self, info: str) -> typing.Any: + if info == "is_readable": + return self.mock_is_readable + return super().get_extra_info(info) # pragma: nocover + + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = MockStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 13\r\n", + b"\r\n", + b"Hello, world!", + ] + ) + async with httpcore.AsyncHTTP11Connection(origin=origin, stream=stream) as conn: + await conn.request("GET", "https://example.com/") + + assert conn.is_idle() and not conn.has_expired() + stream.mock_is_readable = True # Simulate connection breakage + + with pytest.raises(httpcore.ServerDisconnectedError): + await conn.request("GET", "https://example.com/") + assert conn.has_expired() and not conn.is_idle() + + @pytest.mark.anyio async def test_http11_connection_attempt_close(): """ diff --git a/tests/_sync/test_connection_pool.py b/tests/_sync/test_connection_pool.py index ee303e5c..b183f20a 100644 --- a/tests/_sync/test_connection_pool.py +++ b/tests/_sync/test_connection_pool.py @@ -688,6 +688,82 @@ def test_connection_pool_closed_while_request_in_flight(): +def test_connection_pool_with_idle_broken_connection(): + """ + Pool gives a new connection when an idle connection gets readable (ie broken) while in the pool. + """ + + class MockStream(httpcore.MockStream): + def __init__(self, buffer: typing.List[bytes]): + super().__init__(buffer) + self.mock_is_readable = False + + def get_extra_info(self, info: str) -> typing.Any: + if info == "is_readable": + return self.mock_is_readable + return super().get_extra_info(info) # pragma: nocover + + streams = [ + MockStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 15\r\n", + b"\r\n", + b"Hello, world 1!", + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 15\r\n", + b"\r\n", + b"Hello, world 2!", + ] + ), + MockStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 29\r\n", + b"\r\n", + b"Hello, world from new stream!", + ] + ), + ] + + class MockBackend(httpcore.MockBackend): + def connect_tcp( + self, *args: typing.Any, **kwargs: typing.Any + ) -> MockStream: + return streams.pop(0) + + with httpcore.ConnectionPool( + network_backend=MockBackend([]), max_connections=1 + ) as pool: + res = pool.request("GET", "https://example.com/") + assert (res.read()) == b"Hello, world 1!" + + assert len(pool.connections) == 1 + conn = pool.connections[0] + + res = pool.request("GET", "https://example.com/") + assert (res.read()) == b"Hello, world 2!" + + assert len(pool.connections) == 1 + assert conn is pool.connections[0], "Should reuse connection" + + # Simulate network breakage + assert conn.is_idle() + conn._connection._network_stream.mock_is_readable = True # type: ignore[attr-defined] + + res = pool.request("GET", "https://example.com/") + assert (res.read()) == b"Hello, world from new stream!" + + assert len(pool.connections) == 1 + new_conn = pool.connections[0] + assert new_conn is not conn, "Should be a new connection" + assert not new_conn._connection._network_stream.mock_is_readable # type: ignore[attr-defined] + + + def test_connection_pool_timeout(): """ Ensure that exceeding max_connections can cause a request to timeout. diff --git a/tests/_sync/test_http11.py b/tests/_sync/test_http11.py index f2fa28f4..f77c39f0 100644 --- a/tests/_sync/test_http11.py +++ b/tests/_sync/test_http11.py @@ -1,3 +1,5 @@ +import typing + import pytest import httpcore @@ -168,6 +170,43 @@ def test_http11_connection_handles_one_active_request(): +def test_http11_idle_connection_checks_readable_state(): + """ + Idle connection can not be readable when requesting. + """ + + class MockStream(httpcore.MockStream): + def __init__(self, buffer: typing.List[bytes]): + super().__init__(buffer) + self.mock_is_readable = False + + def get_extra_info(self, info: str) -> typing.Any: + if info == "is_readable": + return self.mock_is_readable + return super().get_extra_info(info) # pragma: nocover + + origin = httpcore.Origin(b"https", b"example.com", 443) + stream = MockStream( + [ + b"HTTP/1.1 200 OK\r\n", + b"Content-Type: plain/text\r\n", + b"Content-Length: 13\r\n", + b"\r\n", + b"Hello, world!", + ] + ) + with httpcore.HTTP11Connection(origin=origin, stream=stream) as conn: + conn.request("GET", "https://example.com/") + + assert conn.is_idle() and not conn.has_expired() + stream.mock_is_readable = True # Simulate connection breakage + + with pytest.raises(httpcore.ServerDisconnectedError): + conn.request("GET", "https://example.com/") + assert conn.has_expired() and not conn.is_idle() + + + def test_http11_connection_attempt_close(): """ A connection can only be closed when it is idle.