Coverage for integrations / coding_agent / api.py: 72.3%
130 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Distributed Coding Agent API
4Thin API layer. All actual coding work flows through the existing
5/chat endpoint (CREATE/REUSE pipeline). This just manages:
6- Goals: what repo/objective to work on (admin, central-only)
7- Opt-in: which agents contribute idle compute (user self-service)
8- Stats: idle agent counts
10Security:
11- Admin + central-only for goal management
12- Auth required for all endpoints
13- Repo allowlist via HEVOLVE_CODING_ALLOWED_REPOS
14- Users can only opt themselves in/out (admin can do anyone)
15"""
16import os
17import re
18import logging
19from functools import wraps
20from typing import Optional
21from flask import Blueprint, request, jsonify, g
23from integrations.social.auth import require_auth, require_admin
25logger = logging.getLogger('hevolve_social')
27coding_agent_bp = Blueprint('coding_agent', __name__)
29_IS_CENTRAL = os.environ.get('HEVOLVE_NODE_TIER') == 'central'
31ALLOWED_REPOS = [r.strip() for r in os.environ.get(
32 'HEVOLVE_CODING_ALLOWED_REPOS', '').split(',') if r.strip()]
35def _require_central(f):
36 """Decorator: rejects request if node is not central."""
37 @wraps(f)
38 def decorated(*args, **kwargs):
39 if not _IS_CENTRAL:
40 return jsonify({'success': False, 'error': 'Central node only'}), 403
41 return f(*args, **kwargs)
42 return decorated
45def _validate_repo(repo_url: str) -> Optional[str]:
46 """Returns error string if repo is not allowed, None if OK."""
47 if not repo_url:
48 return 'repo_url is required'
49 if '/' not in repo_url or len(repo_url.split('/')) != 2:
50 return 'repo_url must be in owner/repo format'
51 if not re.match(r'^[a-zA-Z0-9_.-]+/[a-zA-Z0-9_.-]+$', repo_url):
52 return 'repo_url contains invalid characters'
53 if ALLOWED_REPOS and repo_url not in ALLOWED_REPOS:
54 return 'Repository not in allowlist'
55 return None
58# ─── Goals (admin + central only) ───
60@coding_agent_bp.route('/api/coding/goals', methods=['POST'])
61@require_admin
62@_require_central
63def create_goal():
64 from .goal_manager import CodingGoalManager
66 data = request.get_json() or {}
67 repo_url = data.get('repo_url', '')
68 error = _validate_repo(repo_url)
69 if error:
70 return jsonify({'success': False, 'error': error}), 400
72 result = CodingGoalManager.create_goal(
73 g.db,
74 title=data.get('title', ''),
75 description=data.get('description', ''),
76 repo_url=repo_url,
77 branch=data.get('branch', 'main'),
78 target_path=data.get('target_path', ''),
79 created_by=str(g.user.id),
80 )
81 return jsonify({'success': True, 'goal': result})
84@coding_agent_bp.route('/api/coding/goals', methods=['GET'])
85@require_auth
86def list_goals():
87 from .goal_manager import CodingGoalManager
89 status = request.args.get('status')
90 goals = CodingGoalManager.list_goals(g.db, status=status)
91 return jsonify({'success': True, 'goals': goals})
94@coding_agent_bp.route('/api/coding/goals/<goal_id>', methods=['GET'])
95@require_auth
96def get_goal(goal_id):
97 from .goal_manager import CodingGoalManager
99 result = CodingGoalManager.get_goal(g.db, goal_id)
100 return jsonify(result)
103@coding_agent_bp.route('/api/coding/goals/<goal_id>', methods=['PATCH'])
104@require_admin
105@_require_central
106def update_goal(goal_id):
107 from .goal_manager import CodingGoalManager
109 data = request.get_json() or {}
110 result = CodingGoalManager.update_goal_status(g.db, goal_id, data.get('status', 'active'))
111 return jsonify(result)
114# ─── Opt-In / Opt-Out (self-service) ───
116@coding_agent_bp.route('/api/coding/opt-in', methods=['POST'])
117@require_auth
118def opt_in():
119 from .idle_detection import IdleDetectionService
121 data = request.get_json() or {}
122 user_id = data.get('user_id')
123 if not user_id:
124 return jsonify({'success': False, 'error': 'user_id required'}), 400
126 if not g.user.is_admin and str(g.user.id) != str(user_id):
127 return jsonify({'success': False, 'error': 'Can only opt in yourself'}), 403
129 result = IdleDetectionService.opt_in(g.db, user_id)
130 return jsonify(result)
133@coding_agent_bp.route('/api/coding/opt-out', methods=['POST'])
134@require_auth
135def opt_out():
136 from .idle_detection import IdleDetectionService
138 data = request.get_json() or {}
139 user_id = data.get('user_id')
140 if not user_id:
141 return jsonify({'success': False, 'error': 'user_id required'}), 400
143 if not g.user.is_admin and str(g.user.id) != str(user_id):
144 return jsonify({'success': False, 'error': 'Can only opt out yourself'}), 403
146 result = IdleDetectionService.opt_out(g.db, user_id)
147 return jsonify(result)
150# ─── Stats ───
152@coding_agent_bp.route('/api/coding/idle-stats', methods=['GET'])
153@require_admin
154def idle_stats():
155 from .idle_detection import IdleDetectionService
157 stats = IdleDetectionService.get_idle_stats(g.db)
158 return jsonify({'success': True, **stats})
161# ─── Distributed Execution Endpoint (receives shards from peers) ───
163@coding_agent_bp.route('/coding/execute', methods=['POST'])
164def execute_shard():
165 """Receive an encrypted coding shard from a peer, execute locally, return result.
167 Auth: The encrypted envelope IS the auth — only peers who know our X25519
168 public key (exchanged during PeerLink handshake) can encrypt to us.
169 Decryption failure = unauthorized. No additional token needed.
170 """
171 import time
173 start = time.time()
174 data = request.get_json(silent=True) or {}
175 encrypted = data.get('encrypted')
177 if not encrypted:
178 return jsonify({'error': 'Missing encrypted payload'}), 400
180 try:
181 from security.channel_encryption import (
182 decrypt_json_from_peer, encrypt_json_for_peer
183 )
185 # Decrypt the shard — this IS the auth check.
186 # Only peers with our public key (from PeerLink handshake) can produce
187 # a valid envelope. Failed decryption = unauthorized peer.
188 payload = decrypt_json_from_peer(encrypted)
189 if not payload:
190 logger.warning("[SHARD-EXEC] Decryption failed — unauthorized peer")
191 return jsonify({'error': 'Unauthorized'}), 403
193 task = payload.get('task', '')
194 task_type = payload.get('task_type', 'feature')
195 preferred_tool = payload.get('preferred_tool', '')
196 model = payload.get('model', '')
197 file_content = payload.get('file_content', {})
199 # Write shard files to a temporary working directory
200 import tempfile
201 with tempfile.TemporaryDirectory(prefix='hart_shard_') as tmpdir:
202 for rel_path, content in file_content.items():
203 # Security: prevent path traversal
204 safe_path = os.path.normpath(rel_path)
205 if safe_path.startswith('..') or os.path.isabs(safe_path):
206 logger.warning(f"[SHARD-EXEC] Path traversal blocked: {rel_path}")
207 continue
208 fpath = os.path.join(tmpdir, safe_path)
209 os.makedirs(os.path.dirname(fpath), exist_ok=True)
210 with open(fpath, 'w', encoding='utf-8') as f:
211 f.write(content)
213 # Execute via local orchestrator (always local — we ARE the peer)
214 from .orchestrator import get_coding_orchestrator
215 orchestrator = get_coding_orchestrator()
216 result = orchestrator._execute_local(
217 task=task,
218 task_type=task_type,
219 preferred_tool=preferred_tool,
220 user_id='peer',
221 model=model,
222 working_dir=tmpdir,
223 )
225 result['execution_time_s'] = time.time() - start
227 # Encrypt result back to sender using their public key (from INSIDE the
228 # encrypted envelope — not from the plaintext request, which MITM could replace)
229 sender_pub = payload.get('sender_public_key')
230 if sender_pub:
231 encrypted_result = encrypt_json_for_peer(result, sender_pub)
232 return jsonify({'encrypted': encrypted_result})
233 else:
234 # SAME_USER trust — no encryption needed for user's own machines
235 return jsonify({'encrypted': None, 'result': result})
237 except Exception as e:
238 logger.error(f"[SHARD-EXEC] Error: {e}")
239 return jsonify({'error': 'Execution failed'}), 500