Coverage for integrations / agent_engine / shell_system_apis.py: 66.1%

1197 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Shell System APIs — System management for HART OS. 

3 

4Covers: task/process manager, storage manager, startup apps, 

5bluetooth management, print manager, media indexer. 

6 

7All routes registered via register_shell_system_routes(app). 

8""" 

9 

10import configparser 

11import json 

12import logging 

13import os 

14import signal 

15import subprocess 

16import threading 

17import time 

18 

19logger = logging.getLogger('hevolve.shell.system') 

20 

21# ─── Helpers ──────────────────────────────────────────────────── 

22 

def _run(cmd, timeout=10, **kw):
    """Run *cmd* and capture its output as text.

    Args:
        cmd: Argument list handed to ``subprocess.run``.
        timeout: Seconds to wait before giving up.
        **kw: Extra keyword arguments forwarded to ``subprocess.run``.

    Returns:
        The ``subprocess.CompletedProcess`` (whatever its exit code), or
        ``None`` when the command could not be started or timed out.
    """
    try:
        return subprocess.run(cmd, capture_output=True, text=True,
                              timeout=timeout, **kw)
    except (OSError, subprocess.TimeoutExpired):
        # OSError covers FileNotFoundError as before, plus PermissionError /
        # NotADirectoryError raised when the target exists but cannot be
        # executed; callers already treat None as "tool unavailable".
        return None

29 

30 

def _load_json(path, default=None):
    """Load JSON from *path*, falling back to *default* on failure.

    Args:
        path: File to read.
        default: Value returned when the file cannot be read or parsed;
            ``None`` means an empty dict.

    Returns:
        The decoded JSON value, or the fallback.
    """
    try:
        with open(path, encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError: missing file, directory, permission problem.
        # ValueError: json.JSONDecodeError and UnicodeDecodeError.
        return {} if default is None else default

37 

38 

def _save_json(path, data):
    """Write *data* to *path* as indented JSON, creating parent directories.

    Args:
        path: Destination file; may be a bare filename.
        data: JSON-serialisable value.
    """
    parent = os.path.dirname(path)
    if parent:
        # os.makedirs('') raises, so skip it for a bare filename.
        os.makedirs(parent, exist_ok=True)
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2)

43 

44 

45# ─── Bluetooth discovered devices (in-memory) ────────────────── 

46 

47_bt_discovered = [] 

48_bt_lock = threading.Lock() 

49 

50# ─── Media index (in-memory cache) ───────────────────────────── 

51 

52_media_index = {'photos': [], 'music': [], 'videos': [], 

53 'last_scan': 0, 'scan_dirs': []} 

54_media_lock = threading.Lock() 

55 

56 

57# ═══════════════════════════════════════════════════════════════ 

58# Route registration 

59# ═══════════════════════════════════════════════════════════════ 

60 

def _require_system_auth(f):
    """Decorator: require local shell auth for destructive system ops.

    Requests whose remote address is a loopback address pass straight
    through. Any other request must carry an ``X-Shell-Token`` header equal
    to the ``HART_SHELL_TOKEN`` environment variable; when that variable is
    unset or the token differs, a 403 JSON error is returned.
    """
    import hmac
    from functools import wraps

    @wraps(f)
    def decorated(*args, **kwargs):
        from flask import request, jsonify
        remote = request.remote_addr or ''
        if remote not in ('127.0.0.1', '::1', 'localhost'):
            token = request.headers.get('X-Shell-Token', '')
            expected = os.environ.get('HART_SHELL_TOKEN', '')
            # compare_digest avoids leaking how many leading characters
            # matched; bytes are used because it rejects non-ASCII str.
            if not expected or not hmac.compare_digest(
                    token.encode('utf-8'), expected.encode('utf-8')):
                return jsonify({'error': 'Unauthorized'}), 403
        return f(*args, **kwargs)
    return decorated

75 

76 

def _audit_system_op(action, detail=None):
    """Log a system operation to the immutable audit log (best-effort).

    Args:
        action: Name of the operation being recorded.
        detail: Optional dict of extra context; an empty dict is sent
            when omitted.

    Never raises: a failure to import or write to the audit log is only
    noted at debug level so the calling route is not affected.
    """
    try:
        from security.immutable_audit_log import get_audit_log
        get_audit_log().log_event(
            'shell_ops', 'shell_system_api', action,
            detail=detail or {})
    except Exception:
        # Still best-effort, but leave a trace instead of vanishing silently.
        logging.getLogger('hevolve.shell.system').debug(
            'audit log write failed for %s', action, exc_info=True)

86 

87 

88def register_shell_system_routes(app): 

89 """Register all system management API routes.""" 

90 from flask import jsonify, request 

91 

92 # ─── 10. Task / Process Manager ──────────────────────── 

93 

94 _PROTECTED_NAMES = {'init', 'systemd', 'hart-backend', 'hart-agent', 

95 'hart-liquid', 'sshd', 'dbus-daemon', 

96 'dockerd', 'containerd', 'kubelet', 'kube-apiserver', 

97 'kube-controller', 'kube-scheduler', 'etcd', 

98 'podman', 'crio', 'runc'} 

99 

@app.route('/api/shell/tasks/processes', methods=['GET'])
def shell_tasks_processes():
    """List running processes.

    Query parameters:
        search: case-insensitive substring matched against name and cmdline.
        sort: ``cpu`` (default), ``memory`` or ``pid``.
        limit: maximum number of rows returned (default 100).
    """
    search = request.args.get('search', '').lower()
    sort_by = request.args.get('sort', 'cpu')
    try:
        limit = max(int(request.args.get('limit', 100)), 0)
    except ValueError:
        limit = 100  # non-numeric input falls back to the default
    try:
        import psutil
    except ImportError:
        return jsonify({'processes': [], 'total': 0, 'error': 'psutil not available'})
    procs = []
    for p in psutil.process_iter(['pid', 'name', 'username', 'cpu_percent',
                                  'memory_percent', 'memory_info', 'status',
                                  'nice', 'num_threads', 'create_time', 'cmdline']):
        try:
            info = p.info
            name = info.get('name') or ''
            cmdline = ' '.join(info.get('cmdline') or [])
            if search and search not in name.lower() and \
                    search not in cmdline.lower():
                continue
            mem = info.get('memory_info')
            procs.append({
                'pid': info['pid'],
                'name': name,
                'username': info.get('username') or '',
                # Fields psutil could not read are present with value None,
                # so a .get() default is not enough to keep round() safe.
                'cpu_percent': round(info.get('cpu_percent') or 0, 1),
                'memory_percent': round(info.get('memory_percent') or 0, 1),
                'memory_mb': round(mem.rss / 1048576, 1) if mem else 0,
                'status': info.get('status') or '',
                'nice': info.get('nice', 0),
                'threads': info.get('num_threads', 0),
                'create_time': info.get('create_time', 0),
                'cmdline': cmdline[:200],
            })
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
    key = {'memory': 'memory_percent', 'pid': 'pid'}.get(sort_by, 'cpu_percent')
    procs.sort(key=lambda row: row.get(key, 0), reverse=(sort_by != 'pid'))
    total = len(procs)
    return jsonify({'processes': procs[:limit], 'total': total, 'showing': min(limit, total)})

140 

@app.route('/api/shell/tasks/kill', methods=['POST'])
def shell_tasks_kill():
    """Send a signal to a process.

    JSON body: ``pid`` (positive integer) and optional ``signal`` (name of a
    signal in the ``signal`` module, default ``SIGTERM``). PID 1 and any
    process whose name is in ``_PROTECTED_NAMES`` are refused with 403.
    """
    data = request.get_json(force=True)
    pid = data.get('pid', 0)
    sig_name = data.get('signal', 'SIGTERM')
    # bool is an int subclass: without this check ``True`` would mean PID 1.
    if isinstance(pid, bool) or not isinstance(pid, int) or pid <= 0:
        return jsonify({'error': 'Valid pid required'}), 400
    if pid == 1:
        return jsonify({'error': 'Cannot kill PID 1 (init)'}), 403
    try:
        import psutil
    except ImportError:
        psutil = None  # the name check is skipped when psutil is missing
    if psutil is not None:
        try:
            proc_name = psutil.Process(pid).name()
        except psutil.Error:
            # Gone or inaccessible; os.kill below reports the outcome.
            proc_name = None
        if proc_name in _PROTECTED_NAMES:
            return jsonify({'error': f'Cannot kill protected process: {proc_name}'}), 403
    # getattr() on the module can return functions or handler constants
    # (e.g. 'SIG_DFL'); only genuine signal numbers are accepted, and
    # anything else falls back to SIGTERM as before.
    sig = getattr(signal, sig_name, None) if isinstance(sig_name, str) else None
    if not isinstance(sig, signal.Signals):
        sig, sig_name = signal.SIGTERM, 'SIGTERM'
    try:
        os.kill(pid, sig)
        return jsonify({'killed': True, 'pid': pid, 'signal': sig_name})
    except ProcessLookupError:
        return jsonify({'error': 'Process not found'}), 404
    except PermissionError:
        return jsonify({'error': 'Permission denied'}), 403
    except (OSError, OverflowError) as e:
        # e.g. a pid too large for the platform's pid type
        return jsonify({'error': str(e)}), 400

165 

@app.route('/api/shell/tasks/priority', methods=['POST'])
def shell_tasks_priority():
    """Change the nice value of a process.

    JSON body: ``pid`` (positive integer) and ``nice`` (integer, default 0).
    """
    data = request.get_json(force=True)
    pid = data.get('pid', 0)
    nice = data.get('nice', 0)
    if not pid:
        return jsonify({'error': 'pid required'}), 400
    if isinstance(pid, bool) or not isinstance(pid, int) or pid < 0:
        return jsonify({'error': 'pid must be a positive integer'}), 400
    if isinstance(nice, bool) or not isinstance(nice, int) or not -20 <= nice <= 19:
        return jsonify({'error': 'nice must be an integer between -20 and 19'}), 400
    try:
        import psutil
    except ImportError:
        return jsonify({'error': 'psutil not available'}), 500
    try:
        psutil.Process(pid).nice(nice)
    except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
        return jsonify({'error': str(e)}), 400
    return jsonify({'set': True, 'pid': pid, 'nice': nice})

182 

@app.route('/api/shell/tasks/resources', methods=['GET'])
def shell_tasks_resources():
    """Report CPU, RAM, disk I/O, network I/O and (when detectable) GPU usage.

    Sections stay at their empty defaults when psutil or the GPU helper
    is unavailable.
    """
    res = {'cpu': {}, 'ram': {}, 'gpu': None, 'disk_io': {}, 'network_io': {}}
    try:
        import psutil
        # One blocking sample gives per-core and overall figures for the same
        # 0.1 s window. A second, non-blocking call would instead measure
        # "since the previous call" and return zeros the first time.
        per_cpu = psutil.cpu_percent(interval=0.1, percpu=True)
        cpu_freq = psutil.cpu_freq()
        res['cpu'] = {
            'percent': round(sum(per_cpu) / len(per_cpu), 1) if per_cpu else 0.0,
            'count': psutil.cpu_count(),
            'freq_mhz': round(cpu_freq.current) if cpu_freq else 0,
            'per_cpu': per_cpu,
        }
        mem = psutil.virtual_memory()
        swap = psutil.swap_memory()
        res['ram'] = {
            'total_gb': round(mem.total / 1073741824, 1),
            'used_gb': round(mem.used / 1073741824, 1),
            'percent': mem.percent,
            'swap_total_gb': round(swap.total / 1073741824, 1),
            'swap_used_gb': round(swap.used / 1073741824, 1),
        }
        dio = psutil.disk_io_counters()
        if dio:
            res['disk_io'] = {
                'read_bytes': dio.read_bytes,
                'write_bytes': dio.write_bytes,
            }
        nio = psutil.net_io_counters()
        if nio:
            res['network_io'] = {
                'bytes_sent': nio.bytes_sent,
                'bytes_recv': nio.bytes_recv,
            }
    except ImportError:
        pass
    try:
        from integrations.service_tools.vram_manager import detect_gpu
        gpu = detect_gpu()
        if gpu:
            res['gpu'] = {
                'name': gpu.get('name', ''),
                'memory_gb': round(gpu.get('vram_mb', 0) / 1024, 1),
                'utilization': gpu.get('utilization', 0),
                'temperature': gpu.get('temperature', 0),
            }
    except Exception:
        # GPU detection is optional; any failure leaves 'gpu' as None.
        pass
    return jsonify(res)

231 

232 # ─── 11. Storage Manager ─────────────────────────────── 

233 

@app.route('/api/shell/storage', methods=['GET'])
def shell_storage():
    """List mounted partitions with per-partition and aggregate usage in GiB."""
    try:
        import psutil
    except ImportError:
        return jsonify({'partitions': [], 'error': 'psutil not available'})

    bytes_per_gib = 1073741824

    def _describe(part):
        usage = psutil.disk_usage(part.mountpoint)
        return {
            'device': part.device,
            'mount': part.mountpoint,
            'fstype': part.fstype,
            'total_gb': round(usage.total / bytes_per_gib, 1),
            'used_gb': round(usage.used / bytes_per_gib, 1),
            'free_gb': round(usage.free / bytes_per_gib, 1),
            'percent': usage.percent,
        }

    partitions = []
    for part in psutil.disk_partitions(all=False):
        try:
            partitions.append(_describe(part))
        except (PermissionError, OSError):
            # Mount points that cannot be queried are left out.
            continue

    total_gb = sum(row['total_gb'] for row in partitions)
    used_gb = sum(row['used_gb'] for row in partitions)
    if total_gb > 0:
        overall = round(used_gb / total_gb * 100, 1)
    else:
        overall = 0
    return jsonify({
        'partitions': partitions,
        'total_gb': round(total_gb, 1),
        'used_gb': round(used_gb, 1),
        'overall_percent': overall,
    })

263 

@app.route('/api/shell/storage/usage', methods=['GET'])
def shell_storage_usage():
    """Report the size of each direct child of a directory.

    Query parameter ``path`` defaults to the home directory. Directories are
    measured with ``du -sm``; other entries use their stat size. The 100
    largest children are returned.
    """
    path = request.args.get('path', os.path.expanduser('~'))
    if not os.path.isdir(path):
        return jsonify({'error': 'Valid directory path required'}), 400
    children = []
    try:
        # The context manager releases the directory handle promptly.
        with os.scandir(path) as entries:
            for entry in entries:
                try:
                    if entry.is_dir(follow_symlinks=False):
                        # '--' keeps a path such as '-x/child' from being
                        # parsed as an option. du still prints a total when
                        # it exits non-zero because a sub-directory was
                        # unreadable, so stdout is used regardless.
                        r = _run(['du', '-sm', '--', entry.path], timeout=5)
                        fields = r.stdout.split() if r else []
                        size_mb = int(fields[0]) if fields else 0
                    else:
                        size_mb = round(entry.stat().st_size / 1048576, 1)
                    children.append({
                        'name': entry.name, 'path': entry.path,
                        'size_mb': size_mb, 'is_dir': entry.is_dir(),
                    })
                except (OSError, ValueError):
                    pass
    except PermissionError:
        return jsonify({'error': 'Permission denied'}), 403
    children.sort(key=lambda c: c['size_mb'], reverse=True)
    total = sum(c['size_mb'] for c in children)
    return jsonify({'path': path, 'total_size_mb': round(total, 1),
                    'children': children[:100]})

290 

@app.route('/api/shell/storage/cleanup', methods=['GET'])
def shell_storage_cleanup():
    """Estimate how much space each cleanup category could reclaim.

    Nothing is deleted here. The Nix figure is a rough estimate of 10 MB
    per dead store path.
    """
    def _du_mb(target, timeout):
        # du prints its total even when it exits non-zero because part of
        # the tree was unreadable, so the exit code is not consulted.
        r = _run(['du', '-sm', '--', target], timeout=timeout)
        fields = r.stdout.split() if r else []
        return int(fields[0]) if fields and fields[0].isdigit() else 0

    home = os.path.expanduser('~')
    reclaimable = []
    for cat, path, desc in [
        ('cache', os.path.join(home, '.cache'), 'Application caches'),
        ('temp', '/tmp', 'Temporary files'),
        ('trash', os.path.join(home, '.local/share/Trash'), 'Trash bin'),
        ('journal', '/var/log/journal', 'System journal logs'),
    ]:
        if os.path.isdir(path):
            reclaimable.append({
                'category': cat, 'path': path,
                'size_mb': _du_mb(path, 10), 'description': desc,
            })
    r = _run(['nix-store', '--gc', '--print-dead'], timeout=15)
    if r and r.returncode == 0:
        # ''.split('\n') yields [''], which used to count as one dead path.
        dead_lines = [ln for ln in r.stdout.splitlines() if ln.strip()]
        reclaimable.append({
            'category': 'nix_old', 'path': '/nix/store',
            'size_mb': len(dead_lines) * 10,
            'description': f'Old Nix generations (~{len(dead_lines)} store paths)',
        })
    total = sum(item['size_mb'] for item in reclaimable)
    return jsonify({'reclaimable': reclaimable, 'total_reclaimable_mb': total})

318 

@app.route('/api/shell/storage/clean', methods=['POST'])
def shell_storage_clean():
    """Run the requested cleanup categories and report the space freed.

    JSON body: ``categories`` — a list (or single string) drawn from
    ``cache``, ``temp``, ``trash``, ``nix_old``, ``journal``. Unknown names
    are ignored. For ``nix_old`` and ``journal`` the freed size is not
    measured and is reported as 0.
    """
    def _du_mb(target):
        r = _run(['du', '-sm', '--', target], timeout=5)
        fields = r.stdout.split() if r else []
        return int(fields[0]) if fields and fields[0].isdigit() else 0

    def _clean_and_measure(target, argv, timeout):
        # Freed space is the before/after difference. The pre-clean size
        # alone overstates it, since only part of the tree is deleted.
        before = _du_mb(target)
        _run(argv, timeout=timeout)
        return max(before - _du_mb(target), 0)

    data = request.get_json(force=True)
    categories = data.get('categories', [])
    if not categories:
        return jsonify({'error': 'categories required'}), 400
    if isinstance(categories, str):
        # A bare string would otherwise be iterated character by character.
        categories = [categories]
    home = os.path.expanduser('~')
    freed = {}
    for cat in categories:
        if cat == 'cache':
            cache_dir = os.path.join(home, '.cache')
            freed['cache'] = _clean_and_measure(
                cache_dir,
                ['find', cache_dir, '-type', 'f', '-atime', '+7', '-delete'], 30)
        elif cat == 'temp':
            freed['temp'] = _clean_and_measure(
                '/tmp',
                ['find', '/tmp', '-user', os.environ.get('USER', 'hart'),
                 '-type', 'f', '-mtime', '+1', '-delete'], 30)
        elif cat == 'trash':
            trash = os.path.join(home, '.local/share/Trash')
            freed['trash'] = _clean_and_measure(
                trash, ['gio', 'trash', '--empty'], 15)
        elif cat == 'nix_old':
            _run(['nix-collect-garbage', '-d'], timeout=120)
            freed['nix_old'] = 0
        elif cat == 'journal':
            _run(['journalctl', '--vacuum-time=7d'], timeout=30)
            freed['journal'] = 0
    total = sum(freed.values())
    return jsonify({'cleaned': True, 'freed_mb': total, 'details': freed})

354 

@app.route('/api/shell/storage/smart', methods=['GET'])
def shell_storage_smart():
    """Return SMART health data for a block device via ``smartctl -j -a``.

    Query parameter ``device`` must be a path under ``/dev/``.
    """
    device = request.args.get('device', '')
    # Requiring /dev/ also stops a value such as '--set=...' from reaching
    # smartctl as an option.
    if not device or not device.startswith('/dev/'):
        return jsonify({'error': 'device required (e.g. /dev/nvme0n1)'}), 400
    r = _run(['smartctl', '-j', '-a', device], timeout=15)
    # smartctl's exit status is a bitmask. Bits 0-1 mean the command line or
    # device open failed; higher bits describe disk health (failing disk,
    # logged errors) and still come with usable JSON output.
    if not r or r.returncode & 0b11:
        return jsonify({'error': 'smartctl not available or device not found'}), 500
    try:
        data = json.loads(r.stdout)
        health = data.get('smart_status', {}).get('passed', True)
        temp = data.get('temperature', {}).get('current', 0)
        poh = data.get('power_on_time', {}).get('hours', 0)
        return jsonify({
            'device': device, 'healthy': health,
            'temperature_c': temp, 'power_on_hours': poh,
            'model': data.get('model_name', ''),
            'serial': data.get('serial_number', ''),
            'firmware': data.get('firmware_version', ''),
        })
    except (json.JSONDecodeError, KeyError, AttributeError):
        # AttributeError: the JSON parsed but is not shaped as expected.
        return jsonify({'error': 'Failed to parse smartctl output'}), 500

377 

378 # ─── 12. Startup Apps Manager ────────────────────────── 

379 

def _parse_desktop_file(path):
    """Parse an autostart ``.desktop`` file into a summary dict.

    Args:
        path: Path of the desktop file.

    Returns:
        A dict with ``name``, ``exec``, ``icon``, ``comment``, ``enabled``,
        ``file`` and ``system`` keys, or ``None`` when the file is missing,
        malformed, or has no ``[Desktop Entry]`` section.
    """
    # strict=False tolerates duplicate keys/sections (last value wins)
    # instead of raising.
    cp = configparser.ConfigParser(interpolation=None, strict=False)
    try:
        cp.read(path, encoding='utf-8')
    except (configparser.Error, UnicodeDecodeError):
        # One broken file must not take down the whole listing.
        return None
    if not cp.has_section('Desktop Entry'):
        return None
    entry = cp['Desktop Entry']
    hidden = entry.get('Hidden', 'false').lower() == 'true'
    enabled_key = entry.get('X-GNOME-Autostart-enabled', 'true')
    enabled = enabled_key.lower() != 'false' and not hidden
    return {
        'name': entry.get('Name', os.path.basename(path)),
        'exec': entry.get('Exec', ''),
        'icon': entry.get('Icon', ''),
        'comment': entry.get('Comment', ''),
        'enabled': enabled,
        'file': path,
        'system': path.startswith(('/etc/', '/run/')),
    }

398 

@app.route('/api/shell/startup', methods=['GET'])
def shell_startup():
    """List autostart entries from the system and user autostart directories."""
    search_dirs = ('/etc/xdg/autostart', os.path.expanduser('~/.config/autostart'))
    entries = []
    for base in search_dirs:
        if os.path.isdir(base):
            desktop_files = [n for n in sorted(os.listdir(base))
                             if n.endswith('.desktop')]
            for fname in desktop_files:
                parsed = _parse_desktop_file(os.path.join(base, fname))
                if parsed:
                    entries.append(parsed)
    return jsonify({'entries': entries, 'count': len(entries)})

413 

@app.route('/api/shell/startup/toggle', methods=['POST'])
def shell_startup_toggle():
    """Enable or disable an autostart entry.

    JSON body: ``file`` (path of a ``.desktop`` file) and ``enabled``
    (default true). System entries under ``/etc/`` or ``/run/`` are never
    edited in place: they are copied into ``~/.config/autostart`` and the
    copy is modified. Any other path must be directly inside that user
    directory.
    """
    data = request.get_json(force=True)
    filepath = data.get('file', '')
    enabled = data.get('enabled', True)
    if not filepath:
        return jsonify({'error': 'file required'}), 400
    user_dir = os.path.abspath(os.path.expanduser('~/.config/autostart'))
    # abspath collapses '..' lexically, so '/etc/../home/x' is not mistaken
    # for a system path. Symlinks are left unresolved so entries that link
    # elsewhere still classify by where they are installed.
    filepath = os.path.abspath(os.path.expanduser(filepath))
    is_system = filepath.startswith(('/etc/', '/run/'))
    in_user_dir = os.path.dirname(filepath) == user_dir
    # The file is rewritten below, so refuse anything that is not an
    # autostart entry rather than overwrite an arbitrary file.
    if not filepath.endswith('.desktop') or not (is_system or in_user_dir):
        return jsonify({'error': 'Not an autostart entry'}), 403
    if not os.path.isfile(filepath):
        return jsonify({'error': 'File not found'}), 404
    if is_system:
        os.makedirs(user_dir, exist_ok=True)
        user_copy = os.path.join(user_dir, os.path.basename(filepath))
        if not os.path.exists(user_copy):
            import shutil
            # copyfile, not copy2: a read-only mode on the source would make
            # the copy unwritable for the edit below.
            shutil.copyfile(filepath, user_copy)
        filepath = user_copy
    cp = configparser.ConfigParser(interpolation=None, strict=False)
    # Desktop-entry keys are case-sensitive; the default optionxform would
    # write 'Name'/'Exec' back as 'name'/'exec'.
    cp.optionxform = str
    try:
        cp.read(filepath, encoding='utf-8')
    except (configparser.Error, UnicodeDecodeError):
        return jsonify({'error': 'Invalid desktop file'}), 400
    if not cp.has_section('Desktop Entry'):
        cp.add_section('Desktop Entry')
    cp.set('Desktop Entry', 'Hidden', 'false' if enabled else 'true')
    cp.set('Desktop Entry', 'X-GNOME-Autostart-enabled', str(enabled).lower())
    with open(filepath, 'w', encoding='utf-8') as f:
        cp.write(f, space_around_delimiters=False)
    return jsonify({'toggled': True, 'file': filepath, 'enabled': enabled})

441 

@app.route('/api/shell/startup/add', methods=['POST'])
def shell_startup_add():
    """Create a user autostart entry in ``~/.config/autostart``.

    JSON body: ``name`` and ``exec`` (required), ``comment`` (optional).
    The filename is derived from the lower-cased name.
    """
    data = request.get_json(force=True)
    name = data.get('name', '')
    exec_cmd = data.get('exec', '')
    if not name or not exec_cmd:
        return jsonify({'error': 'name and exec required'}), 400
    comment = data.get('comment', '')
    # A line break inside a value would start a new key in the file.
    if any(ch in str(value)
           for value in (name, exec_cmd, comment) for ch in '\r\n\0'):
        return jsonify({'error': 'name, exec and comment must be single-line'}), 400
    user_dir = os.path.expanduser('~/.config/autostart')
    os.makedirs(user_dir, exist_ok=True)
    # Strip path separators and leading dots so the file cannot be created
    # outside the autostart directory (e.g. name='../../x').
    safe_name = str(name).lower().replace(' ', '-')
    safe_name = safe_name.replace('/', '-').replace('\\', '-').lstrip('.')
    if not safe_name:
        return jsonify({'error': 'name and exec required'}), 400
    filepath = os.path.join(user_dir, f'{safe_name}.desktop')
    lines = [
        '[Desktop Entry]',
        'Type=Application',
        f'Name={name}',
        f'Exec={exec_cmd}',
        f'Comment={comment}',
        'X-GNOME-Autostart-enabled=true',
        'Hidden=false',
    ]
    with open(filepath, 'w', encoding='utf-8') as f:
        f.write('\n'.join(lines) + '\n')
    return jsonify({'added': True, 'file': filepath, 'name': name})

464 

@app.route('/api/shell/startup/remove', methods=['POST'])
def shell_startup_remove():
    """Delete a user autostart entry.

    JSON body: ``file`` — path of a ``.desktop`` file directly inside
    ``~/.config/autostart``. System entries and any other path are refused.
    """
    data = request.get_json(force=True)
    filepath = data.get('file', '')
    if not filepath:
        return jsonify({'error': 'file required'}), 400
    # abspath collapses '..' so traversal cannot dodge the checks below.
    filepath = os.path.abspath(os.path.expanduser(filepath))
    if filepath.startswith(('/etc/', '/run/')):
        return jsonify({'error': 'Cannot remove system startup entries'}), 403
    user_dir = os.path.abspath(os.path.expanduser('~/.config/autostart'))
    # Without this check the endpoint would delete any file the service
    # user can remove.
    if os.path.dirname(filepath) != user_dir or not filepath.endswith('.desktop'):
        return jsonify({'error': 'Only user autostart entries can be removed'}), 403
    if os.path.isfile(filepath):
        os.remove(filepath)
        return jsonify({'removed': True, 'file': filepath})
    return jsonify({'error': 'File not found'}), 404

478 

479 # ─── 13. Bluetooth Full Management ───────────────────── 

480 

def _bt_run(cmd_str, timeout=5):
    """Run ``bluetoothctl`` with *cmd_str* split on whitespace as its arguments."""
    argv = ['bluetoothctl', *cmd_str.split()]
    return _run(argv, timeout=timeout)

483 

@app.route('/api/shell/bluetooth/status', methods=['GET'])
def shell_bt_status():
    """Report controller state and the known devices, parsed from bluetoothctl."""
    info = {'powered': False, 'discoverable': False, 'pairable': False,
            'controller': {}, 'devices': []}
    flag_labels = (('Powered:', 'powered'),
                   ('Discoverable:', 'discoverable'),
                   ('Pairable:', 'pairable'))

    show = _bt_run('show')
    if show and show.returncode == 0:
        for raw in show.stdout.split('\n'):
            text = raw.strip()
            if text.startswith('Controller'):
                tokens = text.split()
                address = tokens[1] if len(tokens) > 1 else ''
                info['controller'] = {'address': address}
                continue
            for label, key in flag_labels:
                if label in text:
                    info[key] = 'yes' in text.lower()
                    break
            else:
                # Only the first name line seen for the controller is kept.
                if 'Name:' in text and not info['controller'].get('name'):
                    info['controller']['name'] = text.split(':', 1)[1].strip()

    def _with_details(dev):
        # Add connection details from `bluetoothctl info <mac>`.
        detail = _bt_run(f"info {dev['mac']}")
        if detail and detail.returncode == 0:
            for raw in detail.stdout.split('\n'):
                text = raw.strip()
                if 'Connected:' in text:
                    dev['connected'] = 'yes' in text.lower()
                elif 'Trusted:' in text:
                    dev['trusted'] = 'yes' in text.lower()
                elif 'Icon:' in text:
                    dev['icon'] = text.split(':', 1)[1].strip()
        return dev

    listing = _bt_run('devices')
    if listing and listing.returncode == 0:
        for raw in listing.stdout.strip().split('\n'):
            tokens = raw.strip().split()
            if len(tokens) < 3 or tokens[0] != 'Device':
                continue
            device = {'mac': tokens[1], 'name': ' '.join(tokens[2:]), 'paired': True}
            info['devices'].append(_with_details(device))
    return jsonify(info)

523 

@app.route('/api/shell/bluetooth/scan', methods=['POST'])
def shell_bt_scan():
    """Start a background Bluetooth scan.

    JSON body: ``duration`` in seconds (default 10, clamped to 1-120).
    Previously discovered devices are cleared immediately; results are
    published to ``_bt_discovered`` when the scan finishes.
    """
    data = request.get_json(force=True)
    try:
        duration = int(data.get('duration', 10))
    except (TypeError, ValueError, OverflowError):
        return jsonify({'error': 'duration must be a number of seconds'}), 400
    # Bound the scan so one request cannot tie up the adapter indefinitely.
    duration = min(max(duration, 1), 120)
    with _bt_lock:
        _bt_discovered.clear()

    def _do_scan():
        r = _run(['bluetoothctl', '--timeout', str(duration), 'scan', 'on'],
                 timeout=duration + 5)
        if not r or r.returncode != 0:
            return
        found = []
        for line in r.stdout.split('\n'):
            if 'NEW' in line and 'Device' in line:
                parts = line.strip().split()
                for i, p in enumerate(parts):
                    # First token shaped like a MAC: 17 chars containing ':'
                    if ':' in p and len(p) == 17:
                        found.append({'mac': p, 'name': ' '.join(parts[i + 1:])})
                        break
        # Parse outside the lock; hold it only to publish the results.
        with _bt_lock:
            _bt_discovered.extend(found)

    threading.Thread(target=_do_scan, daemon=True).start()
    return jsonify({'scanning': True, 'duration': duration})

548 

@app.route('/api/shell/bluetooth/discovered', methods=['GET'])
def shell_bt_discovered():
    """Return a snapshot of the devices found by the most recent scan."""
    with _bt_lock:
        snapshot = _bt_discovered[:]
    return jsonify({'devices': snapshot, 'count': len(snapshot)})

554 

@app.route('/api/shell/bluetooth/pair', methods=['POST'])
def shell_bt_pair():
    """Pair with the device whose address is given as ``mac`` in the JSON body."""
    import re
    data = request.get_json(force=True)
    mac = data.get('mac', '')
    if not mac:
        return jsonify({'error': 'mac required'}), 400
    # The value is split on whitespace into bluetoothctl arguments, so only
    # a well-formed address is allowed through.
    if not isinstance(mac, str) or not re.fullmatch(
            r'(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}', mac):
        return jsonify({'error': 'invalid mac address'}), 400
    r = _bt_run(f'pair {mac}', timeout=15)
    # bool() keeps the JSON field a boolean when the command did not run.
    ok = bool(r and r.returncode == 0)
    if ok:
        error = ''
    elif r is None:
        error = 'bluetoothctl not available'
    else:
        error = r.stderr.strip()
    return jsonify({'paired': ok, 'mac': mac, 'error': error})

565 

@app.route('/api/shell/bluetooth/connect', methods=['POST'])
def shell_bt_connect():
    """Connect to the device whose address is given as ``mac`` in the JSON body."""
    import re
    data = request.get_json(force=True)
    mac = data.get('mac', '')
    if not mac:
        return jsonify({'error': 'mac required'}), 400
    # The value is split on whitespace into bluetoothctl arguments, so only
    # a well-formed address is allowed through.
    if not isinstance(mac, str) or not re.fullmatch(
            r'(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}', mac):
        return jsonify({'error': 'invalid mac address'}), 400
    r = _bt_run(f'connect {mac}', timeout=15)
    # bool() keeps the JSON field a boolean when the command did not run.
    ok = bool(r and r.returncode == 0)
    return jsonify({'connected': ok, 'mac': mac})

575 

@app.route('/api/shell/bluetooth/disconnect', methods=['POST'])
def shell_bt_disconnect():
    """Disconnect the device whose address is given as ``mac`` in the JSON body."""
    import re
    data = request.get_json(force=True)
    mac = data.get('mac', '')
    if not mac:
        return jsonify({'error': 'mac required'}), 400
    # The value is split on whitespace into bluetoothctl arguments, so only
    # a well-formed address is allowed through.
    if not isinstance(mac, str) or not re.fullmatch(
            r'(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}', mac):
        return jsonify({'error': 'invalid mac address'}), 400
    r = _bt_run(f'disconnect {mac}')
    # bool() keeps the JSON field a boolean when the command did not run.
    ok = bool(r and r.returncode == 0)
    return jsonify({'disconnected': ok, 'mac': mac})

585 

@app.route('/api/shell/bluetooth/trust', methods=['POST'])
def shell_bt_trust():
    """Trust or untrust a device.

    JSON body: ``mac`` and ``trusted`` (default true). The response reports
    the requested state on success and its negation on failure.
    """
    import re
    data = request.get_json(force=True)
    mac = data.get('mac', '')
    trusted = data.get('trusted', True)
    if not mac:
        return jsonify({'error': 'mac required'}), 400
    # The value is split on whitespace into bluetoothctl arguments, so only
    # a well-formed address is allowed through.
    if not isinstance(mac, str) or not re.fullmatch(
            r'(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}', mac):
        return jsonify({'error': 'invalid mac address'}), 400
    cmd = 'trust' if trusted else 'untrust'
    r = _bt_run(f'{cmd} {mac}')
    ok = bool(r and r.returncode == 0)
    return jsonify({'trusted': trusted if ok else not trusted, 'mac': mac})

597 

@app.route('/api/shell/bluetooth/remove', methods=['POST'])
def shell_bt_remove():
    """Remove (forget) the device whose address is given as ``mac`` in the JSON body."""
    import re
    data = request.get_json(force=True)
    mac = data.get('mac', '')
    if not mac:
        return jsonify({'error': 'mac required'}), 400
    # The value is split on whitespace into bluetoothctl arguments, so only
    # a well-formed address is allowed through.
    if not isinstance(mac, str) or not re.fullmatch(
            r'(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}', mac):
        return jsonify({'error': 'invalid mac address'}), 400
    r = _bt_run(f'remove {mac}')
    # bool() keeps the JSON field a boolean when the command did not run.
    ok = bool(r and r.returncode == 0)
    return jsonify({'removed': ok, 'mac': mac})

607 

@app.route('/api/shell/bluetooth/power', methods=['POST'])
def shell_bt_power():
    """Switch the Bluetooth controller on or off per ``powered`` in the JSON body."""
    payload = request.get_json(force=True)
    powered = payload.get('powered', True)
    if powered:
        result = _bt_run('power on')
    else:
        result = _bt_run('power off')
    succeeded = result is not None and result.returncode == 0
    # On failure the opposite of the requested state is reported.
    reported = powered if succeeded else not powered
    return jsonify({'powered': reported})

616 

617 # ─── 14. Print Manager (CUPS) ────────────────────────── 

618 

@app.route('/api/shell/printers', methods=['GET'])
def shell_printers():
    """List printers with state, default flag and device URI, parsed from lpstat."""
    printers = []
    status = _run(['lpstat', '-p', '-d'])
    cups_running = bool(status and status.returncode == 0)
    if cups_running:
        default_printer = ''
        for row in status.stdout.strip().split('\n'):
            lowered = row.lower()
            if row.startswith('printer'):
                tokens = row.split()
                if len(tokens) < 2:
                    continue
                if 'idle' in lowered:
                    state = 'idle'
                elif 'printing' in lowered:
                    state = 'printing'
                else:
                    state = 'disabled'
                printers.append({
                    'name': tokens[1], 'state': state,
                    'accepting': 'disabled' not in lowered,
                    'default': False,
                })
            elif 'system default destination' in lowered:
                default_printer = row.split(':')[-1].strip()
        for printer in printers:
            if printer['name'] == default_printer:
                printer['default'] = True
        devices = _run(['lpstat', '-v'])
        if devices and devices.returncode == 0:
            for row in devices.stdout.strip().split('\n'):
                if 'device for' not in row.lower():
                    continue
                head, sep, tail = row.partition(':')
                if not sep:
                    continue
                pname = head.split()[-1]
                uri = tail.strip()
                for printer in printers:
                    if printer['name'] == pname:
                        printer['uri'] = uri
    default_name = next((p['name'] for p in printers if p.get('default')), '')
    return jsonify({
        'printers': printers,
        'default': default_name,
        'cups_running': cups_running,
    })

660 

@app.route('/api/shell/printers/jobs', methods=['GET'])
def shell_printer_jobs():
    """List print jobs, optionally restricted to one printer.

    Query parameter ``printer`` selects a destination. Every job is
    reported with state ``pending``; lpstat's job listing is not parsed
    for state.
    """
    printer = request.args.get('printer', '')
    # -W must come before -o for the job filter to apply.
    cmd = ['lpstat', '-W', 'all']
    if printer:
        # -o lists the jobs queued on a destination; -p would print the
        # printer's status line instead, which was then parsed as a job.
        cmd.extend(['-o', printer])
    r = _run(cmd)
    jobs = []
    if r and r.returncode == 0:
        for line in r.stdout.strip().split('\n'):
            parts = line.split()
            if len(parts) >= 4:
                jobs.append({
                    'id': parts[0],
                    'user': parts[1],
                    'size': parts[2],
                    'state': 'pending',
                })
    return jsonify({'jobs': jobs, 'count': len(jobs)})

682 

@app.route('/api/shell/printers/add', methods=['POST'])
def shell_printer_add():
    """Add a printer via ``lpadmin``.

    JSON body: ``uri`` and ``name`` (required), ``driver`` (default
    ``everywhere``).
    """
    data = request.get_json(force=True)
    uri = data.get('uri', '')
    name = data.get('name', '')
    driver = data.get('driver', 'everywhere')
    if not uri or not name:
        return jsonify({'error': 'uri and name required'}), 400
    r = _run(['lpadmin', '-p', name, '-E', '-v', uri, '-m', driver], timeout=30)
    # bool() keeps 'added' a boolean even when lpadmin could not be run.
    ok = bool(r and r.returncode == 0)
    if ok:
        error = ''
    elif r is None:
        # Previously an empty string, indistinguishable from success.
        error = 'lpadmin not available'
    else:
        error = r.stderr.strip()
    return jsonify({'added': ok, 'name': name, 'error': error})

695 

@app.route('/api/shell/printers/remove', methods=['POST'])
def shell_printer_remove():
    """Delete the printer named in the JSON body via ``lpadmin -x``."""
    payload = request.get_json(force=True)
    name = payload.get('name', '')
    if name:
        result = _run(['lpadmin', '-x', name])
        removed = result and result.returncode == 0
        return jsonify({'removed': removed, 'name': name})
    return jsonify({'error': 'name required'}), 400

705 

@app.route('/api/shell/printers/set-default', methods=['POST'])
def shell_printer_set_default():
    """Make the printer named in the JSON body the default via ``lpoptions -d``."""
    payload = request.get_json(force=True)
    name = payload.get('name', '')
    if name:
        result = _run(['lpoptions', '-d', name])
        was_set = result and result.returncode == 0
        return jsonify({'set': was_set, 'default': name})
    return jsonify({'error': 'name required'}), 400

715 

@app.route('/api/shell/printers/test', methods=['POST'])
def shell_printer_test():
    """Send the CUPS test page (or ``/dev/null`` if it is absent) to a printer."""
    payload = request.get_json(force=True)
    name = payload.get('name', '')
    if not name:
        return jsonify({'error': 'name required'}), 400
    candidate = '/usr/share/cups/data/testprint.ps'
    test_file = candidate if os.path.isfile(candidate) else '/dev/null'
    result = _run(['lp', '-d', name, test_file])
    printed = result and result.returncode == 0
    return jsonify({'printed': printed, 'printer': name})

728 

@app.route('/api/shell/printers/cancel', methods=['POST'])
def shell_printer_cancel():
    """Cancel the print job identified by ``job_id`` in the JSON body."""
    payload = request.get_json(force=True)
    job_id = payload.get('job_id', '')
    if job_id:
        result = _run(['cancel', str(job_id)])
        cancelled = result and result.returncode == 0
        return jsonify({'cancelled': cancelled, 'job_id': job_id})
    return jsonify({'error': 'job_id required'}), 400

738 

739 # ─── 15. Media Indexer ───────────────────────────────── 

740 

@app.route('/api/shell/media/status', methods=['GET'])
def shell_media_status():
    """Summarise the in-memory media index: last scan time, counts, directories."""
    with _media_lock:
        last_scan = _media_index['last_scan']
        counts = {
            'photos': len(_media_index['photos']),
            'music': len(_media_index['music']),
            'videos': len(_media_index['videos']),
        }
        summary = {
            'indexed': last_scan > 0,
            'last_scan': last_scan,
            'counts': counts,
            'scan_directories': _media_index['scan_dirs'],
        }
        return jsonify(summary)

754 

@app.route('/api/shell/media/scan', methods=['POST'])
def shell_media_scan():
    """Start a background scan that rebuilds the in-memory media index.

    JSON body: ``directories`` — list (or single string) of directories to
    walk; defaults to ``~/Pictures``, ``~/Videos`` and ``~/Music``. Hidden
    sub-directories are skipped. Photos are enriched via ``exiftool``, and
    music and videos via ``ffprobe``, when those tools succeed.
    """
    data = request.get_json(force=True)
    directories = data.get('directories', [])
    if isinstance(directories, str):
        # A bare string would otherwise be walked character by character.
        directories = [directories]
    if not directories:
        home = os.path.expanduser('~')
        directories = [
            os.path.join(home, 'Pictures'),
            os.path.join(home, 'Videos'),
            os.path.join(home, 'Music'),
        ]

    photo_exts = {'.jpg', '.jpeg', '.png', '.gif', '.heic', '.heif',
                  '.raw', '.cr2', '.nef', '.webp', '.bmp', '.tiff'}
    music_exts = {'.mp3', '.flac', '.ogg', '.opus', '.m4a', '.wav',
                  '.aac', '.wma', '.alac'}
    video_exts = {'.mp4', '.mkv', '.avi', '.mov', '.wmv', '.webm',
                  '.flv', '.m4v', '.ts'}

    def _probe(argv):
        # Run a metadata tool; return its parsed JSON output or None.
        r = _run(argv, timeout=5)
        if not r or r.returncode != 0:
            return None
        try:
            return json.loads(r.stdout)
        except json.JSONDecodeError:
            return None

    def _parse_photo(meta, fname):
        # Extract the EXIF fields from exiftool's first result object.
        first = meta[0]
        return {
            'date_taken': first.get('DateTimeOriginal', ''),
            'width': first.get('ImageWidth', 0),
            'height': first.get('ImageHeight', 0),
            'camera': first.get('Model', ''),
        }

    def _parse_music(meta, fname):
        # Extract tag and duration fields from ffprobe's format section.
        fmt = meta.get('format', {})
        tags = fmt.get('tags', {})
        return {
            'title': tags.get('title', fname),
            'artist': tags.get('artist', ''),
            'album': tags.get('album', ''),
            'duration': float(fmt.get('duration', 0)),
            'year': tags.get('date', '')[:4],
        }

    def _parse_video(meta, fname):
        # Extract duration plus the first video stream's size and codec.
        fmt = meta.get('format', {})
        vid_stream = next(
            (s for s in meta.get('streams', [])
             if s.get('codec_type') == 'video'), {})
        return {
            'duration': float(fmt.get('duration', 0)),
            'resolution': f"{vid_stream.get('width', 0)}x{vid_stream.get('height', 0)}",
            'codec': vid_stream.get('codec_name', ''),
        }

    kinds = (
        ('photos', photo_exts,
         ['exiftool', '-json', '-DateTimeOriginal',
          '-ImageWidth', '-ImageHeight', '-Model'], _parse_photo),
        ('music', music_exts,
         ['ffprobe', '-v', 'quiet', '-print_format', 'json',
          '-show_format'], _parse_music),
        ('videos', video_exts,
         ['ffprobe', '-v', 'quiet', '-print_format', 'json',
          '-show_format', '-show_streams'], _parse_video),
    )

    def _do_scan():
        found = {'photos': [], 'music': [], 'videos': []}
        for directory in directories:
            if not isinstance(directory, str) or not os.path.isdir(directory):
                continue
            for root, dirs, files in os.walk(directory):
                # Prune hidden directories in place so os.walk skips them.
                dirs[:] = [d for d in dirs if not d.startswith('.')]
                for fname in files:
                    ext = os.path.splitext(fname)[1].lower()
                    kind = next((k for k in kinds if ext in k[1]), None)
                    if kind is None:
                        continue
                    bucket, _exts, argv, parse = kind
                    fpath = os.path.join(root, fname)
                    try:
                        st = os.stat(fpath)
                    except OSError:
                        continue
                    entry = {
                        'path': fpath, 'name': fname,
                        'size': st.st_size,
                        'modified': st.st_mtime,
                    }
                    meta = _probe(argv + [fpath])
                    if meta is not None:
                        try:
                            entry.update(parse(meta, fname))
                        except (AttributeError, IndexError, KeyError,
                                TypeError, ValueError):
                            # Unexpectedly shaped tool output: keep the basic
                            # entry instead of letting the exception abort
                            # the whole scan thread.
                            pass
                    found[bucket].append(entry)

        with _media_lock:
            _media_index['photos'] = found['photos']
            _media_index['music'] = found['music']
            _media_index['videos'] = found['videos']
            _media_index['last_scan'] = time.time()
            _media_index['scan_dirs'] = directories

    threading.Thread(target=_do_scan, daemon=True).start()
    return jsonify({'scanning': True, 'directories': directories})

855 

@app.route('/api/shell/media/photos', methods=['GET'])
def shell_media_photos():
    """Return one page of the in-memory photo index.

    Query parameters:
        page: 1-based page number (default 1).
        per_page: page size (default 50).
        sort: 'date' (newest first, default), 'name', or 'size'
            (largest first). Any other value keeps the index order.

    Malformed paging values fall back to their defaults and values below
    1 are clamped to 1, so a bad query string can no longer raise
    (HTTP 500) or produce negative-index slices.
    """
    def _positive_int(name, default):
        try:
            value = int(request.args.get(name, default))
        except (TypeError, ValueError):
            return default
        return max(1, value)

    page = _positive_int('page', 1)
    per_page = _positive_int('per_page', 50)
    sort = request.args.get('sort', 'date')

    # Copy under the lock so sorting never touches the shared list.
    with _media_lock:
        photos = list(_media_index['photos'])

    if sort == 'date':
        photos.sort(key=lambda p: p.get('modified', 0), reverse=True)
    elif sort == 'name':
        photos.sort(key=lambda p: p.get('name', ''))
    elif sort == 'size':
        photos.sort(key=lambda p: p.get('size', 0), reverse=True)

    start = (page - 1) * per_page
    return jsonify({
        'photos': photos[start:start + per_page],
        'total': len(photos), 'page': page,
    })

874 

@app.route('/api/shell/media/music', methods=['GET'])
def shell_media_music():
    """Return one page of indexed music tracks, optionally filtered.

    Query parameters:
        page: 1-based page number (default 1).
        per_page: page size (default 50).
        artist: case-insensitive substring filter on the artist tag.
        album: case-insensitive substring filter on the album tag.

    Malformed paging values fall back to their defaults and values below
    1 are clamped to 1, so a bad query string can no longer raise
    (HTTP 500) or produce negative-index slices.
    """
    def _positive_int(name, default):
        try:
            value = int(request.args.get(name, default))
        except (TypeError, ValueError):
            return default
        return max(1, value)

    page = _positive_int('page', 1)
    per_page = _positive_int('per_page', 50)
    artist = request.args.get('artist', '').lower()
    album = request.args.get('album', '').lower()

    # Copy under the lock; filtering then works on the private copy.
    with _media_lock:
        tracks = list(_media_index['music'])

    if artist:
        tracks = [t for t in tracks if artist in t.get('artist', '').lower()]
    if album:
        tracks = [t for t in tracks if album in t.get('album', '').lower()]

    start = (page - 1) * per_page
    return jsonify({
        'tracks': tracks[start:start + per_page],
        'total': len(tracks), 'page': page,
    })

892 

@app.route('/api/shell/media/videos', methods=['GET'])
def shell_media_videos():
    """Return one page of indexed videos, newest ('modified') first.

    Query parameters:
        page: 1-based page number (default 1).
        per_page: page size (default 50).

    Malformed paging values fall back to their defaults and values below
    1 are clamped to 1, so a bad query string can no longer raise
    (HTTP 500) or produce negative-index slices.
    """
    def _positive_int(name, default):
        try:
            value = int(request.args.get(name, default))
        except (TypeError, ValueError):
            return default
        return max(1, value)

    page = _positive_int('page', 1)
    per_page = _positive_int('per_page', 50)

    # Copy under the lock so sorting never touches the shared list.
    with _media_lock:
        videos = list(_media_index['videos'])
    videos.sort(key=lambda v: v.get('modified', 0), reverse=True)

    start = (page - 1) * per_page
    return jsonify({
        'videos': videos[start:start + per_page],
        'total': len(videos), 'page': page,
    })

905 

906 # ─── 15b. Media Player Controls ───────────────────────────── 

907 

908 _player_proc = {'pid': None, 'path': None, 'engine': None} 

909 _player_lock = threading.Lock() 

910 

@app.route('/api/shell/media/play', methods=['POST'])
@_require_system_auth
def shell_media_play():
    """Play a media file in a background player process.

    JSON body: {'path': <file path>}.

    Engines are tried in order: mpv, vlc, xdg-open. The first one that
    is installed and launches successfully wins.

    Responses:
        200 {'playing': True, 'path', 'engine', 'pid'} on success.
        400 when 'path' is missing or not a string.
        403 when the resolved file is outside the user's home / temp dir.
        404 when the file does not exist.
        500 when no engine could be launched.
    """
    body = request.get_json(silent=True) or {}
    path = body.get('path')
    if not path or not isinstance(path, str):
        return jsonify({'error': 'path required'}), 400
    if not os.path.isfile(path):
        return jsonify({'error': 'File not found'}), 404

    # Path safety: only allow files under user home or the temp dir.
    # Compare whole path components: a plain startswith() would also
    # accept sibling directories such as /home/user2 for /home/user.
    import tempfile
    real = os.path.realpath(path)
    roots = [os.path.realpath(os.path.expanduser('~')),
             os.path.realpath(tempfile.gettempdir())]

    def _is_under(root):
        try:
            common = os.path.commonpath([real, root])
        except ValueError:
            # e.g. paths on different drives
            return False
        return os.path.normcase(common) == os.path.normcase(root)

    if not any(_is_under(root) for root in roots):
        return jsonify({'error': 'Path outside allowed roots'}), 403

    # Stop any existing playback and forget it, so a failed launch below
    # does not leave a stale pid behind.
    with _player_lock:
        old_pid = _player_proc['pid']
        if old_pid:
            try:
                os.kill(old_pid, signal.SIGTERM)
            except OSError:
                pass
            _player_proc['pid'] = None
            _player_proc['path'] = None
            _player_proc['engine'] = None

    # Try mpv first, then vlc, then the xdg-open fallback.
    for engine in ('mpv', 'vlc', 'xdg-open'):
        found = _run(['which', engine], timeout=2)
        if not (found and found.returncode == 0):
            continue

        # xdg-open rejects every argument that starts with '-', including
        # the '--' separator. The resolved path is absolute, so it cannot
        # be mistaken for an option there.
        if engine == 'xdg-open':
            cmd = [engine, real]
        else:
            cmd = [engine, '--', real]

        try:
            # Hide Windows console window on cross-platform engines
            # (mpv/vlc on Windows pop a cmd window for stdout
            # otherwise). Routes through canonical helper for
            # consistency with livekit_supervisor / vlm probes.
            from core.subprocess_safe import hidden_popen_kwargs
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                **hidden_popen_kwargs(),
            )
        except Exception as e:
            logger.warning("media_play: could not launch %s: %s", engine, e)
            continue

        # Bookkeeping happens outside the try so that a failure here can
        # never fall through and start a second player.
        with _player_lock:
            _player_proc['pid'] = proc.pid
            _player_proc['path'] = path
            _player_proc['engine'] = engine
        _audit_system_op('media_play', {'path': path, 'engine': engine})
        return jsonify({
            'playing': True, 'path': path,
            'engine': engine, 'pid': proc.pid,
        })

    return jsonify({'error': 'No media player found (install mpv)'}), 500

967 

@app.route('/api/shell/media/stop', methods=['POST'])
@_require_system_auth
def shell_media_stop():
    """Send SIGTERM to the tracked player (if any) and clear its state."""
    with _player_lock:
        current_pid = _player_proc.get('pid')
        if current_pid:
            try:
                os.kill(current_pid, signal.SIGTERM)
            except (OSError, ProcessLookupError):
                # Process already gone: the state is cleared regardless.
                pass
            for field in ('pid', 'path', 'engine'):
                _player_proc[field] = None
            return jsonify({'stopped': True})
        return jsonify({'stopped': False, 'error': 'Nothing playing'})

984 

@app.route('/api/shell/media/player-status', methods=['GET'])
def shell_media_player_status():
    """Report whether the tracked player process is still alive.

    Returns {'playing': bool, 'path': str|None, 'engine': str|None}.
    When the recorded pid can no longer be signalled, the whole player
    state (pid, path and engine) is cleared.
    """
    with _player_lock:
        pid = _player_proc.get('pid')
        running = False
        if pid:
            try:
                os.kill(pid, 0)  # Signal 0 = check existence
                running = True
            except OSError:
                # ProcessLookupError is an OSError subclass.
                _player_proc['pid'] = None
                _player_proc['path'] = None
                _player_proc['engine'] = None
        # Snapshot while still holding the lock so the response cannot mix
        # values from before and after a concurrent play/stop.
        path = _player_proc.get('path')
        engine = _player_proc.get('engine')
    # NOTE(review): the play route does not appear to keep the Popen handle,
    # so an exited player may linger unreaped and still answer signal 0 --
    # confirm whether 'playing' can stay True after the player quits.
    return jsonify({
        'playing': running,
        'path': path,
        'engine': engine,
    })

1003 

1004 # ─── 16. Webcam / Camera ─────────────────────────────────── 

1005 

@app.route('/api/shell/webcam/list', methods=['GET'])
def shell_webcam_list():
    """Enumerate /dev/video* nodes, enriched with v4l2-ctl info when available."""
    # (label searched in the v4l2-ctl output, key stored in the result)
    labels = (('Card type', 'name'), ('Driver name', 'driver'))
    found = []
    try:
        import glob as _glob
        for node in sorted(_glob.glob('/dev/video*')):
            record = {'device': node}
            probe = _run(['v4l2-ctl', '--device', node, '--info'], timeout=5)
            if probe and probe.returncode == 0:
                for row in probe.stdout.split('\n'):
                    for label, key in labels:
                        if label in row:
                            record[key] = row.split(':', 1)[1].strip()
                            break
            found.append(record)
    except Exception:
        # Best effort: return whatever was collected so far.
        pass
    return jsonify({'devices': found})

1025 

@app.route('/api/shell/webcam/capture', methods=['POST'])
def shell_webcam_capture():
    """Grab one frame from a V4L2 device with ffmpeg into a temp JPEG.

    JSON body: {'device': <device node>} (default '/dev/video0').
    Returns {'status': 'ok', 'path': ...} or a 500 with ffmpeg's stderr.
    """
    import tempfile

    payload = request.get_json(silent=True) or {}
    device = payload.get('device', '/dev/video0')
    stamp = int(time.time())
    out_path = os.path.join(tempfile.gettempdir(), f'hart_webcam_{stamp}.jpg')

    grab = _run(
        ['ffmpeg', '-f', 'v4l2', '-i', device, '-frames:v', '1', '-y', out_path],
        timeout=10,
    )
    captured = bool(grab) and grab.returncode == 0 and os.path.isfile(out_path)
    if captured:
        return jsonify({'status': 'ok', 'path': out_path})

    reason = grab.stderr if grab else 'ffmpeg not available'
    return jsonify({'status': 'error', 'error': reason}), 500

1039 

1040 # ─── 17. Scanner ────────────────────────────────────────── 

1041 

@app.route('/api/shell/scanner/list', methods=['GET'])
def shell_scanner_list():
    """List scanners reported by `scanimage -L` (one raw line per device)."""
    listing = _run(['scanimage', '-L'], timeout=15)
    if listing and listing.returncode == 0:
        scanners = [
            {'raw': row.strip()}
            for row in listing.stdout.strip().split('\n')
            if 'device' in row.lower()
        ]
    else:
        scanners = []
    return jsonify({'scanners': scanners})

1052 

@app.route('/api/shell/scanner/scan', methods=['POST'])
def shell_scanner_scan():
    """Scan a document/image with scanimage into a temp file.

    JSON body: {'format': 'pnm'|'tiff'|'png'|'jpeg'|'pdf'} (default 'png').

    The format is checked against an allow-list before use: it ends up
    both in a command-line option and in the output file name, so an
    arbitrary client value could otherwise inject path separators.

    Returns {'status': 'ok', 'path': ...}, 400 for an unsupported format,
    or 500 with scanimage's stderr.
    """
    allowed_formats = ('pnm', 'tiff', 'png', 'jpeg', 'pdf')

    body = request.get_json(silent=True) or {}
    fmt = body.get('format', 'png')
    if fmt not in allowed_formats:
        return jsonify({
            'status': 'error',
            'error': f'format must be one of: {list(allowed_formats)}',
        }), 400

    import tempfile
    out_path = os.path.join(tempfile.gettempdir(), f'hart_scan_{int(time.time())}.{fmt}')
    r = _run(['scanimage', f'--format={fmt}', f'--output-file={out_path}'],
             timeout=60)
    if r and r.returncode == 0 and os.path.isfile(out_path):
        return jsonify({'status': 'ok', 'path': out_path})
    return jsonify({'status': 'error',
                    'error': r.stderr if r else 'scanimage not available'}), 500

1066 

1067 # ─── 18. Battery / Power Monitoring ────────────────────── 

1068 

1069 def _read_sysfs(path, default=''): 

1070 """Read a single sysfs file, return stripped string or default.""" 

1071 try: 

1072 with open(path) as f: 

1073 return f.read().strip() 

1074 except (FileNotFoundError, PermissionError, OSError): 

1075 return default 

1076 

def _battery_info():
    """Collect battery state from psutil, then refine it from Linux sysfs.

    Returns a dict with the keys: present, status, capacity, voltage_v,
    power_w, temperature_c, technology, health, remaining_minutes and
    plugged_in. 'health' is derived purely from the charge percentage
    (>20 good, >5 low, otherwise critical).
    """
    import glob as _glob

    info = {
        'present': False, 'status': 'unknown', 'capacity': None,
        'voltage_v': None, 'power_w': None, 'temperature_c': None,
        'technology': None, 'health': 'unknown',
        'remaining_minutes': None, 'plugged_in': False,
    }

    # Pass 1: psutil (cross-platform, coarse).
    try:
        import psutil
        reading = psutil.sensors_battery()
        if reading:
            info['present'] = True
            info['capacity'] = round(reading.percent, 1)
            info['plugged_in'] = reading.power_plugged
            if not reading.power_plugged:
                info['status'] = 'discharging'
            elif reading.percent < 100:
                info['status'] = 'charging'
            else:
                info['status'] = 'full'
            if reading.secsleft and reading.secsleft > 0:
                info['remaining_minutes'] = round(reading.secsleft / 60, 0)
    except (ImportError, RuntimeError):
        pass

    # Pass 2: Linux sysfs (first BAT* entry only) overrides / adds detail.
    battery_nodes = sorted(_glob.glob('/sys/class/power_supply/BAT*'))
    if battery_nodes:
        root = battery_nodes[0]
        info['present'] = True

        status_text = _read_sysfs(f'{root}/status')
        if status_text:
            info['status'] = status_text.lower()

        percent_text = _read_sysfs(f'{root}/capacity')
        if percent_text.isdigit():
            info['capacity'] = int(percent_text)

        # (result key, sysfs file, divisor, rounding digits)
        scaled_fields = (
            ('voltage_v', 'voltage_now', 1_000_000, 2),
            ('power_w', 'power_now', 1_000_000, 2),
            ('temperature_c', 'temp', 10, 1),
        )
        for key, leaf, divisor, digits in scaled_fields:
            raw = _read_sysfs(f'{root}/{leaf}')
            if raw.isdigit():
                info[key] = round(int(raw) / divisor, digits)

        info['technology'] = _read_sysfs(f'{root}/technology') or None

    # Health classification (based on charge level).
    level = info['capacity']
    if level is not None:
        if level > 20:
            info['health'] = 'good'
        else:
            info['health'] = 'low' if level > 5 else 'critical'

    # AC adapter: only the first matching supply is consulted.
    adapters = sorted(_glob.glob('/sys/class/power_supply/AC*') +
                      _glob.glob('/sys/class/power_supply/ADP*'))
    if adapters and _read_sysfs(f'{adapters[0]}/online') == '1':
        info['plugged_in'] = True

    return info

1149 

@app.route('/api/shell/battery', methods=['GET'])
def shell_battery_status():
    """Return the current battery snapshot as JSON."""
    snapshot = _battery_info()
    return jsonify(snapshot)

1154 

@app.route('/api/shell/battery/profile', methods=['GET'])
def shell_battery_profile():
    """Report the active power profile and the available ones.

    Uses `powerprofilesctl list`; when that yields nothing, falls back to
    the cpufreq scaling governor of cpu0 with a fixed list of governors.
    """
    available = []
    active = None

    listing = _run(['powerprofilesctl', 'list'], timeout=5)
    if listing and listing.returncode == 0:
        for raw in listing.stdout.strip().split('\n'):
            row = raw.strip()
            if not row.endswith(':'):
                continue
            # Header rows look like "* balanced:"; '*' marks the active one.
            profile_name = row.rstrip(':').lstrip('* ')
            available.append(profile_name)
            if row.startswith('*'):
                active = profile_name

    if not available:
        # Fallback: check TLP or cpufreq
        governor = _run(
            ['cat', '/sys/devices/system/cpu/cpu0/cpufreq/scaling_governor'],
            timeout=5)
        if governor and governor.returncode == 0:
            active = governor.stdout.strip()
            available = ['performance', 'powersave', 'schedutil']

    return jsonify({
        'current_profile': active,
        'available': available,
    })

1180 

@app.route('/api/shell/battery/profile', methods=['POST'])
@_require_system_auth
def shell_battery_set_profile():
    """Switch the power profile via `powerprofilesctl set <profile>`."""
    payload = request.get_json(silent=True) or {}
    profile = payload.get('profile')
    if not profile:
        return jsonify({'error': 'profile required'}), 400

    outcome = _run(['powerprofilesctl', 'set', profile], timeout=5)
    if not (outcome and outcome.returncode == 0):
        detail = outcome.stderr if outcome else 'powerprofilesctl not available'
        return jsonify({'error': 'Failed to set profile', 'detail': detail}), 500

    _audit_system_op('battery_profile', {'profile': profile})
    return jsonify({'success': True, 'profile': profile})

1195 

1196 # ─── 19. WiFi Management ────────────────────────────────── 

1197 

@app.route('/api/shell/wifi/status', methods=['GET'])
def shell_wifi_status():
    """Get current WiFi connection status.

    Returns {'enabled', 'connected', 'ssid', 'signal', 'frequency', 'ip'}.
    Fields that cannot be determined keep their defaults (False / None).
    """
    def _split_terse(line):
        # nmcli -t separates fields with ':' and backslash-escapes any ':'
        # or '\' inside a value; a plain split(':') breaks such SSIDs.
        fields, current, escaped = [], [], False
        for ch in line:
            if escaped:
                current.append(ch)
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == ':':
                fields.append(''.join(current))
                current = []
            else:
                current.append(ch)
        fields.append(''.join(current))
        return fields

    info = {'enabled': False, 'connected': False, 'ssid': None,
            'signal': None, 'frequency': None, 'ip': None}

    # Check if WiFi is enabled
    r = _run(['nmcli', 'radio', 'wifi'], timeout=5)
    if r and r.returncode == 0:
        info['enabled'] = r.stdout.strip().lower() == 'enabled'

    # Current connection. DEVICE is requested as well so that the IP
    # lookup below can name the interface explicitly.
    device = None
    r = _run(['nmcli', '-t', '-f', 'ACTIVE,SSID,SIGNAL,FREQ,DEVICE',
              'device', 'wifi'], timeout=5)
    if r and r.returncode == 0:
        for line in r.stdout.strip().split('\n'):
            parts = _split_terse(line)
            if len(parts) >= 2 and parts[0] == 'yes':
                info['connected'] = True
                info['ssid'] = parts[1]
                if len(parts) >= 3:
                    info['signal'] = int(parts[2]) if parts[2].isdigit() else None
                if len(parts) >= 4:
                    info['frequency'] = parts[3]
                if len(parts) >= 5 and parts[4]:
                    device = parts[4]
                break

    # IP address
    if info['connected']:
        # `nmcli device show` expects an interface name. The previous
        # 'type wifi' form is kept only as a fallback for when the device
        # name could not be determined.
        target = [device] if device else ['type', 'wifi']
        r = _run(['nmcli', '-t', '-f', 'IP4.ADDRESS', 'device', 'show'] + target,
                 timeout=5)
        if r and r.returncode == 0:
            for line in r.stdout.strip().split('\n'):
                if 'IP4.ADDRESS' in line:
                    # e.g. "IP4.ADDRESS[1]:192.168.1.5/24"
                    info['ip'] = line.split(':', 1)[1].strip() if ':' in line else None
                    break
    return jsonify(info)

1233 

@app.route('/api/shell/wifi/networks', methods=['GET'])
def shell_wifi_networks():
    """Scan and list available WiFi networks.

    Query parameter 'rescan=true' triggers a fresh scan first (adds about
    two seconds). Networks are de-duplicated by SSID (first row wins) and
    returned strongest signal first.
    """
    def _split_terse(line):
        # nmcli -t separates fields with ':' and backslash-escapes any ':'
        # or '\' inside a value (SSIDs may contain ':', BSSIDs always do).
        fields, current, escaped = [], [], False
        for ch in line:
            if escaped:
                current.append(ch)
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == ':':
                fields.append(''.join(current))
                current = []
            else:
                current.append(ch)
        fields.append(''.join(current))
        return fields

    rescan = request.args.get('rescan', 'false').lower() == 'true'
    if rescan:
        _run(['nmcli', 'device', 'wifi', 'rescan'], timeout=10)
        time.sleep(2)  # Give scan time to populate

    r = _run(['nmcli', '-t', '-f', 'SSID,SIGNAL,SECURITY,FREQ,BSSID',
              'device', 'wifi', 'list'], timeout=10)
    networks = []
    seen = set()
    if r and r.returncode == 0:
        for line in r.stdout.strip().split('\n'):
            parts = _split_terse(line)
            # Hidden networks have an empty SSID and are skipped.
            if len(parts) >= 3 and parts[0] and parts[0] not in seen:
                seen.add(parts[0])
                networks.append({
                    'ssid': parts[0],
                    'signal': int(parts[1]) if parts[1].isdigit() else 0,
                    'security': parts[2],
                    'frequency': parts[3] if len(parts) > 3 else '',
                })
    networks.sort(key=lambda n: n['signal'], reverse=True)
    return jsonify({'networks': networks, 'count': len(networks)})

1259 

@app.route('/api/shell/wifi/connect', methods=['POST'])
@_require_system_auth
def shell_wifi_connect():
    """Join a WiFi network through nmcli.

    JSON body: {'ssid': str, 'password': str (optional),
    'hidden': bool (optional)}. Only the SSID is written to the audit log.
    """
    payload = request.get_json(silent=True) or {}
    ssid = payload.get('ssid')
    if not ssid:
        return jsonify({'error': 'ssid required'}), 400

    password = payload.get('password')
    hidden = payload.get('hidden', False)

    argv = ['nmcli', 'device', 'wifi', 'connect', ssid]
    if password:
        argv.extend(['password', password])
    if hidden:
        argv.extend(['hidden', 'yes'])

    outcome = _run(argv, timeout=30)
    if not (outcome and outcome.returncode == 0):
        reason = outcome.stderr.strip() if outcome else 'nmcli not available'
        return jsonify({'connected': False, 'error': reason}), 400

    _audit_system_op('wifi_connect', {'ssid': ssid})
    return jsonify({'connected': True, 'ssid': ssid})

1283 

@app.route('/api/shell/wifi/disconnect', methods=['POST'])
@_require_system_auth
def shell_wifi_disconnect():
    """Disconnect WiFi: try the generic form, then each wifi device by name."""
    def _ok(result):
        return bool(result) and result.returncode == 0

    if _ok(_run(['nmcli', 'device', 'disconnect', 'type', 'wifi'], timeout=10)):
        return jsonify({'disconnected': True})

    # Try finding the wifi device name
    devices = _run(['nmcli', '-t', '-f', 'DEVICE,TYPE', 'device'], timeout=5)
    if _ok(devices):
        for row in devices.stdout.strip().split('\n'):
            cols = row.split(':')
            if len(cols) < 2 or cols[1] != 'wifi':
                continue
            if _ok(_run(['nmcli', 'device', 'disconnect', cols[0]], timeout=10)):
                return jsonify({'disconnected': True})

    return jsonify({'disconnected': False,
                    'error': 'Failed to disconnect'}), 400

1302 

@app.route('/api/shell/wifi/saved', methods=['GET'])
def shell_wifi_saved():
    """List saved WiFi connections.

    Returns {'connections': [{'ssid': <connection name>,
    'autoconnect': bool}, ...]}.
    """
    def _split_terse(line):
        # nmcli -t separates fields with ':' and backslash-escapes any ':'
        # or '\' inside a value; connection names may contain ':'.
        fields, current, escaped = [], [], False
        for ch in line:
            if escaped:
                current.append(ch)
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == ':':
                fields.append(''.join(current))
                current = []
            else:
                current.append(ch)
        fields.append(''.join(current))
        return fields

    r = _run(['nmcli', '-t', '-f', 'NAME,TYPE,AUTOCONNECT',
              'connection', 'show'], timeout=5)
    connections = []
    if r and r.returncode == 0:
        for line in r.stdout.strip().split('\n'):
            parts = _split_terse(line)
            if len(parts) >= 2 and '802-11-wireless' in parts[1]:
                connections.append({
                    'ssid': parts[0],
                    'autoconnect': parts[2].lower() == 'yes' if len(parts) > 2 else True,
                })
    return jsonify({'connections': connections})

1318 

@app.route('/api/shell/wifi/forget', methods=['POST'])
@_require_system_auth
def shell_wifi_forget():
    """Delete the saved connection named by 'ssid' in the JSON body."""
    payload = request.get_json(silent=True) or {}
    ssid = payload.get('ssid')
    if not ssid:
        return jsonify({'error': 'ssid required'}), 400

    outcome = _run(['nmcli', 'connection', 'delete', ssid], timeout=10)
    if not (outcome and outcome.returncode == 0):
        reason = outcome.stderr.strip() if outcome else 'nmcli not available'
        return jsonify({'forgotten': False, 'error': reason}), 400

    _audit_system_op('wifi_forget', {'ssid': ssid})
    return jsonify({'forgotten': True, 'ssid': ssid})

1333 

@app.route('/api/shell/wifi/toggle', methods=['POST'])
@_require_system_auth
def shell_wifi_toggle():
    """Switch the WiFi radio on or off ('enable' in the JSON body, default on)."""
    payload = request.get_json(silent=True) or {}
    enable = payload.get('enable', True)
    if enable:
        state = 'on'
    else:
        state = 'off'

    outcome = _run(['nmcli', 'radio', 'wifi', state], timeout=5)
    if not (outcome and outcome.returncode == 0):
        return jsonify({'error': 'Failed to toggle WiFi'}), 500
    return jsonify({'enabled': enable})

1345 

1346 # ─── 20. VPN Client ─────────────────────────────────────── 

1347 

@app.route('/api/shell/vpn/list', methods=['GET'])
def shell_vpn_list():
    """List configured VPN connections.

    Returns {'connections': [{'name', 'type', 'active'}, ...]} for every
    NetworkManager connection whose type contains 'vpn'.
    """
    def _split_terse(line):
        # nmcli -t separates fields with ':' and backslash-escapes any ':'
        # or '\' inside a value; connection names may contain ':'.
        fields, current, escaped = [], [], False
        for ch in line:
            if escaped:
                current.append(ch)
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == ':':
                fields.append(''.join(current))
                current = []
            else:
                current.append(ch)
        fields.append(''.join(current))
        return fields

    r = _run(['nmcli', '-t', '-f', 'NAME,TYPE,ACTIVE',
              'connection', 'show'], timeout=5)
    vpns = []
    if r and r.returncode == 0:
        for line in r.stdout.strip().split('\n'):
            parts = _split_terse(line)
            if len(parts) >= 2 and 'vpn' in parts[1].lower():
                vpns.append({
                    'name': parts[0],
                    'type': parts[1],
                    'active': parts[2].lower() == 'yes' if len(parts) > 2 else False,
                })
    return jsonify({'connections': vpns})

1364 

@app.route('/api/shell/vpn/status', methods=['GET'])
def shell_vpn_status():
    """Get VPN connection status.

    Returns {'connected': bool, 'vpn': {'name', 'type', 'ip'} | None} for
    the first active connection whose type contains 'vpn'.
    """
    def _split_terse(line):
        # nmcli -t separates fields with ':' and backslash-escapes any ':'
        # or '\' inside a value; connection names may contain ':'.
        fields, current, escaped = [], [], False
        for ch in line:
            if escaped:
                current.append(ch)
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == ':':
                fields.append(''.join(current))
                current = []
            else:
                current.append(ch)
        fields.append(''.join(current))
        return fields

    # IP4.ADDRESS is a per-connection detail, not a column of the
    # `connection show` listing, so the listing only asks for NAME and
    # TYPE and the address is fetched separately below.
    r = _run(['nmcli', '-t', '-f', 'NAME,TYPE',
              'connection', 'show', '--active'], timeout=5)
    vpn_active = None
    if r and r.returncode == 0:
        for line in r.stdout.strip().split('\n'):
            parts = _split_terse(line)
            if len(parts) >= 2 and 'vpn' in parts[1].lower():
                vpn_active = {
                    'name': parts[0],
                    'type': parts[1],
                    'ip': None,
                }
                break

    if vpn_active:
        # 'id' makes nmcli treat the next argument strictly as a name.
        r_ip = _run(['nmcli', '-t', '-f', 'IP4.ADDRESS', 'connection', 'show',
                     'id', vpn_active['name']], timeout=5)
        if r_ip and r_ip.returncode == 0:
            for line in r_ip.stdout.strip().split('\n'):
                if 'IP4.ADDRESS' in line and ':' in line:
                    # e.g. "IP4.ADDRESS[1]:10.8.0.2/24"
                    vpn_active['ip'] = line.split(':', 1)[1].strip() or None
                    break

    return jsonify({
        'connected': vpn_active is not None,
        'vpn': vpn_active,
    })

1385 

@app.route('/api/shell/vpn/connect', methods=['POST'])
@_require_system_auth
def shell_vpn_connect():
    """Bring up the VPN connection named by 'name' in the JSON body."""
    payload = request.get_json(silent=True) or {}
    name = payload.get('name')
    if not name:
        return jsonify({'error': 'name required'}), 400

    outcome = _run(['nmcli', 'connection', 'up', name], timeout=30)
    if not (outcome and outcome.returncode == 0):
        reason = outcome.stderr.strip() if outcome else 'nmcli not available'
        return jsonify({'connected': False, 'error': reason}), 400

    _audit_system_op('vpn_connect', {'name': name})
    return jsonify({'connected': True, 'name': name})

1400 

@app.route('/api/shell/vpn/disconnect', methods=['POST'])
@_require_system_auth
def shell_vpn_disconnect():
    """Take down the VPN connection named by 'name' in the JSON body."""
    payload = request.get_json(silent=True) or {}
    name = payload.get('name')
    if not name:
        return jsonify({'error': 'name required'}), 400

    outcome = _run(['nmcli', 'connection', 'down', name], timeout=10)
    if not (outcome and outcome.returncode == 0):
        reason = outcome.stderr.strip() if outcome else 'nmcli not available'
        return jsonify({'disconnected': False, 'error': reason}), 400

    _audit_system_op('vpn_disconnect', {'name': name})
    return jsonify({'disconnected': True})

1415 

@app.route('/api/shell/vpn/import', methods=['POST'])
@_require_system_auth
def shell_vpn_import():
    """Import a VPN profile file into NetworkManager.

    JSON body: {'config_path': str, 'type': str (default 'openvpn')}.
    The reported name is the first single-quoted token of nmcli's output,
    or the config file's base name when the output has no quotes.
    """
    payload = request.get_json(silent=True) or {}
    config_path = payload.get('config_path')
    vpn_type = payload.get('type', 'openvpn')

    if not config_path:
        return jsonify({'error': 'config_path required'}), 400
    if not os.path.isfile(config_path):
        return jsonify({'error': 'Config file not found'}), 404

    outcome = _run(
        ['nmcli', 'connection', 'import', 'type', vpn_type, 'file', config_path],
        timeout=10)
    if not (outcome and outcome.returncode == 0):
        reason = outcome.stderr.strip() if outcome else 'nmcli not available'
        return jsonify({'imported': False, 'error': reason}), 400

    # Extract connection name from output
    if "'" in outcome.stdout:
        name = outcome.stdout.strip().split("'")[1]
    else:
        name = os.path.basename(config_path)
    return jsonify({'imported': True, 'name': name})

1436 

@app.route('/api/shell/vpn/<name>', methods=['DELETE'])
@_require_system_auth
def shell_vpn_delete(name):
    """Remove the NetworkManager connection called *name*."""
    outcome = _run(['nmcli', 'connection', 'delete', name], timeout=10)
    if not (outcome and outcome.returncode == 0):
        reason = outcome.stderr.strip() if outcome else 'not found'
        return jsonify({'deleted': False, 'error': reason}), 400

    _audit_system_op('vpn_delete', {'name': name})
    return jsonify({'deleted': True})

1447 

1448 # ─── 21. Trash / Recycle Bin ────────────────────────────── 

1449 

1450 def _trash_dir(): 

1451 """Get XDG trash directory.""" 

1452 return os.path.join(os.path.expanduser('~'), '.local', 'share', 'Trash') 

1453 

def _trash_list():
    """List items in trash with metadata.

    Returns a list of dicts sorted by 'deleted_time' (newest first). Each
    dict has 'id' and 'name' (the trashed file name) and, when available,
    'original_path', 'deleted_time', 'size_bytes' and 'is_dir'.
    """
    from urllib.parse import unquote

    trash = _trash_dir()
    info_dir = os.path.join(trash, 'info')
    files_dir = os.path.join(trash, 'files')
    items = []
    if not os.path.isdir(info_dir):
        return items

    suffix = '.trashinfo'
    for fname in os.listdir(info_dir):
        if not fname.endswith(suffix):
            continue
        item_name = fname[:-len(suffix)]
        item_path = os.path.join(files_dir, item_name)
        info_path = os.path.join(info_dir, fname)

        entry = {'id': item_name, 'name': item_name}
        try:
            # Interpolation must be off: Path values are URL-escaped
            # (e.g. "my%20file"), and the default '%' interpolation raises
            # on them, which used to drop the metadata silently.
            cp = configparser.ConfigParser(interpolation=None)
            cp.read(info_path, encoding='utf-8')
            if cp.has_section('Trash Info'):
                entry['original_path'] = unquote(
                    cp.get('Trash Info', 'Path', fallback=''))
                entry['deleted_time'] = cp.get('Trash Info', 'DeletionDate', fallback='')
        except (configparser.Error, UnicodeError) as e:
            # Keep the entry, just without metadata.
            logger.debug(f"Unreadable trashinfo {info_path}: {e}")

        if os.path.exists(item_path):
            try:
                st = os.stat(item_path)
                entry['size_bytes'] = st.st_size
                entry['is_dir'] = os.path.isdir(item_path)
            except OSError:
                entry['size_bytes'] = 0
        items.append(entry)

    items.sort(key=lambda x: x.get('deleted_time', ''), reverse=True)
    return items

1491 

@app.route('/api/shell/trash', methods=['GET'])
def shell_trash_list():
    """Return the trash contents with an item count and total size in MiB."""
    entries = _trash_list()
    byte_total = 0
    for entry in entries:
        byte_total += entry.get('size_bytes', 0)
    return jsonify({
        'items': entries,
        'total_items': len(entries),
        'total_size_mb': round(byte_total / 1048576, 2),
    })

1502 

@app.route('/api/shell/trash/move', methods=['POST'])
@_require_system_auth
def shell_trash_move_to():
    """Move a file to trash (instead of permanent delete).

    JSON body: {'path': <file or directory>}.

    `gio trash` is tried first. If it is unavailable or fails, the item
    is moved manually into the XDG trash layout (files/ + info/).

    Responses: 200 {'trashed': True, 'path'}, 400 when 'path' is missing,
    404 when it does not exist, 500 when the manual fallback fails.
    """
    body = request.get_json(silent=True) or {}
    path = body.get('path')
    if not path:
        return jsonify({'error': 'path required'}), 400
    if not os.path.exists(path):
        return jsonify({'error': 'File not found'}), 404

    r = _run(['gio', 'trash', path], timeout=10)
    if r and r.returncode == 0:
        _audit_system_op('trash_move', {'path': path})
        return jsonify({'trashed': True, 'path': path})

    # Fallback: manual move to the XDG trash directory
    try:
        import shutil
        from datetime import datetime, timezone
        from urllib.parse import quote

        trash = _trash_dir()
        files_dir = os.path.join(trash, 'files')
        info_dir = os.path.join(trash, 'info')
        os.makedirs(files_dir, exist_ok=True)
        os.makedirs(info_dir, exist_ok=True)

        # abspath() also strips a trailing separator, so a directory given
        # as "dir/" still yields a usable base name.
        original = os.path.abspath(path)
        name = os.path.basename(original)

        # Handle name collision: split once so suffixes do not pile up
        # (file.1.txt, file.2.txt, ... rather than file.1.2.txt).
        base, ext = os.path.splitext(name)
        counter = 1
        while (os.path.lexists(os.path.join(files_dir, name))
               or os.path.lexists(os.path.join(info_dir, f'{name}.trashinfo'))):
            name = f'{base}.{counter}{ext}'
            counter += 1
        dst = os.path.join(files_dir, name)

        # Write .trashinfo first, then move, so a trashed file never
        # exists without its restore record. The trash spec stores Path
        # URL-escaped.
        # NOTE(review): DeletionDate is written in UTC without an offset;
        # the trash spec expects local time -- confirm which is intended.
        info_path = os.path.join(info_dir, f'{name}.trashinfo')
        stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
        with open(info_path, 'w', encoding='utf-8') as f:
            f.write('[Trash Info]\n')
            f.write(f'Path={quote(original)}\n')
            f.write(f'DeletionDate={stamp}\n')
        try:
            shutil.move(path, dst)
        except Exception:
            # Do not leave an orphaned info record behind.
            try:
                os.remove(info_path)
            except OSError:
                pass
            raise

        # Audit the fallback exactly like the gio path above.
        _audit_system_op('trash_move', {'path': path})
        return jsonify({'trashed': True, 'path': path})
    except Exception as e:
        logger.warning(f"Manual trash move failed for {path}: {e}")
        return jsonify({'error': str(e)}), 500

1549 

@app.route('/api/shell/trash/restore', methods=['POST'])
@_require_system_auth
def shell_trash_restore():
    """Restore item(s) from trash to original location.

    JSON body: {'id': <trash item id>} or {'all': true}.

    An item is skipped (left in the trash) when its original path is
    unknown, when its trashed file is missing, or when something already
    exists at the original path. Restoring never overwrites existing data.

    Returns {'restored_count', 'restored_paths', 'skipped'} where
    'skipped' lists the ids that were left in the trash.
    """
    import shutil

    body = request.get_json(silent=True) or {}
    item_id = body.get('id')
    restore_all = body.get('all', False)

    trash = _trash_dir()
    files_dir = os.path.join(trash, 'files')
    info_dir = os.path.join(trash, 'info')
    restored = []
    skipped = []

    if restore_all:
        items_to_restore = _trash_list()
    elif item_id:
        items_to_restore = [i for i in _trash_list() if i['id'] == item_id]
    else:
        items_to_restore = []

    for item in items_to_restore:
        src = os.path.join(files_dir, item['id'])
        dst = item.get('original_path', '')
        if not dst or not os.path.lexists(src) or os.path.lexists(dst):
            # Unknown target, nothing to move, or target already occupied.
            skipped.append(item['id'])
            continue
        try:
            dst_dir = os.path.dirname(dst)
            if dst_dir:
                os.makedirs(dst_dir, exist_ok=True)
            shutil.move(src, dst)
            # Remove .trashinfo
            info_path = os.path.join(info_dir, f"{item['id']}.trashinfo")
            if os.path.isfile(info_path):
                os.remove(info_path)
            restored.append(dst)
        except Exception as e:
            skipped.append(item['id'])
            logger.debug(f"Trash restore failed for {item['id']}: {e}")

    return jsonify({
        'restored_count': len(restored),
        'restored_paths': restored,
        'skipped': skipped,
    })

1590 

@app.route('/api/shell/trash/empty', methods=['DELETE'])
@_require_system_auth
def shell_trash_empty():
    """Empty the trash (permanent delete).

    JSON body (optional): {'older_than_days': <non-negative number>}.

    Without 'older_than_days' (or with 0) the whole trash is emptied:
    `gio trash --empty` is tried first, with a manual sweep as fallback.
    With a positive 'older_than_days' only items deleted before the
    cutoff are removed. `gio` is NOT invoked in that case, because it can
    only empty everything. Items whose deletion date is missing or
    unparseable are kept, since their age cannot be established.

    Returns {'emptied': True[, 'freed_mb']} or 400 for an invalid
    'older_than_days'.
    """
    import math
    import shutil
    from datetime import datetime, timezone, timedelta

    body = request.get_json(silent=True) or {}
    older_than_days = body.get('older_than_days')

    cutoff = None
    if older_than_days is not None:
        try:
            older_than_days = float(older_than_days)
            if not math.isfinite(older_than_days) or older_than_days < 0:
                raise ValueError(older_than_days)
            if older_than_days:
                cutoff = datetime.now(timezone.utc) - timedelta(days=older_than_days)
        except (TypeError, ValueError, OverflowError):
            return jsonify(
                {'error': 'older_than_days must be a non-negative number'}), 400

    if cutoff is None:
        # Full empty: let gio do it when possible.
        r = _run(['gio', 'trash', '--empty'], timeout=30)
        if r and r.returncode == 0:
            return jsonify({'emptied': True})

    # Fallback or age-filtered empty
    trash = _trash_dir()
    files_dir = os.path.join(trash, 'files')
    info_dir = os.path.join(trash, 'info')
    freed = 0

    if cutoff is not None:
        for item in _trash_list():
            try:
                dt = datetime.fromisoformat(item.get('deleted_time', ''))
            except (ValueError, TypeError):
                # Unknown age: keep the item rather than guess.
                continue
            if dt.tzinfo is None:
                # NOTE(review): naive stamps are treated as UTC here; the
                # trash spec writes local time -- confirm the intent.
                dt = dt.replace(tzinfo=timezone.utc)
            if dt > cutoff:
                continue

            item_path = os.path.join(files_dir, item['id'])
            info_path = os.path.join(info_dir, f"{item['id']}.trashinfo")
            try:
                if os.path.isdir(item_path):
                    shutil.rmtree(item_path)
                elif os.path.isfile(item_path):
                    os.remove(item_path)
                if os.path.isfile(info_path):
                    os.remove(info_path)
                # Count only what was actually removed.
                freed += item.get('size_bytes', 0)
            except Exception as e:
                logger.debug(f"Trash empty failed for {item['id']}: {e}")
    else:
        # Full empty fallback
        for d in [files_dir, info_dir]:
            if os.path.isdir(d):
                for item in os.listdir(d):
                    p = os.path.join(d, item)
                    try:
                        if os.path.isdir(p):
                            shutil.rmtree(p)
                        else:
                            os.remove(p)
                    except Exception as e:
                        logger.debug(f"Trash empty failed for {p}: {e}")

    _audit_system_op('trash_empty', {'freed_mb': round(freed / 1048576, 2)})
    return jsonify({'emptied': True, 'freed_mb': round(freed / 1048576, 2)})

1654 

1655 # ─── 22. Screen Rotation ────────────────────────────────── 

1656 

@app.route('/api/shell/display/rotation', methods=['GET'])
def shell_display_rotation():
    """Get current display rotation/orientation.

    Returns {'outputs': [{'name', 'transform', 'active'}, ...]}, taken
    from `swaymsg -t get_outputs`, or from `xrandr --query` when sway
    yields nothing.
    """
    outputs = []
    r = _run(['swaymsg', '-t', 'get_outputs', '-r'], timeout=5)
    if r and r.returncode == 0:
        try:
            for out in json.loads(r.stdout):
                outputs.append({
                    'name': out.get('name', ''),
                    'transform': out.get('transform', 'normal'),
                    'active': out.get('active', False),
                })
        except (json.JSONDecodeError, TypeError, AttributeError):
            # Not the expected JSON list of objects: use the fallback.
            outputs = []
    if not outputs:
        # xrandr fallback
        r2 = _run(['xrandr', '--query'], timeout=5)
        if r2 and r2.returncode == 0:
            for line in r2.stdout.split('\n'):
                if ' connected' not in line:
                    continue
                # Example line:
                #   eDP-1 connected primary 1920x1080+0+0 left (normal left
                #   inverted right x axis y axis) 344mm x 194mm
                # The parenthesised part lists the SUPPORTED rotations and
                # always contains 'left'. The current rotation is only in
                # the text before '('.
                head = line.split('(', 1)[0]
                tokens = head.split()
                name = tokens[0] if tokens else 'unknown'
                state = tokens[1:]  # skip the name: it may contain a keyword
                rotation = next(
                    (kw for kw in ('left', 'right', 'inverted') if kw in state),
                    'normal')
                outputs.append({'name': name, 'transform': rotation,
                                'active': 'primary' in state or '+' in head})
    return jsonify({'outputs': outputs})

1688 

@app.route('/api/shell/display/rotation', methods=['POST'])
@_require_system_auth
def shell_display_set_rotation():
    """Rotate an output via swaymsg, falling back to xrandr.

    JSON body: {'output': str, 'transform': one of normal, 90, 180, 270,
    flipped, flipped-90, flipped-180, flipped-270 (default 'normal')}.
    In the xrandr fallback every flipped variant maps to 'normal'.
    """
    payload = request.get_json(silent=True) or {}
    output = payload.get('output', '')
    transform = payload.get('transform', 'normal')
    if not output:
        return jsonify({'error': 'output name required'}), 400

    valid = {'normal', '90', '180', '270', 'flipped',
             'flipped-90', 'flipped-180', 'flipped-270'}
    if transform not in valid:
        return jsonify({'error': f'transform must be one of: {sorted(valid)}'}), 400

    def _rotated():
        # Shared success path for both backends.
        _audit_system_op('display_rotate', {'output': output, 'transform': transform})
        return jsonify({'rotated': True, 'output': output, 'transform': transform})

    # Try swaymsg (Wayland)
    sway = _run(['swaymsg', 'output', output, 'transform', transform], timeout=5)
    if sway and sway.returncode == 0:
        return _rotated()

    # xrandr fallback (X11)
    x11_names = {'normal': 'normal', '90': 'left', '180': 'inverted',
                 '270': 'right', 'flipped': 'normal'}
    x11_rotation = x11_names.get(transform, 'normal')
    xrandr = _run(['xrandr', '--output', output, '--rotate', x11_rotation], timeout=5)
    if xrandr and xrandr.returncode == 0:
        return _rotated()

    return jsonify({'error': 'rotation failed (no swaymsg or xrandr)'}), 500

1720 

@app.route('/api/shell/display/auto-rotate', methods=['GET'])
def shell_display_auto_rotate_status():
    """Report whether `monitor-sensor` (iio-sensor-proxy) could be run."""
    probe = _run(['monitor-sensor', '--help'], timeout=3)
    # _run returns None when the binary is missing or the call timed out.
    if probe is None:
        return jsonify({'available': False, 'sensor': None})
    return jsonify({'available': True, 'sensor': 'iio-sensor-proxy'})

1728 

# Emitted once, after every route above has been attached to the app.
logger.log(logging.INFO, "Registered shell system routes (10 features)")