Skip to content

Commit

Permalink
Move connection poll check away from pool expiry checks
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkusSintonen committed Jun 15, 2024
1 parent da86ca4 commit 27e73bb
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 28 deletions.
2 changes: 2 additions & 0 deletions httpcore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
ReadError,
ReadTimeout,
RemoteProtocolError,
ServerDisconnectedError,
TimeoutException,
UnsupportedProtocol,
WriteError,
Expand Down Expand Up @@ -114,6 +115,7 @@ def __init__(self, *args, **kwargs): # type: ignore
"SOCKET_OPTION",
# exceptions
"ConnectionNotAvailable",
"ServerDisconnectedError",
"ProxyError",
"ProtocolError",
"LocalProtocolError",
Expand Down
8 changes: 6 additions & 2 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

from .._backends.auto import AutoBackend
from .._backends.base import SOCKET_OPTION, AsyncNetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._exceptions import (
ConnectionNotAvailable,
ServerDisconnectedError,
UnsupportedProtocol,
)
from .._models import Origin, Request, Response
from .._synchronization import AsyncEvent, AsyncShieldCancellation, AsyncThreadLock
from .connection import AsyncHTTPConnection
Expand Down Expand Up @@ -196,7 +200,7 @@ async def handle_async_request(self, request: Request) -> Response:
response = await connection.handle_async_request(
pool_request.request
)
except ConnectionNotAvailable:
except (ConnectionNotAvailable, ServerDisconnectedError):
# In some cases a connection may initially be available to
# handle a request, but then become unavailable.
#
Expand Down
35 changes: 23 additions & 12 deletions httpcore/_async/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ConnectionNotAvailable,
LocalProtocolError,
RemoteProtocolError,
ServerDisconnectedError,
WriteError,
map_exceptions,
)
Expand All @@ -45,6 +46,7 @@ class HTTPConnectionState(enum.IntEnum):
ACTIVE = 1
IDLE = 2
CLOSED = 3
SERVER_DISCONNECTED = 4


class AsyncHTTP11Connection(AsyncConnectionInterface):
Expand All @@ -59,7 +61,7 @@ def __init__(
) -> None:
self._origin = origin
self._network_stream = stream
self._keepalive_expiry: Optional[float] = keepalive_expiry
self._keepalive_expiry = keepalive_expiry
self._expire_at: Optional[float] = None
self._state = HTTPConnectionState.NEW
self._state_lock = AsyncLock()
Expand All @@ -77,6 +79,20 @@ async def handle_async_request(self, request: Request) -> Response:
)

async with self._state_lock:
if self._state == HTTPConnectionState.SERVER_DISCONNECTED:
raise ServerDisconnectedError()

# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if server_disconnected:
self._state = HTTPConnectionState.SERVER_DISCONNECTED
raise ServerDisconnectedError()

if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
Expand Down Expand Up @@ -279,18 +295,13 @@ def is_available(self) -> bool:
return self._state == HTTPConnectionState.IDLE

def has_expired(self) -> bool:
now = time.monotonic()
keepalive_expired = self._expire_at is not None and now > self._expire_at

# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if self._state == HTTPConnectionState.SERVER_DISCONNECTED:
# Connection that is disconnected by the server is considered expired.
# Pool then cleans up this connection by closing it.
return True

return keepalive_expired or server_disconnected
now = time.monotonic()
return self._expire_at is not None and now > self._expire_at

def is_idle(self) -> bool:
return self._state == HTTPConnectionState.IDLE
Expand Down
4 changes: 4 additions & 0 deletions httpcore/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ class ConnectionNotAvailable(Exception):
pass


class ServerDisconnectedError(Exception):
pass


class ProxyError(Exception):
pass

Expand Down
8 changes: 6 additions & 2 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@

from .._backends.sync import SyncBackend
from .._backends.base import SOCKET_OPTION, NetworkBackend
from .._exceptions import ConnectionNotAvailable, UnsupportedProtocol
from .._exceptions import (
ConnectionNotAvailable,
ServerDisconnectedError,
UnsupportedProtocol,
)
from .._models import Origin, Request, Response
from .._synchronization import Event, ShieldCancellation, ThreadLock
from .connection import HTTPConnection
Expand Down Expand Up @@ -196,7 +200,7 @@ def handle_request(self, request: Request) -> Response:
response = connection.handle_request(
pool_request.request
)
except ConnectionNotAvailable:
except (ConnectionNotAvailable, ServerDisconnectedError):
# In some cases a connection may initially be available to
# handle a request, but then become unavailable.
#
Expand Down
35 changes: 23 additions & 12 deletions httpcore/_sync/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ConnectionNotAvailable,
LocalProtocolError,
RemoteProtocolError,
ServerDisconnectedError,
WriteError,
map_exceptions,
)
Expand All @@ -45,6 +46,7 @@ class HTTPConnectionState(enum.IntEnum):
ACTIVE = 1
IDLE = 2
CLOSED = 3
SERVER_DISCONNECTED = 4


class HTTP11Connection(ConnectionInterface):
Expand All @@ -59,7 +61,7 @@ def __init__(
) -> None:
self._origin = origin
self._network_stream = stream
self._keepalive_expiry: Optional[float] = keepalive_expiry
self._keepalive_expiry = keepalive_expiry
self._expire_at: Optional[float] = None
self._state = HTTPConnectionState.NEW
self._state_lock = Lock()
Expand All @@ -77,6 +79,20 @@ def handle_request(self, request: Request) -> Response:
)

