Coverage for integrations / providers / discovery_agent.py: 68.0%

197 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Agentic Service Discovery — an agent that autonomously expands Nunba's capabilities. 

3 

4This is a meta-agent: it discovers, evaluates, and integrates new AI services 

5so that OTHER agents can use them. Runs during idle via ResourceGovernor. 

6 

7Discovery sources (searched autonomously): 

8 1. MCP server registries — scan for published MCP tool servers 

9 2. OpenAPI specs — parse any service's Swagger/OpenAPI as tools 

10 3. HuggingFace model hub — discover new models for local/API use 

11 4. GitHub trending — find new AI tools and APIs 

12 5. Known provider APIs — check for new models on existing providers 

13 6. Hive network — learn about services other Nunba nodes discovered 

14 

15Pipeline per discovery cycle: 

16 1. SEARCH: agent picks a source, queries for new services 

17 2. EVALUATE: check pricing, capabilities, terms, reliability 

18 3. REGISTER: add to ProviderRegistry if it passes evaluation 

19 4. TEST: run a probe request via gateway 

20 5. SCORE: update EfficiencyMatrix with results 

21 6. SHARE: broadcast discovery to hive (if enabled) 

22 

23Safety: 

24 - Only registers services that respond to a health check 

25 - Never auto-sets API keys (user must configure in Settings) 

26 - Rate-limited: max 5 discoveries per idle cycle 

27 - Constitutional filter: skip services that violate guardrails 

28 

29Integration: 

30 - ResourceGovernor._proactive_check_signals() triggers discovery 

31 - Results stored in provider_registry.json (persisted) 

32 - Discoveries visible in admin /providers page 

33 - Uses LangChain agent with provider tools for autonomous reasoning 

