""" Custom transports, with nicely configured defaults. The following additional keyword arguments are currently supported by httpcore... * uds: str * local_address: str * retries: int Example usages... # Disable HTTP/2 on a single specific domain. mounts = { "all://": httpx.HTTPTransport(http2=True), "all://*example.org": httpx.HTTPTransport() } # Using advanced httpcore configuration, with connection retries. transport = httpx.HTTPTransport(retries=1) client = httpx.Client(transport=transport) # Using advanced httpcore configuration, with unix domain sockets. transport = httpx.HTTPTransport(uds="socket.uds") client = httpx.Client(transport=transport) """ from __future__ import annotations import contextlib import typing from types import TracebackType import httpcore from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context from .._exceptions import ( ConnectError, ConnectTimeout, LocalProtocolError, NetworkError, PoolTimeout, ProtocolError, ProxyError, ReadError, ReadTimeout, RemoteProtocolError, TimeoutException, UnsupportedProtocol, WriteError, WriteTimeout, ) from .._models import Request, Response from .._types import AsyncByteStream, CertTypes, ProxyTypes, SyncByteStream, VerifyTypes from .._urls import URL from .base import AsyncBaseTransport, BaseTransport T = typing.TypeVar("T", bound="HTTPTransport") A = typing.TypeVar("A", bound="AsyncHTTPTransport") SOCKET_OPTION = typing.Union[ typing.Tuple[int, int, int], typing.Tuple[int, int, typing.Union[bytes, bytearray]], typing.Tuple[int, int, None, int], ] __all__ = ["AsyncHTTPTransport", "HTTPTransport"] @contextlib.contextmanager def map_httpcore_exceptions() -> typing.Iterator[None]: try: yield except Exception as exc: mapped_exc = None for from_exc, to_exc in HTTPCORE_EXC_MAP.items(): if not isinstance(exc, from_exc): continue # We want to map to the most specific exception we can find. # Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to # `httpx.ReadTimeout`, not just `httpx.TimeoutException`. if mapped_exc is None or issubclass(to_exc, mapped_exc): mapped_exc = to_exc if mapped_exc is None: # pragma: no cover raise message = str(exc) raise mapped_exc(message) from exc HTTPCORE_EXC_MAP = { httpcore.TimeoutException: TimeoutException, httpcore.ConnectTimeout: ConnectTimeout, httpcore.ReadTimeout: ReadTimeout, httpcore.WriteTimeout: WriteTimeout, httpcore.PoolTimeout: PoolTimeout, httpcore.NetworkError: NetworkError, httpcore.ConnectError: ConnectError, httpcore.ReadError: ReadError, httpcore.WriteError: WriteError, httpcore.ProxyError: ProxyError, httpcore.UnsupportedProtocol: UnsupportedProtocol, httpcore.ProtocolError: ProtocolError, httpcore.LocalProtocolError: LocalProtocolError, httpcore.RemoteProtocolError: RemoteProtocolError, } class ResponseStream(SyncByteStream): def __init__(self, httpcore_stream: typing.Iterable[bytes]) -> None: self._httpcore_stream = httpcore_stream def __iter__(self) -> typing.Iterator[bytes]: with map_httpcore_exceptions(): for part in self._httpcore_stream: yield part def close(self) -> None: if hasattr(self._httpcore_stream, "close"): self._httpcore_stream.close() class HTTPTransport(BaseTransport): def __init__( self, verify: VerifyTypes = True, cert: CertTypes | None = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, trust_env: bool = True, proxy: ProxyTypes | None = None, uds: str | None = None, local_address: str | None = None, retries: int = 0, socket_options: typing.Iterable[SOCKET_OPTION] | None = None, ) -> None: ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy if proxy is None: self._pool = httpcore.ConnectionPool( ssl_context=ssl_context, max_connections=limits.max_connections, max_keepalive_connections=limits.max_keepalive_connections, keepalive_expiry=limits.keepalive_expiry, http1=http1, http2=http2, uds=uds, local_address=local_address, retries=retries, socket_options=socket_options, ) elif proxy.url.scheme in ("http", "https"): self._pool = httpcore.HTTPProxy( proxy_url=httpcore.URL( scheme=proxy.url.raw_scheme, host=proxy.url.raw_host, port=proxy.url.port, target=proxy.url.raw_path, ), proxy_auth=proxy.raw_auth, proxy_headers=proxy.headers.raw, ssl_context=ssl_context, proxy_ssl_context=proxy.ssl_context, max_connections=limits.max_connections, max_keepalive_connections=limits.max_keepalive_connections, keepalive_expiry=limits.keepalive_expiry, http1=http1, http2=http2, socket_options=socket_options, ) elif proxy.url.scheme == "socks5": try: import socksio # noqa except ImportError: # pragma: no cover raise ImportError( "Using SOCKS proxy, but the 'socksio' package is not installed. " "Make sure to install httpx using `pip install httpx[socks]`." ) from None self._pool = httpcore.SOCKSProxy( proxy_url=httpcore.URL( scheme=proxy.url.raw_scheme, host=proxy.url.raw_host, port=proxy.url.port, target=proxy.url.raw_path, ), proxy_auth=proxy.raw_auth, ssl_context=ssl_context, max_connections=limits.max_connections, max_keepalive_connections=limits.max_keepalive_connections, keepalive_expiry=limits.keepalive_expiry, http1=http1, http2=http2, ) else: # pragma: no cover raise ValueError( "Proxy protocol must be either 'http', 'https', or 'socks5'," f" but got {proxy.url.scheme!r}." ) def __enter__(self: T) -> T: # Use generics for subclass support. self._pool.__enter__() return self def __exit__( self, exc_type: type[BaseException] | None = None, exc_value: BaseException | None = None, traceback: TracebackType | None = None, ) -> None: with map_httpcore_exceptions(): self._pool.__exit__(exc_type, exc_value, traceback) def handle_request( self, request: Request, ) -> Response: assert isinstance(request.stream, SyncByteStream) req = httpcore.Request( method=request.method, url=httpcore.URL( scheme=request.url.raw_scheme, host=request.url.raw_host, port=request.url.port, target=request.url.raw_path, ), headers=request.headers.raw, content=request.stream, extensions=request.extensions, ) with map_httpcore_exceptions(): resp = self._pool.handle_request(req) assert isinstance(resp.stream, typing.Iterable) return Response( status_code=resp.status, headers=resp.headers, stream=ResponseStream(resp.stream), extensions=resp.extensions, ) def close(self) -> None: self._pool.close() class AsyncResponseStream(AsyncByteStream): def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]) -> None: self._httpcore_stream = httpcore_stream async def __aiter__(self) -> typing.AsyncIterator[bytes]: with map_httpcore_exceptions(): async for part in self._httpcore_stream: yield part async def aclose(self) -> None: if hasattr(self._httpcore_stream, "aclose"): await self._httpcore_stream.aclose() class AsyncHTTPTransport(AsyncBaseTransport): def __init__( self, verify: VerifyTypes = True, cert: CertTypes | None = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, trust_env: bool = True, proxy: ProxyTypes | None = None, uds: str | None = None, local_address: str | None = None, retries: int = 0, socket_options: typing.Iterable[SOCKET_OPTION] | None = None, ) -> None: ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy if proxy is None: self._pool = httpcore.AsyncConnectionPool( ssl_context=ssl_context, max_connections=limits.max_connections, max_keepalive_connections=limits.max_keepalive_connections, keepalive_expiry=limits.keepalive_expiry, http1=http1, http2=http2, uds=uds, local_address=local_address, retries=retries, socket_options=socket_options, ) elif proxy.url.scheme in ("http", "https"): self._pool = httpcore.AsyncHTTPProxy( proxy_url=httpcore.URL( scheme=proxy.url.raw_scheme, host=proxy.url.raw_host, port=proxy.url.port, target=proxy.url.raw_path, ), proxy_auth=proxy.raw_auth, proxy_headers=proxy.headers.raw, proxy_ssl_context=proxy.ssl_context, ssl_context=ssl_context, max_connections=limits.max_connections, max_keepalive_connections=limits.max_keepalive_connections, keepalive_expiry=limits.keepalive_expiry, http1=http1, http2=http2, socket_options=socket_options, ) elif proxy.url.scheme == "socks5": try: import socksio # noqa except ImportError: # pragma: no cover raise ImportError( "Using SOCKS proxy, but the 'socksio' package is not installed. " "Make sure to install httpx using `pip install httpx[socks]`." ) from None self._pool = httpcore.AsyncSOCKSProxy( proxy_url=httpcore.URL( scheme=proxy.url.raw_scheme, host=proxy.url.raw_host, port=proxy.url.port, target=proxy.url.raw_path, ), proxy_auth=proxy.raw_auth, ssl_context=ssl_context, max_connections=limits.max_connections, max_keepalive_connections=limits.max_keepalive_connections, keepalive_expiry=limits.keepalive_expiry, http1=http1, http2=http2, ) else: # pragma: no cover raise ValueError( "Proxy protocol must be either 'http', 'https', or 'socks5'," " but got {proxy.url.scheme!r}." ) async def __aenter__(self: A) -> A: # Use generics for subclass support. await self._pool.__aenter__() return self async def __aexit__( self, exc_type: type[BaseException] | None = None, exc_value: BaseException | None = None, traceback: TracebackType | None = None, ) -> None: with map_httpcore_exceptions(): await self._pool.__aexit__(exc_type, exc_value, traceback) async def handle_async_request( self, request: Request, ) -> Response: assert isinstance(request.stream, AsyncByteStream) req = httpcore.Request( method=request.method, url=httpcore.URL( scheme=request.url.raw_scheme, host=request.url.raw_host, port=request.url.port, target=request.url.raw_path, ), headers=request.headers.raw, content=request.stream, extensions=request.extensions, ) with map_httpcore_exceptions(): resp = await self._pool.handle_async_request(req) assert isinstance(resp.stream, typing.AsyncIterable) return Response( status_code=resp.status, headers=resp.headers, stream=AsyncResponseStream(resp.stream), extensions=resp.extensions, ) async def aclose(self) -> None: await self._pool.aclose()