Coverage for integrations / channels / gateway / protocol.py: 0.0%

316 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Gateway Protocol - JSON-RPC 2.0 Based Gateway 

3 

4Provides a JSON-RPC 2.0 compliant protocol for inter-service communication 

5in Docker container networks. Handles method registration, request routing, 

6notifications, and error handling. 

7 

8Features: 

9- JSON-RPC 2.0 compliance 

10- Method registration with handlers 

11- Async request/response handling 

12- Notification support (no response expected) 

13- Docker network addressing support 

14- Container-friendly persistence 

15- Error handling with standard codes 

16""" 

17 

18from __future__ import annotations 

19 

20import asyncio 

21import json 

22import logging 

23import os 

24import time 

25import uuid 

26from dataclasses import dataclass, field, asdict 

27from datetime import datetime 

28from enum import Enum 

29from typing import Any, Callable, Coroutine, Dict, List, Optional, Union 

30 

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

32 

33 

class JsonRpcErrorCode(Enum):
    """Numeric codes placed in the ``code`` member of a JSON-RPC error object.

    The first group holds the codes predefined by the JSON-RPC 2.0
    specification; the second group holds the codes this gateway adds on top.
    """

    # --- Predefined by the JSON-RPC 2.0 specification ---
    PARSE_ERROR = -32700
    INVALID_REQUEST = -32600
    METHOD_NOT_FOUND = -32601
    INVALID_PARAMS = -32602
    INTERNAL_ERROR = -32603

    # --- Custom error codes (application-specific) ---
    TIMEOUT_ERROR = -32000
    UNAUTHORIZED = -32001
    RATE_LIMITED = -32002
    SERVICE_UNAVAILABLE = -32003

46 

47 

@dataclass
class JsonRpcError:
    """The ``error`` member of a JSON-RPC 2.0 response.

    Attributes:
        code: Numeric error code.
        message: Short human-readable description of the error.
        data: Optional extra detail; left out of the serialized form when None.
    """

    code: int
    message: str
    data: Optional[Any] = None

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict, including ``data`` only when it is set."""
        payload: Dict[str, Any] = {"code": self.code, "message": self.message}
        if self.data is None:
            return payload
        payload["data"] = self.data
        return payload

60 

61 