with self._state_lock:
if self._state == HTTPConnectionState.SERVER_DISCONNECTED:
raise ServerDisconnectedError()

# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if server_disconnected:
self._state = HTTPConnectionState.SERVER_DISCONNECTED
raise ServerDisconnectedError()

if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
Expand Down Expand Up @@ -279,18 +295,13 @@ def is_available(self) -> bool:
return self._state == HTTPConnectionState.IDLE

def has_expired(self) -> bool:
now = time.monotonic()
keepalive_expired = self._expire_at is not None and now > self._expire_at

# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if self._state == HTTPConnectionState.SERVER_DISCONNECTED:
# Connection that is disconnected by the server is considered expired.
# Pool then cleans up this connection by closing it.
return True

return keepalive_expired or server_disconnected
now = time.monotonic()
return self._expire_at is not None and now > self._expire_at

def is_idle(self) -> bool:
return self._state == HTTPConnectionState.IDLE
Expand Down
76 changes: 76 additions & 0 deletions tests/_async/test_connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,82 @@ async def test_connection_pool_closed_while_request_in_flight():
await response.aread()


@pytest.mark.anyio
async def test_connection_pool_with_idle_broken_connection():
"""
Pool gives a new connection when an idle connection gets readable (ie broken) while in the pool.
"""

class MockStream(httpcore.AsyncMockStream):
def __init__(self, buffer: typing.List[bytes]):
super().__init__(buffer)
self.mock_is_readable = False

def get_extra_info(self, info: str) -> typing.Any:
if info == "is_readable":
return self.mock_is_readable
return super().get_extra_info(info) # pragma: nocover

streams = [
MockStream(
[
b"HTTP/1.1 200 OK\r\n",
b"Content-Type: plain/text\r\n",
b"Content-Length: 15\r\n",
b"\r\n",
b"Hello, world 1!",
b"HTTP/1.1 200 OK\r\n",
b"Content-Type: plain/text\r\n",
b"Content-Length: 15\r\n",
b"\r\n",
b"Hello, world 2!",
]
),
MockStream(
[
b"HTTP/1.1 200 OK\r\n",
b"Content-Type: plain/text\r\n",
b"Content-Length: 29\r\n",
b"\r\n",
b"Hello, world from new stream!",
]
),
]

class MockBackend(httpcore.AsyncMockBackend):
async def connect_tcp(
self, *args: typing.Any, **kwargs: typing.Any
) -> MockStream:
return streams.pop(0)

async with httpcore.AsyncConnectionPool(
network_backend=MockBackend([]), max_connections=1
) as pool:
res = await pool.request("GET", "https://example.com/")
assert (await res.aread()) == b"Hello, world 1!"

assert len(pool.connections) == 1
conn = pool.connections[0]

res = await pool.request("GET", "https://example.com/")
assert (await res.aread()) == b"Hello, world 2!"

assert len(pool.connections) == 1
assert conn is pool.connections[0], "Should reuse connection"

# Simulate network breakage
assert conn.is_idle()
conn._connection._network_stream.mock_is_readable = True # type: ignore[attr-defined]

res = await pool.request("GET", "https://example.com/")
assert (await res.aread()) == b"Hello, world from new stream!"

assert len(pool.connections) == 1
new_conn = pool.connections[0]
assert new_conn is not conn, "Should be a new connection"
assert not new_conn._connection._network_stream.mock_is_readable # type: ignore[attr-defined]


@pytest.mark.anyio
async def test_connection_pool_timeout():
"""
Expand Down
44 changes: 44 additions & 0 deletions tests/_async/test_http11.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import typing

import pytest

import httpcore
from httpcore._exceptions import ServerDisconnectedError


@pytest.mark.anyio
Expand Down Expand Up @@ -167,6 +170,47 @@ async def test_http11_connection_handles_one_active_request():
await conn.request("GET", "https://example.com/")


@pytest.mark.anyio
async def test_http11_idle_connection_checks_readable_state():
"""
Idle connection can not be readable when requesting.
"""

class MockStream(httpcore.AsyncMockStream):
def __init__(self, buffer: typing.List[bytes]):
super().__init__(buffer)
self.mock_is_readable = False

def get_extra_info(self, info: str) -> typing.Any:
if info == "is_readable":
return self.mock_is_readable
return super().get_extra_info(info) # pragma: nocover

origin = httpcore.Origin(b"https", b"example.com", 443)
stream = MockStream(
[
b"HTTP/1.1 200 OK\r\n",
b"Content-Type: plain/text\r\n",
b"Content-Length: 13\r\n",
b"\r\n",
b"Hello, world!",
]
)
async with httpcore.AsyncHTTP11Connection(origin=origin, stream=stream) as conn:
await conn.request("GET", "https://example.com/")

assert conn.is_idle() and not conn.has_expired()
stream.mock_is_readable = True # Simulate connection breakage

with pytest.raises(ServerDisconnectedError):
await conn.request("GET", "https://example.com/")
assert conn.has_expired() and not conn.is_idle()

with pytest.raises(ServerDisconnectedError):
await conn.request("GET", "https://example.com/")
assert conn.has_expired() and not conn.is_idle()


@pytest.mark.anyio
async def test_http11_connection_attempt_close():
"""
Expand Down
Loading

0 comments on commit 27e73bb

Please sign in to comment.