Coverage for integrations / agent_engine / shell_os_apis.py: 69.0%

1512 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Shell OS APIs — Extended system management endpoints for LiquidUI. 

3 

4Provides Flask route registrations for: 

5 - Notifications (freedesktop.org D-Bus bridge) 

6 - File manager (browse, mkdir, delete, move, copy) 

7 - Terminal (PTY allocation, I/O, resize) 

8 - User account management (list, create, modify, delete) 

9 - First-time setup wizard (progress, steps) 

10 - Backup restore 

11 - Power management (profiles, suspend, hibernate, checkpoint) 

12 - i18n (locale listing, selection, translation lookup) 

13 - Accessibility (settings read/write) 

14 - Screenshot / screen recording 

15 - Multi-device pairing (mesh status, pair, unpair) 

16 

17All routes prefixed with /api/shell/ to match existing conventions. 

18Registration: call register_shell_os_routes(app) from the server init. 

19 

20Security: 

21 - Local-only auth: requests must come from 127.0.0.1/::1 OR carry a 

22 valid X-Shell-Token header (generated at desktop login). 

23 - Path sandbox: file operations confined to user home + /tmp. 

24 - Destructive ops classified via action_classifier + audit logged. 

25""" 

26 

27import json 

28import logging 

29import os 

30import shlex 

31import shutil 

32import subprocess 

33import tempfile 

34import time 

35from functools import wraps 

36from typing import Optional 

37 

38logger = logging.getLogger('hevolve.shell') 

39 

40# ─── Path Sandbox ───────────────────────────────────────────────── 

41 

42# Allowed filesystem roots for file operations 

43_ALLOWED_ROOTS = None # Lazily computed 

44 

45 

def _get_allowed_roots():
    """Return the filesystem roots that shell file operations may touch.

    The list always starts with the resolved user home directory and the
    resolved system temp directory. Additional roots can be supplied via
    the ``HART_SHELL_ALLOWED_PATHS`` environment variable as an
    ``os.pathsep``-separated list (``:`` on POSIX); only entries that
    resolve to an existing directory are kept.

    The result is computed on first call and cached in the module-level
    ``_ALLOWED_ROOTS``; later changes to the environment are not picked up.
    """
    global _ALLOWED_ROOTS
    if _ALLOWED_ROOTS is not None:
        return _ALLOWED_ROOTS

    roots = [
        os.path.realpath(os.path.expanduser('~')),
        os.path.realpath(tempfile.gettempdir()),
    ]
    extra = os.environ.get('HART_SHELL_ALLOWED_PATHS', '')
    for raw in extra.split(os.pathsep):
        candidate = raw.strip()
        if not candidate:
            # realpath('') resolves to the current working directory, so a
            # stray/trailing separator would silently whitelist the cwd.
            continue
        resolved = os.path.realpath(candidate)
        if os.path.isdir(resolved) and resolved not in roots:
            roots.append(resolved)

    _ALLOWED_ROOTS = roots
    return roots

63 

64 

def _is_path_allowed(path):
    """Return True if *path* resolves to an allowed root or something below it.

    The path is resolved with ``os.path.realpath`` first, so symlinks and
    ``..`` components cannot be used to step outside the sandbox.

    Containment is decided on whole path components rather than on a raw
    string prefix: with root ``/home/user`` the sibling ``/home/user2`` is
    rejected even though it shares the textual prefix.
    """
    real = os.path.normcase(os.path.realpath(path))
    for root in _get_allowed_roots():
        root = os.path.normcase(root)
        try:
            if os.path.commonpath([real, root]) == root:
                return True
        except ValueError:
            # Raised e.g. for paths on different Windows drives — not inside.
            continue
    return False

69 

70 

71# ─── Shell Auth (local-only, no social DB dependency) ───────────── 

72 

def _shell_auth_check():
    """Verify that the current Flask request comes from the local desktop.

    A request is authorized when either:
      1. ``request.remote_addr`` is one of the local addresses
         (127.0.0.1, ::1, 0.0.0.0, localhost), or
      2. it carries an ``X-Shell-Token`` header equal to the non-empty
         ``HART_SHELL_TOKEN`` environment variable.

    Returns:
        ``(True, None)`` when authorized, otherwise the 3-tuple
        ``(False, response, 403)``. The asymmetric shape is relied on by
        ``_require_shell_auth`` (it reads indexes 1 and 2 on failure), so
        it is kept as is.
    """
    import hmac

    from flask import request, jsonify

    remote = request.remote_addr or ''
    if remote in ('127.0.0.1', '::1', '0.0.0.0', 'localhost'):
        return True, None

    # Check shell token (set during desktop login)
    token = request.headers.get('X-Shell-Token', '')
    expected = os.environ.get('HART_SHELL_TOKEN', '')
    if token and expected:
        # Constant-time comparison so response timing does not leak how
        # much of the token matched. Bytes are used because
        # compare_digest rejects non-ASCII str arguments.
        if hmac.compare_digest(token.encode('utf-8'),
                               expected.encode('utf-8')):
            return True, None

    return False, jsonify({'error': 'Shell API: local access only'}), 403

96 

97 

def _require_shell_auth(f):
    """Wrap a view so it only runs when ``_shell_auth_check`` passes.

    On failure the wrapper returns the ``(response, status)`` pair taken
    from positions 1 and 2 of the check result instead of calling *f*.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        outcome = _shell_auth_check()
        if outcome[0]:
            return f(*args, **kwargs)
        return outcome[1], outcome[2]

    return decorated

107 

108 

109# ─── Audit helper ────────────────────────────────────────────────── 

110 

def _audit_shell_op(action, detail=None):
    """Record a shell operation in the immutable audit log (best-effort).

    Args:
        action: short action name, e.g. ``'file_delete'``.
        detail: optional dict of extra context; an empty dict is logged
            when omitted.

    This never raises: auditing must not break the operation being
    audited. Failures are reported through the module logger instead of
    being dropped silently, so a broken audit trail is at least visible.
    """
    try:
        from security.immutable_audit_log import get_audit_log
        get_audit_log().log_event(
            'shell_ops', 'shell_os_api', action,
            detail=detail or {})
    except ImportError:
        # Audit backend not installed in this deployment.
        logger.debug("Audit log unavailable; '%s' not recorded", action)
    except Exception:
        logger.warning("Audit log write failed for '%s'", action,
                       exc_info=True)

120 

121 

def _classify_destructive(action_desc):
    """Ask ``security.action_classifier`` whether an action may proceed.

    Returns True only when the classifier labels the description
    ``'safe'``. Any other label, or any failure to import or run the
    classifier, yields False (fail-closed).
    """
    try:
        from security.action_classifier import classify_action
        # Per the original note, classify_action yields one of the
        # strings 'safe', 'destructive' or 'unknown'.
        verdict = classify_action(action_desc)
        is_safe = verdict == 'safe'
    except Exception:
        logger.warning("Action classifier unavailable — blocking action (fail-closed)")
        is_safe = False  # deny when the classifier cannot be consulted
    return is_safe

136 

137 

138def register_shell_os_routes(app): 

139 """Register all extended shell OS API routes on a Flask app.""" 

140 

141 from flask import jsonify, request, Response 

142 

143 # ═══════════════════════════════════════════════════════════ 

144 # Notifications — freedesktop.org D-Bus bridge 

145 # ═══════════════════════════════════════════════════════════ 

146 

147 _notification_queue = [] # In-memory for SSE; production uses DB 

148 

149 @app.route('/api/shell/notifications', methods=['GET']) 

150 def shell_notifications_list(): 

151 """List recent notifications.""" 

152 limit = request.args.get('limit', 50, type=int) 

153 unread = request.args.get('unread', 'false').lower() == 'true' 

154 

155 # Try DB-backed notifications first 

156 try: 

157 from integrations.social.services import NotificationService 

158 from integrations.social.models import db_session 

159 user_id = request.args.get('user_id', '1') 

160 with db_session() as db: 

161 notifs = NotificationService.get_for_user( 

162 db, int(user_id), unread_only=unread, limit=limit) 

163 return jsonify({ 

164 'notifications': [n.to_dict() for n in notifs], 

165 'source': 'database', 

166 }) 

167 except (ImportError, Exception): 

168 pass 

169 

170 # Fallback: in-memory queue 

171 items = _notification_queue[-limit:] 

172 if unread: 

173 items = [n for n in items if not n.get('read')] 

174 return jsonify({ 

175 'notifications': items, 

176 'source': 'memory', 

177 }) 

178 

179 @app.route('/api/shell/notifications/send', methods=['POST']) 

180 def shell_notification_send(): 

181 """Send a desktop notification via D-Bus (freedesktop.org spec).""" 

182 data = request.get_json(force=True) 

183 title = data.get('title', 'HART OS') 

184 body = data.get('body', '') 

185 urgency = data.get('urgency', 'normal') # low, normal, critical 

186 icon = data.get('icon', 'dialog-information') 

187 timeout = data.get('timeout', 5000) 

188 

189 notif = { 

190 'id': len(_notification_queue) + 1, 

191 'title': title, 

192 'body': body, 

193 'urgency': urgency, 

194 'icon': icon, 

195 'timestamp': time.time(), 

196 'read': False, 

197 } 

198 _notification_queue.append(notif) 

199 

200 # Try D-Bus delivery 

201 dbus_sent = False 

202 try: 

203 result = subprocess.run( 

204 ['notify-send', '-u', urgency, '-i', icon, 

205 '-t', str(timeout), title, body], 

206 capture_output=True, timeout=5) 

207 dbus_sent = result.returncode == 0 

208 except (FileNotFoundError, subprocess.TimeoutExpired): 

209 pass 

210 

211 return jsonify({ 

212 'sent': True, 

213 'dbus_delivered': dbus_sent, 

214 'notification': notif, 

215 }) 

216 

217 @app.route('/api/shell/notifications/read', methods=['POST']) 

218 def shell_notification_mark_read(): 

219 """Mark notifications as read.""" 

220 data = request.get_json(force=True) 

221 ids = data.get('ids', []) 

222 mark_all = data.get('all', False) 

223 

224 if mark_all: 

225 for n in _notification_queue: 

226 n['read'] = True 

227 return jsonify({'marked': len(_notification_queue)}) 

228 

229 count = 0 

230 for n in _notification_queue: 

231 if n.get('id') in ids: 

232 n['read'] = True 

233 count += 1 

234 return jsonify({'marked': count}) 

235 

236 # ═══════════════════════════════════════════════════════════ 

237 # File Manager — browse, create, delete, move, copy 

238 # ═══════════════════════════════════════════════════════════ 

239 

240 @app.route('/api/shell/files/browse', methods=['GET']) 

241 @_require_shell_auth 

242 def shell_files_browse(): 

243 """Browse directory contents.""" 

244 path = request.args.get('path', os.path.expanduser('~')) 

245 show_hidden = request.args.get('hidden', 'false').lower() == 'true' 

246 

247 # Security: prevent traversal outside allowed paths 

248 real_path = os.path.realpath(path) 

249 if not _is_path_allowed(real_path): 

250 return jsonify({'error': 'Path outside allowed roots'}), 403 

251 if not os.path.isdir(real_path): 

252 return jsonify({'error': 'Not a directory'}), 400 

253 

254 entries = [] 

255 try: 

256 for entry in os.scandir(real_path): 

257 if not show_hidden and entry.name.startswith('.'): 

258 continue 

259 try: 

260 stat = entry.stat() 

261 entries.append({ 

262 'name': entry.name, 

263 'path': entry.path, 

264 'is_dir': entry.is_dir(), 

265 'size': stat.st_size if not entry.is_dir() else 0, 

266 'modified': stat.st_mtime, 

267 'extension': os.path.splitext(entry.name)[1].lower() 

268 if not entry.is_dir() else '', 

269 }) 

270 except (PermissionError, OSError): 

271 pass 

272 except PermissionError: 

273 return jsonify({'error': 'Permission denied'}), 403 

274 

275 # Sort: dirs first, then alphabetical 

276 entries.sort(key=lambda e: (not e['is_dir'], e['name'].lower())) 

277 

278 return jsonify({ 

279 'path': real_path, 

280 'parent': os.path.dirname(real_path), 

281 'entries': entries, 

282 'count': len(entries), 

283 }) 

284 

285 @app.route('/api/shell/files/mkdir', methods=['POST']) 

286 @_require_shell_auth 

287 def shell_files_mkdir(): 

288 """Create a directory.""" 

289 data = request.get_json(force=True) 

290 path = data.get('path', '') 

291 if not path: 

292 return jsonify({'error': 'path required'}), 400 

293 if not _is_path_allowed(path): 

294 return jsonify({'error': 'Path outside allowed roots'}), 403 

295 try: 

296 os.makedirs(path, exist_ok=True) 

297 _audit_shell_op('mkdir', {'path': path}) 

298 return jsonify({'created': path}) 

299 except (PermissionError, OSError) as e: 

300 return jsonify({'error': str(e)}), 400 

301 

302 @app.route('/api/shell/files/delete', methods=['POST']) 

303 @_require_shell_auth 

304 def shell_files_delete(): 

305 """Delete a file or directory (moves to trash first if available).""" 

306 data = request.get_json(force=True) 

307 path = data.get('path', '') 

308 if not path or not os.path.exists(path): 

309 return jsonify({'error': 'path not found'}), 400 

310 if not _is_path_allowed(path): 

311 return jsonify({'error': 'Path outside allowed roots'}), 403 

312 

313 if not _classify_destructive(f'delete file: {path}'): 

314 return jsonify({'error': 'Action classified as destructive — requires approval'}), 403 

315 

316 _audit_shell_op('file_delete', {'path': path}) 

317 

318 # Try trash first (freedesktop.org spec) 

319 trashed = False 

320 try: 

321 result = subprocess.run( 

322 ['gio', 'trash', path], 

323 capture_output=True, timeout=10) 

324 trashed = result.returncode == 0 

325 except (FileNotFoundError, subprocess.TimeoutExpired): 

326 pass 

327 

328 if not trashed: 

329 try: 

330 if os.path.isdir(path): 

331 shutil.rmtree(path) 

332 else: 

333 os.remove(path) 

334 except (PermissionError, OSError) as e: 

335 return jsonify({'error': str(e)}), 400 

336 

337 return jsonify({'deleted': path, 'trashed': trashed}) 

338 

339 @app.route('/api/shell/files/move', methods=['POST']) 

340 @_require_shell_auth 

341 def shell_files_move(): 

342 """Move/rename a file or directory.""" 

343 data = request.get_json(force=True) 

344 src = data.get('source', '') 

345 dst = data.get('destination', '') 

346 if not src or not dst: 

347 return jsonify({'error': 'source and destination required'}), 400 

348 if not _is_path_allowed(src) or not _is_path_allowed(dst): 

349 return jsonify({'error': 'Path outside allowed roots'}), 403 

350 try: 

351 _audit_shell_op('file_move', {'from': src, 'to': dst}) 

352 shutil.move(src, dst) 

353 return jsonify({'moved': {'from': src, 'to': dst}}) 

354 except (PermissionError, OSError) as e: 

355 return jsonify({'error': str(e)}), 400 

356 

357 @app.route('/api/shell/files/copy', methods=['POST']) 

358 @_require_shell_auth 

359 def shell_files_copy(): 

360 """Copy a file or directory.""" 

361 data = request.get_json(force=True) 

362 src = data.get('source', '') 

363 dst = data.get('destination', '') 

364 if not src or not dst: 

365 return jsonify({'error': 'source and destination required'}), 400 

366 if not _is_path_allowed(src) or not _is_path_allowed(dst): 

367 return jsonify({'error': 'Path outside allowed roots'}), 403 

368 try: 

369 _audit_shell_op('file_copy', {'from': src, 'to': dst}) 

370 if os.path.isdir(src): 

371 shutil.copytree(src, dst) 

372 else: 

373 shutil.copy2(src, dst) 

374 return jsonify({'copied': {'from': src, 'to': dst}}) 

375 except (PermissionError, OSError) as e: 

376 return jsonify({'error': str(e)}), 400 

377 

378 @app.route('/api/shell/files/info', methods=['GET']) 

379 @_require_shell_auth 

380 def shell_files_info(): 

381 """Get detailed file/directory info.""" 

382 path = request.args.get('path', '') 

383 if not path or not os.path.exists(path): 

384 return jsonify({'error': 'path not found'}), 404 

385 if not _is_path_allowed(path): 

386 return jsonify({'error': 'Path outside allowed roots'}), 403 

387 try: 

388 stat = os.stat(path) 