@dataclass
class JsonRpcRequest:
    """A JSON-RPC 2.0 request (or notification, when ``id`` is None).

    Attributes:
        method: Name of the method to invoke.
        params: Named (dict) or positional (list) arguments, if any.
        id: Correlation id; None marks the request as a notification.
        jsonrpc: Protocol version string.
    """

    method: str
    params: Optional[Union[Dict, List]] = None
    id: Optional[Union[str, int]] = None
    jsonrpc: str = "2.0"

    def is_notification(self) -> bool:
        """Return True when no ``id`` is set, i.e. no response is expected."""
        has_id = self.id is not None
        return not has_id

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict, omitting ``params`` and ``id`` when None."""
        payload = {"jsonrpc": self.jsonrpc, "method": self.method}
        # Optional members are emitted in a fixed order: params, then id.
        for key, value in (("params", self.params), ("id", self.id)):
            if value is not None:
                payload[key] = value
        return payload

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "JsonRpcRequest":
        """Build a request from a decoded JSON object.

        Missing members fall back to defaults: an empty method name, no
        params, no id, and protocol version "2.0".
        """
        members = {
            "method": data.get("method", ""),
            "params": data.get("params"),
            "id": data.get("id"),
            "jsonrpc": data.get("jsonrpc", "2.0"),
        }
        return cls(**members)

90 

91 

@dataclass
class JsonRpcResponse:
    """A JSON-RPC 2.0 response carrying either a result or an error.

    Attributes:
        id: Id of the request being answered (None when it could not be determined).
        result: Value returned by the method; serialized only when ``error`` is None.
        error: Error object; when set it takes the place of ``result``.
        jsonrpc: Protocol version string.
    """

    id: Optional[Union[str, int]]
    result: Optional[Any] = None
    error: Optional["JsonRpcError"] = None
    jsonrpc: str = "2.0"

    def to_dict(self) -> Dict[str, Any]:
        """Serialize to a plain dict with exactly one of ``result`` / ``error``."""
        envelope: Dict[str, Any] = {"jsonrpc": self.jsonrpc, "id": self.id}
        if self.error is None:
            envelope["result"] = self.result
        else:
            envelope["error"] = self.error.to_dict()
        return envelope

    @classmethod
    def success(cls, request_id: Union[str, int], result: Any) -> "JsonRpcResponse":
        """Create a successful response for ``request_id`` carrying ``result``."""
        response = cls(id=request_id, result=result)
        return response

    @classmethod
    def error_response(
        cls,
        request_id: Optional[Union[str, int]],
        code: int,
        message: str,
        data: Optional[Any] = None,
    ) -> "JsonRpcResponse":
        """Create a failed response for ``request_id`` from error details."""
        failure = JsonRpcError(code=code, message=message, data=data)
        return cls(id=request_id, error=failure)

121 

122 

@dataclass
class GatewayConfig:
    """Settings for the gateway protocol.

    Every field has a default, so ``GatewayConfig()`` is a usable configuration.
    """

    host: str = "0.0.0.0"  # Bind to all interfaces for Docker
    port: int = 9000

    # Docker network settings
    docker_network: Optional[str] = None
    container_name: Optional[str] = None

    # Persistence path (should be volume-mounted in Docker)
    persistence_path: Optional[str] = None

    # Timeouts
    request_timeout_ms: int = 30000
    connect_timeout_ms: int = 5000

    # Security
    require_auth: bool = False
    api_keys: List[str] = field(default_factory=list)

    # Rate limiting
    rate_limit_per_second: int = 100

    def get_persistence_path(self) -> str:
        """Resolve the directory used for persisted gateway state.

        Resolution order:
          1. ``persistence_path`` when it is set (non-empty).
          2. The ``GATEWAY_DATA_PATH`` environment variable, when present.
          3. A computed fallback: the bundled-app data directory when
             ``NUNBA_BUNDLED`` is set or the interpreter is frozen,
             ``/app/data/gateway`` when ``/app`` exists, otherwise
             ``./agent_data/gateway``.

        The fallback is computed before the environment variable is
        consulted, so the bundled-app import is attempted even when the
        variable ends up winning.
        """
        explicit = self.persistence_path
        if explicit:
            return explicit

        import sys as _sys

        is_bundled = os.environ.get('NUNBA_BUNDLED') or getattr(_sys, 'frozen', False)
        if is_bundled:
            try:
                from core.platform_paths import get_agent_data_dir
                fallback = os.path.join(get_agent_data_dir(), 'gateway')
            except ImportError:
                home = os.path.expanduser('~')
                fallback = os.path.join(home, 'Documents', 'Nunba', 'data', 'agent_data', 'gateway')
        else:
            # Default to volume-mounted data directory when running in a container image.
            fallback = "/app/data/gateway" if os.path.exists("/app") else "./agent_data/gateway"

        return os.environ.get("GATEWAY_DATA_PATH", fallback)

159 

160 

@dataclass
class MethodInfo:
    """Registry entry describing one registered RPC method.

    Attributes:
        name: Method name as used in the ``method`` member of a request.
        handler: Callable invoked to serve the method.
        description: Human-readable summary of the method.
        params_schema: Optional schema describing the accepted parameters.
        requires_auth: Whether the method is flagged as needing authentication.
    """

    name: str
    handler: Callable
    description: str = ""
    params_schema: Optional[Dict[str, Any]] = None
    requires_auth: bool = False

169 

170 

@dataclass
class NotificationTarget:
    """A subscriber that notifications are delivered to.

    Attributes:
        url: Address the notification payload is sent to.
        methods: Notification method names of interest.
        headers: Extra headers to attach when sending to ``url``.
    """

    url: str
    methods: List[str] = field(default_factory=list)
    headers: Dict[str, str] = field(default_factory=dict)

177 

178 

@dataclass
class GatewayStats:
    """Counters and timing figures describing gateway activity.

    Attributes:
        total_requests: Number of requests handled.
        total_notifications: Number of notifications processed.
        total_errors: Number of requests that ended in an error.
        methods_called: Per-method invocation counts, keyed by method name.
        avg_response_time_ms: Average response time in milliseconds.
        uptime_seconds: Seconds elapsed since the gateway started.
    """

    total_requests: int = 0
    total_notifications: int = 0
    total_errors: int = 0
    methods_called: Dict[str, int] = field(default_factory=dict)
    avg_response_time_ms: float = 0.0
    uptime_seconds: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return a dict snapshot of all fields (containers are copied)."""
        snapshot = asdict(self)
        return snapshot

