Coverage for integrations / agent_engine / shell_desktop_apis.py: 73.8%
961 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Shell Desktop APIs — Desktop experience management for HART OS.
4Covers: default apps, font manager, sound themes, clipboard history,
5date/time/timezone, wallpaper, input methods, night light, workspaces.
7All routes registered via register_shell_desktop_routes(app).
8"""
10import collections
11import json
12import logging
13import os
14import shutil
15import subprocess
16import sys
17import threading
18import time
20logger = logging.getLogger('hevolve.shell.desktop')
22# ─── Helpers ────────────────────────────────────────────────────
# Directory that holds the shell's JSON config files. Taken from the
# HART_CONFIG_DIR environment variable when set, otherwise ~/.config/hart;
# a leading '~' is expanded to the user's home directory.
_HART_CONFIG = os.path.expanduser(os.environ.get(
    'HART_CONFIG_DIR', '~/.config/hart'))
def _config_path(name):
    """Return the path of config file *name* inside the HART config directory.

    The directory is created on demand, so the returned path can be written
    to immediately.
    """
    config_dir = _HART_CONFIG
    os.makedirs(config_dir, exist_ok=True)
    return os.path.join(config_dir, name)
def _load_json(path, default=None):
    """Read and parse the JSON file at *path*.

    Returns the parsed value. If the file is missing, unreadable, not valid
    UTF-8 or not valid JSON, returns *default* instead (an empty dict when
    *default* is None).
    """
    try:
        with open(path, encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError):
        # OSError: missing file, directory, permission problems.
        # ValueError: json.JSONDecodeError and UnicodeDecodeError are both
        # subclasses, so corrupt content of either kind falls back too.
        return default if default is not None else {}
def _save_json(path, data):
    """Serialise *data* as indented JSON to *path*.

    Missing parent directories are created first. A bare filename (no
    directory component) is written relative to the current directory.
    """
    parent = os.path.dirname(path)
    if parent:
        # os.makedirs('') raises FileNotFoundError, so skip it for bare names.
        os.makedirs(parent, exist_ok=True)
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2)
def _is_wayland():
    """Return True when the session appears to be running under Wayland.

    Checks, in order: the WAYLAND_DISPLAY variable, XDG_SESSION_TYPE being
    'wayland' (case-insensitive), and on Linux only a pgrep probe for a
    sway/labwc/hyprland process.
    """
    env = os.environ
    if env.get('WAYLAND_DISPLAY'):
        return True
    # Some sessions leave WAYLAND_DISPLAY unset but still advertise the type.
    if env.get('XDG_SESSION_TYPE', '').lower() == 'wayland':
        return True
    # The pgrep probe is only attempted on Linux.
    if sys.platform != 'linux':
        return False
    try:
        probe = subprocess.run(['pgrep', '-x', 'sway|labwc|hyprland'],
                               capture_output=True, text=True, timeout=3)
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return False
    return probe.returncode == 0
def _run(cmd, timeout=10, **kw):
    """Run *cmd* and capture its text output.

    Args:
        cmd: argument list passed to subprocess.run.
        timeout: seconds before the command is abandoned.
        **kw: extra keyword arguments forwarded to subprocess.run.

    Returns:
        The CompletedProcess, or None when the command could not be started
        (missing, not executable, ...) or did not finish within *timeout*.
    """
    try:
        return subprocess.run(cmd, capture_output=True, text=True,
                              timeout=timeout, **kw)
    except (OSError, subprocess.TimeoutExpired):
        # OSError (not just FileNotFoundError) so that e.g. a non-executable
        # binary yields None instead of an unhandled PermissionError.
        return None
75# ─── Clipboard state (in-memory) ───────────────────────────────
# Most recent clipboard entries; the deque silently drops the oldest item
# once 100 entries are held.
_clipboard_history = collections.deque(maxlen=100)
# Guards both the history deque and the id counter below.
_clipboard_lock = threading.Lock()
# Monotonically increasing id handed to each new history entry.
_clipboard_counter = 0
82# ═══════════════════════════════════════════════════════════════
83# Route registration
84# ═══════════════════════════════════════════════════════════════
def _require_desktop_auth(f):
    """Decorator: require local shell auth for destructive desktop ops.

    Requests whose remote address is a loopback address pass straight
    through. Any other caller must send an X-Shell-Token header equal to the
    HART_SHELL_TOKEN environment variable; if that variable is unset or the
    token differs, a 403 JSON error is returned and *f* is not called.
    """
    import hmac
    from functools import wraps

    @wraps(f)
    def decorated(*args, **kwargs):
        from flask import request, jsonify
        remote = request.remote_addr or ''
        if remote not in ('127.0.0.1', '::1', 'localhost'):
            token = request.headers.get('X-Shell-Token', '')
            expected = os.environ.get('HART_SHELL_TOKEN', '')
            # Constant-time comparison so response timing does not leak how
            # much of the token matched. Compared as bytes because the str
            # form of compare_digest rejects non-ASCII input.
            if not expected or not hmac.compare_digest(
                    token.encode('utf-8'), expected.encode('utf-8')):
                return jsonify({'error': 'Unauthorized'}), 403
        return f(*args, **kwargs)
    return decorated
def _audit_desktop_op(action, detail=None):
    """Log a desktop operation to the immutable audit log (best-effort).

    Args:
        action: name of the operation being recorded.
        detail: optional mapping with extra context; an empty dict is sent
            when omitted.

    Never raises: if the audit log cannot be imported or written, the
    failure is recorded at debug level and otherwise ignored.
    """
    try:
        from security.immutable_audit_log import get_audit_log
        get_audit_log().log_event(
            'shell_ops', 'shell_desktop_api', action,
            detail=detail or {})
    except Exception:
        # Deliberately best-effort: auditing must not break the request, but
        # leave a trace so a permanently failing audit log is discoverable.
        logging.getLogger('hevolve.shell.desktop').debug(
            'audit log unavailable for action %r', action, exc_info=True)
113def register_shell_desktop_routes(app):
114 """Register all desktop experience API routes."""
115 from flask import jsonify, request
117 # ─── 1. Default Apps / File Associations ────────────────
119 _COMMON_MIMES = [
120 'text/html', 'text/plain', 'application/pdf',
121 'image/png', 'image/jpeg', 'image/gif', 'image/svg+xml',
122 'video/mp4', 'video/x-matroska', 'audio/mpeg', 'audio/flac',
123 'application/zip', 'application/json', 'application/xml',
124 'inode/directory',
125 ]
127 _CATEGORIES = {
128 'browser': ('xdg-settings', 'get', 'default-web-browser'),
129 'email': ('xdg-settings', 'get', 'default-url-scheme-handler', 'mailto'),
130 }
@app.route('/api/shell/default-apps', methods=['GET'])
def shell_default_apps():
    """Report the default handler for each common MIME type and category."""
    def _stdout_of(argv):
        # Stripped stdout of a successful command, '' otherwise.
        res = _run(argv)
        if res and res.returncode == 0:
            return res.stdout.strip()
        return ''

    defaults = {}
    for mime in _COMMON_MIMES:
        handler = _stdout_of(['xdg-mime', 'query', 'default', mime])
        if not handler:
            continue
        # Derive a display name from the .desktop file name.
        pretty = handler.replace('.desktop', '').replace('org.', '').replace('.', ' ')
        defaults[mime] = {'app': handler, 'name': pretty}

    categories = {}
    for cat, cmd in _CATEGORIES.items():
        value = _stdout_of(list(cmd))
        if value:
            categories[cat] = value
    return jsonify({'defaults': defaults, 'categories': categories})
@app.route('/api/shell/default-apps/candidates', methods=['GET'])
def shell_default_apps_candidates():
    """List .desktop entries whose file content mentions the given MIME type."""
    mime = request.args.get('mime_type', '')
    if not mime:
        return jsonify({'error': 'mime_type required'}), 400

    if sys.platform == 'win32':
        apps_dir = os.path.join(os.environ.get('PROGRAMDATA', r'C:\ProgramData'),
                                'Microsoft', 'Windows', 'Start Menu', 'Programs')
    elif sys.platform == 'darwin':
        apps_dir = '/Applications'
    else:
        apps_dir = '/usr/share/applications'

    candidates = []
    entries = os.listdir(apps_dir) if os.path.isdir(apps_dir) else []
    for entry in entries:
        if not entry.endswith('.desktop'):
            continue
        try:
            with open(os.path.join(apps_dir, entry)) as fh:
                content = fh.read()
        except (IOError, UnicodeDecodeError):
            # Unreadable entries are skipped.
            continue
        if mime not in content:
            continue
        # First 'Name=' line wins; fall back to the file name without suffix.
        first_name = next(
            (ln for ln in content.splitlines() if ln.startswith('Name=')), None)
        if first_name is not None:
            name = first_name.split('=', 1)[1]
        else:
            name = entry.replace('.desktop', '')
        candidates.append({'app': entry, 'name': name})
    return jsonify({'mime_type': mime, 'candidates': candidates})
@app.route('/api/shell/default-apps/set', methods=['POST'])
def shell_default_apps_set():
    """Set the default application for one MIME type via xdg-mime."""
    payload = request.get_json(force=True)
    mime = payload.get('mime_type', '')
    desktop_app = payload.get('app', '')
    if not (mime and desktop_app):
        return jsonify({'error': 'mime_type and app required'}), 400

    result = _run(['xdg-mime', 'default', desktop_app, mime])
    if not result:
        # _run returns None when the tool is missing or timed out.
        return jsonify({'set': False, 'error': 'xdg-mime not available'}), 500
    if result.returncode != 0:
        return jsonify({'set': False, 'error': result.stderr.strip()}), 500
    return jsonify({'set': True, 'mime_type': mime, 'app': desktop_app})
@app.route('/api/shell/default-apps/set-category', methods=['POST'])
def shell_default_apps_set_category():
    """Set the default app for a category ('browser' or 'email').

    Expects JSON with 'category' and 'app'. Responds 400 on missing fields
    or an unknown category; otherwise reports whether xdg-settings succeeded
    in the boolean 'set' field.
    """
    data = request.get_json(force=True)
    cat = data.get('category', '')
    desktop_app = data.get('app', '')
    if not cat or not desktop_app:
        return jsonify({'error': 'category and app required'}), 400
    if cat == 'browser':
        r = _run(['xdg-settings', 'set', 'default-web-browser', desktop_app])
    elif cat == 'email':
        r = _run(['xdg-settings', 'set', 'default-url-scheme-handler', 'mailto', desktop_app])
    else:
        return jsonify({'error': f'Unknown category: {cat}'}), 400
    # bool(): `r and ...` would be None (JSON null) when _run returned None.
    ok = bool(r and r.returncode == 0)
    return jsonify({'set': ok, 'category': cat, 'app': desktop_app})
210 # ─── 2. Font Manager ───────────────────────────────────
@app.route('/api/shell/fonts', methods=['GET'])
def shell_fonts_list():
    """List installed font families (one entry per family) from fc-list.

    Optional query parameters: 'search' (case-insensitive substring of the
    family name) and 'category' (monospace / serif / sans-serif).
    """
    needle = request.args.get('search', '').lower()
    wanted = request.args.get('category', '')
    listing = _run(['fc-list', '--format=%{family}|%{style}|%{file}|%{fontformat}\n'])

    rows = []
    if listing and listing.returncode == 0:
        rows = listing.stdout.strip().split('\n')

    fonts = []
    seen = set()
    for row in rows:
        cols = row.split('|')
        if len(cols) < 3:
            continue
        # Only the first comma-separated family name is kept.
        family = cols[0].split(',')[0].strip()
        if family in seen:
            continue
        seen.add(family)

        lowered = family.lower()
        # Category is guessed from keywords in the family name.
        if any(k in lowered for k in ('mono', 'code', 'consol')):
            cat = 'monospace'
        elif any(k in lowered for k in ('serif', 'times', 'georgia')):
            cat = 'serif'
        else:
            cat = 'sans-serif'

        if needle and needle not in lowered:
            continue
        if wanted and wanted != cat:
            continue
        fonts.append({
            'family': family,
            'style': cols[1].strip(),
            'path': cols[2].strip(),
            'format': cols[3].strip() if len(cols) > 3 else '',
            'category': cat,
        })

    fonts.sort(key=lambda item: item['family'])
    cats = dict(collections.Counter(item['category'] for item in fonts))
    return jsonify({'fonts': fonts, 'count': len(fonts), 'categories': cats})
@app.route('/api/shell/fonts/preview', methods=['GET'])
def shell_fonts_preview():
    """Return the data needed to render a preview of one font family.

    Query parameters: 'family' (required), 'text' (sample text) and 'size'
    (positive integer pixel size, default 18). Responds 400 when the family
    is missing or the size is not a positive integer.
    """
    family = request.args.get('family', '')
    text = request.args.get('text', 'The quick brown fox jumps over the lazy dog')
    if not family:
        return jsonify({'error': 'family required'}), 400
    try:
        size = int(request.args.get('size', '18'))
    except ValueError:
        return jsonify({'error': 'size must be a positive integer'}), 400
    if size <= 0:
        return jsonify({'error': 'size must be a positive integer'}), 400
    # Escape characters that would terminate the quoted CSS string, and use
    # the parsed integer so arbitrary text cannot be injected via 'size'.
    css_family = family.replace('\\', '\\\\').replace('"', '\\"')
    return jsonify({
        'family': family, 'text': text, 'size': size,
        'css': f'font-family: "{css_family}"; font-size: {size}px;',
    })
@app.route('/api/shell/fonts/install', methods=['POST'])
def shell_fonts_install():
    """Copy a font file into the user's font directory and refresh the cache."""
    payload = request.get_json(force=True)
    src = payload.get('path', '')
    if not (src and os.path.isfile(src)):
        return jsonify({'error': 'Valid font file path required'}), 400

    filename = os.path.basename(src)
    stem, raw_ext = os.path.splitext(filename)
    ext = raw_ext.lower()
    if ext not in ('.ttf', '.otf', '.woff', '.woff2', '.ttc'):
        return jsonify({'error': f'Unsupported font format: {ext}'}), 400

    fonts_dir = os.path.expanduser('~/.local/share/fonts')
    os.makedirs(fonts_dir, exist_ok=True)
    dest = os.path.join(fonts_dir, filename)
    try:
        shutil.copy2(src, dest)
        _run(['fc-cache', '-f'], timeout=30)
    except (IOError, PermissionError) as e:
        return jsonify({'error': str(e)}), 500
    # The reported family is simply the file name without its extension.
    return jsonify({'installed': True, 'path': dest, 'family': stem})