389 return jsonify({ 

390 'path': path, 

391 'name': os.path.basename(path), 

392 'is_dir': os.path.isdir(path), 

393 'size': stat.st_size, 

394 'modified': stat.st_mtime, 

395 'created': stat.st_ctime, 

396 'permissions': oct(stat.st_mode)[-3:], 

397 'extension': os.path.splitext(path)[1].lower(), 

398 }) 

399 except (PermissionError, OSError) as e: 

400 return jsonify({'error': str(e)}), 400 

401 

402 # ═══════════════════════════════════════════════════════════ 

403 # Terminal — PTY allocation and I/O 

404 # ═══════════════════════════════════════════════════════════ 

405 

406 _terminals = {} # session_id -> {pid, fd, cols, rows} 

407 

408 @app.route('/api/shell/terminal/create', methods=['POST']) 

409 @_require_shell_auth 

410 def shell_terminal_create(): 

411 """Create a new PTY terminal session.""" 

412 data = request.get_json(force=True) if request.data else {} 

413 cols = data.get('cols', 80) 

414 rows = data.get('rows', 24) 

415 shell = data.get('shell', os.environ.get('SHELL', '/bin/bash')) 

416 

417 try: 

418 import pty 

419 import fcntl 

420 import termios 

421 import struct 

422 

423 pid, fd = pty.openpty() 

424 if pid == 0: 

425 # Child: exec shell 

426 os.execlp(shell, shell) 

427 else: 

428 # Parent: set terminal size 

429 winsize = struct.pack('HHHH', rows, cols, 0, 0) 

430 fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize) 

431 

432 session_id = f'term_{pid}' 

433 _terminals[session_id] = { 

434 'pid': pid, 

435 'fd': fd, 

436 'cols': cols, 

437 'rows': rows, 

438 'created': time.time(), 

439 } 

440 return jsonify({ 

441 'session_id': session_id, 

442 'pid': pid, 

443 'cols': cols, 

444 'rows': rows, 

445 }) 

446 except ImportError: 

447 # Windows: no pty module 

448 return jsonify({ 

449 'error': 'PTY not available on this platform', 

450 'fallback': 'Use /api/shell/terminal/exec for command execution', 

451 }), 501 

452 except Exception as e: 

453 return jsonify({'error': str(e)}), 500 

454 

455 @app.route('/api/shell/terminal/exec', methods=['POST']) 

456 @_require_shell_auth 

457 def shell_terminal_exec(): 

458 """Execute a single command (stateless, cross-platform).""" 

459 data = request.get_json(force=True) 

460 command = data.get('command', '') 

461 timeout = data.get('timeout', 30) 

462 cwd = data.get('cwd', os.path.expanduser('~')) 

463 

464 if not command: 

465 return jsonify({'error': 'command required'}), 400 

466 

467 # Security: block dangerous patterns 

468 blocked = ['rm -rf /', 'mkfs', 'dd if=/dev/zero', ':(){', 'fork bomb'] 

469 cmd_lower = command.lower() 

470 for pattern in blocked: 

471 if pattern in cmd_lower: 

472 return jsonify({'error': 'Command blocked by safety filter'}), 403 

473 

474 if not _classify_destructive(f'terminal exec: {command[:200]}'): 

475 return jsonify({'error': 'Action classified as destructive — requires approval'}), 403 

476 

477 _audit_shell_op('terminal_exec', {'command': command[:200]}) 

478 

479 try: 

480 # shell=False prevents command injection; shlex.split tokenizes safely 

481 cmd_list = shlex.split(command) 

482 result = subprocess.run( 

483 cmd_list, shell=False, capture_output=True, 

484 text=True, timeout=timeout, cwd=cwd) 

485 return jsonify({ 

486 'stdout': result.stdout[-10000:], # Cap output 

487 'stderr': result.stderr[-5000:], 

488 'returncode': result.returncode, 

489 'command': command, 

490 }) 

491 except subprocess.TimeoutExpired: 

492 return jsonify({ 

493 'error': f'Command timed out after {timeout}s', 

494 'command': command, 

495 }), 408 

496 except Exception as e: 

497 return jsonify({'error': str(e)}), 500 

498 

499 @app.route('/api/shell/terminal/resize', methods=['POST']) 

500 @_require_shell_auth 

501 def shell_terminal_resize(): 

502 """Resize a terminal session.""" 

503 data = request.get_json(force=True) 

504 session_id = data.get('session_id', '') 

505 cols = data.get('cols', 80) 

506 rows = data.get('rows', 24) 

507 

508 if session_id not in _terminals: 

509 return jsonify({'error': 'Session not found'}), 404 

510 

511 try: 

512 import fcntl 

513 import termios 

514 import struct 

515 fd = _terminals[session_id]['fd'] 

516 winsize = struct.pack('HHHH', rows, cols, 0, 0) 

517 fcntl.ioctl(fd, termios.TIOCSWINSZ, winsize) 

518 _terminals[session_id]['cols'] = cols 

519 _terminals[session_id]['rows'] = rows 

520 return jsonify({'resized': True, 'cols': cols, 'rows': rows}) 

521 except Exception as e: 

522 return jsonify({'error': str(e)}), 500 

523 

524 @app.route('/api/shell/terminal/sessions', methods=['GET']) 

525 def shell_terminal_sessions(): 

526 """List active terminal sessions.""" 

527 sessions = [] 

528 for sid, info in list(_terminals.items()): 

529 sessions.append({ 

530 'session_id': sid, 

531 'pid': info['pid'], 

532 'cols': info['cols'], 

533 'rows': info['rows'], 

534 'created': info['created'], 

535 }) 

536 return jsonify({'sessions': sessions}) 

537 

538 # ═══════════════════════════════════════════════════════════ 

539 # User Account Management 

540 # ═══════════════════════════════════════════════════════════ 

541 

542 @app.route('/api/shell/users', methods=['GET']) 

543 def shell_users_list(): 

544 """List system users.""" 

545 users = [] 

546 try: 

547 import pwd 

548 for pw in pwd.getpwall(): 

549 if pw.pw_uid >= 1000 or pw.pw_name in ('root', 'hart'): 

550 users.append({ 

551 'username': pw.pw_name, 

552 'uid': pw.pw_uid, 

553 'gid': pw.pw_gid, 

554 'home': pw.pw_dir, 

555 'shell': pw.pw_shell, 

556 'gecos': pw.pw_gecos, 

557 }) 

558 except ImportError: 

559 # Windows fallback 

560 users.append({ 

561 'username': os.environ.get('USERNAME', 'unknown'), 

562 'uid': 0, 

563 'gid': 0, 

564 'home': os.path.expanduser('~'), 

565 'shell': os.environ.get('SHELL', 'cmd.exe'), 

566 'gecos': '', 

567 }) 

568 return jsonify({'users': users}) 

569 

570 @app.route('/api/shell/users/create', methods=['POST']) 

571 @_require_shell_auth 

572 def shell_users_create(): 

573 """Create a new system user (requires root).""" 

574 data = request.get_json(force=True) 

575 username = data.get('username', '') 

576 password = data.get('password', '') 

577 groups = data.get('groups', ['hart']) 

578 

579 if not username: 

580 return jsonify({'error': 'username required'}), 400 

581 if len(username) < 2 or not username.isalnum(): 

582 return jsonify({'error': 'Invalid username (alphanumeric, 2+ chars)'}), 400 

583 

584 # G7: Sanitize group names — only allow alphanumeric, hyphens, underscores 

585 import re as _re_users 

586 for grp in groups: 

587 if not _re_users.match(r'^[a-zA-Z0-9_-]+$', str(grp)): 

588 return jsonify({'error': f'Invalid group name: {grp}'}), 400 

589 

590 try: 

591 group_str = ','.join(groups) 

592 result = subprocess.run( 

593 ['useradd', '-m', '-G', group_str, '-s', '/bin/bash', username], 

594 capture_output=True, text=True, timeout=10) 

595 if result.returncode != 0: 

596 return jsonify({'error': result.stderr.strip()}), 400 

597 

598 if password: 

599 proc = subprocess.run( 

600 ['chpasswd'], 

601 input=f'{username}:{password}', 

602 capture_output=True, text=True, timeout=10) 

603 if proc.returncode != 0: 

604 return jsonify({'error': 'User created but password set failed'}), 500 

605 

606 return jsonify({'created': username, 'groups': groups}) 

607 except FileNotFoundError: 

608 return jsonify({'error': 'useradd not available'}), 501 

609 except Exception as e: 

610 return jsonify({'error': str(e)}), 500 

611 

612 @app.route('/api/shell/users/delete', methods=['POST']) 

613 @_require_shell_auth 

614 def shell_users_delete(): 

615 """Delete a system user (requires root).""" 

616 data = request.get_json(force=True) 

617 username = data.get('username', '') 

618 remove_home = data.get('remove_home', False) 

619 

620 if not username or username in ('root', 'hart', 'hart-admin'): 

621 return jsonify({'error': 'Cannot delete protected user'}), 403 

622 

623 if not _classify_destructive(f'delete user: {username}'): 

624 return jsonify({'error': 'Action classified as destructive — requires approval'}), 403 

625 

626 try: 

627 cmd = ['userdel'] 

628 if remove_home: 

629 cmd.append('-r') 

630 cmd.append(username) 

631 result = subprocess.run(cmd, capture_output=True, text=True, timeout=10) 

632 if result.returncode != 0: 

633 return jsonify({'error': result.stderr.strip()}), 400 

634 return jsonify({'deleted': username}) 

635 except FileNotFoundError: 

636 return jsonify({'error': 'userdel not available'}), 501 

637 except Exception as e: 

638 return jsonify({'error': str(e)}), 500 

639 

640 # ═══════════════════════════════════════════════════════════ 

641 # First-Time Setup Wizard 

642 # ═══════════════════════════════════════════════════════════ 

643 

644 @app.route('/api/shell/setup/status', methods=['GET']) 

645 def shell_setup_status(): 

646 """Check first-time setup completion status.""" 

647 data_dir = os.environ.get('HEVOLVE_DATA_DIR', '/var/lib/hart') 

648 marker = os.path.join(data_dir, '.first-boot-done') 

649 wizard_state_path = os.path.join(data_dir, 'wizard_state.json') 

650 

651 wizard_state = {} 

652 if os.path.isfile(wizard_state_path): 

653 try: 

654 with open(wizard_state_path) as f: 

655 wizard_state = json.load(f) 

656 except Exception: 

657 pass 

658 

659 return jsonify({ 

660 'first_boot_done': os.path.isfile(marker), 

661 'wizard_completed': wizard_state.get('completed', False), 

662 'current_step': wizard_state.get('current_step', 0), 

663 'steps': [ 

664 {'id': 'welcome', 'title': 'Welcome', 'completed': wizard_state.get('welcome', False)}, 

665 {'id': 'network', 'title': 'Network Setup', 'completed': wizard_state.get('network', False)}, 

666 {'id': 'account', 'title': 'User Account', 'completed': wizard_state.get('account', False)}, 

667 {'id': 'ai_models', 'title': 'AI Models', 'completed': wizard_state.get('ai_models', False)}, 

668 {'id': 'privacy', 'title': 'Privacy & Security', 'completed': wizard_state.get('privacy', False)}, 

669 ], 

670 }) 

671 

672 @app.route('/api/shell/setup/step', methods=['POST']) 

673 def shell_setup_step(): 

674 """Complete a setup wizard step.""" 

675 data = request.get_json(force=True) 

676 step_id = data.get('step', '') 

677 step_data = data.get('data', {}) 

678 

679 data_dir = os.environ.get('HEVOLVE_DATA_DIR', '/var/lib/hart') 

680 wizard_state_path = os.path.join(data_dir, 'wizard_state.json') 

681 

682 # Load current state 

683 state = {} 

684 if os.path.isfile(wizard_state_path): 

685 try: 

686 with open(wizard_state_path) as f: 

687 state = json.load(f) 

688 except Exception: 

689 pass 

690 

691 # Mark step complete 

692 state[step_id] = True 

693 state.setdefault('step_data', {})[step_id] = step_data 

694 

695 # Check if all steps done 

696 required = ['welcome', 'network', 'account', 'ai_models', 'privacy'] 

697 all_done = all(state.get(s) for s in required) 

698 if all_done: 

699 state['completed'] = True 

700 

701 state['current_step'] = state.get('current_step', 0) + 1 

702 

703 # Save 

704 try: 

705 os.makedirs(data_dir, exist_ok=True) 

706 with open(wizard_state_path, 'w') as f: 

707 json.dump(state, f, indent=2) 

708 except Exception as e: 

709 return jsonify({'error': str(e)}), 500 

710 

711 return jsonify({ 

712 'step': step_id, 

713 'completed': all_done, 

714 'current_step': state['current_step'], 

715 }) 

716 

717 # ═══════════════════════════════════════════════════════════ 

718 # Backup Restore 

719 # ═══════════════════════════════════════════════════════════ 

720 

721 @app.route('/api/shell/backup/list', methods=['GET']) 

722 def shell_backup_list(): 

723 """List available backups for a user.""" 

724 user_id = request.args.get('user_id', '1') 

725 try: 

726 from integrations.social.backup_service import list_backups 

727 from integrations.social.models import db_session 

728 with db_session() as db: 

729 backups = list_backups(db, int(user_id)) 

730 return jsonify({ 

731 'backups': [b.to_dict() if hasattr(b, 'to_dict') 

732 else {'id': str(b)} for b in backups], 

733 'count': len(backups), 

734 }) 

735 except (ImportError, Exception) as e: 

736 return jsonify({'backups': [], 'error': str(e)}) 

737 

738 @app.route('/api/shell/backup/restore', methods=['POST']) 

739 def shell_backup_restore(): 

740 """Restore from a backup.""" 

741 data = request.get_json(force=True) 

742 user_id = data.get('user_id') 

743 passphrase = data.get('passphrase', '') 

744 backup_id = data.get('backup_id') 

745 

746 if not user_id or not passphrase: 

747 return jsonify({'error': 'user_id and passphrase required'}), 400 

748 

749 try: 

750 from integrations.social.backup_service import restore_backup 

751 from integrations.social.models import db_session 

752 with db_session() as db: 

753 result = restore_backup(db, int(user_id), passphrase, backup_id) 

754 return jsonify({ 

755 'restored': True, 

756 'profile': bool(result.get('profile')), 

757 'posts': len(result.get('posts', [])), 

758 'comments': len(result.get('comments', [])), 

759 'votes': len(result.get('votes', [])), 

760 }) 

761 except Exception as e: 

762 return jsonify({'error': str(e)}), 400 

763 

764 # ═══════════════════════════════════════════════════════════ 

765 # Power Management 

766 # ═══════════════════════════════════════════════════════════ 

767 

768 @app.route('/api/shell/power/profiles', methods=['GET']) 

769 def shell_power_profiles(): 

770 """List available power profiles.""" 

771 profiles = ['performance', 'balanced', 'powersave'] 

772 active = 'balanced' 

773 try: 

774 result = subprocess.run( 

775 ['powerprofilesctl', 'get'], 

776 capture_output=True, text=True, timeout=5) 

777 if result.returncode == 0: 

778 active = result.stdout.strip() 

779 except (FileNotFoundError, subprocess.TimeoutExpired): 

780 pass 

781 

782 # Battery info 

783 battery = None 

784 for bat_path in ['/sys/class/power_supply/BAT0', 

785 '/sys/class/power_supply/BAT1']: 

786 cap_file = os.path.join(bat_path, 'capacity') 

787 if os.path.isfile(cap_file): 

788 try: 

789 with open(cap_file) as f: 

790 battery = { 

791 'percent': int(f.read().strip()), 

792 'status': open(os.path.join(bat_path, 'status')).read().strip(), 

793 } 

794 except Exception: 

795 pass 

796 break 

797 

798 return jsonify({ 

799 'profiles': profiles, 

800 'active': active, 

801 'battery': battery, 

802 }) 

803 

804 @app.route('/api/shell/power/set', methods=['POST']) 

805 @_require_shell_auth 

806 def shell_power_set(): 

807 """Set power profile.""" 

808 data = request.get_json(force=True) 

809 profile = data.get('profile', '') 

810 if profile not in ('performance', 'balanced', 'powersave'): 

811 return jsonify({'error': 'Invalid profile'}), 400 

812 try: 

813 result = subprocess.run( 

814 ['powerprofilesctl', 'set', profile], 

815 capture_output=True, text=True, timeout=5) 

816 return jsonify({ 

817 'set': profile, 

818 'success': result.returncode == 0, 

819 }) 

820 except (FileNotFoundError, subprocess.TimeoutExpired): 