191 

192 

class GatewayProtocol:
    """
    JSON-RPC 2.0 based gateway for inter-service communication.

    Designed for Docker container environments with support for:
    - Container network addressing
    - Volume-mounted persistence
    - Health checks
    - Service discovery

    The gateway keeps a registry of named async handlers, dispatches incoming
    JSON-RPC requests (single or batch) to them, and pushes notifications to
    subscribed HTTP targets. Subscriptions are persisted to
    ``gateway_state.json`` under the configured persistence path.

    Usage:
        gateway = GatewayProtocol()

        # Register methods
        gateway.register_method("echo", echo_handler)
        gateway.register_method("process", process_handler, requires_auth=True)

        # Handle requests
        response = await gateway.handle_request({"jsonrpc": "2.0", "method": "echo", "params": {"msg": "hi"}, "id": 1})

        # Send notifications
        await gateway.send_notification("event.message", {"channel": "telegram", "text": "Hello"})
    """

    # Number of most recent response-time samples kept for the average.
    _MAX_RESPONSE_SAMPLES = 1000

    # Total timeout, in seconds, for delivering one notification.
    _NOTIFY_TIMEOUT_SECONDS = 5

    def __init__(self, config: Optional["GatewayConfig"] = None):
        """Create a gateway, restore persisted subscriptions, register built-ins.

        Args:
            config: Gateway settings; a default ``GatewayConfig`` is used when omitted.
        """
        self.config = config or GatewayConfig()
        self._methods: Dict[str, MethodInfo] = {}
        self._notification_targets: List[NotificationTarget] = []
        self._pending_requests: Dict[str, asyncio.Future] = {}
        self._start_time = time.time()
        self._stats = GatewayStats()
        self._response_times: List[float] = []
        self._running = False

        # Ensure persistence directory exists
        self._ensure_persistence_dir()

        # Load state
        self._load_state()

        # Register built-in methods
        self._register_builtin_methods()

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def _ensure_persistence_dir(self) -> None:
        """Create the persistence directory if needed; failures are only logged."""
        path = self.config.get_persistence_path()
        try:
            os.makedirs(path, exist_ok=True)
        except Exception as e:
            # Best effort: the gateway still works without persistence.
            logger.warning(f"Could not create persistence directory {path}: {e}")

    def _get_state_file(self) -> str:
        """Return the path of the JSON file holding persisted state."""
        return os.path.join(self.config.get_persistence_path(), "gateway_state.json")

    def _load_state(self) -> None:
        """Restore notification targets from the state file, if it exists.

        Malformed entries are skipped individually so one bad record does not
        discard the remaining subscriptions. Any other failure (unreadable
        file, invalid JSON, unexpected top-level shape) is logged and ignored.
        """
        state_file = self._get_state_file()
        try:
            if not os.path.exists(state_file):
                return
            with open(state_file, "r", encoding="utf-8") as f:
                data = json.load(f)

            restored: List[NotificationTarget] = []
            for target_data in data.get("notification_targets", []):
                try:
                    restored.append(
                        NotificationTarget(
                            url=target_data["url"],
                            methods=target_data.get("methods", []),
                            headers=target_data.get("headers", {}),
                        )
                    )
                except (KeyError, TypeError, AttributeError) as e:
                    logger.warning(f"Skipping malformed notification target in {state_file}: {e!r}")
            self._notification_targets.extend(restored)
            logger.info(f"Loaded gateway state from {state_file}")
        except Exception as e:
            logger.warning(f"Could not load gateway state: {e}")

    def _save_state(self) -> None:
        """Persist notification targets to the state file; failures are only logged.

        The data is written to a temporary sibling file and then moved over
        the real file with ``os.replace``, so an interrupted write cannot
        leave a truncated state file behind.
        """
        state_file = self._get_state_file()
        tmp_file = f"{state_file}.tmp"
        try:
            data = {
                "notification_targets": [
                    {"url": t.url, "methods": t.methods, "headers": t.headers}
                    for t in self._notification_targets
                ],
                "saved_at": datetime.now().isoformat(),
            }
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_file, state_file)
        except Exception as e:
            logger.warning(f"Could not save gateway state: {e}")
            try:
                os.remove(tmp_file)
            except OSError:
                # Nothing to clean up (or cleanup impossible); the warning above suffices.
                pass

    # ------------------------------------------------------------------
    # Built-in RPC methods
    # ------------------------------------------------------------------

    def _register_builtin_methods(self) -> None:
        """Register the built-in ``rpc.*`` and ``gateway.*`` methods."""
        builtin_methods = (
            ("rpc.discover", self._rpc_discover, "List available methods"),
            ("rpc.describe", self._rpc_describe, "Describe a specific method"),
            ("gateway.health", self._gateway_health, "Health check endpoint"),
            ("gateway.stats", self._gateway_stats, "Get gateway statistics"),
            ("gateway.subscribe", self._gateway_subscribe, "Subscribe to notifications"),
            ("gateway.unsubscribe", self._gateway_unsubscribe, "Unsubscribe from notifications"),
        )
        for name, handler, description in builtin_methods:
            self.register_method(name, handler, description=description)

    async def _rpc_discover(self, params: Optional[Dict] = None) -> List[Dict]:
        """Return name, description and auth flag of every registered method."""
        return [
            {
                "name": info.name,
                "description": info.description,
                "requires_auth": info.requires_auth,
            }
            for info in self._methods.values()
        ]

    async def _rpc_describe(self, params: Optional[Dict] = None) -> Optional[Dict]:
        """Describe one method, named by ``params["method"]``.

        Returns:
            The method's name, description, params schema and auth flag, or
            None when params are missing, the name is absent or not a string,
            or no such method is registered.
        """
        method_name = params.get("method") if isinstance(params, dict) else None
        if not isinstance(method_name, str) or method_name not in self._methods:
            return None
        info = self._methods[method_name]
        return {
            "name": info.name,
            "description": info.description,
            "params_schema": info.params_schema,
            "requires_auth": info.requires_auth,
        }

    async def _gateway_health(self, params: Optional[Dict] = None) -> Dict:
        """Return a health summary: status, uptime, method count, container info."""
        return {
            "status": "healthy",
            "uptime_seconds": time.time() - self._start_time,
            "methods_count": len(self._methods),
            "container": self.config.container_name,
            "network": self.config.docker_network,
        }

    async def _gateway_stats(self, params: Optional[Dict] = None) -> Dict:
        """Return current statistics as a plain dict."""
        return self._refresh_stats().to_dict()

    async def _gateway_subscribe(self, params: Optional[Dict] = None) -> Dict:
        """Register a notification target and persist the subscription list.

        Expected params: ``url`` (required string), ``methods`` (optional list
        of method names; empty means all) and ``headers`` (optional object of
        string values). An exact duplicate of an existing subscription is not
        added a second time.

        Raises:
            ValueError: If ``url`` is missing or any member has the wrong type.
        """
        # NOTE(review): this built-in is registered without requires_auth, so any
        # caller able to reach the gateway can make it POST to an arbitrary URL --
        # confirm this is intended.
        if not isinstance(params, dict):
            raise ValueError("url is required")

        url = params.get("url")
        methods = params.get("methods")
        headers = params.get("headers")
        if methods is None:
            methods = []
        if headers is None:
            headers = {}

        if not url:
            raise ValueError("url is required")
        if not isinstance(url, str):
            raise ValueError("url must be a string")
        # A bare string would turn the later ``method in target.methods`` test
        # into a substring match, so insist on a real list of names.
        if not isinstance(methods, list) or not all(isinstance(m, str) for m in methods):
            raise ValueError("methods must be a list of strings")
        if not isinstance(headers, dict) or not all(
            isinstance(k, str) and isinstance(v, str) for k, v in headers.items()
        ):
            raise ValueError("headers must be an object with string values")

        target = NotificationTarget(url=url, methods=methods, headers=headers)
        if target not in self._notification_targets:
            self._notification_targets.append(target)
            self._save_state()

        return {"subscribed": True, "url": url, "methods": methods}

    async def _gateway_unsubscribe(self, params: Optional[Dict] = None) -> Dict:
        """Remove every subscription for ``params["url"]`` and persist the result.

        Raises:
            ValueError: If ``url`` is missing.
        """
        url = params.get("url") if isinstance(params, dict) else None
        if not url:
            raise ValueError("url is required")

        original_count = len(self._notification_targets)
        self._notification_targets = [t for t in self._notification_targets if t.url != url]
        removed = original_count - len(self._notification_targets)
        self._save_state()

        return {"unsubscribed": True, "url": url, "removed_count": removed}

    # ------------------------------------------------------------------
    # Method registry
    # ------------------------------------------------------------------

    def register_method(
        self,
        name: str,
        handler: Callable[..., Coroutine[Any, Any, Any]],
        description: str = "",
        params_schema: Optional[Dict[str, Any]] = None,
        requires_auth: bool = False,
    ) -> None:
        """
        Register a method handler, replacing any handler already stored under ``name``.

        Args:
            name: Method name (e.g., "channel.send", "agent.process")
            handler: Async function to handle the method call
            description: Human-readable description
            params_schema: JSON schema for parameters validation
            requires_auth: Whether this method requires authentication
        """
        self._methods[name] = MethodInfo(
            name=name,
            handler=handler,
            description=description,
            params_schema=params_schema,
            requires_auth=requires_auth,
        )
        logger.debug(f"Registered method: {name}")

    def unregister_method(self, name: str) -> bool:
        """Remove a method; return True if it was registered, False otherwise."""
        if name in self._methods:
            del self._methods[name]
            return True
        return False

    # ------------------------------------------------------------------
    # Request handling
    # ------------------------------------------------------------------

    def _validate_request(self, data: Dict[str, Any]) -> Optional["JsonRpcError"]:
        """Check the structure of a decoded request.

        Returns:
            A ``JsonRpcError`` describing the first problem found, or None
            when the request is a dict with ``jsonrpc == "2.0"``, a string
            ``method`` and (if present) dict or list ``params``.
        """
        if not isinstance(data, dict):
            return JsonRpcError(
                code=JsonRpcErrorCode.INVALID_REQUEST.value,
                message="Request must be an object",
            )

        if data.get("jsonrpc") != "2.0":
            return JsonRpcError(
                code=JsonRpcErrorCode.INVALID_REQUEST.value,
                message="jsonrpc must be '2.0'",
            )

        if "method" not in data or not isinstance(data["method"], str):
            return JsonRpcError(
                code=JsonRpcErrorCode.INVALID_REQUEST.value,
                message="method must be a string",
            )

        params = data.get("params")
        if params is not None and not isinstance(params, (dict, list)):
            return JsonRpcError(
                code=JsonRpcErrorCode.INVALID_PARAMS.value,
                message="params must be an object or array",
            )

        return None

    def _check_auth(self, request: Dict[str, Any], method_info: "MethodInfo") -> Optional["JsonRpcError"]:
        """Check the API key when both the method and the config demand auth.

        The key is read from ``params["_api_key"]`` or ``params["api_key"]``
        and compared against every configured key in constant time.

        Returns:
            None when access is allowed, otherwise an UNAUTHORIZED error.
        """
        if not method_info.requires_auth:
            return None

        if not self.config.require_auth:
            return None

        import hmac

        # Check for API key in params or headers (passed via metadata)
        api_key = None
        params = request.get("params")
        if isinstance(params, dict):
            api_key = params.get("_api_key") or params.get("api_key")

        authorized = False
        if isinstance(api_key, str) and api_key:
            # compare_digest rejects non-ASCII str, so compare encoded bytes;
            # surrogatepass keeps lone surrogates from JSON from raising.
            supplied = api_key.encode("utf-8", "surrogatepass")
            for known in self.config.api_keys:
                if not isinstance(known, str):
                    continue
                # No early exit: every configured key is always compared.
                if hmac.compare_digest(supplied, known.encode("utf-8", "surrogatepass")):
                    authorized = True

        if not authorized:
            return JsonRpcError(
                code=JsonRpcErrorCode.UNAUTHORIZED.value,
                message="Unauthorized: invalid or missing API key",
            )

        return None

    def _record_response_time(self, started: float) -> None:
        """Append the elapsed time since ``started`` (a perf_counter value) in ms."""
        self._response_times.append((time.perf_counter() - started) * 1000)
        # Keep only the most recent samples
        if len(self._response_times) > self._MAX_RESPONSE_SAMPLES:
            self._response_times = self._response_times[-self._MAX_RESPONSE_SAMPLES:]

    async def handle_request(
        self, request: Union[Dict, str, bytes]
    ) -> Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]:
        """
        Handle a JSON-RPC request.

        Args:
            request: JSON-RPC request (dict, list for a batch, JSON string, or bytes)

        Returns:
            - a response dict for a single request,
            - None for a single notification (request without id),
            - a list of response dicts for a batch (notifications are left
              out, so the list is empty when the batch held only notifications),
            - an error response dict when the payload cannot be parsed or is
              an empty batch.
        """
        started = time.perf_counter()
        self._stats.total_requests += 1

        # Parse if string/bytes
        if isinstance(request, (str, bytes, bytearray)):
            try:
                request = json.loads(request)
            except (ValueError, RecursionError) as e:
                # ValueError covers JSONDecodeError and the UnicodeDecodeError
                # raised for undecodable bytes; RecursionError covers payloads
                # nested too deeply for the decoder.
                self._stats.total_errors += 1
                return JsonRpcResponse.error_response(
                    None,
                    JsonRpcErrorCode.PARSE_ERROR.value,
                    f"Parse error: {e}",
                ).to_dict()

        response: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]
        if isinstance(request, list):
            # Handle batch requests
            if not request:
                self._stats.total_errors += 1
                response = JsonRpcResponse.error_response(
                    None,
                    JsonRpcErrorCode.INVALID_REQUEST.value,
                    "Empty batch request",
                ).to_dict()
            else:
                responses = await asyncio.gather(
                    *(self._handle_single_request(r) for r in request)
                )
                # Filter out None responses (notifications)
                response = [r for r in responses if r is not None]
        else:
            # Handle single request
            response = await self._handle_single_request(request)

        # Track response time
        self._record_response_time(started)
        return response

    async def _handle_single_request(self, request: Any) -> Optional[Dict[str, Any]]:
        """Validate, authorize and dispatch one decoded request.

        Args:
            request: Decoded request; anything that is not a dict is answered
                with an Invalid Request error.

        Returns:
            The response dict, or None when the request is a notification
            (no id) that passed structural validation.
        """
        # Validate structure
        error = self._validate_request(request)
        if error:
            self._stats.total_errors += 1
            # A non-dict payload has no id to echo back.
            invalid_id = request.get("id") if isinstance(request, dict) else None
            return JsonRpcResponse.error_response(
                invalid_id,
                error.code,
                error.message,
                error.data,
            ).to_dict()

        method_name = request["method"]
        params = request.get("params")
        request_id = request.get("id")
        is_notification = request_id is None

        # Check method exists
        if method_name not in self._methods:
            self._stats.total_errors += 1
            if is_notification:
                return None
            return JsonRpcResponse.error_response(
                request_id,
                JsonRpcErrorCode.METHOD_NOT_FOUND.value,
                f"Method not found: {method_name}",
            ).to_dict()

        method_info = self._methods[method_name]

        # Check authentication
        auth_error = self._check_auth(request, method_info)
        if auth_error:
            self._stats.total_errors += 1
            if is_notification:
                return None
            return JsonRpcResponse.error_response(
                request_id,
                auth_error.code,
                auth_error.message,
                auth_error.data,
            ).to_dict()

        # Track method calls
        self._stats.methods_called[method_name] = (
            self._stats.methods_called.get(method_name, 0) + 1
        )

        # Execute handler: no params -> no args, object -> one dict arg,
        # array -> positional args.
        try:
            if params is None:
                result = await method_info.handler()
            elif isinstance(params, dict):
                result = await method_info.handler(params)
            else:
                result = await method_info.handler(*params)

            if is_notification:
                self._stats.total_notifications += 1
                return None

            return JsonRpcResponse.success(request_id, result).to_dict()

        except Exception as e:
            self._stats.total_errors += 1
            logger.exception(f"Error executing method {method_name}: {e}")
            if is_notification:
                return None
            return JsonRpcResponse.error_response(
                request_id,
                JsonRpcErrorCode.INTERNAL_ERROR.value,
                f"Internal error: {str(e)}",
            ).to_dict()

    # ------------------------------------------------------------------
    # Outbound traffic
    # ------------------------------------------------------------------

    async def send_notification(self, method: str, params: Optional[Dict] = None) -> None:
        """
        Send a notification to all subscribed targets.

        Notifications are fire-and-forget (no response expected). Targets are
        contacted one after another; a delivery failure is logged and does not
        stop delivery to the remaining targets.

        Args:
            method: Notification method name
            params: Notification parameters
        """
        notification = JsonRpcRequest(
            method=method,
            params=params,
            id=None,  # Notifications have no id
        )

        payload = json.dumps(notification.to_dict())

        # Send to all matching targets
        for target in self._notification_targets:
            # An empty method list means the target receives everything.
            if target.methods and method not in target.methods:
                continue

            try:
                await self._send_to_target(target, payload)
            except Exception as e:
                logger.warning(f"Failed to send notification to {target.url}: {e}")

    async def _send_to_target(self, target: "NotificationTarget", payload: str) -> None:
        """POST ``payload`` to ``target.url``; only log the intent if aiohttp is missing.

        HTTP status codes of 400 and above are logged as warnings.
        """
        # Use aiohttp if available, otherwise log the intent. Only the import
        # itself is guarded, so an ImportError raised while sending is not
        # mistaken for a missing dependency.
        try:
            import aiohttp
        except ImportError:
            logger.debug(f"Would send notification to {target.url}: {payload}")
            return

        headers = {"Content-Type": "application/json"}
        headers.update(target.headers)
        async with aiohttp.ClientSession() as session:
            async with session.post(
                target.url,
                data=payload,
                headers=headers,
                timeout=aiohttp.ClientTimeout(total=self._NOTIFY_TIMEOUT_SECONDS),
            ) as response:
                if response.status >= 400:
                    logger.warning(
                        f"Notification target {target.url} returned {response.status}"
                    )

    async def call(
        self,
        target_url: str,
        method: str,
        params: Optional[Dict] = None,
        timeout_ms: Optional[int] = None,
    ) -> Any:
        """
        Make an RPC call to another service.

        Args:
            target_url: URL of the target service
            method: Method to call
            params: Parameters for the method
            timeout_ms: Timeout in milliseconds; the configured
                ``request_timeout_ms`` is used when omitted or 0

        Returns:
            Result from the method call

        Raises:
            ImportError: If aiohttp is not installed
            Exception: If the call fails, times out, the peer returns an
                error object, or the response is not a JSON object
        """
        try:
            import aiohttp
        except ImportError as exc:
            raise ImportError("aiohttp is required for RPC calls") from exc

        request_id = str(uuid.uuid4())
        request = JsonRpcRequest(
            method=method,
            params=params,
            id=request_id,
        )

        timeout = (timeout_ms or self.config.request_timeout_ms) / 1000

        async with aiohttp.ClientSession() as session:
            async with session.post(
                target_url,
                json=request.to_dict(),
                headers={"Content-Type": "application/json"},
                timeout=aiohttp.ClientTimeout(total=timeout),
            ) as response:
                data = await response.json()

        if not isinstance(data, dict):
            raise Exception(f"Invalid JSON-RPC response from {target_url}")

        # Some peers send "error": null alongside a result; treat that as success.
        error = data.get("error")
        if error is not None:
            if isinstance(error, dict):
                raise Exception(f"RPC error {error.get('code')}: {error.get('message')}")
            raise Exception(f"RPC error: {error}")

        return data.get("result")

    # ------------------------------------------------------------------
    # Introspection
    # ------------------------------------------------------------------

    def get_docker_address(self) -> str:
        """
        Get the Docker-friendly address for this gateway.

        Returns ``http://<container_name>:<port>`` when both a container name
        and a Docker network are configured, otherwise ``http://<host>:<port>``.
        """
        if self.config.container_name and self.config.docker_network:
            return f"http://{self.config.container_name}:{self.config.port}"
        return f"http://{self.config.host}:{self.config.port}"

    def get_methods(self) -> List[str]:
        """Get list of registered method names."""
        return list(self._methods.keys())

    def _refresh_stats(self) -> "GatewayStats":
        """Update uptime and average response time on the stats object and return it."""
        self._stats.uptime_seconds = time.time() - self._start_time
        if self._response_times:
            self._stats.avg_response_time_ms = sum(self._response_times) / len(self._response_times)
        return self._stats

    def get_stats(self) -> "GatewayStats":
        """Get current gateway statistics (the live object, not a copy)."""
        return self._refresh_stats()

715 

716 

# Process-wide gateway instance, created lazily by get_gateway().
_gateway: Optional["GatewayProtocol"] = None


def get_gateway(config: Optional["GatewayConfig"] = None) -> "GatewayProtocol":
    """Return the shared gateway, creating it on first use.

    Args:
        config: Settings used only when the shared instance does not exist
            yet; ignored on later calls.
    """
    global _gateway
    if _gateway is not None:
        return _gateway
    _gateway = GatewayProtocol(config)
    return _gateway
726 return _gateway