Coverage for integrations / agent_engine / shell_desktop_apis.py: 73.8%

961 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Shell Desktop APIs — Desktop experience management for HART OS. 

3 

4Covers: default apps, font manager, sound themes, clipboard history, 

5date/time/timezone, wallpaper, input methods, night light, workspaces. 

6 

7All routes registered via register_shell_desktop_routes(app). 

8""" 

9 

10import collections 

11import json 

12import logging 

13import os 

14import shutil 

15import subprocess 

16import sys 

17import threading 

18import time 

19 

20logger = logging.getLogger('hevolve.shell.desktop') 

21 

22# ─── Helpers ──────────────────────────────────────────────────── 

23 

24_HART_CONFIG = os.path.expanduser(os.environ.get( 

25 'HART_CONFIG_DIR', '~/.config/hart')) 

26 

27 

28def _config_path(name): 

29 os.makedirs(_HART_CONFIG, exist_ok=True) 

30 return os.path.join(_HART_CONFIG, name) 

31 

32 

33def _load_json(path, default=None): 

34 try: 

35 with open(path) as f: 

36 return json.load(f) 

37 except (FileNotFoundError, json.JSONDecodeError): 

38 return default if default is not None else {} 

39 

40 

41def _save_json(path, data): 

42 os.makedirs(os.path.dirname(path), exist_ok=True) 

43 with open(path, 'w') as f: 

44 json.dump(data, f, indent=2) 

45 

46 

47def _is_wayland(): 

48 """Detect Wayland compositor — env var check + GNOME session fallback.""" 

49 if os.environ.get('WAYLAND_DISPLAY'): 

50 return True 

51 # Fallback: some GNOME sessions don't set WAYLAND_DISPLAY 

52 if os.environ.get('XDG_SESSION_TYPE', '').lower() == 'wayland': 

53 return True 

54 # pgrep is Linux-only; skip on other platforms 

55 if sys.platform == 'linux': 

56 try: 

57 r = subprocess.run(['pgrep', '-x', 'sway|labwc|hyprland'], 

58 capture_output=True, text=True, timeout=3) 

59 if r.returncode == 0: 

60 return True 

61 except (FileNotFoundError, subprocess.TimeoutExpired): 

62 pass 

63 return False 

64 

65 

66def _run(cmd, timeout=10, **kw): 

67 try: 

68 r = subprocess.run(cmd, capture_output=True, text=True, 

69 timeout=timeout, **kw) 

70 return r 

71 except (FileNotFoundError, subprocess.TimeoutExpired): 

72 return None 

73 

74 

75# ─── Clipboard state (in-memory) ─────────────────────────────── 

76 

77_clipboard_history = collections.deque(maxlen=100) 

78_clipboard_lock = threading.Lock() 

79_clipboard_counter = 0 

80 

81 

82# ═══════════════════════════════════════════════════════════════ 

83# Route registration 

84# ═══════════════════════════════════════════════════════════════ 

85 

86def _require_desktop_auth(f): 

87 """Decorator: require local shell auth for destructive desktop ops.""" 

88 from functools import wraps 

89 @wraps(f) 

90 def decorated(*args, **kwargs): 

91 from flask import request, jsonify 

92 remote = request.remote_addr or '' 

93 if remote not in ('127.0.0.1', '::1', 'localhost'): 

94 token = request.headers.get('X-Shell-Token', '') 

95 expected = os.environ.get('HART_SHELL_TOKEN', '') 

96 if not expected or token != expected: 

97 return jsonify({'error': 'Unauthorized'}), 403 

98 return f(*args, **kwargs) 

99 return decorated 

100 

101 

102def _audit_desktop_op(action, detail=None): 

103 """Log a desktop operation to the immutable audit log (best-effort).""" 

104 try: 

105 from security.immutable_audit_log import get_audit_log 

106 get_audit_log().log_event( 

107 'shell_ops', 'shell_desktop_api', action, 

108 detail=detail or {}) 

109 except Exception: 

110 pass 

111 

112 

113def register_shell_desktop_routes(app): 

114 """Register all desktop experience API routes.""" 

115 from flask import jsonify, request 

116 

117 # ─── 1. Default Apps / File Associations ──────────────── 

118 

119 _COMMON_MIMES = [ 

120 'text/html', 'text/plain', 'application/pdf', 

121 'image/png', 'image/jpeg', 'image/gif', 'image/svg+xml', 

122 'video/mp4', 'video/x-matroska', 'audio/mpeg', 'audio/flac', 

123 'application/zip', 'application/json', 'application/xml', 

124 'inode/directory', 

125 ] 

126 

127 _CATEGORIES = { 

128 'browser': ('xdg-settings', 'get', 'default-web-browser'), 

129 'email': ('xdg-settings', 'get', 'default-url-scheme-handler', 'mailto'), 

130 } 

131 

132 @app.route('/api/shell/default-apps', methods=['GET']) 

133 def shell_default_apps(): 

134 defaults = {} 

135 for mime in _COMMON_MIMES: 

136 r = _run(['xdg-mime', 'query', 'default', mime]) 

137 if r and r.returncode == 0 and r.stdout.strip(): 

138 desktop = r.stdout.strip() 

139 defaults[mime] = { 

140 'app': desktop, 

141 'name': desktop.replace('.desktop', '').replace('org.', '').replace('.', ' '), 

142 } 

143 categories = {} 

144 for cat, cmd in _CATEGORIES.items(): 

145 r = _run(list(cmd)) 

146 if r and r.returncode == 0 and r.stdout.strip(): 

147 categories[cat] = r.stdout.strip() 

148 return jsonify({'defaults': defaults, 'categories': categories}) 

149 

150 @app.route('/api/shell/default-apps/candidates', methods=['GET']) 

151 def shell_default_apps_candidates(): 

152 mime = request.args.get('mime_type', '') 

153 if not mime: 

154 return jsonify({'error': 'mime_type required'}), 400 

155 candidates = [] 

156 if sys.platform == 'win32': 

157 apps_dir = os.path.join(os.environ.get('PROGRAMDATA', r'C:\ProgramData'), 

158 'Microsoft', 'Windows', 'Start Menu', 'Programs') 

159 elif sys.platform == 'darwin': 

160 apps_dir = '/Applications' 

161 else: 

162 apps_dir = '/usr/share/applications' 

163 if os.path.isdir(apps_dir): 

164 for f in os.listdir(apps_dir): 

165 if not f.endswith('.desktop'): 

166 continue 

167 path = os.path.join(apps_dir, f) 

168 try: 

169 with open(path) as fh: 

170 content = fh.read() 

171 if mime in content: 

172 name = f.replace('.desktop', '') 

173 for line in content.splitlines(): 

174 if line.startswith('Name='): 

175 name = line.split('=', 1)[1] 

176 break 

177 candidates.append({'app': f, 'name': name}) 

178 except (IOError, UnicodeDecodeError): 

179 pass 

180 return jsonify({'mime_type': mime, 'candidates': candidates}) 

181 

182 @app.route('/api/shell/default-apps/set', methods=['POST']) 

183 def shell_default_apps_set(): 

184 data = request.get_json(force=True) 

185 mime = data.get('mime_type', '') 

186 desktop_app = data.get('app', '') 

187 if not mime or not desktop_app: 

188 return jsonify({'error': 'mime_type and app required'}), 400 

189 r = _run(['xdg-mime', 'default', desktop_app, mime]) 

190 if r and r.returncode == 0: 

191 return jsonify({'set': True, 'mime_type': mime, 'app': desktop_app}) 

192 return jsonify({'set': False, 'error': r.stderr.strip() if r else 'xdg-mime not available'}), 500 

193 

194 @app.route('/api/shell/default-apps/set-category', methods=['POST']) 

195 def shell_default_apps_set_category(): 

196 data = request.get_json(force=True) 

197 cat = data.get('category', '') 

198 desktop_app = data.get('app', '') 

199 if not cat or not desktop_app: 

200 return jsonify({'error': 'category and app required'}), 400 

201 if cat == 'browser': 

202 r = _run(['xdg-settings', 'set', 'default-web-browser', desktop_app]) 

203 elif cat == 'email': 

204 r = _run(['xdg-settings', 'set', 'default-url-scheme-handler', 'mailto', desktop_app]) 

205 else: 

206 return jsonify({'error': f'Unknown category: {cat}'}), 400 

207 ok = r and r.returncode == 0 

208 return jsonify({'set': ok, 'category': cat, 'app': desktop_app}) 

209 

210 # ─── 2. Font Manager ─────────────────────────────────── 

211 

212 @app.route('/api/shell/fonts', methods=['GET']) 

213 def shell_fonts_list(): 

214 search = request.args.get('search', '').lower() 

215 category = request.args.get('category', '') 

216 r = _run(['fc-list', '--format=%{family}|%{style}|%{file}|%{fontformat}\n']) 

217 fonts = [] 

218 seen = set() 

219 if r and r.returncode == 0: 

220 for line in r.stdout.strip().split('\n'): 

221 parts = line.split('|') 

222 if len(parts) < 3: 

223 continue 

224 family = parts[0].split(',')[0].strip() 

225 if family in seen: 

226 continue 

227 seen.add(family) 

228 style = parts[1].strip() if len(parts) > 1 else '' 

229 path = parts[2].strip() if len(parts) > 2 else '' 

230 fmt = parts[3].strip() if len(parts) > 3 else '' 

231 cat = 'monospace' if any(k in family.lower() for k in ['mono', 'code', 'consol']) \ 

232 else 'serif' if any(k in family.lower() for k in ['serif', 'times', 'georgia']) \ 

233 else 'sans-serif' 

234 if search and search not in family.lower(): 

235 continue 

236 if category and category != cat: 

237 continue 

238 fonts.append({ 

239 'family': family, 'style': style, 

240 'path': path, 'format': fmt, 'category': cat, 

241 }) 

242 fonts.sort(key=lambda f: f['family']) 

243 cats = {} 

244 for f in fonts: 

245 cats[f['category']] = cats.get(f['category'], 0) + 1 

246 return jsonify({'fonts': fonts, 'count': len(fonts), 'categories': cats}) 

247 

248 @app.route('/api/shell/fonts/preview', methods=['GET']) 

249 def shell_fonts_preview(): 

250 family = request.args.get('family', '') 

251 text = request.args.get('text', 'The quick brown fox jumps over the lazy dog') 

252 size = request.args.get('size', '18') 

253 if not family: 

254 return jsonify({'error': 'family required'}), 400 

255 return jsonify({ 

256 'family': family, 'text': text, 'size': int(size), 

257 'css': f'font-family: "{family}"; font-size: {size}px;', 

258 }) 

259 

260 @app.route('/api/shell/fonts/install', methods=['POST']) 

261 def shell_fonts_install(): 

262 data = request.get_json(force=True) 

263 path = data.get('path', '') 

264 if not path or not os.path.isfile(path): 

265 return jsonify({'error': 'Valid font file path required'}), 400 

266 ext = os.path.splitext(path)[1].lower() 

267 if ext not in ('.ttf', '.otf', '.woff', '.woff2', '.ttc'): 

268 return jsonify({'error': f'Unsupported font format: {ext}'}), 400 

269 fonts_dir = os.path.expanduser('~/.local/share/fonts') 

270 os.makedirs(fonts_dir, exist_ok=True) 

271 dest = os.path.join(fonts_dir, os.path.basename(path)) 

272 try: 

273 shutil.copy2(path, dest) 

274 _run(['fc-cache', '-f'], timeout=30) 

275 return jsonify({'installed': True, 'path': dest, 

276 'family': os.path.splitext(os.path.basename(path))[0]}) 

277 except (IOError, PermissionError) as e: 

278 return jsonify({'error': str(e)}), 500 

279 

280 @app.route('/api/shell/fonts/remove', methods=['POST']) 

281 def shell_fonts_remove(): 

282 data = request.get_json(force=True) 

283 family = data.get('family', '') 

284 if not family: 

285 return jsonify({'error': 'family required'}), 400 

286 fonts_dir = os.path.expanduser('~/.local/share/fonts') 

287 removed = [] 

288 if os.path.isdir(fonts_dir): 

289 for f in os.listdir(fonts_dir): 

290 if family.lower().replace(' ', '') in f.lower().replace(' ', ''): 

291 fp = os.path.join(fonts_dir, f) 

292 os.remove(fp) 

293 removed.append(f) 

294 if removed: 

295 _run(['fc-cache', '-f'], timeout=30) 

296 return jsonify({'removed': True, 'family': family, 'files': removed}) 

297 return jsonify({'removed': False, 'error': 'Font not found in user fonts (system fonts cannot be removed)'}), 404 

298 

299 # ─── 3. Sound Manager & System Sounds ────────────────── 

300 

301 _SOUND_EVENTS = [ 

302 {'id': 'bell', 'name': 'Notification'}, 

303 {'id': 'dialog-error', 'name': 'Error'}, 

304 {'id': 'dialog-warning', 'name': 'Warning'}, 

305 {'id': 'dialog-information', 'name': 'Information'}, 

306 {'id': 'desktop-login', 'name': 'Login'}, 

307 {'id': 'desktop-logout', 'name': 'Logout'}, 

308 {'id': 'device-added', 'name': 'Device Connected'}, 

309 {'id': 'device-removed', 'name': 'Device Disconnected'}, 

310 {'id': 'screen-capture', 'name': 'Screenshot'}, 

311 {'id': 'trash-empty', 'name': 'Empty Trash'}, 

312 {'id': 'message-new-instant', 'name': 'New Message'}, 

313 {'id': 'battery-low', 'name': 'Low Battery'}, 

314 ] 

315 

316 @app.route('/api/shell/sounds/themes', methods=['GET']) 

317 def shell_sounds_themes(): 

318 themes = [] 

319 for d in ['/usr/share/sounds', '/run/current-system/sw/share/sounds']: 

320 if not os.path.isdir(d): 

321 continue 

322 for name in os.listdir(d): 

323 theme_dir = os.path.join(d, name) 

324 if os.path.isdir(theme_dir) and name not in [t['id'] for t in themes]: 

325 themes.append({'id': name, 'name': name.replace('-', ' ').title(), 

326 'path': theme_dir}) 

327 if not themes: 

328 themes.append({'id': 'freedesktop', 'name': 'FreeDesktop (default)', 'path': ''}) 

329 cfg = _load_json(_config_path('sound-theme.json'), {'active': 'freedesktop', 'enabled': True}) 

330 return jsonify({'themes': themes, 'active': cfg.get('active', 'freedesktop'), 

331 'enabled': cfg.get('enabled', True)}) 

332 

333 @app.route('/api/shell/sounds/events', methods=['GET']) 

334 def shell_sounds_events(): 

335 cfg = _load_json(_config_path('sound-theme.json'), {'active': 'freedesktop', 'enabled': True}) 

336 overrides = _load_json(_config_path('sound-overrides.json'), {}) 

337 events = [] 

338 for evt in _SOUND_EVENTS: 

339 entry = dict(evt) 

340 if evt['id'] in overrides: 

341 entry['file'] = overrides[evt['id']] 

342 entry['custom'] = True 

343 else: 

344 for ext in ('oga', 'ogg', 'wav'): 

345 for base in ['/usr/share/sounds', '/run/current-system/sw/share/sounds']: 

346 p = os.path.join(base, cfg.get('active', 'freedesktop'), 'stereo', f"{evt['id']}.{ext}") 

347 if os.path.isfile(p): 

348 entry['file'] = p 

349 break 

350 events.append(entry) 

351 return jsonify({'events': events, 'enabled': cfg.get('enabled', True)}) 

352 

353 @app.route('/api/shell/sounds/set-theme', methods=['POST']) 

354 def shell_sounds_set_theme(): 

355 data = request.get_json(force=True) 

356 theme = data.get('theme', '') 

357 if not theme: 

358 return jsonify({'error': 'theme required'}), 400 

359 cfg = _load_json(_config_path('sound-theme.json'), {}) 

360 cfg['active'] = theme 

361 _save_json(_config_path('sound-theme.json'), cfg) 

362 return jsonify({'set': True, 'theme': theme}) 

363 

364 @app.route('/api/shell/sounds/set-event', methods=['POST']) 

365 def shell_sounds_set_event(): 

366 data = request.get_json(force=True) 

367 event = data.get('event', '') 

368 file = data.get('file', '') 

369 if not event or not file: 

370 return jsonify({'error': 'event and file required'}), 400 

371 overrides = _load_json(_config_path('sound-overrides.json'), {}) 

372 overrides[event] = file 

373 _save_json(_config_path('sound-overrides.json'), overrides) 

374 return jsonify({'set': True, 'event': event, 'file': file}) 

375 

376 @app.route('/api/shell/sounds/toggle', methods=['POST']) 

377 def shell_sounds_toggle(): 

378 data = request.get_json(force=True) 

379 enabled = data.get('enabled', True) 

380 cfg = _load_json(_config_path('sound-theme.json'), {}) 

381 cfg['enabled'] = bool(enabled) 

382 _save_json(_config_path('sound-theme.json'), cfg) 

383 return jsonify({'enabled': bool(enabled)}) 

384 

385 @app.route('/api/shell/sounds/play', methods=['POST']) 

386 def shell_sounds_play(): 

387 data = request.get_json(force=True) 

388 file = data.get('file', '') 

389 event = data.get('event', '') 

390 if not file and not event: 

391 return jsonify({'error': 'file or event required'}), 400 

392 if event and not file: 

393 overrides = _load_json(_config_path('sound-overrides.json'), {}) 

394 if event in overrides: 

395 file = overrides[event] 

396 else: 

397 cfg = _load_json(_config_path('sound-theme.json'), {'active': 'freedesktop'}) 

398 theme = cfg.get('active', 'freedesktop') 

399 for ext in ('oga', 'ogg', 'wav'): 

400 for base in ['/usr/share/sounds', '/run/current-system/sw/share/sounds']: 

401 p = os.path.join(base, theme, 'stereo', f'{event}.{ext}') 

402 if os.path.isfile(p): 

403 file = p 

404 break 

405 if file: 

406 break 

407 if not file: 

408 return jsonify({'played': False, 'error': 'Sound file not found'}), 404 

409 player = shutil.which('pw-play') or shutil.which('paplay') or shutil.which('aplay') 

410 if player: 

411 _run([player, file], timeout=5) 

412 return jsonify({'played': True, 'file': file}) 

413 return jsonify({'played': False, 'error': 'No audio player available'}), 500 

414 

415 # ─── 4. Clipboard Manager ────────────────────────────── 

416 

417 @app.route('/api/shell/clipboard/history', methods=['GET']) 

418 def shell_clipboard_history(): 

419 limit = int(request.args.get('limit', 50)) 

420 ctype = request.args.get('type', '') 

421 with _clipboard_lock: 

422 entries = list(_clipboard_history) 

423 if ctype: 

424 entries = [e for e in entries if ctype in e.get('content_type', '')] 

425 entries = entries[:limit] 

426 return jsonify({'entries': entries, 'count': len(entries)}) 

427 

428 @app.route('/api/shell/clipboard/current', methods=['GET']) 

429 def shell_clipboard_current(): 

430 if _is_wayland(): 

431 r = _run(['wl-paste', '--no-newline']) 

432 else: 

433 r = _run(['xclip', '-selection', 'clipboard', '-o']) 

434 content = r.stdout if r and r.returncode == 0 else '' 

435 return jsonify({'content': content, 'content_type': 'text/plain', 

436 'timestamp': time.time()}) 

437 

438 @app.route('/api/shell/clipboard/copy', methods=['POST']) 

439 def shell_clipboard_copy(): 

440 global _clipboard_counter 

441 data = request.get_json(force=True) 

442 content = data.get('content', '') 

443 ctype = data.get('content_type', 'text/plain') 

444 if not content: 

445 return jsonify({'error': 'content required'}), 400 

446 if _is_wayland(): 

447 _run(['wl-copy', content]) 

448 else: 

449 subprocess.run(['xclip', '-selection', 'clipboard'], 

450 input=content, text=True, timeout=5, 

451 capture_output=True) 

452 with _clipboard_lock: 

453 _clipboard_counter += 1 

454 _clipboard_history.appendleft({ 

455 'id': _clipboard_counter, 'content': content[:1000], 

456 'content_type': ctype, 'timestamp': time.time(), 

457 'pinned': False, 

458 }) 

459 return jsonify({'copied': True, 'id': _clipboard_counter}) 

460 

461 @app.route('/api/shell/clipboard/pin', methods=['POST']) 

462 def shell_clipboard_pin(): 

463 data = request.get_json(force=True) 

464 entry_id = data.get('id', 0) 

465 with _clipboard_lock: 

466 for e in _clipboard_history: 

467 if e['id'] == entry_id: 

468 e['pinned'] = True 

469 return jsonify({'pinned': True, 'id': entry_id}) 

470 return jsonify({'error': 'Entry not found'}), 404 

471 

472 @app.route('/api/shell/clipboard/clear', methods=['POST']) 

473 def shell_clipboard_clear(): 

474 data = request.get_json(force=True) 

475 if data.get('all'): 

476 with _clipboard_lock: 

477 pinned = [e for e in _clipboard_history if e.get('pinned')] 

478 count = len(_clipboard_history) - len(pinned) 

479 _clipboard_history.clear() 

480 _clipboard_history.extend(pinned) 

481 return jsonify({'cleared': count}) 

482 entry_id = data.get('id', 0) 

483 with _clipboard_lock: 

484 for i, e in enumerate(_clipboard_history): 

485 if e['id'] == entry_id: 

486 del _clipboard_history[i] 

487 return jsonify({'cleared': 1}) 

488 return jsonify({'error': 'Entry not found'}), 404 

489 

490 # ─── 5. Date/Time/Timezone ───────────────────────────── 

491 

492 @app.route('/api/shell/datetime', methods=['GET']) 

493 def shell_datetime(): 

494 info = {'timezone': 'UTC', 'utc_offset': '+00:00', 

495 'ntp_enabled': False, 'ntp_synced': False, 

496 'clock_format': '24h', 'rtc_in_local_time': False} 

497 r = _run(['timedatectl', 'show']) 

498 if r and r.returncode == 0: 

499 for line in r.stdout.strip().split('\n'): 

500 if '=' not in line: 

501 continue 

502 k, v = line.split('=', 1) 

503 if k == 'Timezone': 

504 info['timezone'] = v 

505 elif k == 'NTP': 

506 info['ntp_enabled'] = v == 'yes' 

507 elif k == 'NTPSynchronized': 

508 info['ntp_synced'] = v == 'yes' 

509 elif k == 'LocalRTC': 

510 info['rtc_in_local_time'] = v == 'yes' 

511 import datetime 

512 now = datetime.datetime.now() 

513 info['datetime'] = now.isoformat() 

514 tz = datetime.datetime.now(datetime.timezone.utc).astimezone() 

515 off = tz.strftime('%z') 

516 info['utc_offset'] = f'{off[:3]}:{off[3:]}' if len(off) >= 5 else off 

517 cfg = _load_json(_config_path('clock-format.json'), {'format': '24h'}) 

518 info['clock_format'] = cfg.get('format', '24h') 

519 return jsonify(info) 

520 

521 @app.route('/api/shell/datetime/timezones', methods=['GET']) 

522 def shell_datetime_timezones(): 

523 r = _run(['timedatectl', 'list-timezones'], timeout=5) 

524 if r and r.returncode == 0: 

525 zones = [z for z in r.stdout.strip().split('\n') if z] 

526 return jsonify({'timezones': zones, 'count': len(zones)}) 

527 import zoneinfo 

528 zones = sorted(zoneinfo.available_timezones()) 

529 return jsonify({'timezones': zones, 'count': len(zones)}) 

530 

531 @app.route('/api/shell/datetime/set-timezone', methods=['POST']) 

532 def shell_datetime_set_tz(): 

533 data = request.get_json(force=True) 

534 tz = data.get('timezone', '') 

535 if not tz or '/' not in tz: 

536 return jsonify({'error': 'Valid timezone required (e.g. US/Pacific)'}), 400 

537 r = _run(['timedatectl', 'set-timezone', tz]) 

538 if r and r.returncode == 0: 

539 return jsonify({'set': True, 'timezone': tz}) 

540 return jsonify({'set': False, 'error': r.stderr.strip() if r else 'timedatectl not available'}), 500 

541 

542 @app.route('/api/shell/datetime/set-ntp', methods=['POST']) 

543 def shell_datetime_set_ntp(): 

544 data = request.get_json(force=True) 

545 enabled = data.get('enabled', True) 

546 val = 'true' if enabled else 'false' 

547 r = _run(['timedatectl', 'set-ntp', val]) 

548 ok = r and r.returncode == 0 

549 return jsonify({'set': ok, 'ntp_enabled': bool(enabled)}) 

550 

551 @app.route('/api/shell/datetime/set-format', methods=['POST']) 

552 def shell_datetime_set_format(): 

553 data = request.get_json(force=True) 

554 fmt = data.get('format', '') 

555 if fmt not in ('12h', '24h'): 

556 return jsonify({'error': 'format must be 12h or 24h'}), 400 

557 _save_json(_config_path('clock-format.json'), {'format': fmt}) 

558 return jsonify({'set': True, 'clock_format': fmt}) 

559 

560 # ─── 6. Wallpaper Manager ────────────────────────────── 

561 

562 _WALL_CFG = _config_path('wallpaper.json') 

563 _WALL_EXTS = {'.png', '.jpg', '.jpeg', '.webp', '.svg', '.bmp'} 

564 

565 @app.route('/api/shell/wallpaper', methods=['GET']) 

566 def shell_wallpaper(): 

567 cfg = _load_json(_WALL_CFG, { 

568 'current': '', 'lock_screen': '', 

569 'mode': 'fill', 

570 'slideshow': {'enabled': False, 'interval_minutes': 30, 'directory': ''}, 

571 }) 

572 return jsonify(cfg) 

573 

574 @app.route('/api/shell/wallpaper/collection', methods=['GET']) 

575 def shell_wallpaper_collection(): 

576 directory = request.args.get('directory', '/usr/share/backgrounds') 

577 if not os.path.isdir(directory): 

578 return jsonify({'images': [], 'count': 0}) 

579 images = [] 

580 for f in sorted(os.listdir(directory)): 

581 ext = os.path.splitext(f)[1].lower() 

582 if ext not in _WALL_EXTS: 

583 continue 

584 fp = os.path.join(directory, f) 

585 try: 

586 stat = os.stat(fp) 

587 images.append({ 

588 'path': fp, 'name': os.path.splitext(f)[0], 

589 'size': stat.st_size, 

590 }) 

591 except OSError: 

592 pass 

593 return jsonify({'images': images, 'count': len(images)}) 

594 

595 @app.route('/api/shell/wallpaper/set', methods=['POST']) 

596 def shell_wallpaper_set(): 

597 data = request.get_json(force=True) 

598 path = data.get('path', '') 

599 mode = data.get('mode', 'fill') 

600 if not path: 

601 return jsonify({'error': 'path required'}), 400 

602 if _is_wayland(): 

603 _run(['swaymsg', 'output', '*', 'bg', path, mode]) 

604 else: 

605 if mode == 'fill': 

606 _run(['feh', '--bg-fill', path]) 

607 elif mode == 'fit': 

608 _run(['feh', '--bg-max', path]) 

609 elif mode == 'center': 

610 _run(['feh', '--bg-center', path]) 

611 elif mode == 'tile': 

612 _run(['feh', '--bg-tile', path]) 

613 else: 

614 _run(['feh', '--bg-scale', path]) 

615 cfg = _load_json(_WALL_CFG, {}) 

616 cfg['current'] = path 

617 cfg['mode'] = mode 

618 _save_json(_WALL_CFG, cfg) 

619 return jsonify({'set': True, 'path': path, 'mode': mode}) 

620 

621 @app.route('/api/shell/wallpaper/set-lock', methods=['POST']) 

622 def shell_wallpaper_set_lock(): 

623 data = request.get_json(force=True) 

624 path = data.get('path', '') 

625 if not path: 

626 return jsonify({'error': 'path required'}), 400 

627 cfg = _load_json(_WALL_CFG, {}) 

628 cfg['lock_screen'] = path 

629 _save_json(_WALL_CFG, cfg) 

630 return jsonify({'set': True, 'lock_screen': path}) 

631 

632 @app.route('/api/shell/wallpaper/slideshow', methods=['POST']) 

633 def shell_wallpaper_slideshow(): 

634 data = request.get_json(force=True) 

635 enabled = data.get('enabled', False) 

636 interval = data.get('interval_minutes', 30) 

637 directory = data.get('directory', '') 

638 img_count = 0 

639 if directory and os.path.isdir(directory): 

640 img_count = sum(1 for f in os.listdir(directory) 

641 if os.path.splitext(f)[1].lower() in _WALL_EXTS) 

642 cfg = _load_json(_WALL_CFG, {}) 

643 cfg['slideshow'] = { 

644 'enabled': bool(enabled), 

645 'interval_minutes': int(interval), 

646 'directory': directory, 

647 } 

648 _save_json(_WALL_CFG, cfg) 

649 return jsonify({'slideshow': {**cfg['slideshow'], 'image_count': img_count}}) 

650 

651 # ─── 7. Input Methods / Keyboard Layouts ─────────────── 

652 

653 @app.route('/api/shell/input-methods', methods=['GET']) 

654 def shell_input_methods(): 

655 info = {'layouts': [], 'active': '', 'compose_key': '', 

656 'ime': {'engine': 'none', 'running': False, 'input_methods': []}} 

657 r = _run(['setxkbmap', '-query']) 

658 if r and r.returncode == 0: 

659 for line in r.stdout.strip().split('\n'): 

660 if ':' not in line: 

661 continue 

662 key, val = line.split(':', 1) 

663 key, val = key.strip(), val.strip() 

664 if key == 'layout': 

665 layouts = val.split(',') 

666 info['active'] = layouts[0] if layouts else '' 

667 for l in layouts: 

668 info['layouts'].append({'id': l.strip(), 'name': l.strip()}) 

669 elif key == 'options': 

670 for opt in val.split(','): 

671 opt = opt.strip() 

672 if opt.startswith('compose:'): 

673 info['compose_key'] = opt.split(':')[1] 

674 for engine in ('fcitx5', 'ibus-daemon'): 

675 r2 = _run(['pgrep', '-x', engine]) 

676 if r2 and r2.returncode == 0: 

677 info['ime']['engine'] = engine.replace('-daemon', '') 

678 info['ime']['running'] = True 

679 break 

680 return jsonify(info) 

681 

682 @app.route('/api/shell/input-methods/available', methods=['GET']) 

683 def shell_input_methods_available(): 

684 r = _run(['localectl', 'list-x11-keymap-layouts'], timeout=5) 

685 layouts = [] 

686 if r and r.returncode == 0: 

687 for lay in r.stdout.strip().split('\n'): 

688 if lay.strip(): 

689 layouts.append({'id': lay.strip(), 'name': lay.strip()}) 

690 ime_engines = ['pinyin', 'mozc', 'hangul', 'anthy', 'chewing', 'libpinyin'] 

691 return jsonify({'layouts': layouts[:200], 'ime_engines': ime_engines}) 

692 

693 @app.route('/api/shell/input-methods/add', methods=['POST']) 

694 def shell_input_methods_add(): 

695 data = request.get_json(force=True) 

696 layout = data.get('layout', '') 

697 if not layout: 

698 return jsonify({'error': 'layout required'}), 400 

699 r = _run(['setxkbmap', '-query']) 

700 current = '' 

701 if r and r.returncode == 0: 

702 for line in r.stdout.strip().split('\n'): 

703 if line.strip().startswith('layout'): 

704 current = line.split(':', 1)[1].strip() 

705 new_layouts = f'{current},{layout}' if current else layout 

706 _run(['setxkbmap', '-layout', new_layouts, '-option', 'grp:alt_shift_toggle']) 

707 return jsonify({'added': True, 'layout': layout, 'all_layouts': new_layouts}) 

708 

709 @app.route('/api/shell/input-methods/remove', methods=['POST']) 

710 def shell_input_methods_remove(): 

711 data = request.get_json(force=True) 

712 layout = data.get('layout', '') 

713 if not layout: 

714 return jsonify({'error': 'layout required'}), 400 

715 r = _run(['setxkbmap', '-query']) 

716 current_layouts = [] 

717 if r and r.returncode == 0: 

718 for line in r.stdout.strip().split('\n'): 

719 if line.strip().startswith('layout'): 

720 current_layouts = [l.strip() for l in line.split(':', 1)[1].strip().split(',')] 

721 if layout in current_layouts: 

722 current_layouts.remove(layout) 

723 if current_layouts: 

724 _run(['setxkbmap', '-layout', ','.join(current_layouts)]) 

725 return jsonify({'removed': True, 'layout': layout}) 

726 

727 @app.route('/api/shell/input-methods/switch', methods=['POST']) 

728 def shell_input_methods_switch(): 

729 data = request.get_json(force=True) 

730 layout = data.get('layout', '') 

731 if not layout: 

732 return jsonify({'error': 'layout required'}), 400 

733 _run(['setxkbmap', layout]) 

734 return jsonify({'switched': True, 'active': layout}) 

735 

736 @app.route('/api/shell/input-methods/compose-key', methods=['POST']) 

737 def shell_input_methods_compose_key(): 

738 data = request.get_json(force=True) 

739 key = data.get('key', 'ralt') 

740 _run(['setxkbmap', '-option', f'compose:{key}']) 

741 return jsonify({'set': True, 'compose_key': key}) 

742 

743 # ─── 8. Night Light / Blue Light Filter ──────────────── 

744 

745 _NL_CFG = _config_path('nightlight.json') 

746 

747 @app.route('/api/shell/nightlight', methods=['GET']) 

748 def shell_nightlight(): 

749 cfg = _load_json(_NL_CFG, { 

750 'enabled': False, 'temperature': 4500, 

751 'schedule': {'mode': 'sunset', 'start': '20:00', 'end': '06:00'}, 

752 }) 

753 active = False 

754 for proc in ('gammastep', 'redshift'): 

755 r = _run(['pgrep', '-x', proc]) 

756 if r and r.returncode == 0: 

757 active = True 

758 break 

759 cfg['active'] = active 

760 return jsonify(cfg) 

761 

762 @app.route('/api/shell/nightlight/toggle', methods=['POST']) 

763 def shell_nightlight_toggle(): 

764 data = request.get_json(force=True) 

765 enabled = data.get('enabled', True) 

766 cfg = _load_json(_NL_CFG, {'temperature': 4500}) 

767 cfg['enabled'] = bool(enabled) 

768 _save_json(_NL_CFG, cfg) 

769 if enabled: 

770 temp = cfg.get('temperature', 4500) 

771 tool = shutil.which('gammastep') or shutil.which('redshift') 

772 if tool: 

773 _run(['pkill', '-x', os.path.basename(tool)]) 

774 subprocess.Popen([tool, '-O', str(temp)], 

775 stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) 

776 return jsonify({'enabled': True, 'active': True, 'temperature': temp}) 

777 return jsonify({'enabled': True, 'active': False, 

778 'error': 'Neither gammastep nor redshift available'}), 500 

779 for proc in ('gammastep', 'redshift'): 

780 _run(['pkill', '-x', proc]) 

781 return jsonify({'enabled': False, 'active': False}) 

782 

783 @app.route('/api/shell/nightlight/temperature', methods=['POST']) 

784 def shell_nightlight_temperature(): 

785 data = request.get_json(force=True) 

786 temp = data.get('temperature', 4500) 

787 if not (1000 <= temp <= 6500): 

788 return jsonify({'error': 'temperature must be 1000-6500'}), 400 

789 cfg = _load_json(_NL_CFG, {}) 

790 cfg['temperature'] = temp 

791 _save_json(_NL_CFG, cfg) 

792 tool = shutil.which('gammastep') or shutil.which('redshift') 

793 if tool: 

794 _run(['pkill', '-x', os.path.basename(tool)]) 

795 subprocess.Popen([tool, '-O', str(temp)], 

796 stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) 

797 return jsonify({'set': True, 'temperature': temp}) 

798 

799 @app.route('/api/shell/nightlight/schedule', methods=['POST']) 

800 def shell_nightlight_schedule(): 

801 data = request.get_json(force=True) 

802 mode = data.get('mode', 'manual') 

803 if mode not in ('sunset', 'manual', 'disabled'): 

804 return jsonify({'error': 'mode must be sunset, manual, or disabled'}), 400 

805 cfg = _load_json(_NL_CFG, {}) 

806 cfg['schedule'] = { 

807 'mode': mode, 

808 'start': data.get('start', '20:00'), 

809 'end': data.get('end', '06:00'), 

810 } 

811 _save_json(_NL_CFG, cfg) 

812 return jsonify({'set': True, 'schedule': cfg['schedule']}) 

813 

814 # ─── 9. Window / Workspace Manager ───────────────────── 

815 

816 def _detect_compositor(): 

817 if os.environ.get('SWAYSOCK'): 

818 return 'sway' 

819 if os.environ.get('HYPRLAND_INSTANCE_SIGNATURE'): 

820 return 'hyprland' 

821 if _is_wayland(): 

822 return 'wayland' 

823 return 'x11' 

824 

825 @app.route('/api/shell/workspaces', methods=['GET']) 

826 def shell_workspaces(): 

827 comp = _detect_compositor() 

828 workspaces = [] 

829 if comp == 'sway': 

830 r = _run(['swaymsg', '-t', 'get_workspaces', '--raw']) 

831 if r and r.returncode == 0: 

832 try: 

833 for ws in json.loads(r.stdout): 

834 workspaces.append({ 

835 'id': ws.get('num', ws.get('id')), 

836 'name': ws.get('name', ''), 

837 'focused': ws.get('focused', False), 

838 'visible': ws.get('visible', False), 

839 }) 

840 except json.JSONDecodeError: 

841 pass 

842 elif comp == 'x11': 

843 r = _run(['wmctrl', '-d']) 

844 if r and r.returncode == 0: 

845 for line in r.stdout.strip().split('\n'): 

846 parts = line.split() 

847 if len(parts) >= 9: 

848 workspaces.append({ 

849 'id': int(parts[0]), 

850 'name': parts[-1], 

851 'focused': parts[1] == '*', 

852 'visible': parts[1] == '*', 

853 }) 

854 if not workspaces: 

855 workspaces.append({'id': 1, 'name': 'Main', 'focused': True, 'visible': True}) 

856 return jsonify({'workspaces': workspaces, 'compositor': comp}) 

857 

858 @app.route('/api/shell/workspaces/windows', methods=['GET']) 

859 def shell_workspaces_windows(): 

860 comp = _detect_compositor() 

861 workspace = request.args.get('workspace', '') 

862 windows = [] 

863 if comp == 'sway': 

864 r = _run(['swaymsg', '-t', 'get_tree', '--raw']) 

865 if r and r.returncode == 0: 

866 try: 

867 tree = json.loads(r.stdout) 

868 _extract_sway_windows(tree, windows) 

869 except json.JSONDecodeError: 

870 pass 

871 elif comp == 'x11': 

872 r = _run(['wmctrl', '-l', '-G']) 

873 if r and r.returncode == 0: 

874 for line in r.stdout.strip().split('\n'): 

875 parts = line.split(None, 7) 

876 if len(parts) >= 8: 

877 windows.append({ 

878 'id': parts[0], 'workspace': int(parts[1]), 

879 'x': int(parts[2]), 'y': int(parts[3]), 

880 'w': int(parts[4]), 'h': int(parts[5]), 

881 'title': parts[7] if len(parts) > 7 else '', 

882 }) 

883 if workspace: 

884 try: 

885 ws_id = int(workspace) 

886 windows = [w for w in windows if w.get('workspace') == ws_id] 

887 except ValueError: 

888 pass 

889 return jsonify({'windows': windows, 'count': len(windows)}) 

890 

891 def _extract_sway_windows(node, results): 

892 if node.get('type') == 'con' and node.get('name') and node.get('pid'): 

893 rect = node.get('rect', {}) 

894 results.append({ 

895 'id': node.get('id'), 

896 'title': node.get('name', ''), 

897 'app': node.get('app_id', ''), 

898 'workspace': node.get('workspace', ''), 

899 'focused': node.get('focused', False), 

900 'x': rect.get('x', 0), 'y': rect.get('y', 0), 

901 'w': rect.get('width', 0), 'h': rect.get('height', 0), 

902 }) 

903 for child in node.get('nodes', []) + node.get('floating_nodes', []): 

904 _extract_sway_windows(child, results) 

905 

906 @app.route('/api/shell/workspaces/create', methods=['POST']) 

907 def shell_workspaces_create(): 

908 data = request.get_json(force=True) 

909 name = data.get('name', '') 

910 comp = _detect_compositor() 

911 if comp == 'sway': 

912 ws_name = name or str(int(time.time()) % 100) 

913 _run(['swaymsg', f'workspace {ws_name}']) 

914 return jsonify({'created': True, 'name': ws_name}) 

915 return jsonify({'created': False, 'error': f'{comp}: workspace creation not supported'}), 500 

916 

917 @app.route('/api/shell/workspaces/switch', methods=['POST']) 

918 def shell_workspaces_switch(): 

919 data = request.get_json(force=True) 

920 ws_id = data.get('id', '') 

921 name = data.get('name', str(ws_id)) 

922 comp = _detect_compositor() 

923 if comp == 'sway': 

924 _run(['swaymsg', f'workspace {name}']) 

925 return jsonify({'switched': True, 'workspace': name}) 

926 elif comp == 'x11': 

927 _run(['wmctrl', '-s', str(ws_id)]) 

928 return jsonify({'switched': True, 'workspace': ws_id}) 

929 return jsonify({'switched': False}), 500 

930 

931 @app.route('/api/shell/workspaces/close', methods=['POST']) 

932 def shell_workspaces_close(): 

933 data = request.get_json(force=True) 

934 ws_id = data.get('id', '') 

935 return jsonify({'closed': True, 'id': ws_id}) 

936 

937 _SNAP_MAP = { 

938 'left-half': 'move position 0 0; resize set 50 ppt 100 ppt', 

939 'right-half': 'move position 50 ppt 0; resize set 50 ppt 100 ppt', 

940 'top-half': 'move position 0 0; resize set 100 ppt 50 ppt', 

941 'bottom-half': 'move position 0 50 ppt; resize set 100 ppt 50 ppt', 

942 'top-left': 'move position 0 0; resize set 50 ppt 50 ppt', 

943 'top-right': 'move position 50 ppt 0; resize set 50 ppt 50 ppt', 

944 'bottom-left': 'move position 0 50 ppt; resize set 50 ppt 50 ppt', 

945 'bottom-right': 'move position 50 ppt 50 ppt; resize set 50 ppt 50 ppt', 

946 'maximize': 'fullscreen enable', 

947 'center': 'move position center', 

948 } 

949 

950 @app.route('/api/shell/workspaces/snap', methods=['POST']) 

951 def shell_workspaces_snap(): 

952 data = request.get_json(force=True) 

953 window_id = data.get('window_id', '') 

954 position = data.get('position', '') 

955 if position not in _SNAP_MAP: 

956 return jsonify({'error': f'Invalid position. Valid: {list(_SNAP_MAP.keys())}'}), 400 

957 comp = _detect_compositor() 

958 if comp == 'sway': 

959 cmd_str = _SNAP_MAP[position] 

960 if window_id: 

961 _run(['swaymsg', f'[con_id={window_id}] {cmd_str}']) 

962 else: 

963 _run(['swaymsg', cmd_str]) 

964 return jsonify({'snapped': True, 'position': position}) 

965 elif comp == 'x11' and window_id: 

966 if position == 'maximize': 

967 _run(['wmctrl', '-i', '-r', window_id, '-b', 'add,maximized_vert,maximized_horz']) 

968 return jsonify({'snapped': True, 'position': position}) 

969 return jsonify({'snapped': False, 'error': f'{comp}: snap not supported'}), 500 

970 

971 # ─── Multi-Monitor Management ────────────────────────── 

972 

973 @app.route('/api/shell/displays', methods=['GET']) 

974 def shell_displays_list(): 

975 """List connected displays with resolution and position.""" 

976 displays = [] 

977 if _is_wayland(): 

978 r = _run(['swaymsg', '-t', 'get_outputs', '-r'], timeout=5) 

979 if r and r.returncode == 0: 

980 try: 

981 outputs = json.loads(r.stdout) 

982 for out in outputs: 

983 displays.append({ 

984 'name': out.get('name', ''), 

985 'make': out.get('make', ''), 

986 'model': out.get('model', ''), 

987 'resolution': f"{out.get('rect', {}).get('width', 0)}x{out.get('rect', {}).get('height', 0)}", 

988 'position': f"{out.get('rect', {}).get('x', 0)},{out.get('rect', {}).get('y', 0)}", 

989 'scale': out.get('scale', 1.0), 

990 'active': out.get('active', False), 

991 }) 

992 except (json.JSONDecodeError, KeyError): 

993 pass 

994 else: 

995 r = _run(['xrandr', '--query'], timeout=5) 

996 if r and r.returncode == 0: 

997 import re 

998 for line in r.stdout.split('\n'): 

999 m = re.match(r'^(\S+)\s+(connected|disconnected)\s*(primary)?\s*(\d+x\d+\+\d+\+\d+)?', line) 

1000 if m and m.group(2) == 'connected': 

1001 displays.append({ 

1002 'name': m.group(1), 

1003 'primary': m.group(3) == 'primary', 

1004 'geometry': m.group(4) or '', 

1005 'connected': True, 

1006 }) 

1007 return jsonify({'displays': displays, 'compositor': 'wayland' if _is_wayland() else 'x11'}) 

1008 

1009 @app.route('/api/shell/displays/arrange', methods=['PUT']) 

1010 def shell_displays_arrange(): 

1011 """Arrange displays (position, resolution, scale).""" 

1012 body = request.get_json(silent=True) or {} 

1013 display = body.get('display', '') 

1014 if not display: 

1015 return jsonify({'error': 'display name is required'}), 400 

1016 if _is_wayland(): 

1017 cmd = ['swaymsg', 'output', display] 

1018 if 'position' in body: 

1019 cmd += ['position', str(body['position'])] 

1020 if 'resolution' in body: 

1021 cmd += ['resolution', str(body['resolution'])] 

1022 if 'scale' in body: 

1023 cmd += ['scale', str(body['scale'])] 

1024 r = _run(cmd, timeout=5) 

1025 ok = r and r.returncode == 0 

1026 else: 

1027 cmd = ['xrandr', '--output', display] 

1028 if 'resolution' in body: 

1029 cmd += ['--mode', str(body['resolution'])] 

1030 if 'position' in body: 

1031 cmd += ['--pos', str(body['position'])] 

1032 r = _run(cmd, timeout=5) 

1033 ok = r and r.returncode == 0 

1034 return jsonify({'status': 'ok' if ok else 'error', 

1035 'message': (r.stderr if r else 'Command failed') if not ok else ''}) 

1036 

1037 # ─── HiDPI Scaling ───────────────────────────────────── 

1038 

1039 @app.route('/api/shell/display/scale', methods=['GET', 'PUT']) 

1040 def shell_display_scale(): 

1041 """Get or set display scale factor.""" 

1042 if request.method == 'GET': 

1043 scale = 1.0 

1044 if _is_wayland(): 

1045 r = _run(['swaymsg', '-t', 'get_outputs', '-r'], timeout=5) 

1046 if r and r.returncode == 0: 

1047 try: 

1048 outputs = json.loads(r.stdout) 

1049 if outputs: 

1050 scale = outputs[0].get('scale', 1.0) 

1051 except (json.JSONDecodeError, KeyError): 

1052 pass 

1053 else: 

1054 gdk_scale = os.environ.get('GDK_SCALE', '1') 

1055 try: 

1056 scale = float(gdk_scale) 

1057 except ValueError: 

1058 scale = 1.0 

1059 return jsonify({'scale': scale, 'compositor': 'wayland' if _is_wayland() else 'x11'}) 

1060 # PUT 

1061 body = request.get_json(silent=True) or {} 

1062 scale = body.get('scale', 1.0) 

1063 display = body.get('display', '') 

1064 if _is_wayland() and display: 

1065 r = _run(['swaymsg', 'output', display, 'scale', str(scale)], timeout=5) 

1066 ok = r and r.returncode == 0 

1067 else: 

1068 # X11 fallback: set GDK_SCALE env 

1069 os.environ['GDK_SCALE'] = str(int(scale)) 

1070 ok = True 

1071 return jsonify({'status': 'ok' if ok else 'error', 'scale': scale}) 

1072 

1073 # ─── Per-App Volume Control ──────────────────────────── 

1074 

1075 @app.route('/api/shell/audio/apps', methods=['GET']) 

1076 def shell_audio_per_app(): 

1077 """List audio streams with per-app volume.""" 

1078 apps = [] 

1079 r = _run(['pw-cli', 'list-objects'], timeout=5) 

1080 if r and r.returncode == 0: 

1081 current = {} 

1082 for line in r.stdout.split('\n'): 

1083 line = line.strip() 

1084 if line.startswith('id '): 

1085 if current.get('type') == 'PipeWire:Interface:Node': 

1086 apps.append(current) 

1087 current = {'id': line.split()[1].rstrip(',')} 

1088 elif 'type' in line and 'PipeWire:Interface:Node' in line: 

1089 current['type'] = 'PipeWire:Interface:Node' 

1090 elif 'application.name' in line: 

1091 current['name'] = line.split('=', 1)[1].strip().strip('"') 

1092 elif 'node.name' in line: 

1093 current['node_name'] = line.split('=', 1)[1].strip().strip('"') 

1094 if current.get('type') == 'PipeWire:Interface:Node': 

1095 apps.append(current) 

1096 # Filter to only app nodes 

1097 app_nodes = [a for a in apps if a.get('name') or a.get('node_name')] 

1098 return jsonify({'apps': app_nodes}) 

1099 

1100 @app.route('/api/shell/audio/apps/<node_id>/volume', methods=['PUT']) 

1101 def shell_audio_per_app_volume(node_id): 

1102 """Set volume for a specific app/node.""" 

1103 body = request.get_json(silent=True) or {} 

1104 volume = body.get('volume', 1.0) 

1105 if not isinstance(volume, (int, float)) or volume < 0 or volume > 2.0: 

1106 return jsonify({'error': 'volume must be 0.0 to 2.0'}), 400 

1107 r = _run(['pw-cli', 'set-param', node_id, 'Props', 

1108 json.dumps({'volume': volume})], timeout=5) 

1109 if r and r.returncode == 0: 

1110 return jsonify({'status': 'ok', 'node_id': node_id, 'volume': volume}) 

1111 return jsonify({'error': r.stderr if r else 'pw-cli not available'}), 500 

1112 

1113 # ─── RTL Layout Support ──────────────────────────────── 

1114 

1115 _RTL_LOCALES = {'ar', 'he', 'fa', 'ur', 'yi', 'ps', 'sd'} 

1116 

1117 @app.route('/api/shell/rtl/status', methods=['GET']) 

1118 def shell_rtl_status(): 

1119 """Check if current locale requires RTL layout.""" 

1120 locale_val = os.environ.get('LANG', 'en_US.UTF-8') 

1121 lang_code = locale_val.split('_')[0].lower() 

1122 is_rtl = lang_code in _RTL_LOCALES 

1123 return jsonify({ 

1124 'rtl': is_rtl, 

1125 'locale': locale_val, 

1126 'css_direction': 'rtl' if is_rtl else 'ltr', 

1127 }) 

1128 

1129 # ─── 10. Keyboard Shortcuts ──────────────────────────── 

1130 

1131 # Two profiles: 'windows' (default) and 'mac' 

1132 # Users can switch between them without conflicts 

1133 _SHORTCUTS_CFG = _config_path('keyboard_shortcuts.json') 

1134 

1135 _SHORTCUT_PROFILES = { 

1136 'windows': { 

1137 'close_window': 'Alt+F4', 

1138 'minimize': 'Super+H', 

1139 'maximize': 'Super+Up', 

1140 'switch_apps': 'Alt+Tab', 

1141 'switch_windows': 'Alt+`', 

1142 'file_manager': 'Super+E', 

1143 'terminal': 'Ctrl+Alt+T', 

1144 'lock_screen': 'Super+L', 

1145 'search': 'Super+S', 

1146 'screenshot': 'Print', 

1147 'screenshot_window': 'Alt+Print', 

1148 'screenshot_area': 'Shift+Print', 

1149 'workspace_left': 'Super+Ctrl+Left', 

1150 'workspace_right': 'Super+Ctrl+Right', 

1151 'move_workspace_left': 'Super+Shift+Left', 

1152 'move_workspace_right': 'Super+Shift+Right', 

1153 'snap_left': 'Super+Left', 

1154 'snap_right': 'Super+Right', 

1155 'overview': 'Super', 

1156 'app_grid': 'Super+A', 

1157 'task_manager': 'Ctrl+Shift+Escape', 

1158 'browser': 'Super+B', 

1159 'calculator': 'Super+C', 

1160 'copy': 'Ctrl+C', 

1161 'paste': 'Ctrl+V', 

1162 'cut': 'Ctrl+X', 

1163 'undo': 'Ctrl+Z', 

1164 'redo': 'Ctrl+Shift+Z', 

1165 'select_all': 'Ctrl+A', 

1166 'save': 'Ctrl+S', 

1167 'find': 'Ctrl+F', 

1168 }, 

1169 'mac': { 

1170 'close_window': 'Super+W', 

1171 'minimize': 'Super+M', 

1172 'maximize': 'Super+Ctrl+F', 

1173 'switch_apps': 'Super+Tab', 

1174 'switch_windows': 'Super+`', 

1175 'file_manager': 'Super+Shift+F', 

1176 'terminal': 'Super+Space', 

1177 'lock_screen': 'Super+Ctrl+Q', 

1178 'search': 'Super+Space', 

1179 'screenshot': 'Super+Shift+3', 

1180 'screenshot_window': 'Super+Shift+4', 

1181 'screenshot_area': 'Super+Shift+5', 

1182 'workspace_left': 'Ctrl+Left', 

1183 'workspace_right': 'Ctrl+Right', 

1184 'move_workspace_left': 'Ctrl+Shift+Left', 

1185 'move_workspace_right': 'Ctrl+Shift+Right', 

1186 'snap_left': 'Super+Ctrl+Left', 

1187 'snap_right': 'Super+Ctrl+Right', 

1188 'overview': 'Super+Up', 

1189 'app_grid': 'F4', 

1190 'task_manager': 'Super+Alt+Escape', 

1191 'browser': 'Super+Shift+B', 

1192 'calculator': 'Super+Shift+C', 

1193 'copy': 'Super+C', 

1194 'paste': 'Super+V', 

1195 'cut': 'Super+X', 

1196 'undo': 'Super+Z', 

1197 'redo': 'Super+Shift+Z', 

1198 'select_all': 'Super+A', 

1199 'save': 'Super+S', 

1200 'find': 'Super+F', 

1201 }, 

1202 } 

1203 

1204 @app.route('/api/shell/shortcuts', methods=['GET']) 

1205 def shell_shortcuts(): 

1206 """Get current keyboard shortcuts with active profile.""" 

1207 cfg = _load_json(_SHORTCUTS_CFG, { 

1208 'profile': 'windows', 

1209 'custom': {}, 

1210 }) 

1211 profile_name = cfg.get('profile', 'windows') 

1212 base = dict(_SHORTCUT_PROFILES.get(profile_name, _SHORTCUT_PROFILES['windows'])) 

1213 # Custom overrides on top of profile 

1214 base.update(cfg.get('custom', {})) 

1215 return jsonify({ 

1216 'profile': profile_name, 

1217 'available_profiles': list(_SHORTCUT_PROFILES.keys()), 

1218 'shortcuts': base, 

1219 }) 

1220 

1221 @app.route('/api/shell/shortcuts/profile', methods=['POST']) 

1222 def shell_shortcuts_set_profile(): 

1223 """Switch shortcut profile (windows or mac).""" 

1224 data = request.get_json(force=True) 

1225 profile = data.get('profile', 'windows') 

1226 if profile not in _SHORTCUT_PROFILES: 

1227 return jsonify({'error': f'Unknown profile: {profile}. ' 

1228 f'Available: {list(_SHORTCUT_PROFILES.keys())}'}), 400 

1229 cfg = _load_json(_SHORTCUTS_CFG, {'profile': 'windows', 'custom': {}}) 

1230 cfg['profile'] = profile 

1231 _save_json(_SHORTCUTS_CFG, cfg) 

1232 return jsonify({'profile': profile, 'shortcuts': _SHORTCUT_PROFILES[profile]}) 

1233 

1234 @app.route('/api/shell/shortcuts/set', methods=['POST']) 

1235 def shell_shortcuts_set(): 

1236 """Set a custom keybinding (overrides profile default).""" 

1237 data = request.get_json(force=True) 

1238 action = data.get('action', '') 

1239 binding = data.get('binding', '') 

1240 if not action or not binding: 

1241 return jsonify({'error': 'action and binding required'}), 400 

1242 cfg = _load_json(_SHORTCUTS_CFG, {'profile': 'windows', 'custom': {}}) 

1243 if 'custom' not in cfg: 

1244 cfg['custom'] = {} 

1245 cfg['custom'][action] = binding 

1246 _save_json(_SHORTCUTS_CFG, cfg) 

1247 return jsonify({'set': True, 'action': action, 'binding': binding}) 

1248 

1249 @app.route('/api/shell/shortcuts/reset', methods=['POST']) 

1250 def shell_shortcuts_reset(): 

1251 """Reset all custom overrides, revert to profile defaults.""" 

1252 cfg = _load_json(_SHORTCUTS_CFG, {'profile': 'windows', 'custom': {}}) 

1253 cfg['custom'] = {} 

1254 _save_json(_SHORTCUTS_CFG, cfg) 

1255 return jsonify({'reset': True, 'profile': cfg['profile']}) 

1256 

1257 # ─── 15. On-Screen Keyboard ───────────────────────────── 

1258 

1259 @app.route('/api/shell/keyboard/osk-status', methods=['GET']) 

1260 def shell_osk_status(): 

1261 """Get on-screen keyboard status.""" 

1262 backends = ['squeekboard', 'onboard', 'maliit-keyboard'] 

1263 running = None 

1264 for name in backends: 

1265 r = _run(['pgrep', '-x', name], timeout=3) 

1266 if r and r.returncode == 0: 

1267 running = name 

1268 break 

1269 

1270 enabled = False 

1271 r = _run(['systemctl', '--user', 'is-active', 'squeekboard'], timeout=3) 

1272 if r and r.returncode == 0 and 'active' in r.stdout: 

1273 enabled = True 

1274 

1275 return jsonify({ 

1276 'enabled': enabled, 

1277 'running': running is not None, 

1278 'backend': running or 'none', 

1279 'available': [b for b in backends 

1280 if _run(['which', b], timeout=2) is not None 

1281 and _run(['which', b], timeout=2).returncode == 0], 

1282 }) 

1283 

1284 @app.route('/api/shell/keyboard/osk-toggle', methods=['POST']) 

1285 @_require_desktop_auth 

1286 def shell_osk_toggle(): 

1287 """Toggle on-screen keyboard.""" 

1288 body = request.get_json(silent=True) or {} 

1289 enable = body.get('enable') # None = toggle 

1290 

1291 # Check current state 

1292 r = _run(['pgrep', '-x', 'squeekboard'], timeout=3) 

1293 is_running = r and r.returncode == 0 

1294 

1295 if enable is None: 

1296 enable = not is_running 

1297 

1298 if enable and not is_running: 

1299 _run(['systemctl', '--user', 'start', 'squeekboard'], timeout=5) 

1300 elif not enable and is_running: 

1301 _run(['systemctl', '--user', 'stop', 'squeekboard'], timeout=5) 

1302 

1303 return jsonify({'enabled': enable}) 

1304 

1305 # ─── Voice Control (AI-native: Whisper STT → HART agent) ── 

1306 

1307 _voice_state = {'listening': False, 'pid': None, 'last_transcript': None} 

1308 _voice_lock = threading.Lock() 

1309 

1310 @app.route('/api/shell/voice/status', methods=['GET']) 

1311 def shell_voice_status(): 

1312 """Get voice control status.""" 

1313 with _voice_lock: 

1314 return jsonify({ 

1315 'listening': _voice_state['listening'], 

1316 'stt_available': _check_whisper_available(), 

1317 'last_transcript': _voice_state['last_transcript'], 

1318 }) 

1319 

1320 def _check_whisper_available(): 

1321 """Check if Whisper STT is available.""" 

1322 try: 

1323 from integrations.service_tools.whisper_tool import whisper_transcribe 

1324 return True 

1325 except ImportError: 

1326 return False 

1327 

1328 @app.route('/api/shell/voice/start', methods=['POST']) 

1329 @_require_desktop_auth 

1330 def shell_voice_start(): 

1331 """Start listening for voice commands. 