@app.route('/api/shell/fonts/remove', methods=['POST'])
def shell_fonts_remove():
    """Delete user-installed font files whose name contains the family name.

    Matching is case-insensitive and ignores spaces. Only regular files in
    ~/.local/share/fonts are considered. Responds 400 when no usable family
    is given and 404 when nothing was removed.
    """
    data = request.get_json(force=True)
    family = data.get('family', '')
    if not family or not isinstance(family, str):
        return jsonify({'error': 'family required'}), 400
    needle = family.lower().replace(' ', '')
    if not needle:
        # A whitespace-only family would normalise to '' and match (and
        # delete) every file in the directory.
        return jsonify({'error': 'family required'}), 400
    fonts_dir = os.path.expanduser('~/.local/share/fonts')
    removed = []
    if os.path.isdir(fonts_dir):
        for f in os.listdir(fonts_dir):
            if needle not in f.lower().replace(' ', ''):
                continue
            fp = os.path.join(fonts_dir, f)
            if not os.path.isfile(fp):
                # os.remove cannot delete directories; skip them.
                continue
            try:
                os.remove(fp)
            except OSError as exc:
                logger.warning('could not remove font %s: %s', fp, exc)
                continue
            removed.append(f)
    # NOTE(review): substring matching is loose ('Roboto' also matches
    # 'RobotoMono-*.ttf'); confirm whether stricter matching is wanted.
    if removed:
        _run(['fc-cache', '-f'], timeout=30)
        return jsonify({'removed': True, 'family': family, 'files': removed})
    return jsonify({'removed': False, 'error': 'Font not found in user fonts (system fonts cannot be removed)'}), 404
299 # ─── 3. Sound Manager & System Sounds ──────────────────
301 _SOUND_EVENTS = [
302 {'id': 'bell', 'name': 'Notification'},
303 {'id': 'dialog-error', 'name': 'Error'},
304 {'id': 'dialog-warning', 'name': 'Warning'},
305 {'id': 'dialog-information', 'name': 'Information'},
306 {'id': 'desktop-login', 'name': 'Login'},
307 {'id': 'desktop-logout', 'name': 'Logout'},
308 {'id': 'device-added', 'name': 'Device Connected'},
309 {'id': 'device-removed', 'name': 'Device Disconnected'},
310 {'id': 'screen-capture', 'name': 'Screenshot'},
311 {'id': 'trash-empty', 'name': 'Empty Trash'},
312 {'id': 'message-new-instant', 'name': 'New Message'},
313 {'id': 'battery-low', 'name': 'Low Battery'},
314 ]
@app.route('/api/shell/sounds/themes', methods=['GET'])
def shell_sounds_themes():
    """List sound theme directories plus the saved active theme and toggle.

    Each sub-directory of the known sound roots is one theme; when the same
    name appears under several roots the first one found wins. A single
    'freedesktop' placeholder is returned when no theme directory exists.
    """
    themes = []
    seen = set()  # theme ids already listed, for O(1) duplicate checks
    for d in ['/usr/share/sounds', '/run/current-system/sw/share/sounds']:
        if not os.path.isdir(d):
            continue
        for name in os.listdir(d):
            theme_dir = os.path.join(d, name)
            if name in seen or not os.path.isdir(theme_dir):
                continue
            seen.add(name)
            themes.append({'id': name, 'name': name.replace('-', ' ').title(),
                           'path': theme_dir})
    if not themes:
        themes.append({'id': 'freedesktop', 'name': 'FreeDesktop (default)', 'path': ''})
    cfg = _load_json(_config_path('sound-theme.json'), {'active': 'freedesktop', 'enabled': True})
    return jsonify({'themes': themes, 'active': cfg.get('active', 'freedesktop'),
                    'enabled': cfg.get('enabled', True)})
@app.route('/api/shell/sounds/events', methods=['GET'])
def shell_sounds_events():
    """List the known sound events with the file each one resolves to.

    A per-event override from sound-overrides.json takes precedence and is
    flagged with 'custom'. Otherwise the active theme is searched, preferring
    .oga, then .ogg, then .wav, and stopping at the first file found, which
    is the same order the play endpoint uses.
    """
    cfg = _load_json(_config_path('sound-theme.json'), {'active': 'freedesktop', 'enabled': True})
    overrides = _load_json(_config_path('sound-overrides.json'), {})
    theme = cfg.get('active', 'freedesktop')
    events = []
    for evt in _SOUND_EVENTS:
        entry = dict(evt)
        if evt['id'] in overrides:
            entry['file'] = overrides[evt['id']]
            entry['custom'] = True
        else:
            found = None
            for ext in ('oga', 'ogg', 'wav'):
                for base in ['/usr/share/sounds', '/run/current-system/sw/share/sounds']:
                    p = os.path.join(base, theme, 'stereo', f"{evt['id']}.{ext}")
                    if os.path.isfile(p):
                        found = p
                        break
                if found:
                    # Stop the outer loop too; otherwise a later, less
                    # preferred extension would overwrite this match.
                    break
            if found:
                entry['file'] = found
        events.append(entry)
    return jsonify({'events': events, 'enabled': cfg.get('enabled', True)})
@app.route('/api/shell/sounds/set-theme', methods=['POST'])
def shell_sounds_set_theme():
    """Persist the requested sound theme as the active one."""
    theme = request.get_json(force=True).get('theme', '')
    if not theme:
        return jsonify({'error': 'theme required'}), 400
    cfg_file = _config_path('sound-theme.json')
    settings = _load_json(cfg_file, {})
    settings['active'] = theme
    _save_json(cfg_file, settings)
    return jsonify({'set': True, 'theme': theme})
@app.route('/api/shell/sounds/set-event', methods=['POST'])
def shell_sounds_set_event():
    """Store a custom sound file for a single event id."""
    payload = request.get_json(force=True)
    event = payload.get('event', '')
    file = payload.get('file', '')
    if not (event and file):
        return jsonify({'error': 'event and file required'}), 400
    overrides_file = _config_path('sound-overrides.json')
    mapping = _load_json(overrides_file, {})
    mapping[event] = file
    _save_json(overrides_file, mapping)
    return jsonify({'set': True, 'event': event, 'file': file})
