Coverage for integrations / service_tools / registry.py: 38.2%
152 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Service Tool Registry — follows MCPToolRegistry pattern (mcp/mcp_integration.py)
3but for any HTTP microservice (not just MCP protocol servers).
5Design:
6- ServiceToolInfo describes a tool's endpoints, auth, and health check
7- ServiceToolRegistry manages discovery, health, and function generation
8- Global singleton: service_tool_registry (mirrors mcp_registry)
9- Uses core.http_pool for connection pooling (same as MCP)
10"""
12import json
13import logging
14import os
15from dataclasses import dataclass, field
16from datetime import datetime
17from typing import Dict, List, Any, Optional, Callable
logger = logging.getLogger(__name__)

# Prefer the project's labeled_tool helper; when core.labeled_tool cannot be
# imported, substitute a minimal builder that returns a plain LangChain Tool.
try:
    from core.labeled_tool import labeled_tool
except ImportError:  # cx_Freeze / degraded test env
    def labeled_tool(name, func, description, *, ui_label):  # type: ignore
        """Build a plain LangChain Tool; this fallback does not use ``ui_label``."""
        from langchain.agents import Tool as LangchainTool
        return LangchainTool(func=func, name=name, description=description)
# Friendly UI status labels keyed by a lowercase fragment of a service tool name.
# `get_langchain_tools()` obtains its Tool() display strings through
# `_derive_service_tool_label()`, which returns the label of the FIRST fragment
# (in insertion order) found in the tool name. Order therefore matters: e.g.
# "luxtts" contains "xtts" and must stay ahead of it.
_SERVICE_TOOL_LABEL_HINTS = {
    fragment: label
    for fragments, label in (
        (("crawl4ai",), "Crawling web content…"),
        (("acestep", "ace_step", "ace-step"), "Generating music…"),
        (("diffrhythm",), "Generating song…"),
        (("rembg",), "Removing background…"),
        (("omniparser",), "Parsing screen UI…"),
        (("wan2gp",), "Generating video…"),
        (("f5",), "Speaking with F5 voice…"),
        (("kokoro",), "Speaking with Kokoro voice…"),
        (("cosyvoice",), "Speaking with CosyVoice…"),
        (("chatterbox",), "Speaking with Chatterbox…"),
        (("indic_parler", "indic-parler"), "Speaking Indic voice…"),
        (("melotts",), "Speaking with MeloTTS…"),
        (("mms_tts", "mms-tts"), "Speaking with MMS TTS…"),
        (("neutts",), "Speaking with NeuTTS…"),
        (("luxtts",), "Speaking with LuxTTS…"),
        (("xtts",), "Speaking with XTTS…"),
        (("omnivoice",), "Speaking with OmniVoice…"),
        (("pocket_tts", "pocket-tts"), "Speaking with PocketTTS…"),
        (("tts_audio_suite",), "Synthesising speech…"),
        (("whisper",), "Transcribing audio…"),
    )
    for fragment in fragments
}
def _derive_service_tool_label(tool_name: str, ep_name: str) -> str:
    """Return a UI status label (at most 60 characters) for a service-tool endpoint.

    The label is taken from the first fragment in ``_SERVICE_TOOL_LABEL_HINTS``
    that occurs in the lower-cased *tool_name*; when none matches, a generic
    "Calling <tool_name> service…" string is used. *ep_name* is currently unused.
    """
    lowered_name = (tool_name or "").lower()
    matched = next(
        (text for fragment, text in _SERVICE_TOOL_LABEL_HINTS.items()
         if fragment in lowered_name),
        None,
    )
    if matched is None:
        matched = f"Calling {tool_name} service…"
    return matched[:60]
@dataclass
class ServiceToolInfo:
    """Metadata for a registered service tool.

    Describes where an HTTP microservice lives (``base_url``), which endpoints
    it exposes, how to authenticate, and the registry's view of its health.
    """

    name: str
    description: str
    base_url: str
    endpoints: Dict[str, Dict[str, Any]]  # endpoint_name -> {path, method, description, params_schema}
    health_endpoint: str = "/health"
    version: str = "1.0.0"
    tags: List[str] = field(default_factory=list)
    api_key: Optional[str] = None
    api_key_header: str = "Authorization"
    timeout: int = 30
    # Set by the registry at registration / health-check time; not part of to_dict().
    is_healthy: bool = False
    registered_at: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the persisted fields as a plain dict, in a fixed key order.

        ``api_key``, ``is_healthy`` and ``registered_at`` are not included.
        ``endpoints`` and ``tags`` are the instance's own objects, not copies.
        """
        persisted_fields = (
            "name",
            "description",
            "base_url",
            "endpoints",
            "health_endpoint",
            "version",
            "tags",
            "api_key_header",
            "timeout",
        )
        return {attr: getattr(self, attr) for attr in persisted_fields}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ServiceToolInfo':
        """Build an instance from a dict such as one produced by to_dict().

        ``name`` and ``base_url`` are required (KeyError if absent); every other
        field falls back to its dataclass default. Runtime-state fields
        (``is_healthy``, ``registered_at``) are never read from *data*.
        """
        kwargs = {
            "name": data["name"],
            "description": data.get("description", ""),
            "base_url": data["base_url"],
        }
        optional_fields = (
            ("endpoints", {}),
            ("health_endpoint", "/health"),
            ("version", "1.0.0"),
            ("tags", []),
            ("api_key", None),
            ("api_key_header", "Authorization"),
            ("timeout", 30),
        )
        kwargs.update((key, data.get(key, fallback)) for key, fallback in optional_fields)
        return cls(**kwargs)
