Coverage for core / http_pool.py: 96.5%
57 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Connection-pooled HTTP session.
4Replaces 81+ bare `requests.post()` / `requests.get()` calls across the codebase
5with a shared session that reuses TCP connections via keep-alive.
7Before: Each HTTP call opens a new TCP connection + TLS handshake.
8After: Connections are pooled and reused (10 pool connections, 20 max per host).
10Typical improvement: 40-60% latency reduction on repeated calls to same host.
12Retry policy:
13 - localhost: 0 retries (dead local services should fail instantly, not block 15s)
14 - remote: 2 retries with 0.5s backoff (network can be flaky)
15"""
17import logging
18import threading
20import requests
21from requests.adapters import HTTPAdapter
22from urllib3.util.retry import Retry
logger = logging.getLogger('hevolve_core')

# Third-party loggers that are capped at ERROR as soon as this module is
# imported. Each of them emits a WARNING per retry / connect attempt that only
# repeats a failure already reported elsewhere; records at ERROR and above
# still get through.
_QUIETED_LOGGERS = (
    # urllib3.connectionpool emits a WARNING per retry attempt
    # ("Retrying ... after connection broken by 'NameResolutionError'"),
    # which floods the log when central.hevolve.ai is unreachable (offline
    # laptop, DNS down, captive portal). Real connection failures still
    # surface via caller try/except handlers (see
    # core.superadmin_report._post_report's logger.debug, peer_discovery's
    # PeerBackoff exponential backoff, etc.), so the per-retry WARNING is
    # pure noise on top of those. ERROR keeps genuine connection-pool issues
    # visible while silencing the per-retry chatter.
    'urllib3.connectionpool',
    # autobahn.asyncio.component emits a WARNING per connect-retry when a
    # Crossbar WAMP router isn't running ("Connection failed with OS error:
    # ConnectionRefusedError" + "trying transport 0 ws://localhost:8088/ws
    # using connect delay 300"). The component's own retry loop already
    # implements exponential backoff up to 300s; the per-attempt WARNING is
    # informational noise on flat-mode installs that don't run Crossbar.
    # Real session failures still surface via core/platform/events.py:_run's
    # "WAMP component exited: %s" warning, which carries context (which url,
    # which realm) the autobahn line lacks. ERROR keeps genuine
    # connect-attempt errors (e.g. ssl handshake failure on a real Crossbar
    # URL) visible.
    'autobahn.asyncio.component',
    # autobahn.wamp.component (parent module) ALSO emits the connect-error
    # traceback at handle_connect_error through its own logger, separate
    # from the .asyncio.component child, and txaio.aio formats the traceback
    # at its own level. Unless these are silenced as well, the traceback
    # still surfaces even with .asyncio.component at ERROR (it reaches the
    # root logger via stderr emit). Capping all three keeps the log clean on
    # flat installs without Crossbar.
    'autobahn.wamp.component',
    'autobahn',
    'txaio',
    # asyncio's "socket.send() raised exception" pairs fire alongside the
    # failed Crossbar connect attempts. The underlying transport-failure
    # event is already covered by the autobahn warning silenced above, so
    # asyncio's transport-level chatter is redundant.
    'asyncio',
)
for _quieted in _QUIETED_LOGGERS:
    logging.getLogger(_quieted).setLevel(logging.ERROR)

# Shared session built on first use by get_http_session(), plus the lock that
# serializes its creation.
_session = None
_session_lock = threading.Lock()

# Default timeout for all requests, as a (connect, read) pair in seconds
DEFAULT_TIMEOUT = (3, 15)
def get_http_session() -> requests.Session:
    """
    Return the process-wide connection-pooled ``requests.Session``.

    The session is built on the first call and stored in the module-level
    ``_session``; later calls return that same object. Creation happens under
    ``_session_lock`` and ``_session`` is checked both before and after
    acquiring the lock, so the session is only built once.

    Adapters mounted on the session:
      - ``http://localhost`` and ``http://127.0.0.1``: 0 retries
      - ``http://`` and ``https://``: 2 retries, backoff factor 0.5, retrying
        on 502/503/504 for GET, POST, PUT, PATCH and DELETE

    Every adapter uses ``pool_connections=10`` and ``pool_maxsize=20``.

    Returns:
        The shared ``requests.Session``.
    """
    global _session

    # Unlocked check first so the common case never touches the lock.
    if _session is None:
        with _session_lock:
            # Checked again now that the lock is held: another caller may
            # have finished building the session while this one waited.
            if _session is None:
                pool_sizing = {'pool_connections': 10, 'pool_maxsize': 20}

                # Localhost: zero retries — dead local services should fail
                # instantly. This prevents the retry storm (36 failed TCP
                # connects/min) that kills the system when optional sidecars
                # (MiniCPM:9891, etc.) aren't running.
                loopback_adapter = HTTPAdapter(
                    max_retries=Retry(total=0),
                    **pool_sizing,
                )

                # Remote: modest retries with backoff (network can be flaky)
                flaky_network_adapter = HTTPAdapter(
                    max_retries=Retry(
                        total=2,
                        backoff_factor=0.5,
                        status_forcelist=[502, 503, 504],
                        allowed_methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
                    ),
                    **pool_sizing,
                )

                pooled = requests.Session()
                for url_prefix, adapter in (
                    ('http://localhost', loopback_adapter),
                    ('http://127.0.0.1', loopback_adapter),
                    ('http://', flaky_network_adapter),
                    ('https://', flaky_network_adapter),
                ):
                    pooled.mount(url_prefix, adapter)

                # Default header sent with every request on this session.
                # NOTE(review): being session-wide, this presumably also
                # applies to GETs and to `data=` / `files=` bodies — confirm
                # no caller uploads multipart or form data through the pool.
                pooled.headers['Content-Type'] = 'application/json'

                _session = pooled
                logger.info("HTTP pool initialized (localhost=0 retries, remote=2 retries)")

    return _session
def pooled_get(url: str, timeout=DEFAULT_TIMEOUT, **kwargs) -> requests.Response:
    """Send a GET through the shared pooled session.

    Args:
        url: Target URL.
        timeout: Passed to the session call; defaults to ``DEFAULT_TIMEOUT``.
        **kwargs: Forwarded unchanged to ``Session.get``.

    Returns:
        The ``requests.Response`` produced by the session.
    """
    shared_session = get_http_session()
    return shared_session.get(url, timeout=timeout, **kwargs)
def _log_llm_exchange(request_body, resp) -> None:
    """Best-effort INFO log of one chat-completions round trip.

    Logs a 200-character preview of the last request message, the reported
    completion token count, a 200-character preview of the first choice's
    content, and the length of its ``reasoning_content``.

    Args:
        request_body: The ``json=`` payload that was posted (may be ``None``
            or a non-dict, in which case the prompt preview is empty).
        resp: The response object; only ``resp.json()`` is used.

    This is observability only, so it never raises: any failure while
    building the log line is reported at DEBUG and otherwise ignored.
    """
    # Skip the JSON parse entirely when nobody would see the INFO record.
    if not logger.isEnabledFor(logging.INFO):
        return
    try:
        messages = request_body.get('messages') if isinstance(request_body, dict) else None
        last_message = messages[-1] if messages else None
        prompt = last_message.get('content') if isinstance(last_message, dict) else None
        # `or ''` covers an explicit null content as well as a missing key.
        prompt_preview = (prompt or '')[:200]

        payload = resp.json()
        # `or` fallbacks tolerate keys that are present but null/empty
        # (e.g. "choices": [], "content": null), which would otherwise raise.
        first_choice = (payload.get('choices') or [{}])[0]
        message = first_choice.get('message') or {}
        content = message.get('content') or ''
        reasoning = message.get('reasoning_content') or ''
        usage = payload.get('usage') or {}

        logger.info(
            "[LLM] IN: %s... | OUT(%stok): %s... | THINK: %schars",
            prompt_preview,
            usage.get('completion_tokens', 0),
            content[:200],
            len(reasoning),
        )
    except Exception as exc:
        # Deliberately broad: a malformed body or non-JSON response must never
        # break the request path. Leave a trace instead of failing silently.
        logger.debug("[LLM] exchange logging skipped: %s", exc)


def pooled_post(url: str, timeout=DEFAULT_TIMEOUT, **kwargs) -> requests.Response:
    """Send a POST through the shared pooled session.

    When ``url`` contains ``/chat/completions`` the request/response pair is
    additionally summarized in an INFO log line for observability. That
    logging is skipped for ``stream=True`` requests, because reading the JSON
    body would pull the entire response before the caller can iterate it.

    Args:
        url: Target URL.
        timeout: Passed to the session call; defaults to ``DEFAULT_TIMEOUT``.
        **kwargs: Forwarded unchanged to ``Session.post``.

    Returns:
        The ``requests.Response`` produced by the session.
    """
    resp = get_http_session().post(url, timeout=timeout, **kwargs)
    # Log LLM input/output for observability (non-streaming calls only).
    if '/chat/completions' in url and not kwargs.get('stream'):
        _log_llm_exchange(kwargs.get('json'), resp)
    return resp
def pooled_put(url: str, timeout=DEFAULT_TIMEOUT, **kwargs) -> requests.Response:
    """Send a PUT through the shared pooled session.

    Args:
        url: Target URL.
        timeout: Passed to the session call; defaults to ``DEFAULT_TIMEOUT``.
        **kwargs: Forwarded unchanged to ``Session.put``.

    Returns:
        The ``requests.Response`` produced by the session.
    """
    shared_session = get_http_session()
    return shared_session.put(url, timeout=timeout, **kwargs)
def pooled_patch(url: str, timeout=DEFAULT_TIMEOUT, **kwargs) -> requests.Response:
    """Send a PATCH through the shared pooled session.

    Args:
        url: Target URL.
        timeout: Passed to the session call; defaults to ``DEFAULT_TIMEOUT``.
        **kwargs: Forwarded unchanged to ``Session.patch``.

    Returns:
        The ``requests.Response`` produced by the session.
    """
    shared_session = get_http_session()
    return shared_session.patch(url, timeout=timeout, **kwargs)
def pooled_delete(url: str, timeout=DEFAULT_TIMEOUT, **kwargs) -> requests.Response:
    """Send a DELETE through the shared pooled session.

    Args:
        url: Target URL.
        timeout: Passed to the session call; defaults to ``DEFAULT_TIMEOUT``.
        **kwargs: Forwarded unchanged to ``Session.delete``.

    Returns:
        The ``requests.Response`` produced by the session.
    """
    shared_session = get_http_session()
    return shared_session.delete(url, timeout=timeout, **kwargs)
def pooled_request(method: str, url: str, timeout=DEFAULT_TIMEOUT, **kwargs) -> requests.Response:
    """Send a request with an arbitrary HTTP method through the shared pooled session.

    Args:
        method: HTTP method name, passed straight to ``Session.request``.
        url: Target URL.
        timeout: Passed to the session call; defaults to ``DEFAULT_TIMEOUT``.
        **kwargs: Forwarded unchanged to ``Session.request``.

    Returns:
        The ``requests.Response`` produced by the session.
    """
    shared_session = get_http_session()
    return shared_session.request(method, url, timeout=timeout, **kwargs)