@app.route('/api/shell/sounds/toggle', methods=['POST'])
def shell_sounds_toggle():
    """Persist whether system sounds are enabled (defaults to enabled)."""
    flag = bool(request.get_json(force=True).get('enabled', True))
    cfg_file = _config_path('sound-theme.json')
    settings = _load_json(cfg_file, {})
    settings['enabled'] = flag
    _save_json(cfg_file, settings)
    return jsonify({'enabled': flag})
@app.route('/api/shell/sounds/play', methods=['POST'])
def shell_sounds_play():
    """Play a sound given either an explicit file or an event id.

    For an event, a configured override is used if present; otherwise the
    active theme is searched (.oga, .ogg, .wav) and the first hit is played.
    """
    payload = request.get_json(force=True)
    file = payload.get('file', '')
    event = payload.get('event', '')
    if not (file or event):
        return jsonify({'error': 'file or event required'}), 400

    if event and not file:
        overrides = _load_json(_config_path('sound-overrides.json'), {})
        if event in overrides:
            file = overrides[event]
        else:
            cfg = _load_json(_config_path('sound-theme.json'), {'active': 'freedesktop'})
            theme = cfg.get('active', 'freedesktop')
            # Extension is the outer loop, search root the inner one.
            candidates = (
                os.path.join(base, theme, 'stereo', f'{event}.{ext}')
                for ext in ('oga', 'ogg', 'wav')
                for base in ['/usr/share/sounds', '/run/current-system/sw/share/sounds']
            )
            file = next((p for p in candidates if os.path.isfile(p)), '')

    if not file:
        return jsonify({'played': False, 'error': 'Sound file not found'}), 404

    player = shutil.which('pw-play') or shutil.which('paplay') or shutil.which('aplay')
    if not player:
        return jsonify({'played': False, 'error': 'No audio player available'}), 500
    _run([player, file], timeout=5)
    return jsonify({'played': True, 'file': file})
415 # ─── 4. Clipboard Manager ──────────────────────────────
@app.route('/api/shell/clipboard/history', methods=['GET'])
def shell_clipboard_history():
    """Return the newest clipboard history entries.

    Query parameters: 'limit' (maximum number of entries, default 50; an
    unparsable value falls back to the default and a negative one is treated
    as 0) and 'type' (substring filter on each entry's content_type).
    """
    # type=int makes the lookup fall back to the default instead of raising
    # on non-numeric input.
    limit = request.args.get('limit', 50, type=int)
    # A negative slice bound would drop entries from the end instead.
    limit = max(0, limit)
    ctype = request.args.get('type', '')
    with _clipboard_lock:
        entries = list(_clipboard_history)
    if ctype:
        entries = [e for e in entries if ctype in e.get('content_type', '')]
    entries = entries[:limit]
    return jsonify({'entries': entries, 'count': len(entries)})
@app.route('/api/shell/clipboard/current', methods=['GET'])
def shell_clipboard_current():
    """Return the current clipboard text ('' when it cannot be read)."""
    if _is_wayland():
        argv = ['wl-paste', '--no-newline']
    else:
        argv = ['xclip', '-selection', 'clipboard', '-o']
    result = _run(argv)
    content = ''
    if result and result.returncode == 0:
        content = result.stdout
    return jsonify({'content': content, 'content_type': 'text/plain',
                    'timestamp': time.time()})
@app.route('/api/shell/clipboard/copy', methods=['POST'])
def shell_clipboard_copy():
    """Copy text to the system clipboard and record it in the history.

    Expects JSON with 'content' (non-empty string) and optional
    'content_type'. The history keeps at most the first 1000 characters.
    'copied' in the response reflects whether the clipboard tool succeeded;
    the history entry is recorded either way.
    """
    global _clipboard_counter
    data = request.get_json(force=True)
    content = data.get('content', '')
    ctype = data.get('content_type', 'text/plain')
    if not content or not isinstance(content, str):
        return jsonify({'error': 'content required'}), 400
    cmd = ['wl-copy'] if _is_wayland() else ['xclip', '-selection', 'clipboard']
    try:
        # Content goes through stdin so text starting with '-' is not parsed
        # as an option. Output is discarded rather than captured: the
        # clipboard helpers fork a background process that keeps inherited
        # pipes open, which would make a capturing run() wait until timeout.
        proc = subprocess.run(cmd, input=content, text=True, timeout=5,
                              stdout=subprocess.DEVNULL,
                              stderr=subprocess.DEVNULL)
        copied = proc.returncode == 0
    except (OSError, subprocess.TimeoutExpired):
        copied = False
    with _clipboard_lock:
        _clipboard_counter += 1
        # Read the id under the lock so a concurrent copy cannot change it
        # before the response is built.
        entry_id = _clipboard_counter
        _clipboard_history.appendleft({
            'id': entry_id, 'content': content[:1000],
            'content_type': ctype, 'timestamp': time.time(),
            'pinned': False,
        })
    return jsonify({'copied': copied, 'id': entry_id})
@app.route('/api/shell/clipboard/pin', methods=['POST'])
def shell_clipboard_pin():
    """Mark one history entry as pinned; 404 when the id is unknown."""
    entry_id = request.get_json(force=True).get('id', 0)
    with _clipboard_lock:
        match = next(
            (item for item in _clipboard_history if item['id'] == entry_id), None)
        if match is not None:
            match['pinned'] = True
            return jsonify({'pinned': True, 'id': entry_id})
    return jsonify({'error': 'Entry not found'}), 404
@app.route('/api/shell/clipboard/clear', methods=['POST'])
def shell_clipboard_clear():
    """Clear clipboard history.

    With a truthy 'all', every unpinned entry is dropped and the number
    removed is reported. Otherwise the single entry with the given 'id' is
    removed, or 404 is returned when it does not exist.
    """
    payload = request.get_json(force=True)
    if payload.get('all'):
        with _clipboard_lock:
            keep = [item for item in _clipboard_history if item.get('pinned')]
            dropped = len(_clipboard_history) - len(keep)
            _clipboard_history.clear()
            _clipboard_history.extend(keep)
        return jsonify({'cleared': dropped})

    target = payload.get('id', 0)
    with _clipboard_lock:
        position = next(
            (idx for idx, item in enumerate(_clipboard_history)
             if item['id'] == target),
            None)
        if position is not None:
            del _clipboard_history[position]
            return jsonify({'cleared': 1})
    return jsonify({'error': 'Entry not found'}), 404
490 # ─── 5. Date/Time/Timezone ─────────────────────────────
@app.route('/api/shell/datetime', methods=['GET'])
def shell_datetime():
    """Report timezone, NTP state, local time, UTC offset and clock format."""
    import datetime

    info = {'timezone': 'UTC', 'utc_offset': '+00:00',
            'ntp_enabled': False, 'ntp_synced': False,
            'clock_format': '24h', 'rtc_in_local_time': False}

    # timedatectl keys that map onto yes/no flags in the response.
    flag_fields = {
        'NTP': 'ntp_enabled',
        'NTPSynchronized': 'ntp_synced',
        'LocalRTC': 'rtc_in_local_time',
    }
    status = _run(['timedatectl', 'show'])
    if status and status.returncode == 0:
        for row in status.stdout.strip().split('\n'):
            key, sep, value = row.partition('=')
            if not sep:
                continue
            if key == 'Timezone':
                info['timezone'] = value
            elif key in flag_fields:
                info[flag_fields[key]] = value == 'yes'

    info['datetime'] = datetime.datetime.now().isoformat()
    local_now = datetime.datetime.now(datetime.timezone.utc).astimezone()
    offset = local_now.strftime('%z')
    if len(offset) >= 5:
        # '+HHMM' -> '+HH:MM'
        info['utc_offset'] = f'{offset[:3]}:{offset[3:]}'
    else:
        info['utc_offset'] = offset

    clock_cfg = _load_json(_config_path('clock-format.json'), {'format': '24h'})
    info['clock_format'] = clock_cfg.get('format', '24h')
    return jsonify(info)
@app.route('/api/shell/datetime/timezones', methods=['GET'])
def shell_datetime_timezones():
    """List timezones from timedatectl, falling back to the zoneinfo module."""
    listing = _run(['timedatectl', 'list-timezones'], timeout=5)
    if listing and listing.returncode == 0:
        zones = list(filter(None, listing.stdout.strip().split('\n')))
    else:
        import zoneinfo
        zones = sorted(zoneinfo.available_timezones())
    return jsonify({'timezones': zones, 'count': len(zones)})
@app.route('/api/shell/datetime/set-timezone', methods=['POST'])
def shell_datetime_set_tz():
    """Set the system timezone through timedatectl.

    Expects JSON with 'timezone'. The value must look like a zone name:
    '/'-separated components of letters, digits, '_', '+' or '-', not
    starting with '-'. Names without a slash such as 'UTC' are accepted;
    timedatectl decides whether the zone actually exists.
    """
    import re
    data = request.get_json(force=True)
    tz = data.get('timezone', '')
    well_formed = (
        isinstance(tz, str)
        # A leading '-' would be read by timedatectl as an option.
        and not tz.startswith('-')
        and re.fullmatch(r'[A-Za-z0-9_+\-]+(?:/[A-Za-z0-9_+\-]+)*', tz) is not None
    )
    if not well_formed:
        return jsonify({'error': 'Valid timezone required (e.g. US/Pacific)'}), 400
    r = _run(['timedatectl', 'set-timezone', tz])
    if r and r.returncode == 0:
        return jsonify({'set': True, 'timezone': tz})
    return jsonify({'set': False, 'error': r.stderr.strip() if r else 'timedatectl not available'}), 500
@app.route('/api/shell/datetime/set-ntp', methods=['POST'])
def shell_datetime_set_ntp():
    """Enable or disable NTP synchronisation through timedatectl.

    Expects JSON with an optional 'enabled' flag (default true). The boolean
    'set' field in the response tells whether timedatectl succeeded.
    """
    data = request.get_json(force=True)
    enabled = data.get('enabled', True)
    val = 'true' if enabled else 'false'
    r = _run(['timedatectl', 'set-ntp', val])
    # bool(): `r and ...` would be None (JSON null) when _run returned None.
    ok = bool(r and r.returncode == 0)
    return jsonify({'set': ok, 'ntp_enabled': bool(enabled)})
