Coverage for integrations / mcp / mcp_server.py: 0.0%
232 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HARTOS MCP Server — stdio-based Model Context Protocol server
4Exposes HARTOS agent ecosystem tools to Claude Code for orchestration.
5Run: python -m integrations.mcp.mcp_server
7Tools:
8 list_agents, list_goals, create_goal, dispatch_goal, agent_status,
9 remember, recall, list_recipes, system_health, social_query
10"""
11import os
12import sys
13import json
14import glob as _glob
15import logging
16from pathlib import Path
17from typing import Optional
18from core.port_registry import get_port
19from core.http_pool import pooled_get, pooled_post
21from mcp.server.fastmcp import FastMCP
23logger = logging.getLogger('hartos_mcp')
24logging.basicConfig(level=logging.INFO, format='%(name)s %(levelname)s %(message)s')
26mcp = FastMCP("hartos", instructions=(
27 "HARTOS agent ecosystem tools. Use these to orchestrate autonomous agents, "
28 "manage goals, query memory, and monitor system health."
29))
31# ─── Lazy imports (deferred to avoid import-time side effects) ───
33_registry = None
34_memory_graph = None
37def _get_registry():
38 global _registry
39 if _registry is None:
40 from integrations.expert_agents.registry import ExpertAgentRegistry
41 _registry = ExpertAgentRegistry()
42 return _registry
45def _get_db():
46 from integrations.social.models import get_db
47 return get_db()
50def _get_memory_graph(user_id: str = 'system'):
51 global _memory_graph
52 if _memory_graph is None:
53 from integrations.channels.memory.memory_graph import MemoryGraph
54 try:
55 from core.platform_paths import get_memory_graph_dir
56 db_path = get_memory_graph_dir()
57 except ImportError:
58 db_path = os.path.join(
59 os.path.expanduser('~'), 'Documents', 'Nunba', 'data', 'memory_graph'
60 )
61 _memory_graph = MemoryGraph(db_path=db_path, user_id=user_id)
62 return _memory_graph
65# ─── Tools ───
67@mcp.tool()
68def list_agents(category: Optional[str] = None, query: Optional[str] = None) -> str:
69 """List available expert agents. Filter by category or search by query.
71 Categories: software_dev, data_analytics, creative, business, education,
72 health, security, devops, research, robotics
73 """
74 reg = _get_registry()
76 if query:
77 agents = reg.search_agents(query)
78 elif category:
79 from integrations.expert_agents.registry import AgentCategory
80 cat_map = {name.lower(): member for name, member in AgentCategory.__members__.items()}
81 cat = cat_map.get(category.lower())
82 if not cat:
83 return json.dumps({"error": f"Unknown category: {category}. Valid: {list(cat_map.keys())}"})
84 agents = reg.get_agents_by_category(cat)
85 else:
86 agents = list(reg.agents.values())
88 result = []
89 for a in agents:
90 result.append({
91 "agent_id": a.agent_id,
92 "name": a.name,
93 "category": a.category.name if hasattr(a.category, 'name') else str(a.category),
94 "description": a.description,
95 "model_type": a.model_type,
96 })
98 # Also include dynamically discovered agents (trained recipes)
99 prompts_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'prompts')
100 dynamic = []
101 if os.path.isdir(prompts_dir):
102 for f in _glob.glob(os.path.join(prompts_dir, '*.json')):
103 try:
104 with open(f) as fh:
105 data = json.load(fh)
106 dynamic.append({
107 "agent_id": data.get("prompt_id", Path(f).stem),
108 "name": data.get("agent_name", Path(f).stem),
109 "category": "dynamic_recipe",
110 "description": data.get("description", "Trained agent recipe"),
111 "model_type": "llm",
112 })
113 except Exception:
114 pass
116 return json.dumps({
117 "expert_agents": len(result),
118 "dynamic_agents": len(dynamic),
119 "agents": result[:50], # cap at 50 to avoid token overflow
120 "dynamic": dynamic[:20],
121 }, indent=2)
124@mcp.tool()
125def list_goals(
126 goal_type: Optional[str] = None,
127 status: Optional[str] = None
128) -> str:
129 """List agent goals. Filter by type (marketing, coding, ip_protection, etc.) or status (active, pending, completed)."""
130 try:
131 from integrations.agent_engine.goal_manager import GoalManager
132 db = _get_db()
133 try:
134 goals = GoalManager.list_goals(db, goal_type=goal_type, status=status)
135 return json.dumps({"count": len(goals), "goals": goals}, indent=2, default=str)
136 finally:
137 db.close()
138 except Exception as e:
139 return json.dumps({"error": str(e)})
142@mcp.tool()
143def create_goal(
144 goal_type: str,
145 title: str,
146 description: str = '',
147 spark_budget: int = 200
148) -> str:
149 """Create a new goal for agents to pursue.
151 goal_type: marketing, coding, ip_protection, revenue, finance, self_heal,
152 federation, upgrade, thought_experiment, news, provision, content_gen
153 """
154 try:
155 from integrations.agent_engine.goal_manager import GoalManager
156 db = _get_db()
157 try:
158 result = GoalManager.create_goal(
159 db,
160 goal_type=goal_type,
161 title=title,
162 description=description,
163 spark_budget=spark_budget,
164 )
165 db.commit()
166 return json.dumps(result, indent=2, default=str)
167 finally:
168 db.close()
169 except Exception as e:
170 return json.dumps({"error": str(e)})
173@mcp.tool()
174def dispatch_goal(goal_id: str, goal_type: str = 'marketing') -> str:
175 """Dispatch a goal to an idle agent for execution. The daemon does this automatically every 30s, but this forces immediate dispatch."""
176 try:
177 from integrations.agent_engine.goal_manager import GoalManager
178 db = _get_db()
179 try:
180 goal_result = GoalManager.get_goal(db, goal_id)
181 if not goal_result.get('success'):
182 return json.dumps({"error": f"Goal {goal_id} not found"})
184 goal = goal_result['goal']
185 prompt = goal.get('description', goal.get('title', ''))
187 # Get system agent user_id
188 from integrations.social.models import User
189 sys_agent = db.query(User).filter_by(username='hevolve_system_agent').first()
190 user_id = sys_agent.id if sys_agent else 'system'
191 finally:
192 db.close()
194 from integrations.agent_engine.dispatch import dispatch_goal as _dispatch
195 response = _dispatch(
196 prompt=prompt,
197 user_id=user_id,
198 goal_id=goal_id,
199 goal_type=goal_type,
200 )
201 return json.dumps({"dispatched": True, "goal_id": goal_id, "response_preview": str(response)[:500]}, default=str)
202 except Exception as e:
203 return json.dumps({"error": str(e)})
206@mcp.tool()
207def agent_status() -> str:
208 """Check agent daemon health, active dispatches, and system state.
210 All probes flow through ``core.health_probe`` (single canonical
211 source). See that module's docstring for the root-cause notes
212 on why we route through it instead of reading env vars directly.
213 """
214 from core.health_probe import (
215 probe_agent_daemon, probe_llm, probe_nunba_flask,
216 )
217 status = probe_agent_daemon()
218 status['nunba_server'] = probe_nunba_flask()
219 status['llm_server'] = probe_llm()
221 # Goal counts (DB query — kept inline; not a "probe" in the
222 # health-check sense, this is a count-by-status aggregation).
223 try:
224 from integrations.agent_engine.goal_manager import GoalManager
225 db = _get_db()
226 try:
227 all_goals = GoalManager.list_goals(db)
228 by_status = {}
229 for g in all_goals:
230 s = g.get('status', 'unknown')
231 by_status[s] = by_status.get(s, 0) + 1
232 status['goals'] = {'total': len(all_goals), 'by_status': by_status}
233 finally:
234 db.close()
235 except Exception as e:
236 status['goals'] = {'error': str(e)}
238 # Expert agent count
239 try:
240 reg = _get_registry()
241 status['expert_agents'] = len(reg.agents)
242 except Exception:
243 status['expert_agents'] = 'unknown'
245 return json.dumps(status, indent=2, default=str)
248@mcp.tool()
249def remember(content: str, memory_type: str = 'decision') -> str:
250 """Store a memory in the persistent memory graph. Types: fact, decision, insight, lifecycle."""
251 try:
252 mg = _get_memory_graph()
253 memory_id = mg.register(
254 content=content,
255 metadata={'memory_type': memory_type, 'source_agent': 'claude_orchestrator'},
256 )
257 return json.dumps({"stored": True, "memory_id": memory_id})
258 except Exception as e:
259 return json.dumps({"error": str(e)})
262@mcp.tool()
263def recall(query: str, top_k: int = 5) -> str:
264 """Search the persistent memory graph. Returns relevant memories ranked by relevance."""
265 try:
266 mg = _get_memory_graph()
267 memories = mg.recall(query=query, mode='hybrid', top_k=top_k)
268 result = []
269 for m in memories:
270 result.append({
271 "id": m.id,
272 "content": m.content,
273 "memory_type": m.memory_type,
274 "source_agent": m.source_agent,
275 "created_at": m.created_at,
276 })
277 return json.dumps({"count": len(result), "memories": result}, indent=2, default=str)
278 except Exception as e:
279 return json.dumps({"error": str(e)})
282@mcp.tool()
283def list_recipes() -> str:
284 """List trained agent recipes (prompts/*.json files)."""
285 prompts_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'prompts')
286 recipes = []
287 if os.path.isdir(prompts_dir):
288 for f in sorted(_glob.glob(os.path.join(prompts_dir, '*.json'))):
289 try:
290 with open(f) as fh:
291 data = json.load(fh)
292 recipes.append({
293 "file": Path(f).name,
294 "prompt_id": data.get("prompt_id", ""),
295 "agent_name": data.get("agent_name", ""),
296 "status": data.get("agent_status", ""),
297 "description": data.get("description", "")[:200],
298 })
299 except Exception:
300 recipes.append({"file": Path(f).name, "error": "parse failed"})
302 return json.dumps({"count": len(recipes), "recipes": recipes}, indent=2)
305@mcp.tool()
306def system_health() -> str:
307 """Full system health check: Flask server, LLM, DB, memory graph.
309 All non-DB probes flow through ``core.health_probe`` (single
310 canonical source). See that module for why we no longer hit
311 ``localhost:{get_port('llm')}/health`` directly.
312 """
313 from core.health_probe import probe_nunba_flask, probe_llm, probe_langchain
314 health = {
315 'flask': probe_nunba_flask(),
316 'llm': probe_llm(),
317 'langchain': probe_langchain(),
318 }
320 # DB
321 try:
322 db = _get_db()
323 try:
324 from integrations.social.models import User
325 count = db.query(User).count()
326 health['db'] = {'status': 'up', 'user_count': count}
327 finally:
328 db.close()
329 except Exception as e:
330 health['db'] = {'status': 'error', 'detail': str(e)}
332 # Memory graph
333 try:
334 mg = _get_memory_graph()
335 health['memory'] = {'status': 'up', 'db_path': mg.db_path if hasattr(mg, 'db_path') else 'unknown'}
336 except Exception as e:
337 health['memory'] = {'status': 'error', 'detail': str(e)}
339 return json.dumps(health, indent=2, default=str)
342@mcp.tool()
343def social_query(query_type: str, limit: int = 20) -> str:
344 """Read-only social DB queries. Types: users, posts, goals, products, agents.
346 Returns recent entries. For safety, only SELECT operations are performed.
347 """
348 try:
349 db = _get_db()
350 try:
351 if query_type == 'users':
352 from integrations.social.models import User
353 rows = db.query(User).order_by(User.created_at.desc()).limit(limit).all()
354 return json.dumps([{
355 "id": r.id, "username": r.username, "display_name": r.display_name,
356 "user_type": r.user_type, "role": r.role, "karma_score": r.karma_score,
357 } for r in rows], indent=2, default=str)
359 elif query_type == 'posts':
360 from integrations.social.models import Post
361 rows = db.query(Post).order_by(Post.created_at.desc()).limit(limit).all()
362 return json.dumps([{
363 "id": r.id, "title": getattr(r, 'title', ''), "author_id": r.author_id,
364 "content": (r.content or '')[:200], "vote_count": getattr(r, 'vote_count', 0),
365 } for r in rows], indent=2, default=str)
367 elif query_type == 'goals':
368 from integrations.agent_engine.goal_manager import GoalManager
369 goals = GoalManager.list_goals(db)
370 return json.dumps({"count": len(goals), "goals": goals[:limit]}, indent=2, default=str)
372 elif query_type == 'products':
373 from integrations.agent_engine.goal_manager import ProductManager
374 products = ProductManager.list_products(db)
375 return json.dumps({"count": len(products), "products": products[:limit]}, indent=2, default=str)
377 elif query_type == 'agents':
378 from integrations.social.models import User
379 rows = db.query(User).filter_by(user_type='agent').limit(limit).all()
380 return json.dumps([{
381 "id": r.id, "username": r.username, "display_name": r.display_name,
382 "agent_id": r.agent_id, "karma_score": r.karma_score,
383 } for r in rows], indent=2, default=str)
385 else:
386 return json.dumps({"error": f"Unknown query_type: {query_type}. Valid: users, posts, goals, products, agents"})
387 finally:
388 db.close()
389 except Exception as e:
390 return json.dumps({"error": str(e)})
393@mcp.tool()
394def switch_model(model_name: str) -> str:
395 """Switch the local LLM model at runtime. Restarts llama-server with the new model.
397 Available models:
398 - "default" or "qwen35-4b": Qwen3.5-4B VL (recommended, vision+text) [index 0]
399 - "qwen35-2b": Qwen3.5-2B VL (lightweight, low VRAM / CPU) [index 1]
400 - "vision" or "qwen3-vl-2b": Qwen3-VL-2B (older vision model) [index 2]
401 - "gemma": Gemma-3-1B (smallest, fastest, text-only) [index 3]
402 - "qwen3-2b": Qwen3-2B (text-only) [index 4]
403 """
404 name_to_index = {
405 "default": 0, "text": 0, "qwen35-4b": 0, "qwen3.5-4b": 0,
406 "qwen35-2b": 1, "qwen3.5-2b": 1,
407 "vision": 2, "qwen3-vl-2b": 2, "vl": 2, "vl-2b": 2,
408 "gemma": 3, "gemma-1b": 3,
409 "qwen3-2b": 4,
410 }
412 model_index = name_to_index.get(model_name.lower().strip())
413 if model_index is None:
414 try:
415 model_index = int(model_name)
416 except ValueError:
417 return json.dumps({
418 "error": f"Unknown model: {model_name}",
419 "valid": list(name_to_index.keys()),
420 })
422 try:
423 import requests
424 resp = pooled_post('http://localhost:5000/api/llm/switch', json={"model_index": model_index}, timeout=120)
425 if resp.status_code == 200:
426 return json.dumps(resp.json(), default=str)
427 return json.dumps({"error": f"Server returned {resp.status_code}: {resp.text[:300]}"})
428 except requests.exceptions.ConnectionError:
429 # Server not running — update config directly
430 config_path = os.path.join(os.path.expanduser('~'), '.nunba', 'llama_config.json')
431 try:
432 with open(config_path) as f:
433 cfg = json.load(f)
434 cfg['selected_model_index'] = model_index
435 with open(config_path, 'w') as f:
436 json.dump(cfg, f, indent=2)
437 from llama.llama_installer import MODEL_PRESETS
438 preset = MODEL_PRESETS[model_index] if model_index < len(MODEL_PRESETS) else None
439 return json.dumps({
440 "config_updated": True,
441 "model_index": model_index,
442 "model_name": preset.display_name if preset else "unknown",
443 "note": "Server not running. Config saved — will use this model on next start."
444 })
445 except Exception as e:
446 return json.dumps({"error": str(e)})
447 except Exception as e:
448 return json.dumps({"error": str(e)})
451@mcp.tool()
452def code(
453 task: str,
454 task_type: str = 'feature',
455 preferred_tool: str = '',
456 working_dir: str = '',
457 model: str = '',
458) -> str:
459 """Execute a coding task via the distributed coding agent.
461 Routes to the best available tool (kilocode, claude_code, opencode, aider).
462 Records benchmarks. Captures edits as recipes for REUSE mode.
464 task_type: feature, bug_fix, refactor, code_review, app_build
465 """
466 try:
467 from integrations.coding_agent.orchestrator import get_coding_orchestrator
468 orchestrator = get_coding_orchestrator()
469 result = orchestrator.execute(
470 task=task,
471 task_type=task_type,
472 preferred_tool=preferred_tool,
473 user_id='claude_mcp',
474 model=model,
475 working_dir=working_dir or os.getcwd(),
476 )
477 return json.dumps(result, indent=2, default=str)
478 except Exception as e:
479 return json.dumps({"error": str(e)})
482@mcp.tool()
483def onboard_kong(
484 kong_url: str = 'http://localhost:8001',
485 upstream_url: str = 'http://localhost:8000',
486) -> str:
487 """Onboard the Mindstory SDK into Kong API Gateway.
489 Creates service, routes, and plugins (key-auth, rate-limiting, cors).
490 Idempotent — safe to call multiple times. Queries existing config first.
491 """
492 try:
493 from integrations.gateway.kong_onboard import onboard
494 ok = onboard(kong_url=kong_url, upstream_url=upstream_url)
495 return json.dumps({"success": ok})
496 except Exception as e:
497 return json.dumps({"error": str(e)})
500# ─── Entry point ───
502def start_sse_server(host: str = '127.0.0.1', port: int = None):
503 """Start MCP server with SSE transport for HTTP clients (Nunba, external).
505 This runs the FastMCP server on a dedicated port using Server-Sent Events.
506 Clients connect via standard MCP SSE protocol.
507 """
508 if port is None:
509 port = get_port('mcp')
510 logger.info(f"Starting MCP SSE server on {host}:{port}")
511 mcp.run(transport="sse", host=host, port=port)
514def start_sse_server_background(host: str = '127.0.0.1', port: int = None):
515 """Start MCP SSE server in a background thread."""
516 import threading
517 t = threading.Thread(
518 target=start_sse_server,
519 args=(host, port),
520 daemon=True,
521 name='mcp-sse-server',
522 )
523 t.start()
524 logger.info("MCP SSE server started in background thread")
525 return t
528if __name__ == "__main__":
529 # Ensure HARTOS root is on sys.path
530 hartos_root = str(Path(__file__).resolve().parent.parent.parent)
531 if hartos_root not in sys.path:
532 sys.path.insert(0, hartos_root)
534 # Support --sse flag for HTTP mode, default to stdio for Claude Code
535 transport = "stdio"
536 if "--sse" in sys.argv:
537 transport = "sse"
538 elif "--http" in sys.argv:
539 transport = "streamable-http"
541 if transport == "stdio":
542 mcp.run(transport="stdio")
543 else:
544 port = get_port('mcp')
545 logger.info(f"Starting MCP server with {transport} transport on port {port}")
546 mcp.run(transport=transport, host="127.0.0.1", port=port)