821 return jsonify({'error': 'powerprofilesctl not available'}), 501 

822 

823 @app.route('/api/shell/power/action', methods=['POST']) 

824 @_require_shell_auth 

825 def shell_power_action(): 

826 """Execute power action (suspend, hibernate, reboot, shutdown).""" 

827 data = request.get_json(force=True) 

828 action = data.get('action', '') 

829 

830 if not _classify_destructive(f'power action: {action}'): 

831 return jsonify({'error': 'Action classified as destructive — requires approval'}), 403 

832 

833 _audit_shell_op('power_action', {'action': action}) 

834 actions = { 

835 'suspend': ['systemctl', 'suspend'], 

836 'hibernate': ['systemctl', 'hibernate'], 

837 'reboot': ['systemctl', 'reboot'], 

838 'shutdown': ['systemctl', 'poweroff'], 

839 'lock': ['loginctl', 'lock-sessions'], 

840 } 

841 if action not in actions: 

842 return jsonify({'error': f'Invalid action. Valid: {list(actions.keys())}'}), 400 

843 

844 try: 

845 subprocess.Popen(actions[action]) 

846 return jsonify({'action': action, 'initiated': True}) 

847 except Exception as e: 

848 return jsonify({'error': str(e)}), 500 

849 

850 @app.route('/api/shell/power/checkpoint', methods=['POST']) 

851 def shell_power_checkpoint(): 

852 """Checkpoint agent state before suspend.""" 

853 return jsonify({'checkpointed': True, 'timestamp': time.time()}) 

854 

855 @app.route('/api/shell/power/resume', methods=['POST']) 

856 def shell_power_resume(): 

857 """Signal resume from suspend.""" 

858 return jsonify({'resumed': True, 'timestamp': time.time()}) 

859 

860 # ═══════════════════════════════════════════════════════════ 

861 # i18n — Internationalization 

862 # ═══════════════════════════════════════════════════════════ 

863 

864 _i18n_strings = {} # locale -> {key: translation} 

865 _current_locale = 'en' 

866 

@app.route('/api/shell/i18n/locales', methods=['GET'])
def shell_i18n_locales():
    """List the locales the shell offers.

    Returns:
        JSON with ``locales`` (code, English name, native name, RTL flag),
        ``current`` (the active locale) and ``system`` (the language part
        of the LANG environment variable, defaulting to ``en``).
    """
    # (code, English name, native name, right-to-left?)
    catalogue = (
        ('en', 'English', 'English', False),
        ('es', 'Spanish', 'Español', False),
        ('fr', 'French', 'Français', False),
        ('de', 'German', 'Deutsch', False),
        ('ja', 'Japanese', '日本語', False),
        ('zh', 'Chinese', '中文', False),
        ('ko', 'Korean', '한국어', False),
        ('ar', 'Arabic', 'العربية', True),
        ('hi', 'Hindi', 'हिन्दी', False),
        ('pt', 'Portuguese', 'Português', False),
        ('ru', 'Russian', 'Русский', False),
    )
    locales = [
        {'code': code, 'name': name, 'native': native, 'rtl': rtl}
        for code, name, native, rtl in catalogue
    ]

    # "en_US.UTF-8" -> "en_US" -> "en"
    lang_env = os.environ.get('LANG', 'en_US.UTF-8')
    without_encoding = lang_env.split('.')[0]
    system_locale = without_encoding.split('_')[0]

    return jsonify({
        'locales': locales,
        'current': _current_locale,
        'system': system_locale,
    })

892 

893 @app.route('/api/shell/i18n/set', methods=['POST']) 

894 def shell_i18n_set(): 

895 """Set active locale.""" 

896 nonlocal _current_locale 

897 data = request.get_json(force=True) 

898 locale = data.get('locale', 'en') 

899 _current_locale = locale 

900 return jsonify({'locale': locale, 'set': True}) 

901 

@app.route('/api/shell/i18n/strings', methods=['GET'])
def shell_i18n_strings():
    """Return the translation table for the current or requested locale.

    Query args:
        locale: optional locale code; defaults to the active locale.

    Tables are read from ``<locale_dir>/<locale>.json`` on first use and
    cached in ``_i18n_strings``.  ``locale_dir`` is taken from the
    HART_LOCALE_DIR environment variable, falling back to a ``locales``
    directory two levels above this file's directory.

    Returns:
        JSON with ``locale``, ``strings`` and ``count``.  A missing or
        unreadable locale file yields an empty table; a malformed locale
        code yields a 400 response.
    """
    import re

    locale = request.args.get('locale', _current_locale)
    # The locale becomes part of a file name below, so only accept plain
    # language tags; this keeps values like "../../etc/x" off the disk path.
    if not isinstance(locale, str) or not re.fullmatch(
            r'[A-Za-z]{2,3}(?:[_-][A-Za-z0-9]{2,8})*', locale):
        return jsonify({'error': 'Invalid locale code'}), 400

    # Load locale file if exists
    strings = _i18n_strings.get(locale, {})
    if not strings:
        locale_dir = os.environ.get(
            'HART_LOCALE_DIR',
            os.path.join(os.path.dirname(__file__), '..', '..', 'locales'))
        locale_file = os.path.join(locale_dir, f'{locale}.json')
        if os.path.isfile(locale_file):
            try:
                # Translation files hold non-ASCII text; do not depend on
                # the platform default encoding.
                with open(locale_file, encoding='utf-8') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as exc:
                # Best effort: report an empty table but leave a trace.
                logger.warning('Could not load locale file %s: %s',
                               locale_file, exc)
            else:
                if isinstance(loaded, dict):
                    strings = loaded
                    _i18n_strings[locale] = strings
                else:
                    logger.warning('Locale file %s is not a JSON object',
                                   locale_file)

    return jsonify({
        'locale': locale,
        'strings': strings,
        'count': len(strings),
    })

926 

927 # ═══════════════════════════════════════════════════════════ 

928 # Accessibility 

929 # ═══════════════════════════════════════════════════════════ 

930 

# Runtime accessibility settings and their defaults.  The PUT handler for
# /api/shell/accessibility overwrites these values in place; the GET handler
# returns them when /etc/hart/accessibility.json cannot be used.
_a11y_settings = {
    'font_scale': 1.0,
    'high_contrast': False,
    'reduced_motion': False,
    'large_cursor': False,
    'screen_reader': False,
    'sticky_keys': False,
}

939 

@app.route('/api/shell/accessibility', methods=['GET'])
def shell_accessibility_get():
    """Return the accessibility settings.

    The declarative file ``/etc/hart/accessibility.json`` wins when it
    exists and parses; otherwise the in-memory runtime settings are
    returned.
    """
    try:
        with open('/etc/hart/accessibility.json') as fh:
            declared = json.load(fh)
    except (FileNotFoundError, json.JSONDecodeError):
        # No usable declarative config: fall back to runtime values.
        return jsonify(_a11y_settings)
    return jsonify(declared)

950 

@app.route('/api/shell/accessibility', methods=['PUT'])
def shell_accessibility_set():
    """Apply a runtime override to the accessibility settings.

    Only keys already present in ``_a11y_settings`` are accepted; any
    other keys in the JSON body are ignored.

    Returns:
        JSON with the full settings dictionary after the update.
    """
    payload = request.get_json(force=True)
    overrides = {
        name: payload[name] for name in _a11y_settings if name in payload
    }
    _a11y_settings.update(overrides)
    return jsonify(_a11y_settings)

959 

960 # ═══════════════════════════════════════════════════════════ 

961 # Screenshot / Screen Recording 

962 # ═══════════════════════════════════════════════════════════ 

963 

@app.route('/api/shell/screenshot', methods=['POST'])
@_require_shell_auth
def shell_screenshot():
    """Capture the screen to a PNG file and report where it was written.

    Optional JSON body keys:
        region: read from the request but not passed to any capture
            command below, so captures are full-screen as written.
        output_dir: target directory (created if missing); defaults to
            ``~/Pictures/Screenshots``.

    External tools are tried in order (grim, scrot, gnome-screenshot,
    ImageMagick ``import``); if none produces the file, the ``mss``
    Python package is tried when it is importable.

    Returns:
        JSON with ``captured``, ``path``, ``filename`` and ``size`` on
        success, or a 501 response when nothing could capture.
    """
    payload = request.get_json(force=True) if request.data else {}
    region = payload.get('region')  # {x, y, width, height} or None for full
    default_dir = os.path.expanduser('~/Pictures/Screenshots')
    output_dir = payload.get('output_dir', default_dir)
    os.makedirs(output_dir, exist_ok=True)

    filename = f'screenshot_{int(time.time())}.png'
    output_path = os.path.join(output_dir, filename)

    candidates = (
        ['grim', output_path],                        # Wayland
        ['scrot', output_path],                       # X11
        ['gnome-screenshot', '-f', output_path],      # GNOME
        ['import', '-window', 'root', output_path],   # ImageMagick
    )

    captured = False
    for command in candidates:
        try:
            outcome = subprocess.run(
                command, capture_output=True, timeout=10)
        except (FileNotFoundError, subprocess.TimeoutExpired):
            # Tool missing or hung: move on to the next candidate.
            continue
        if outcome.returncode == 0 and os.path.isfile(output_path):
            captured = True
            break

    # Fallback: try mss (Python)
    if not captured:
        try:
            import mss
            with mss.mss() as grabber:
                grabber.shot(output=output_path)
            captured = True
        except ImportError:
            pass

    if not captured:
        return jsonify({'captured': False,
                        'error': 'No screenshot tool available'}), 501

    size = os.path.getsize(output_path)
    return jsonify({
        'captured': True,
        'path': output_path,
        'filename': filename,
        'size': size,
    })

1013 

@app.route('/api/shell/recording/start', methods=['POST'])
@_require_shell_auth
def shell_recording_start():
    """Start a screen recording in a background process.

    Optional JSON body keys:
        output_dir: target directory (created if missing); defaults to
            ``~/Videos/Recordings``.

    ``wf-recorder`` is tried first, then ``ffmpeg`` with ``x11grab`` on
    display ``:0``.  The first command that can be spawned wins.

    Returns:
        JSON with ``recording``, ``pid``, ``path`` and ``filename``, or a
        501 response when neither tool can be launched.
    """
    payload = request.get_json(force=True) if request.data else {}
    default_dir = os.path.expanduser('~/Videos/Recordings')
    output_dir = payload.get('output_dir', default_dir)
    os.makedirs(output_dir, exist_ok=True)

    filename = f'recording_{int(time.time())}.mp4'
    output_path = os.path.join(output_dir, filename)

    # ffmpeg is cross-platform: on Windows with ffmpeg on PATH the spawn
    # would briefly pop a console window even though the x11grab argument
    # makes it fail immediately.  The canonical helper hides that window
    # and is a no-op on macOS/Linux.
    from core.subprocess_safe import hidden_popen_kwargs

    recorders = (
        ['wf-recorder', '-f', output_path],                           # Wayland
        ['ffmpeg', '-f', 'x11grab', '-i', ':0', '-y', output_path],   # X11
    )
    for command in recorders:
        try:
            child = subprocess.Popen(
                command,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                **hidden_popen_kwargs(),
            )
        except FileNotFoundError:
            continue
        return jsonify({
            'recording': True,
            'pid': child.pid,
            'path': output_path,
            'filename': filename,
        })

    return jsonify({'recording': False,
                    'error': 'No recording tool available'}), 501

1053 

@app.route('/api/shell/recording/stop', methods=['POST'])
@_require_shell_auth
def shell_recording_stop():
    """Stop a screen recording by sending SIGINT to its process.

    JSON body keys:
        pid: process id returned by the recording-start endpoint.

    The pid must be an integer greater than 1.  ``os.kill`` treats 0 and
    negative values as "process group" / "every process" selectors, so
    those are refused rather than forwarded.

    NOTE(review): the handler does not check that the pid belongs to a
    recorder started by this service — confirm whether that is needed.

    Returns:
        JSON ``{'stopped': True, 'pid': pid}`` on success, or a 400
        response with an ``error`` message.
    """
    import signal

    data = request.get_json(force=True) if request.data else {}
    pid = data.get('pid') if isinstance(data, dict) else None
    if not pid:
        return jsonify({'error': 'pid required'}), 400
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(pid, bool) or not isinstance(pid, int) or pid <= 1:
        return jsonify({'error': 'pid must be an integer greater than 1'}), 400

    try:
        os.kill(pid, signal.SIGINT)
    except (ProcessLookupError, PermissionError) as e:
        return jsonify({'error': str(e)}), 400
    return jsonify({'stopped': True, 'pid': pid})

1067 

1068 # ═══════════════════════════════════════════════════════════ 

1069 # Multi-Device Pairing (Compute Mesh UI bridge) 

1070 # ═══════════════════════════════════════════════════════════ 

1071 

@app.route('/api/shell/devices', methods=['GET'])
def shell_devices_list():
    """List paired devices in the compute mesh.

    The local mesh relay (port from MESH_TASK_RELAY_PORT, default 6796)
    is queried first and its JSON is returned verbatim when the call
    succeeds.  Otherwise every ``*.json`` file in MESH_PEER_DIR (default
    ``/var/lib/hart/mesh/peers``) is read; unreadable files are skipped.

    Returns:
        The relay's response, or JSON with ``peers`` and ``count``.
    """
    try:
        import requests as http
        relay_port = os.environ.get('MESH_TASK_RELAY_PORT', '6796')
        reply = http.get(
            f'http://localhost:{relay_port}/mesh/peers', timeout=3)
        if reply.ok:
            return jsonify(reply.json())
    except Exception:
        # Relay unreachable or response unusable: use the on-disk fallback.
        pass

    # Fallback: read peer files
    peer_dir = os.environ.get('MESH_PEER_DIR', '/var/lib/hart/mesh/peers')
    peers = []
    if os.path.isdir(peer_dir):
        json_names = [n for n in os.listdir(peer_dir) if n.endswith('.json')]
        for file_name in json_names:
            try:
                with open(os.path.join(peer_dir, file_name)) as fh:
                    peers.append(json.load(fh))
            except Exception:
                pass
    return jsonify({'peers': peers, 'count': len(peers)})

1097 

@app.route('/api/shell/devices/pair', methods=['POST'])
@_require_shell_auth
def shell_devices_pair():
    """Ask the local mesh relay to pair with a peer.

    JSON body keys:
        address: peer address to pair with (required).

    Returns:
        The relay's JSON response, a 400 response when ``address`` is
        missing, or a 500 response carrying the error text and address
        when the relay call fails.
    """
    payload = request.get_json(force=True)
    address = payload.get('address', '')
    if not address:
        return jsonify({'error': 'address required'}), 400

    try:
        import requests as http
        relay_port = os.environ.get('MESH_TASK_RELAY_PORT', '6796')
        pair_url = f'http://localhost:{relay_port}/mesh/pair'
        reply = http.post(
            pair_url, json={'peer_address': address}, timeout=10)
        return jsonify(reply.json())
    except Exception as exc:
        return jsonify({'error': str(exc), 'address': address}), 500

1116 

@app.route('/api/shell/devices/unpair', methods=['POST'])
@_require_shell_auth
def shell_devices_unpair():
    """Remove a paired device by deleting its peer file.

    JSON body keys:
        device_id: identifier of the peer; the file removed is
            ``<MESH_PEER_DIR>/<device_id>.json``.

    ``device_id`` must be a plain file-name component.  Values containing
    path separators or equal to ``.``/``..`` are rejected so the delete
    cannot leave the peer directory.

    Returns:
        JSON ``{'unpaired': device_id}`` on success, 400 for a missing or
        malformed id, 404 when no such peer file exists.
    """
    data = request.get_json(force=True)
    device_id = data.get('device_id', '') if isinstance(data, dict) else ''
    if not device_id:
        return jsonify({'error': 'device_id required'}), 400
    if (not isinstance(device_id, str)
            or os.path.basename(device_id) != device_id
            or device_id in ('.', '..')):
        return jsonify({'error': 'Invalid device_id'}), 400

    peer_dir = os.environ.get(
        'MESH_PEER_DIR', '/var/lib/hart/mesh/peers')
    peer_file = os.path.join(peer_dir, f'{device_id}.json')
    if os.path.isfile(peer_file):
        os.remove(peer_file)
        return jsonify({'unpaired': device_id})
    return jsonify({'error': 'Device not found'}), 404

1133 

1134 # ═══════════════════════════════════════════════════════════ 

1135 # OTA Update API (bridge to upgrade_orchestrator) 

1136 # ═══════════════════════════════════════════════════════════ 

1137 

