Coverage for integrations / agent_engine / app_bridge_service.py: 0.0%
294 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HART OS App Bridge Service — Cross-Subsystem Intelligence.
4Makes subsystem boundaries invisible. An Android app can call a Linux
5service. A Windows app can use an AI model. Everything talks to everything
6through OS-native agents.
8Cross-subsystem IPC:
9 Android → Intent → App Bridge → D-Bus → Linux service
10 Web/PWA → HTTP → App Bridge → Pipe → Windows app (Wine)
11 AI Agent → Socket → App Bridge → Binder → Android Activity
12 Any app → Bridge → Semantic Router → best handler regardless of subsystem
14Also unifies:
15 - Clipboard (copy in Android, paste in Linux)
16 - Drag & drop (XDG portal)
17 - File sharing (cross-subsystem file access)
18 - Notifications (unified notification stream)
19"""
20import hashlib
21import json
22import logging
23import os
24import subprocess
25import threading
26import time
27from typing import Any, Dict, List, Optional
29logger = logging.getLogger('hevolve.app_bridge')
31# ═══════════════════════════════════════════════════════════════
32# Capability Registry
33# ═══════════════════════════════════════════════════════════════
35class Capability:
36 """A registered capability from any subsystem."""
38 def __init__(
39 self,
40 name: str,
41 subsystem: str,
42 handler: str,
43 actions: Optional[List[str]] = None,
44 mime_types: Optional[List[str]] = None,
45 priority: int = 50,
46 metadata: Optional[dict] = None,
47 ):
48 self.name = name
49 self.subsystem = subsystem # linux, android, windows, web, ai
50 self.handler = handler # D-Bus path, intent, COM object, URL, etc.
51 self.actions = actions or [] # open, edit, share, view, translate, etc.
52 self.mime_types = mime_types or []
53 self.priority = priority # 0-100, higher = preferred
54 self.metadata = metadata or {}
55 self.registered_at = time.time()
57 def to_dict(self) -> dict:
58 return {
59 'name': self.name,
60 'subsystem': self.subsystem,
61 'handler': self.handler,
62 'actions': self.actions,
63 'mime_types': self.mime_types,
64 'priority': self.priority,
65 'metadata': self.metadata,
66 'registered_at': self.registered_at,
67 }
69 def matches(self, action: str = '', mime_type: str = '') -> bool:
70 """Check if this capability handles the given action/mime_type."""
71 action_match = not action or action in self.actions or '*' in self.actions
72 mime_match = not mime_type or mime_type in self.mime_types or '*/*' in self.mime_types
73 return action_match and mime_match
76class CapabilityRegistry:
77 """Thread-safe registry of capabilities from all subsystems."""
79 def __init__(self):
80 self._capabilities: Dict[str, Capability] = {}
81 self._lock = threading.Lock()
83 def register(self, capability: Capability) -> str:
84 """Register a capability. Returns capability ID."""
85 cap_id = hashlib.sha256(
86 f"{capability.subsystem}:{capability.name}:{capability.handler}".encode()
87 ).hexdigest()[:12]
89 with self._lock:
90 self._capabilities[cap_id] = capability
92 logger.info(
93 f"Capability registered: {capability.name} "
94 f"({capability.subsystem}) -> {capability.handler}"
95 )
96 return cap_id
98 def unregister(self, cap_id: str) -> bool:
99 with self._lock:
100 if cap_id in self._capabilities:
101 del self._capabilities[cap_id]
102 return True
103 return False
105 def query(
106 self, action: str = '', mime_type: str = '', subsystem: str = ''
107 ) -> List[Capability]:
108 """Find capabilities matching the query, sorted by priority."""
109 with self._lock:
110 results = []
111 for cap in self._capabilities.values():
112 if subsystem and cap.subsystem != subsystem:
113 continue
114 if cap.matches(action, mime_type):
115 results.append(cap)
117 results.sort(key=lambda c: c.priority, reverse=True)
118 return results
120 def list_all(self) -> List[Dict[str, Any]]:
121 with self._lock:
122 return [c.to_dict() for c in self._capabilities.values()]
124 def get_subsystems(self) -> Dict[str, int]:
125 """Count capabilities per subsystem."""
126 with self._lock:
127 counts: Dict[str, int] = {}
128 for cap in self._capabilities.values():
129 counts[cap.subsystem] = counts.get(cap.subsystem, 0) + 1
130 return counts
133# ═══════════════════════════════════════════════════════════════
134# Semantic Router
135# ═══════════════════════════════════════════════════════════════
137class SemanticRouter:
138 """Routes requests to the best capability across subsystems."""
140 def __init__(self, registry: CapabilityRegistry, model_bus_port: int = 6790):
141 self.registry = registry
142 self.model_bus_port = model_bus_port
144 def route(
145 self,
146 action: str,
147 data: str = '',
148 mime_type: str = '',
149 preferred_subsystem: str = '',
150 ) -> Dict[str, Any]:
151 """Route an action to the best available handler."""
152 candidates = self.registry.query(
153 action=action, mime_type=mime_type, subsystem=preferred_subsystem
154 )
156 if not candidates:
157 # No native handler — try AI fallback
158 return self._ai_fallback(action, data, mime_type)
160 best = candidates[0]
161 return self._dispatch(best, action, data)
163 def _dispatch(self, capability: Capability, action: str, data: str) -> Dict[str, Any]:
164 """Dispatch to the handler based on subsystem type."""
165 subsystem = capability.subsystem
167 if subsystem == 'linux':
168 return self._dispatch_linux(capability, action, data)
169 elif subsystem == 'android':
170 return self._dispatch_android(capability, action, data)
171 elif subsystem == 'windows':
172 return self._dispatch_windows(capability, action, data)
173 elif subsystem == 'web':
174 return self._dispatch_web(capability, action, data)
175 elif subsystem == 'ai':
176 return self._dispatch_ai(capability, action, data)
177 else:
178 return {'error': f'Unknown subsystem: {subsystem}'}
180 def _dispatch_linux(self, cap: Capability, action: str, data: str) -> Dict[str, Any]:
181 """Dispatch to Linux D-Bus service or CLI tool."""
182 handler = cap.handler
184 if handler.startswith('dbus:'):
185 # D-Bus method call
186 dbus_dest = handler[5:]
187 try:
188 result = subprocess.run(
189 ['busctl', 'call', '--system', dbus_dest, '/', 'Execute', 's', data],
190 capture_output=True, text=True, timeout=30,
191 )
192 return {
193 'status': 'success' if result.returncode == 0 else 'error',
194 'subsystem': 'linux',
195 'handler': handler,
196 'output': result.stdout.strip() if result.returncode == 0 else result.stderr.strip(),
197 }
198 except Exception as e:
199 return {'error': f'D-Bus call failed: {str(e)}', 'subsystem': 'linux'}
201 elif handler.startswith('cli:'):
202 # CLI tool
203 cmd = handler[4:]
204 try:
205 result = subprocess.run(
206 [cmd, data] if data else [cmd],
207 capture_output=True, text=True, timeout=30,
208 )
209 return {
210 'status': 'success' if result.returncode == 0 else 'error',
211 'subsystem': 'linux',
212 'handler': handler,
213 'output': result.stdout.strip(),
214 }
215 except Exception as e:
216 return {'error': f'CLI execution failed: {str(e)}', 'subsystem': 'linux'}
218 return {'error': f'Unknown Linux handler format: {handler}', 'subsystem': 'linux'}
220 def _dispatch_android(self, cap: Capability, action: str, data: str) -> Dict[str, Any]:
221 """Dispatch to Android via ART bridge (am command)."""
222 handler = cap.handler
224 if handler.startswith('intent:'):
225 # Android Intent via am (Activity Manager)
226 intent_action = handler[7:]
227 try:
228 cmd = ['am', 'start', '-a', intent_action]
229 if data:
230 cmd.extend(['-d', data])
232 result = subprocess.run(
233 cmd, capture_output=True, text=True, timeout=15,
234 )
235 return {
236 'status': 'success' if result.returncode == 0 else 'error',
237 'subsystem': 'android',
238 'handler': handler,
239 'output': result.stdout.strip(),
240 }
241 except Exception as e:
242 return {'error': f'Android intent failed: {str(e)}', 'subsystem': 'android'}
244 elif handler.startswith('activity:'):
245 # Direct Activity launch
246 activity = handler[9:]
247 try:
248 result = subprocess.run(
249 ['am', 'start', '-n', activity],
250 capture_output=True, text=True, timeout=15,
251 )
252 return {
253 'status': 'success' if result.returncode == 0 else 'error',
254 'subsystem': 'android',
255 'handler': handler,
256 'output': result.stdout.strip(),
257 }
258 except Exception as e:
259 return {'error': f'Android activity failed: {str(e)}', 'subsystem': 'android'}
261 return {'error': f'Unknown Android handler: {handler}', 'subsystem': 'android'}
263 def _dispatch_windows(self, cap: Capability, action: str, data: str) -> Dict[str, Any]:
264 """Dispatch to Windows app via Wine."""
265 handler = cap.handler
267 if handler.startswith('wine:'):
268 exe_path = handler[5:]
269 try:
270 result = subprocess.run(
271 ['wine', exe_path, data] if data else ['wine', exe_path],
272 capture_output=True, text=True, timeout=30,
273 )
274 return {
275 'status': 'success' if result.returncode == 0 else 'error',
276 'subsystem': 'windows',
277 'handler': handler,
278 'output': result.stdout.strip(),
279 }
280 except Exception as e:
281 return {'error': f'Wine execution failed: {str(e)}', 'subsystem': 'windows'}
283 return {'error': f'Unknown Windows handler: {handler}', 'subsystem': 'windows'}
285 def _dispatch_web(self, cap: Capability, action: str, data: str) -> Dict[str, Any]:
286 """Dispatch to Web/PWA via HTTP."""
287 from core.http_pool import pooled_post
289 handler = cap.handler
290 if handler.startswith('http'):
291 try:
292 resp = pooled_post(
293 handler,
294 json={'action': action, 'data': data},
295 timeout=30,
296 )
297 return {
298 'status': 'success' if resp.status_code == 200 else 'error',
299 'subsystem': 'web',
300 'handler': handler,
301 'output': resp.text[:1000],
302 }
303 except Exception as e:
304 return {'error': f'Web dispatch failed: {str(e)}', 'subsystem': 'web'}
306 return {'error': f'Unknown Web handler: {handler}', 'subsystem': 'web'}
308 def _dispatch_ai(self, cap: Capability, action: str, data: str) -> Dict[str, Any]:
309 """Dispatch to AI model via Model Bus."""
310 return self._ai_fallback(action, data, '')
312 def _ai_fallback(self, action: str, data: str, mime_type: str) -> Dict[str, Any]:
313 """Fall back to AI agent when no native handler available."""
314 from core.http_pool import pooled_post
316 prompt = f"Action: {action}\nData: {data}"
317 if mime_type:
318 prompt += f"\nMIME type: {mime_type}"
320 try:
321 resp = pooled_post(
322 f'http://localhost:{self.model_bus_port}/v1/chat',
323 json={'prompt': prompt, 'max_tokens': 512},
324 timeout=60,
325 )
326 if resp.status_code == 200:
327 result = resp.json()
328 return {
329 'status': 'success',
330 'subsystem': 'ai',
331 'handler': 'model_bus',
332 'output': result.get('response', str(result)),
333 'ai_fallback': True,
334 }
335 except Exception as e:
336 logger.warning(f"AI fallback failed: {e}")
338 return {
339 'error': f'No handler found for action={action}, mime={mime_type}',
340 'ai_fallback_attempted': True,
341 }
344# ═══════════════════════════════════════════════════════════════
345# Unified Clipboard
346# ═══════════════════════════════════════════════════════════════
348class UnifiedClipboard:
349 """Cross-subsystem clipboard synchronization."""
351 def __init__(self):
352 self._content: str = ''
353 self._content_type: str = 'text/plain'
354 self._source: str = ''
355 self._timestamp: float = 0
356 self._lock = threading.Lock()
358 def set_content(self, content: str, content_type: str = 'text/plain',
359 source: str = 'unknown') -> Dict[str, Any]:
360 with self._lock:
361 self._content = content
362 self._content_type = content_type
363 self._source = source
364 self._timestamp = time.time()
366 logger.debug(f"Clipboard updated from {source}: {content_type} ({len(content)} chars)")
367 return {'status': 'set', 'source': source, 'content_type': content_type}
369 def get_content(self) -> Dict[str, Any]:
370 with self._lock:
371 return {
372 'content': self._content,
373 'content_type': self._content_type,
374 'source': self._source,
375 'timestamp': self._timestamp,
376 'age_seconds': int(time.time() - self._timestamp) if self._timestamp else 0,
377 }
379 def clear(self):
380 with self._lock:
381 self._content = ''
382 self._content_type = 'text/plain'
383 self._source = ''
384 self._timestamp = 0
387# ═══════════════════════════════════════════════════════════════
388# App Bridge Service
389# ═══════════════════════════════════════════════════════════════
391class AppBridgeService:
392 """Cross-subsystem agent routing via OS-native IPC."""
394 def __init__(
395 self,
396 socket_path: str = '/run/hart/app-bridge.sock',
397 http_port: int = 6810,
398 cross_subsystem: bool = True,
399 intent_router: bool = True,
400 clipboard_sync: bool = True,
401 drag_and_drop: bool = True,
402 ai_fallback: bool = True,
403 model_bus_port: int = 6790,
404 backend_port: int = 6777,
405 ):
406 self.socket_path = socket_path
407 self.http_port = http_port
408 self.cross_subsystem = cross_subsystem
409 self.intent_router = intent_router
410 self.clipboard_sync = clipboard_sync
411 self.drag_and_drop = drag_and_drop
412 self.ai_fallback = ai_fallback
413 self.model_bus_port = model_bus_port
414 self.backend_port = backend_port
416 self.registry = CapabilityRegistry()
417 self.router = SemanticRouter(self.registry, model_bus_port)
418 self.clipboard = UnifiedClipboard()
420 self._running = False
421 self._active_subsystems: List[str] = []
423 logger.info(
424 f"AppBridgeService initialized: http_port={http_port}, "
425 f"cross_subsystem={cross_subsystem}, ai_fallback={ai_fallback}"
426 )
428 # ─── Subsystem Detection ─────────────────────────────────
430 def detect_subsystems(self) -> List[str]:
431 """Detect which subsystems are active on this device."""
432 subsystems = ['linux'] # Linux always present
434 # Android (check for ART runtime)
435 try:
436 result = subprocess.run(
437 ['pgrep', '-f', 'zygote'], capture_output=True, timeout=5,
438 )
439 if result.returncode == 0:
440 subsystems.append('android')
441 except Exception:
442 pass
444 # Windows (Wine)
445 try:
446 result = subprocess.run(
447 ['which', 'wine'], capture_output=True, timeout=5,
448 )
449 if result.returncode == 0:
450 subsystems.append('windows')
451 except Exception:
452 pass
454 # Web/PWA
455 try:
456 result = subprocess.run(
457 ['which', 'chromium'], capture_output=True, timeout=5,
458 )
459 if result.returncode == 0:
460 subsystems.append('web')
461 except Exception:
462 pass
464 # AI (Model Bus)
465 from core.http_pool import pooled_get
466 try:
467 resp = pooled_get(
468 f'http://localhost:{self.model_bus_port}/v1/status', timeout=3,
469 )
470 if resp.status_code == 200:
471 subsystems.append('ai')
472 except Exception:
473 pass
475 self._active_subsystems = subsystems
476 logger.info(f"Active subsystems: {subsystems}")
477 return subsystems
479 # ─── Default Capability Seeding ───────────────────────────
481 def _seed_default_capabilities(self):
482 """Register built-in capabilities for detected subsystems."""
483 if 'linux' in self._active_subsystems:
484 self.registry.register(Capability(
485 name='file_manager', subsystem='linux',
486 handler='cli:xdg-open',
487 actions=['open', 'view'],
488 mime_types=['*/*'],
489 priority=30,
490 ))
491 self.registry.register(Capability(
492 name='text_editor', subsystem='linux',
493 handler='cli:xdg-open',
494 actions=['edit'],
495 mime_types=['text/*', 'application/json', 'application/xml'],
496 priority=40,
497 ))
499 if 'android' in self._active_subsystems:
500 self.registry.register(Capability(
501 name='android_share', subsystem='android',
502 handler='intent:android.intent.action.SEND',
503 actions=['share'],
504 mime_types=['*/*'],
505 priority=60,
506 ))
507 self.registry.register(Capability(
508 name='android_view', subsystem='android',
509 handler='intent:android.intent.action.VIEW',
510 actions=['open', 'view'],
511 mime_types=['*/*'],
512 priority=50,
513 ))
514 self.registry.register(Capability(
515 name='android_camera', subsystem='android',
516 handler='intent:android.media.action.IMAGE_CAPTURE',
517 actions=['capture', 'photo'],
518 mime_types=['image/*'],
519 priority=80,
520 ))
522 if 'ai' in self._active_subsystems:
523 self.registry.register(Capability(
524 name='ai_describe', subsystem='ai',
525 handler=f'http://localhost:{self.model_bus_port}/v1/vision',
526 actions=['describe', 'analyze', 'classify'],
527 mime_types=['image/*'],
528 priority=70,
529 ))
530 self.registry.register(Capability(
531 name='ai_translate', subsystem='ai',
532 handler=f'http://localhost:{self.model_bus_port}/v1/chat',
533 actions=['translate', 'summarize', 'explain'],
534 mime_types=['text/*'],
535 priority=80,
536 ))
537 self.registry.register(Capability(
538 name='ai_tts', subsystem='ai',
539 handler=f'http://localhost:{self.model_bus_port}/v1/tts',
540 actions=['speak', 'tts'],
541 mime_types=['text/plain'],
542 priority=90,
543 ))
544 self.registry.register(Capability(
545 name='ai_stt', subsystem='ai',
546 handler=f'http://localhost:{self.model_bus_port}/v1/stt',
547 actions=['transcribe', 'stt', 'listen'],
548 mime_types=['audio/*'],
549 priority=90,
550 ))
552 logger.info(
553 f"Seeded {len(self.registry.list_all())} default capabilities"
554 )
556 # ─── Intent Router ────────────────────────────────────────
558 def route_intent(
559 self, action: str, data: str = '', mime_type: str = '',
560 source_subsystem: str = '', preferred_subsystem: str = '',
561 ) -> Dict[str, Any]:
562 """Route an intent/action to the best handler across subsystems."""
563 if not self.cross_subsystem and source_subsystem:
564 # Only route within the same subsystem
565 preferred_subsystem = source_subsystem
567 result = self.router.route(
568 action=action,
569 data=data,
570 mime_type=mime_type,
571 preferred_subsystem=preferred_subsystem,
572 )
574 result['source_subsystem'] = source_subsystem
575 result['action'] = action
576 return result
578 # ─── File Open (Cross-Subsystem) ──────────────────────────
580 def open_file(self, path: str, preferred_subsystem: str = '') -> Dict[str, Any]:
581 """Open a file with the best handler from any subsystem."""
582 import mimetypes
583 mime_type, _ = mimetypes.guess_type(path)
584 mime_type = mime_type or 'application/octet-stream'
586 return self.route_intent(
587 action='open',
588 data=path,
589 mime_type=mime_type,
590 preferred_subsystem=preferred_subsystem,
591 )
593 # ─── Status ───────────────────────────────────────────────
595 def get_status(self) -> Dict[str, Any]:
596 capabilities = self.registry.list_all()
597 subsystem_counts = self.registry.get_subsystems()
599 return {
600 'status': 'running' if self._running else 'stopped',
601 'active_subsystems': self._active_subsystems,
602 'capability_count': len(capabilities),
603 'capabilities_by_subsystem': subsystem_counts,
604 'features': {
605 'cross_subsystem': self.cross_subsystem,
606 'intent_router': self.intent_router,
607 'clipboard_sync': self.clipboard_sync,
608 'drag_and_drop': self.drag_and_drop,
609 'ai_fallback': self.ai_fallback,
610 },
611 'http_port': self.http_port,
612 }
614 # ─── HTTP Server ──────────────────────────────────────────
616 def _create_flask_app(self):
617 """Create Flask app for bridge HTTP API."""
618 from flask import Flask, request, jsonify
620 app = Flask(__name__)
622 @app.route('/v1/capabilities', methods=['GET'])
623 def list_capabilities():
624 return jsonify({
625 'capabilities': self.registry.list_all(),
626 'count': len(self.registry.list_all()),
627 })
629 @app.route('/v1/capabilities/register', methods=['POST'])
630 def register_capability():
631 data = request.get_json(force=True)
632 cap = Capability(
633 name=data.get('name', ''),
634 subsystem=data.get('subsystem', 'linux'),
635 handler=data.get('handler', ''),
636 actions=data.get('actions', []),
637 mime_types=data.get('mime_types', []),
638 priority=data.get('priority', 50),
639 metadata=data.get('metadata', {}),
640 )
641 cap_id = self.registry.register(cap)
642 return jsonify({'cap_id': cap_id, 'status': 'registered'})
644 @app.route('/v1/capabilities/query', methods=['POST'])
645 def query_capabilities():
646 data = request.get_json(force=True)
647 results = self.registry.query(
648 action=data.get('action', ''),
649 mime_type=data.get('mime_type', ''),
650 subsystem=data.get('subsystem', ''),
651 )
652 return jsonify({
653 'results': [c.to_dict() for c in results],
654 'count': len(results),
655 })
657 @app.route('/v1/subsystems', methods=['GET'])
658 def list_subsystems():
659 return jsonify({
660 'active': self._active_subsystems,
661 'capability_counts': self.registry.get_subsystems(),
662 })
664 @app.route('/v1/route', methods=['POST'])
665 def route_action():
666 data = request.get_json(force=True)
667 result = self.route_intent(
668 action=data.get('action', ''),
669 data=data.get('data', ''),
670 mime_type=data.get('mime_type', ''),
671 source_subsystem=data.get('source', ''),
672 preferred_subsystem=data.get('preferred', ''),
673 )
674 return jsonify(result)
676 @app.route('/v1/open', methods=['POST'])
677 def open_file_route():
678 data = request.get_json(force=True)
679 result = self.open_file(
680 path=data.get('path', ''),
681 preferred_subsystem=data.get('preferred', ''),
682 )
683 return jsonify(result)
685 @app.route('/v1/clipboard', methods=['GET'])
686 def get_clipboard():
687 return jsonify(self.clipboard.get_content())
689 @app.route('/v1/clipboard', methods=['POST'])
690 def set_clipboard():
691 data = request.get_json(force=True)
692 result = self.clipboard.set_content(
693 content=data.get('content', ''),
694 content_type=data.get('content_type', 'text/plain'),
695 source=data.get('source', 'http'),
696 )
697 return jsonify(result)
699 @app.route('/v1/status', methods=['GET'])
700 def status():
701 return jsonify(self.get_status())
703 @app.route('/health', methods=['GET'])
704 def health():
705 return jsonify({'status': 'ok', 'service': 'app-bridge'})
707 return app
709 # ─── Serve ────────────────────────────────────────────────
711 def serve_forever(self):
712 """Start the App Bridge service."""
713 self._running = True
715 # Detect active subsystems
716 self.detect_subsystems()
718 # Seed default capabilities
719 self._seed_default_capabilities()
721 # Background: periodic subsystem re-detection
722 def _detect_loop():
723 while self._running:
724 time.sleep(60)
725 try:
726 self.detect_subsystems()
727 except Exception as e:
728 logger.error(f"Subsystem detection error: {e}")
730 threading.Thread(target=_detect_loop, daemon=True).start()
732 # Start Flask HTTP server
733 app = self._create_flask_app()
734 logger.info(f"App Bridge HTTP API starting on port {self.http_port}")
736 try:
737 from waitress import serve
738 serve(app, host='0.0.0.0', port=self.http_port, threads=4)
739 except ImportError:
740 app.run(host='0.0.0.0', port=self.http_port, threaded=True)