Coverage for integrations / coding_agent / api.py: 72.3%

130 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Distributed Coding Agent API 

3 

4Thin API layer. All actual coding work flows through the existing 

5/chat endpoint (CREATE/REUSE pipeline). This just manages: 

6- Goals: what repo/objective to work on (admin, central-only) 

7- Opt-in: which agents contribute idle compute (user self-service) 

8- Stats: idle agent counts 

9 

10Security: 

11- Admin + central-only for goal management 

12- Auth required for all endpoints 

13- Repo allowlist via HEVOLVE_CODING_ALLOWED_REPOS 

14- Users can only opt themselves in/out (admin can do anyone) 

15""" 

16import os 

17import re 

18import logging 

19from functools import wraps 

20from typing import Optional 

21from flask import Blueprint, request, jsonify, g 

22 

23from integrations.social.auth import require_auth, require_admin 

24 

25logger = logging.getLogger('hevolve_social') 

26 

27coding_agent_bp = Blueprint('coding_agent', __name__) 

28 

29_IS_CENTRAL = os.environ.get('HEVOLVE_NODE_TIER') == 'central' 

30 

31ALLOWED_REPOS = [r.strip() for r in os.environ.get( 

32 'HEVOLVE_CODING_ALLOWED_REPOS', '').split(',') if r.strip()] 

33 

34 

35def _require_central(f): 

36 """Decorator: rejects request if node is not central.""" 

37 @wraps(f) 

38 def decorated(*args, **kwargs): 

39 if not _IS_CENTRAL: 

40 return jsonify({'success': False, 'error': 'Central node only'}), 403 

41 return f(*args, **kwargs) 

42 return decorated 

43 

44 

45def _validate_repo(repo_url: str) -> Optional[str]: 

46 """Returns error string if repo is not allowed, None if OK.""" 

47 if not repo_url: 

48 return 'repo_url is required' 

49 if '/' not in repo_url or len(repo_url.split('/')) != 2: 

50 return 'repo_url must be in owner/repo format' 

51 if not re.match(r'^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+$', repo_url): 

52 return 'repo_url contains invalid characters' 

53 if ALLOWED_REPOS and repo_url not in ALLOWED_REPOS: 

54 return 'Repository not in allowlist' 

55 return None 

56 

57 

58# ─── Goals (admin + central only) ─── 

59 

60@coding_agent_bp.route('/api/coding/goals', methods=['POST']) 

61@require_admin 

62@_require_central 

63def create_goal(): 

64 from .goal_manager import CodingGoalManager 

65 

66 data = request.get_json() or {} 

67 repo_url = data.get('repo_url', '') 

68 error = _validate_repo(repo_url) 

69 if error: 

70 return jsonify({'success': False, 'error': error}), 400 

71 

72 result = CodingGoalManager.create_goal( 

73 g.db, 

74 title=data.get('title', ''), 

75 description=data.get('description', ''), 

76 repo_url=repo_url, 

77 branch=data.get('branch', 'main'), 

78 target_path=data.get('target_path', ''), 

79 created_by=str(g.user.id), 

80 ) 

81 return jsonify({'success': True, 'goal': result}) 

82 

83 

84@coding_agent_bp.route('/api/coding/goals', methods=['GET']) 

85@require_auth 

86def list_goals(): 

87 from .goal_manager import CodingGoalManager 

88 

89 status = request.args.get('status') 

90 goals = CodingGoalManager.list_goals(g.db, status=status) 

91 return jsonify({'success': True, 'goals': goals}) 

92 

93 

94@coding_agent_bp.route('/api/coding/goals/<goal_id>', methods=['GET']) 

95@require_auth 

96def get_goal(goal_id): 

97 from .goal_manager import CodingGoalManager 

98 

99 result = CodingGoalManager.get_goal(g.db, goal_id) 

100 return jsonify(result) 

101 

102 

103@coding_agent_bp.route('/api/coding/goals/<goal_id>', methods=['PATCH']) 

104@require_admin 

105@_require_central 

106def update_goal(goal_id): 

107 from .goal_manager import CodingGoalManager 

108 

109 data = request.get_json() or {} 

110 result = CodingGoalManager.update_goal_status(g.db, goal_id, data.get('status', 'active')) 

111 return jsonify(result) 

112 

113 

114# ─── Opt-In / Opt-Out (self-service) ─── 

115 

116@coding_agent_bp.route('/api/coding/opt-in', methods=['POST']) 

117@require_auth 

118def opt_in(): 

119 from .idle_detection import IdleDetectionService 

120 

121 data = request.get_json() or {} 

122 user_id = data.get('user_id') 

123 if not user_id: 

124 return jsonify({'success': False, 'error': 'user_id required'}), 400 

125 

126 if not g.user.is_admin and str(g.user.id) != str(user_id): 

127 return jsonify({'success': False, 'error': 'Can only opt in yourself'}), 403 

128 

129 result = IdleDetectionService.opt_in(g.db, user_id) 

130 return jsonify(result) 

131 

132 

133@coding_agent_bp.route('/api/coding/opt-out', methods=['POST']) 

134@require_auth 

135def opt_out(): 

136 from .idle_detection import IdleDetectionService 

137 

138 data = request.get_json() or {} 

139 user_id = data.get('user_id') 

140 if not user_id: 

141 return jsonify({'success': False, 'error': 'user_id required'}), 400 

142 

143 if not g.user.is_admin and str(g.user.id) != str(user_id): 

144 return jsonify({'success': False, 'error': 'Can only opt out yourself'}), 403 

145 

146 result = IdleDetectionService.opt_out(g.db, user_id) 

147 return jsonify(result) 

148 

149 

150# ─── Stats ─── 

151 

152@coding_agent_bp.route('/api/coding/idle-stats', methods=['GET']) 

153@require_admin 

154def idle_stats(): 

155 from .idle_detection import IdleDetectionService 

156 

157 stats = IdleDetectionService.get_idle_stats(g.db) 

158 return jsonify({'success': True, **stats}) 

159 

160 

161# ─── Distributed Execution Endpoint (receives shards from peers) ─── 

162 

163@coding_agent_bp.route('/coding/execute', methods=['POST']) 

164def execute_shard(): 

165 """Receive an encrypted coding shard from a peer, execute locally, return result. 