@app.route('/api/shell/datetime/set-format', methods=['POST'])
def shell_datetime_set_format():
    """Persist the clock display format ('12h' or '24h')."""
    fmt = request.get_json(force=True).get('format', '')
    if fmt in ('12h', '24h'):
        _save_json(_config_path('clock-format.json'), {'format': fmt})
        return jsonify({'set': True, 'clock_format': fmt})
    return jsonify({'error': 'format must be 12h or 24h'}), 400
560 # ─── 6. Wallpaper Manager ──────────────────────────────
562 _WALL_CFG = _config_path('wallpaper.json')
563 _WALL_EXTS = {'.png', '.jpg', '.jpeg', '.webp', '.svg', '.bmp'}
@app.route('/api/shell/wallpaper', methods=['GET'])
def shell_wallpaper():
    """Return the saved wallpaper settings, or defaults when none are stored."""
    fallback = {
        'current': '', 'lock_screen': '',
        'mode': 'fill',
        'slideshow': {'enabled': False, 'interval_minutes': 30, 'directory': ''},
    }
    return jsonify(_load_json(_WALL_CFG, fallback))
@app.route('/api/shell/wallpaper/collection', methods=['GET'])
def shell_wallpaper_collection():
    """List image files in a wallpaper directory.

    Query parameter 'directory' defaults to /usr/share/backgrounds. Only
    files with a recognised image extension are returned, sorted by name.
    A missing or unreadable directory yields an empty list.
    """
    directory = request.args.get('directory', '/usr/share/backgrounds')
    if not os.path.isdir(directory):
        return jsonify({'images': [], 'count': 0})
    try:
        names = sorted(os.listdir(directory))
    except OSError:
        # e.g. the directory exists but is not readable by this process.
        return jsonify({'images': [], 'count': 0})
    images = []
    for f in names:
        ext = os.path.splitext(f)[1].lower()
        if ext not in _WALL_EXTS:
            continue
        fp = os.path.join(directory, f)
        try:
            stat = os.stat(fp)
        except OSError:
            # Entry vanished or is inaccessible; leave it out.
            continue
        images.append({
            'path': fp, 'name': os.path.splitext(f)[0],
            'size': stat.st_size,
        })
    return jsonify({'images': images, 'count': len(images)})
@app.route('/api/shell/wallpaper/set', methods=['POST'])
def shell_wallpaper_set():
    """Apply a wallpaper (swaymsg on Wayland, feh otherwise) and save it."""
    payload = request.get_json(force=True)
    path = payload.get('path', '')
    mode = payload.get('mode', 'fill')
    if not path:
        return jsonify({'error': 'path required'}), 400

    if _is_wayland():
        _run(['swaymsg', 'output', '*', 'bg', path, mode])
    else:
        # feh flag per mode; anything unrecognised falls back to scaling.
        feh_flags = (
            ('fill', '--bg-fill'),
            ('fit', '--bg-max'),
            ('center', '--bg-center'),
            ('tile', '--bg-tile'),
        )
        flag = next((opt for key, opt in feh_flags if mode == key), '--bg-scale')
        _run(['feh', flag, path])

    settings = _load_json(_WALL_CFG, {})
    settings['current'] = path
    settings['mode'] = mode
    _save_json(_WALL_CFG, settings)
    return jsonify({'set': True, 'path': path, 'mode': mode})
@app.route('/api/shell/wallpaper/set-lock', methods=['POST'])
def shell_wallpaper_set_lock():
    """Persist the image path to use on the lock screen."""
    path = request.get_json(force=True).get('path', '')
    if not path:
        return jsonify({'error': 'path required'}), 400
    settings = _load_json(_WALL_CFG, {})
    settings['lock_screen'] = path
    _save_json(_WALL_CFG, settings)
    return jsonify({'set': True, 'lock_screen': path})
@app.route('/api/shell/wallpaper/slideshow', methods=['POST'])
def shell_wallpaper_slideshow():
    """Save wallpaper slideshow settings.

    Expects JSON with optional 'enabled', 'interval_minutes' and
    'directory'. Responds 400 when the interval is not an integer; values
    below 1 are raised to 1. The response also carries 'image_count', the
    number of recognised image files in the directory (0 when it is missing
    or unreadable).
    """
    data = request.get_json(force=True)
    enabled = data.get('enabled', False)
    directory = data.get('directory', '')
    try:
        interval = int(data.get('interval_minutes', 30))
    except (TypeError, ValueError):
        return jsonify({'error': 'interval_minutes must be an integer'}), 400
    # A zero or negative interval has no sensible meaning for a timer.
    interval = max(1, interval)
    img_count = 0
    if directory and os.path.isdir(directory):
        try:
            img_count = sum(1 for f in os.listdir(directory)
                            if os.path.splitext(f)[1].lower() in _WALL_EXTS)
        except OSError:
            img_count = 0
    cfg = _load_json(_WALL_CFG, {})
    cfg['slideshow'] = {
        'enabled': bool(enabled),
        'interval_minutes': interval,
        'directory': directory,
    }
    _save_json(_WALL_CFG, cfg)
    return jsonify({'slideshow': {**cfg['slideshow'], 'image_count': img_count}})
651 # ─── 7. Input Methods / Keyboard Layouts ───────────────
@app.route('/api/shell/input-methods', methods=['GET'])
def shell_input_methods():
    """Report keyboard layouts, compose key and any running IME daemon."""
    info = {'layouts': [], 'active': '', 'compose_key': '',
            'ime': {'engine': 'none', 'running': False, 'input_methods': []}}

    query = _run(['setxkbmap', '-query'])
    rows = query.stdout.strip().split('\n') if query and query.returncode == 0 else []
    for row in rows:
        raw_key, sep, raw_val = row.partition(':')
        if not sep:
            continue
        key, val = raw_key.strip(), raw_val.strip()
        if key == 'layout':
            names = val.split(',')
            # The first listed layout is reported as the active one.
            info['active'] = names[0] if names else ''
            info['layouts'].extend(
                {'id': n.strip(), 'name': n.strip()} for n in names)
        elif key == 'options':
            for option in (o.strip() for o in val.split(',')):
                if option.startswith('compose:'):
                    info['compose_key'] = option.split(':')[1]

    for engine in ('fcitx5', 'ibus-daemon'):
        probe = _run(['pgrep', '-x', engine])
        if probe and probe.returncode == 0:
            info['ime']['engine'] = engine.replace('-daemon', '')
            info['ime']['running'] = True
            break
    return jsonify(info)
@app.route('/api/shell/input-methods/available', methods=['GET'])
def shell_input_methods_available():
    """List available X11 keymap layouts (first 200) and known IME engines."""
    listing = _run(['localectl', 'list-x11-keymap-layouts'], timeout=5)
    layouts = []
    if listing and listing.returncode == 0:
        layouts = [
            {'id': row.strip(), 'name': row.strip()}
            for row in listing.stdout.strip().split('\n')
            if row.strip()
        ]
    ime_engines = ['pinyin', 'mozc', 'hangul', 'anthy', 'chewing', 'libpinyin']
    return jsonify({'layouts': layouts[:200], 'ime_engines': ime_engines})
@app.route('/api/shell/input-methods/add', methods=['POST'])
def shell_input_methods_add():
    """Append a keyboard layout to the current setxkbmap layout list."""
    layout = request.get_json(force=True).get('layout', '')
    if not layout:
        return jsonify({'error': 'layout required'}), 400

    current = ''
    query = _run(['setxkbmap', '-query'])
    if query and query.returncode == 0:
        layout_rows = [row for row in query.stdout.strip().split('\n')
                       if row.strip().startswith('layout')]
        # If several rows match, the last one wins.
        for row in layout_rows:
            current = row.split(':', 1)[1].strip()

    if current:
        new_layouts = f'{current},{layout}'
    else:
        new_layouts = layout
    _run(['setxkbmap', '-layout', new_layouts, '-option', 'grp:alt_shift_toggle'])
    return jsonify({'added': True, 'layout': layout, 'all_layouts': new_layouts})
@app.route('/api/shell/input-methods/remove', methods=['POST'])
def shell_input_methods_remove():
    """Drop a keyboard layout from the current setxkbmap layout list."""
    layout = request.get_json(force=True).get('layout', '')
    if not layout:
        return jsonify({'error': 'layout required'}), 400

    remaining = []
    query = _run(['setxkbmap', '-query'])
    if query and query.returncode == 0:
        for row in query.stdout.strip().split('\n'):
            if not row.strip().startswith('layout'):
                continue
            listed = row.split(':', 1)[1].strip()
            remaining = [name.strip() for name in listed.split(',')]

    if layout in remaining:
        remaining.remove(layout)
    # setxkbmap is only invoked when at least one layout is left.
    if remaining:
        _run(['setxkbmap', '-layout', ','.join(remaining)])
    return jsonify({'removed': True, 'layout': layout})
@app.route('/api/shell/input-methods/switch', methods=['POST'])
def shell_input_methods_switch():
    """Switch the keyboard to the given layout via setxkbmap."""
    layout = request.get_json(force=True).get('layout', '')
    if layout:
        _run(['setxkbmap', layout])
        return jsonify({'switched': True, 'active': layout})
    return jsonify({'error': 'layout required'}), 400
@app.route('/api/shell/input-methods/compose-key', methods=['POST'])
def shell_input_methods_compose_key():
    """Set the compose key through a setxkbmap option (default 'ralt')."""
    key = request.get_json(force=True).get('key', 'ralt')
    option = f'compose:{key}'
    _run(['setxkbmap', '-option', option])
    return jsonify({'set': True, 'compose_key': key})
743 # ─── 8. Night Light / Blue Light Filter ────────────────
745 _NL_CFG = _config_path('nightlight.json')
@app.route('/api/shell/nightlight', methods=['GET'])
def shell_nightlight():
    """Return night-light settings plus whether a filter process is running."""
    def _is_running(proc):
        # True when pgrep finds a process with exactly this name.
        probe = _run(['pgrep', '-x', proc])
        return bool(probe and probe.returncode == 0)

    cfg = _load_json(_NL_CFG, {
        'enabled': False, 'temperature': 4500,
        'schedule': {'mode': 'sunset', 'start': '20:00', 'end': '06:00'},
    })
    cfg['active'] = any(_is_running(p) for p in ('gammastep', 'redshift'))
    return jsonify(cfg)
