Coverage for integrations / service_tools / registry.py: 38.2%

152 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Service Tool Registry — follows MCPToolRegistry pattern (mcp/mcp_integration.py) 

3but for any HTTP microservice (not just MCP protocol servers). 

4 

5Design: 

6- ServiceToolInfo describes a tool's endpoints, auth, and health check 

7- ServiceToolRegistry manages discovery, health, and function generation 

8- Global singleton: service_tool_registry (mirrors mcp_registry) 

9- Uses core.http_pool for connection pooling (same as MCP) 

10""" 

11 

12import json 

13import logging 

14import os 

15from dataclasses import dataclass, field 

16from datetime import datetime 

17from typing import Dict, List, Any, Optional, Callable 

18 

logger = logging.getLogger(__name__)

try:
    from core.labeled_tool import labeled_tool
except ImportError:  # cx_Freeze / degraded test env
    def labeled_tool(name, func, description, *, ui_label):  # type: ignore
        """Degraded stand-in for core.labeled_tool.labeled_tool.

        Builds a plain LangChain ``Tool`` from ``name``/``func``/``description``;
        ``ui_label`` is accepted for signature compatibility but discarded.
        """
        from langchain.agents import Tool as _FallbackTool
        tool_kwargs = dict(name=name, func=func, description=description)
        return _FallbackTool(**tool_kwargs)

27 

28 

# UI status strings keyed by a lowercase fragment of a service tool's name.
# `_derive_service_tool_label()` (used by `get_langchain_tools()`) matches these
# fragments as substrings of the tool name. Alternate spellings of the same
# service (underscore / hyphen variants) share one label via dict.fromkeys.
_SERVICE_TOOL_LABEL_HINTS = {
    "crawl4ai": "Crawling web content…",
    **dict.fromkeys(("acestep", "ace_step", "ace-step"), "Generating music…"),
    "diffrhythm": "Generating song…",
    "rembg": "Removing background…",
    "omniparser": "Parsing screen UI…",
    "wan2gp": "Generating video…",
    "f5": "Speaking with F5 voice…",
    "kokoro": "Speaking with Kokoro voice…",
    "cosyvoice": "Speaking with CosyVoice…",
    "chatterbox": "Speaking with Chatterbox…",
    **dict.fromkeys(("indic_parler", "indic-parler"), "Speaking Indic voice…"),
    "melotts": "Speaking with MeloTTS…",
    **dict.fromkeys(("mms_tts", "mms-tts"), "Speaking with MMS TTS…"),
    "neutts": "Speaking with NeuTTS…",
    "luxtts": "Speaking with LuxTTS…",
    "xtts": "Speaking with XTTS…",
    "omnivoice": "Speaking with OmniVoice…",
    **dict.fromkeys(("pocket_tts", "pocket-tts"), "Speaking with PocketTTS…"),
    "tts_audio_suite": "Synthesising speech…",
    "whisper": "Transcribing audio…",
}

58 

59 

def _derive_service_tool_label(
    tool_name: str,
    ep_name: str,
    *,
    hints: Optional[Dict[str, str]] = None,
) -> str:
    """Resolve a ≤60-char human-readable UI label for a service-tool endpoint.

    The lowercased tool name is searched for each hint fragment (substring
    match). When several fragments match, the LONGEST one wins, so a specific
    hint such as ``"luxtts"`` beats a shorter hint it contains (``"xtts"``)
    regardless of the mapping's insertion order.

    Args:
        tool_name: Registered service-tool name (may be empty or None).
        ep_name: Endpoint name. Not used for the label today; kept so callers
            can pass it unchanged.
        hints: Optional fragment -> label mapping; defaults to
            ``_SERVICE_TOOL_LABEL_HINTS``. Fragments are compared as-is against
            the lowercased name, so they should be lowercase.

    Returns:
        The matched label, or ``"Calling <tool_name> service…"`` when nothing
        matches, truncated to 60 characters.
    """
    table = _SERVICE_TOOL_LABEL_HINTS if hints is None else hints
    key = (tool_name or "").lower()
    matches = [hint for hint in table if hint in key]
    if matches:
        # max() returns the first of equally long candidates, so ties keep the
        # original first-match-in-order behaviour.
        return table[max(matches, key=len)][:60]
    return (f"Calling {tool_name} service…")[:60]

67 

68 

@dataclass
class ServiceToolInfo:
    """Metadata for a registered service tool.

    ``endpoints`` maps endpoint name -> spec dict. The registry reads the keys
    ``path``, ``method`` (default "POST"), ``description``, ``params_schema``
    and an optional in-process ``native_handler`` callable from each spec.
    """
    name: str
    description: str
    base_url: str
    endpoints: Dict[str, Dict[str, Any]]  # endpoint_name -> {path, method, description, params_schema}
    health_endpoint: str = "/health"
    version: str = "1.0.0"
    tags: List[str] = field(default_factory=list)
    api_key: Optional[str] = None  # registry sends it as "Bearer <api_key>" in api_key_header
    api_key_header: str = "Authorization"
    timeout: int = 30  # passed as timeout= to HTTP endpoint calls by the registry
    is_healthy: bool = False  # runtime state, updated by registry health checks
    registered_at: Optional[str] = None  # ISO timestamp set by the registry on registration

    @staticmethod
    def _copy_endpoints(endpoints: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Shallow-copy the endpoint map and each spec dict (None -> {})."""
        return {
            ep_name: dict(spec) if isinstance(spec, dict) else spec
            for ep_name, spec in (endpoints or {}).items()
        }

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-oriented snapshot of the configuration fields.

        Runtime-only fields (``is_healthy``, ``registered_at``) and ``api_key``
        are not included. NOTE(review): omitting ``api_key`` presumably keeps
        secrets out of the saved config, but it also means a save/load
        round-trip drops the key -- confirm this is intended.

        The ``tags`` list and endpoint dicts are copies, so mutating the result
        does not modify this object.
        """
        return {
            "name": self.name,
            "description": self.description,
            "base_url": self.base_url,
            "endpoints": self._copy_endpoints(self.endpoints),
            "health_endpoint": self.health_endpoint,
            "version": self.version,
            "tags": list(self.tags or []),
            "api_key_header": self.api_key_header,
            "timeout": self.timeout,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'ServiceToolInfo':
        """Build an instance from a config mapping.

        ``name`` and ``base_url`` are required (KeyError otherwise); every other
        key falls back to the dataclass default. Explicit ``null`` for ``tags``
        or ``endpoints`` is treated as empty. Containers are copied so later
        changes to ``data`` do not leak into the instance.
        """
        return cls(
            name=data["name"],
            description=data.get("description", ""),
            base_url=data["base_url"],
            endpoints=cls._copy_endpoints(data.get("endpoints")),
            health_endpoint=data.get("health_endpoint", "/health"),
            version=data.get("version", "1.0.0"),
            tags=list(data.get("tags") or []),
            api_key=data.get("api_key"),
            api_key_header=data.get("api_key_header", "Authorization"),
            timeout=data.get("timeout", 30),
        )

112 

113 

class ServiceToolRegistry:
    """
    Registry for HTTP microservice tools.

    Mirrors MCPToolRegistry (mcp/mcp_integration.py:185-315):
    - add_server → register_tool (with health check)
    - create_tool_function → create_endpoint_function
    - get_all_tool_functions → same signature
    - Global singleton: service_tool_registry

    An endpoint spec may carry a ``native_handler`` callable; such endpoints
    are executed in-process (the handler receives the call kwargs as a JSON
    string) instead of over HTTP.
    """

    def __init__(self, config_file: str = "service_tools.json"):
        # tool name -> ServiceToolInfo
        self._tools: Dict[str, "ServiceToolInfo"] = {}
        self._config_file = config_file

    # ------------------------------------------------------------------
    # Small stateless helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _join_url(base_url: str, path: str) -> str:
        """Join a base URL and an endpoint path without gluing them together.

        Paths starting with '/' (or '?') are appended verbatim, exactly as the
        previous ``f"{base}{path}"`` did; a bare path like ``"health"`` gets
        the missing '/' instead of being appended to the host/port.
        """
        base = (base_url or "").rstrip("/")
        if not path or path.startswith(("/", "?")):
            return f"{base}{path or ''}"
        return f"{base}/{path}"

    @staticmethod
    def _auth_headers(api_key: Optional[str], api_key_header: str) -> Dict[str, str]:
        """Return ``{api_key_header: "Bearer <key>"}``, or ``{}`` without a key."""
        return {api_key_header: f"Bearer {api_key}"} if api_key else {}

    @staticmethod
    def _is_success(status_code: int) -> bool:
        """Treat any 2xx status as success (201/202/204 are not errors)."""
        return 200 <= status_code < 300

    @staticmethod
    def _primary_param(params_schema: Any) -> str:
        """Pick the parameter that receives LangChain's single string input.

        Supports two shapes of ``params_schema``:
        - a flat ``{param_name: spec}`` mapping: the first key is used;
        - a JSON-Schema object (``{"type": "object", "properties": {...},
          "required": [...]}``, the usual shape for function ``parameters``,
          which ``get_tool_definitions`` forwards this value as): the first
          required property, else the first property, is used -- rather than
          the literal key ``"type"``.
        Falls back to ``"query"`` when nothing usable is declared.
        """
        if not params_schema:
            return "query"
        props = params_schema.get("properties") if isinstance(params_schema, dict) else None
        if isinstance(props, dict):
            for name in params_schema.get("required") or ():
                if name in props:
                    return name
            return next(iter(props), "query")
        return next(iter(params_schema), "query")

    @staticmethod
    def _persistable(tool_info: "ServiceToolInfo") -> Dict[str, Any]:
        """``tool_info.to_dict()`` with callable endpoint values (e.g.
        ``native_handler``) removed, since JSON cannot represent them."""
        data = tool_info.to_dict()
        data["endpoints"] = {
            ep_name: (
                {k: v for k, v in spec.items() if not callable(v)}
                if isinstance(spec, dict) else spec
            )
            for ep_name, spec in (data.get("endpoints") or {}).items()
        }
        return data

    # ------------------------------------------------------------------
    # Registration & health
    # ------------------------------------------------------------------

    def register_tool(self, tool_info: "ServiceToolInfo") -> bool:
        """Register a tool after an initial health check.

        Unhealthy services are still registered (with ``is_healthy=False``) so
        a later ``health_check()`` can mark them usable; until then
        ``get_langchain_tools()`` skips them. Re-registering an existing name
        is a no-op. Always returns True.
        """
        if tool_info.name in self._tools:
            logger.info(f"Service tool '{tool_info.name}' already registered, skipping")
            return True

        tool_info.is_healthy = self._health_check(tool_info)
        tool_info.registered_at = datetime.now().isoformat()
        self._tools[tool_info.name] = tool_info

        status = "healthy" if tool_info.is_healthy else "unhealthy (registered anyway)"
        logger.info(f"Registered service tool: {tool_info.name} [{status}]")
        return True

    def unregister_tool(self, name: str) -> bool:
        """Remove a tool by name. Returns False if it was not registered."""
        if name in self._tools:
            del self._tools[name]
            logger.info(f"Unregistered service tool: {name}")
            return True
        return False

    def _health_check(self, tool_info: "ServiceToolInfo") -> bool:
        """Return True if the tool's health endpoint answers with a 2xx status.

        Any exception (missing http_pool, connection error, timeout) is logged
        at debug level and reported as unhealthy.
        """
        try:
            from core.http_pool import pooled_get

            response = pooled_get(
                self._join_url(tool_info.base_url, tool_info.health_endpoint),
                headers=self._auth_headers(tool_info.api_key, tool_info.api_key_header),
                timeout=5,  # short fixed probe, independent of tool_info.timeout
            )
            return self._is_success(response.status_code)
        except Exception as e:
            logger.debug(f"Health check failed for {tool_info.name}: {e}")
            return False

    def health_check(self, name: str) -> bool:
        """Re-check health for a specific tool (False if unknown)."""
        tool = self._tools.get(name)
        if not tool:
            return False
        tool.is_healthy = self._health_check(tool)
        return tool.is_healthy

    def health_check_all(self) -> Dict[str, bool]:
        """Re-check health for all registered tools; returns name -> healthy."""
        return {name: self.health_check(name) for name in self._tools}

    # ------------------------------------------------------------------
    # Function / tool generation
    # ------------------------------------------------------------------

    def create_endpoint_function(self, tool_name: str, endpoint_name: str) -> Optional[Callable]:
        """
        Create a callable for a specific endpoint.

        Mirrors MCPToolRegistry.create_tool_function (mcp_integration.py:262-295):
        returns a function with __name__ and __doc__ set for autogen registration.

        The returned function takes keyword arguments and always returns a
        JSON string: the service's JSON response on a 2xx status, otherwise
        ``{"success": false, "error": ...}``. Native-handler endpoints return
        whatever the handler returns. Returns None for an unknown tool or
        endpoint.
        """
        tool = self._tools.get(tool_name)
        if not tool:
            return None

        endpoint = tool.endpoints.get(endpoint_name)
        if not endpoint:
            return None

        # Native-handler endpoints never hit HTTP, so "path" is optional.
        path = endpoint.get("path", "")
        method = str(endpoint.get("method", "POST")).upper()
        description = endpoint.get("description", f"{tool_name} {endpoint_name}")
        native_handler = endpoint.get("native_handler")

        # Snapshot connection settings now; the closure must not depend on
        # later mutation of ``tool``.
        url = self._join_url(tool.base_url, path)
        timeout = tool.timeout
        auth_headers = self._auth_headers(tool.api_key, tool.api_key_header)
        is_success = self._is_success

        def endpoint_executor(**kwargs) -> str:
            """Execute the service tool endpoint."""
            try:
                if native_handler is not None:
                    return native_handler(json.dumps(kwargs))

                from core.http_pool import pooled_get, pooled_post

                headers = {"Content-Type": "application/json", **auth_headers}
                if method == "GET":
                    resp = pooled_get(url, params=kwargs, headers=headers, timeout=timeout)
                else:
                    resp = pooled_post(url, json=kwargs, headers=headers, timeout=timeout)

                if is_success(resp.status_code):
                    return json.dumps(resp.json())
                return json.dumps({"success": False, "error": f"HTTP {resp.status_code}: {resp.text[:200]}"})
            except Exception as e:
                return json.dumps({"success": False, "error": str(e)})

        # Set function metadata (same as MCPToolRegistry.create_tool_function)
        endpoint_executor.__name__ = f"{tool_name}_{endpoint_name}"
        endpoint_executor.__doc__ = description

        return endpoint_executor

    def get_all_tool_functions(self) -> Dict[str, Callable]:
        """
        Get all tools as executable functions.

        Mirrors MCPToolRegistry.get_all_tool_functions (mcp_integration.py:297-311).
        Creates one function per endpoint for each registered tool, keyed by
        ``"<tool>_<endpoint>"``.
        """
        functions = {}
        for tool_name, tool in self._tools.items():
            for endpoint_name in tool.endpoints:
                func = self.create_endpoint_function(tool_name, endpoint_name)
                if func:
                    functions[func.__name__] = func
        return functions

    def get_tool_definitions(self) -> List[Dict[str, Any]]:
        """
        Get tool definitions in autogen-compatible format.

        Mirrors MCPToolRegistry.get_tool_definitions (mcp_integration.py:241-260).
        ``parameters`` is the endpoint's ``params_schema`` passed through as-is.
        """
        defs = []
        for tool_name, tool in self._tools.items():
            for ep_name, ep in tool.endpoints.items():
                defs.append({
                    "name": f"{tool_name}_{ep_name}",
                    "description": ep.get("description", f"{tool_name} {ep_name}"),
                    "parameters": ep.get("params_schema", {}),
                    "service_tool": tool_name,
                    "endpoint": ep_name,
                })
        return defs

    def get_langchain_tools(self) -> list:
        """
        Get healthy tools as LangChain Tool() objects for get_tools().

        Plugs into hart_intelligence get_tools(). A LangChain Tool func
        receives a single string, so each tool forwards that string as the
        endpoint's primary parameter (see ``_primary_param``). Tools whose last
        health check failed are skipped.
        """
        tools = []
        for tool_name, tool in self._tools.items():
            if not tool.is_healthy:
                continue
            for ep_name, ep in tool.endpoints.items():
                func = self.create_endpoint_function(tool_name, ep_name)
                if not func:
                    continue
                primary_param = self._primary_param(ep.get("params_schema", {}))
                tools.append(labeled_tool(
                    name=func.__name__,
                    # Defaults bind the current loop values (avoids late binding).
                    func=lambda query, _f=func, _p=primary_param: _f(**{_p: query}),
                    description=ep.get("description", f"{tool_name} {ep_name}"),
                    ui_label=_derive_service_tool_label(tool_name, ep_name),
                ))
        return tools

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def save_config(self) -> None:
        """Persist registry to JSON config file.

        The JSON text is produced completely before anything is written, then
        written to ``<config>.tmp`` and moved over the config with
        ``os.replace``; a serialization or write error therefore leaves the
        previous config intact instead of truncated. Callable endpoint values
        (e.g. ``native_handler``) are omitted because JSON cannot hold them.
        Failures are logged as warnings, not raised.
        """
        tmp_path = f"{self._config_file}.tmp"
        try:
            data = {"tools": [self._persistable(t) for t in self._tools.values()]}
            payload = json.dumps(data, indent=2)
            with open(tmp_path, "w", encoding="utf-8") as f:
                f.write(payload)
            os.replace(tmp_path, self._config_file)
            logger.info(f"Saved {len(self._tools)} tools to {self._config_file}")
        except Exception as e:
            logger.warning(f"Failed to save service tools config: {e}")
            # Best-effort removal of a partially written temp file.
            try:
                os.remove(tmp_path)
            except OSError:
                pass

    def load_config(self) -> int:
        """Load registry from JSON config file. Returns count loaded.

        Each entry under ``"tools"`` is handled independently: a malformed
        entry is logged and skipped rather than aborting the whole load.
        """
        if not os.path.exists(self._config_file):
            logger.info(f"No service tools config at {self._config_file}")
            return 0

        try:
            with open(self._config_file, "r", encoding="utf-8") as f:
                data = json.load(f)
            entries = data.get("tools", [])
        except Exception as e:
            logger.warning(f"Failed to load service tools config: {e}")
            return 0

        loaded = 0
        for tool_data in entries or []:
            try:
                registered = self.register_tool(ServiceToolInfo.from_dict(tool_data))
            except Exception as e:
                logger.warning(f"Skipping invalid service tool entry in {self._config_file}: {e}")
                continue
            if registered:
                loaded += 1

        logger.info(f"Loaded {loaded} service tools from {self._config_file}")
        return loaded

333 

334 

# Process-wide shared registry, mirroring ``mcp_registry`` in
# mcp/mcp_integration.py:315. Construction only records the config path
# ("service_tools.json" by default); nothing is read until load_config().
service_tool_registry: ServiceToolRegistry = ServiceToolRegistry()