Async Support
httpcore offers a standard synchronous API by default, but also gives you the option of an async API if you need it.
Async is a concurrency model that is far more efficient than multi-threading for I/O-bound work. It can provide significant performance benefits and enables the use of long-lived network connections such as WebSockets.
If you're working with an async web framework then you'll also want to use an async client for sending outgoing HTTP requests.
Launching concurrent async tasks is far more resource-efficient than spawning multiple threads. The Python interpreter should be able to comfortably handle switching between over 1000 concurrent tasks, whereas a sensible size for a thread pool might be around 10 or 20 threads.
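To make the scale concrete, here is a minimal sketch using only the standard library. It launches 1000 coroutines that each wait briefly, standing in for network I/O. The names `fake_request` and `run_many` are illustrative and not part of httpcore, and timings will vary by machine.

```python
import asyncio
import time


async def fake_request(task_id: int) -> int:
    # Stand-in for a network call: yield to the event loop for 50 ms.
    await asyncio.sleep(0.05)
    return task_id


async def run_many(count: int = 1000) -> list:
    # Schedule all coroutines at once; gather() returns results in input order.
    return await asyncio.gather(*(fake_request(i) for i in range(count)))


if __name__ == "__main__":
    started = time.perf_counter()
    results = asyncio.run(run_many())
    elapsed = time.perf_counter() - started
    print(f"{len(results)} tasks finished in {elapsed:.3f} seconds")
```

Because the waits overlap, the total time should be close to a single 50 ms wait rather than the sum of 1000 of them.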
Enabling Async support
If you're using async with Python's stdlib asyncio support, install the optional dependencies using:
$ pip install 'httpcore[asyncio]'
Alternatively, if you're working with the Python trio package:
$ pip install 'httpcore[trio]'
We highly recommend trio for async support. The trio project pioneered the principles of structured concurrency, and has a more carefully constrained API to work against.
API differences
When using async support, you need to make sure to use an async connection pool class:
# The async variation of `httpcore.ConnectionPool`
async with httpcore.AsyncConnectionPool() as http:
    ...
Sending requests
Sending requests with the async version of httpcore requires the await keyword:
import asyncio
import httpcore
async def main():
    async with httpcore.AsyncConnectionPool() as http:
        response = await http.request("GET", "https://www.example.com/")
asyncio.run(main())
When including content in the request, the content must either be bytes or an async iterable yielding bytes.
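As a sketch of the second form, an async generator is the simplest way to produce an async iterable of bytes. The names `body_chunks` and `collect` below are illustrative, and the commented-out call only indicates where such a generator would be passed as `content=`; the URL is a placeholder.

```python
import asyncio
from typing import AsyncIterator


async def body_chunks() -> AsyncIterator[bytes]:
    # Each yielded item must be bytes, not str.
    for part in (b'{"hello": ', b'"world"', b"}"):
        yield part


async def collect(chunks: AsyncIterator[bytes]) -> bytes:
    # Drain the iterable the way a consumer would, joining the pieces.
    return b"".join([chunk async for chunk in chunks])


# With a pool, the generator would be passed as the request content, e.g.:
#     await http.request("POST", "https://www.example.com/", content=body_chunks())

if __name__ == "__main__":
    print(asyncio.run(collect(body_chunks())))
```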
Streaming responses
Streaming responses also require a slightly different interface to the sync version:
- with <pool>.stream(...) as response → async with <pool>.stream(...) as response.
- for chunk in response.iter_stream() → async for chunk in response.aiter_stream().
- response.read() → await response.aread().
- response.close() → await response.aclose().
For example:
import asyncio
import httpcore
async def main():
    async with httpcore.AsyncConnectionPool() as http:
        async with http.stream("GET", "https://www.example.com/") as response:
            async for chunk in response.aiter_stream():
                print(f"Downloaded: {chunk}")
asyncio.run(main())
Pool lifespans
When using httpcore in an async environment it is strongly recommended that you instantiate and use connection pools using the context managed style:
async with httpcore.AsyncConnectionPool() as http:
    ...
To benefit from connection pooling it is recommended that you instantiate a single connection pool in this style, and pass it around throughout your application.
If you do want to use a connection pool without this style then you'll need to ensure that you explicitly close the pool once it is no longer required:
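# Create the pool before entering `try`, so that `http` is always
# defined by the time the `finally` block runs.
http = httpcore.AsyncConnectionPool()
try:
    ...
finally:
    await http.aclose()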
This is a little different to the threaded context, where it's okay to simply instantiate a globally available connection pool, and then allow Python's garbage collection to deal with closing any connections in the pool, once the __del__ method is called.
The reason for this difference is that asynchronous code is not able to run within the context of the synchronous __del__ method, so there is no way for connections to be automatically closed at the point of garbage collection. This can lead to unterminated TCP connections still remaining after the Python interpreter quits.
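The limitation can be illustrated with a standard-library sketch. `FakePool` is a hypothetical stand-in for a resource with an async `aclose()` method, not httpcore's class. It shows why `async with` is a reliable way to guarantee cleanup, even when the body raises.

```python
import asyncio


class FakePool:
    """Hypothetical stand-in for a resource that needs async cleanup."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        await asyncio.sleep(0)  # Pretend to close sockets asynchronously.
        self.closed = True

    async def __aenter__(self) -> "FakePool":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Runs inside the event loop, so awaiting here is allowed.
        await self.aclose()

    # Note: a synchronous __del__ could not `await self.aclose()`.


async def use_pool() -> FakePool:
    pool = FakePool()
    try:
        async with pool:
            raise RuntimeError("request failed")
    except RuntimeError:
        pass  # The error propagated, but __aexit__ already closed the pool.
    return pool


if __name__ == "__main__":
    print(asyncio.run(use_pool()).closed)
```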
Supported environments
httpcore supports either asyncio or trio as an async environment.
It will auto-detect which of those two to use as the backend for socket operations and concurrency primitives.
AsyncIO
AsyncIO is Python's built-in library for writing concurrent code with the async/await syntax.
Let's take a look at sending several outgoing HTTP requests concurrently, using asyncio:
import asyncio
import httpcore
import time
async def download(http, year):
    await http.request("GET", f"https://en.wikipedia.org/wiki/{year}")
async def main():
    async with httpcore.AsyncConnectionPool() as http:
        started = time.time()
        # Here we use `asyncio.gather()` in order to run several tasks concurrently...
        tasks = [download(http, year) for year in range(2000, 2020)]
        await asyncio.gather(*tasks)
        complete = time.time()
        for connection in http.connections:
            print(connection)
        print("Complete in %.3f seconds" % (complete - started))
asyncio.run(main())
Trio
Trio is an alternative async library, designed around the principles of structured concurrency.
import httpcore
import trio
import time
async def download(http, year):
    await http.request("GET", f"https://en.wikipedia.org/wiki/{year}")
async def main():
    async with httpcore.AsyncConnectionPool() as http:
        started = time.time()
        async with trio.open_nursery() as nursery:
            for year in range(2000, 2020):
                nursery.start_soon(download, http, year)
        complete = time.time()
        for connection in http.connections:
            print(connection)
        print("Complete in %.3f seconds" % (complete - started))
trio.run(main)
AnyIO
AnyIO is an asynchronous networking and concurrency library that works on top of either asyncio or trio. It blends in with native libraries of your chosen backend (defaults to asyncio).
The anyio library is designed around the principles of structured concurrency, and brings many of the same correctness and usability benefits that Trio provides, while interoperating with existing asyncio libraries.
import httpcore
import anyio
import time
async def download(http, year):
    await http.request("GET", f"https://en.wikipedia.org/wiki/{year}")
async def main():
    async with httpcore.AsyncConnectionPool() as http:
        started = time.time()
        async with anyio.create_task_group() as task_group:
            for year in range(2000, 2020):
                task_group.start_soon(download, http, year)
        complete = time.time()
        for connection in http.connections:
            print(connection)
        print("Complete in %.3f seconds" % (complete - started))
anyio.run(main)
Reference
httpcore.AsyncConnectionPool
A connection pool for making HTTP requests.
connections: list[AsyncConnectionInterface] (property, readonly)
Return a list of the connections currently in the pool.
For example:
>>> pool.connections
[
    <AsyncHTTPConnection ['https://example.com:443', HTTP/1.1, ACTIVE, Request Count: 6]>,
    <AsyncHTTPConnection ['https://example.com:443', HTTP/1.1, IDLE, Request Count: 9]>,
    <AsyncHTTPConnection ['http://example.com:80', HTTP/1.1, IDLE, Request Count: 1]>,
]
__init__(self, ssl_context=None, proxy=None, max_connections=10, max_keepalive_connections=None, keepalive_expiry=None, http1=True, http2=False, retries=0, local_address=None, uds=None, network_backend=None, socket_options=None) (special)
A connection pool for making HTTP requests.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ssl_context | ssl.SSLContext \| None | An SSL context to use for verifying connections. If not specified, the default httpcore.default_ssl_context() will be used. | None |
max_connections | int \| None | The maximum number of concurrent HTTP connections that the pool should allow. Any attempt to send a request on a pool that would exceed this amount will block until a connection is available. | 10 |
max_keepalive_connections | int \| None | The maximum number of idle HTTP connections that will be maintained in the pool. | None |
keepalive_expiry | float \| None | The duration in seconds that an idle HTTP connection may be maintained for before being expired from the pool. | None |
http1 | bool | A boolean indicating if HTTP/1.1 requests should be supported by the connection pool. | True |
http2 | bool | A boolean indicating if HTTP/2 requests should be supported by the connection pool. | False |
retries | int | The maximum number of retries when trying to establish a connection. | 0 |
local_address | str \| None | Local address to connect from. Can also be used to connect using a particular address family. Using local_address="0.0.0.0" will connect using an AF_INET address (IPv4), while using local_address="::" will connect using an AF_INET6 address (IPv6). | None |
uds | str \| None | Path to a Unix Domain Socket to use instead of TCP sockets. | None |
network_backend | AsyncNetworkBackend \| None | A backend instance to use for handling network I/O. | None |
socket_options | Iterable[SOCKET_OPTION] \| None | Socket options to set on the TCP socket when the connection is established. | None |
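The blocking behaviour described for max_connections can be pictured with an `asyncio.Semaphore`. This is only an analogy built on the standard library, not how the pool is implemented; `run_with_limit` and `limited_worker` are hypothetical names.

```python
import asyncio


async def run_with_limit(jobs: int, limit: int) -> int:
    """Run `jobs` workers, allowing at most `limit` at once. Returns the peak."""
    semaphore = asyncio.Semaphore(limit)
    active = 0
    peak = 0

    async def limited_worker() -> None:
        nonlocal active, peak
        # Workers beyond the limit wait here until a slot frees up.
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # Stand-in for the request itself.
            active -= 1

    await asyncio.gather(*(limited_worker() for _ in range(jobs)))
    return peak


if __name__ == "__main__":
    # The peak number of simultaneously active workers never exceeds the limit.
    print(asyncio.run(run_with_limit(jobs=50, limit=10)))
```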
Source code in httpcore/__init__.py
def __init__(
    self,
    ssl_context: ssl.SSLContext | None = None,
    proxy: Proxy | None = None,
    max_connections: int | None = 10,
    max_keepalive_connections: int | None = None,
    keepalive_expiry: float | None = None,
    http1: bool = True,
    http2: bool = False,
    retries: int = 0,
    local_address: str | None = None,
    uds: str | None = None,
    network_backend: AsyncNetworkBackend | None = None,
    socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
) -> None:
    """
    A connection pool for making HTTP requests.
    Parameters:
        ssl_context: An SSL context to use for verifying connections.
            If not specified, the default `httpcore.default_ssl_context()`
            will be used.
        max_connections: The maximum number of concurrent HTTP connections that
            the pool should allow. Any attempt to send a request on a pool that
            would exceed this amount will block until a connection is available.
        max_keepalive_connections: The maximum number of idle HTTP connections
            that will be maintained in the pool.
        keepalive_expiry: The duration in seconds that an idle HTTP connection
            may be maintained for before being expired from the pool.
        http1: A boolean indicating if HTTP/1.1 requests should be supported
            by the connection pool. Defaults to True.
        http2: A boolean indicating if HTTP/2 requests should be supported by
            the connection pool. Defaults to False.
        retries: The maximum number of retries when trying to establish a
            connection.
        local_address: Local address to connect from. Can also be used to connect
            using a particular address family. Using `local_address="0.0.0.0"`
            will connect using an `AF_INET` address (IPv4), while using
            `local_address="::"` will connect using an `AF_INET6` address (IPv6).
        uds: Path to a Unix Domain Socket to use instead of TCP sockets.
        network_backend: A backend instance to use for handling network I/O.
        socket_options: Socket options that have to be included
            in the TCP socket when the connection was established.
    """
    self._ssl_context = ssl_context
    self._proxy = proxy
    self._max_connections = (
        sys.maxsize if max_connections is None else max_connections
    )
    self._max_keepalive_connections = (
        sys.maxsize
        if max_keepalive_connections is None
        else max_keepalive_connections
    )
    self._max_keepalive_connections = min(
        self._max_connections, self._max_keepalive_connections
    )
    self._keepalive_expiry = keepalive_expiry
    self._http1 = http1
    self._http2 = http2
    self._retries = retries
    self._local_address = local_address
    self._uds = uds
    self._network_backend = (
        AutoBackend() if network_backend is None else network_backend
    )
    self._socket_options = socket_options
    # The mutable state on a connection pool is the queue of incoming requests,
    # and the set of connections that are servicing those requests.
    self._connections: list[AsyncConnectionInterface] = []
    self._requests: list[AsyncPoolRequest] = []
    # We only mutate the state of the connection pool within an 'optional_thread_lock'
    # context. This holds a threading lock unless we're running in async mode,
    # in which case it is a no-op.
    self._optional_thread_lock = AsyncThreadLock()
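The limit handling near the top of `__init__` can be isolated into a small function. The sketch below restates that logic from the source listing; `normalize_limits` is a hypothetical helper name used only for illustration.

```python
import sys
from typing import Optional, Tuple


def normalize_limits(
    max_connections: Optional[int] = 10,
    max_keepalive_connections: Optional[int] = None,
) -> Tuple[int, int]:
    # None means "no limit", represented internally as sys.maxsize.
    connections = sys.maxsize if max_connections is None else max_connections
    keepalive = (
        sys.maxsize
        if max_keepalive_connections is None
        else max_keepalive_connections
    )
    # The keep-alive limit can never exceed the overall connection limit.
    return connections, min(connections, keepalive)


if __name__ == "__main__":
    print(normalize_limits())         # defaults
    print(normalize_limits(5, 20))    # keep-alive capped by max_connections
    print(normalize_limits(None, 3))  # unlimited connections, 3 kept idle
```

One consequence is that leaving max_keepalive_connections as None makes it effectively equal to max_connections.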
handle_async_request(self, request) (async)
Send an HTTP request, and return an HTTP response.
This is the core implementation that is called into by .request() or .stream().
Source code in httpcore/__init__.py
async def handle_async_request(self, request: Request) -> Response:
    """
    Send an HTTP request, and return an HTTP response.
    This is the core implementation that is called into by `.request()` or `.stream()`.
    """
    scheme = request.url.scheme.decode()
    if scheme == "":
        raise UnsupportedProtocol(
            "Request URL is missing an 'http://' or 'https://' protocol."
        )
    if scheme not in ("http", "https", "ws", "wss"):
        raise UnsupportedProtocol(
            f"Request URL has an unsupported protocol '{scheme}://'."
        )
    timeouts = request.extensions.get("timeout", {})
    timeout = timeouts.get("pool", None)
    with self._optional_thread_lock:
        # Add the incoming request to our request queue.
        pool_request = AsyncPoolRequest(request)
        self._requests.append(pool_request)
    try:
        while True:
            with self._optional_thread_lock:
                # Assign incoming requests to available connections,
                # closing or creating new connections as required.
                closing = self._assign_requests_to_connections()
            await self._close_connections(closing)
            # Wait until this request has an assigned connection.
            connection = await pool_request.wait_for_connection(timeout=timeout)
            try:
                # Send the request on the assigned connection.
                response = await connection.handle_async_request(
                    pool_request.request
                )
            except ConnectionNotAvailable:
                # In some cases a connection may initially be available to
                # handle a request, but then become unavailable.
                #
                # In this case we clear the connection and try again.
                pool_request.clear_connection()
            else:
                break  # pragma: nocover
    except BaseException as exc:
        with self._optional_thread_lock:
            # For any exception or cancellation we remove the request from
            # the queue, and then re-assign requests to connections.
            self._requests.remove(pool_request)
            closing = self._assign_requests_to_connections()
        await self._close_connections(closing)
        raise exc from None
    # Return the response. Note that in this case we still have to manage
    # the point at which the response is closed.
    assert isinstance(response.stream, typing.AsyncIterable)
    return Response(
        status=response.status,
        headers=response.headers,
        content=PoolByteStream(
            stream=response.stream, pool_request=pool_request, pool=self
        ),
        extensions=response.extensions,
    )
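The first two steps of handle_async_request are plain data checks that can be sketched without a network. In the example below, `UnsupportedProtocol` is a local stand-in for the exception of the same name in the listing, and `preflight` is a hypothetical helper that takes the URL scheme as bytes and the request extensions as a dict.

```python
from typing import Optional


class UnsupportedProtocol(Exception):
    """Local stand-in for the exception raised in the source listing."""


def preflight(scheme: bytes, extensions: dict) -> Optional[float]:
    """Validate the URL scheme and return the pool timeout, if any."""
    decoded = scheme.decode()
    if decoded == "":
        raise UnsupportedProtocol(
            "Request URL is missing an 'http://' or 'https://' protocol."
        )
    if decoded not in ("http", "https", "ws", "wss"):
        raise UnsupportedProtocol(
            f"Request URL has an unsupported protocol '{decoded}://'."
        )
    # A missing "timeout" extension or "pool" key means no pool timeout.
    return extensions.get("timeout", {}).get("pool", None)


if __name__ == "__main__":
    print(preflight(b"https", {"timeout": {"pool": 5.0}}))
    print(preflight(b"http", {}))
```

Going by the listing, a caller could bound the wait for a free connection by passing `extensions={"timeout": {"pool": 5.0}}` along with the request.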