Coverage for integrations / providers / discovery_agent.py: 68.0%
197 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Agentic Service Discovery — an agent that autonomously expands Nunba's capabilities.

This is a meta-agent: it discovers, evaluates, and integrates new AI services
so that OTHER agents can use them. Runs during idle via ResourceGovernor.

Discovery sources (searched autonomously):
  1. MCP server registries — scan for published MCP tool servers
  2. OpenAPI specs — parse any service's Swagger/OpenAPI as tools
  3. HuggingFace model hub — discover new models for local/API use
  4. GitHub trending — find new AI tools and APIs
  5. Known provider APIs — check for new models on existing providers
  6. Hive network — learn about services other Nunba nodes discovered

Pipeline per discovery cycle:
  1. SEARCH: agent picks a source, queries for new services
  2. EVALUATE: check pricing, capabilities, terms, reliability
  3. REGISTER: add to ProviderRegistry if it passes evaluation
  4. TEST: run a probe request via gateway
  5. SCORE: update EfficiencyMatrix with results
  6. SHARE: broadcast discovery to hive (if enabled)

Safety:
  - Only registers services that respond to a health check
  - Never auto-sets API keys (user must configure in Settings)
  - Rate-limited: max 5 discoveries per idle cycle
  - Constitutional filter: skip services that violate guardrails

Integration:
  - ResourceGovernor._proactive_check_signals() triggers discovery
  - Results stored in provider_registry.json (persisted)
  - Discoveries visible in admin /providers page
  - Uses a rule-based pipeline over plain HTTP requests rather than a
    LangChain agent (avoids heavy imports)