34""" 

35 

36import json 

37import logging 

38import os 

39import re 

40import threading 

41import time 

42import urllib.request 

43import urllib.error 

44from typing import Any, Dict, List, Optional, Tuple 

45 

46logger = logging.getLogger(__name__) 

47 

# ── Constants ─────────────────────────────────────────────────────────

# Hard cap on new registrations per idle cycle (see module docstring, Safety).
MAX_DISCOVERIES_PER_CYCLE = 5
DISCOVERY_COOLDOWN_HOURS = 6  # Don't re-scan same source within 6h
PROBE_TIMEOUT_SECONDS = 10  # urllib timeout for catalog/probe requests

# Known OpenAI-compatible API patterns to auto-detect
# NOTE(review): not referenced anywhere in this module — presumably consumed
# by another module (or a leftover); verify before removing.
_OPENAI_COMPATIBLE_PATTERNS = [
    '/v1/chat/completions',
    '/v1/completions',
    '/v1/models',
    '/v1/embeddings',
]

# Sources the agent checks (rotated each cycle, round-robin with cooldown)
DISCOVERY_SOURCES = [
    'existing_providers',  # Check existing providers for new models
    'huggingface_trending',  # HuggingFace trending models
    'mcp_registry',  # MCP server discovery
    'openrouter_models',  # OpenRouter model catalog (aggregator)
]

69 

70 

71# ═══════════════════════════════════════════════════════════════════════ 

72# Discovery Agent 

73# ═══════════════════════════════════════════════════════════════════════ 

74 

class DiscoveryAgent:
    """Autonomous agent that discovers and integrates new AI services.

    Does NOT use LangChain (avoids heavy imports). Instead, uses a simple
    rule-based pipeline with web requests. The agent "reasons" by:
      1. Picking a discovery source based on rotation + staleness
      2. Fetching data from that source
      3. Evaluating each candidate against criteria
      4. Registering passing candidates

    Thread-safety: ``self._lock`` guards the rotation/bookkeeping state
    (``_last_scan``, ``_source_index``, ``_total_discoveries``).  Network
    scans run outside the lock so a slow source never blocks readers.
    """

    def __init__(self):
        self._lock = threading.Lock()
        # source name → unix timestamp of the last scan (drives cooldown)
        self._last_scan: Dict[str, float] = {}
        # round-robin cursor into DISCOVERY_SOURCES (grows monotonically,
        # used modulo the list length)
        self._source_index = 0
        self._discoveries_this_cycle = 0
        self._total_discoveries = 0

    def run_discovery_cycle(self) -> List[Dict[str, Any]]:
        """Run one discovery cycle. Called by ResourceGovernor during idle.

        Returns list of newly discovered services (may be empty).
        """
        discoveries: List[Dict[str, Any]] = []

        # FIX: the lock was allocated in __init__ but never acquired; guard
        # the shared rotation/counter state so concurrent callers (governor
        # thread + admin endpoints calling get_stats) don't race.  The
        # network scan itself deliberately happens outside the lock.
        with self._lock:
            self._discoveries_this_cycle = 0
            # Pick next source (round-robin, skip if recently scanned)
            source = self._pick_source()

        if not source:
            logger.debug("DiscoveryAgent: all sources recently scanned, skipping")
            return []

        logger.info("DiscoveryAgent: scanning source '%s'", source)

        try:
            if source == 'existing_providers':
                discoveries = self._scan_existing_providers()
            elif source == 'huggingface_trending':
                discoveries = self._scan_huggingface()
            elif source == 'mcp_registry':
                discoveries = self._scan_mcp()
            elif source == 'openrouter_models':
                discoveries = self._scan_openrouter()
        except Exception as e:
            # Best-effort: a failing source must never crash the governor.
            logger.warning("DiscoveryAgent: source '%s' failed: %s", source, e)

        with self._lock:
            # Record the scan even on failure so a broken source still
            # honors its cooldown instead of being retried every cycle.
            self._last_scan[source] = time.time()
            self._total_discoveries += len(discoveries)

        if discoveries:
            logger.info("DiscoveryAgent: found %d new services from '%s'",
                        len(discoveries), source)
            # Share with hive (best-effort)
            self._share_with_hive(discoveries)

        return discoveries

    def _pick_source(self) -> Optional[str]:
        """Pick the next discovery source, skipping recently scanned ones.

        Caller must hold ``self._lock`` (mutates the round-robin cursor).
        Returns None when every source is still inside its cooldown window.
        """
        now = time.time()
        cooldown = DISCOVERY_COOLDOWN_HOURS * 3600

        for _ in range(len(DISCOVERY_SOURCES)):
            source = DISCOVERY_SOURCES[self._source_index % len(DISCOVERY_SOURCES)]
            self._source_index += 1
            last = self._last_scan.get(source, 0)
            if now - last >= cooldown:
                return source
        return None

    # ── Source: Existing Providers (check for new models) ─────────────

    def _scan_existing_providers(self) -> List[Dict]:
        """Check existing providers for newly added models.

        Queries each keyed provider's OpenAI-style ``/models`` endpoint and
        registers every model id not already present in the registry.
        """
        from integrations.providers.registry import get_registry

        discoveries = []
        registry = get_registry()

        for provider in registry.list_api_providers():
            if not provider.has_api_key():
                continue  # never probe endpoints we can't authenticate to
            if self._discoveries_this_cycle >= MAX_DISCOVERIES_PER_CYCLE:
                break

            try:
                # Query /v1/models endpoint to discover new models
                url = f"{provider.base_url.rstrip('/')}/models"
                headers = {'Authorization': f'Bearer {provider.get_api_key()}'}
                req = urllib.request.Request(url, headers=headers)
                with urllib.request.urlopen(req, timeout=PROBE_TIMEOUT_SECONDS) as resp:
                    data = json.loads(resp.read().decode())

                # OpenAI-style responses list models under 'data'; some
                # servers use 'models' instead.
                models_list = data.get('data', data.get('models', []))
                if isinstance(models_list, list):
                    known_ids = set(provider.models.keys())
                    for m in models_list:
                        # FIX: the per-cycle cap was only checked once per
                        # provider, so one provider with many new models
                        # could blow past MAX_DISCOVERIES_PER_CYCLE.
                        if self._discoveries_this_cycle >= MAX_DISCOVERIES_PER_CYCLE:
                            break
                        mid = m.get('id', '') if isinstance(m, dict) else str(m)
                        if mid and mid not in known_ids:
                            # New model found on existing provider
                            discovery = self._register_new_model(
                                provider, mid, m if isinstance(m, dict) else {})
                            if discovery:
                                discoveries.append(discovery)
                                self._discoveries_this_cycle += 1
            except Exception as e:
                # Per-provider failures (downtime, auth, bad JSON) are
                # expected — log quietly and continue with the next one.
                logger.debug("DiscoveryAgent: failed to scan %s models: %s",
                             provider.id, e)

        return discoveries

    def _register_new_model(self, provider, model_id: str,
                            model_info: dict) -> Optional[Dict]:
        """Register a newly discovered model on an existing provider.

        Infers the model type from keywords in the id, attaches a
        ProviderModel to ``provider.models``, persists the registry, and
        returns a discovery record dict.
        """
        from integrations.providers.registry import ProviderModel, PRICE_PER_1M_TOKENS

        # Infer model type from ID — keyword heuristics, first match wins,
        # default is 'llm'.
        model_type = 'llm'
        mid_lower = model_id.lower()
        if any(kw in mid_lower for kw in ['embed', 'bge', 'e5', 'gte']):
            model_type = 'embedding'
        elif any(kw in mid_lower for kw in ['vlm', 'vision', 'vl-', 'minicpm']):
            model_type = 'vlm'
        elif any(kw in mid_lower for kw in ['flux', 'sdxl', 'dall', 'image']):
            model_type = 'image_gen'
        elif any(kw in mid_lower for kw in ['whisper', 'stt']):
            model_type = 'stt'
        elif any(kw in mid_lower for kw in ['tts', 'bark', 'tortoise', 'coqui']):
            model_type = 'tts'

        # Create ProviderModel entry
        context_length = model_info.get('context_length', 0)
        pm = ProviderModel(
            model_id=model_id,
            canonical_id=self._to_canonical_id(model_id),
            model_type=model_type,
            context_length=context_length or 0,  # normalize None → 0
            pricing_unit=PRICE_PER_1M_TOKENS,
            supports_streaming=True,  # assumed for discovered models — TODO confirm via probe
            enabled=True,
        )

        # Add to provider
        provider.models[model_id] = pm

        # Persist immediately so discoveries survive restart
        from integrations.providers.registry import get_registry
        get_registry().save()

        logger.info("DiscoveryAgent: registered new model %s on %s (type=%s)",
                    model_id, provider.id, model_type)

        return {
            'type': 'new_model',
            'provider': provider.id,
            'model_id': model_id,
            'model_type': model_type,
            'timestamp': time.time(),
        }

    # ── Source: HuggingFace Trending ──────────────────────────────────

    def _scan_huggingface(self) -> List[Dict]:
        """Check HuggingFace for trending inference-ready models."""
        discoveries = []
        try:
            # HuggingFace API: trending models with inference endpoints
            url = ('https://huggingface.co/api/models'
                   '?sort=trending&limit=20&pipeline_tag=text-generation')
            req = urllib.request.Request(url, headers={
                'Accept': 'application/json',
            })
            with urllib.request.urlopen(req, timeout=PROBE_TIMEOUT_SECONDS) as resp:
                models = json.loads(resp.read().decode())

            from integrations.providers.registry import get_registry
            registry = get_registry()
            hf = registry.get('huggingface')
            if not hf:
                return []  # no HuggingFace provider configured to attach to

            known = set(hf.models.keys())
            for m in models[:10]:
                mid = m.get('id', '')
                if mid and mid not in known and self._is_inference_ready(m):
                    # FIX: the API may return "config": null — guard against
                    # calling .get on None (previously aborted the scan).
                    cfg = m.get('config') or {}
                    discovery = self._register_new_model(hf, mid, {
                        'context_length': cfg.get('max_position_embeddings', 0),
                    })
                    if discovery:
                        discovery['source'] = 'huggingface_trending'
                        discovery['downloads'] = m.get('downloads', 0)
                        discoveries.append(discovery)
                        self._discoveries_this_cycle += 1
                        if self._discoveries_this_cycle >= MAX_DISCOVERIES_PER_CYCLE:
                            break

        except Exception as e:
            logger.debug("DiscoveryAgent: HuggingFace scan failed: %s", e)

        return discoveries

    @staticmethod
    def _is_inference_ready(model_info: dict) -> bool:
        """Heuristic: does this HuggingFace model look inference-ready?

        The listing payload has no authoritative flag, so models with many
        downloads and recent activity are assumed inference-ready.
        """
        downloads = model_info.get('downloads', 0)
        likes = model_info.get('likes', 0)
        return downloads > 1000 and likes > 10

    # ── Source: MCP Registry ──────────────────────────────────────────

    def _scan_mcp(self) -> List[Dict]:
        """Discover MCP tool servers and register as providers.

        Delegates the actual discovery to the MCP integration; reports a
        single aggregate record rather than one per tool.
        """
        discoveries = []
        try:
            from integrations.mcp.mcp_integration import get_mcp_registry
            mcp_reg = get_mcp_registry()
            count = mcp_reg.discover_all_tools()
            if count > 0:
                discoveries.append({
                    'type': 'mcp_tools',
                    'tools_discovered': count,
                    'timestamp': time.time(),
                })
                logger.info("DiscoveryAgent: discovered %d MCP tools", count)
        except ImportError:
            logger.debug("DiscoveryAgent: MCP integration not available")
        except Exception as e:
            logger.debug("DiscoveryAgent: MCP scan failed: %s", e)
        return discoveries

    # ── Source: OpenRouter Model Catalog ───────────────────────────────

    def _scan_openrouter(self) -> List[Dict]:
        """Scan OpenRouter for new models (aggregator of many providers)."""
        discoveries = []
        try:
            url = 'https://openrouter.ai/api/v1/models'
            req = urllib.request.Request(url, headers={'Accept': 'application/json'})
            with urllib.request.urlopen(req, timeout=PROBE_TIMEOUT_SECONDS) as resp:
                data = json.loads(resp.read().decode())

            models = data.get('data', [])
            from integrations.providers.registry import (
                get_registry, ProviderModel, PRICE_PER_1M_TOKENS,
            )
            registry = get_registry()
            openrouter = registry.get('openrouter')
            if not openrouter:
                return []  # no OpenRouter provider configured

            known = set(openrouter.models.keys())
            for m in models:
                mid = m.get('id', '')
                if not mid or mid in known:
                    continue
                if self._discoveries_this_cycle >= MAX_DISCOVERIES_PER_CYCLE:
                    break

                # OpenRouter reports per-token prices (often as decimal
                # strings); scale to per-1M-token.  FIX: a malformed value
                # previously raised and aborted the rest of the catalog —
                # now the offending model is just priced at 0.
                pricing = m.get('pricing', {})
                try:
                    input_price = float(pricing.get('prompt') or 0) * 1_000_000
                    output_price = float(pricing.get('completion') or 0) * 1_000_000
                except (TypeError, ValueError):
                    input_price = output_price = 0.0

                pm = ProviderModel(
                    model_id=mid,
                    canonical_id=self._to_canonical_id(mid),
                    model_type='llm',
                    context_length=m.get('context_length', 0),
                    input_price=input_price,
                    output_price=output_price,
                    pricing_unit=PRICE_PER_1M_TOKENS,
                    supports_streaming=True,
                )
                openrouter.models[mid] = pm
                discoveries.append({
                    'type': 'new_model',
                    'provider': 'openrouter',
                    'model_id': mid,
                    'source': 'openrouter_catalog',
                    'timestamp': time.time(),
                })
                self._discoveries_this_cycle += 1

        except Exception as e:
            logger.debug("DiscoveryAgent: OpenRouter scan failed: %s", e)

        # Persist once at the end (batched, unlike _register_new_model)
        if discoveries:
            from integrations.providers.registry import get_registry
            get_registry().save()

        return discoveries

    # ── Helpers ───────────────────────────────────────────────────────

    @staticmethod
    def _to_canonical_id(model_id: str) -> str:
        """Convert provider model ID to a canonical slug for cross-provider matching."""
        # "meta-llama/Llama-3.3-70B-Instruct" → "llama-3.3-70b"
        name = model_id.split('/')[-1].lower()
        # Remove common suffixes (note: removed anywhere in the name, not
        # only at the end, so e.g. "-instruct-awq" collapses fully)
        for suffix in ['-instruct', '-turbo', '-chat', '-hf', '-fp8',
                       '-fp16', '-gguf', '-awq', '-gptq', '-preview']:
            name = name.replace(suffix, '')
        # Clean up: anything outside [a-z0-9.-] → '-', then collapse runs
        name = re.sub(r'[^a-z0-9.-]', '-', name)
        name = re.sub(r'-+', '-', name).strip('-')
        return name

    def _share_with_hive(self, discoveries: List[Dict]):
        """Broadcast discoveries to the hive network (best-effort)."""
        try:
            from integrations.channels.hive_signal_bridge import get_signal_bridge
            bridge = get_signal_bridge()
            for d in discoveries[:3]:  # Limit to 3 per broadcast
                bridge.emit_signal('service_discovery', d)
        except Exception:
            pass  # Hive not available, that's fine

    def get_stats(self) -> Dict[str, Any]:
        """Snapshot of discovery activity (shown on the admin /providers page)."""
        with self._lock:
            return {
                'total_discoveries': self._total_discoveries,
                'last_scans': dict(self._last_scan),
                'sources': DISCOVERY_SOURCES,
            }

400 

401 

402# ═══════════════════════════════════════════════════════════════════════ 

403# Singleton 

404# ═══════════════════════════════════════════════════════════════════════ 

405 

# Process-wide singleton instance, created lazily on first request.
_agent: Optional[DiscoveryAgent] = None
_agent_lock = threading.Lock()


def get_discovery_agent() -> DiscoveryAgent:
    """Return the shared DiscoveryAgent, creating it on first use.

    Uses double-checked locking: the common path returns without taking
    the lock; only the first call(s) synchronize on construction.
    """
    global _agent
    if _agent is not None:
        return _agent
    with _agent_lock:
        if _agent is None:
            _agent = DiscoveryAgent()
    return _agent