@app.route('/api/shell/nightlight/toggle', methods=['POST'])
def shell_nightlight_toggle():
    """Turn the night-light filter on or off and persist the choice."""
    enabled = request.get_json(force=True).get('enabled', True)
    cfg = _load_json(_NL_CFG, {'temperature': 4500})
    cfg['enabled'] = bool(enabled)
    _save_json(_NL_CFG, cfg)

    if not enabled:
        # Stop whichever filter tool may be running.
        for proc in ('gammastep', 'redshift'):
            _run(['pkill', '-x', proc])
        return jsonify({'enabled': False, 'active': False})

    temp = cfg.get('temperature', 4500)
    tool = shutil.which('gammastep') or shutil.which('redshift')
    if not tool:
        return jsonify({'enabled': True, 'active': False,
                        'error': 'Neither gammastep nor redshift available'}), 500
    # Replace any running instance with one at the saved temperature.
    _run(['pkill', '-x', os.path.basename(tool)])
    subprocess.Popen([tool, '-O', str(temp)],
                     stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return jsonify({'enabled': True, 'active': True, 'temperature': temp})
@app.route('/api/shell/nightlight/temperature', methods=['POST'])
def shell_nightlight_temperature():
    """Save the night-light colour temperature and re-apply it.

    Expects JSON with a numeric 'temperature' between 1000 and 6500
    (default 4500). Responds 400 for anything else, including non-numeric
    values. When gammastep or redshift is installed, the running instance is
    replaced with one at the new temperature.
    """
    data = request.get_json(force=True)
    temp = data.get('temperature', 4500)
    # Reject non-numbers up front: comparing a str or None with an int
    # raises TypeError. bool is excluded because it is an int subclass.
    is_number = isinstance(temp, (int, float)) and not isinstance(temp, bool)
    if not is_number or not (1000 <= temp <= 6500):
        return jsonify({'error': 'temperature must be 1000-6500'}), 400
    cfg = _load_json(_NL_CFG, {})
    cfg['temperature'] = temp
    _save_json(_NL_CFG, cfg)
    tool = shutil.which('gammastep') or shutil.which('redshift')
    if tool:
        _run(['pkill', '-x', os.path.basename(tool)])
        subprocess.Popen([tool, '-O', str(temp)],
                         stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return jsonify({'set': True, 'temperature': temp})
@app.route('/api/shell/nightlight/schedule', methods=['POST'])
def shell_nightlight_schedule():
    """Persist the night-light schedule (mode plus start and end times)."""
    payload = request.get_json(force=True)
    mode = payload.get('mode', 'manual')
    if mode not in ('sunset', 'manual', 'disabled'):
        return jsonify({'error': 'mode must be sunset, manual, or disabled'}), 400

    schedule = {
        'mode': mode,
        'start': payload.get('start', '20:00'),
        'end': payload.get('end', '06:00'),
    }
    settings = _load_json(_NL_CFG, {})
    settings['schedule'] = schedule
    _save_json(_NL_CFG, settings)
    return jsonify({'set': True, 'schedule': schedule})
814 # ─── 9. Window / Workspace Manager ─────────────────────
def _detect_compositor():
    """Return 'sway', 'hyprland', 'wayland' or 'x11' for the current session.

    Compositor-specific environment variables are checked first; a generic
    Wayland session is reported as 'wayland', everything else as 'x11'.
    """
    env_markers = (
        ('SWAYSOCK', 'sway'),
        ('HYPRLAND_INSTANCE_SIGNATURE', 'hyprland'),
    )
    for variable, label in env_markers:
        if os.environ.get(variable):
            return label
    return 'wayland' if _is_wayland() else 'x11'
@app.route('/api/shell/workspaces', methods=['GET'])
def shell_workspaces():
    """List workspaces (swaymsg on sway, wmctrl on X11).

    A single placeholder workspace named 'Main' is returned when nothing
    could be listed.
    """
    comp = _detect_compositor()
    workspaces = []

    if comp == 'sway':
        reply = _run(['swaymsg', '-t', 'get_workspaces', '--raw'])
        if reply and reply.returncode == 0:
            try:
                for item in json.loads(reply.stdout):
                    workspaces.append({
                        'id': item.get('num', item.get('id')),
                        'name': item.get('name', ''),
                        'focused': item.get('focused', False),
                        'visible': item.get('visible', False),
                    })
            except json.JSONDecodeError:
                pass
    elif comp == 'x11':
        reply = _run(['wmctrl', '-d'])
        if reply and reply.returncode == 0:
            for row in reply.stdout.strip().split('\n'):
                cols = row.split()
                if len(cols) < 9:
                    continue
                # Second column is '*' for the current desktop.
                is_current = cols[1] == '*'
                workspaces.append({
                    'id': int(cols[0]),
                    'name': cols[-1],
                    'focused': is_current,
                    'visible': is_current,
                })

    if not workspaces:
        workspaces.append({'id': 1, 'name': 'Main', 'focused': True, 'visible': True})
    return jsonify({'workspaces': workspaces, 'compositor': comp})
@app.route('/api/shell/workspaces/windows', methods=['GET'])
def shell_workspaces_windows():
    """List open windows, optionally filtered by workspace.

    On sway the window tree is walked; on X11 the output of `wmctrl -l -G`
    is parsed. Query parameter 'workspace' (integer) keeps only windows on
    that workspace; a non-integer value disables the filter.
    """
    comp = _detect_compositor()
    workspace = request.args.get('workspace', '')
    windows = []
    if comp == 'sway':
        r = _run(['swaymsg', '-t', 'get_tree', '--raw'])
        if r and r.returncode == 0:
            try:
                tree = json.loads(r.stdout)
                _extract_sway_windows(tree, windows)
            except json.JSONDecodeError:
                pass
    elif comp == 'x11':
        r = _run(['wmctrl', '-l', '-G'])
        if r and r.returncode == 0:
            for line in r.stdout.strip().split('\n'):
                parts = line.split(None, 7)
                # Seven fixed columns; the title (8th) is absent for
                # untitled windows, which must still be listed.
                if len(parts) < 7:
                    continue
                try:
                    desktop, x, y, w, h = (int(p) for p in parts[1:6])
                except ValueError:
                    # Malformed row: skip it rather than fail the request.
                    continue
                windows.append({
                    'id': parts[0], 'workspace': desktop,
                    'x': x, 'y': y, 'w': w, 'h': h,
                    'title': parts[7] if len(parts) > 7 else '',
                })
    if workspace:
        try:
            ws_id = int(workspace)
            windows = [w for w in windows if w.get('workspace') == ws_id]
        except ValueError:
            pass
    return jsonify({'windows': windows, 'count': len(windows)})
def _extract_sway_windows(node, results, workspace=''):
    """Recursively collect application windows from a sway ``get_tree`` node.

    Args:
        node: One node dict of the tree (the root on the first call).
        results: List that receives one dict per window, in tree order.
        workspace: Identifier of the enclosing workspace, passed down the
            recursion.  Callers normally omit it.

    A node counts as a window when it is a tiled ('con') or floating
    ('floating_con') container with both a name and a pid.  Window nodes
    carry no workspace field of their own, so the value is taken from the
    nearest ancestor of type 'workspace' (its ``num``, else its ``name``);
    it stays '' when there is no such ancestor.
    """
    node_type = node.get('type')
    if node_type == 'workspace':
        workspace = node.get('num', node.get('name', ''))
    if node_type in ('con', 'floating_con') and node.get('name') and node.get('pid'):
        rect = node.get('rect', {})
        results.append({
            'id': node.get('id'),
            'title': node.get('name', ''),
            'app': node.get('app_id', ''),
            'workspace': node.get('workspace', workspace),
            'focused': node.get('focused', False),
            'x': rect.get('x', 0), 'y': rect.get('y', 0),
            'w': rect.get('width', 0), 'h': rect.get('height', 0),
        })
    for child in node.get('nodes', []) + node.get('floating_nodes', []):
        _extract_sway_windows(child, results, workspace)
@app.route('/api/shell/workspaces/create', methods=['POST'])
def shell_workspaces_create():
    """Create (switch to) a sway workspace named by the request body.

    Body: ``{'name': <optional workspace name>}``.  Without a name one is
    derived from the current time (``int(time.time()) % 100``).

    Returns 400 for a non-object body or a name containing characters
    that would change the meaning of the sway command, 500 when swaymsg
    fails or the compositor is not sway.
    """
    data = request.get_json(force=True)
    if not isinstance(data, dict):
        return jsonify({'created': False, 'error': 'JSON object body required'}), 400
    name = str(data.get('name') or '')
    # The name is embedded in a sway command string, where ';' and ','
    # start a new command; control characters are rejected as well.
    if any(ch in ';,' or not ch.isprintable() for ch in name):
        return jsonify({'created': False, 'error': 'invalid workspace name'}), 400
    comp = _detect_compositor()
    if comp == 'sway':
        ws_name = name or str(int(time.time()) % 100)
        r = _run(['swaymsg', f'workspace {ws_name}'])
        if not (r and r.returncode == 0):
            return jsonify({'created': False, 'name': ws_name,
                            'error': 'swaymsg failed'}), 500
        return jsonify({'created': True, 'name': ws_name})
    return jsonify({'created': False, 'error': f'{comp}: workspace creation not supported'}), 500
@app.route('/api/shell/workspaces/switch', methods=['POST'])
def shell_workspaces_switch():
    """Switch to another workspace.

    Body: ``{'id': <workspace id>, 'name': <optional name>}``.  sway
    switches by ``name`` (falling back to ``str(id)``) through
    ``swaymsg workspace``; x11 switches by ``id`` through ``wmctrl -s``.

    Returns 400 for a non-object body or an unusable target, 500 when the
    command fails or the compositor is neither sway nor x11.
    """
    data = request.get_json(force=True)
    if not isinstance(data, dict):
        return jsonify({'switched': False, 'error': 'JSON object body required'}), 400
    ws_id = data.get('id', '')
    name = data.get('name', str(ws_id))
    comp = _detect_compositor()
    if comp == 'sway':
        target = str(name)
        # The target is embedded in a sway command string, where ';' and
        # ',' start a new command; an empty target is not a workspace.
        if not target.strip() or any(
                ch in ';,' or not ch.isprintable() for ch in target):
            return jsonify({'switched': False, 'error': 'invalid workspace name'}), 400
        r = _run(['swaymsg', f'workspace {target}'])
        ok = bool(r and r.returncode == 0)
        return jsonify({'switched': ok, 'workspace': name}), (200 if ok else 500)
    elif comp == 'x11':
        # wmctrl -s expects a desktop number.
        try:
            desktop = int(ws_id)
        except (TypeError, ValueError):
            return jsonify({'switched': False, 'error': 'id must be an integer'}), 400
        r = _run(['wmctrl', '-s', str(desktop)])
        ok = bool(r and r.returncode == 0)
        return jsonify({'switched': ok, 'workspace': ws_id}), (200 if ok else 500)
    return jsonify({'switched': False}), 500
@app.route('/api/shell/workspaces/close', methods=['POST'])
def shell_workspaces_close():
    """Acknowledge a workspace-close request.

    No compositor command is issued here: the handler reads ``id`` from
    the JSON body (default '') and echoes it back with ``closed`` set to
    True.
    """
    payload = request.get_json(force=True)
    closed_id = payload.get('id', '')
    return jsonify({'closed': True, 'id': closed_id})
# Snap position -> sway command string.  The eight tiling slots share one
# "move position <origin>; resize set <size>" shape, so they are generated
# from (label, origin, size) rows; 'maximize' and 'center' are appended
# afterwards to keep the original key order.
_SNAP_MAP = {
    label: f'move position {origin}; resize set {size}'
    for label, origin, size in (
        ('left-half', '0 0', '50 ppt 100 ppt'),
        ('right-half', '50 ppt 0', '50 ppt 100 ppt'),
        ('top-half', '0 0', '100 ppt 50 ppt'),
        ('bottom-half', '0 50 ppt', '100 ppt 50 ppt'),
        ('top-left', '0 0', '50 ppt 50 ppt'),
        ('top-right', '50 ppt 0', '50 ppt 50 ppt'),
        ('bottom-left', '0 50 ppt', '50 ppt 50 ppt'),
        ('bottom-right', '50 ppt 50 ppt', '50 ppt 50 ppt'),
    )
}
_SNAP_MAP['maximize'] = 'fullscreen enable'
_SNAP_MAP['center'] = 'move position center'
@app.route('/api/shell/workspaces/snap', methods=['POST'])
def shell_workspaces_snap():
    """Snap a window to one of the positions in ``_SNAP_MAP``.

    Body: ``{'position': <key of _SNAP_MAP>, 'window_id': <optional id>}``.
    On sway the mapped command runs against the given container id, or
    against the focused window when no id is sent.  On x11 only
    'maximize' is implemented and a window id is required.

    Returns 400 for a non-object body, an unknown position or a malformed
    window id; 500 when the command fails or the request is unsupported
    on the current compositor.
    """
    data = request.get_json(force=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'JSON object body required'}), 400
    window_id = data.get('window_id', '')
    position = data.get('position', '')
    # The isinstance check keeps unhashable values away from the dict lookup.
    if not isinstance(position, str) or position not in _SNAP_MAP:
        return jsonify({'error': f'Invalid position. Valid: {list(_SNAP_MAP.keys())}'}), 400
    comp = _detect_compositor()
    if comp == 'sway':
        cmd_str = _SNAP_MAP[position]
        if window_id:
            # con_id is numeric; anything else could alter the criteria.
            if not str(window_id).isdigit():
                return jsonify({'error': 'window_id must be a numeric con_id'}), 400
            # sway resets criteria at ';' but keeps them across ',', so the
            # chained commands are joined with ',' to keep every step on
            # the selected container rather than the focused one.
            targeted = ', '.join(step.strip() for step in cmd_str.split(';'))
            r = _run(['swaymsg', f'[con_id={window_id}] {targeted}'])
        else:
            r = _run(['swaymsg', cmd_str])
        if not (r and r.returncode == 0):
            return jsonify({'snapped': False, 'position': position,
                            'error': 'swaymsg failed'}), 500
        return jsonify({'snapped': True, 'position': position})
    elif comp == 'x11' and window_id:
        if position == 'maximize':
            win = str(window_id)
            # wmctrl -i takes a numeric window id (decimal or 0x-hex).
            try:
                int(win, 0)
            except ValueError:
                return jsonify({'error': 'window_id must be a numeric window id'}), 400
            r = _run(['wmctrl', '-i', '-r', win, '-b', 'add,maximized_vert,maximized_horz'])
            if not (r and r.returncode == 0):
                return jsonify({'snapped': False, 'position': position,
                                'error': 'wmctrl failed'}), 500
            return jsonify({'snapped': True, 'position': position})
    return jsonify({'snapped': False, 'error': f'{comp}: snap not supported'}), 500
971 # ─── Multi-Monitor Management ──────────────────────────
@app.route('/api/shell/displays', methods=['GET'])
def shell_displays_list():
    """List connected displays with resolution and position.

    Wayland sessions are queried with ``swaymsg -t get_outputs`` and
    report name, make, model, resolution ('WxH'), position ('x,y'), scale
    and active flag.  Other sessions parse ``xrandr --query`` and report
    name, primary flag, geometry and ``connected`` for connected outputs.

    Response: ``{'displays': [...], 'compositor': 'wayland' | 'x11'}``.
    """
    # Detect once: _is_wayland() may spawn a process, and a single answer
    # keeps the branch taken and the reported compositor in agreement.
    wayland = _is_wayland()
    displays = []
    if wayland:
        r = _run(['swaymsg', '-t', 'get_outputs', '-r'], timeout=5)
        if r and r.returncode == 0:
            try:
                outputs = json.loads(r.stdout)
            except json.JSONDecodeError:
                outputs = []
            # Tolerate unexpected JSON shapes instead of raising.
            for out in outputs if isinstance(outputs, list) else []:
                if not isinstance(out, dict):
                    continue
                rect = out.get('rect') or {}
                displays.append({
                    'name': out.get('name', ''),
                    'make': out.get('make', ''),
                    'model': out.get('model', ''),
                    'resolution': f"{rect.get('width', 0)}x{rect.get('height', 0)}",
                    'position': f"{rect.get('x', 0)},{rect.get('y', 0)}",
                    'scale': out.get('scale', 1.0),
                    'active': out.get('active', False),
                })
    else:
        r = _run(['xrandr', '--query'], timeout=5)
        if r and r.returncode == 0:
            import re
            output_line = re.compile(
                r'^(\S+)\s+(connected|disconnected)\s*(primary)?\s*(\d+x\d+\+\d+\+\d+)?')
            for line in r.stdout.split('\n'):
                m = output_line.match(line)
                if m and m.group(2) == 'connected':
                    displays.append({
                        'name': m.group(1),
                        'primary': m.group(3) == 'primary',
                        'geometry': m.group(4) or '',
                        'connected': True,
                    })
    return jsonify({'displays': displays, 'compositor': 'wayland' if wayland else 'x11'})
def _parse_display_position(value):
    """Return ``(x, y)`` as ints, or None when *value* is malformed.

    Accepts 'X,Y', 'X Y', 'XxY' and two-item lists/tuples.
    """
    if isinstance(value, (list, tuple)):
        parts = list(value)
    else:
        parts = str(value).replace(',', ' ').replace('x', ' ').split()
    if len(parts) != 2:
        return None
    try:
        return int(parts[0]), int(parts[1])
    except (TypeError, ValueError):
        return None


@app.route('/api/shell/displays/arrange', methods=['PUT'])
def shell_displays_arrange():
    """Arrange displays (position, resolution, scale).

    Body: ``display`` (required output name) plus any of ``position``,
    ``resolution`` and ``scale``.  Wayland sessions run
    ``swaymsg output ...``; other sessions run ``xrandr --output ...``,
    which applies resolution and position only (``scale`` is not passed).

    The position is normalised per backend ('X Y' for sway, 'XxY' for
    xrandr), so the 'x,y' form returned by the display listing can be
    sent back unchanged.

    Returns 400 for a missing or malformed value; otherwise
    ``{'status': 'ok' | 'error', 'message': <stderr on failure>}``.
    """
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        body = {}
    display = body.get('display', '')
    if not display:
        return jsonify({'error': 'display name is required'}), 400

    def _unsafe(token):
        # ';' and ',' separate commands in a sway command line, and a
        # leading '-' would be read as an option by xrandr.
        return (not token or token.startswith('-')
                or any(ch in ';,' or ch.isspace() or not ch.isprintable()
                       for ch in token))

    display = str(display)
    if _unsafe(display):
        return jsonify({'error': 'invalid display name'}), 400
    position = None
    if 'position' in body:
        position = _parse_display_position(body['position'])
        if position is None:
            return jsonify({'error': 'position must be two integers (x,y)'}), 400
    resolution = None
    if 'resolution' in body:
        resolution = str(body['resolution'])
        if _unsafe(resolution):
            return jsonify({'error': 'invalid resolution'}), 400
    if _is_wayland():
        cmd = ['swaymsg', 'output', display]
        if position is not None:
            cmd += ['position', str(position[0]), str(position[1])]
        if resolution is not None:
            cmd += ['resolution', resolution]
        if 'scale' in body:
            try:
                scale = float(body['scale'])
            except (TypeError, ValueError):
                return jsonify({'error': 'scale must be a number'}), 400
            cmd += ['scale', str(scale)]
        r = _run(cmd, timeout=5)
        ok = r and r.returncode == 0
    else:
        cmd = ['xrandr', '--output', display]
        if resolution is not None:
            cmd += ['--mode', resolution]
        if position is not None:
            cmd += ['--pos', f'{position[0]}x{position[1]}']
        r = _run(cmd, timeout=5)
        ok = r and r.returncode == 0
    return jsonify({'status': 'ok' if ok else 'error',
                    'message': (r.stderr if r else 'Command failed') if not ok else ''})
1037 # ─── HiDPI Scaling ─────────────────────────────────────
@app.route('/api/shell/display/scale', methods=['GET', 'PUT'])
def shell_display_scale():
    """Get or set display scale factor.

    GET: on Wayland, the ``scale`` of the first output reported by
    ``swaymsg -t get_outputs``; otherwise the ``GDK_SCALE`` environment
    variable.  Falls back to 1.0 when neither yields a value.

    PUT: body ``{'scale': <number>, 'display': <optional output name>}``.
    With a display on Wayland the scale is applied through swaymsg.  In
    every other case ``GDK_SCALE`` is set in this process's environment,
    truncated to a whole number of at least 1.  Returns 400 when the
    scale is not a positive finite number or the display name is
    malformed.
    """
    wayland = _is_wayland()
    if request.method == 'GET':
        scale = 1.0
        if wayland:
            r = _run(['swaymsg', '-t', 'get_outputs', '-r'], timeout=5)
            if r and r.returncode == 0:
                try:
                    outputs = json.loads(r.stdout)
                    if isinstance(outputs, list) and outputs and isinstance(outputs[0], dict):
                        scale = outputs[0].get('scale', 1.0)
                except (json.JSONDecodeError, KeyError):
                    pass
        else:
            gdk_scale = os.environ.get('GDK_SCALE', '1')
            try:
                scale = float(gdk_scale)
            except ValueError:
                scale = 1.0
        return jsonify({'scale': scale, 'compositor': 'wayland' if wayland else 'x11'})
    # PUT
    import math
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        body = {}
    scale = body.get('scale', 1.0)
    display = body.get('display', '')
    try:
        factor = None if isinstance(scale, bool) else float(scale)
    except (TypeError, ValueError):
        factor = None
    if factor is None or not math.isfinite(factor) or factor <= 0:
        return jsonify({'status': 'error', 'error': 'scale must be a positive number'}), 400
    if wayland and display:
        display = str(display)
        # ';' and ',' separate commands in a sway command line.
        if any(ch in ';,' or ch.isspace() or not ch.isprintable() for ch in display):
            return jsonify({'status': 'error', 'error': 'invalid display name'}), 400
        r = _run(['swaymsg', 'output', display, 'scale', str(factor)], timeout=5)
        ok = r and r.returncode == 0
    else:
        # X11 fallback: set GDK_SCALE env.  The value is truncated to a
        # whole number and clamped so a fraction below 1 never yields 0.
        os.environ['GDK_SCALE'] = str(max(1, int(factor)))
        ok = True
    return jsonify({'status': 'ok' if ok else 'error', 'scale': scale})
1073 # ─── Per-App Volume Control ────────────────────────────
def _parse_pw_nodes(output):
    """Extract node objects from ``pw-cli list-objects`` output.

    Every object starts with a header line such as
    ``id 34, type PipeWire:Interface:Node/3`` followed by ``key = "value"``
    property lines.  Returns one dict per node object with ``id``,
    ``type`` and, when present, ``name`` (application.name) and
    ``node_name`` (node.name).
    """
    node_type = 'PipeWire:Interface:Node'
    nodes = []
    current = {}
    for raw in output.split('\n'):
        line = raw.strip()
        if line.startswith('id '):
            if current.get('type') == node_type:
                nodes.append(current)
            current = {'id': line.split()[1].rstrip(',')}
            # The interface type is printed on the header line itself, so
            # it has to be read here rather than from a later line.
            if node_type in line.partition(',')[2]:
                current['type'] = node_type
        elif 'type' in line and node_type in line:
            current['type'] = node_type
        elif 'application.name' in line or 'node.name' in line:
            key = 'name' if 'application.name' in line else 'node_name'
            _, sep, value = line.partition('=')
            if sep:  # ignore malformed property lines without '='
                current[key] = value.strip().strip('"')
    if current.get('type') == node_type:
        nodes.append(current)
    return nodes


@app.route('/api/shell/audio/apps', methods=['GET'])
def shell_audio_per_app():
    """List audio streams with per-app volume.

    Runs ``pw-cli list-objects`` and returns the node objects that carry
    an application name or a node name as ``{'apps': [...]}``.  The list
    is empty when pw-cli is unavailable or fails.
    """
    r = _run(['pw-cli', 'list-objects'], timeout=5)
    apps = _parse_pw_nodes(r.stdout) if r and r.returncode == 0 else []
    # Filter to only app nodes
    app_nodes = [a for a in apps if a.get('name') or a.get('node_name')]
    return jsonify({'apps': app_nodes})
@app.route('/api/shell/audio/apps/<node_id>/volume', methods=['PUT'])
def shell_audio_per_app_volume(node_id):
    """Set volume for a specific app/node.

    Args:
        node_id: Object id taken from the URL; must be all digits.

    Body: ``{'volume': <number between 0.0 and 2.0>}`` (default 1.0).
    Returns 400 for a non-numeric node id or an out-of-range volume, and
    500 with pw-cli's stderr when the command fails or is unavailable.
    """
    # The id becomes a pw-cli argument; accept plain object ids only.
    if not (node_id.isascii() and node_id.isdigit()):
        return jsonify({'error': 'node_id must be a numeric object id'}), 400
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        body = {}
    volume = body.get('volume', 1.0)
    # bool is a subclass of int and would otherwise pass as a volume.
    if (isinstance(volume, bool) or not isinstance(volume, (int, float))
            or volume < 0 or volume > 2.0):
        return jsonify({'error': 'volume must be 0.0 to 2.0'}), 400
    r = _run(['pw-cli', 'set-param', node_id, 'Props',
              json.dumps({'volume': volume})], timeout=5)
    if r and r.returncode == 0:
        return jsonify({'status': 'ok', 'node_id': node_id, 'volume': volume})
    return jsonify({'error': r.stderr if r else 'pw-cli not available'}), 500
1113 # ─── RTL Layout Support ────────────────────────────────
# Language codes whose layout is right-to-left.
_RTL_LOCALES = {'ar', 'he', 'fa', 'ur', 'yi', 'ps', 'sd', 'dv', 'ug', 'ckb'}


@app.route('/api/shell/rtl/status', methods=['GET'])
def shell_rtl_status():
    """Check if current locale requires RTL layout.

    Reads ``LANG`` (default 'en_US.UTF-8'), extracts the language code
    and looks it up in ``_RTL_LOCALES``.

    Response: ``{'rtl': bool, 'locale': <LANG value>,
    'css_direction': 'rtl' | 'ltr'}``.
    """
    locale_val = os.environ.get('LANG', 'en_US.UTF-8')
    # A locale is language[_territory][.codeset][@modifier]; cut at the
    # first of those separators so 'ar.UTF-8' and 'he@euro' are
    # recognised as well as 'ar_EG.UTF-8'.
    lang_code = locale_val
    for sep in ('.', '@', '_'):
        lang_code = lang_code.split(sep)[0]
    lang_code = lang_code.lower()
    is_rtl = lang_code in _RTL_LOCALES
    return jsonify({
        'rtl': is_rtl,
        'locale': locale_val,
        'css_direction': 'rtl' if is_rtl else 'ltr',
    })
1129 # ─── 10. Keyboard Shortcuts ────────────────────────────
1131 # Two profiles: 'windows' (default) and 'mac'
1132 # Users can switch between them without conflicts
# Path of the JSON file that stores the active shortcut profile and the
# user's custom binding overrides.
_SHORTCUTS_CFG = _config_path('keyboard_shortcuts.json')
# Default key bindings per profile, keyed by action name.  Both profiles
# define the same set of actions, and each binding is unique within its
# profile so one key press maps to exactly one action.
_SHORTCUT_PROFILES = {
    'windows': {
        'close_window': 'Alt+F4',
        'minimize': 'Super+H',
        'maximize': 'Super+Up',
        'switch_apps': 'Alt+Tab',
        'switch_windows': 'Alt+`',
        'file_manager': 'Super+E',
        'terminal': 'Ctrl+Alt+T',
        'lock_screen': 'Super+L',
        'search': 'Super+S',
        'screenshot': 'Print',
        'screenshot_window': 'Alt+Print',
        'screenshot_area': 'Shift+Print',
        'workspace_left': 'Super+Ctrl+Left',
        'workspace_right': 'Super+Ctrl+Right',
        'move_workspace_left': 'Super+Shift+Left',
        'move_workspace_right': 'Super+Shift+Right',
        'snap_left': 'Super+Left',
        'snap_right': 'Super+Right',
        'overview': 'Super',
        'app_grid': 'Super+A',
        'task_manager': 'Ctrl+Shift+Escape',
        'browser': 'Super+B',
        'calculator': 'Super+C',
        'copy': 'Ctrl+C',
        'paste': 'Ctrl+V',
        'cut': 'Ctrl+X',
        'undo': 'Ctrl+Z',
        'redo': 'Ctrl+Shift+Z',
        'select_all': 'Ctrl+A',
        'save': 'Ctrl+S',
        'find': 'Ctrl+F',
    },
    'mac': {
        'close_window': 'Super+W',
        'minimize': 'Super+M',
        'maximize': 'Super+Ctrl+F',
        'switch_apps': 'Super+Tab',
        'switch_windows': 'Super+`',
        'file_manager': 'Super+Shift+F',
        # Was 'Super+Space', which collided with 'search' below.
        'terminal': 'Super+Return',
        'lock_screen': 'Super+Ctrl+Q',
        'search': 'Super+Space',
        'screenshot': 'Super+Shift+3',
        'screenshot_window': 'Super+Shift+4',
        'screenshot_area': 'Super+Shift+5',
        'workspace_left': 'Ctrl+Left',
        'workspace_right': 'Ctrl+Right',
        'move_workspace_left': 'Ctrl+Shift+Left',
        'move_workspace_right': 'Ctrl+Shift+Right',
        'snap_left': 'Super+Ctrl+Left',
        'snap_right': 'Super+Ctrl+Right',
        'overview': 'Super+Up',
        'app_grid': 'F4',
        'task_manager': 'Super+Alt+Escape',
        'browser': 'Super+Shift+B',
        'calculator': 'Super+Shift+C',
        'copy': 'Super+C',
        'paste': 'Super+V',
        'cut': 'Super+X',
        'undo': 'Super+Z',
        'redo': 'Super+Shift+Z',
        'select_all': 'Super+A',
        'save': 'Super+S',
        'find': 'Super+F',
    },
}
@app.route('/api/shell/shortcuts', methods=['GET'])
def shell_shortcuts():
    """Return the effective keyboard shortcuts and the active profile.

    The stored profile name selects the defaults (an unknown name falls
    back to the 'windows' defaults); the stored custom overrides are then
    layered on top.

    Response: ``{'profile', 'available_profiles', 'shortcuts'}``.
    """
    stored = _load_json(_SHORTCUTS_CFG, {'profile': 'windows', 'custom': {}})
    profile_name = stored.get('profile', 'windows')
    defaults = _SHORTCUT_PROFILES.get(profile_name, _SHORTCUT_PROFILES['windows'])
    # Copy first so the shared profile table is never mutated.
    effective = dict(defaults)
    overrides = stored.get('custom', {})
    effective.update(overrides)
    response = {
        'profile': profile_name,
        'available_profiles': list(_SHORTCUT_PROFILES.keys()),
        'shortcuts': effective,
    }
    return jsonify(response)
@app.route('/api/shell/shortcuts/profile', methods=['POST'])
def shell_shortcuts_set_profile():
    """Select the active shortcut profile and persist the choice.

    Body: ``{'profile': <name>}`` (default 'windows').  An unknown name
    yields 400.  On success the profile name is written to the shortcuts
    config and returned together with that profile's default bindings.
    """
    payload = request.get_json(force=True)
    chosen = payload.get('profile', 'windows')
    if chosen in _SHORTCUT_PROFILES:
        stored = _load_json(_SHORTCUTS_CFG, {'profile': 'windows', 'custom': {}})
        stored['profile'] = chosen
        _save_json(_SHORTCUTS_CFG, stored)
        return jsonify({'profile': chosen, 'shortcuts': _SHORTCUT_PROFILES[chosen]})
    message = (f'Unknown profile: {chosen}. '
               f'Available: {list(_SHORTCUT_PROFILES.keys())}')
    return jsonify({'error': message}), 400
@app.route('/api/shell/shortcuts/set', methods=['POST'])
def shell_shortcuts_set():
    """Store a custom key binding that overrides the profile default.

    Body: ``{'action': <action name>, 'binding': <key combination>}``.
    Both values are required; a missing or empty one yields 400.  The
    override is saved under ``custom`` in the shortcuts config.
    """
    payload = request.get_json(force=True)
    action = payload.get('action', '')
    binding = payload.get('binding', '')
    if not (action and binding):
        return jsonify({'error': 'action and binding required'}), 400
    stored = _load_json(_SHORTCUTS_CFG, {'profile': 'windows', 'custom': {}})
    # Create the override table on first use, then record the binding.
    stored.setdefault('custom', {})[action] = binding
    _save_json(_SHORTCUTS_CFG, stored)
    return jsonify({'set': True, 'action': action, 'binding': binding})
@app.route('/api/shell/shortcuts/reset', methods=['POST'])
def shell_shortcuts_reset():
    """Reset all custom overrides, revert to profile defaults.

    Clears ``custom`` in the shortcuts config and returns
    ``{'reset': True, 'profile': <active profile>}``.
    """
    cfg = _load_json(_SHORTCUTS_CFG, {'profile': 'windows', 'custom': {}})
    cfg['custom'] = {}
    _save_json(_SHORTCUTS_CFG, cfg)
    # An existing config file may lack 'profile' (the defaults above only
    # apply when the file is missing or unreadable), so fall back the same
    # way the GET handler does.
    return jsonify({'reset': True, 'profile': cfg.get('profile', 'windows')})
1257 # ─── 15. On-Screen Keyboard ─────────────────────────────
@app.route('/api/shell/keyboard/osk-status', methods=['GET'])
def shell_osk_status():
    """Get on-screen keyboard status.

    Response keys:
        enabled: the squeekboard user service is reported active.
        running: one of the known backends has a live process.
        backend: name of that process, or 'none'.
        available: known backends found on PATH.
    """
    backends = ['squeekboard', 'onboard', 'maliit-keyboard']
    running = None
    for name in backends:
        r = _run(['pgrep', '-x', name], timeout=3)
        if r and r.returncode == 0:
            running = name
            break

    enabled = False
    r = _run(['systemctl', '--user', 'is-active', 'squeekboard'], timeout=3)
    if r and r.returncode == 0 and 'active' in r.stdout:
        enabled = True

    return jsonify({
        'enabled': enabled,
        'running': running is not None,
        'backend': running or 'none',
        # PATH lookup in-process instead of two `which` subprocesses per
        # backend.
        'available': [b for b in backends if shutil.which(b)],
    })
@app.route('/api/shell/keyboard/osk-toggle', methods=['POST'])
@_require_desktop_auth
def shell_osk_toggle():
    """Toggle on-screen keyboard.

    Body: ``{'enable': true | false}``; when ``enable`` is omitted the
    current state is inverted.  The keyboard is started or stopped
    through the squeekboard user service.

    Returns ``{'enabled': <resulting state>}``, 400 when ``enable`` is
    neither a boolean nor a number, and 500 (with the unchanged state)
    when systemctl fails.
    """
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        body = {}
    enable = body.get('enable')  # None = toggle
    if enable is not None:
        # Strings such as "false" are truthy, so only booleans and
        # numbers are accepted as an explicit state.
        if not isinstance(enable, (bool, int, float)):
            return jsonify({'error': 'enable must be a boolean'}), 400
        enable = bool(enable)

    # Check current state
    r = _run(['pgrep', '-x', 'squeekboard'], timeout=3)
    is_running = bool(r and r.returncode == 0)

    if enable is None:
        enable = not is_running

    if enable != is_running:
        action = 'start' if enable else 'stop'
        res = _run(['systemctl', '--user', action, 'squeekboard'], timeout=5)
        if not (res and res.returncode == 0):
            return jsonify({'enabled': is_running,
                            'error': f'failed to {action} squeekboard'}), 500

    return jsonify({'enabled': enable})
1305 # ─── Voice Control (AI-native: Whisper STT → HART agent) ──
# Shared voice-control state; read and written by the voice routes while
# holding _voice_lock.
_voice_state = dict(listening=False, pid=None, last_transcript=None)
_voice_lock = threading.Lock()
@app.route('/api/shell/voice/status', methods=['GET'])
def shell_voice_status():
    """Report the voice-control state.

    Response: ``{'listening': bool, 'stt_available': bool,
    'last_transcript': <str or None>}``.  The whole snapshot is built
    while holding the voice lock.
    """
    with _voice_lock:
        snapshot = {
            'listening': _voice_state['listening'],
            'stt_available': _check_whisper_available(),
            'last_transcript': _voice_state['last_transcript'],
        }
        return jsonify(snapshot)
def _check_whisper_available():
    """Return True when the Whisper transcription tool can be imported."""
    try:
        # Imported only to probe availability; the name itself is unused.
        from integrations.service_tools.whisper_tool import whisper_transcribe  # noqa: F401
    except ImportError:
        return False
    else:
        return True
@app.route('/api/shell/voice/start', methods=['POST'])
@_require_desktop_auth
def shell_voice_start():
    """Start listening for voice commands.

    Marks voice control as listening and records a 'voice_start' audit
    entry.  Transcription and agent dispatch are done by the
    voice-process route.

    Returns 409 when already listening and 503 when Whisper STT cannot be
    imported; otherwise ``{'started': True, 'mode': 'whisper_stt_to_agent'}``.
    """
    # Probe before taking the lock so a slow import never blocks other
    # voice routes.
    stt_available = _check_whisper_available()

    # Check and set in a single critical section: with two separate
    # sections, concurrent requests could both pass the 409 check.
    with _voice_lock:
        if _voice_state['listening']:
            return jsonify({'error': 'already listening'}), 409
        if not stt_available:
            return jsonify({'error': 'whisper STT not available'}), 503
        _voice_state['listening'] = True

    _audit_desktop_op('voice_start', {})
    return jsonify({'started': True, 'mode': 'whisper_stt_to_agent'})
@app.route('/api/shell/voice/stop', methods=['POST'])
@_require_desktop_auth
def shell_voice_stop():
    """Stop listening for voice commands.

    Clears the ``listening`` flag and the stored ``pid`` under the voice
    lock, records a 'voice_stop' audit entry and returns
    ``{'stopped': True}``.
    """
    with _voice_lock:
        _voice_state.update(listening=False, pid=None)
    _audit_desktop_op('voice_stop', {})
    return jsonify({'stopped': True})
@app.route('/api/shell/voice/process', methods=['POST'])
@_require_desktop_auth
def shell_voice_process():
    """Process a voice command: transcribe audio, then dispatch to the HART agent.

    Body (JSON): ``audio_path`` (file to transcribe) and/or ``text``
    (already transcribed; when present the STT step is skipped).

    Pipeline:
    1. ``audio_path`` -> ``whisper_transcribe`` -> transcript
    2. transcript -> POST to the backend ``/chat`` endpoint
    3. the decoded agent reply is returned together with the transcript

    The whole transcript is forwarded as the prompt; no keyword or intent
    matching happens in this handler.

    Responses: 400 for missing or non-string input and for an empty
    transcript, 404 when ``audio_path`` is not a file, 500 when
    transcription raises.  A failed agent dispatch still answers 200 so
    the caller keeps the transcript; the failure is reported in ``error``.
    """
    body = request.get_json(silent=True) or {}
    if not isinstance(body, dict):
        body = {}
    audio_path = body.get('audio_path', '')
    text = body.get('text', '')  # Pre-transcribed text (skip STT)
    if not isinstance(audio_path, str) or not isinstance(text, str):
        return jsonify({'error': 'audio_path and text must be strings'}), 400

    # Step 1: Transcribe if audio provided
    transcribed = False
    if audio_path and not text:
        if not os.path.isfile(audio_path):
            return jsonify({'error': 'audio file not found'}), 404
        try:
            from integrations.service_tools.whisper_tool import whisper_transcribe
            result = json.loads(whisper_transcribe(audio_path))
            text = result.get('text', '').strip()
        except Exception as e:
            logger.warning("Voice transcription failed: %s", e)
            return jsonify({'error': f'transcription failed: {e}'}), 500
        transcribed = True

    if not text:
        # Tell "nothing was sent" apart from "audio was sent but produced
        # no transcript".
        if transcribed:
            return jsonify({'error': 'no speech recognized in audio'}), 400
        return jsonify({'error': 'no audio_path or text provided'}), 400

    with _voice_lock:
        _voice_state['last_transcript'] = text

    # Step 2: Dispatch to HART agent pipeline
    try:
        import urllib.request
        from core.port_registry import get_port
        port = get_port('backend')
        payload = json.dumps({
            'user_id': 'voice_user',
            'prompt_id': 'voice_cmd',
            'prompt': text,
        }).encode('utf-8')
        req = urllib.request.Request(
            f'http://localhost:{port}/chat',
            data=payload,
            headers={'Content-Type': 'application/json'},
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            agent_response = json.loads(resp.read().decode('utf-8'))
        return jsonify({
            'transcript': text,
            'agent_response': agent_response,
            'mode': 'ai_native',
        })
    except Exception as e:
        # Even if agent dispatch fails, return the transcript
        logger.warning("Voice agent dispatch failed: %s", e)
        return jsonify({
            'transcript': text,
            'agent_response': None,
            'error': f'agent dispatch failed: {e}',
            'mode': 'ai_native',
        }), 200
# Last statement visible in this section: log that the shell desktop
# routes have been registered.
logger.info("Registered shell desktop routes (16 features)")