1332 

1333 AI-native: Uses Whisper STT (local, offline) to transcribe, 

1334 then dispatches the transcript to the HART agent pipeline 

1335 for full natural language understanding — not keyword matching. 

1336 """ 

1337 with _voice_lock: 

1338 if _voice_state['listening']: 

1339 return jsonify({'error': 'already listening'}), 409 

1340 

1341 if not _check_whisper_available(): 

1342 return jsonify({'error': 'whisper STT not available'}), 503 

1343 

1344 with _voice_lock: 

1345 _voice_state['listening'] = True 

1346 

1347 _audit_desktop_op('voice_start', {}) 

1348 return jsonify({'started': True, 'mode': 'whisper_stt_to_agent'}) 

1349 

1350 @app.route('/api/shell/voice/stop', methods=['POST']) 

1351 @_require_desktop_auth 

1352 def shell_voice_stop(): 

1353 """Stop voice control listening.""" 

1354 with _voice_lock: 

1355 _voice_state['listening'] = False 

1356 _voice_state['pid'] = None 

1357 _audit_desktop_op('voice_stop', {}) 

1358 return jsonify({'stopped': True}) 

1359 

1360 @app.route('/api/shell/voice/process', methods=['POST']) 

1361 @_require_desktop_auth 

1362 def shell_voice_process(): 

1363 """Process a voice command: transcribe audio → dispatch to HART agent. 

