Coverage for security / tls_config.py: 34.1%

44 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2TLS Configuration 

3Enforces HTTPS for all outbound HTTP calls and provides secure request sessions. 

4Defends against man-in-the-middle attacks on internal service communication. 

5""" 

6 

7import os 

8import logging 

9from urllib.parse import urlparse 

10 

11import requests 

12from requests.adapters import HTTPAdapter 

13 

14logger = logging.getLogger('hevolve_security') 

15 

16_LOCALHOST_HOSTS = frozenset(['localhost', '127.0.0.1', '::1', '0.0.0.0']) 

17 

18# Singleton session 

19_secure_session = None 

20 

21 

22def get_secure_session() -> requests.Session: 

23 """ 

24 Create or return a requests.Session with TLS verification enabled. 

25 Uses CA bundle from HEVOLVE_CA_BUNDLE env var if set. 

26 """ 

27 global _secure_session 

28 if _secure_session is not None: 

29 return _secure_session 

30 

31 session = requests.Session() 

32 

33 ca_bundle = os.environ.get('HEVOLVE_CA_BUNDLE') 

34 if ca_bundle and os.path.exists(ca_bundle): 

35 session.verify = ca_bundle 

36 else: 

37 session.verify = True 

38 

39 # Connection pooling with retry 

40 adapter = HTTPAdapter( 

41 pool_connections=20, 

42 pool_maxsize=20, 

43 max_retries=3, 

44 ) 

45 session.mount('https://', adapter) 

46 session.mount('http://', adapter) 

47 

48 _secure_session = session 

49 return session 

50 

51 

52def upgrade_url(url: str) -> str: 

53 """ 

54 Upgrade http:// to https:// for non-localhost URLs in production. 

55 In development mode (HEVOLVE_ENV=development), allows HTTP. 

56 """ 

57 if os.environ.get('HEVOLVE_ENV') == 'development': 

58 return url 

59 

60 parsed = urlparse(url) 

61 if parsed.scheme == 'http' and parsed.hostname not in _LOCALHOST_HOSTS: 

62 upgraded = url.replace('http://', 'https://', 1) 

63 logger.debug(f"Upgraded URL to HTTPS: {parsed.hostname}") 

64 return upgraded 

65 return url 

66 

67 

68def secure_request(method: str, url: str, **kwargs) -> requests.Response: 

69 """ 

70 Make an HTTP request through the secure session with URL upgrade. 

71 Drop-in replacement for requests.get/post/put/delete. 

72 """ 

73 session = get_secure_session() 

74 safe_url = upgrade_url(url) 

75 

76 # Set reasonable timeout if not provided 

77 if 'timeout' not in kwargs: 

78 kwargs['timeout'] = 30 

79 

80 return session.request(method, safe_url, **kwargs) 

81 

82 

83def secure_get(url: str, **kwargs) -> requests.Response: 

84 """Secure replacement for requests.get().""" 

85 return secure_request('GET', url, **kwargs) 

86 

87 

88def secure_post(url: str, **kwargs) -> requests.Response: 

89 """Secure replacement for requests.post().""" 

90 return secure_request('POST', url, **kwargs) 

91 

92 

93def secure_put(url: str, **kwargs) -> requests.Response: 

94 """Secure replacement for requests.put().""" 

95 return secure_request('PUT', url, **kwargs) 

96 

97 

98def secure_delete(url: str, **kwargs) -> requests.Response: 

99 """Secure replacement for requests.delete().""" 

100 return secure_request('DELETE', url, **kwargs)