Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connection pool optimization: move socket polling from expiry checks to connection usage #928

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions httpcore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
ReadError,
ReadTimeout,
RemoteProtocolError,
ServerDisconnectedError,
TimeoutException,
UnsupportedProtocol,
WriteError,
Expand Down Expand Up @@ -114,6 +115,7 @@ def __init__(self, *args, **kwargs): # type: ignore
"SOCKET_OPTION",
# exceptions
"ConnectionNotAvailable",
"ServerDisconnectedError",
"ProxyError",
"ProtocolError",
"LocalProtocolError",
Expand Down
29 changes: 15 additions & 14 deletions httpcore/_async/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
closing_connections = []

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
# have expired, or surplus idle connections.
for connection in list(self._connections):
if connection.is_closed():
# log: "removing closed connection"
Expand All @@ -267,15 +267,12 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we could just find the first available one here?

            available_connections = next(
                (
                    connection
                    for connection in self._connections
                    if connection.can_handle_request(origin)
                    and connection.is_available()
                ),
                None,
            )
           ...
           if available_connections:
               connection = available_connections
               ...

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I just noticed that this has been split into separate PRs. 😅

idle_connections = [
connection for connection in self._connections if connection.is_idle()
]

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# 3. We can close an idle/expired connection and then create a new connection
# to handle the request.
if available_connections:
# log: "reusing existing connection"
Expand All @@ -286,15 +283,19 @@ def _assign_requests_to_connections(self) -> List[AsyncConnectionInterface]:
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
else:
purged_connection = next(
(c for c in self._connections if c.is_idle() or c.has_expired()),
Copy link
Contributor Author

@MarkusSintonen MarkusSintonen Jun 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also considers expired connections for removal here, because A) the clock may have moved forward so that some connections are now expired, and B) expired connections are the ones that may have been disconnected by the server (they are not idle).

None,
)
if purged_connection is not None:
# log: "closing idle connection"
self._connections.remove(purged_connection)
closing_connections.append(purged_connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)

return closing_connections

Expand Down
52 changes: 33 additions & 19 deletions httpcore/_async/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ConnectionNotAvailable,
LocalProtocolError,
RemoteProtocolError,
ServerDisconnectedError,
WriteError,
map_exceptions,
)
Expand All @@ -45,6 +46,7 @@ class HTTPConnectionState(enum.IntEnum):
ACTIVE = 1
IDLE = 2
CLOSED = 3
SERVER_DISCONNECTED = 4


class AsyncHTTP11Connection(AsyncConnectionInterface):
Expand All @@ -59,7 +61,7 @@ def __init__(
) -> None:
self._origin = origin
self._network_stream = stream
self._keepalive_expiry: Optional[float] = keepalive_expiry
self._keepalive_expiry = keepalive_expiry
self._expire_at: Optional[float] = None
self._state = HTTPConnectionState.NEW
self._state_lock = AsyncLock()
Expand All @@ -76,13 +78,7 @@ async def handle_async_request(self, request: Request) -> Response:
f"to {self._origin}"
)

async with self._state_lock:
if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
self._expire_at = None
else:
raise ConnectionNotAvailable()
await self._update_state()

try:
kwargs = {"request": request}
Expand Down Expand Up @@ -142,6 +138,29 @@ async def handle_async_request(self, request: Request) -> Response:
await self._response_closed()
raise exc

async def _update_state(self) -> None:
async with self._state_lock:
# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
Comment on lines +146 to +149
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this check also be done in AsyncHTTP2Connection, as it is here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, but cc @tomchristie, who knows HTTP/2 better.

if (
server_disconnected
or self._state == HTTPConnectionState.SERVER_DISCONNECTED
):
self._state = HTTPConnectionState.SERVER_DISCONNECTED
raise ServerDisconnectedError()

if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
self._expire_at = None
else:
raise ConnectionNotAvailable()

# Sending the request...

async def _send_request_headers(self, request: Request) -> None:
Expand Down Expand Up @@ -279,18 +298,13 @@ def is_available(self) -> bool:
return self._state == HTTPConnectionState.IDLE

def has_expired(self) -> bool:
now = time.monotonic()
keepalive_expired = self._expire_at is not None and now > self._expire_at

# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if self._state == HTTPConnectionState.SERVER_DISCONNECTED:
# Connection that is disconnected by the server is considered expired.
# Pool then cleans up this connection by closing it.
return True

return keepalive_expired or server_disconnected
now = time.monotonic()
return self._expire_at is not None and now > self._expire_at

def is_idle(self) -> bool:
return self._state == HTTPConnectionState.IDLE
Expand Down
17 changes: 10 additions & 7 deletions httpcore/_async/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,7 @@ async def handle_async_request(self, request: Request) -> Response:
f"to {self._origin}"
)

async with self._state_lock:
if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE):
self._request_count += 1
self._expire_at = None
self._state = HTTPConnectionState.ACTIVE
else:
raise ConnectionNotAvailable()
await self._update_state()

async with self._init_lock:
if not self._sent_connection_init:
Expand Down Expand Up @@ -184,6 +178,15 @@ async def handle_async_request(self, request: Request) -> Response:

raise exc

async def _update_state(self) -> None:
async with self._state_lock:
if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE):
self._request_count += 1
self._expire_at = None
self._state = HTTPConnectionState.ACTIVE
else:
raise ConnectionNotAvailable()