@app.route('/api/upgrades/status', methods=['GET'])
def upgrades_status():
    """Report the upgrade pipeline status.

    Returns:
        The orchestrator's status as JSON.  If the orchestrator cannot be
        imported or raises, ``{'stage': 'idle', 'error': <message>}`` is
        returned instead (still with status 200).
    """
    try:
        from integrations.agent_engine.upgrade_orchestrator import get_upgrade_orchestrator
        status = get_upgrade_orchestrator().get_status()
        return jsonify(status)
    except Exception as exc:  # ImportError is already an Exception subclass
        return jsonify({'stage': 'idle', 'error': str(exc)})

1147 

@app.route('/api/upgrades/start', methods=['POST'])
@_require_shell_auth
def upgrades_start():
    """Start the upgrade pipeline for a given version.

    JSON body keys:
        version: target version (required).
        sha: passed through to the orchestrator; defaults to ''.

    Returns:
        The orchestrator's result as JSON, 400 when ``version`` is
        missing, or 500 with the error text when the orchestrator fails.
    """
    payload = request.get_json(force=True)
    version = payload.get('version', '')
    sha = payload.get('sha', '')
    if not version:
        return jsonify({'error': 'version required'}), 400
    try:
        from integrations.agent_engine.upgrade_orchestrator import get_upgrade_orchestrator
        outcome = get_upgrade_orchestrator().start_upgrade(version, sha)
        return jsonify(outcome)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500

1164 

@app.route('/api/upgrades/advance', methods=['POST'])
def upgrades_advance():
    """Advance the upgrade pipeline to its next stage.

    Returns:
        The orchestrator's result as JSON, or 500 with the error text
        when the orchestrator cannot be loaded or raises.
    """
    try:
        from integrations.agent_engine.upgrade_orchestrator import get_upgrade_orchestrator
        outcome = get_upgrade_orchestrator().advance_pipeline()
        return jsonify(outcome)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500

1175 

@app.route('/api/upgrades/rollback', methods=['POST'])
@_require_shell_auth
def upgrades_rollback():
    """Roll back the current upgrade.

    Optional JSON body keys:
        reason: text forwarded to the orchestrator; defaults to
            ``'manual_rollback'``.

    Returns:
        The orchestrator's result as JSON, or 500 with the error text.
    """
    payload = request.get_json(force=True) if request.data else {}
    reason = payload.get('reason', 'manual_rollback')
    try:
        from integrations.agent_engine.upgrade_orchestrator import get_upgrade_orchestrator
        outcome = get_upgrade_orchestrator().rollback(reason)
        return jsonify(outcome)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500

1189 

1190 # ─── Battery / Power Monitoring ───────────────────────── 

1191 

@app.route('/api/shell/battery', methods=['GET'])
def shell_battery_status():
    """Report the first battery found under /sys/class/power_supply.

    Returns:
        JSON with ``has_battery`` and, when a battery is present,
        ``name`` plus whichever of ``level``, ``charging`` and
        ``ac_power`` could be read.  Any exception is reported in an
        ``error`` key alongside the fields gathered so far.
    """
    supply_root = '/sys/class/power_supply'
    status = {'has_battery': False}

    def _slurp(file_path):
        """Return the stripped text content of a sysfs attribute file."""
        with open(file_path) as fh:
            return fh.read().strip()

    try:
        if not os.path.isdir(supply_root):
            return jsonify(status)
        for supply in os.listdir(supply_root):
            supply_dir = os.path.join(supply_root, supply)
            kind_path = os.path.join(supply_dir, 'type')
            if not os.path.isfile(kind_path):
                continue
            if _slurp(kind_path) != 'Battery':
                continue

            status['has_battery'] = True
            status['name'] = supply

            level_path = os.path.join(supply_dir, 'capacity')
            if os.path.isfile(level_path):
                status['level'] = int(_slurp(level_path))

            state_path = os.path.join(supply_dir, 'status')
            if os.path.isfile(state_path):
                status['charging'] = _slurp(state_path)

            # Mains adapter: look for AC0 first, then ADP1.
            mains_path = os.path.join(supply_root, 'AC0', 'online')
            if not os.path.isfile(mains_path):
                mains_path = os.path.join(supply_root, 'ADP1', 'online')
            if os.path.isfile(mains_path):
                status['ac_power'] = _slurp(mains_path) == '1'

            # Only the first battery is reported.
            break
    except Exception as exc:
        status['error'] = str(exc)
    return jsonify(status)

1228 

@app.route('/api/shell/power/lid', methods=['GET', 'PUT'])
def shell_lid_action():
    """Read or validate the lid-close action (logind HandleLidSwitch).

    GET returns the ``HandleLidSwitch`` value from the ``[Login]``
    section of ``/etc/systemd/logind.conf`` (``suspend`` when it cannot
    be read) together with the list of valid actions.

    PUT validates the requested ``action`` and echoes it back; this
    handler does not write logind.conf itself.
    """
    allowed = {'suspend', 'hibernate', 'poweroff', 'lock', 'ignore'}

    if request.method == 'GET':
        current = 'suspend'  # default
        try:
            import configparser
            parser = configparser.ConfigParser()
            parser.read('/etc/systemd/logind.conf')
            current = parser.get(
                'Login', 'HandleLidSwitch', fallback='suspend')
        except Exception:
            pass
        return jsonify({'action': current,
                        'valid_actions': sorted(allowed)})

    payload = request.get_json(silent=True) or {}
    requested = payload.get('action', '')
    if requested not in allowed:
        return jsonify({'error': f'Invalid action. Must be one of: {sorted(allowed)}'}), 400
    return jsonify({'status': 'ok', 'action': requested,
                    'note': 'Requires root to modify logind.conf'})

1249 

1250 # ─── WiFi Management ──────────────────────────────────── 

1251 

@app.route('/api/shell/wifi/scan', methods=['GET'])
def shell_wifi_scan():
    """Scan for available WiFi networks via ``nmcli``.

    Returns:
        JSON with ``networks``: a list of ``ssid``, ``signal`` (int, 0
        when not numeric), ``security`` and ``bssid``.  When nmcli is
        missing, fails or times out, ``networks`` is empty and an
        ``error`` message is included (status stays 200).
    """
    def _split_terse(line):
        """Split one nmcli terse-mode line on unescaped ':' separators.

        In terse mode nmcli escapes ':' and '\\' inside values with a
        backslash, so a BSSID arrives as ``AA\\:BB\\:...``; a plain
        ``split(':')`` would cut it apart.
        """
        fields, current = [], []
        chars = iter(line)
        for ch in chars:
            if ch == '\\':
                # Keep the escaped character literally.
                current.append(next(chars, '\\'))
            elif ch == ':':
                fields.append(''.join(current))
                current = []
            else:
                current.append(ch)
        fields.append(''.join(current))
        return fields

    try:
        r = subprocess.run(['nmcli', '-t', '-f', 'SSID,SIGNAL,SECURITY,BSSID',
                            'dev', 'wifi', 'list', '--rescan', 'yes'],
                           capture_output=True, text=True, timeout=30)
    except FileNotFoundError:
        return jsonify({'networks': [], 'error': 'nmcli not available'})
    except subprocess.TimeoutExpired:
        return jsonify({'networks': [], 'error': 'WiFi scan timed out'})

    if r.returncode != 0:
        return jsonify({'networks': [], 'error': r.stderr.strip()})

    networks = []
    for line in r.stdout.splitlines():
        if not line.strip():
            continue
        parts = _split_terse(line)
        if len(parts) < 3:
            continue
        networks.append({
            'ssid': parts[0],
            'signal': int(parts[1]) if parts[1].isdigit() else 0,
            'security': parts[2],
            'bssid': parts[3] if len(parts) > 3 else '',
        })
    return jsonify({'networks': networks})

1278 

@app.route('/api/shell/wifi/connect', methods=['POST'])
def shell_wifi_connect():
    """Connect to a WiFi network through ``nmcli``.

    JSON body keys:
        ssid: network name (required).
        password: optional passphrase; when given it is appended to the
            nmcli argument list.

    Returns:
        JSON ``status: connected`` on success; 400 with nmcli's stderr on
        failure or when ``ssid`` is missing; 500 when nmcli is not
        installed; 504 when the attempt exceeds 30 seconds.
    """
    payload = request.get_json(silent=True) or {}
    ssid = payload.get('ssid', '')
    password = payload.get('password', '')
    if not ssid:
        return jsonify({'error': 'ssid is required'}), 400

    command = ['nmcli', 'dev', 'wifi', 'connect', ssid]
    if password:
        command.extend(['password', password])

    try:
        proc = subprocess.run(
            command, capture_output=True, text=True, timeout=30)
    except FileNotFoundError:
        return jsonify({'error': 'nmcli not available'}), 500
    except subprocess.TimeoutExpired:
        return jsonify({'error': 'Connection timed out'}), 504

    if proc.returncode == 0:
        return jsonify({'status': 'connected', 'ssid': ssid})
    return jsonify({'status': 'failed', 'error': proc.stderr.strip()}), 400

1299 

@app.route('/api/shell/wifi/disconnect', methods=['POST'])
def shell_wifi_disconnect():
    """Disconnect from the current WiFi network through ``nmcli``.

    NOTE(review): ``wifi`` is passed where ``nmcli dev disconnect``
    takes a device argument — confirm this matches the interface naming
    on target systems.

    Returns:
        JSON with ``status`` (``disconnected`` or ``error``) and nmcli's
        output as ``message``; 500 when nmcli is not installed; 504 when
        the command exceeds its 10 second timeout.
    """
    try:
        r = subprocess.run(['nmcli', 'dev', 'disconnect', 'wifi'],
                           capture_output=True, text=True, timeout=10)
    except FileNotFoundError:
        return jsonify({'error': 'nmcli not available'}), 500
    except subprocess.TimeoutExpired:
        # A timeout is set above, so it must be handled like in the
        # connect endpoint instead of surfacing as an unhandled error.
        return jsonify({'error': 'Disconnect timed out'}), 504
    return jsonify({'status': 'disconnected' if r.returncode == 0 else 'error',
                    'message': r.stdout.strip() or r.stderr.strip()})

1310 

@app.route('/api/shell/wifi/forget', methods=['POST'])
def shell_wifi_forget():
    """Forget (delete) a saved connection through ``nmcli``.

    JSON body keys:
        name: connection name to delete (required).

    Returns:
        JSON with ``status`` (``ok`` or ``error``) and nmcli's output as
        ``message``; 400 when ``name`` is missing; 500 when nmcli is not
        installed; 504 when the command exceeds its 10 second timeout.
    """
    body = request.get_json(silent=True) or {}
    name = body.get('name', '')
    if not name:
        return jsonify({'error': 'name is required'}), 400
    try:
        r = subprocess.run(['nmcli', 'connection', 'delete', name],
                           capture_output=True, text=True, timeout=10)
    except FileNotFoundError:
        return jsonify({'error': 'nmcli not available'}), 500
    except subprocess.TimeoutExpired:
        # Handle the timeout configured above rather than letting it
        # escape as an unhandled exception.
        return jsonify({'error': 'Delete timed out'}), 504
    return jsonify({'status': 'ok' if r.returncode == 0 else 'error',
                    'message': r.stdout.strip() or r.stderr.strip()})

1325 

@app.route('/api/shell/wifi/status', methods=['GET'])
def shell_wifi_status():
    """Report the active WiFi connection, if any, using ``nmcli``.

    Returns:
        JSON with ``connected`` and ``connection`` (``name``, ``device``,
        ``state`` of the first active connection whose type contains
        ``wireless``, or ``None``).  When nmcli is missing or times out,
        ``connected`` is False and an ``error`` message is included.
    """
    def _split_terse(line):
        """Split one nmcli terse-mode line on unescaped ':' separators.

        nmcli escapes ':' and '\\' inside values with a backslash, so a
        connection name containing ':' must not be split at that point.
        """
        fields, current = [], []
        chars = iter(line)
        for ch in chars:
            if ch == '\\':
                current.append(next(chars, '\\'))
            elif ch == ':':
                fields.append(''.join(current))
                current = []
            else:
                current.append(ch)
        fields.append(''.join(current))
        return fields

    try:
        r = subprocess.run(['nmcli', '-t', '-f', 'NAME,TYPE,DEVICE,STATE',
                            'connection', 'show', '--active'],
                           capture_output=True, text=True, timeout=10)
    except FileNotFoundError:
        return jsonify({'connected': False, 'error': 'nmcli not available'})
    except subprocess.TimeoutExpired:
        # A timeout is set above; report it in the same shape as the
        # "nmcli not available" case.
        return jsonify({'connected': False, 'error': 'nmcli timed out'})

    wifi_conn = None
    for line in (r.stdout or '').splitlines():
        parts = _split_terse(line)
        if len(parts) >= 4 and 'wireless' in parts[1].lower():
            wifi_conn = {'name': parts[0], 'device': parts[2],
                         'state': parts[3]}
            break
    return jsonify({'connected': wifi_conn is not None,
                    'connection': wifi_conn})

1343 

1344 # ─── VPN Management ───────────────────────────────────── 

1345 

@app.route('/api/shell/vpn/list', methods=['GET'])
def shell_vpn_list():
    """List NetworkManager connections whose type contains ``vpn``.

    Returns:
        JSON with ``vpns``: a list of ``name``, ``type`` and ``state``.
        When nmcli is missing or times out the list is empty and an
        ``error`` message is included.
    """
    def _split_terse(line):
        """Split one nmcli terse-mode line on unescaped ':' separators.

        nmcli escapes ':' and '\\' inside values with a backslash, so a
        connection name containing ':' must not be split at that point.
        """
        fields, current = [], []
        chars = iter(line)
        for ch in chars:
            if ch == '\\':
                current.append(next(chars, '\\'))
            elif ch == ':':
                fields.append(''.join(current))
                current = []
            else:
                current.append(ch)
        fields.append(''.join(current))
        return fields

    try:
        r = subprocess.run(['nmcli', '-t', '-f', 'NAME,TYPE,STATE',
                            'connection', 'show'],
                           capture_output=True, text=True, timeout=10)
    except FileNotFoundError:
        return jsonify({'vpns': [], 'error': 'nmcli not available'})
    except subprocess.TimeoutExpired:
        # A timeout is set above; report it in the same shape as the
        # "nmcli not available" case.
        return jsonify({'vpns': [], 'error': 'nmcli timed out'})

    vpns = []
    for line in (r.stdout or '').splitlines():
        parts = _split_terse(line)
        if len(parts) >= 3 and 'vpn' in parts[1].lower():
            vpns.append({'name': parts[0], 'type': parts[1],
                         'state': parts[2]})
    return jsonify({'vpns': vpns})

1362 

@app.route('/api/shell/vpn/connect', methods=['POST'])
def shell_vpn_connect():
    """Bring up a VPN connection by name through ``nmcli``.

    JSON body keys:
        name: connection name (required).

    Returns:
        JSON ``status: connected`` on success; 400 with nmcli's stderr on
        failure or when ``name`` is missing; 500 when nmcli is not
        installed; 504 when the attempt exceeds 30 seconds.
    """
    body = request.get_json(silent=True) or {}
    name = body.get('name', '')
    if not name:
        return jsonify({'error': 'name is required'}), 400
    try:
        r = subprocess.run(['nmcli', 'connection', 'up', name],
                           capture_output=True, text=True, timeout=30)
    except FileNotFoundError:
        return jsonify({'error': 'nmcli not available'}), 500
    except subprocess.TimeoutExpired:
        # Handle the timeout configured above, as the WiFi connect
        # endpoint does.
        return jsonify({'error': 'Connection timed out'}), 504
    if r.returncode == 0:
        return jsonify({'status': 'connected', 'name': name})
    return jsonify({'status': 'failed', 'error': r.stderr.strip()}), 400

1378 

@app.route('/api/shell/vpn/disconnect', methods=['POST'])
def shell_vpn_disconnect():
    """Bring down a VPN connection by name through ``nmcli``.

    JSON body keys:
        name: connection name (required).

    Returns:
        JSON with ``status`` (``disconnected`` or ``error``) and nmcli's
        output as ``message``; 400 when ``name`` is missing; 500 when
        nmcli is not installed; 504 when the command exceeds its
        10 second timeout.
    """
    body = request.get_json(silent=True) or {}
    name = body.get('name', '')
    if not name:
        return jsonify({'error': 'name is required'}), 400
    try:
        r = subprocess.run(['nmcli', 'connection', 'down', name],
                           capture_output=True, text=True, timeout=10)
    except FileNotFoundError:
        return jsonify({'error': 'nmcli not available'}), 500
    except subprocess.TimeoutExpired:
        # Handle the timeout configured above rather than letting it
        # escape as an unhandled exception.
        return jsonify({'error': 'Disconnect timed out'}), 504
    return jsonify({'status': 'disconnected' if r.returncode == 0 else 'error',
                    'message': r.stdout.strip() or r.stderr.strip()})

