Coverage for integrations / channels / gateway / protocol.py: 0.0%
316 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Gateway Protocol - JSON-RPC 2.0 Based Gateway
4Provides a JSON-RPC 2.0 compliant protocol for inter-service communication
5in Docker container networks. Handles method registration, request routing,
6notifications, and error handling.
8Features:
9- JSON-RPC 2.0 compliance
10- Method registration with handlers
11- Async request/response handling
12- Notification support (no response expected)
13- Docker network addressing support
14- Container-friendly persistence
15- Error handling with standard codes
16"""
18from __future__ import annotations
20import asyncio
21import json
22import logging
23import os
24import time
25import uuid
26from dataclasses import dataclass, field, asdict
27from datetime import datetime
28from enum import Enum
29from typing import Any, Callable, Coroutine, Dict, List, Optional, Union
31logger = logging.getLogger(__name__)
34class JsonRpcErrorCode(Enum):
35 """Standard JSON-RPC 2.0 error codes."""
36 PARSE_ERROR = -32700
37 INVALID_REQUEST = -32600
38 METHOD_NOT_FOUND = -32601
39 INVALID_PARAMS = -32602
40 INTERNAL_ERROR = -32603
41 # Custom error codes (application-specific)
42 TIMEOUT_ERROR = -32000
43 UNAUTHORIZED = -32001
44 RATE_LIMITED = -32002
45 SERVICE_UNAVAILABLE = -32003
48@dataclass
49class JsonRpcError:
50 """JSON-RPC 2.0 error object."""
51 code: int
52 message: str
53 data: Optional[Any] = None
55 def to_dict(self) -> Dict[str, Any]:
56 result = {"code": self.code, "message": self.message}
57 if self.data is not None:
58 result["data"] = self.data
59 return result
62@dataclass
63class JsonRpcRequest:
64 """JSON-RPC 2.0 request object."""
65 method: str
66 params: Optional[Union[Dict, List]] = None
67 id: Optional[Union[str, int]] = None
68 jsonrpc: str = "2.0"
70 def is_notification(self) -> bool:
71 """Check if this is a notification (no id = no response expected)."""
72 return self.id is None
74 def to_dict(self) -> Dict[str, Any]:
75 result = {"jsonrpc": self.jsonrpc, "method": self.method}
76 if self.params is not None:
77 result["params"] = self.params
78 if self.id is not None:
79 result["id"] = self.id
80 return result
82 @classmethod
83 def from_dict(cls, data: Dict[str, Any]) -> JsonRpcRequest:
84 return cls(
85 method=data.get("method", ""),
86 params=data.get("params"),
87 id=data.get("id"),
88 jsonrpc=data.get("jsonrpc", "2.0"),
89 )
92@dataclass
93class JsonRpcResponse:
94 """JSON-RPC 2.0 response object."""
95 id: Optional[Union[str, int]]
96 result: Optional[Any] = None
97 error: Optional[JsonRpcError] = None
98 jsonrpc: str = "2.0"
100 def to_dict(self) -> Dict[str, Any]:
101 result = {"jsonrpc": self.jsonrpc, "id": self.id}
102 if self.error is not None:
103 result["error"] = self.error.to_dict()
104 else:
105 result["result"] = self.result
106 return result
108 @classmethod
109 def success(cls, request_id: Union[str, int], result: Any) -> JsonRpcResponse:
110 return cls(id=request_id, result=result)
112 @classmethod
113 def error_response(
114 cls,
115 request_id: Optional[Union[str, int]],
116 code: int,
117 message: str,
118 data: Optional[Any] = None
119 ) -> JsonRpcResponse:
120 return cls(id=request_id, error=JsonRpcError(code=code, message=message, data=data))
123@dataclass
124class GatewayConfig:
125 """Configuration for the gateway protocol."""
126 host: str = "0.0.0.0" # Bind to all interfaces for Docker
127 port: int = 9000
128 # Docker network settings
129 docker_network: Optional[str] = None
130 container_name: Optional[str] = None
131 # Persistence path (should be volume-mounted in Docker)
132 persistence_path: Optional[str] = None
133 # Timeouts
134 request_timeout_ms: int = 30000
135 connect_timeout_ms: int = 5000
136 # Security
137 require_auth: bool = False
138 api_keys: List[str] = field(default_factory=list)
139 # Rate limiting
140 rate_limit_per_second: int = 100
142 def get_persistence_path(self) -> str:
143 """Get persistence path, defaulting to Docker-friendly location."""
144 if self.persistence_path:
145 return self.persistence_path
146 # Default to volume-mounted data directory
147 import sys as _sys
148 if os.environ.get('NUNBA_BUNDLED') or getattr(_sys, 'frozen', False):
149 try:
150 from core.platform_paths import get_agent_data_dir
151 _default = os.path.join(get_agent_data_dir(), 'gateway')
152 except ImportError:
153 _default = os.path.join(os.path.expanduser('~'), 'Documents', 'Nunba', 'data', 'agent_data', 'gateway')
154 elif os.path.exists("/app"):
155 _default = "/app/data/gateway"
156 else:
157 _default = "./agent_data/gateway"
158 return os.environ.get("GATEWAY_DATA_PATH", _default)
161@dataclass
162class MethodInfo:
163 """Information about a registered method."""
164 name: str
165 handler: Callable
166 description: str = ""
167 params_schema: Optional[Dict[str, Any]] = None
168 requires_auth: bool = False
171@dataclass
172class NotificationTarget:
173 """Target for notifications."""
174 url: str
175 methods: List[str] = field(default_factory=list)
176 headers: Dict[str, str] = field(default_factory=dict)
179@dataclass
180class GatewayStats:
181 """Gateway statistics."""
182 total_requests: int = 0
183 total_notifications: int = 0
184 total_errors: int = 0
185 methods_called: Dict[str, int] = field(default_factory=dict)
186 avg_response_time_ms: float = 0.0
187 uptime_seconds: float = 0.0
189 def to_dict(self) -> Dict[str, Any]:
190 return asdict(self)
193class GatewayProtocol:
194 """
195 JSON-RPC 2.0 based gateway for inter-service communication.
197 Designed for Docker container environments with support for:
198 - Container network addressing
199 - Volume-mounted persistence
200 - Health checks
201 - Service discovery
203 Usage:
204 gateway = GatewayProtocol()
206 # Register methods
207 gateway.register_method("echo", echo_handler)
208 gateway.register_method("process", process_handler, requires_auth=True)
210 # Handle requests
211 response = await gateway.handle_request({"jsonrpc": "2.0", "method": "echo", "params": {"msg": "hi"}, "id": 1})
213 # Send notifications
214 await gateway.send_notification("event.message", {"channel": "telegram", "text": "Hello"})
215 """
217 def __init__(self, config: Optional[GatewayConfig] = None):
218 self.config = config or GatewayConfig()
219 self._methods: Dict[str, MethodInfo] = {}
220 self._notification_targets: List[NotificationTarget] = []
221 self._pending_requests: Dict[str, asyncio.Future] = {}
222 self._start_time = time.time()
223 self._stats = GatewayStats()
224 self._response_times: List[float] = []
225 self._running = False
227 # Ensure persistence directory exists
228 self._ensure_persistence_dir()
230 # Load state
231 self._load_state()
233 # Register built-in methods
234 self._register_builtin_methods()
236 def _ensure_persistence_dir(self) -> None:
237 """Ensure persistence directory exists (for Docker volumes)."""
238 path = self.config.get_persistence_path()
239 try:
240 os.makedirs(path, exist_ok=True)
241 except Exception as e:
242 logger.warning(f"Could not create persistence directory {path}: {e}")
244 def _get_state_file(self) -> str:
245 """Get path to state file."""
246 return os.path.join(self.config.get_persistence_path(), "gateway_state.json")
248 def _load_state(self) -> None:
249 """Load persisted state."""
250 state_file = self._get_state_file()
251 try:
252 if os.path.exists(state_file):
253 with open(state_file, "r") as f:
254 data = json.load(f)
255 # Restore notification targets
256 for target_data in data.get("notification_targets", []):
257 self._notification_targets.append(
258 NotificationTarget(
259 url=target_data["url"],
260 methods=target_data.get("methods", []),
261 headers=target_data.get("headers", {}),
262 )
263 )
264 logger.info(f"Loaded gateway state from {state_file}")
265 except Exception as e:
266 logger.warning(f"Could not load gateway state: {e}")
268 def _save_state(self) -> None:
269 """Persist state to disk."""
270 state_file = self._get_state_file()
271 try:
272 data = {
273 "notification_targets": [
274 {"url": t.url, "methods": t.methods, "headers": t.headers}
275 for t in self._notification_targets
276 ],
277 "saved_at": datetime.now().isoformat(),
278 }
279 with open(state_file, "w") as f:
280 json.dump(data, f, indent=2)
281 except Exception as e:
282 logger.warning(f"Could not save gateway state: {e}")
284 def _register_builtin_methods(self) -> None:
285 """Register built-in RPC methods."""
286 self.register_method(
287 "rpc.discover",
288 self._rpc_discover,
289 description="List available methods",
290 )
291 self.register_method(
292 "rpc.describe",
293 self._rpc_describe,
294 description="Describe a specific method",
295 )
296 self.register_method(
297 "gateway.health",
298 self._gateway_health,
299 description="Health check endpoint",
300 )
301 self.register_method(
302 "gateway.stats",
303 self._gateway_stats,
304 description="Get gateway statistics",
305 )
306 self.register_method(
307 "gateway.subscribe",
308 self._gateway_subscribe,
309 description="Subscribe to notifications",
310 )
311 self.register_method(
312 "gateway.unsubscribe",
313 self._gateway_unsubscribe,
314 description="Unsubscribe from notifications",
315 )
317 async def _rpc_discover(self, params: Optional[Dict] = None) -> List[Dict]:
318 """List all available methods."""
319 return [
320 {
321 "name": info.name,
322 "description": info.description,
323 "requires_auth": info.requires_auth,
324 }
325 for info in self._methods.values()
326 ]
328 async def _rpc_describe(self, params: Dict) -> Optional[Dict]:
329 """Describe a specific method."""
330 method_name = params.get("method")
331 if not method_name or method_name not in self._methods:
332 return None
333 info = self._methods[method_name]
334 return {
335 "name": info.name,
336 "description": info.description,
337 "params_schema": info.params_schema,
338 "requires_auth": info.requires_auth,
339 }
341 async def _gateway_health(self, params: Optional[Dict] = None) -> Dict:
342 """Health check endpoint."""
343 return {
344 "status": "healthy",
345 "uptime_seconds": time.time() - self._start_time,
346 "methods_count": len(self._methods),
347 "container": self.config.container_name,
348 "network": self.config.docker_network,
349 }
351 async def _gateway_stats(self, params: Optional[Dict] = None) -> Dict:
352 """Get gateway statistics."""
353 self._stats.uptime_seconds = time.time() - self._start_time
354 if self._response_times:
355 self._stats.avg_response_time_ms = sum(self._response_times) / len(self._response_times)
356 return self._stats.to_dict()
358 async def _gateway_subscribe(self, params: Dict) -> Dict:
359 """Subscribe to notifications."""
360 url = params.get("url")
361 methods = params.get("methods", [])
362 headers = params.get("headers", {})
364 if not url:
365 raise ValueError("url is required")
367 target = NotificationTarget(url=url, methods=methods, headers=headers)
368 self._notification_targets.append(target)
369 self._save_state()
371 return {"subscribed": True, "url": url, "methods": methods}
373 async def _gateway_unsubscribe(self, params: Dict) -> Dict:
374 """Unsubscribe from notifications."""
375 url = params.get("url")
376 if not url:
377 raise ValueError("url is required")
379 original_count = len(self._notification_targets)
380 self._notification_targets = [t for t in self._notification_targets if t.url != url]
381 removed = original_count - len(self._notification_targets)
382 self._save_state()
384 return {"unsubscribed": True, "url": url, "removed_count": removed}
386 def register_method(
387 self,
388 name: str,
389 handler: Callable[..., Coroutine[Any, Any, Any]],
390 description: str = "",
391 params_schema: Optional[Dict[str, Any]] = None,
392 requires_auth: bool = False,
393 ) -> None:
394 """
395 Register a method handler.
397 Args:
398 name: Method name (e.g., "channel.send", "agent.process")
399 handler: Async function to handle the method call
400 description: Human-readable description
401 params_schema: JSON schema for parameters validation
402 requires_auth: Whether this method requires authentication
403 """
404 self._methods[name] = MethodInfo(
405 name=name,
406 handler=handler,
407 description=description,
408 params_schema=params_schema,
409 requires_auth=requires_auth,
410 )
411 logger.debug(f"Registered method: {name}")
413 def unregister_method(self, name: str) -> bool:
414 """Unregister a method."""
415 if name in self._methods:
416 del self._methods[name]
417 return True
418 return False
420 def _validate_request(self, data: Dict[str, Any]) -> Optional[JsonRpcError]:
421 """Validate JSON-RPC request structure."""
422 if not isinstance(data, dict):
423 return JsonRpcError(
424 code=JsonRpcErrorCode.INVALID_REQUEST.value,
425 message="Request must be an object",
426 )
428 if data.get("jsonrpc") != "2.0":
429 return JsonRpcError(
430 code=JsonRpcErrorCode.INVALID_REQUEST.value,
431 message="jsonrpc must be '2.0'",
432 )
434 if "method" not in data or not isinstance(data["method"], str):
435 return JsonRpcError(
436 code=JsonRpcErrorCode.INVALID_REQUEST.value,
437 message="method must be a string",
438 )
440 params = data.get("params")
441 if params is not None and not isinstance(params, (dict, list)):
442 return JsonRpcError(
443 code=JsonRpcErrorCode.INVALID_PARAMS.value,
444 message="params must be an object or array",
445 )
447 return None
449 def _check_auth(self, request: Dict[str, Any], method_info: MethodInfo) -> Optional[JsonRpcError]:
450 """Check authentication if required."""
451 if not method_info.requires_auth:
452 return None
454 if not self.config.require_auth:
455 return None
457 # Check for API key in params or headers (passed via metadata)
458 api_key = None
459 params = request.get("params", {})
460 if isinstance(params, dict):
461 api_key = params.get("_api_key") or params.get("api_key")
463 if not api_key or api_key not in self.config.api_keys:
464 return JsonRpcError(
465 code=JsonRpcErrorCode.UNAUTHORIZED.value,
466 message="Unauthorized: invalid or missing API key",
467 )
469 return None
471 async def handle_request(self, request: Union[Dict, str, bytes]) -> Dict[str, Any]:
472 """
473 Handle a JSON-RPC request.
475 Args:
476 request: JSON-RPC request (dict, JSON string, or bytes)
478 Returns:
479 JSON-RPC response as dictionary
480 """
481 start_time = time.time()
482 self._stats.total_requests += 1
484 # Parse if string/bytes
485 if isinstance(request, (str, bytes)):
486 try:
487 request = json.loads(request)
488 except json.JSONDecodeError as e:
489 self._stats.total_errors += 1
490 return JsonRpcResponse.error_response(
491 None,
492 JsonRpcErrorCode.PARSE_ERROR.value,
493 f"Parse error: {e}",
494 ).to_dict()
496 # Handle batch requests
497 if isinstance(request, list):
498 if not request:
499 return JsonRpcResponse.error_response(
500 None,
501 JsonRpcErrorCode.INVALID_REQUEST.value,
502 "Empty batch request",
503 ).to_dict()
504 responses = await asyncio.gather(
505 *[self._handle_single_request(r) for r in request]
506 )
507 # Filter out None responses (notifications)
508 return [r for r in responses if r is not None]
510 # Handle single request
511 response = await self._handle_single_request(request)
513 # Track response time
514 elapsed_ms = (time.time() - start_time) * 1000
515 self._response_times.append(elapsed_ms)
516 # Keep only last 1000 response times
517 if len(self._response_times) > 1000:
518 self._response_times = self._response_times[-1000:]
520 return response
522 async def _handle_single_request(self, request: Dict[str, Any]) -> Optional[Dict[str, Any]]:
523 """Handle a single JSON-RPC request."""
524 # Validate structure
525 error = self._validate_request(request)
526 if error:
527 self._stats.total_errors += 1
528 return JsonRpcResponse.error_response(
529 request.get("id"),
530 error.code,
531 error.message,
532 error.data,
533 ).to_dict()
535 method_name = request["method"]
536 params = request.get("params")
537 request_id = request.get("id")
538 is_notification = request_id is None
540 # Check method exists
541 if method_name not in self._methods:
542 self._stats.total_errors += 1
543 if is_notification:
544 return None
545 return JsonRpcResponse.error_response(
546 request_id,
547 JsonRpcErrorCode.METHOD_NOT_FOUND.value,
548 f"Method not found: {method_name}",
549 ).to_dict()
551 method_info = self._methods[method_name]
553 # Check authentication
554 auth_error = self._check_auth(request, method_info)
555 if auth_error:
556 self._stats.total_errors += 1
557 if is_notification:
558 return None
559 return JsonRpcResponse.error_response(
560 request_id,
561 auth_error.code,
562 auth_error.message,
563 auth_error.data,
564 ).to_dict()
566 # Track method calls
567 self._stats.methods_called[method_name] = (
568 self._stats.methods_called.get(method_name, 0) + 1
569 )
571 # Execute handler
572 try:
573 if params is None:
574 result = await method_info.handler()
575 elif isinstance(params, dict):
576 result = await method_info.handler(params)
577 else:
578 result = await method_info.handler(*params)
580 if is_notification:
581 self._stats.total_notifications += 1
582 return None
584 return JsonRpcResponse.success(request_id, result).to_dict()
586 except Exception as e:
587 self._stats.total_errors += 1
588 logger.exception(f"Error executing method {method_name}: {e}")
589 if is_notification:
590 return None
591 return JsonRpcResponse.error_response(
592 request_id,
593 JsonRpcErrorCode.INTERNAL_ERROR.value,
594 f"Internal error: {str(e)}",
595 ).to_dict()
597 async def send_notification(self, method: str, params: Optional[Dict] = None) -> None:
598 """
599 Send a notification to all subscribed targets.
601 Notifications are fire-and-forget (no response expected).
603 Args:
604 method: Notification method name
605 params: Notification parameters
606 """
607 notification = JsonRpcRequest(
608 method=method,
609 params=params,
610 id=None, # Notifications have no id
611 )
613 payload = json.dumps(notification.to_dict())
615 # Send to all matching targets
616 for target in self._notification_targets:
617 # Check if target subscribes to this method
618 if target.methods and method not in target.methods:
619 continue
621 try:
622 await self._send_to_target(target, payload)
623 except Exception as e:
624 logger.warning(f"Failed to send notification to {target.url}: {e}")
626 async def _send_to_target(self, target: NotificationTarget, payload: str) -> None:
627 """Send notification to a target URL."""
628 # Use aiohttp if available, otherwise log the intent
629 try:
630 import aiohttp
631 async with aiohttp.ClientSession() as session:
632 headers = {"Content-Type": "application/json"}
633 headers.update(target.headers)
634 async with session.post(
635 target.url,
636 data=payload,
637 headers=headers,
638 timeout=aiohttp.ClientTimeout(total=5),
639 ) as response:
640 if response.status >= 400:
641 logger.warning(
642 f"Notification target {target.url} returned {response.status}"
643 )
644 except ImportError:
645 logger.debug(f"Would send notification to {target.url}: {payload}")
647 async def call(
648 self,
649 target_url: str,
650 method: str,
651 params: Optional[Dict] = None,
652 timeout_ms: Optional[int] = None,
653 ) -> Any:
654 """
655 Make an RPC call to another service.
657 Args:
658 target_url: URL of the target service
659 method: Method to call
660 params: Parameters for the method
661 timeout_ms: Timeout in milliseconds
663 Returns:
664 Result from the method call
666 Raises:
667 Exception: If the call fails or times out
668 """
669 request_id = str(uuid.uuid4())
670 request = JsonRpcRequest(
671 method=method,
672 params=params,
673 id=request_id,
674 )
676 timeout = (timeout_ms or self.config.request_timeout_ms) / 1000
678 try:
679 import aiohttp
680 async with aiohttp.ClientSession() as session:
681 async with session.post(
682 target_url,
683 json=request.to_dict(),
684 headers={"Content-Type": "application/json"},
685 timeout=aiohttp.ClientTimeout(total=timeout),
686 ) as response:
687 data = await response.json()
689 if "error" in data:
690 error = data["error"]
691 raise Exception(f"RPC error {error.get('code')}: {error.get('message')}")
693 return data.get("result")
694 except ImportError:
695 raise ImportError("aiohttp is required for RPC calls")
697 def get_docker_address(self) -> str:
698 """
699 Get the Docker-friendly address for this gateway.
701 Returns address in format suitable for Docker networking.
702 """
703 if self.config.container_name and self.config.docker_network:
704 return f"http://{self.config.container_name}:{self.config.port}"
705 return f"http://{self.config.host}:{self.config.port}"
707 def get_methods(self) -> List[str]:
708 """Get list of registered method names."""
709 return list(self._methods.keys())
711 def get_stats(self) -> GatewayStats:
712 """Get current gateway statistics."""
713 self._stats.uptime_seconds = time.time() - self._start_time
714 return self._stats
717# Singleton instance
718_gateway: Optional[GatewayProtocol] = None
721def get_gateway(config: Optional[GatewayConfig] = None) -> GatewayProtocol:
722 """Get or create the global gateway instance."""
723 global _gateway
724 if _gateway is None:
725 _gateway = GatewayProtocol(config)
726 return _gateway