Coverage for integrations / agent_engine / content_gen_tracker.py: 91.3%
254 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Content Generation Tracker — per-game content gen task tracking.
4Links games to AgentGoal (goal_type='content_gen') with SmartLedger subtasks.
5Tracks progress snapshots for 24h delta computation. Detects stuck jobs.
6"""
7import json
8import logging
9import os
10import time
11from datetime import datetime, timedelta
12from typing import Dict, List, Optional
14from sqlalchemy.orm.attributes import flag_modified
16logger = logging.getLogger('hevolve_social')
18# Media types that a game can require
19MEDIA_TYPES = ('image', 'tts', 'music', 'video')
21# Max snapshot history (7 days of hourly snapshots)
22MAX_SNAPSHOTS = 168
25class ContentGenTracker:
26 """Track content generation tasks per game using AgentGoal + SmartLedger."""
28 @staticmethod
29 def get_or_create_game_goal(db, game_id: str, game_config: dict) -> Optional[Dict]:
30 """Find existing content_gen goal for game or create one.
32 Uses config_json.game_id for lookup. Idempotent.
34 Args:
35 db: SQLAlchemy session
36 game_id: Unique game identifier (e.g. 'eng-spell-animals-01')
37 game_config: Game configuration dict with content requirements
39 Returns:
40 Goal dict or None on error
41 """
42 try:
43 from integrations.social.models import AgentGoal
44 from .goal_manager import GoalManager
46 # Check for existing goal
47 existing = db.query(AgentGoal).filter(
48 AgentGoal.goal_type == 'content_gen',
49 AgentGoal.status.in_(['active', 'paused']),
50 ).all()
52 for goal in existing:
53 config = goal.config_json or {}
54 if config.get('game_id') == game_id:
55 return goal.to_dict()
57 # Compute media requirements from game config
58 media_reqs = ContentGenTracker._compute_media_requirements(game_config)
60 # Create new goal
61 config_json = {
62 'game_id': game_id,
63 'game_title': game_config.get('title', game_id),
64 'media_requirements': media_reqs,
65 'progress_snapshots': [],
66 'task_jobs': {}, # {task_type: {job_id, status, progress}}
67 }
69 result = GoalManager.create_goal(
70 db,
71 goal_type='content_gen',
72 title=f"Content generation: {game_config.get('title', game_id)}",
73 description=f"Generate media assets for game {game_id}: "
74 f"{json.dumps(media_reqs)}",
75 owner_id=None,
76 config_json=config_json,
77 spark_budget=100,
78 )
79 if result:
80 db.flush()
81 return result
82 except Exception as e:
83 logger.debug(f"ContentGenTracker.get_or_create_game_goal failed: {e}")
84 return None
86 @staticmethod
87 def _compute_media_requirements(game_config: dict) -> Dict:
88 """Compute how many media assets a game needs.
90 Walks the game config to count image prompts, text for TTS, etc.
91 """
92 reqs = {'images': 0, 'tts': 0, 'music': 0, 'video': 0}
93 content = game_config.get('content', {})
95 # Count image prompts
96 for key in ('questions', 'words', 'pairs', 'rounds', 'items',
97 'sequences', 'statements', 'sentences', 'scenes'):
98 items = content.get(key, [])
99 if isinstance(items, list):
100 for item in items:
101 if isinstance(item, dict):
102 if item.get('imagePrompt') or item.get('image_prompt'):
103 reqs['images'] += 1
104 # Count nested items (e.g., options with images)
105 for sub_key in ('options', 'cards', 'items'):
106 sub_items = item.get(sub_key, [])
107 if isinstance(sub_items, list):
108 for sub in sub_items:
109 if isinstance(sub, dict) and (
110 sub.get('imagePrompt') or sub.get('image_prompt')):
111 reqs['images'] += 1
113 # Count TTS text segments
114 for key in ('questions', 'words', 'statements', 'sentences'):
115 items = content.get(key, [])
116 if isinstance(items, list):
117 for item in items:
118 if isinstance(item, dict):
119 # Each question/word/statement needs TTS for various fields
120 for text_field in ('question', 'text', 'hint', 'word',
121 'explanation', 'statement'):
122 if item.get(text_field):
123 reqs['tts'] += 1
125 # Music: 1 per game if category has BGM
126 if game_config.get('category') in ('english', 'math', 'science',
127 'lifeSkills', 'creativity'):
128 reqs['music'] = 1
130 return reqs
132 @staticmethod
133 def get_game_progress(db, game_id: str) -> Optional[Dict]:
134 """Get content generation progress for a game.
136 Returns:
137 {game_id, goal_id, status, progress_pct, delta_24h,
138 tasks: [{task_type, job_id, status, progress_pct}],
139 media_requirements, created_at, updated_at}
140 """
141 try:
142 from integrations.social.models import AgentGoal
144 goals = db.query(AgentGoal).filter(
145 AgentGoal.goal_type == 'content_gen',
146 ).all()
148 goal = None
149 for g in goals:
150 config = g.config_json or {}
151 if config.get('game_id') == game_id:
152 goal = g
153 break
155 if not goal:
156 return None
158 config = goal.config_json or {}
159 media_reqs = config.get('media_requirements', {})
160 task_jobs = config.get('task_jobs', {})
161 snapshots = config.get('progress_snapshots', [])
163 # Compute overall progress from task_jobs
164 total_assets = sum(media_reqs.values())
165 completed_assets = 0
166 tasks = []
168 # Map media_type → requirements key
169 _REQ_KEYS = {'image': 'images', 'tts': 'tts', 'music': 'music', 'video': 'video'}
171 for media_type in MEDIA_TYPES:
172 required = media_reqs.get(_REQ_KEYS.get(media_type, media_type), 0)
173 if required == 0:
174 continue
175 job_info = task_jobs.get(media_type, {})
176 task_progress = job_info.get('progress', 0)
177 task_completed = int(required * task_progress / 100) if task_progress else 0
178 completed_assets += task_completed
179 tasks.append({
180 'task_type': media_type,
181 'job_id': job_info.get('job_id'),
182 'status': job_info.get('status', 'pending'),
183 'progress_pct': task_progress,
184 'required': required,
185 'completed': task_completed,
186 'error': job_info.get('error'),
187 'updated_at': job_info.get('updated_at'),
188 })
190 progress_pct = round(completed_assets / total_assets * 100, 1) if total_assets else 0
192 # Compute 24h delta from snapshots
193 delta_24h = ContentGenTracker._compute_delta(snapshots, hours=24)
195 return {
196 'game_id': game_id,
197 'goal_id': goal.id,
198 'game_title': config.get('game_title', game_id),
199 'status': _classify_status(goal.status, progress_pct, delta_24h),
200 'progress_pct': progress_pct,
201 'delta_24h': delta_24h,
202 'tasks': tasks,
203 'media_requirements': media_reqs,
204 'created_at': goal.created_at.isoformat() if goal.created_at else None,
205 'updated_at': goal.updated_at.isoformat() if goal.updated_at else None,
206 }
207 except Exception as e:
208 logger.debug(f"ContentGenTracker.get_game_progress failed: {e}")
209 return None
211 @staticmethod
212 def _compute_delta(snapshots: list, hours: int = 24) -> float:
213 """Compute progress delta from snapshots over the given window."""
214 if not snapshots or len(snapshots) < 2:
215 return 0.0
217 now = datetime.utcnow()
218 cutoff = now - timedelta(hours=hours)
219 latest_pct = snapshots[-1].get('pct', 0)
221 # Find the snapshot closest to `hours` ago
222 best_snap = None
223 best_dist = float('inf')
224 for snap in snapshots:
225 try:
226 ts = datetime.fromisoformat(snap['ts'])
227 dist = abs((ts - cutoff).total_seconds())
228 if dist < best_dist:
229 best_dist = dist
230 best_snap = snap
231 except (KeyError, ValueError):
232 continue
234 if best_snap is None:
235 return 0.0
237 return round(latest_pct - best_snap.get('pct', 0), 1)
239 @staticmethod
240 def record_progress_snapshot(db, game_id: str):
241 """Append a progress snapshot for delta tracking.
243 Called periodically by the daemon. Keeps last MAX_SNAPSHOTS entries.
244 """
245 try:
246 from integrations.social.models import AgentGoal
248 goals = db.query(AgentGoal).filter(
249 AgentGoal.goal_type == 'content_gen',
250 ).all()
252 for goal in goals:
253 config = goal.config_json or {}
254 if config.get('game_id') != game_id:
255 continue
257 # Get current progress
258 progress = ContentGenTracker.get_game_progress(db, game_id)
259 if not progress:
260 return
262 snapshots = config.get('progress_snapshots', [])
263 snapshots.append({
264 'ts': datetime.utcnow().isoformat(),
265 'pct': progress['progress_pct'],
266 })
268 # Prune old snapshots
269 if len(snapshots) > MAX_SNAPSHOTS:
270 snapshots = snapshots[-MAX_SNAPSHOTS:]
272 config['progress_snapshots'] = snapshots
273 goal.config_json = config
274 flag_modified(goal, 'config_json')
275 db.flush()
276 return
277 except Exception as e:
278 logger.debug(f"ContentGenTracker.record_progress_snapshot failed: {e}")
280 @staticmethod
281 def update_task_job(db, game_id: str, media_type: str,
282 job_id: str = None, status: str = None,
283 progress: float = None, error: str = None):
284 """Update a specific media task's job info.
286 Called by media generation services when jobs start/progress/complete.
287 """
288 try:
289 from integrations.social.models import AgentGoal
291 goals = db.query(AgentGoal).filter(
292 AgentGoal.goal_type == 'content_gen',
293 ).all()
295 for goal in goals:
296 config = goal.config_json or {}
297 if config.get('game_id') != game_id:
298 continue
300 task_jobs = config.get('task_jobs', {})
301 job_info = task_jobs.get(media_type, {})
303 if job_id is not None:
304 job_info['job_id'] = job_id
305 if status is not None:
306 job_info['status'] = status
307 if progress is not None:
308 job_info['progress'] = progress
309 if error is not None:
310 job_info['error'] = error
311 job_info['updated_at'] = datetime.utcnow().isoformat()
313 task_jobs[media_type] = job_info
314 config['task_jobs'] = task_jobs
315 goal.config_json = config
316 flag_modified(goal, 'config_json')
317 db.flush()
318 return
319 except Exception as e:
320 logger.debug(f"ContentGenTracker.update_task_job failed: {e}")
322 @staticmethod
323 def get_stuck_games(db, stall_threshold_hours: int = 24) -> List[Dict]:
324 """Find games where progress hasn't changed in stall_threshold_hours."""
325 try:
326 from integrations.social.models import AgentGoal
328 stuck = []
329 goals = db.query(AgentGoal).filter(
330 AgentGoal.goal_type == 'content_gen',
331 AgentGoal.status == 'active',
332 ).all()
334 for goal in goals:
335 config = goal.config_json or {}
336 game_id = config.get('game_id')
337 if not game_id:
338 continue
340 progress = ContentGenTracker.get_game_progress(db, game_id)
341 if not progress:
342 continue
344 # Skip completed games
345 if progress['progress_pct'] >= 100:
346 continue
348 delta = progress['delta_24h']
349 if delta == 0:
350 # Check how long it's been stuck
351 snapshots = config.get('progress_snapshots', [])
352 stuck_hours = 0
353 if len(snapshots) >= 2:
354 try:
355 latest_ts = datetime.fromisoformat(snapshots[-1]['ts'])
356 stuck_hours = (datetime.utcnow() - latest_ts).total_seconds() / 3600
357 except (KeyError, ValueError):
358 pass
360 if stuck_hours >= stall_threshold_hours or len(snapshots) < 2:
361 stuck.append({
362 **progress,
363 'stuck_hours': round(stuck_hours, 1),
364 'stuck_tasks': [t for t in progress['tasks']
365 if t['status'] in ('failed', 'stuck', 'pending')
366 and t['progress_pct'] < 100],
367 })
368 return stuck
369 except Exception as e:
370 logger.debug(f"ContentGenTracker.get_stuck_games failed: {e}")
371 return []
373 @staticmethod
374 def attempt_unblock(db, game_id: str) -> Dict:
375 """Attempt to unblock a stuck game's content generation.
377 Strategy:
378 1. Retry failed tasks
379 2. Check if media services are running
380 3. Restart stalled services
381 4. Escalate if retry fails
383 Returns:
384 {action_taken, success, detail}
385 """
386 try:
387 from integrations.social.models import AgentGoal
389 progress = ContentGenTracker.get_game_progress(db, game_id)
390 if not progress:
391 return {'action_taken': None, 'success': False,
392 'detail': 'Game not found'}
394 stuck_tasks = [t for t in progress['tasks']
395 if t['status'] in ('failed', 'stuck')
396 and t['progress_pct'] < 100]
398 if not stuck_tasks:
399 return {'action_taken': None, 'success': True,
400 'detail': 'No stuck tasks'}
402 actions = []
403 for task in stuck_tasks:
404 media_type = task['task_type']
406 # 1. Check if the service is running
407 service_ok = _check_media_service(media_type)
408 if not service_ok:
409 restarted = _restart_media_service(media_type)
410 actions.append(f'restarted_{media_type}_service'
411 if restarted else f'failed_restart_{media_type}')
412 if not restarted:
413 continue
415 # 2. Retry the task
416 ContentGenTracker.update_task_job(
417 db, game_id, media_type,
418 status='retrying', error=None)
419 actions.append(f'retry_{media_type}')
421 return {
422 'action_taken': ', '.join(actions) if actions else None,
423 'success': len(actions) > 0,
424 'detail': f"Actions: {actions}",
425 }
426 except Exception as e:
427 return {'action_taken': None, 'success': False,
428 'detail': str(e)}
430 @staticmethod
431 def get_all_game_tasks(db) -> List[Dict]:
432 """Get all content_gen goals with per-task breakdown for admin dashboard."""
433 try:
434 from integrations.social.models import AgentGoal
436 result = []
437 goals = db.query(AgentGoal).filter(
438 AgentGoal.goal_type == 'content_gen',
439 ).order_by(AgentGoal.created_at.desc()).all()
441 for goal in goals:
442 config = goal.config_json or {}
443 game_id = config.get('game_id')
444 if not game_id:
445 continue
447 progress = ContentGenTracker.get_game_progress(db, game_id)
448 if progress:
449 result.append(progress)
450 return result
451 except Exception as e:
452 logger.debug(f"ContentGenTracker.get_all_game_tasks failed: {e}")
453 return []
455 @staticmethod
456 def get_services_health() -> Dict:
457 """Check health of all media generation services."""
458 services = {}
459 for svc_name in ('txt2img', 'tts_audio_suite', 'acestep', 'wan2gp', 'ltx2'):
460 services[svc_name] = _check_media_service(svc_name)
461 return services
464def _classify_status(goal_status: str, progress_pct: float, delta_24h: float) -> str:
465 """Classify a game's content gen status for display."""
466 if goal_status == 'completed' or progress_pct >= 100:
467 return 'complete'
468 if goal_status == 'paused':
469 return 'paused'
470 if delta_24h == 0 and progress_pct > 0:
471 return 'stuck'
472 if 0 < delta_24h < 5:
473 return 'slow'
474 if progress_pct == 0:
475 return 'pending'
476 return 'generating'
479def _check_media_service(media_type: str) -> bool:
480 """Check if a media generation service is available."""
481 try:
482 from integrations.service_tools.runtime_manager import RuntimeToolManager
483 manager = RuntimeToolManager.get_instance()
484 tool_map = {
485 'image': 'txt2img',
486 'txt2img': 'txt2img',
487 'tts': 'tts_audio_suite',
488 'tts_audio_suite': 'tts_audio_suite',
489 'music': 'acestep_generate',
490 'acestep': 'acestep_generate',
491 'video': 'wan2gp_generate',
492 'wan2gp': 'wan2gp_generate',
493 'ltx2': 'ltx2_generate',
494 }
495 tool_name = tool_map.get(media_type, media_type)
496 return manager.is_tool_running(tool_name)
497 except Exception:
498 return False
501def _restart_media_service(media_type: str) -> bool:
502 """Attempt to restart a media generation service."""
503 try:
504 from integrations.service_tools.runtime_manager import RuntimeToolManager
505 manager = RuntimeToolManager.get_instance()
506 tool_map = {
507 'image': 'txt2img',
508 'tts': 'tts_audio_suite',
509 'music': 'acestep_generate',
510 'video': 'wan2gp_generate',
511 }
512 tool_name = tool_map.get(media_type, media_type)
513 result = manager.ensure_tool_running(tool_name)
514 return result is not None
515 except Exception:
516 return False