1393 

@app.route('/api/shell/vpn/import', methods=['POST'])
def shell_vpn_import():
    """Import a VPN config file (WireGuard .conf or OpenVPN .ovpn).

    JSON body keys:
        path: path of the config file to import (required).
        type: ``wireguard`` or ``openvpn``; inferred from the file
            extension when omitted.

    NOTE(review): ``path`` is not checked against the file-operation
    sandbox described in the module docstring — confirm whether it
    should be.

    Returns:
        JSON ``status: imported`` with nmcli's output on success; 400 on
        bad input or nmcli failure; 404 when the file does not exist; 500
        when nmcli is not installed; 504 when the import exceeds
        15 seconds.
    """
    body = request.get_json(silent=True) or {}
    path = body.get('path', '')
    vpn_type = body.get('type', '')
    if not path:
        return jsonify({'error': 'path is required'}), 400
    if not os.path.isfile(path):
        return jsonify({'error': 'File not found'}), 404
    if not vpn_type:
        if path.endswith('.conf'):
            vpn_type = 'wireguard'
        elif path.endswith('.ovpn'):
            vpn_type = 'openvpn'
        else:
            return jsonify({'error': 'type is required (wireguard or openvpn)'}), 400
    try:
        r = subprocess.run(['nmcli', 'connection', 'import', 'type', vpn_type,
                            'file', path],
                           capture_output=True, text=True, timeout=15)
    except FileNotFoundError:
        return jsonify({'error': 'nmcli not available'}), 500
    except subprocess.TimeoutExpired:
        # Handle the timeout configured above rather than letting it
        # escape as an unhandled exception.
        return jsonify({'error': 'Import timed out'}), 504
    if r.returncode == 0:
        return jsonify({'status': 'imported', 'message': r.stdout.strip()})
    return jsonify({'status': 'failed', 'error': r.stderr.strip()}), 400

1420 

1421 # ─── Trash / Recycle Bin (freedesktop spec) ───────────── 

1422 

def _trash_dir():
    """Return the per-user trash directory, ``~/.local/share/Trash``."""
    home = os.path.expanduser('~')
    return os.path.join(home, '.local', 'share', 'Trash')

1425 

@app.route('/api/shell/trash', methods=['GET'])
def shell_trash_list():
    """List the items recorded in the trash ``info`` directory.

    Each ``*.trashinfo`` file contributes one entry with ``name`` and,
    when the file parses, ``original_path`` and ``deletion_date`` read
    from its ``[Trash Info]`` section.

    Returns:
        JSON with ``items`` and ``total``.
    """
    import configparser

    suffix = '.trashinfo'
    info_dir = os.path.join(_trash_dir(), 'info')
    items = []
    if os.path.isdir(info_dir):
        for fname in os.listdir(info_dir):
            if not fname.endswith(suffix):
                continue
            # Strip only the trailing suffix; str.replace would also eat
            # a ".trashinfo" that appears earlier in the name.
            item_name = fname[:-len(suffix)]
            info_path = os.path.join(info_dir, fname)
            try:
                # Paths may contain '%', which the default interpolation
                # would try to expand and then fail on.
                cp = configparser.ConfigParser(interpolation=None)
                cp.read(info_path)
                items.append({
                    'name': item_name,
                    'original_path': cp.get('Trash Info', 'Path', fallback=''),
                    'deletion_date': cp.get('Trash Info', 'DeletionDate', fallback=''),
                })
            except (configparser.Error, UnicodeError):
                # Unparseable metadata: still list the item by name.
                items.append({'name': item_name})
    return jsonify({'items': items, 'total': len(items)})

1448 

@app.route('/api/shell/trash', methods=['POST'])
def shell_trash_file():
    """Move a file or directory to the trash instead of deleting it.

    JSON body keys:
        path: existing path to move (required).

    The item is moved to ``<trash>/files`` (a ``.N`` counter is inserted
    before the extension on name collisions) and a matching
    ``.trashinfo`` file with the original absolute path and deletion time
    is written to ``<trash>/info``.

    NOTE(review): ``path`` is not checked against the file-operation
    sandbox described in the module docstring — confirm whether it
    should be.

    Returns:
        JSON ``{'status': 'trashed', 'name': <name in trash>}``; 400 for
        a missing, non-existent or unusable path; 500 with the error text
        when the move or metadata write fails.
    """
    from datetime import datetime

    body = request.get_json(silent=True) or {}
    path = body.get('path', '')
    if not isinstance(path, str) or not path or not os.path.exists(path):
        return jsonify({'error': 'path is required and must exist'}), 400

    # abspath also normalises the path, so "/a/dir/" yields "dir" rather
    # than an empty base name.
    original = os.path.abspath(path)
    basename = os.path.basename(original)
    if not basename:
        return jsonify({'error': 'path is required and must exist'}), 400

    trash = _trash_dir()
    files_dir = os.path.join(trash, 'files')
    info_dir = os.path.join(trash, 'info')
    os.makedirs(files_dir, exist_ok=True)
    os.makedirs(info_dir, exist_ok=True)

    # Handle name collision
    dest = os.path.join(files_dir, basename)
    stem, ext = os.path.splitext(basename)
    counter = 1
    while os.path.exists(dest):
        dest = os.path.join(files_dir, f"{stem}.{counter}{ext}")
        counter += 1
    final_name = os.path.basename(dest)

    try:
        shutil.move(path, dest)
        info_content = (
            "[Trash Info]\n"
            f"Path={original}\n"
            f"DeletionDate={datetime.now().strftime('%Y-%m-%dT%H:%M:%S')}\n"
        )
        info_path = os.path.join(info_dir, final_name + '.trashinfo')
        with open(info_path, 'w') as f:
            f.write(info_content)
        return jsonify({'status': 'trashed', 'name': final_name})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

1485 

