Coverage for core / http_pool.py: 96.5%

57 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
Connection-pooled HTTP session.

Replaces 81+ bare `requests.post()` / `requests.get()` calls across the codebase
with a shared session that reuses TCP connections via keep-alive.

Before: each HTTP call opened a new TCP connection and performed a new TLS handshake.
After: connections are pooled and reused (up to 10 cached per-host pools, with
at most 20 connections kept per pool).

Typical improvement: 40-60% latency reduction on repeated calls to the same host.

Retry policy:
 - localhost: 0 retries (dead local services should fail instantly, not block 15s)
 - remote: 2 retries with a 0.5 backoff factor (network can be flaky)
"""

16 

17import logging 

18import threading 

19 

20import requests 

21from requests.adapters import HTTPAdapter 

22from urllib3.util.retry import Retry 

23 

logger = logging.getLogger('hevolve_core')

# Third-party loggers whose per-attempt WARNING chatter is suppressed by raising
# their threshold to ERROR: WARNING-and-below records are dropped, while
# genuine ERROR records from these libraries stay visible. Real connection
# failures still surface through the callers' own try/except handling.
for _quiet_logger_name in (
    # urllib3 logs one WARNING per retry attempt ("Retrying ... after
    # connection broken by 'NameResolutionError'"), which floods the log when
    # central.hevolve.ai is unreachable (offline laptop, DNS down, captive
    # portal). Callers such as core.superadmin_report._post_report and
    # peer_discovery's PeerBackoff already report the failure.
    'urllib3.connectionpool',
    # autobahn logs one WARNING per connect retry when no Crossbar WAMP router
    # is running ("Connection failed with OS error: ConnectionRefusedError",
    # "trying transport 0 ws://localhost:8088/ws using connect delay 300").
    # Its retry loop already backs off up to 300s, and
    # core/platform/events.py:_run's "WAMP component exited: %s" warning
    # carries more context (url, realm) than the autobahn line.
    'autobahn.asyncio.component',
    # The parent component module, the autobahn package logger and txaio each
    # emit the connect-error traceback through their own logger, so all of
    # them must be quieted for the log to stay clean on flat installs.
    'autobahn.wamp.component',
    'autobahn',
    'txaio',
    # asyncio's "socket.send() raised exception" lines accompany the failed
    # Crossbar connects and duplicate the records silenced above.
    # NOTE(review): this quiets *all* asyncio WARNINGs process-wide, not only
    # the Crossbar-related ones — confirm that is intended.
    'asyncio',
):
    logging.getLogger(_quiet_logger_name).setLevel(logging.ERROR)
del _quiet_logger_name

# Shared session, created lazily by get_http_session() under _session_lock.
_session = None
_session_lock = threading.Lock()

# Default (connect, read) timeout in seconds used by the pooled_* helpers.
DEFAULT_TIMEOUT = (3, 15)

72 

73 

def get_http_session() -> requests.Session:
    """
    Get or create the shared connection-pooled ``requests.Session``.

    Thread-safe lazy singleton: the session is built on the first call and
    cached in the module-level ``_session``. Creation runs under
    ``_session_lock`` with a second ``None`` check, so concurrent first
    callers build exactly one session; later calls return the cached object
    without taking the lock.

    Adapters are selected by URL prefix (requests uses the longest matching
    mount prefix):

    * loopback hosts (``localhost``, ``127.0.0.1``, ``[::1]``; both http and
      https) get zero retries, so a dead local sidecar fails immediately;
    * every other ``http://`` / ``https://`` URL gets up to 2 retries with
      backoff factor 0.5, also retried on HTTP 502/503/504.

    Returns:
        The shared ``requests.Session``.
    """
    global _session
    if _session is not None:
        return _session

    with _session_lock:
        if _session is not None:
            # Another thread finished initialisation while we waited.
            return _session

        session = requests.Session()
        pool_kwargs = {'pool_connections': 10, 'pool_maxsize': 20}

        # Localhost: zero retries — dead local services should fail instantly.
        # This prevents the retry storm (36 failed TCP connects/min) that kills
        # the system when optional sidecars (MiniCPM:9891, etc.) aren't running.
        local_adapter = HTTPAdapter(max_retries=Retry(total=0), **pool_kwargs)

        # Remote: modest retries with backoff (network can be flaky).
        # NOTE(review): POST/PATCH are not idempotent, so retrying them after a
        # read error or a 502/503/504 may duplicate server-side effects. Also,
        # Retry's raise_on_status defaults to True: once the status retries are
        # exhausted, requests raises RetryError instead of returning the 5xx
        # response — confirm callers expect that.
        remote_retry = Retry(
            total=2,
            backoff_factor=0.5,
            status_forcelist=[502, 503, 504],
            allowed_methods=["GET", "POST", "PUT", "PATCH", "DELETE"],
        )
        remote_adapter = HTTPAdapter(max_retries=remote_retry, **pool_kwargs)

        # Cover both schemes and IPv6 loopback so every local URL gets the
        # zero-retry policy, not just plain-http localhost / 127.0.0.1.
        for loopback_prefix in (
            'http://localhost', 'https://localhost',
            'http://127.0.0.1', 'https://127.0.0.1',
            'http://[::1]', 'https://[::1]',
        ):
            session.mount(loopback_prefix, local_adapter)
        session.mount('http://', remote_adapter)
        session.mount('https://', remote_adapter)

        # Default headers.
        # NOTE(review): as a session-level default this header is merged into
        # every request before the body is prepared, so requests will not
        # substitute the multipart / form-urlencoded Content-Type for
        # ``files=`` or ``data=<dict>`` bodies. Such callers should pass
        # ``headers={'Content-Type': None}`` to drop the default.
        session.headers.update({
            'Content-Type': 'application/json',
        })

        _session = session
        logger.info("HTTP pool initialized (localhost=0 retries, remote=2 retries)")
        return _session

124 

125 

def pooled_get(url: str, timeout=DEFAULT_TIMEOUT, **kwargs) -> requests.Response:
    """
    Issue a GET through the shared connection-pooled session.

    ``timeout`` defaults to ``DEFAULT_TIMEOUT``; every other keyword argument
    is forwarded unchanged to ``requests.Session.get``.
    """
    session = get_http_session()
    return session.get(url, timeout=timeout, **kwargs)

129 

130 

def pooled_post(url: str, timeout=DEFAULT_TIMEOUT, **kwargs) -> requests.Response:
    """
    Connection-pooled POST request.

    Thin wrapper over ``get_http_session().post`` that applies
    ``DEFAULT_TIMEOUT`` unless the caller passes ``timeout``. For
    ``/chat/completions`` URLs it additionally logs, at INFO, a preview of the
    last request message, the reply, the completion-token count and the
    length of any ``reasoning_content``.

    The logging step is best-effort: it never raises and never changes the
    returned response. It is skipped when INFO is disabled on ``logger`` and
    for ``stream=True`` requests, because parsing the body would read the
    entire stream before the caller could consume it incrementally.

    Args:
        url: Target URL.
        timeout: Timeout passed to requests; defaults to ``DEFAULT_TIMEOUT``.
        **kwargs: Forwarded unchanged to ``requests.Session.post``.

    Returns:
        The ``requests.Response`` returned by the session.
    """
    resp = get_http_session().post(url, timeout=timeout, **kwargs)
    # Log LLM input/output for observability.
    if ('/chat/completions' in url
            and not kwargs.get('stream')
            and logger.isEnabledFor(logging.INFO)):
        _log_llm_exchange(kwargs.get('json'), resp)
    return resp


def _log_llm_exchange(body, resp) -> None:
    """Log a short preview of one chat-completion request/response (never raises)."""
    try:
        # Request side: the caller's ``json=`` payload may be missing or not
        # a dict (e.g. when ``data=`` was used); fall back to an empty preview.
        msgs = body.get('messages') if isinstance(body, dict) else None
        last = msgs[-1] if isinstance(msgs, list) and msgs else {}
        prompt = last.get('content') if isinstance(last, dict) else None
        prompt_preview = (prompt or '')[:200]

        # Response side: ``or`` guards against keys that are present but null
        # or empty (e.g. content=None on tool-call replies, choices=[]).
        rj = resp.json()
        choices = rj.get('choices') or [{}]
        message = choices[0].get('message') or {}
        content = message.get('content') or ''
        reasoning = message.get('reasoning_content') or ''
        usage = rj.get('usage') or {}

        logger.info(
            "[LLM] IN: %s... | OUT(%stok): %s... | THINK: %dchars",
            prompt_preview,
            usage.get('completion_tokens', 0),
            content[:200],
            len(reasoning),
        )
    except Exception:
        # Observability must never break the request path; keep the reason
        # available at DEBUG instead of discarding it silently.
        logger.debug("LLM exchange logging skipped", exc_info=True)

152 

153 

def pooled_put(url: str, timeout=DEFAULT_TIMEOUT, **kwargs) -> requests.Response:
    """
    Issue a PUT through the shared connection-pooled session.

    ``timeout`` defaults to ``DEFAULT_TIMEOUT``; every other keyword argument
    is forwarded unchanged to ``requests.Session.put``.
    """
    session = get_http_session()
    return session.put(url, timeout=timeout, **kwargs)

157 

158 

def pooled_patch(url: str, timeout=DEFAULT_TIMEOUT, **kwargs) -> requests.Response:
    """
    Issue a PATCH through the shared connection-pooled session.

    ``timeout`` defaults to ``DEFAULT_TIMEOUT``; every other keyword argument
    is forwarded unchanged to ``requests.Session.patch``.
    """
    session = get_http_session()
    return session.patch(url, timeout=timeout, **kwargs)

162 

163 

def pooled_delete(url: str, timeout=DEFAULT_TIMEOUT, **kwargs) -> requests.Response:
    """
    Issue a DELETE through the shared connection-pooled session.

    ``timeout`` defaults to ``DEFAULT_TIMEOUT``; every other keyword argument
    is forwarded unchanged to ``requests.Session.delete``.
    """
    session = get_http_session()
    return session.delete(url, timeout=timeout, **kwargs)

167 

168 

def pooled_request(method: str, url: str, timeout=DEFAULT_TIMEOUT, **kwargs) -> requests.Response:
    """
    Issue an arbitrary-method request through the shared pooled session.

    ``method`` is passed straight to ``requests.Session.request``;
    ``timeout`` defaults to ``DEFAULT_TIMEOUT`` and every other keyword
    argument is forwarded unchanged.
    """
    session = get_http_session()
    return session.request(method, url, timeout=timeout, **kwargs)