Coverage for integrations/mcp/mcp_server.py: 0.0% (232 statements)

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HARTOS MCP Server — stdio-based Model Context Protocol server 

3 

4Exposes HARTOS agent ecosystem tools to Claude Code for orchestration. 

5Run: python -m integrations.mcp.mcp_server 

6 

7Tools: 

8 list_agents, list_goals, create_goal, dispatch_goal, agent_status, 

9 remember, recall, list_recipes, system_health, social_query 

10""" 

11import os 

12import sys 

13import json 

14import glob as _glob 

15import logging 

16from pathlib import Path 

17from typing import Optional 

18from core.port_registry import get_port 

19from core.http_pool import pooled_get, pooled_post 

20 

21from mcp.server.fastmcp import FastMCP 

22 

23logger = logging.getLogger('hartos_mcp') 

24logging.basicConfig(level=logging.INFO, format='%(name)s %(levelname)s %(message)s') 

25 

26mcp = FastMCP("hartos", instructions=( 

27 "HARTOS agent ecosystem tools. Use these to orchestrate autonomous agents, " 

28 "manage goals, query memory, and monitor system health." 

29)) 

30 
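# Typical client registration (illustrative; exact CLI flags vary by
# Claude Code version):
#   claude mcp add hartos -- python -m integrations.mcp.mcp_server
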

# ─── Lazy imports (deferred to avoid import-time side effects) ───

_registry = None
_memory_graph = None


def _get_registry():
    global _registry
    if _registry is None:
        from integrations.expert_agents.registry import ExpertAgentRegistry
        _registry = ExpertAgentRegistry()
    return _registry


def _get_db():
    from integrations.social.models import get_db
    return get_db()

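# The memory graph below is cached as a process-wide singleton, so user_id
# is only honored on the first call. Assumed intentional: this server always
# acts as the 'system' identity.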

def _get_memory_graph(user_id: str = 'system'):
    global _memory_graph
    if _memory_graph is None:
        from integrations.channels.memory.memory_graph import MemoryGraph
        try:
            from core.platform_paths import get_memory_graph_dir
            db_path = get_memory_graph_dir()
        except ImportError:
            db_path = os.path.join(
                os.path.expanduser('~'), 'Documents', 'Nunba', 'data', 'memory_graph'
            )
        _memory_graph = MemoryGraph(db_path=db_path, user_id=user_id)
    return _memory_graph


# ─── Tools ───


@mcp.tool()
def list_agents(category: Optional[str] = None, query: Optional[str] = None) -> str:
    """List available expert agents. Filter by category or search by query.

    Categories: software_dev, data_analytics, creative, business, education,
    health, security, devops, research, robotics
    """

    reg = _get_registry()

    if query:
        agents = reg.search_agents(query)
    elif category:
        from integrations.expert_agents.registry import AgentCategory
        cat_map = {name.lower(): member for name, member in AgentCategory.__members__.items()}
        cat = cat_map.get(category.lower())
        if not cat:
            return json.dumps({"error": f"Unknown category: {category}. Valid: {list(cat_map.keys())}"})
        agents = reg.get_agents_by_category(cat)
    else:
        agents = list(reg.agents.values())

    result = []
    for a in agents:
        result.append({
            "agent_id": a.agent_id,
            "name": a.name,
            "category": a.category.name if hasattr(a.category, 'name') else str(a.category),
            "description": a.description,
            "model_type": a.model_type,
        })

    # Also include dynamically discovered agents (trained recipes)
    prompts_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'prompts')
    dynamic = []
    if os.path.isdir(prompts_dir):
        for f in _glob.glob(os.path.join(prompts_dir, '*.json')):
            try:
                with open(f) as fh:
                    data = json.load(fh)
                dynamic.append({
                    "agent_id": data.get("prompt_id", Path(f).stem),
                    "name": data.get("agent_name", Path(f).stem),
                    "category": "dynamic_recipe",
                    "description": data.get("description", "Trained agent recipe"),
                    "model_type": "llm",
                })
            except Exception:
                pass

    return json.dumps({
        "expert_agents": len(result),
        "dynamic_agents": len(dynamic),
        "agents": result[:50],  # cap at 50 to avoid token overflow
        "dynamic": dynamic[:20],
    }, indent=2)



@mcp.tool()
def list_goals(
    goal_type: Optional[str] = None,
    status: Optional[str] = None
) -> str:
    """List agent goals. Filter by type (marketing, coding, ip_protection, etc.) or status (active, pending, completed)."""

    try:
        from integrations.agent_engine.goal_manager import GoalManager
        db = _get_db()
        try:
            goals = GoalManager.list_goals(db, goal_type=goal_type, status=status)
            return json.dumps({"count": len(goals), "goals": goals}, indent=2, default=str)
        finally:
            db.close()
    except Exception as e:
        return json.dumps({"error": str(e)})



@mcp.tool()
def create_goal(
    goal_type: str,
    title: str,
    description: str = '',
    spark_budget: int = 200
) -> str:
    """Create a new goal for agents to pursue.

    goal_type: marketing, coding, ip_protection, revenue, finance, self_heal,
    federation, upgrade, thought_experiment, news, provision, content_gen
    """

    try:
        from integrations.agent_engine.goal_manager import GoalManager
        db = _get_db()
        try:
            result = GoalManager.create_goal(
                db,
                goal_type=goal_type,
                title=title,
                description=description,
                spark_budget=spark_budget,
            )
            db.commit()
            return json.dumps(result, indent=2, default=str)
        finally:
            db.close()
    except Exception as e:
        return json.dumps({"error": str(e)})



@mcp.tool()
def dispatch_goal(goal_id: str, goal_type: str = 'marketing') -> str:
    """Dispatch a goal to an idle agent for execution. The daemon does this automatically every 30s, but this forces immediate dispatch."""

    try:
        from integrations.agent_engine.goal_manager import GoalManager
        db = _get_db()
        try:
            goal_result = GoalManager.get_goal(db, goal_id)
            if not goal_result.get('success'):
                return json.dumps({"error": f"Goal {goal_id} not found"})

            goal = goal_result['goal']
            prompt = goal.get('description', goal.get('title', ''))

            # Get system agent user_id
            from integrations.social.models import User
            sys_agent = db.query(User).filter_by(username='hevolve_system_agent').first()
            user_id = sys_agent.id if sys_agent else 'system'
        finally:
            db.close()

        from integrations.agent_engine.dispatch import dispatch_goal as _dispatch
        response = _dispatch(
            prompt=prompt,
            user_id=user_id,
            goal_id=goal_id,
            goal_type=goal_type,
        )
        return json.dumps({"dispatched": True, "goal_id": goal_id, "response_preview": str(response)[:500]}, default=str)
    except Exception as e:
        return json.dumps({"error": str(e)})



@mcp.tool()
def agent_status() -> str:
    """Check agent daemon health, active dispatches, and system state.

    All probes flow through ``core.health_probe`` (single canonical
    source). See that module's docstring for the root-cause notes
    on why we route through it instead of reading env vars directly.
    """
    from core.health_probe import (
        probe_agent_daemon, probe_llm, probe_nunba_flask,
    )
    status = probe_agent_daemon()
    status['nunba_server'] = probe_nunba_flask()
    status['llm_server'] = probe_llm()

    # Goal counts (DB query — kept inline; not a "probe" in the
    # health-check sense, this is a count-by-status aggregation).
    try:
        from integrations.agent_engine.goal_manager import GoalManager
        db = _get_db()
        try:
            all_goals = GoalManager.list_goals(db)
            by_status = {}
            for g in all_goals:
                s = g.get('status', 'unknown')
                by_status[s] = by_status.get(s, 0) + 1
            status['goals'] = {'total': len(all_goals), 'by_status': by_status}
        finally:
            db.close()
    except Exception as e:
        status['goals'] = {'error': str(e)}

    # Expert agent count
    try:
        reg = _get_registry()
        status['expert_agents'] = len(reg.agents)
    except Exception:
        status['expert_agents'] = 'unknown'

    return json.dumps(status, indent=2, default=str)



@mcp.tool()
def remember(content: str, memory_type: str = 'decision') -> str:
    """Store a memory in the persistent memory graph. Types: fact, decision, insight, lifecycle."""

    try:
        mg = _get_memory_graph()
        memory_id = mg.register(
            content=content,
            metadata={'memory_type': memory_type, 'source_agent': 'claude_orchestrator'},
        )
        return json.dumps({"stored": True, "memory_id": memory_id})
    except Exception as e:
        return json.dumps({"error": str(e)})



@mcp.tool()
def recall(query: str, top_k: int = 5) -> str:
    """Search the persistent memory graph. Returns relevant memories ranked by relevance."""

    try:
        mg = _get_memory_graph()
        memories = mg.recall(query=query, mode='hybrid', top_k=top_k)
        result = []
        for m in memories:
            result.append({
                "id": m.id,
                "content": m.content,
                "memory_type": m.memory_type,
                "source_agent": m.source_agent,
                "created_at": m.created_at,
            })
        return json.dumps({"count": len(result), "memories": result}, indent=2, default=str)
    except Exception as e:
        return json.dumps({"error": str(e)})



@mcp.tool()
def list_recipes() -> str:
    """List trained agent recipes (prompts/*.json files)."""
    prompts_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'prompts')
    recipes = []
    if os.path.isdir(prompts_dir):
        for f in sorted(_glob.glob(os.path.join(prompts_dir, '*.json'))):
            try:
                with open(f) as fh:
                    data = json.load(fh)
                recipes.append({
                    "file": Path(f).name,
                    "prompt_id": data.get("prompt_id", ""),
                    "agent_name": data.get("agent_name", ""),
                    "status": data.get("agent_status", ""),
                    "description": data.get("description", "")[:200],
                })
            except Exception:
                recipes.append({"file": Path(f).name, "error": "parse failed"})

    return json.dumps({"count": len(recipes), "recipes": recipes}, indent=2)



@mcp.tool()
def system_health() -> str:
    """Full system health check: Flask server, LLM, DB, memory graph.

    All non-DB probes flow through ``core.health_probe`` (single
    canonical source). See that module for why we no longer hit
    ``localhost:{get_port('llm')}/health`` directly.
    """
    from core.health_probe import probe_nunba_flask, probe_llm, probe_langchain
    health = {
        'flask': probe_nunba_flask(),
        'llm': probe_llm(),
        'langchain': probe_langchain(),
    }

    # DB
    try:
        db = _get_db()
        try:
            from integrations.social.models import User
            count = db.query(User).count()
            health['db'] = {'status': 'up', 'user_count': count}
        finally:
            db.close()
    except Exception as e:
        health['db'] = {'status': 'error', 'detail': str(e)}

    # Memory graph
    try:
        mg = _get_memory_graph()
        health['memory'] = {'status': 'up', 'db_path': getattr(mg, 'db_path', 'unknown')}
    except Exception as e:
        health['memory'] = {'status': 'error', 'detail': str(e)}

    return json.dumps(health, indent=2, default=str)



@mcp.tool()
def social_query(query_type: str, limit: int = 20) -> str:
    """Read-only social DB queries. Types: users, posts, goals, products, agents.

    Returns recent entries. For safety, only SELECT operations are performed.
    """

    try:
        db = _get_db()
        try:
            if query_type == 'users':
                from integrations.social.models import User
                rows = db.query(User).order_by(User.created_at.desc()).limit(limit).all()
                return json.dumps([{
                    "id": r.id, "username": r.username, "display_name": r.display_name,
                    "user_type": r.user_type, "role": r.role, "karma_score": r.karma_score,
                } for r in rows], indent=2, default=str)

            elif query_type == 'posts':
                from integrations.social.models import Post
                rows = db.query(Post).order_by(Post.created_at.desc()).limit(limit).all()
                return json.dumps([{
                    "id": r.id, "title": getattr(r, 'title', ''), "author_id": r.author_id,
                    "content": (r.content or '')[:200], "vote_count": getattr(r, 'vote_count', 0),
                } for r in rows], indent=2, default=str)

            elif query_type == 'goals':
                from integrations.agent_engine.goal_manager import GoalManager
                goals = GoalManager.list_goals(db)
                return json.dumps({"count": len(goals), "goals": goals[:limit]}, indent=2, default=str)

            elif query_type == 'products':
                from integrations.agent_engine.goal_manager import ProductManager
                products = ProductManager.list_products(db)
                return json.dumps({"count": len(products), "products": products[:limit]}, indent=2, default=str)

            elif query_type == 'agents':
                from integrations.social.models import User
                rows = db.query(User).filter_by(user_type='agent').limit(limit).all()
                return json.dumps([{
                    "id": r.id, "username": r.username, "display_name": r.display_name,
                    "agent_id": r.agent_id, "karma_score": r.karma_score,
                } for r in rows], indent=2, default=str)

            else:
                return json.dumps({"error": f"Unknown query_type: {query_type}. Valid: users, posts, goals, products, agents"})
        finally:
            db.close()
    except Exception as e:
        return json.dumps({"error": str(e)})



@mcp.tool()
def switch_model(model_name: str) -> str:
    """Switch the local LLM model at runtime. Restarts llama-server with the new model.

    Available models:
    - "default" or "qwen35-4b": Qwen3.5-4B VL (recommended, vision+text) [index 0]
    - "qwen35-2b": Qwen3.5-2B VL (lightweight, low VRAM / CPU) [index 1]
    - "vision" or "qwen3-vl-2b": Qwen3-VL-2B (older vision model) [index 2]
    - "gemma": Gemma-3-1B (smallest, fastest, text-only) [index 3]
    - "qwen3-2b": Qwen3-2B (text-only) [index 4]
    """

    name_to_index = {
        "default": 0, "text": 0, "qwen35-4b": 0, "qwen3.5-4b": 0,
        "qwen35-2b": 1, "qwen3.5-2b": 1,
        "vision": 2, "qwen3-vl-2b": 2, "vl": 2, "vl-2b": 2,
        "gemma": 3, "gemma-1b": 3,
        "qwen3-2b": 4,
    }

    model_index = name_to_index.get(model_name.lower().strip())
    if model_index is None:
        try:
            model_index = int(model_name)
        except ValueError:
            return json.dumps({
                "error": f"Unknown model: {model_name}",
                "valid": list(name_to_index.keys()),
            })


    try:
        # requests is imported only for its ConnectionError type; the request
        # itself goes through the shared HTTP pool. Note the Flask port is
        # hardcoded here (5000) rather than resolved via get_port.
        import requests
        resp = pooled_post('http://localhost:5000/api/llm/switch', json={"model_index": model_index}, timeout=120)
        if resp.status_code == 200:
            return json.dumps(resp.json(), default=str)
        return json.dumps({"error": f"Server returned {resp.status_code}: {resp.text[:300]}"})
    except requests.exceptions.ConnectionError:
        # Server not running — update config directly
        config_path = os.path.join(os.path.expanduser('~'), '.nunba', 'llama_config.json')
        try:
            with open(config_path) as f:
                cfg = json.load(f)
            cfg['selected_model_index'] = model_index
            with open(config_path, 'w') as f:
                json.dump(cfg, f, indent=2)
            from llama.llama_installer import MODEL_PRESETS
            preset = MODEL_PRESETS[model_index] if model_index < len(MODEL_PRESETS) else None
            return json.dumps({
                "config_updated": True,
                "model_index": model_index,
                "model_name": preset.display_name if preset else "unknown",
                "note": "Server not running. Config saved — will use this model on next start."
            })
        except Exception as e:
            return json.dumps({"error": str(e)})
    except Exception as e:
        return json.dumps({"error": str(e)})



@mcp.tool()
def code(
    task: str,
    task_type: str = 'feature',
    preferred_tool: str = '',
    working_dir: str = '',
    model: str = '',
) -> str:
    """Execute a coding task via the distributed coding agent.

    Routes to the best available tool (kilocode, claude_code, opencode, aider).
    Records benchmarks. Captures edits as recipes for REUSE mode.

    task_type: feature, bug_fix, refactor, code_review, app_build
    """

    try:
        from integrations.coding_agent.orchestrator import get_coding_orchestrator
        orchestrator = get_coding_orchestrator()
        result = orchestrator.execute(
            task=task,
            task_type=task_type,
            preferred_tool=preferred_tool,
            user_id='claude_mcp',
            model=model,
            working_dir=working_dir or os.getcwd(),
        )
        return json.dumps(result, indent=2, default=str)
    except Exception as e:
        return json.dumps({"error": str(e)})



@mcp.tool()
def onboard_kong(
    kong_url: str = 'http://localhost:8001',
    upstream_url: str = 'http://localhost:8000',
) -> str:
    """Onboard the Mindstory SDK into Kong API Gateway.

    Creates service, routes, and plugins (key-auth, rate-limiting, cors).
    Idempotent — safe to call multiple times. Queries existing config first.
    """

    try:
        from integrations.gateway.kong_onboard import onboard
        ok = onboard(kong_url=kong_url, upstream_url=upstream_url)
        return json.dumps({"success": ok})
    except Exception as e:
        return json.dumps({"error": str(e)})



# ─── Entry point ───

def start_sse_server(host: str = '127.0.0.1', port: Optional[int] = None):
    """Start MCP server with SSE transport for HTTP clients (Nunba, external).

    This runs the FastMCP server on a dedicated port using Server-Sent Events.
    Clients connect via standard MCP SSE protocol.
    """
    if port is None:
        port = get_port('mcp')
    logger.info(f"Starting MCP SSE server on {host}:{port}")
    # The stock mcp.server.fastmcp FastMCP.run() takes no host/port kwargs;
    # it reads them from settings, so set those before starting the transport.
    mcp.settings.host = host
    mcp.settings.port = port
    mcp.run(transport="sse")



def start_sse_server_background(host: str = '127.0.0.1', port: Optional[int] = None):
    """Start MCP SSE server in a background thread."""
    import threading
    t = threading.Thread(
        target=start_sse_server,
        args=(host, port),
        daemon=True,  # don't block interpreter exit
        name='mcp-sse-server',
    )
    t.start()
    logger.info("MCP SSE server started in background thread")
    return t


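# Illustrative embedding from a host process (import path follows this
# file's location):
#   from integrations.mcp.mcp_server import start_sse_server_background
#   start_sse_server_background()  # serves MCP over SSE on get_port('mcp')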

if __name__ == "__main__":
    # Ensure HARTOS root is on sys.path
    hartos_root = str(Path(__file__).resolve().parent.parent.parent)
    if hartos_root not in sys.path:
        sys.path.insert(0, hartos_root)

    # Support --sse flag for HTTP mode, default to stdio for Claude Code
    transport = "stdio"
    if "--sse" in sys.argv:
        transport = "sse"
    elif "--http" in sys.argv:
        transport = "streamable-http"

    if transport == "stdio":
        mcp.run(transport="stdio")
    else:
        port = get_port('mcp')
        logger.info(f"Starting MCP server with {transport} transport on port {port}")
        # As above, host/port must be supplied via settings, not run() kwargs.
        mcp.settings.host = "127.0.0.1"
        mcp.settings.port = port
        mcp.run(transport=transport)