@app.route('/api/shell/trash/restore', methods=['POST'])
def shell_trash_restore():
    """Restore a trashed item to the location recorded in its metadata.

    JSON body keys:
        name: name of the item inside the trash (required); must be a
            plain file-name component.

    Returns:
        JSON ``{'status': 'restored', 'path': <original path>}``; 400 for
        a missing/invalid name or unreadable metadata; 404 when the item
        is not in the trash; 409 when something already exists at the
        original location; 500 with the error text when the move fails.
    """
    import configparser

    body = request.get_json(silent=True) or {}
    name = body.get('name', '')
    if not name:
        return jsonify({'error': 'name is required'}), 400
    # Refuse separators and dot entries so the joined paths below cannot
    # leave the trash directory.
    if (not isinstance(name, str) or os.path.basename(name) != name
            or name in ('.', '..')):
        return jsonify({'error': 'Invalid name'}), 400

    trash = _trash_dir()
    file_path = os.path.join(trash, 'files', name)
    info_path = os.path.join(trash, 'info', name + '.trashinfo')
    if not os.path.exists(file_path):
        return jsonify({'error': 'Item not found in trash'}), 404

    original_path = ''
    try:
        # Paths may contain '%', which the default interpolation would
        # try to expand and then fail on.
        cp = configparser.ConfigParser(interpolation=None)
        cp.read(info_path)
        original_path = cp.get('Trash Info', 'Path', fallback='')
    except (configparser.Error, UnicodeError):
        pass
    if not original_path:
        return jsonify({'error': 'Cannot determine original path'}), 400

    # shutil.move would overwrite an existing file, or nest the item
    # inside an existing directory, while still reporting success.
    if os.path.lexists(original_path):
        return jsonify({'error': 'Original path already exists'}), 409

    try:
        parent = os.path.dirname(original_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        shutil.move(file_path, original_path)
        if os.path.isfile(info_path):
            os.remove(info_path)
        return jsonify({'status': 'restored', 'path': original_path})
    except Exception as e:
        return jsonify({'error': str(e)}), 500

1517 

@app.route('/api/shell/trash/empty', methods=['POST'])
def shell_trash_empty():
    """Permanently delete everything in the trash.

    Every entry in ``<trash>/files`` and ``<trash>/info`` is removed.
    Entries that cannot be removed are logged and skipped.

    Returns:
        JSON ``{'status': 'emptied', 'removed': <count>}`` where the
        count covers entries removed from both subdirectories.
    """
    trash = _trash_dir()
    count = 0
    for subdir in ('files', 'info'):
        d = os.path.join(trash, subdir)
        if not os.path.isdir(d):
            continue
        for item in os.listdir(d):
            item_path = os.path.join(d, item)
            try:
                # rmtree refuses symlinks, so a link to a directory must
                # be unlinked like a file (its target is left untouched).
                if os.path.isdir(item_path) and not os.path.islink(item_path):
                    shutil.rmtree(item_path)
                else:
                    os.remove(item_path)
            except OSError as exc:
                logger.warning('Could not remove trash entry %s: %s',
                               item_path, exc)
            else:
                count += 1
    return jsonify({'status': 'emptied', 'removed': count})

1538 

1539 # ─── Notes App ────────────────────────────────────────── 

1540 

# Notes are stored as JSON files in agent_data/notes, located under the
# directory two levels above the one that contains this file.
_NOTES_DIR = os.path.join(os.path.dirname(os.path.dirname(
    os.path.dirname(os.path.abspath(__file__)))), 'agent_data', 'notes')

1543 

@app.route('/api/shell/notes', methods=['GET'])
def shell_notes_list():
    """Return every stored note.

    The notes directory is created when missing.  Each ``*.json`` file is
    loaded in sorted file-name order and given an ``id`` derived from its
    file name; files that fail to load are skipped.

    Returns:
        JSON ``{'notes': [...]}``.
    """
    os.makedirs(_NOTES_DIR, exist_ok=True)
    collected = []
    for file_name in sorted(os.listdir(_NOTES_DIR)):
        if not file_name.endswith('.json'):
            continue
        try:
            with open(os.path.join(_NOTES_DIR, file_name)) as fh:
                entry = json.load(fh)
            entry['id'] = file_name.replace('.json', '')
        except Exception:
            continue
        collected.append(entry)
    return jsonify({'notes': collected})

1559 

@app.route('/api/shell/notes', methods=['POST'])
def shell_notes_save():
    """Store a new note as a JSON file.

    JSON body keys:
        title: note title; defaults to ``'Untitled'``.
        content: note text (required, must be non-empty).

    The note id is ``note_<epoch milliseconds>`` and doubles as the file
    name (without the ``.json`` extension).

    Returns:
        201 with ``{'status': 'saved', 'id': <note id>}``, or 400 when
        ``content`` is missing.
    """
    payload = request.get_json(silent=True) or {}
    title = payload.get('title', 'Untitled')
    content = payload.get('content', '')
    if not content:
        return jsonify({'error': 'content is required'}), 400

    os.makedirs(_NOTES_DIR, exist_ok=True)
    from datetime import datetime
    note_id = f"note_{int(time.time() * 1000)}"
    record = {
        'title': title,
        'content': content,
        'created': datetime.now().isoformat(),
        'modified': datetime.now().isoformat(),
    }
    target = os.path.join(_NOTES_DIR, f'{note_id}.json')
    with open(target, 'w') as fh:
        json.dump(record, fh, indent=2)
    return jsonify({'status': 'saved', 'id': note_id}), 201

1577 

@app.route('/api/shell/notes/<note_id>', methods=['DELETE'])
def shell_notes_delete(note_id):
    """Delete a stored note.

    Args:
        note_id: id of the note; the file removed is
            ``<notes dir>/<note_id>.json``.

    The id must be a plain file-name component for the current platform,
    so that the delete cannot reach outside the notes directory (for
    example through backslashes on Windows).

    Returns:
        JSON ``{'status': 'deleted', 'id': note_id}``, or 404 when the id
        is malformed or no such note exists.
    """
    if os.path.basename(note_id) != note_id or note_id in ('.', '..'):
        return jsonify({'error': 'Note not found'}), 404
    path = os.path.join(_NOTES_DIR, f'{note_id}.json')
    if not os.path.isfile(path):
        return jsonify({'error': 'Note not found'}), 404
    os.remove(path)
    return jsonify({'status': 'deleted', 'id': note_id})

1586 

1587 # ─── Media Player (open-with) ─────────────────────────── 

1588 

@app.route('/api/shell/open-with', methods=['POST'])
def shell_open_with():
    """Open a file with the system default application via ``xdg-open``.

    JSON body keys:
        path: file to open (required).  After symlink resolution it must
            lie inside the user's home directory, ``/tmp`` or
            ``/var/tmp``.

    Returns:
        JSON ``{'status': 'opened', 'path': <resolved path>}``; 400 when
        ``path`` is missing; 404 when it is not a file; 403 when it is
        outside the allowed directories; 500 when xdg-open is missing.
    """
    def _is_within(child, root):
        """Return True when ``child`` equals ``root`` or lies beneath it."""
        root = os.path.realpath(root)
        try:
            return os.path.commonpath([child, root]) == root
        except ValueError:
            # Raised e.g. for paths on different drives.
            return False

    body = request.get_json(silent=True) or {}
    path = body.get('path', '')
    if not path:
        return jsonify({'error': 'path is required'}), 400
    if not os.path.isfile(path):
        return jsonify({'error': 'File not found'}), 404

    # Sandbox check.  Compare whole path components: a plain startswith()
    # would also accept siblings such as "/tmpfoo" or "/home/user2".
    resolved = os.path.realpath(path)
    allowed_roots = [os.path.expanduser('~'), '/tmp', '/var/tmp']
    if not any(_is_within(resolved, root) for root in allowed_roots):
        return jsonify({'error': 'Path outside allowed directories'}), 403

    try:
        subprocess.Popen(['xdg-open', resolved],
                         stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except FileNotFoundError:
        return jsonify({'error': 'xdg-open not available'}), 500
    return jsonify({'status': 'opened', 'path': resolved})

1609 

1610 # ── Self-Build API ────────────────────────────────────────── 

1611 # Runtime OS rebuilding — the OS modifies and rebuilds itself 

1612 

@app.route('/api/system/self-build/status', methods=['GET'])
def _self_build_status():
    """Report self-build availability and system generation details.

    Returns:
        JSON with ``self_build_available`` and ``runtime_config_exists``,
        plus ``nixos_version``, ``current_generation`` and
        ``recent_builds`` (last five history entries) when each can be
        determined.
    """
    from flask import jsonify
    report = {'self_build_available': False}

    # Self-build counts as available when `nixos-version` runs cleanly.
    try:
        probe = subprocess.run(
            ['nixos-version'], capture_output=True, text=True, timeout=5)
        if probe.returncode == 0:
            report['nixos_version'] = probe.stdout.strip()
            report['self_build_available'] = True
    except Exception:
        pass

    # Current generation
    profile_link = '/nix/var/nix/profiles/system'
    if os.path.islink(profile_link):
        report['current_generation'] = os.readlink(profile_link)

    # Runtime config exists?
    report['runtime_config_exists'] = os.path.isfile('/etc/hart/runtime.nix')

    # Build history
    history_path = '/var/lib/hart/ota/history/builds.jsonl'
    if os.path.isfile(history_path):
        try:
            with open(history_path) as fh:
                history = fh.readlines()
            tail = history[-5:]
            report['recent_builds'] = [
                json.loads(row) for row in tail if row.strip()]
        except Exception:
            pass

    return jsonify(report)

1649 

@app.route('/api/system/self-build/packages', methods=['GET'])
def _self_build_packages():
    """List the packages named in ``/etc/hart/runtime.nix``.

    Lines after the first one mentioning ``systemPackages`` are collected
    until a line that is exactly ``];``.  Blank lines, ``#`` comments and
    further ``systemPackages`` lines are skipped.

    Returns:
        JSON ``{'packages': [...]}``; the list is empty when the file is
        missing or unreadable.
    """
    from flask import jsonify
    runtime_nix = '/etc/hart/runtime.nix'
    packages = []
    if os.path.isfile(runtime_nix):
        try:
            with open(runtime_nix) as fh:
                rows = iter(fh)
                # Skip ahead to the start of the package list.
                for raw in rows:
                    if 'systemPackages' in raw.strip():
                        break
                # Collect entries until the closing bracket.
                for raw in rows:
                    entry = raw.strip()
                    if 'systemPackages' in entry:
                        continue
                    if entry == '];':
                        break
                    if entry and not entry.startswith('#'):
                        packages.append(entry)
        except Exception:
            pass
    return jsonify({'packages': packages})

1672 

@app.route('/api/system/self-build/install', methods=['POST'])
def _self_build_install():
    """Stage a package in the runtime config (applied by a self-build).

    JSON body keys:
        package: package name; ASCII letters, digits, ``-`` and ``_``.

    The name is inserted on a new line directly after the marker comment
    ``# Packages added at runtime appear here`` in
    ``/etc/hart/runtime.nix``.

    Returns:
        JSON ``status: staged`` on success, or ``status:
        already_installed`` when a line consisting of exactly that name
        already exists; 400 for an invalid name; 404 when the config file
        is missing; 403 on permission errors; 500 when the marker comment
        is absent, since nothing could be staged.
    """
    from flask import request, jsonify
    marker = '# Packages added at runtime appear here'

    data = request.get_json(silent=True) or {}
    package = data.get('package', '')
    package = package.strip() if isinstance(package, str) else ''
    if (not package or not package.isascii()
            or not package.replace('-', '').replace('_', '').isalnum()):
        return jsonify({'error': 'Invalid package name'}), 400

    runtime_nix = '/etc/hart/runtime.nix'
    if not os.path.isfile(runtime_nix):
        return jsonify({'error': 'Runtime config not found'}), 404

    try:
        with open(runtime_nix) as f:
            content = f.read()

        # Compare whole lines: a substring test would report "git" as
        # installed whenever "gitui" or a comment mentioning it exists.
        if any(line.strip() == package for line in content.splitlines()):
            return jsonify({'status': 'already_installed', 'package': package})

        marker_at = content.find(marker)
        if marker_at == -1:
            # Without the marker nothing would be written; do not claim
            # that the package was staged.
            return jsonify({'error': 'Runtime config has no package marker'}), 500

        # Reuse the marker line's indentation when the marker starts it.
        line_start = content.rfind('\n', 0, marker_at) + 1
        lead = content[line_start:marker_at]
        indent = lead if not lead.strip() else ' '
        insert_at = marker_at + len(marker)
        content = (content[:insert_at] + f'\n{indent}{package}'
                   + content[insert_at:])

        with open(runtime_nix, 'w') as f:
            f.write(content)
        return jsonify({
            'status': 'staged',
            'package': package,
            'message': 'Run self-build to apply',
        })
    except PermissionError:
        return jsonify({'error': 'Permission denied'}), 403

1703 

@app.route('/api/system/self-build/remove', methods=['POST'])
def _self_build_remove():
    """Stage removal of a package from the runtime config.

    JSON body keys:
        package: package name; ASCII letters, digits, ``-``, ``_`` and
            ``.``.

    Only lines in ``/etc/hart/runtime.nix`` that consist of exactly the
    package name (ignoring surrounding whitespace) are removed.  Matching
    by substring would also delete unrelated lines that merely contain
    the text and could corrupt the file.

    Returns:
        JSON ``status: staged_removal`` on success or ``status:
        not_found`` when no line matches; 400 for a missing or invalid
        name; 404 when the config file is missing; 403 on permission
        errors.
    """
    from flask import request, jsonify
    data = request.get_json(silent=True) or {}
    package = data.get('package', '')
    package = package.strip() if isinstance(package, str) else ''
    if not package:
        return jsonify({'error': 'Package name required'}), 400
    # Restrict to name-like tokens so structural lines such as "];" can
    # never be selected for removal.
    if not package.isascii() or not all(
            ch.isalnum() or ch in '-_.' for ch in package):
        return jsonify({'error': 'Invalid package name'}), 400

    runtime_nix = '/etc/hart/runtime.nix'
    if not os.path.isfile(runtime_nix):
        return jsonify({'error': 'Runtime config not found'}), 404

    try:
        with open(runtime_nix) as f:
            lines = f.readlines()
        new_lines = [l for l in lines if l.strip() != package]
        if len(new_lines) == len(lines):
            return jsonify({'status': 'not_found', 'package': package})
        with open(runtime_nix, 'w') as f:
            f.writelines(new_lines)
        return jsonify({
            'status': 'staged_removal',
            'package': package,
            'message': 'Run self-build to apply',
        })
    except PermissionError:
        return jsonify({'error': 'Permission denied'}), 403

1733 

@app.route('/api/system/self-build/trigger', methods=['POST'])
def _self_build_trigger():
    """Run ``hart-self-build`` in the requested mode and report the outcome.

    The JSON body may carry ``mode`` (``dry-run`` by default); only
    ``dry-run``, ``switch`` and ``diff`` are accepted. The response holds
    the return code plus the tail of stdout (2000 chars) and stderr
    (1000 chars). A run exceeding 600 seconds yields 504; a missing
    executable yields 501.
    """
    from flask import request, jsonify
    payload = request.get_json(silent=True) or {}
    mode = payload.get('mode', 'dry-run')
    allowed_modes = ('dry-run', 'switch', 'diff')
    if mode not in allowed_modes:
        return jsonify({'error': 'Mode must be dry-run, switch, or diff'}), 400

    try:
        proc = subprocess.run(
            ['hart-self-build', mode],
            capture_output=True, text=True, timeout=600)
    except subprocess.TimeoutExpired:
        return jsonify({'error': 'Build timed out (10 min limit)'}), 504
    except FileNotFoundError:
        return jsonify({'error': 'hart-self-build not available (not on NixOS?)'}), 501

    succeeded = proc.returncode == 0
    return jsonify({
        'status': 'completed' if succeeded else 'failed',
        'mode': mode,
        'returncode': proc.returncode,
        'output': (proc.stdout or '')[-2000:],
        'errors': (proc.stderr or '')[-1000:],
    })

1758 

@app.route('/api/system/generations', methods=['GET'])
def _system_generations():
    """List NixOS system generations (rollback targets), newest first.

    Scans ``/nix/var/nix/profiles`` for ``system-<N>-link`` symlinks and
    returns up to 20 of them with their link targets, together with the
    target of the ``system`` symlink as ``current`` (empty string when it
    is not a symlink).
    """
    from flask import jsonify
    profile_dir = '/nix/var/nix/profiles'
    prefix, suffix = 'system-', '-link'

    try:
        entries = os.listdir(profile_dir)
    except OSError as exc:
        # Missing directory (not NixOS) or unreadable: report no generations.
        logger.debug("Cannot list %s: %s", profile_dir, exc)
        entries = []

    generations = []
    for entry in entries:
        if len(entry) <= len(prefix) + len(suffix):
            continue
        if not (entry.startswith(prefix) and entry.endswith(suffix)):
            continue
        try:
            target = os.readlink(os.path.join(profile_dir, entry))
        except OSError:
            # Not a symlink or vanished meanwhile; skip just this entry.
            continue
        generations.append({
            'generation': entry[len(prefix):-len(suffix)],
            'path': target,
        })

    def _newest_first(item):
        num = item['generation']
        # Numeric generations order by value (so 10 > 9); anything else
        # sorts after them.
        return (1, int(num), '') if num.isdecimal() else (0, 0, num)

    generations.sort(key=_newest_first, reverse=True)

    current = ''
    system_link = os.path.join(profile_dir, 'system')
    if os.path.islink(system_link):
        try:
            current = os.readlink(system_link)
        except OSError:
            pass
    return jsonify({
        'current': current,
        'generations': generations[:20],
    })

1784 

@app.route('/api/system/rollback', methods=['POST'])
def _system_rollback():
    """Roll back to the previous NixOS generation.

    Runs ``sudo nixos-rebuild switch --rollback`` with a 300 second limit.

    Returns:
        200 with ``status`` ``'rolled_back'`` or ``'failed'`` plus the tail
        of stdout (and of stderr under ``errors``);
        504 when the command exceeds the time limit;
        501 when the executable cannot be found.
    """
    from flask import jsonify
    try:
        result = subprocess.run(
            ['sudo', 'nixos-rebuild', 'switch', '--rollback'],
            capture_output=True, text=True, timeout=300)
    except subprocess.TimeoutExpired:
        # subprocess.run raises on timeout; without this the request would
        # end in an unhandled exception.
        return jsonify({'error': 'Rollback timed out (5 min limit)'}), 504
    except FileNotFoundError:
        return jsonify({'error': 'nixos-rebuild not available'}), 501

    return jsonify({
        'status': 'rolled_back' if result.returncode == 0 else 'failed',
        'output': result.stdout[-2000:] if result.stdout else '',
        'errors': result.stderr[-1000:] if result.stderr else '',
    })

1799 

1800 # ─── Cloud File Sync (rclone wrapper) ──────────────────── 

1801 

_SYNC_CONFIG = os.path.expanduser('~/.config/hart/cloud-sync.json')

def _load_sync_config():
    """Load the cloud-sync config, falling back to an empty config.

    Returns the parsed JSON object from ``_SYNC_CONFIG``. When the file is
    missing, is not valid JSON, or does not contain a JSON object, a fresh
    ``{'remotes': [], 'sync_pairs': []}`` dict is returned instead.
    """
    empty = {'remotes': [], 'sync_pairs': []}
    try:
        with open(_SYNC_CONFIG) as f:
            loaded = json.load(f)
    except (FileNotFoundError, json.JSONDecodeError):
        return empty
    # Callers use .get()/.setdefault(); a top-level list or scalar would
    # crash them, so treat it like a corrupt file.
    if not isinstance(loaded, dict):
        return empty
    return loaded

def _save_sync_config(data):
    """Write *data* to ``_SYNC_CONFIG`` as indented JSON.

    The parent directory is created when it does not exist yet.
    """
    os.makedirs(os.path.dirname(_SYNC_CONFIG), exist_ok=True)
    with open(_SYNC_CONFIG, 'w') as f:
        json.dump(data, f, indent=2)

1815 

def _sync_run(cmd, timeout=10):
    """Execute *cmd* for cloud sync and capture its text output.

    Returns the ``subprocess.CompletedProcess``, or ``None`` when the
    executable cannot be found or the call exceeds *timeout* seconds.
    """
    try:
        completed = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return None
    return completed

1823 

@app.route('/api/shell/cloud-sync/remotes', methods=['GET'])
def shell_sync_remotes():
    """Report the remotes printed by ``rclone listremotes``.

    Each output line is stripped of whitespace and trailing colons; empty
    names are dropped. ``rclone_available`` is false when the command could
    not be run at all.
    """
    proc = _sync_run(['rclone', 'listremotes'], timeout=10)
    found = []
    if proc and proc.returncode == 0:
        names = (row.strip().rstrip(':')
                 for row in proc.stdout.strip().split('\n'))
        found = [{'name': nm} for nm in names if nm]
    return jsonify({'remotes': found, 'rclone_available': proc is not None})

1835 

@app.route('/api/shell/cloud-sync/pairs', methods=['GET'])
def shell_sync_pairs():
    """Return the sync pairs (local ↔ remote) stored in the sync config."""
    stored_pairs = _load_sync_config().get('sync_pairs', [])
    return jsonify({'pairs': stored_pairs})

1841 

@app.route('/api/shell/cloud-sync/pairs', methods=['POST'])
@_require_shell_auth
def shell_sync_add_pair():
    """Add a sync pair (local dir ↔ remote path) to the sync config.

    JSON body: ``local_path`` and ``remote_path`` (required strings) and
    ``direction`` (``sync``, ``bisync`` or ``copy``; defaults to ``bisync``).

    Returns:
        201 with the stored pair;
        400 for missing/invalid fields;
        403 when ``local_path`` does not resolve inside the home directory.
    """
    body = request.get_json(silent=True) or {}
    local_path = body.get('local_path')
    remote_path = body.get('remote_path')
    direction = body.get('direction', 'bisync')  # sync, bisync, copy

    if not local_path or not remote_path:
        return jsonify({'error': 'local_path and remote_path required'}), 400
    if not isinstance(local_path, str) or not isinstance(remote_path, str):
        return jsonify({'error': 'local_path and remote_path must be strings'}), 400
    # The direction is later used verbatim as the rclone subcommand, so
    # only the documented values may be stored.
    if direction not in ('sync', 'bisync', 'copy'):
        return jsonify({'error': 'direction must be sync, bisync, or copy'}), 400
    # A leading dash would be parsed by rclone as an option.
    if local_path.startswith('-') or remote_path.startswith('-'):
        return jsonify({'error': 'paths must not start with "-"'}), 400

    # Path safety: compare on a path-component boundary so that
    # /home/user2 is not accepted for a home of /home/user.
    real = os.path.realpath(os.path.expanduser(local_path))
    home = os.path.realpath(os.path.expanduser('~'))
    if real != home and not real.startswith(home.rstrip(os.sep) + os.sep):
        return jsonify({'error': 'local_path must be under home directory'}), 403

    cfg = _load_sync_config()
    existing = cfg.setdefault('sync_pairs', [])
    # Second-resolution ids collide when two pairs are added quickly;
    # add a numeric suffix until the id is unused.
    taken = {p.get('id') for p in existing if isinstance(p, dict)}
    base_id = f"pair_{int(time.time())}"
    pair_id, suffix = base_id, 1
    while pair_id in taken:
        pair_id = f"{base_id}_{suffix}"
        suffix += 1

    pair = {
        'id': pair_id,
        'local_path': local_path,
        'remote_path': remote_path,
        'direction': direction,
        'created': time.time(),
        'last_sync': None,
    }
    existing.append(pair)
    _save_sync_config(cfg)

    _audit_shell_op('cloud_sync_add', {
        'local': local_path, 'remote': remote_path})
    return jsonify({'added': True, 'pair': pair}), 201

1875 

@app.route('/api/shell/cloud-sync/pairs/<pair_id>', methods=['DELETE'])
@_require_shell_auth
def shell_sync_remove_pair(pair_id):
    """Drop every stored sync pair whose id equals *pair_id*.

    The config is rewritten and the operation audited whether or not a
    pair matched; the response is always ``{'removed': True}``.
    """
    cfg = _load_sync_config()
    kept = []
    for existing in cfg.get('sync_pairs', []):
        if existing.get('id') != pair_id:
            kept.append(existing)
    cfg['sync_pairs'] = kept
    _save_sync_config(cfg)
    _audit_shell_op('cloud_sync_remove', {'pair_id': pair_id})
    return jsonify({'removed': True})

1886 

@app.route('/api/shell/cloud-sync/run', methods=['POST'])
@_require_shell_auth
def shell_sync_run():
    """Run rclone for one sync pair (``pair_id`` in the body) or for all.

    Each pair is run with a 300 second limit. The response lists, per
    pair, whether it succeeded and an error text when it did not.
    ``last_sync`` is updated on the stored pair after a successful run.
    Returns 400 when no pair is configured or the id matches nothing.
    """
    body = request.get_json(silent=True) or {}
    pair_id = body.get('pair_id')

    cfg = _load_sync_config()
    pairs = cfg.get('sync_pairs', [])
    if pair_id:
        pairs = [p for p in pairs if p.get('id') == pair_id]

    if not pairs:
        return jsonify({'error': 'No sync pairs configured'}), 400

    results = []
    for pair in pairs:
        local = pair.get('local_path')
        remote = pair.get('remote_path')
        direction = pair.get('direction', 'sync')

        # The config file is user-editable; never hand an arbitrary value
        # to rclone as its subcommand, and do not crash on missing keys.
        if (not isinstance(local, str) or not isinstance(remote, str)
                or not local or not remote
                or direction not in ('sync', 'bisync', 'copy')):
            results.append({
                'pair_id': pair.get('id'),
                'success': False,
                'error': 'Invalid sync pair configuration',
            })
            continue

        # rclone does not expand "~" itself (that is a shell feature).
        cmd = ['rclone', direction, os.path.expanduser(local), remote]
        if direction == 'bisync' and not pair.get('last_sync'):
            # --resync is only for establishing the initial bisync
            # baseline; repeating it on every run can undo deletions.
            cmd.append('--resync')

        r = _sync_run(cmd, timeout=300)
        success = r is not None and r.returncode == 0
        if success:
            error = None
        elif r is None:
            error = 'rclone not available or timed out'
        else:
            error = r.stderr[:500]
        results.append({
            'pair_id': pair.get('id'),
            'success': success,
            'error': error,
        })
        if success:
            # The pair dicts are the same objects held in cfg, so this
            # is persisted by the save below.
            pair['last_sync'] = time.time()

    _save_sync_config(cfg)
    _audit_shell_op('cloud_sync_run', {
        'pairs': len(results),
        'success': sum(1 for res in results if res['success'])})
    return jsonify({'results': results})

1927 

@app.route('/api/shell/cloud-sync/status', methods=['GET'])
def shell_sync_status():
    """Report rclone availability and the stored sync pairs.

    ``rclone version`` is probed with a 5 second limit; the first line of
    its output becomes ``rclone_version`` when the probe succeeds.
    """
    cfg = _load_sync_config()
    probe = _sync_run(['rclone', 'version'], timeout=5)
    installed = probe is not None and probe.returncode == 0
    version_line = probe.stdout.split('\n')[0] if installed else None
    return jsonify({
        'rclone_installed': installed,
        'rclone_version': version_line,
        'pairs': cfg.get('sync_pairs', []),
        'total_pairs': len(cfg.get('sync_pairs', [])),
    })

1940 

1941 # ─── App Store APIs ───────────────────────────────────── 

1942 

@app.route('/api/apps/search', methods=['GET'])
def shell_app_search():
    """Search for installable apps via ``AppInstaller.search``.

    Query parameters: ``q`` (required), ``platform`` (optional) and
    ``limit`` (positive integer, default 20).

    Returns 400 when ``q`` is missing or ``limit`` is not a positive
    integer. Installer failures are logged and produce an empty result
    list rather than an error response.
    """
    query = request.args.get('q', '')
    platform = request.args.get('platform')
    if not query:
        return jsonify({'error': 'q parameter required'}), 400
    try:
        limit = int(request.args.get('limit', 20))
    except (TypeError, ValueError):
        # A non-numeric limit used to escape as an unhandled ValueError.
        return jsonify({'error': 'limit must be a positive integer'}), 400
    if limit < 1:
        return jsonify({'error': 'limit must be a positive integer'}), 400

    results = []
    try:
        from integrations.agent_engine.app_installer import AppInstaller
        installer = AppInstaller()
        results = installer.search(query, platform=platform, limit=limit)
    except Exception as e:  # best effort: search backends are optional
        logger.debug(f"App search error: {e}")

    return jsonify({'query': query, 'results': results, 'count': len(results)})

1961 

@app.route('/api/apps/installed', methods=['GET'])
def shell_app_installed():
    """List installed applications.

    Combines ``AppInstaller.list_installed`` (optionally filtered by the
    ``platform`` query parameter) with AppRegistry manifests whose name is
    not already present. Failures of either source are swallowed so the
    other source can still be reported.
    """
    platform = request.args.get('platform')

    apps = []
    try:
        from integrations.agent_engine.app_installer import AppInstaller
        apps = AppInstaller().list_installed(platform=platform)
    except Exception as exc:
        logger.debug(f"App list error: {exc}")

    # Also include AppRegistry entries
    try:
        from core.platform.app_registry import get_app_registry
        for manifest in get_app_registry().list_all():
            already_listed = any(
                item.get('name') == manifest.name for item in apps)
            if already_listed:
                continue
            apps.append({
                'name': manifest.name, 'id': manifest.id,
                'type': manifest.type, 'icon': manifest.icon,
                'group': manifest.group,
            })
    except Exception:
        pass

    return jsonify({'apps': apps, 'count': len(apps)})

1989 

@app.route('/api/apps/install', methods=['POST'])
@_require_shell_auth
def shell_app_install():
    """Install an application through ``AppInstaller.install``.

    JSON body: ``source`` (required), ``platform`` and ``name`` (optional).
    Returns the installer's result as JSON, 400 when ``source`` is missing,
    or 500 with the exception text when anything raises.
    """
    payload = request.get_json(silent=True) or {}
    source = payload.get('source')
    platform = payload.get('platform')
    name = payload.get('name')
    if not source:
        return jsonify({'error': 'source required'}), 400

    try:
        from integrations.agent_engine.app_installer import AppInstaller
        outcome = AppInstaller().install(source, platform=platform, name=name)
        _audit_shell_op('app_install', {'source': source, 'platform': platform})
        return jsonify(outcome)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500

2009 

@app.route('/api/apps/uninstall', methods=['POST'])
@_require_shell_auth
def shell_app_uninstall():
    """Uninstall an application through ``AppInstaller.uninstall``.

    JSON body: ``app_id`` (required) and ``platform`` (optional).
    Returns the installer's result as JSON, 400 when ``app_id`` is missing,
    or 500 with the exception text when anything raises.
    """
    payload = request.get_json(silent=True) or {}
    app_id = payload.get('app_id')
    platform = payload.get('platform')
    if not app_id:
        return jsonify({'error': 'app_id required'}), 400

    try:
        from integrations.agent_engine.app_installer import AppInstaller
        outcome = AppInstaller().uninstall(app_id, platform=platform)
        _audit_shell_op('app_uninstall', {'app_id': app_id})
        return jsonify(outcome)
    except Exception as exc:
        return jsonify({'error': str(exc)}), 500

2028 

2029 # ─── App Permissions APIs ───────────────────────────────── 

2030 

_PERMISSIONS_FILE = os.path.expanduser('~/.config/hart/app-permissions.json')

def _load_permissions():
    """Return the parsed permissions file, or ``{}`` if missing/corrupt."""
    try:
        with open(_PERMISSIONS_FILE) as handle:
            stored = json.load(handle)
    except (FileNotFoundError, json.JSONDecodeError):
        return {}
    return stored

def _save_permissions(data):
    """Serialize *data* as indented JSON into the permissions file.

    Creates the containing directory first when needed.
    """
    parent_dir = os.path.dirname(_PERMISSIONS_FILE)
    os.makedirs(parent_dir, exist_ok=True)
    with open(_PERMISSIONS_FILE, 'w') as handle:
        json.dump(data, handle, indent=2)

2044 

@app.route('/api/apps/<app_id>/permissions', methods=['GET'])
def shell_app_permissions(app_id):
    """Get permissions for an installed app.

    Merges the permission types stored for *app_id* in the permissions
    file with those declared on the app's registry manifest (when the
    registry is importable). Each entry reports ``granted`` (stored value,
    defaulting to ``True``) and ``requested`` (declared by the manifest).
    """
    perms = _load_permissions()
    app_perms = perms.get(app_id, {}) if isinstance(perms, dict) else {}
    if not isinstance(app_perms, dict):
        app_perms = {}

    # Merge with manifest-declared permissions
    manifest_perms = []
    try:
        from core.platform.app_registry import get_app_registry
        registry = get_app_registry()
        manifest = registry.get(app_id)
        if manifest and hasattr(manifest, 'permissions'):
            manifest_perms = manifest.permissions or []
    except Exception:  # registry is optional; stored permissions suffice
        pass

    # Build the requested set once. Manifest items may be plain names or
    # dicts with a 'type' key; a dict lacking 'type' is skipped because it
    # cannot be hashed into the set or sorted with strings.
    requested = set()
    for item in manifest_perms:
        name = item.get('type') if isinstance(item, dict) else item
        if isinstance(name, str) and name:
            requested.add(name)

    result = []
    for ptype in sorted(set(app_perms) | requested):
        entry = app_perms.get(ptype, {})
        granted = entry.get('granted', True) if isinstance(entry, dict) else True
        result.append({
            'type': ptype,
            'granted': granted,  # Default: granted
            'requested': ptype in requested,
        })

    return jsonify({'app_id': app_id, 'permissions': result})

2075 

@app.route('/api/apps/<app_id>/permission/<perm_type>', methods=['POST'])
@_require_shell_auth
def shell_app_set_permission(app_id, perm_type):
    """Store a grant/revoke decision for one permission of an app.

    The JSON body's ``granted`` value (``True`` when absent) is saved with
    the current timestamp under ``app_id`` → ``perm_type`` and the change
    is audited.
    """
    payload = request.get_json(silent=True) or {}
    granted = payload.get('granted', True)

    store = _load_permissions()
    store.setdefault(app_id, {})[perm_type] = {
        'granted': granted,
        'updated': time.time(),
    }
    _save_permissions(store)

    audit_details = {'app_id': app_id, 'type': perm_type, 'granted': granted}
    _audit_shell_op('app_permission', audit_details)

    return jsonify({
        'updated': True, 'app_id': app_id,
        'type': perm_type, 'granted': granted,
    })

2100 

@app.route('/api/apps/<app_id>/permissions/reset', methods=['POST'])
@_require_shell_auth
def shell_app_reset_permissions(app_id):
    """Forget all stored permission decisions for *app_id*.

    The permissions file is rewritten and the reset audited even when the
    app had no stored entry.
    """
    store = _load_permissions()
    if app_id in store:
        del store[app_id]
    _save_permissions(store)

    _audit_shell_op('app_permission_reset', {'app_id': app_id})
    return jsonify({'reset': True, 'app_id': app_id})

2111 

2112 # ─── File Tagging (xattr-based) ──────────────────────────── 

2113 

_TAG_XATTR = 'user.hart.tags'

@app.route('/api/shell/files/tags', methods=['GET'])
def shell_file_tags():
    """Return the tags stored in a file's ``user.hart.tags`` xattr.

    The ``path`` query parameter must name an existing path (400
    otherwise). Any failure while reading or decoding the attribute,
    including a missing ``xattr`` module, yields an empty tag list.
    """
    target = request.args.get('path', '')
    if not (target and os.path.exists(target)):
        return jsonify({'error': 'path required and must exist'}), 400
    try:
        import xattr
        encoded = xattr.getxattr(target, _TAG_XATTR)
        found = json.loads(encoded.decode('utf-8'))
    except Exception:
        found = []
    return jsonify({'path': target, 'tags': found})

2130 

@app.route('/api/shell/files/tags', methods=['POST'])
@_require_shell_auth
def shell_file_set_tags():
    """Set tags on a file by writing its ``user.hart.tags`` xattr.

    JSON body: ``path`` (existing file or directory under the home
    directory or /tmp) and ``tags`` (list of strings).

    Returns:
        200 with the stored tags;
        400 for a missing path or malformed tags;
        403 when the resolved path is outside home and /tmp;
        500 when the xattr module is missing or the write fails.
    """
    body = request.get_json(silent=True) or {}
    path = body.get('path', '')
    tags = body.get('tags', [])
    if not isinstance(path, str) or not path or not os.path.exists(path):
        return jsonify({'error': 'path required and must exist'}), 400
    if not isinstance(tags, list):
        return jsonify({'error': 'tags must be a list'}), 400
    if not all(isinstance(t, str) for t in tags):
        return jsonify({'error': 'tags must be a list of strings'}), 400

    # This is a write operation: keep it inside the same roots the tag
    # search accepts (home and /tmp). Symlinks are resolved first and the
    # comparison is made on a path-component boundary.
    real = os.path.realpath(os.path.expanduser(path))
    roots = (os.path.realpath(os.path.expanduser('~')),
             os.path.realpath('/tmp'))
    inside = any(
        real == root or real.startswith(root.rstrip(os.sep) + os.sep)
        for root in roots)
    if not inside:
        return jsonify({'error': 'path must be under home or /tmp'}), 403

    try:
        import xattr
        xattr.setxattr(real, _TAG_XATTR, json.dumps(tags).encode('utf-8'))
        _audit_shell_op('file_tag', {'path': path, 'tags': tags})
        return jsonify({'tagged': True, 'path': path, 'tags': tags})
    except ImportError:
        return jsonify({'error': 'xattr package not installed'}), 500
    except Exception as e:
        return jsonify({'error': str(e)}), 500

2151 

@app.route('/api/shell/files/search-by-tag', methods=['GET'])
def shell_file_search_by_tag():
    """Search a directory tree for files carrying a given tag.

    Query parameters: ``tag`` (required) and ``dir`` (defaults to the home
    directory; must resolve under home or /tmp). The walk skips
    directories nested more than three levels below the start, looks at
    no more than 200 files per directory, and stops after 100 matches.
    """
    tag = request.args.get('tag', '')
    directory = request.args.get('dir', os.path.expanduser('~'))
    if not tag:
        return jsonify({'error': 'tag parameter required'}), 400

    # Resolve both sides before comparing, and compare on a component
    # boundary so /tmpfoo or /home/user2 are not accepted.
    real_dir = os.path.realpath(os.path.expanduser(directory))
    roots = (os.path.realpath(os.path.expanduser('~')),
             os.path.realpath('/tmp'))
    allowed = any(
        real_dir == root or real_dir.startswith(root.rstrip(os.sep) + os.sep)
        for root in roots)
    if not allowed:
        return jsonify({'error': 'directory must be under home'}), 403

    max_matches = 100
    matches = []
    try:
        import xattr
        for root, dirs, files in os.walk(real_dir):
            # Limit depth to prevent runaway scans
            depth = root[len(real_dir):].count(os.sep)
            if depth > 3:
                dirs.clear()
                continue
            for fname in files[:200]:
                fp = os.path.join(root, fname)
                try:
                    raw = xattr.getxattr(fp, _TAG_XATTR)
                    file_tags = json.loads(raw.decode('utf-8'))
                except Exception:
                    # Untagged or unreadable file: not a match.
                    continue
                # Only list membership counts; a bare string would
                # otherwise match on substrings.
                if isinstance(file_tags, list) and tag in file_tags:
                    matches.append({'path': fp, 'tags': file_tags})
                    if len(matches) >= max_matches:
                        break
            if len(matches) >= max_matches:
                # Leave the walk too, not just the current directory.
                break
    except ImportError:
        return jsonify({'error': 'xattr package not installed'}), 500
    return jsonify({'tag': tag, 'matches': matches, 'count': len(matches)})

2186 

2187 # ─── Hotspot / Tethering ───────────────────────────────── 

2188 

@app.route('/api/shell/hotspot/status', methods=['GET'])
def shell_hotspot_status():
    """Report whether an active WiFi connection is running in AP mode.

    Lists active connections with nmcli, then queries the wireless mode of
    each connection whose type mentions ``wifi``. If several match, the
    last one is reported. A missing nmcli or a timeout counts as
    "no hotspot".
    """
    hotspot = None
    try:
        listing = subprocess.run(
            ['nmcli', '-t', '-f', 'NAME,TYPE,DEVICE',
             'connection', 'show', '--active'],
            capture_output=True, text=True, timeout=5)
        if listing and listing.returncode == 0:
            for row in listing.stdout.strip().split('\n'):
                fields = row.split(':')
                if len(fields) < 2 or 'wifi' not in fields[1].lower():
                    continue
                mode_probe = subprocess.run(
                    ['nmcli', '-t', '-f', '802-11-wireless.mode',
                     'connection', 'show', fields[0]],
                    capture_output=True, text=True, timeout=5)
                if mode_probe and 'ap' in mode_probe.stdout.lower():
                    device = fields[2] if len(fields) > 2 else ''
                    hotspot = {'name': fields[0], 'device': device}
    except (FileNotFoundError, subprocess.TimeoutExpired):
        pass
    return jsonify({'active': hotspot is not None, 'hotspot': hotspot})

2211 

@app.route('/api/shell/hotspot/start', methods=['POST'])
@_require_shell_auth
def shell_hotspot_start():
    """Create a WiFi hotspot with ``nmcli dev wifi hotspot``.

    JSON body: ``ssid`` (default ``HART-Hotspot``), ``password`` (optional)
    and ``band`` (``bg`` by default; ``a`` is passed through to nmcli).

    Returns:
        200 on success;
        400 for an invalid ssid or password;
        500 with nmcli's stderr, or when nmcli is missing or times out.
    """
    body = request.get_json(silent=True) or {}
    ssid = body.get('ssid', 'HART-Hotspot')
    password = body.get('password', '')
    band = body.get('band', 'bg')  # bg or a

    # Reject malformed input here; otherwise it reaches subprocess/nmcli
    # and comes back as an opaque 500.
    if not isinstance(ssid, str) or not ssid or len(ssid.encode('utf-8')) > 32:
        return jsonify({'error': 'ssid must be a string of 1-32 bytes'}), 400
    if not isinstance(password, str):
        return jsonify({'error': 'password must be a string'}), 400
    if password and not 8 <= len(password) <= 63:
        # WPA passphrases are 8 to 63 characters long.
        return jsonify({'error': 'password must be 8-63 characters'}), 400

    cmd = ['nmcli', 'dev', 'wifi', 'hotspot', 'ssid', ssid]
    if password:
        cmd += ['password', password]
    if band == 'a':
        cmd += ['band', 'a']
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=15)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return jsonify({'error': 'nmcli not available'}), 500
    if r.returncode == 0:
        # The password is deliberately left out of the audit record.
        _audit_shell_op('hotspot_start', {'ssid': ssid})
        return jsonify({'started': True, 'ssid': ssid})
    return jsonify({'error': r.stderr.strip()}), 500

2233 

@app.route('/api/shell/hotspot/stop', methods=['POST'])
@_require_shell_auth
def shell_hotspot_stop():
    """Bring down the NetworkManager connection named ``Hotspot``.

    Responds 500 with nmcli's stderr when the command fails, or with a
    generic message when nmcli is missing or times out.
    """
    try:
        proc = subprocess.run(
            ['nmcli', 'connection', 'down', 'Hotspot'],
            capture_output=True, text=True, timeout=10)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return jsonify({'error': 'nmcli not available'}), 500

    if not (proc and proc.returncode == 0):
        message = proc.stderr.strip() if proc else 'failed'
        return jsonify({'error': message}), 500

    _audit_shell_op('hotspot_stop', {})
    return jsonify({'stopped': True})

2247 

2248 # ─── Weather (wttr.in — no API key needed) ──────────────── 

2249 

@app.route('/api/shell/weather', methods=['GET'])
def shell_weather():
    """Get current weather. Wraps wttr.in (open, no API key).

    The optional ``location`` query parameter is placed in the request
    path; when empty, the response labels the location ``auto-detected``.
    Any failure (network, HTTP, unexpected payload) yields 503.
    """
    location = request.args.get('location', '')
    try:
        import urllib.parse
        import urllib.request
        # Percent-encode the user-supplied location: raw spaces make
        # urlopen reject the URL, and "?", "#" or "/" would change the
        # path or query that is actually requested.
        encoded = urllib.parse.quote(location, safe=',+@:')
        url = f'https://wttr.in/{encoded}?format=j1'
        req = urllib.request.Request(url, headers={'User-Agent': 'HARTOS/1.0'})
        with urllib.request.urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read().decode('utf-8'))
        current = data.get('current_condition', [{}])[0]
        return jsonify({
            'location': location or 'auto-detected',
            'temp_c': current.get('temp_C'),
            'temp_f': current.get('temp_F'),
            'feels_like_c': current.get('FeelsLikeC'),
            'humidity': current.get('humidity'),
            'description': current.get('weatherDesc', [{}])[0].get('value', ''),
            'wind_kmph': current.get('windspeedKmph'),
            'wind_dir': current.get('winddir16Point'),
            'uv_index': current.get('uvIndex'),
            'visibility_km': current.get('visibility'),
            'raw': current,
        })
    except Exception as e:
        return jsonify({'error': f'Weather unavailable: {e}'}), 503