166 

167 Auth: The encrypted envelope IS the auth — only peers who know our X25519 

168 public key (exchanged during PeerLink handshake) can encrypt to us. 

169 Decryption failure = unauthorized. No additional token needed. 

170 """ 

171 import time 

172 

173 start = time.time() 

174 data = request.get_json(silent=True) or {} 

175 encrypted = data.get('encrypted') 

176 

177 if not encrypted: 

178 return jsonify({'error': 'Missing encrypted payload'}), 400 

179 

180 try: 

181 from security.channel_encryption import ( 

182 decrypt_json_from_peer, encrypt_json_for_peer 

183 ) 

184 

185 # Decrypt the shard — this IS the auth check. 

186 # Only peers with our public key (from PeerLink handshake) can produce 

187 # a valid envelope. Failed decryption = unauthorized peer. 

188 payload = decrypt_json_from_peer(encrypted) 

189 if not payload: 

190 logger.warning("[SHARD-EXEC] Decryption failed — unauthorized peer") 

191 return jsonify({'error': 'Unauthorized'}), 403 

192 

193 task = payload.get('task', '') 

194 task_type = payload.get('task_type', 'feature') 

195 preferred_tool = payload.get('preferred_tool', '') 

196 model = payload.get('model', '') 

197 file_content = payload.get('file_content', {}) 

198 

199 # Write shard files to a temporary working directory 

200 import tempfile 

201 with tempfile.TemporaryDirectory(prefix='hart_shard_') as tmpdir: 

202 for rel_path, content in file_content.items(): 

203 # Security: prevent path traversal 

204 safe_path = os.path.normpath(rel_path) 

205 if safe_path.startswith('..') or os.path.isabs(safe_path): 

206 logger.warning(f"[SHARD-EXEC] Path traversal blocked: {rel_path}") 

207 continue 

208 fpath = os.path.join(tmpdir, safe_path) 

209 os.makedirs(os.path.dirname(fpath), exist_ok=True) 

210 with open(fpath, 'w', encoding='utf-8') as f: 

211 f.write(content) 

212 

213 # Execute via local orchestrator (always local — we ARE the peer) 

214 from .orchestrator import get_coding_orchestrator 

215 orchestrator = get_coding_orchestrator() 

216 result = orchestrator._execute_local( 

217 task=task, 

218 task_type=task_type, 

219 preferred_tool=preferred_tool, 

220 user_id='peer', 

221 model=model, 

222 working_dir=tmpdir, 

223 ) 

224 

225 result['execution_time_s'] = time.time() - start 

226 

227 # Encrypt result back to sender using their public key (from INSIDE the 

228 # encrypted envelope — not from the plaintext request, which MITM could replace) 

229 sender_pub = payload.get('sender_public_key') 

230 if sender_pub: 

231 encrypted_result = encrypt_json_for_peer(result, sender_pub) 

232 return jsonify({'encrypted': encrypted_result}) 

233 else: 

234 # SAME_USER trust — no encryption needed for user's own machines 

235 return jsonify({'encrypted': None, 'result': result}) 

236 

237 except Exception as e: 

238 logger.error(f"[SHARD-EXEC] Error: {e}") 

239 return jsonify({'error': 'Execution failed'}), 500