Coverage for integrations / agent_engine / content_gen_tools.py: 72.3%
47 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Content Generation AutoGen Tools
44 tools for the content_gen goal type — used by the monitor agent to
5check status, retry stuck tasks, inspect services, and force regeneration.
6"""
7import json
8import logging
9from typing import Annotated
11logger = logging.getLogger('hevolve_social')
14def get_content_gen_status(
15 game_id: Annotated[str, "Game ID to check (e.g. 'eng-spell-animals-01')"]
16) -> str:
17 """Get content generation status for a game.
19 Returns JSON with per-task breakdown, progress_pct, 24h delta, stuck tasks.
20 """
21 try:
22 from integrations.social.models import get_db
23 from .content_gen_tracker import ContentGenTracker
25 db = get_db()
26 try:
27 progress = ContentGenTracker.get_game_progress(db, game_id)
28 if not progress:
29 return json.dumps({
30 'error': f'No content generation goal found for game {game_id}'
31 })
32 return json.dumps(progress, default=str)
33 finally:
34 db.close()
35 except Exception as e:
36 return json.dumps({'error': str(e)})
39def retry_stuck_task(
40 game_id: Annotated[str, "Game ID with stuck content"],
41 task_type: Annotated[str, "Media type to retry: image, tts, music, or video"] = None
42) -> str:
43 """Retry a stuck content generation task.
45 Checks if the service is running, restarts if needed, then retries.
46 If task_type is omitted, retries all stuck tasks for the game.
47 """
48 try:
49 from integrations.social.models import get_db
50 from .content_gen_tracker import ContentGenTracker
52 db = get_db()
53 try:
54 if task_type:
55 ContentGenTracker.update_task_job(
56 db, game_id, task_type,
57 status='retrying', error=None)
58 db.commit()
59 return json.dumps({
60 'success': True,
61 'detail': f'Retrying {task_type} for game {game_id}'
62 })
63 else:
64 result = ContentGenTracker.attempt_unblock(db, game_id)
65 db.commit()
66 return json.dumps(result)
67 finally:
68 db.close()
69 except Exception as e:
70 return json.dumps({'error': str(e)})
73def check_media_services() -> str:
74 """Check health of all media generation services.
76 Returns which services are running, which need restart.
77 Services: txt2img, tts_audio_suite, acestep, wan2gp, ltx2.
78 """
79 try:
80 from .content_gen_tracker import ContentGenTracker
81 health = ContentGenTracker.get_services_health()
82 return json.dumps({
83 'services': {name: 'running' if ok else 'offline'
84 for name, ok in health.items()},
85 'all_healthy': all(health.values()),
86 })
87 except Exception as e:
88 return json.dumps({'error': str(e)})
91def force_regenerate(
92 game_id: Annotated[str, "Game ID to regenerate content for"],
93 asset_type: Annotated[str, "Asset type: image, tts, music, or video"],
94 prompt: Annotated[str, "Generation prompt"] = None
95) -> str:
96 """Force regeneration of a specific asset type for a game.
98 Clears the existing job status and triggers a fresh generation.
99 """
100 try:
101 from integrations.social.models import get_db
102 from .content_gen_tracker import ContentGenTracker
103 import uuid
105 db = get_db()
106 try:
107 new_job_id = f'{asset_type}_{uuid.uuid4().hex[:12]}'
108 ContentGenTracker.update_task_job(
109 db, game_id, asset_type,
110 job_id=new_job_id,
111 status='pending',
112 progress=0,
113 error=None)
114 db.commit()
115 return json.dumps({
116 'success': True,
117 'job_id': new_job_id,
118 'detail': f'Queued {asset_type} regeneration for {game_id}'
119 })
120 finally:
121 db.close()
122 except Exception as e:
123 return json.dumps({'error': str(e)})
126# Tool registration for ServiceToolRegistry
127CONTENT_GEN_TOOLS = [
128 {
129 'name': 'get_content_gen_status',
130 'func': get_content_gen_status,
131 'description': 'Get content generation status for a kids learning game',
132 'tags': ['content_gen'],
133 },
134 {
135 'name': 'retry_stuck_task',
136 'func': retry_stuck_task,
137 'description': 'Retry a stuck content generation task for a game',
138 'tags': ['content_gen'],
139 },
140 {
141 'name': 'check_media_services',
142 'func': check_media_services,
143 'description': 'Check health of all media generation services',
144 'tags': ['content_gen'],
145 },
146 {
147 'name': 'force_regenerate',
148 'func': force_regenerate,
149 'description': 'Force regeneration of a specific asset for a game',
150 'tags': ['content_gen'],
151 },
152]