async def _send_connection_init(self, request: Request) -> None:
"""
The HTTP/2 connection requires some initial setup before we can start
Expand Down
12 changes: 11 additions & 1 deletion httpcore/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@ def map_exceptions(map: ExceptionMapping) -> Iterator[None]:


class ConnectionNotAvailable(Exception):
pass
"""
This error is handled by the connection pool.
Users should not see this error directly when using connection pool.
"""


class ServerDisconnectedError(ConnectionNotAvailable):
"""
This error is handled by the connection pool.
Users should not see this error directly when using connection pool.
"""


class ProxyError(Exception):
Expand Down
29 changes: 15 additions & 14 deletions httpcore/_sync/connection_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
closing_connections = []

# First we handle cleaning up any connections that are closed,
# have expired their keep-alive, or surplus idle connections.
# have expired, or surplus idle connections.
for connection in list(self._connections):
if connection.is_closed():
# log: "removing closed connection"
Expand All @@ -267,15 +267,12 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
for connection in self._connections
if connection.can_handle_request(origin) and connection.is_available()
]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto.

idle_connections = [
connection for connection in self._connections if connection.is_idle()
]

# There are three cases for how we may be able to handle the request:
#
# 1. There is an existing connection that can handle the request.
# 2. We can create a new connection to handle the request.
# 3. We can close an idle connection and then create a new connection
# 3. We can close an idle/expired connection and then create a new connection
# to handle the request.
if available_connections:
# log: "reusing existing connection"
Expand All @@ -286,15 +283,19 @@ def _assign_requests_to_connections(self) -> List[ConnectionInterface]:
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
elif idle_connections:
# log: "closing idle connection"
connection = idle_connections[0]
self._connections.remove(connection)
closing_connections.append(connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)
else:
purged_connection = next(
(c for c in self._connections if c.is_idle() or c.has_expired()),
None,
)
if purged_connection is not None:
# log: "closing idle connection"
self._connections.remove(purged_connection)
closing_connections.append(purged_connection)
# log: "creating new connection"
connection = self.create_connection(origin)
self._connections.append(connection)
pool_request.assign_to_connection(connection)

return closing_connections

Expand Down
52 changes: 33 additions & 19 deletions httpcore/_sync/http11.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
ConnectionNotAvailable,
LocalProtocolError,
RemoteProtocolError,
ServerDisconnectedError,
WriteError,
map_exceptions,
)
Expand All @@ -45,6 +46,7 @@ class HTTPConnectionState(enum.IntEnum):
ACTIVE = 1
IDLE = 2
CLOSED = 3
SERVER_DISCONNECTED = 4


class HTTP11Connection(ConnectionInterface):
Expand All @@ -59,7 +61,7 @@ def __init__(
) -> None:
self._origin = origin
self._network_stream = stream
self._keepalive_expiry: Optional[float] = keepalive_expiry
self._keepalive_expiry = keepalive_expiry
self._expire_at: Optional[float] = None
self._state = HTTPConnectionState.NEW
self._state_lock = Lock()
Expand All @@ -76,13 +78,7 @@ def handle_request(self, request: Request) -> Response:
f"to {self._origin}"
)

with self._state_lock:
if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
self._expire_at = None
else:
raise ConnectionNotAvailable()
self._update_state()

try:
kwargs = {"request": request}
Expand Down Expand Up @@ -142,6 +138,29 @@ def handle_request(self, request: Request) -> Response:
self._response_closed()
raise exc

def _update_state(self) -> None:
with self._state_lock:
# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if (
server_disconnected
or self._state == HTTPConnectionState.SERVER_DISCONNECTED
):
self._state = HTTPConnectionState.SERVER_DISCONNECTED
raise ServerDisconnectedError()

if self._state in (HTTPConnectionState.NEW, HTTPConnectionState.IDLE):
self._request_count += 1
self._state = HTTPConnectionState.ACTIVE
self._expire_at = None
else:
raise ConnectionNotAvailable()

# Sending the request...

def _send_request_headers(self, request: Request) -> None:
Expand Down Expand Up @@ -279,18 +298,13 @@ def is_available(self) -> bool:
return self._state == HTTPConnectionState.IDLE

def has_expired(self) -> bool:
now = time.monotonic()
keepalive_expired = self._expire_at is not None and now > self._expire_at

# If the HTTP connection is idle but the socket is readable, then the
# only valid state is that the socket is about to return b"", indicating
# a server-initiated disconnect.
server_disconnected = (
self._state == HTTPConnectionState.IDLE
and self._network_stream.get_extra_info("is_readable")
)
if self._state == HTTPConnectionState.SERVER_DISCONNECTED:
# Connection that is disconnected by the server is considered expired.
# Pool then cleans up this connection by closing it.
return True

return keepalive_expired or server_disconnected
now = time.monotonic()
return self._expire_at is not None and now > self._expire_at

def is_idle(self) -> bool:
return self._state == HTTPConnectionState.IDLE
Expand Down
17 changes: 10 additions & 7 deletions httpcore/_sync/http2.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,7 @@ def handle_request(self, request: Request) -> Response:
f"to {self._origin}"
)

with self._state_lock:
if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE):
self._request_count += 1
self._expire_at = None
self._state = HTTPConnectionState.ACTIVE
else:
raise ConnectionNotAvailable()
self._update_state()

with self._init_lock:
if not self._sent_connection_init:
Expand Down Expand Up @@ -184,6 +178,15 @@ def handle_request(self, request: Request) -> Response:

raise exc

def _update_state(self) -> None:
with self._state_lock:
if self._state in (HTTPConnectionState.ACTIVE, HTTPConnectionState.IDLE):
self._request_count += 1
self._expire_at = None
self._state = HTTPConnectionState.ACTIVE
else:
raise ConnectionNotAvailable()

def _send_connection_init(self, request: Request) -> None:
"""
The HTTP/2 connection requires some initial setup before we can start
Expand Down
Loading
Loading