1364 

1365 This is the AI-native pipeline: 

1366 1. Audio file → Whisper STT (local, offline) → transcript 

1367 2. Transcript → HART /chat endpoint → agent executes task 

1368 3. Agent response returned to caller 

1369 

1370 Unlike Siri/Cortana/Alexa that use keyword matching and fixed intents, 

1371 this sends the full natural language to the agent for understanding. 

1372 The agent can execute ANY task — file ops, code, web search, etc. 

1373 """ 

1374 body = request.get_json(silent=True) or {} 

1375 audio_path = body.get('audio_path', '') 

1376 text = body.get('text', '') # Pre-transcribed text (skip STT) 

1377 

1378 # Step 1: Transcribe if audio provided 

1379 if audio_path and not text: 

1380 if not os.path.isfile(audio_path): 

1381 return jsonify({'error': 'audio file not found'}), 404 

1382 try: 

1383 from integrations.service_tools.whisper_tool import whisper_transcribe 

1384 result = json.loads(whisper_transcribe(audio_path)) 

1385 text = result.get('text', '').strip() 

1386 except Exception as e: 

1387 return jsonify({'error': f'transcription failed: {e}'}), 500 

1388 

1389 if not text: 

1390 return jsonify({'error': 'no audio_path or text provided'}), 400 

1391 

1392 with _voice_lock: 

1393 _voice_state['last_transcript'] = text 

1394 

1395 # Step 2: Dispatch to HART agent pipeline 

1396 try: 

1397 import urllib.request 

1398 from core.port_registry import get_port 

1399 port = get_port('backend') 

1400 payload = json.dumps({ 

1401 'user_id': 'voice_user', 

1402 'prompt_id': 'voice_cmd', 

1403 'prompt': text, 

1404 }).encode('utf-8') 

1405 req = urllib.request.Request( 

1406 f'http://localhost:{port}/chat', 

1407 data=payload, 

1408 headers={'Content-Type': 'application/json'}, 

1409 ) 

1410 with urllib.request.urlopen(req, timeout=30) as resp: 

1411 agent_response = json.loads(resp.read().decode('utf-8')) 

1412 return jsonify({ 

1413 'transcript': text, 

1414 'agent_response': agent_response, 

1415 'mode': 'ai_native', 

1416 }) 

1417 except Exception as e: 

1418 # Even if agent dispatch fails, return the transcript 

1419 return jsonify({ 

1420 'transcript': text, 

1421 'agent_response': None, 

1422 'error': f'agent dispatch failed: {e}', 

1423 'mode': 'ai_native', 

1424 }), 200 

1425 

1426 logger.info("Registered shell desktop routes (16 features)")