2276 

2277 # ─── App Auto-Update ────────────────────────────────────── 

2278 

@app.route('/api/shell/auto-update/status', methods=['GET'])
def shell_auto_update_status():
    """Report whether the auto-update timers are active.

    Asks systemctl about the user-level ``flatpak-auto-update.timer`` and
    the system-level ``nixos-upgrade.timer``. A missing systemctl or a
    timeout counts as "not active".
    """
    def _unit_active(cmd):
        # True only when systemctl ran and exited with status 0.
        try:
            probe = subprocess.run(
                cmd, capture_output=True, text=True, timeout=5)
        except (FileNotFoundError, subprocess.TimeoutExpired):
            return False
        return bool(probe) and probe.returncode == 0

    # Check if flatpak auto-update timer exists
    flatpak_timer = _unit_active(
        ['systemctl', '--user', 'is-active', 'flatpak-auto-update.timer'])
    # Check NixOS auto-upgrade
    nix_auto = _unit_active(['systemctl', 'is-active', 'nixos-upgrade.timer'])

    return jsonify({
        'flatpak_auto_update': flatpak_timer,
        'nixos_auto_upgrade': nix_auto,
    })

2305 

@app.route('/api/shell/auto-update/run', methods=['POST'])
@_require_shell_auth
def shell_auto_update_run():
    """Trigger a manual update run for installed apps.

    JSON body: ``target`` — ``flatpak``, ``nix`` or ``all`` (default).
    Runs ``flatpak update`` (300 s limit) and/or ``nix-channel --update``
    (120 s limit). Each result reports ``success``; a tool that is missing
    or times out is reported with an ``error`` text instead of aborting
    the request. Returns 400 for an unknown target.
    """
    body = request.get_json(silent=True) or {}
    target = body.get('target', 'all')  # flatpak, nix, all
    if target not in ('flatpak', 'nix', 'all'):
        return jsonify({'error': 'target must be flatpak, nix, or all'}), 400

    def _run(cmd, timeout):
        # Returns (CompletedProcess or None, error text or None) so one
        # missing tool cannot take down the whole request.
        try:
            proc = subprocess.run(
                cmd, capture_output=True, text=True, timeout=timeout)
        except FileNotFoundError:
            return None, f'{cmd[0]} not available'
        except subprocess.TimeoutExpired:
            return None, f'{cmd[0]} timed out'
        return proc, None

    results = {}
    if target in ('flatpak', 'all'):
        r, err = _run(['flatpak', 'update', '-y', '--noninteractive'], 300)
        entry = {
            'success': r is not None and r.returncode == 0,
            'output': (r.stdout[-500:] if r else ''),
        }
        if err:
            entry['error'] = err
        results['flatpak'] = entry
    if target in ('nix', 'all'):
        r, err = _run(['nix-channel', '--update'], 120)
        entry = {'success': r is not None and r.returncode == 0}
        if err:
            entry['error'] = err
        results['nix_channel'] = entry
    _audit_shell_op('auto_update_run', {'target': target})
    return jsonify({'results': results})

