Coverage for security / tls_config.py: 34.1%
44 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2TLS Configuration
3Enforces HTTPS for all outbound HTTP calls and provides secure request sessions.
4Defends against man-in-the-middle attacks on internal service communication.
5"""
7import os
8import logging
9from urllib.parse import urlparse
11import requests
12from requests.adapters import HTTPAdapter
14logger = logging.getLogger('hevolve_security')
16_LOCALHOST_HOSTS = frozenset(['localhost', '127.0.0.1', '::1', '0.0.0.0'])
18# Singleton session
19_secure_session = None
22def get_secure_session() -> requests.Session:
23 """
24 Create or return a requests.Session with TLS verification enabled.
25 Uses CA bundle from HEVOLVE_CA_BUNDLE env var if set.
26 """
27 global _secure_session
28 if _secure_session is not None:
29 return _secure_session
31 session = requests.Session()
33 ca_bundle = os.environ.get('HEVOLVE_CA_BUNDLE')
34 if ca_bundle and os.path.exists(ca_bundle):
35 session.verify = ca_bundle
36 else:
37 session.verify = True
39 # Connection pooling with retry
40 adapter = HTTPAdapter(
41 pool_connections=20,
42 pool_maxsize=20,
43 max_retries=3,
44 )
45 session.mount('https://', adapter)
46 session.mount('http://', adapter)
48 _secure_session = session
49 return session
52def upgrade_url(url: str) -> str:
53 """
54 Upgrade http:// to https:// for non-localhost URLs in production.
55 In development mode (HEVOLVE_ENV=development), allows HTTP.
56 """
57 if os.environ.get('HEVOLVE_ENV') == 'development':
58 return url
60 parsed = urlparse(url)
61 if parsed.scheme == 'http' and parsed.hostname not in _LOCALHOST_HOSTS:
62 upgraded = url.replace('http://', 'https://', 1)
63 logger.debug(f"Upgraded URL to HTTPS: {parsed.hostname}")
64 return upgraded
65 return url
68def secure_request(method: str, url: str, **kwargs) -> requests.Response:
69 """
70 Make an HTTP request through the secure session with URL upgrade.
71 Drop-in replacement for requests.get/post/put/delete.
72 """
73 session = get_secure_session()
74 safe_url = upgrade_url(url)
76 # Set reasonable timeout if not provided
77 if 'timeout' not in kwargs:
78 kwargs['timeout'] = 30
80 return session.request(method, safe_url, **kwargs)
83def secure_get(url: str, **kwargs) -> requests.Response:
84 """Secure replacement for requests.get()."""
85 return secure_request('GET', url, **kwargs)
88def secure_post(url: str, **kwargs) -> requests.Response:
89 """Secure replacement for requests.post()."""
90 return secure_request('POST', url, **kwargs)
93def secure_put(url: str, **kwargs) -> requests.Response:
94 """Secure replacement for requests.put()."""
95 return secure_request('PUT', url, **kwargs)
98def secure_delete(url: str, **kwargs) -> requests.Response:
99 """Secure replacement for requests.delete()."""
100 return secure_request('DELETE', url, **kwargs)