"""
36import json
37import logging
38import os
39import re
40import threading
41import time
42import urllib.request
43import urllib.error
44from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)

# ── Constants ─────────────────────────────────────────────────────────

# Upper bound on registrations made within a single discovery cycle.
MAX_DISCOVERIES_PER_CYCLE = 5
# Minimum gap, in hours, before the same source is scanned again.
DISCOVERY_COOLDOWN_HOURS = 6
# Timeout, in seconds, applied to each outbound HTTP request.
PROBE_TIMEOUT_SECONDS = 10

# URL path fragments that identify an OpenAI-compatible API.
_OPENAI_COMPATIBLE_PATTERNS = [
    '/v1/chat/completions', '/v1/completions', '/v1/models', '/v1/embeddings',
]

# Discovery sources, visited in this order (one source per cycle):
#   existing_providers   — new models on already-registered providers
#   huggingface_trending — trending models on the HuggingFace hub
#   mcp_registry         — MCP tool-server discovery
#   openrouter_models    — OpenRouter model catalog (aggregator)
DISCOVERY_SOURCES = [
    'existing_providers',
    'huggingface_trending',
    'mcp_registry',
    'openrouter_models',
]
71# ═══════════════════════════════════════════════════════════════════════
72# Discovery Agent
73# ═══════════════════════════════════════════════════════════════════════
class DiscoveryAgent:
    """Autonomous agent that discovers and integrates new AI services.

    Does NOT use LangChain (avoids heavy imports). Instead, uses a simple
    rule-based pipeline with web requests. The agent "reasons" by:
      1. Picking a discovery source based on rotation + staleness
      2. Fetching data from that source
      3. Evaluating each candidate against criteria
      4. Registering passing candidates
    """

    # Ordered (model_type, keywords) rules used to guess a model's type from
    # its ID. The first rule with a keyword contained in the lower-cased ID
    # wins; an ID matching no rule is treated as an 'llm'.
    _MODEL_TYPE_KEYWORDS = (
        ('embedding', ('embed', 'bge', 'e5', 'gte')),
        ('vlm', ('vlm', 'vision', 'vl-', 'minicpm')),
        ('image_gen', ('flux', 'sdxl', 'dall', 'image')),
        ('stt', ('whisper', 'stt')),
        ('tts', ('tts', 'bark', 'tortoise', 'coqui')),
    )

    # Variant tags dropped when building a canonical ID. The negative
    # lookahead keeps a tag that is only the prefix of a longer word
    # ("-chatqa" must not lose its "-chat").
    _VARIANT_TAG_RE = re.compile(
        r'-(?:instruct|turbo|chat|hf|fp8|fp16|gguf|awq|gptq|preview)'
        r'(?![a-z0-9])')

    def __init__(self):
        # NOTE: no method of this class acquires this lock; the state below
        # is mutated without synchronisation.
        self._lock = threading.Lock()
        self._last_scan: Dict[str, float] = {}  # source → timestamp
        self._source_index = 0
        self._discoveries_this_cycle = 0
        self._total_discoveries = 0

    def run_discovery_cycle(self) -> List[Dict[str, Any]]:
        """Run one discovery cycle. Called by ResourceGovernor during idle.

        Scans at most one source per call. A failing scanner is logged and
        treated as "nothing found"; the source is still stamped as scanned so
        it is not retried before its cooldown expires.

        Returns list of newly discovered services (may be empty).
        """
        self._discoveries_this_cycle = 0
        discoveries: List[Dict[str, Any]] = []

        # Pick next source (round-robin, skip if recently scanned)
        source = self._pick_source()
        if not source:
            logger.debug("DiscoveryAgent: all sources recently scanned, skipping")
            return []

        logger.info("DiscoveryAgent: scanning source '%s'", source)

        try:
            scanner = {
                'existing_providers': self._scan_existing_providers,
                'huggingface_trending': self._scan_huggingface,
                'mcp_registry': self._scan_mcp,
                'openrouter_models': self._scan_openrouter,
            }.get(source)
            if scanner is not None:
                # `or []` keeps the bookkeeping below safe if a scanner
                # returns None.
                discoveries = scanner() or []
        except Exception as e:
            logger.warning("DiscoveryAgent: source '%s' failed: %s", source, e)

        self._last_scan[source] = time.time()
        self._total_discoveries += len(discoveries)

        if discoveries:
            logger.info("DiscoveryAgent: found %d new services from '%s'",
                        len(discoveries), source)
            # Share with hive (best-effort)
            self._share_with_hive(discoveries)

        return discoveries

    def _pick_source(self) -> Optional[str]:
        """Pick the next discovery source, skipping recently scanned ones.

        Advances the round-robin index once per source examined and returns
        the first source whose last scan is at least the cooldown old, or
        None when every source is still cooling down.
        """
        now = time.time()
        cooldown = DISCOVERY_COOLDOWN_HOURS * 3600

        for _ in range(len(DISCOVERY_SOURCES)):
            source = DISCOVERY_SOURCES[self._source_index % len(DISCOVERY_SOURCES)]
            self._source_index += 1
            last = self._last_scan.get(source, 0)
            if now - last >= cooldown:
                return source
        return None

    # ── Source: Existing Providers (check for new models) ─────────────

    def _scan_existing_providers(self) -> List[Dict]:
        """Check existing providers for newly added models.

        Queries `<base_url>/models` on every API provider that has a key and
        registers model IDs the provider does not list yet. Registration
        stops as soon as the per-cycle cap is reached, even in the middle of
        one provider's model list.
        """
        from integrations.providers.registry import get_registry

        discoveries = []
        registry = get_registry()

        for provider in registry.list_api_providers():
            if not provider.has_api_key():
                continue
            if self._discoveries_this_cycle >= MAX_DISCOVERIES_PER_CYCLE:
                break

            try:
                # Query /v1/models endpoint to discover new models
                url = f"{provider.base_url.rstrip('/')}/models"
                headers = {'Authorization': f'Bearer {provider.get_api_key()}'}
                req = urllib.request.Request(url, headers=headers)
                with urllib.request.urlopen(req, timeout=PROBE_TIMEOUT_SECONDS) as resp:
                    data = json.loads(resp.read().decode())

                models_list = data.get('data', data.get('models', []))
                if not isinstance(models_list, list):
                    continue

                known_ids = set(provider.models.keys())
                for m in models_list:
                    # Enforce the cap per model, not only per provider.
                    if self._discoveries_this_cycle >= MAX_DISCOVERIES_PER_CYCLE:
                        break
                    mid = m.get('id', '') if isinstance(m, dict) else str(m)
                    if not mid or mid in known_ids:
                        continue
                    # New model found on existing provider
                    discovery = self._register_new_model(
                        provider, mid, m if isinstance(m, dict) else {})
                    if discovery:
                        # Remember it so a duplicate entry in the same
                        # response is not registered twice.
                        known_ids.add(mid)
                        discoveries.append(discovery)
                        self._discoveries_this_cycle += 1
            except Exception as e:
                logger.debug("DiscoveryAgent: failed to scan %s models: %s",
                             provider.id, e)

        return discoveries

    def _register_new_model(self, provider, model_id: str,
                            model_info: dict) -> Optional[Dict]:
        """Register a newly discovered model on an existing provider.

        Guesses the model type from keywords in the ID, adds a ProviderModel
        to `provider.models`, saves the registry and returns a discovery
        record describing what was added.
        """
        from integrations.providers.registry import (
            ProviderModel, PRICE_PER_1M_TOKENS, get_registry,
        )

        # Infer model type from ID (first matching rule wins, default 'llm')
        mid_lower = model_id.lower()
        model_type = next(
            (mtype for mtype, keywords in DiscoveryAgent._MODEL_TYPE_KEYWORDS
             if any(kw in mid_lower for kw in keywords)),
            'llm')

        # Create ProviderModel entry; a missing or null length becomes 0
        pm = ProviderModel(
            model_id=model_id,
            canonical_id=self._to_canonical_id(model_id),
            model_type=model_type,
            context_length=model_info.get('context_length') or 0,
            pricing_unit=PRICE_PER_1M_TOKENS,
            supports_streaming=True,
            enabled=True,
        )

        # Add to provider
        provider.models[model_id] = pm

        # Persist immediately so discoveries survive restart
        get_registry().save()

        logger.info("DiscoveryAgent: registered new model %s on %s (type=%s)",
                    model_id, provider.id, model_type)

        return {
            'type': 'new_model',
            'provider': provider.id,
            'model_id': model_id,
            'model_type': model_type,
            'timestamp': time.time(),
        }

    # ── Source: HuggingFace Trending ──────────────────────────────────

    def _scan_huggingface(self) -> List[Dict]:
        """Check HuggingFace for trending text-generation models.

        Considers the first 10 entries of the trending list and registers
        the unknown ones that pass `_is_inference_ready` on the
        'huggingface' provider. Returns [] when that provider is not
        registered; any failure is logged and ends the scan.
        """
        discoveries = []
        try:
            # HuggingFace API: trending text-generation models
            url = ('https://huggingface.co/api/models'
                   '?sort=trending&limit=20&pipeline_tag=text-generation')
            req = urllib.request.Request(url, headers={
                'Accept': 'application/json',
            })
            with urllib.request.urlopen(req, timeout=PROBE_TIMEOUT_SECONDS) as resp:
                models = json.loads(resp.read().decode())

            from integrations.providers.registry import get_registry
            registry = get_registry()
            hf = registry.get('huggingface')
            if not hf:
                return []

            known = set(hf.models.keys())
            for m in models[:10]:
                if self._discoveries_this_cycle >= MAX_DISCOVERIES_PER_CYCLE:
                    break
                if not isinstance(m, dict):
                    continue
                mid = m.get('id', '')
                if not mid or mid in known or not self._is_inference_ready(m):
                    continue
                # `config` may be present but null; treat that as "no config"
                # instead of letting it abort the rest of the scan.
                config = m.get('config') or {}
                discovery = self._register_new_model(hf, mid, {
                    'context_length': config.get('max_position_embeddings', 0),
                })
                if discovery:
                    discovery['source'] = 'huggingface_trending'
                    discovery['downloads'] = m.get('downloads', 0)
                    discoveries.append(discovery)
                    self._discoveries_this_cycle += 1

        except Exception as e:
            logger.debug("DiscoveryAgent: HuggingFace scan failed: %s", e)

        return discoveries

    @staticmethod
    def _is_inference_ready(model_info: dict) -> bool:
        """Heuristically decide whether a HuggingFace model is worth registering.

        This is a popularity threshold, not an endpoint check: the model
        needs more than 1000 downloads and more than 10 likes. Missing or
        null counters count as 0.
        """
        downloads = model_info.get('downloads') or 0
        likes = model_info.get('likes') or 0
        return downloads > 1000 and likes > 10

    # ── Source: MCP Registry ──────────────────────────────────────────

    def _scan_mcp(self) -> List[Dict]:
        """Discover MCP tool servers and report how many tools were found.

        Returns a single 'mcp_tools' record when the MCP registry reports at
        least one tool, otherwise []. A missing MCP integration or any other
        failure is logged at debug level.
        """
        discoveries = []
        try:
            from integrations.mcp.mcp_integration import get_mcp_registry
            mcp_reg = get_mcp_registry()
            count = mcp_reg.discover_all_tools()
            if count > 0:
                discoveries.append({
                    'type': 'mcp_tools',
                    'tools_discovered': count,
                    'timestamp': time.time(),
                })
                logger.info("DiscoveryAgent: discovered %d MCP tools", count)
        except ImportError:
            logger.debug("DiscoveryAgent: MCP integration not available")
        except Exception as e:
            logger.debug("DiscoveryAgent: MCP scan failed: %s", e)
        return discoveries

    # ── Source: OpenRouter Model Catalog ───────────────────────────────

    def _scan_openrouter(self) -> List[Dict]:
        """Scan OpenRouter for new models (aggregator of many providers).

        Adds every unknown catalog model to the 'openrouter' provider as an
        'llm', up to the per-cycle cap, and saves the registry once at the
        end if anything was added. Malformed entries are skipped instead of
        ending the scan.
        """
        discoveries = []
        try:
            url = 'https://openrouter.ai/api/v1/models'
            req = urllib.request.Request(url, headers={'Accept': 'application/json'})
            with urllib.request.urlopen(req, timeout=PROBE_TIMEOUT_SECONDS) as resp:
                data = json.loads(resp.read().decode())

            models = data.get('data', [])
            from integrations.providers.registry import (
                get_registry, ProviderModel, PRICE_PER_1M_TOKENS,
            )
            registry = get_registry()
            openrouter = registry.get('openrouter')
            if not openrouter:
                return []

            known = set(openrouter.models.keys())
            for m in models:
                if not isinstance(m, dict):
                    continue
                mid = m.get('id', '')
                if not mid or mid in known:
                    continue
                if self._discoveries_this_cycle >= MAX_DISCOVERIES_PER_CYCLE:
                    break

                # `pricing` may be absent or null.
                pricing = m.get('pricing') or {}

                pm = ProviderModel(
                    model_id=mid,
                    canonical_id=self._to_canonical_id(mid),
                    model_type='llm',
                    context_length=m.get('context_length') or 0,
                    input_price=self._price_per_million(pricing.get('prompt')),
                    output_price=self._price_per_million(pricing.get('completion')),
                    pricing_unit=PRICE_PER_1M_TOKENS,
                    supports_streaming=True,
                )
                openrouter.models[mid] = pm
                known.add(mid)
                discoveries.append({
                    'type': 'new_model',
                    'provider': 'openrouter',
                    'model_id': mid,
                    'source': 'openrouter_catalog',
                    'timestamp': time.time(),
                })
                self._discoveries_this_cycle += 1

        except Exception as e:
            logger.debug("DiscoveryAgent: OpenRouter scan failed: %s", e)

        if discoveries:
            from integrations.providers.registry import get_registry
            get_registry().save()

        return discoveries

    # ── Helpers ────────────────────────────────────────────────────────

    @staticmethod
    def _price_per_million(raw: Any) -> float:
        """Convert a per-token price to a price per 1M tokens.

        Returns 0 when the value is missing, unparsable, negative or not
        finite, so one bad catalog entry cannot abort a scan or register a
        nonsensical price.
        """
        if not raw:
            return 0
        try:
            per_token = float(raw)
        except (TypeError, ValueError):
            return 0
        # NOTE(review): OpenRouter appears to use "-1" for variably priced
        # router models — confirm. Negative/NaN/inf are treated as unknown.
        if not 0 <= per_token < float('inf'):
            return 0
        return per_token * 1_000_000

    @staticmethod
    def _to_canonical_id(model_id: str) -> str:
        """Convert provider model ID to a canonical slug for cross-provider matching.

        Keeps the part after the last '/', lower-cases it, drops variant
        tags such as '-instruct' or '-fp8' (only when they are not the
        start of a longer word), then replaces every character outside
        [a-z0-9.-] with '-' and collapses repeated dashes.

        "meta-llama/Llama-3.3-70B-Instruct" → "llama-3.3-70b"
        "nvidia/Llama3-ChatQA-1.5-8B"       → "llama3-chatqa-1.5-8b"
        """
        name = model_id.split('/')[-1].lower()
        # Remove common variant tags
        name = DiscoveryAgent._VARIANT_TAG_RE.sub('', name)
        # Clean up
        name = re.sub(r'[^a-z0-9.-]', '-', name)
        name = re.sub(r'-+', '-', name).strip('-')
        return name

    def _share_with_hive(self, discoveries: List[Dict]):
        """Broadcast discoveries to the hive network (best-effort).

        Emits at most the first 3 discoveries. Failures never propagate;
        they are logged at debug level so they remain diagnosable.
        """
        try:
            from integrations.channels.hive_signal_bridge import get_signal_bridge
            bridge = get_signal_bridge()
            for d in discoveries[:3]:  # Limit to 3 per broadcast
                bridge.emit_signal('service_discovery', d)
        except Exception as e:
            # Hive not available, that's fine
            logger.debug("DiscoveryAgent: hive broadcast skipped: %s", e)

    def get_stats(self) -> Dict[str, Any]:
        """Return a snapshot of discovery counters for reporting.

        Both `last_scans` and `sources` are copies, so callers cannot mutate
        the agent's state or the module-level source list through them.
        """
        return {
            'total_discoveries': self._total_discoveries,
            'last_scans': dict(self._last_scan),
            'sources': list(DISCOVERY_SOURCES),
        }
402# ═══════════════════════════════════════════════════════════════════════
403# Singleton
404# ═══════════════════════════════════════════════════════════════════════
_agent: Optional[DiscoveryAgent] = None
_agent_lock = threading.Lock()


def get_discovery_agent() -> DiscoveryAgent:
    """Return the process-wide DiscoveryAgent, creating it on first use."""
    global _agent
    # Fast path: the instance already exists, no lock needed.
    if _agent is not None:
        return _agent
    with _agent_lock:
        # Re-check under the lock in case another caller created the
        # instance while this one was waiting.
        if _agent is None:
            _agent = DiscoveryAgent()
        return _agent