class ServiceToolRegistry:
    """
    Registry for HTTP microservice tools.

    Mirrors MCPToolRegistry (mcp/mcp_integration.py:185-315):
    - add_server → register_tool (with health check)
    - create_tool_function → create_endpoint_function
    - get_all_tool_functions → same signature
    - Global singleton: service_tool_registry
    """

    def __init__(self, config_file: str = "service_tools.json"):
        """Create an empty registry persisted to *config_file* by save_config/load_config."""
        self._tools: Dict[str, "ServiceToolInfo"] = {}
        self._config_file = config_file

    def register_tool(self, tool_info: "ServiceToolInfo") -> bool:
        """Register a tool and record its current health.

        The service is health-checked first, but an unreachable service is still
        registered (marked unhealthy) so a later health_check() can mark it
        healthy. If the name is already registered, the existing entry is kept
        and *tool_info* is ignored. Always returns True.
        """
        if tool_info.name in self._tools:
            logger.info(f"Service tool '{tool_info.name}' already registered, skipping")
            return True

        tool_info.is_healthy = self._health_check(tool_info)
        tool_info.registered_at = datetime.now().isoformat()
        self._tools[tool_info.name] = tool_info

        status = "healthy" if tool_info.is_healthy else "unhealthy (registered anyway)"
        logger.info(f"Registered service tool: {tool_info.name} [{status}]")
        return True

    def unregister_tool(self, name: str) -> bool:
        """Remove the tool called *name*; return True if it was registered."""
        if name in self._tools:
            del self._tools[name]
            logger.info(f"Unregistered service tool: {name}")
            return True
        return False

    def _health_check(self, tool_info: "ServiceToolInfo") -> bool:
        """Return True if GET <base_url><health_endpoint> answers HTTP 200 within 5 s.

        Any exception (connection error, timeout, missing core.http_pool) counts
        as unhealthy and is logged at debug level.
        """
        try:
            from core.http_pool import pooled_get
            headers = {}
            if tool_info.api_key:
                headers[tool_info.api_key_header] = f"Bearer {tool_info.api_key}"

            response = pooled_get(
                f"{tool_info.base_url.rstrip('/')}{tool_info.health_endpoint}",
                headers=headers,
                timeout=5,
            )
            return response.status_code == 200
        except Exception as e:
            logger.debug(f"Health check failed for {tool_info.name}: {e}")
            return False

    def health_check(self, name: str) -> bool:
        """Re-check health for a specific tool; False if the tool is unknown."""
        tool = self._tools.get(name)
        if not tool:
            return False
        tool.is_healthy = self._health_check(tool)
        return tool.is_healthy

    def health_check_all(self) -> Dict[str, bool]:
        """Re-check health for all registered tools; returns name -> healthy."""
        return {name: self.health_check(name) for name in self._tools}

    def create_endpoint_function(self, tool_name: str, endpoint_name: str) -> Optional[Callable]:
        """
        Create a callable for a specific endpoint.

        Mirrors MCPToolRegistry.create_tool_function (mcp_integration.py:262-295):
        returns a function with __name__ and __doc__ set for autogen registration.

        The returned function takes keyword arguments and always returns a JSON
        string; failures are reported as {"success": false, "error": ...} rather
        than raised. If the endpoint defines ``native_handler``, that callable is
        invoked in-process with the JSON-encoded kwargs and no HTTP request is
        made (so ``path`` is optional in that case). Otherwise kwargs are sent as
        query params for GET, and as a JSON body for any other method (which is
        sent as POST).

        Returns None if the tool or endpoint is unknown, or if the endpoint has
        neither ``path`` nor ``native_handler``.
        """
        tool = self._tools.get(tool_name)
        if not tool:
            return None

        endpoint = tool.endpoints.get(endpoint_name)
        if not endpoint:
            return None

        native_handler = endpoint.get("native_handler")
        path = endpoint.get("path")
        if path is None and native_handler is None:
            # Skip instead of raising KeyError so one misconfigured endpoint does
            # not abort get_all_tool_functions()/get_langchain_tools() for all tools.
            logger.warning(
                f"Service tool endpoint '{tool_name}.{endpoint_name}' has neither "
                f"'path' nor 'native_handler'; skipping"
            )
            return None

        method = (endpoint.get("method") or "POST").upper()
        description = endpoint.get("description", f"{tool_name} {endpoint_name}")

        # Snapshot tool settings into locals captured by the closure below.
        timeout = tool.timeout
        base_url = tool.base_url.rstrip("/")
        api_key = tool.api_key
        api_key_header = tool.api_key_header
        url = f"{base_url}{path or ''}"

        def endpoint_executor(**kwargs) -> str:
            """Execute the service tool endpoint."""
            try:
                if native_handler is not None:
                    return native_handler(json.dumps(kwargs))

                from core.http_pool import pooled_get, pooled_post

                headers = {"Content-Type": "application/json"}
                if api_key:
                    headers[api_key_header] = f"Bearer {api_key}"

                if method == "GET":
                    resp = pooled_get(url, params=kwargs, headers=headers, timeout=timeout)
                else:
                    resp = pooled_post(url, json=kwargs, headers=headers, timeout=timeout)

                if resp.status_code == 200:
                    return json.dumps(resp.json())
                return json.dumps({"success": False, "error": f"HTTP {resp.status_code}: {resp.text[:200]}"})
            except Exception as e:
                # Tool-call boundary: report the failure to the agent instead of raising.
                return json.dumps({"success": False, "error": str(e)})

        # Set function metadata (same as MCPToolRegistry.create_tool_function)
        endpoint_executor.__name__ = f"{tool_name}_{endpoint_name}"
        endpoint_executor.__doc__ = description
        return endpoint_executor

    def get_all_tool_functions(self) -> Dict[str, Callable]:
        """
        Get all tools as executable functions.

        Mirrors MCPToolRegistry.get_all_tool_functions (mcp_integration.py:297-311).
        Creates one function per endpoint for each registered tool, keyed by
        "<tool>_<endpoint>". Endpoints that cannot be built are skipped.
        """
        functions = {}
        for tool_name, tool in self._tools.items():
            for endpoint_name in tool.endpoints:
                func = self.create_endpoint_function(tool_name, endpoint_name)
                if func:
                    functions[func.__name__] = func
        return functions

    def get_tool_definitions(self) -> List[Dict[str, Any]]:
        """
        Get tool definitions in autogen-compatible format.

        Mirrors MCPToolRegistry.get_tool_definitions (mcp_integration.py:241-260).
        """
        defs = []
        for tool_name, tool in self._tools.items():
            for ep_name, ep in tool.endpoints.items():
                defs.append({
                    "name": f"{tool_name}_{ep_name}",
                    "description": ep.get("description", f"{tool_name} {ep_name}"),
                    "parameters": ep.get("params_schema", {}),
                    "service_tool": tool_name,
                    "endpoint": ep_name,
                })
        return defs

    @staticmethod
    def _primary_param(params_schema: Any) -> str:
        """Return the parameter name that a single LangChain string input binds to.

        Accepts a flat ``{param_name: spec}`` mapping (first key wins) or a
        JSON-Schema object ``{"type": "object", "properties": {...},
        "required": [...]}`` (first required property, else first property).
        Falls back to ``"query"`` when nothing usable is found.
        """
        if not params_schema:
            return "query"
        if isinstance(params_schema, dict):
            properties = params_schema.get("properties")
            if isinstance(properties, dict) and params_schema.get("type", "object") == "object":
                # JSON-Schema form: top-level keys are keywords ("type",
                # "properties", ...), not parameter names.
                required = params_schema.get("required")
                if isinstance(required, list):
                    for name in required:
                        if isinstance(name, str) and name in properties:
                            return name
                return next(iter(properties), "query")
        return next(iter(params_schema), "query")

    def get_langchain_tools(self) -> list:
        """
        Get healthy tools as LangChain Tool() objects for get_tools().

        Plugs into hart_intelligence get_tools().
        LangChain Tool func receives a single string — we route it to the
        primary parameter of the endpoint's params_schema (see _primary_param).
        Tools not currently marked healthy are skipped.
        """
        tools = []
        for tool_name, tool in self._tools.items():
            if not tool.is_healthy:
                continue
            for ep_name, ep in tool.endpoints.items():
                func = self.create_endpoint_function(tool_name, ep_name)
                if not func:
                    continue
                primary_param = self._primary_param(ep.get("params_schema", {}))
                tools.append(labeled_tool(
                    name=func.__name__,
                    # Default args bind this iteration's func/param (avoids late binding).
                    func=lambda query, _f=func, _p=primary_param: _f(**{_p: query}),
                    description=ep.get("description", f"{tool_name} {ep_name}"),
                    ui_label=_derive_service_tool_label(tool_name, ep_name),
                ))
        return tools

    @staticmethod
    def _json_safe_endpoints(endpoints: Any) -> Dict[str, Any]:
        """Return a copy of *endpoints* with callable values (e.g. ``native_handler``) removed.

        In-process handlers cannot be JSON-encoded; dropping them keeps the rest
        of each endpoint definition persistable. *endpoints* itself is not modified.
        """
        safe: Dict[str, Any] = {}
        for ep_name, ep in (endpoints or {}).items():
            if isinstance(ep, dict):
                safe[ep_name] = {key: value for key, value in ep.items() if not callable(value)}
            else:
                safe[ep_name] = ep
        return safe

    def save_config(self) -> None:
        """Persist registry to JSON config file.

        The JSON text is fully encoded before the file is opened, so an encoding
        error leaves any existing config file untouched. Callable endpoint values
        such as ``native_handler`` are omitted because they cannot be serialized.
        Only what each tool's to_dict() returns is written. Failures are logged,
        not raised.
        """
        records = []
        for tool in self._tools.values():
            record = tool.to_dict()
            if "endpoints" in record:
                record["endpoints"] = self._json_safe_endpoints(record["endpoints"])
            records.append(record)

        try:
            payload = json.dumps({"tools": records}, indent=2)
            with open(self._config_file, "w", encoding="utf-8") as f:
                f.write(payload)
        except (OSError, TypeError, ValueError) as e:
            logger.warning(f"Failed to save service tools config: {e}")
            return
        logger.info(f"Saved {len(self._tools)} tools to {self._config_file}")

    def load_config(self) -> int:
        """Load registry from JSON config file. Returns count loaded.

        Each entry goes through register_tool(), so it is health-checked, and a
        name that is already registered is counted but not replaced. A malformed
        entry is logged and skipped rather than aborting the remaining entries.
        Returns 0 if the file is missing, unreadable, or not a JSON object with
        a "tools" list.
        """
        if not os.path.exists(self._config_file):
            logger.info(f"No service tools config at {self._config_file}")
            return 0

        try:
            with open(self._config_file, "r", encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:  # ValueError covers JSONDecodeError / UnicodeDecodeError
            logger.warning(f"Failed to load service tools config: {e}")
            return 0

        entries = data.get("tools") if isinstance(data, dict) else None
        if entries is None and isinstance(data, dict):
            entries = []
        if not isinstance(entries, list):
            logger.warning(
                f"Failed to load service tools config: expected an object with a "
                f"'tools' list in {self._config_file}"
            )
            return 0

        loaded = 0
        for tool_data in entries:
            try:
                tool_info = ServiceToolInfo.from_dict(tool_data)
            except (KeyError, TypeError, AttributeError) as e:
                logger.warning(f"Skipping malformed service tool entry in {self._config_file}: {e!r}")
                continue
            if self.register_tool(tool_info):
                loaded += 1

        logger.info(f"Loaded {loaded} service tools from {self._config_file}")
        return loaded
# Module-wide registry instance (mirrors mcp_registry in mcp/mcp_integration.py:315).
# Constructing it performs no I/O; nothing is read until load_config() is called.
# The relative config path is resolved against the current working directory
# when save_config()/load_config() open it.
service_tool_registry = ServiceToolRegistry(config_file="service_tools.json")