Coverage for integrations / agent_engine / private_repo_access.py: 76.3%
152 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Private Repo Access Service — GitHub invite/revoke + access control
4Controls access to private repos (e.g. HevolveAI hivemind core).
5Regional hosts get push access via GitHub collaborator invite after
6steward approval. Central has full access. Local nodes are denied.
8Uses GitHub REST API via `gh` CLI or direct HTTP with PAT.
9"""
10import json
11import logging
12import os
13import re
14import subprocess
15import sys
16from typing import Dict, List, Optional
18# Windows: suppress console windows for all subprocess calls
19_SUBPROCESS_KW = {}
20if sys.platform == 'win32':
21 _SUBPROCESS_KW['creationflags'] = subprocess.CREATE_NO_WINDOW
23logger = logging.getLogger('hevolve_social')
25# GitHub username: alphanumeric + hyphens, 1-39 chars, no leading/trailing hyphen
26_GITHUB_USERNAME_RE = re.compile(r'^[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,37}[a-zA-Z0-9])?$')
29def _validate_github_username(username: str) -> bool:
30 """Validate GitHub username format to prevent API path traversal."""
31 return bool(username and _GITHUB_USERNAME_RE.match(username))
33_PRIVATE_REPOS = None
34_GITHUB_TOKEN = None
37def _get_private_repos() -> List[str]:
38 global _PRIVATE_REPOS
39 if _PRIVATE_REPOS is None:
40 raw = os.environ.get('HEVOLVE_PRIVATE_REPOS', '')
41 _PRIVATE_REPOS = [r.strip() for r in raw.split(',') if r.strip()]
42 return _PRIVATE_REPOS
def _get_github_token() -> str:
    """Return the GitHub token from HEVOLVE_GITHUB_TOKEN ('' if unset).

    The environment is read on the first call only; the value is then kept
    in the module-level ``_GITHUB_TOKEN`` cache and reused.
    """
    global _GITHUB_TOKEN
    if _GITHUB_TOKEN is not None:
        return _GITHUB_TOKEN
    _GITHUB_TOKEN = os.environ.get('HEVOLVE_GITHUB_TOKEN', '')
    return _GITHUB_TOKEN
class PrivateRepoAccessService:
    """Access control for private repos. Static methods only.

    Groups the tier-based access check, GitHub collaborator invite/revoke
    (via the `gh` CLI with a direct HTTP fallback), and helpers that split a
    task into file-level subtasks and fetch individual file contents.
    """

    @staticmethod
    def is_private_repo(repo_url: str) -> bool:
        """Check if a repo URL is in the private repos list.

        Comparison is case-insensitive and ignores a trailing slash and a
        trailing ``.git``. A configured entry matches when it equals the URL
        or is the URL's trailing component after a ``/`` (HTTPS-style) or a
        ``:`` (scp-style SSH remote such as ``git@host:owner/repo.git``).

        Args:
            repo_url: Repository URL or ``owner/repo`` string. Empty or None
                is treated as "not private".

        Returns:
            True if the repo matches a configured private repo.
        """
        if not repo_url:
            return False
        repos = _get_private_repos()
        if not repos:
            return False
        # Normalize: strip whitespace, trailing slash, .git
        normalized = repo_url.strip().rstrip('/').removesuffix('.git').lower()
        for entry in repos:
            rn = entry.rstrip('/').removesuffix('.git').lower()
            if not rn:
                continue
            # ':' covers scp-style SSH remotes, which would otherwise be
            # reported as non-private and bypass the access check.
            if normalized == rn or normalized.endswith(('/' + rn, ':' + rn)):
                return True
        return False

    @staticmethod
    def verify_access(
        node_certificate: Optional[Dict],
        repo_url: str,
        access_level: str = 'read',
    ) -> Dict:
        """Verify a node's access to a private repo.

        Central: full read/write
        Regional (with valid certificate + invite): push to branches
        Local: DENIED

        Args:
            node_certificate: Certificate dict; its ``'tier'`` key selects
                the policy (missing tier is treated as ``'local'``).
            repo_url: Repository being accessed. Non-private repos are
                always allowed.
            access_level: Requested level; regional nodes are granted only
                ``'read'`` or ``'push'``.

        Returns:
            Dict with ``'allowed'`` (bool) plus ``'tier'``/``'access_level'``
            on success or ``'reason'`` on denial.
        """
        if not PrivateRepoAccessService.is_private_repo(repo_url):
            return {'allowed': True, 'reason': 'Not a private repo'}

        if not node_certificate:
            return {'allowed': False, 'reason': 'No certificate provided'}

        tier = node_certificate.get('tier', 'local')

        # NOTE(review): the 'central' tier is accepted from the certificate
        # dict as-is, without verify_certificate_chain — confirm callers
        # authenticate the certificate before reaching this point.
        if tier == 'central':
            return {'allowed': True, 'tier': 'central',
                    'access_level': 'full'}

        if tier == 'regional':
            # Regional hosts can push if they have a valid certificate
            try:
                from security.key_delegation import verify_certificate_chain
                valid = verify_certificate_chain(node_certificate)
                if not valid:
                    return {'allowed': False,
                            'reason': 'Invalid certificate chain'}
            except Exception as e:
                # Fail closed: any verification error denies access.
                logger.debug(f"Certificate verification failed: {e}")
                return {'allowed': False,
                        'reason': f'Certificate verification error: {e}'}

            if access_level in ('read', 'push'):
                return {'allowed': True, 'tier': 'regional',
                        'access_level': 'push'}
            return {'allowed': False,
                    'reason': 'Regional hosts limited to push access'}

        return {'allowed': False, 'tier': tier,
                'reason': 'Only central and regional hosts can access '
                          'private repos'}

    @staticmethod
    def send_github_invite(
        repo_url: str,
        github_username: str,
        permission: str = 'push',
    ) -> Dict:
        """Send GitHub collaborator invite.

        Uses gh CLI if available, falls back to HTTP API.
        permission: 'pull', 'push', or 'admin'

        Returns:
            ``{'invited': True, 'method': ..., 'username': ...}`` on success,
            otherwise ``{'invited': False, 'error': ...}`` (plus ``'status'``
            when the HTTP API answered with an unexpected status code).
        """
        if not github_username:
            return {'invited': False, 'error': 'No GitHub username'}
        if not _validate_github_username(github_username):
            return {'invited': False,
                    'error': f'Invalid GitHub username format: {github_username}'}

        owner_repo = _extract_owner_repo(repo_url)
        if not owner_repo:
            return {'invited': False,
                    'error': f'Cannot parse repo: {repo_url}'}

        owner, repo = owner_repo

        # Try gh CLI first
        try:
            result = subprocess.run(
                ['gh', 'api', '--method', 'PUT',
                 f'repos/{owner}/{repo}/collaborators/{github_username}',
                 '-f', f'permission={permission}'],
                capture_output=True, text=True, timeout=30,
                **_SUBPROCESS_KW)
            if result.returncode == 0:
                logger.info(
                    f"GitHub invite sent: {github_username} → "
                    f"{owner}/{repo} ({permission})")
                return {'invited': True, 'method': 'gh_cli',
                        'username': github_username}
            # Record why the CLI path failed before falling back to HTTP.
            logger.debug(
                f"gh CLI invite exited with {result.returncode}: "
                f"{(result.stderr or '').strip()[:200]}")
        except FileNotFoundError:
            pass  # gh CLI not installed, try HTTP
        except Exception as e:
            logger.debug(f"gh CLI invite failed: {e}")

        # Fallback: direct HTTP with PAT
        token = _get_github_token()
        if not token:
            return {'invited': False,
                    'error': 'No HEVOLVE_GITHUB_TOKEN configured'}

        try:
            from core.http_pool import pooled_put
            resp = pooled_put(
                f'https://api.github.com/repos/{owner}/{repo}'
                f'/collaborators/{github_username}',
                headers={
                    'Authorization': f'token {token}',
                    'Accept': 'application/vnd.github.v3+json',
                },
                json={'permission': permission},
                timeout=30,
            )
            if resp.status_code in (201, 204):
                logger.info(
                    f"GitHub invite sent via API: {github_username} → "
                    f"{owner}/{repo}")
                return {'invited': True, 'method': 'http_api',
                        'username': github_username}
            return {'invited': False, 'status': resp.status_code,
                    'error': resp.text[:200]}
        except Exception as e:
            return {'invited': False, 'error': str(e)}

    @staticmethod
    def revoke_github_access(
        repo_url: str,
        github_username: str,
    ) -> Dict:
        """Revoke GitHub collaborator access.

        Uses gh CLI if available, falls back to HTTP API.

        Returns:
            ``{'revoked': True, 'method': ...}`` on success, otherwise
            ``{'revoked': False, 'error': ...}`` (plus ``'status'`` when the
            HTTP API answered with a status other than 204).
        """
        if not github_username:
            return {'revoked': False, 'error': 'No GitHub username'}
        if not _validate_github_username(github_username):
            return {'revoked': False,
                    'error': f'Invalid GitHub username format: {github_username}'}

        owner_repo = _extract_owner_repo(repo_url)
        if not owner_repo:
            return {'revoked': False,
                    'error': f'Cannot parse repo: {repo_url}'}

        owner, repo = owner_repo

        # Try gh CLI first
        try:
            result = subprocess.run(
                ['gh', 'api', '--method', 'DELETE',
                 f'repos/{owner}/{repo}/collaborators/{github_username}'],
                capture_output=True, text=True, timeout=30,
                **_SUBPROCESS_KW)
            if result.returncode == 0:
                logger.info(
                    f"GitHub access revoked: {github_username} from "
                    f"{owner}/{repo}")
                return {'revoked': True, 'method': 'gh_cli'}
            # Record why the CLI path failed before falling back to HTTP.
            logger.debug(
                f"gh CLI revoke exited with {result.returncode}: "
                f"{(result.stderr or '').strip()[:200]}")
        except FileNotFoundError:
            pass  # gh CLI not installed, try HTTP
        except Exception as e:
            logger.debug(f"gh CLI revoke failed: {e}")

        # Fallback: HTTP API
        token = _get_github_token()
        if not token:
            return {'revoked': False,
                    'error': 'No HEVOLVE_GITHUB_TOKEN configured'}

        try:
            from core.http_pool import pooled_delete
            resp = pooled_delete(
                f'https://api.github.com/repos/{owner}/{repo}'
                f'/collaborators/{github_username}',
                headers={
                    'Authorization': f'token {token}',
                    'Accept': 'application/vnd.github.v3+json',
                },
                timeout=30,
            )
            if resp.status_code == 204:
                return {'revoked': True, 'method': 'http_api'}
            return {'revoked': False, 'status': resp.status_code,
                    'error': resp.text[:200]}
        except Exception as e:
            return {'revoked': False, 'error': str(e)}

    @staticmethod
    def split_repo_task(
        task_description: str,
        repo_url: str,
        target_files: Optional[List[str]] = None,
    ) -> List[Dict]:
        """Central splits a full-repo task into file-level subtasks.

        Regional hosts receive individual file-level subtasks instead of
        full repository access. Central coordinates and merges results.

        Args:
            task_description: Text of the overall task.
            repo_url: Repository the task applies to.
            target_files: Files to create one subtask each for. When empty
                or None, a single subtask with ``file_path=None`` is
                returned.

        Returns:
            List of subtask dicts with 1-based ``'subtask_id'``.
        """
        # No inference from the description is performed: without explicit
        # target files the task is passed through as one repo-wide subtask.
        subtasks = [
            {
                'subtask_id': index,
                'file_path': fpath,
                'repo_url': repo_url,
                'description': f'{task_description} [file: {fpath}]',
                'access_level': 'push',
            }
            for index, fpath in enumerate(target_files or [], start=1)
        ]
        if subtasks:
            return subtasks

        return [{
            'subtask_id': 1,
            'file_path': None,
            'repo_url': repo_url,
            'description': task_description,
            'access_level': 'push',
        }]

    @staticmethod
    def create_file_extract(
        repo_url: str,
        file_paths: List[str],
    ) -> Dict:
        """Extract specific files from a repo for delegation.

        Regional gets file content, NOT full clone.
        Uses gh CLI to fetch individual file contents.

        Returns:
            ``{'error': ...}`` when the repo cannot be parsed; otherwise a
            dict with ``'repo_url'``, ``'files'`` (path -> decoded text, or
            None when the path was rejected or the fetch failed) and
            ``'extracted'`` (count of non-None entries).
        """
        import base64

        owner_repo = _extract_owner_repo(repo_url)
        if not owner_repo:
            return {'error': f'Cannot parse repo: {repo_url}'}

        owner, repo = owner_repo
        files = {}

        for fpath in file_paths:
            # Sanitize: reject path traversal attempts
            # NOTE(review): '?' and '#' are not rejected or URL-encoded here,
            # so they reach the API endpoint verbatim — confirm that is
            # intended.
            if '..' in fpath or fpath.startswith('/') or '\\' in fpath:
                files[fpath] = None
                logger.warning(f"Path traversal rejected in file extract: {fpath}")
                continue
            try:
                result = subprocess.run(
                    ['gh', 'api',
                     f'repos/{owner}/{repo}/contents/{fpath}',
                     '--jq', '.content'],
                    capture_output=True, text=True, timeout=30,
                    **_SUBPROCESS_KW)
                encoded = result.stdout.strip()
                if result.returncode == 0 and encoded:
                    # The API returns file content base64-encoded.
                    files[fpath] = base64.b64decode(encoded).decode(
                        'utf-8', errors='replace')
                else:
                    files[fpath] = None
            except Exception as e:
                # Best effort: one failing file must not abort the batch.
                files[fpath] = None
                logger.debug(f"File extract failed for {fpath}: {e}")

        return {'repo_url': repo_url, 'files': files,
                'extracted': sum(1 for v in files.values() if v is not None)}
329def _extract_owner_repo(repo_url: str) -> Optional[tuple]:
330 """Extract (owner, repo) from a GitHub URL or owner/repo string."""
331 if not repo_url:
332 return None
334 # Handle owner/repo format
335 if '/' in repo_url and '://' not in repo_url:
336 parts = repo_url.strip('/').split('/')
337 if len(parts) == 2:
338 return (parts[0], parts[1].removesuffix('.git'))
340 # Handle full URL
341 try:
342 from urllib.parse import urlparse
343 parsed = urlparse(repo_url)
344 path = parsed.path.strip('/').removesuffix('.git')
345 parts = path.split('/')
346 if len(parts) >= 2:
347 return (parts[0], parts[1])
348 except Exception:
349 pass
351 return None