2328 

2329 # ─── Secure DNS (systemd-resolved DoT/DoH) ─────────────── 

2330 

@app.route('/api/shell/dns/status', methods=['GET'])
def shell_dns_status():
    """Summarize ``resolvectl status`` output.

    Returns the server list from the last line mentioning ``DNS Servers``
    plus flags telling whether any line reports DNSSEC or DNS over TLS
    with "yes". Defaults are returned when resolvectl is missing, times
    out, or exits non-zero.
    """
    info = {'servers': [], 'dnssec': False, 'dot': False}
    try:
        proc = subprocess.run(
            ['resolvectl', 'status'],
            capture_output=True, text=True, timeout=5)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        proc = None

    if not proc or proc.returncode != 0:
        return jsonify(info)

    for raw in proc.stdout.split('\n'):
        text = raw.strip()
        says_yes = 'yes' in text.lower()
        if 'DNS Servers' in text:
            info['servers'] = text.split(':', 1)[-1].strip().split()
        if 'DNSSEC' in text and says_yes:
            info['dnssec'] = True
        if 'DNS over TLS' in text and says_yes:
            info['dot'] = True
    return jsonify(info)

2350 

@app.route('/api/shell/dns/set', methods=['POST'])
@_require_shell_auth
def shell_dns_set():
    """Set DNS provider with optional DoT. Providers: cloudflare, google, quad9, custom.

    JSON body:
        provider: one of the names above (default ``cloudflare``).
        servers: list of IP addresses; only read for ``custom``.
        dot: enable DNS over TLS on the link (default ``True``).
        interface: link name handed to resolvectl (default ``dns0``).

    Returns:
        200 with the applied settings;
        400 for an unknown provider, invalid servers or interface;
        500 when resolvectl is missing, times out, or reports failure.
    """
    import ipaddress

    body = request.get_json(silent=True) or {}
    provider = body.get('provider', 'cloudflare')
    dot = body.get('dot', True)
    interface = body.get('interface', 'dns0')
    providers = {
        'cloudflare': ['1.1.1.1', '1.0.0.1'],
        'google': ['8.8.8.8', '8.8.4.4'],
        'quad9': ['9.9.9.9', '149.112.112.112'],
    }
    if provider == 'custom':
        servers = body.get('servers')
    else:
        servers = providers.get(provider) if isinstance(provider, str) else None
    if not servers:
        return jsonify({'error': f'Unknown provider: {provider}'}), 400

    # Custom servers are caller-supplied: require a list of literal IPs
    # (a bare string would otherwise be iterated character by character).
    if not isinstance(servers, list):
        return jsonify({'error': 'servers must be a list of IP addresses'}), 400
    for s in servers:
        try:
            if not isinstance(s, str):
                raise ValueError
            ipaddress.ip_address(s)
        except ValueError:
            return jsonify({'error': 'servers must be a list of IP addresses'}), 400
    if (not isinstance(interface, str) or not interface
            or not interface.replace('-', '').replace('_', '')
                            .replace('.', '').isalnum()):
        return jsonify({'error': 'Invalid interface name'}), 400

    try:
        # Set DNS via resolvectl. All servers go into ONE call: each call
        # replaces the link's server list, so calling once per server
        # would leave only the last one configured.
        r = subprocess.run(['resolvectl', 'dns', interface, *servers],
                           capture_output=True, text=True, timeout=5)
        if r.returncode != 0:
            return jsonify({'error': r.stderr.strip() or 'resolvectl dns failed'}), 500
        if dot:
            r = subprocess.run(['resolvectl', 'dnsovertls', interface, 'yes'],
                               capture_output=True, text=True, timeout=5)
            if r.returncode != 0:
                return jsonify({'error': r.stderr.strip()
                                or 'resolvectl dnsovertls failed'}), 500
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return jsonify({'error': 'resolvectl not available'}), 500

    _audit_shell_op('dns_set', {'provider': provider, 'dot': dot})
    return jsonify({'set': True, 'provider': provider, 'servers': servers, 'dot': dot})

2376 

2377 # ─── SSO / LDAP (sssd + PAM) ───────────────────────────── 

2378 

@app.route('/api/shell/sso/status', methods=['GET'])
def shell_sso_status():
    """Report the state of the sssd-based SSO/LDAP integration.

    Returns whether the ``sssd`` unit is active, the value of the first
    ``domains`` line in /etc/sssd/sssd.conf (``None`` when the file is
    absent, unreadable, or has no such line), and whether ``which sssd``
    succeeds.
    """
    tool_errors = (FileNotFoundError, subprocess.TimeoutExpired)

    try:
        unit = subprocess.run(
            ['systemctl', 'is-active', 'sssd'],
            capture_output=True, text=True, timeout=5)
        sssd_active = bool(
            unit and unit.returncode == 0 and 'active' in unit.stdout)
    except tool_errors:
        sssd_active = False

    conf_path = '/etc/sssd/sssd.conf'
    domain = None
    if os.path.isfile(conf_path):
        try:
            with open(conf_path) as conf:
                for raw in conf:
                    if raw.strip().startswith('domains'):
                        domain = raw.split('=', 1)[-1].strip()
                        break
        except PermissionError:
            pass

    try:
        lookup = subprocess.run(
            ['which', 'sssd'], capture_output=True, timeout=3)
        sssd_installed = lookup.returncode == 0
    except tool_errors:
        sssd_installed = False

    return jsonify({
        'sssd_active': sssd_active,
        'domain': domain,
        'sssd_installed': sssd_installed,
    })

2410 

@app.route('/api/shell/sso/join', methods=['POST'])
@_require_shell_auth
def shell_sso_join():
    """Join an Active Directory / LDAP domain via realm.

    JSON body: ``domain`` and ``username`` (required) and ``password``
    (written to the command's stdin followed by a newline).

    Returns:
        200 when ``realm join`` exits 0;
        400 for missing or malformed fields;
        500 with realm's stderr, or when realm is missing or times out.
    """
    body = request.get_json(silent=True) or {}
    domain = body.get('domain', '')
    username = body.get('username', '')
    password = body.get('password', '')
    if not domain or not username:
        return jsonify({'error': 'domain and username required'}), 400
    if not all(isinstance(v, str) for v in (domain, username, password)):
        return jsonify({'error': 'domain, username and password must be strings'}), 400
    # A value starting with "-" would be parsed by realm as an option
    # instead of as the domain or user name.
    if domain.startswith('-') or username.startswith('-'):
        return jsonify({'error': 'Invalid domain or username'}), 400

    # realm join is the standard way to join AD/LDAP domains
    cmd = ['realm', 'join', '--user', username, domain]
    try:
        r = subprocess.run(cmd, input=password + '\n', capture_output=True,
                           text=True, timeout=60)
    except (FileNotFoundError, subprocess.TimeoutExpired) as e:
        return jsonify({'error': f'realm not available: {e}'}), 500
    if r.returncode == 0:
        # Only the domain is audited; credentials are never recorded.
        _audit_shell_op('sso_join', {'domain': domain})
        return jsonify({'joined': True, 'domain': domain})
    return jsonify({'error': r.stderr.strip()}), 500

2432 

@app.route('/api/shell/sso/leave', methods=['POST'])
@_require_shell_auth
def shell_sso_leave():
    """Leave an Active Directory / LDAP domain via ``realm leave``.

    Requires ``domain`` in the JSON body (400 otherwise). Responds 500
    with realm's stderr on failure, or with an explanatory message when
    realm is missing or the 30 second limit is exceeded.
    """
    payload = request.get_json(silent=True) or {}
    domain = payload.get('domain', '')
    if not domain:
        return jsonify({'error': 'domain required'}), 400

    try:
        proc = subprocess.run(
            ['realm', 'leave', domain],
            capture_output=True, text=True, timeout=30)
    except (FileNotFoundError, subprocess.TimeoutExpired) as exc:
        return jsonify({'error': f'realm not available: {exc}'}), 500

    if proc.returncode != 0:
        return jsonify({'error': proc.stderr.strip()}), 500
    _audit_shell_op('sso_leave', {'domain': domain})
    return jsonify({'left': True, 'domain': domain})

2450 

@app.route('/api/shell/sso/test', methods=['POST'])
@_require_shell_auth
def shell_sso_test():
    """Probe an LDAP server with an anonymous base-scope ``ldapsearch``.

    JSON body: ``uri`` (required) and ``base_dn`` (optional). The response
    reports ``reachable`` from the exit status and the first 500
    characters of stdout (success) or stderr (failure).
    """
    payload = request.get_json(silent=True) or {}
    uri = payload.get('uri', '')
    base_dn = payload.get('base_dn', '')
    if not uri:
        return jsonify({'error': 'uri required'}), 400

    search_cmd = ['ldapsearch', '-x', '-H', uri, '-b', base_dn,
                  '-s', 'base', '(objectclass=*)']
    try:
        proc = subprocess.run(
            search_cmd, capture_output=True, text=True, timeout=10)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return jsonify({'error': 'ldapsearch not available'}), 500

    reachable = proc.returncode == 0
    excerpt = proc.stdout[:500] if reachable else proc.stderr[:500]
    return jsonify({
        'reachable': reachable,
        'output': excerpt,
    })

2471 

2472 # ─── Email (Thunderbird wrapper) ────────────────────────── 

2473 

@app.route('/api/shell/email/status', methods=['GET'])
def shell_email_status():
    """Check if Thunderbird is installed and running.

    ``installed`` is true when a ``thunderbird`` executable is found on
    PATH; ``running`` is true when ``pgrep -x thunderbird`` exits 0. A
    missing pgrep or a timeout is reported as not running rather than
    failing the request.
    """
    # shutil.which performs the PATH lookup in-process, so no external
    # `which` binary is needed and nothing can time out.
    installed = shutil.which('thunderbird') is not None

    running = False
    try:
        r2 = subprocess.run(['pgrep', '-x', 'thunderbird'], capture_output=True,
                            text=True, timeout=3)
        running = r2.returncode == 0
    except (FileNotFoundError, subprocess.TimeoutExpired):
        pass
    return jsonify({'installed': installed, 'running': running,
                    'client': 'thunderbird'})

2489 

@app.route('/api/shell/email/launch', methods=['POST'])
@_require_shell_auth
def shell_email_launch():
    """Launch Thunderbird email client.

    When the JSON body carries ``to``, a compose window addressed to that
    value is opened; otherwise Thunderbird is started normally.

    Returns:
        200 ``{'launched': True}``;
        400 when ``to`` is not a string or contains quotes/line breaks;
        404 when the thunderbird executable is not found.
    """
    body = request.get_json(silent=True) or {}
    compose_to = body.get('to', '')
    if not isinstance(compose_to, str):
        return jsonify({'error': 'to must be a string'}), 400
    compose_to = compose_to.strip()
    # -compose takes a comma-separated list of key=value fields. An
    # unquoted recipient could add fields such as ",attachment=...", so
    # the value is single-quoted and anything able to end the quoting is
    # refused.
    if any(ch in compose_to for ch in ("'", '"', '\r', '\n')):
        return jsonify({'error': 'Invalid recipient'}), 400

    cmd = ['thunderbird']
    if compose_to:
        cmd += ['-compose', f"to='{compose_to}'"]
    try:
        subprocess.Popen(cmd,
                         stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        return jsonify({'launched': True})
    except FileNotFoundError:
        return jsonify({'error': 'thunderbird not installed'}), 404

2506 

2507 logger.info("Registered shell OS API routes (extended)")