Coverage for integrations / agent_engine / private_repo_access.py: 76.3%

152 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Private Repo Access Service — GitHub invite/revoke + access control 

3 

4Controls access to private repos (e.g. HevolveAI hivemind core). 

5Regional hosts get push access via GitHub collaborator invite after 

6steward approval. Central has full access. Local nodes are denied. 

7 

8Uses GitHub REST API via `gh` CLI or direct HTTP with PAT. 

9""" 

10import json 

11import logging 

12import os 

13import re 

14import subprocess 

15import sys 

16from typing import Dict, List, Optional 

17 

# Extra keyword arguments splatted into every subprocess.run() call in this
# module. On Windows they carry CREATE_NO_WINDOW so that no console window
# appears; on every other platform the mapping is empty.
_SUBPROCESS_KW = (
    {'creationflags': subprocess.CREATE_NO_WINDOW}
    if sys.platform == 'win32'
    else {}
)

# Module logger (shared 'hevolve_social' logger name).
logger = logging.getLogger('hevolve_social')

24 

# GitHub username: alphanumeric + hyphens, 1-39 chars, no leading/trailing hyphen.
# Anchored with \Z rather than $: in Python, $ also matches just before a
# trailing newline, which would let "name\n" through.
_GITHUB_USERNAME_RE = re.compile(r'^[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,37}[a-zA-Z0-9])?\Z')


def _validate_github_username(username: str) -> bool:
    """Validate GitHub username format to prevent API path traversal.

    The username is interpolated into GitHub API paths by the callers in this
    module, so only the exact character set and length described by
    ``_GITHUB_USERNAME_RE`` is accepted.

    Args:
        username: Candidate GitHub login.

    Returns:
        True when ``username`` is a non-empty string that matches the pattern
        in full; False for anything else (including ``None``, non-string
        values, and strings with a trailing newline).
    """
    # Non-strings would make the regex call raise TypeError; treat as invalid.
    if not isinstance(username, str) or not username:
        return False
    # fullmatch() requires the whole string to be consumed by the pattern.
    return _GITHUB_USERNAME_RE.fullmatch(username) is not None

32 

# Lazily filled caches for the environment-derived settings below.
# None means "environment variable has not been read yet".
_PRIVATE_REPOS = None
_GITHUB_TOKEN = None


def _get_private_repos() -> List[str]:
    """Return the private repo list from HEVOLVE_PRIVATE_REPOS.

    The variable is read once, split on commas, and each entry is stripped;
    empty entries are dropped. The resulting list is cached in
    ``_PRIVATE_REPOS`` and the same list object is returned on later calls.
    """
    global _PRIVATE_REPOS
    if _PRIVATE_REPOS is not None:
        return _PRIVATE_REPOS
    pieces = os.environ.get('HEVOLVE_PRIVATE_REPOS', '').split(',')
    _PRIVATE_REPOS = [entry for entry in map(str.strip, pieces) if entry]
    return _PRIVATE_REPOS


def _get_github_token() -> str:
    """Return the token from HEVOLVE_GITHUB_TOKEN ('' when unset).

    The value is read once and cached in ``_GITHUB_TOKEN``.
    """
    global _GITHUB_TOKEN
    if _GITHUB_TOKEN is not None:
        return _GITHUB_TOKEN
    _GITHUB_TOKEN = os.environ.get('HEVOLVE_GITHUB_TOKEN', '')
    return _GITHUB_TOKEN

50 

51 

class PrivateRepoAccessService:
    """Access control for private repos. Static methods only.

    Covers three concerns: deciding whether a node may touch a private repo
    (``is_private_repo`` / ``verify_access``), granting or revoking GitHub
    collaborator access (``send_github_invite`` / ``revoke_github_access``),
    and handing out work at file granularity (``split_repo_task`` /
    ``create_file_extract``).
    """

    @staticmethod
    def is_private_repo(repo_url: str) -> bool:
        """Check if a repo URL is in the private repos list.

        Both the URL and each configured entry are normalized (surrounding
        whitespace, case, trailing slash and ``.git`` suffix removed). A URL
        matches an entry when it equals the entry or ends with the entry
        preceded by ``/`` (HTTPS style) or ``:`` (scp-style SSH, e.g.
        ``git@host:owner/repo.git``).

        Args:
            repo_url: Repository URL or ``owner/repo`` string.

        Returns:
            True when the URL matches a configured private repo, else False
            (also for an empty/None URL or an empty configuration).
        """
        repos = _get_private_repos()
        if not repos or not repo_url:
            return False

        def _normalize(value: str) -> str:
            # Lower-case before stripping so an upper-case ".GIT" is removed too.
            return value.strip().lower().rstrip('/').removesuffix('.git')

        normalized = _normalize(repo_url)
        for entry in repos:
            rn = _normalize(entry)
            if not rn:
                continue
            # ':' covers scp-style SSH URLs, which would otherwise slip past
            # the private-repo check entirely.
            if normalized == rn or normalized.endswith(('/' + rn, ':' + rn)):
                return True
        return False

    @staticmethod
    def verify_access(
        node_certificate: Optional[Dict],
        repo_url: str,
        access_level: str = 'read',
    ) -> Dict:
        """Verify a node's access to a private repo.

        Central: full read/write
        Regional (with valid certificate + invite): push to branches
        Local: DENIED

        Args:
            node_certificate: Certificate dict; its ``tier`` key selects the
                rule applied (missing tier is treated as ``'local'``).
            repo_url: Repository being accessed.
            access_level: Requested level; regional hosts are granted only
                ``'read'`` or ``'push'``.

        Returns:
            Dict with ``allowed`` (bool) plus ``reason`` and/or ``tier`` /
            ``access_level`` describing the decision.
        """
        if not PrivateRepoAccessService.is_private_repo(repo_url):
            return {'allowed': True, 'reason': 'Not a private repo'}

        if not node_certificate:
            return {'allowed': False, 'reason': 'No certificate provided'}
        if not isinstance(node_certificate, dict):
            # Fail closed instead of raising AttributeError on .get() below.
            return {'allowed': False, 'reason': 'Malformed certificate'}

        tier = node_certificate.get('tier', 'local')

        if tier == 'central':
            # NOTE(review): the central tier is accepted from the certificate
            # dict without any chain verification here - confirm that callers
            # authenticate the certificate before reaching this point.
            return {'allowed': True, 'tier': 'central',
                    'access_level': 'full'}

        if tier == 'regional':
            # Regional hosts can push if they have a valid certificate
            try:
                from security.key_delegation import verify_certificate_chain
                valid = verify_certificate_chain(node_certificate)
                if not valid:
                    return {'allowed': False,
                            'reason': 'Invalid certificate chain'}
            except Exception as e:
                # Any failure (including a missing verifier module) denies access.
                logger.debug(f"Certificate verification failed: {e}")
                return {'allowed': False,
                        'reason': f'Certificate verification error: {e}'}

            if access_level in ('read', 'push'):
                return {'allowed': True, 'tier': 'regional',
                        'access_level': 'push'}
            return {'allowed': False,
                    'reason': 'Regional hosts limited to push access'}

        return {'allowed': False, 'tier': tier,
                'reason': 'Only central and regional hosts can access '
                          'private repos'}

    @staticmethod
    def send_github_invite(
        repo_url: str,
        github_username: str,
        permission: str = 'push',
    ) -> Dict:
        """Send GitHub collaborator invite.

        Uses gh CLI if available, falls back to HTTP API.
        permission: 'pull', 'push', or 'admin'

        Args:
            repo_url: Repository URL or ``owner/repo`` string.
            github_username: GitHub login to invite; must pass
                ``_validate_github_username``.
            permission: Permission sent to the collaborators endpoint.

        Returns:
            ``{'invited': True, 'method': ..., 'username': ...}`` on success,
            otherwise ``{'invited': False, 'error': ...}`` (plus ``status``
            when the HTTP API answered with an unexpected code).
        """
        if not github_username:
            return {'invited': False, 'error': 'No GitHub username'}
        if not _validate_github_username(github_username):
            return {'invited': False,
                    'error': f'Invalid GitHub username format: {github_username}'}

        owner_repo = _extract_owner_repo(repo_url)
        if not owner_repo:
            return {'invited': False,
                    'error': f'Cannot parse repo: {repo_url}'}

        owner, repo = owner_repo

        # Try gh CLI first
        try:
            result = subprocess.run(
                ['gh', 'api', '--method', 'PUT',
                 f'repos/{owner}/{repo}/collaborators/{github_username}',
                 '-f', f'permission={permission}'],
                capture_output=True, text=True, timeout=30,
                **_SUBPROCESS_KW)
            if result.returncode == 0:
                logger.info(
                    f"GitHub invite sent: {github_username} → "
                    f"{owner}/{repo} ({permission})")
                return {'invited': True, 'method': 'gh_cli',
                        'username': github_username}
            # Keep a trace of why the CLI path failed before falling back.
            logger.debug(
                f"gh CLI invite exited with {result.returncode}: "
                f"{(result.stderr or '').strip()[:200]}")
        except FileNotFoundError:
            pass  # gh CLI not installed, try HTTP
        except Exception as e:
            logger.debug(f"gh CLI invite failed: {e}")

        # Fallback: direct HTTP with PAT
        token = _get_github_token()
        if not token:
            return {'invited': False,
                    'error': 'No HEVOLVE_GITHUB_TOKEN configured'}

        try:
            from core.http_pool import pooled_put
            resp = pooled_put(
                f'https://api.github.com/repos/{owner}/{repo}'
                f'/collaborators/{github_username}',
                headers={
                    'Authorization': f'token {token}',
                    'Accept': 'application/vnd.github.v3+json',
                },
                json={'permission': permission},
                timeout=30,
            )
            if resp.status_code in (201, 204):
                logger.info(
                    f"GitHub invite sent via API: {github_username} → "
                    f"{owner}/{repo}")
                return {'invited': True, 'method': 'http_api',
                        'username': github_username}
            return {'invited': False, 'status': resp.status_code,
                    'error': resp.text[:200]}
        except Exception as e:
            return {'invited': False, 'error': str(e)}

    @staticmethod
    def revoke_github_access(
        repo_url: str,
        github_username: str,
    ) -> Dict:
        """Revoke GitHub collaborator access.

        Tries the gh CLI first and falls back to the HTTP API with the
        configured token.

        Args:
            repo_url: Repository URL or ``owner/repo`` string.
            github_username: GitHub login to remove; must pass
                ``_validate_github_username``.

        Returns:
            ``{'revoked': True, 'method': ...}`` on success, otherwise
            ``{'revoked': False, 'error': ...}`` (plus ``status`` when the
            HTTP API answered with an unexpected code).
        """
        if not github_username:
            return {'revoked': False, 'error': 'No GitHub username'}
        if not _validate_github_username(github_username):
            return {'revoked': False,
                    'error': f'Invalid GitHub username format: {github_username}'}

        owner_repo = _extract_owner_repo(repo_url)
        if not owner_repo:
            return {'revoked': False,
                    'error': f'Cannot parse repo: {repo_url}'}

        owner, repo = owner_repo

        # Try gh CLI first
        try:
            result = subprocess.run(
                ['gh', 'api', '--method', 'DELETE',
                 f'repos/{owner}/{repo}/collaborators/{github_username}'],
                capture_output=True, text=True, timeout=30,
                **_SUBPROCESS_KW)
            if result.returncode == 0:
                logger.info(
                    f"GitHub access revoked: {github_username} from "
                    f"{owner}/{repo}")
                return {'revoked': True, 'method': 'gh_cli'}
            # Keep a trace of why the CLI path failed before falling back.
            logger.debug(
                f"gh CLI revoke exited with {result.returncode}: "
                f"{(result.stderr or '').strip()[:200]}")
        except FileNotFoundError:
            pass  # gh CLI not installed, try HTTP
        except Exception as e:
            logger.debug(f"gh CLI revoke failed: {e}")

        # Fallback: HTTP API
        token = _get_github_token()
        if not token:
            return {'revoked': False,
                    'error': 'No HEVOLVE_GITHUB_TOKEN configured'}

        try:
            from core.http_pool import pooled_delete
            resp = pooled_delete(
                f'https://api.github.com/repos/{owner}/{repo}'
                f'/collaborators/{github_username}',
                headers={
                    'Authorization': f'token {token}',
                    'Accept': 'application/vnd.github.v3+json',
                },
                timeout=30,
            )
            if resp.status_code == 204:
                # Logged for parity with the gh CLI path and with invites.
                logger.info(
                    f"GitHub access revoked via API: {github_username} from "
                    f"{owner}/{repo}")
                return {'revoked': True, 'method': 'http_api'}
            return {'revoked': False, 'status': resp.status_code,
                    'error': resp.text[:200]}
        except Exception as e:
            return {'revoked': False, 'error': str(e)}

    @staticmethod
    def split_repo_task(
        task_description: str,
        repo_url: str,
        target_files: Optional[List[str]] = None,
    ) -> List[Dict]:
        """Central splits a full-repo task into file-level subtasks.

        Regional hosts receive individual file-level subtasks instead of
        full repository access. Central coordinates and merges results.

        Args:
            task_description: Description copied into every subtask.
            repo_url: Repository the subtasks refer to.
            target_files: Files to create one subtask each for. No files are
                inferred from the description; when this is empty or None a
                single repo-wide subtask with ``file_path=None`` is returned.

        Returns:
            List of subtask dicts with ``subtask_id`` (1-based), ``file_path``,
            ``repo_url``, ``description`` and ``access_level`` (always 'push').
        """
        subtasks = [
            {
                'subtask_id': index,
                'file_path': fpath,
                'repo_url': repo_url,
                'description': f'{task_description} [file: {fpath}]',
                'access_level': 'push',
            }
            for index, fpath in enumerate(target_files or [], start=1)
        ]

        if not subtasks:
            # Nothing to split on: hand out the task as a single unit.
            subtasks.append({
                'subtask_id': 1,
                'file_path': None,
                'repo_url': repo_url,
                'description': task_description,
                'access_level': 'push',
            })

        return subtasks

    @staticmethod
    def create_file_extract(
        repo_url: str,
        file_paths: List[str],
    ) -> Dict:
        """Extract specific files from a repo for delegation.

        Regional gets file content, NOT full clone.
        Uses gh CLI to fetch individual file contents.

        Args:
            repo_url: Repository URL or ``owner/repo`` string.
            file_paths: Repo-relative paths to fetch. Paths containing ``..``,
                a leading ``/`` or a backslash are rejected.

        Returns:
            ``{'error': ...}`` when the repo cannot be parsed; otherwise
            ``{'repo_url', 'files', 'extracted'}`` where ``files`` maps each
            requested path to its decoded text (or None when rejected or not
            retrievable) and ``extracted`` counts the non-None entries.
        """
        import base64
        from urllib.parse import quote

        owner_repo = _extract_owner_repo(repo_url)
        if not owner_repo:
            return {'error': f'Cannot parse repo: {repo_url}'}

        owner, repo = owner_repo
        files = {}

        for fpath in file_paths or []:
            # Sanitize: reject non-strings and path traversal attempts
            if (not isinstance(fpath, str) or '..' in fpath
                    or fpath.startswith('/') or '\\' in fpath):
                files[fpath] = None
                logger.warning(f"Path traversal rejected in file extract: {fpath}")
                continue
            try:
                # Percent-encode the path so characters such as '?', '#' or
                # spaces cannot alter the endpoint; '/' stays a separator.
                endpoint = f'repos/{owner}/{repo}/contents/{quote(fpath)}'
                result = subprocess.run(
                    ['gh', 'api', endpoint, '--jq', '.content'],
                    capture_output=True, text=True, timeout=30,
                    **_SUBPROCESS_KW)
                encoded = result.stdout.strip()
                if result.returncode == 0 and encoded:
                    # The contents endpoint returns the file body base64-encoded.
                    files[fpath] = base64.b64decode(encoded).decode(
                        'utf-8', errors='replace')
                else:
                    files[fpath] = None
            except Exception as e:
                files[fpath] = None
                logger.debug(f"File extract failed for {fpath}: {e}")

        return {'repo_url': repo_url, 'files': files,
                'extracted': sum(1 for v in files.values() if v is not None)}

327 

328 

def _extract_owner_repo(repo_url: str) -> Optional[tuple]:
    """Extract (owner, repo) from a GitHub URL or owner/repo string.

    Accepted forms:
      * ``owner/repo`` (optionally with ``.git``)
      * ``host/owner/repo`` without a scheme (a dotted first segment is
        treated as the host, since owner names are validated elsewhere in
        this module as dot-free)
      * scp-style SSH: ``git@host:owner/repo.git``
      * any ``scheme://host/owner/repo[.git][/more/segments]`` URL

    Both components end up interpolated into GitHub API paths by the callers,
    so each must be non-empty, consist only of ``[A-Za-z0-9._-]`` and not be
    ``.`` or ``..``.

    Args:
        repo_url: Repository reference in one of the forms above.

    Returns:
        ``(owner, repo)`` with any ``.git`` suffix removed from ``repo``, or
        None when the input is empty, not a string, or cannot be parsed into
        two safe components.
    """
    if not repo_url or not isinstance(repo_url, str):
        return None

    from urllib.parse import urlparse

    candidate = repo_url.strip()
    has_scheme = '://' in candidate

    if has_scheme:
        try:
            path = urlparse(candidate).path
        except ValueError:
            # urlparse rejects some malformed netlocs (e.g. bad IPv6 literal).
            return None
    else:
        # scp-style SSH ("user@host:owner/repo"): everything before the first
        # ':' is the host part as long as it contains no '/'.
        head, colon, tail = candidate.partition(':')
        path = tail if colon and '/' not in head else candidate

    segments = path.strip('/').split('/')

    # "github.com/owner/repo" without a scheme: drop the leading host segment.
    if not has_scheme and len(segments) > 2 and '.' in segments[0]:
        segments = segments[1:]

    if len(segments) < 2:
        return None

    owner = segments[0]
    # Strip ".git" from the repo segment itself so trailing path segments
    # (e.g. ".../repo.git/tree/main") do not prevent it.
    repo = segments[1].removesuffix('.git')

    for part in (owner, repo):
        if part in ('.', '..') or not re.fullmatch(r'[A-Za-z0-9._-]+', part):
            return None

    return (owner, repo)