Coverage for integrations / agent_engine / content_gen_tracker.py: 91.3%

254 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Content Generation Tracker — per-game content gen task tracking. 

3 

4Links games to AgentGoal (goal_type='content_gen') with SmartLedger subtasks. 

5Tracks progress snapshots for 24h delta computation. Detects stuck jobs. 

6""" 

7import json 

8import logging 

9import os 

10import time 

11from datetime import datetime, timedelta 

12from typing import Dict, List, Optional 

13 

14from sqlalchemy.orm.attributes import flag_modified 

15 

# Module logger. It uses the fixed 'hevolve_social' logger name rather than
# __name__ (presumably shared with sibling modules — confirm).
logger = logging.getLogger('hevolve_social')

# Media types that a game can require. These are the keys used in
# config_json['task_jobs'] for each content_gen goal.
MEDIA_TYPES = ('image', 'tts', 'music', 'video')

# Max snapshot history (7 days of hourly snapshots): 7 * 24 = 168 entries.
# record_progress_snapshot() trims the stored list to this length.
MAX_SNAPSHOTS = 168

23 

24 

class ContentGenTracker:
    """Track content generation tasks per game using AgentGoal + SmartLedger.

    Every game is linked to one ``AgentGoal`` row with
    ``goal_type == 'content_gen'``.  All tracker state lives in that row's
    ``config_json``:

    * ``game_id`` / ``game_title`` - identity of the game,
    * ``media_requirements`` - ``{'images': n, 'tts': n, 'music': n, 'video': n}``,
    * ``task_jobs`` - ``{media_type: {job_id, status, progress, error, updated_at}}``,
    * ``progress_snapshots`` - chronological ``[{'ts': iso, 'pct': float}]``.

    All public methods are best-effort: failures are logged at DEBUG level
    and reported through a neutral return value (``None`` / ``[]`` / a
    ``success: False`` dict) instead of raising.
    """

    # Maps a MEDIA_TYPES entry to its key inside config_json['media_requirements'].
    _REQ_KEYS = {'image': 'images', 'tts': 'tts', 'music': 'music', 'video': 'video'}

    # Task statuses that mean "this task needs intervention".
    _FAILURE_STATUSES = ('failed', 'stuck')

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Equivalent to the deprecated ``datetime.utcnow()``; kept naive so the
        ISO strings written to ``config_json`` keep their existing format.
        """
        from datetime import timezone
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _parse_ts(value) -> Optional[datetime]:
        """Parse an ISO-8601 string into a naive UTC datetime, or None if invalid."""
        from datetime import timezone
        try:
            ts = datetime.fromisoformat(value)
        except (TypeError, ValueError):
            return None
        if ts.tzinfo is not None:
            # Normalise aware timestamps so they can be compared with _utcnow().
            ts = ts.astimezone(timezone.utc).replace(tzinfo=None)
        return ts

    @staticmethod
    def _clamp_pct(value):
        """Coerce a reported progress value into the 0..100 range (0 if not numeric)."""
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return 0
        if value != value:  # NaN never compares equal to itself
            return 0
        return max(0, min(100, value))

    @staticmethod
    def _find_goal(db, game_id: str, statuses=None):
        """Return the first content_gen goal whose config_json.game_id matches.

        Args:
            db: SQLAlchemy session
            game_id: Game identifier to look for
            statuses: Optional iterable of goal statuses to restrict the search to

        Returns:
            The matching AgentGoal instance, or None.  Import/query errors
            propagate to the caller, which is expected to guard the call.
        """
        from integrations.social.models import AgentGoal

        conditions = [AgentGoal.goal_type == 'content_gen']
        if statuses:
            conditions.append(AgentGoal.status.in_(list(statuses)))
        # game_id lives inside the JSON column, so it is matched in Python.
        for goal in db.query(AgentGoal).filter(*conditions).all():
            if (goal.config_json or {}).get('game_id') == game_id:
                return goal
        return None

    @staticmethod
    def _progress_from_goal(goal, game_id: str) -> Dict:
        """Build the progress report dict for an already-loaded goal."""
        config = goal.config_json or {}
        media_reqs = config.get('media_requirements') or {}
        task_jobs = config.get('task_jobs') or {}
        snapshots = config.get('progress_snapshots') or []

        total_assets = sum(media_reqs.values())
        completed_assets = 0
        tasks = []

        for media_type in MEDIA_TYPES:
            req_key = ContentGenTracker._REQ_KEYS.get(media_type, media_type)
            required = media_reqs.get(req_key, 0)
            if not required:
                continue
            job_info = task_jobs.get(media_type) or {}
            # Clamp so one misreporting service cannot push totals past 100%.
            task_progress = ContentGenTracker._clamp_pct(job_info.get('progress'))
            task_completed = int(required * task_progress / 100)
            completed_assets += task_completed
            tasks.append({
                'task_type': media_type,
                'job_id': job_info.get('job_id'),
                'status': job_info.get('status', 'pending'),
                'progress_pct': task_progress,
                'required': required,
                'completed': task_completed,
                'error': job_info.get('error'),
                'updated_at': job_info.get('updated_at'),
            })

        progress_pct = (round(completed_assets / total_assets * 100, 1)
                        if total_assets else 0)
        delta_24h = ContentGenTracker._compute_delta(snapshots, hours=24)

        return {
            'game_id': game_id,
            'goal_id': goal.id,
            'game_title': config.get('game_title', game_id),
            'status': _classify_status(goal.status, progress_pct, delta_24h),
            'progress_pct': progress_pct,
            'delta_24h': delta_24h,
            'tasks': tasks,
            'media_requirements': media_reqs,
            'created_at': goal.created_at.isoformat() if goal.created_at else None,
            'updated_at': goal.updated_at.isoformat() if goal.updated_at else None,
        }

    @staticmethod
    def _safe_progress(goal, game_id: str) -> Optional[Dict]:
        """Like _progress_from_goal, but log and return None instead of raising."""
        try:
            return ContentGenTracker._progress_from_goal(goal, game_id)
        except Exception as e:
            logger.debug("ContentGenTracker.get_game_progress failed: %s", e)
            return None

    @staticmethod
    def _hours_since_last_change(snapshots: list) -> float:
        """Return how many hours the latest snapshot percentage has been unchanged.

        Walks the chronological snapshot list backwards while the percentage
        equals the latest one and measures from the oldest timestamp of that
        trailing run until now.  Returns 0.0 when nothing can be parsed.
        """
        if not snapshots or not isinstance(snapshots[-1], dict):
            return 0.0
        latest_pct = snapshots[-1].get('pct') or 0
        flat_since = None
        for snap in reversed(snapshots):
            if not isinstance(snap, dict):
                continue
            if (snap.get('pct') or 0) != latest_pct:
                break
            ts = ContentGenTracker._parse_ts(snap.get('ts'))
            if ts is not None:
                flat_since = ts
        if flat_since is None:
            return 0.0
        elapsed = (ContentGenTracker._utcnow() - flat_since).total_seconds() / 3600
        return max(0.0, elapsed)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    @staticmethod
    def get_or_create_game_goal(db, game_id: str, game_config: dict) -> Optional[Dict]:
        """Find existing content_gen goal for game or create one.

        Uses config_json.game_id for lookup. Idempotent: an existing goal in
        status 'active' or 'paused' is returned instead of creating a new one.

        Args:
            db: SQLAlchemy session
            game_id: Unique game identifier (e.g. 'eng-spell-animals-01')
            game_config: Game configuration dict with content requirements

        Returns:
            Goal dict or None on error
        """
        try:
            existing = ContentGenTracker._find_goal(
                db, game_id, statuses=('active', 'paused'))
            if existing is not None:
                return existing.to_dict()

            from .goal_manager import GoalManager

            title = game_config.get('title', game_id)
            media_reqs = ContentGenTracker._compute_media_requirements(game_config)

            config_json = {
                'game_id': game_id,
                'game_title': title,
                'media_requirements': media_reqs,
                'progress_snapshots': [],
                'task_jobs': {},  # {task_type: {job_id, status, progress}}
            }

            result = GoalManager.create_goal(
                db,
                goal_type='content_gen',
                title=f"Content generation: {title}",
                description=f"Generate media assets for game {game_id}: "
                            f"{json.dumps(media_reqs)}",
                owner_id=None,
                config_json=config_json,
                spark_budget=100,
            )
            if result:
                db.flush()
            return result
        except Exception as e:
            logger.debug(
                "ContentGenTracker.get_or_create_game_goal failed: %s", e)
            return None

    @staticmethod
    def _compute_media_requirements(game_config: dict) -> Dict:
        """Compute how many media assets a game needs.

        Walks ``game_config['content']`` and counts:

        * images - one per item (or nested option/card/item) carrying an
          ``imagePrompt`` / ``image_prompt``,
        * tts - one per non-empty text field on questions, words,
          statements and sentences,
        * music - 1 when the game's category is one of the BGM categories.

        Video is never derived from the config and stays 0.  A missing or
        malformed config/content section yields all-zero counts.
        """
        reqs = {'images': 0, 'tts': 0, 'music': 0, 'video': 0}
        if not isinstance(game_config, dict):
            return reqs
        content = game_config.get('content')
        if not isinstance(content, dict):
            content = {}

        def dict_items(container, key):
            """Return the dict entries of container[key] when it is a list."""
            entries = container.get(key, [])
            if not isinstance(entries, list):
                return []
            return [entry for entry in entries if isinstance(entry, dict)]

        def has_image_prompt(entry):
            return bool(entry.get('imagePrompt') or entry.get('image_prompt'))

        # Count image prompts
        for key in ('questions', 'words', 'pairs', 'rounds', 'items',
                    'sequences', 'statements', 'sentences', 'scenes'):
            for item in dict_items(content, key):
                if has_image_prompt(item):
                    reqs['images'] += 1
                # Count nested items (e.g., options with images)
                for sub_key in ('options', 'cards', 'items'):
                    reqs['images'] += sum(
                        1 for sub in dict_items(item, sub_key)
                        if has_image_prompt(sub))

        # Count TTS text segments: each non-empty text field is spoken separately
        tts_fields = ('question', 'text', 'hint', 'word', 'explanation', 'statement')
        for key in ('questions', 'words', 'statements', 'sentences'):
            for item in dict_items(content, key):
                reqs['tts'] += sum(1 for field in tts_fields if item.get(field))

        # Music: 1 per game if category has BGM
        if game_config.get('category') in ('english', 'math', 'science',
                                           'lifeSkills', 'creativity'):
            reqs['music'] = 1

        return reqs

    @staticmethod
    def get_game_progress(db, game_id: str) -> Optional[Dict]:
        """Get content generation progress for a game.

        Returns:
            {game_id, goal_id, game_title, status, progress_pct, delta_24h,
             tasks: [{task_type, job_id, status, progress_pct, required,
                      completed, error, updated_at}],
             media_requirements, created_at, updated_at}
            or None when the game has no goal or the lookup fails.
        """
        try:
            goal = ContentGenTracker._find_goal(db, game_id)
        except Exception as e:
            logger.debug("ContentGenTracker.get_game_progress failed: %s", e)
            return None
        if goal is None:
            return None
        return ContentGenTracker._safe_progress(goal, game_id)

    @staticmethod
    def _compute_delta(snapshots: list, hours: int = 24) -> float:
        """Compute progress delta from snapshots over the given window.

        The delta is the latest snapshot's percentage minus the percentage of
        the snapshot whose timestamp is closest to ``hours`` ago.  Malformed
        snapshots (non-dict, missing or unparsable ``ts``) are skipped.

        Returns:
            The delta rounded to one decimal; 0.0 with fewer than two
            snapshots or when no timestamp can be parsed.
        """
        if not snapshots or len(snapshots) < 2:
            return 0.0

        cutoff = ContentGenTracker._utcnow() - timedelta(hours=hours)
        latest = snapshots[-1]
        latest_pct = (latest.get('pct') or 0) if isinstance(latest, dict) else 0

        # Find the snapshot closest to `hours` ago
        best_snap = None
        best_dist = float('inf')
        for snap in snapshots:
            if not isinstance(snap, dict):
                continue
            ts = ContentGenTracker._parse_ts(snap.get('ts'))
            if ts is None:
                continue
            dist = abs((ts - cutoff).total_seconds())
            if dist < best_dist:
                best_dist = dist
                best_snap = snap

        if best_snap is None:
            return 0.0

        return round(latest_pct - (best_snap.get('pct') or 0), 1)

    @staticmethod
    def record_progress_snapshot(db, game_id: str):
        """Append a progress snapshot for delta tracking.

        Called periodically by the daemon. Keeps last MAX_SNAPSHOTS entries.
        Does nothing when the game has no goal or progress cannot be computed.
        """
        try:
            goal = ContentGenTracker._find_goal(db, game_id)
            if goal is None:
                return

            progress = ContentGenTracker._safe_progress(goal, game_id)
            if not progress:
                return

            config = goal.config_json or {}
            snapshots = config.get('progress_snapshots') or []
            snapshots.append({
                'ts': ContentGenTracker._utcnow().isoformat(),
                'pct': progress['progress_pct'],
            })

            # Prune old snapshots
            if len(snapshots) > MAX_SNAPSHOTS:
                snapshots = snapshots[-MAX_SNAPSHOTS:]

            config['progress_snapshots'] = snapshots
            goal.config_json = config
            # In-place JSON mutation is invisible to SQLAlchemy without this.
            flag_modified(goal, 'config_json')
            db.flush()
        except Exception as e:
            logger.debug(
                "ContentGenTracker.record_progress_snapshot failed: %s", e)

    @staticmethod
    def update_task_job(db, game_id: str, media_type: str,
                        job_id: str = None, status: str = None,
                        progress: float = None, error: str = None):
        """Update a specific media task's job info.

        Called by media generation services when jobs start/progress/complete.
        Only the arguments that are not None are written.  A previously
        recorded error is dropped when the task moves to a status other than
        'failed' / 'stuck' without a new error, so a retried or finished job
        does not keep reporting a stale failure.
        """
        try:
            goal = ContentGenTracker._find_goal(db, game_id)
            if goal is None:
                return

            config = goal.config_json or {}
            task_jobs = config.get('task_jobs') or {}
            job_info = task_jobs.get(media_type) or {}

            if job_id is not None:
                job_info['job_id'] = job_id
            if status is not None:
                job_info['status'] = status
            if progress is not None:
                job_info['progress'] = progress
            if error is not None:
                job_info['error'] = error
            elif (status is not None
                  and status not in ContentGenTracker._FAILURE_STATUSES):
                job_info.pop('error', None)
            job_info['updated_at'] = ContentGenTracker._utcnow().isoformat()

            task_jobs[media_type] = job_info
            config['task_jobs'] = task_jobs
            goal.config_json = config
            # In-place JSON mutation is invisible to SQLAlchemy without this.
            flag_modified(goal, 'config_json')
            db.flush()
        except Exception as e:
            logger.debug("ContentGenTracker.update_task_job failed: %s", e)

    @staticmethod
    def get_stuck_games(db, stall_threshold_hours: int = 24) -> List[Dict]:
        """Find games where progress hasn't changed in stall_threshold_hours.

        Only active, unfinished goals are considered.  With at least two
        snapshots a game is stuck when its latest percentage has been
        unchanged for ``stall_threshold_hours`` or longer.  Games with fewer
        than two snapshots have no history to judge from and are reported
        with ``stuck_hours == 0``.

        Returns:
            A list of progress dicts extended with ``stuck_hours`` and
            ``stuck_tasks``; [] on error.
        """
        try:
            from integrations.social.models import AgentGoal

            stuck = []
            goals = db.query(AgentGoal).filter(
                AgentGoal.goal_type == 'content_gen',
                AgentGoal.status == 'active',
            ).all()

            for goal in goals:
                config = goal.config_json or {}
                game_id = config.get('game_id')
                if not game_id:
                    continue

                progress = ContentGenTracker._safe_progress(goal, game_id)
                if not progress:
                    continue

                # Skip completed games
                if progress['progress_pct'] >= 100:
                    continue

                snapshots = config.get('progress_snapshots') or []
                stuck_hours = 0
                if len(snapshots) >= 2:
                    # Measure from when progress last changed, not from the
                    # newest snapshot, which is always recent while the
                    # daemon is running.
                    stuck_hours = ContentGenTracker._hours_since_last_change(snapshots)
                    if stuck_hours < stall_threshold_hours:
                        continue

                stuck.append({
                    **progress,
                    'stuck_hours': round(stuck_hours, 1),
                    'stuck_tasks': [t for t in progress['tasks']
                                    if t['status'] in ('failed', 'stuck', 'pending')
                                    and t['progress_pct'] < 100],
                })
            return stuck
        except Exception as e:
            logger.debug("ContentGenTracker.get_stuck_games failed: %s", e)
            return []

    @staticmethod
    def attempt_unblock(db, game_id: str) -> Dict:
        """Attempt to unblock a stuck game's content generation.

        For every task in status 'failed' or 'stuck' below 100%:
          1. Check if the media service is running
          2. Restart it if it is not; skip the task when the restart fails
          3. Mark the task as 'retrying' and clear its error

        Returns:
            {action_taken, success, detail}.  ``success`` is True when at
            least one task was queued for retry, or when there was nothing
            to unblock.
        """
        try:
            progress = ContentGenTracker.get_game_progress(db, game_id)
            if not progress:
                return {'action_taken': None, 'success': False,
                        'detail': 'Game not found'}

            stuck_tasks = [t for t in progress['tasks']
                           if t['status'] in ContentGenTracker._FAILURE_STATUSES
                           and t['progress_pct'] < 100]

            if not stuck_tasks:
                return {'action_taken': None, 'success': True,
                        'detail': 'No stuck tasks'}

            actions = []
            retried = False
            for task in stuck_tasks:
                media_type = task['task_type']

                # 1. Check if the service is running
                if not _check_media_service(media_type):
                    if not _restart_media_service(media_type):
                        actions.append(f'failed_restart_{media_type}')
                        continue
                    actions.append(f'restarted_{media_type}_service')

                # 2. Retry the task
                ContentGenTracker.update_task_job(
                    db, game_id, media_type,
                    status='retrying', error=None)
                actions.append(f'retry_{media_type}')
                retried = True

            return {
                'action_taken': ', '.join(actions) if actions else None,
                # A run made only of failed restarts is not a success.
                'success': retried,
                'detail': f"Actions: {actions}",
            }
        except Exception as e:
            return {'action_taken': None, 'success': False,
                    'detail': str(e)}

    @staticmethod
    def get_all_game_tasks(db) -> List[Dict]:
        """Get all content_gen goals with per-task breakdown for admin dashboard.

        Goals are returned newest first; goals without a game_id or whose
        progress cannot be computed are skipped.  Returns [] on error.
        """
        try:
            from integrations.social.models import AgentGoal

            goals = db.query(AgentGoal).filter(
                AgentGoal.goal_type == 'content_gen',
            ).order_by(AgentGoal.created_at.desc()).all()

            result = []
            for goal in goals:
                game_id = (goal.config_json or {}).get('game_id')
                if not game_id:
                    continue
                progress = ContentGenTracker._safe_progress(goal, game_id)
                if progress:
                    result.append(progress)
            return result
        except Exception as e:
            logger.debug("ContentGenTracker.get_all_game_tasks failed: %s", e)
            return []

    @staticmethod
    def get_services_health() -> Dict:
        """Check health of all media generation services.

        Returns:
            {service_name: bool} for each known media service.
        """
        return {
            svc_name: _check_media_service(svc_name)
            for svc_name in ('txt2img', 'tts_audio_suite', 'acestep',
                             'wan2gp', 'ltx2')
        }

462 

463 

def _classify_status(goal_status: str, progress_pct: float, delta_24h: float) -> str:
    """Translate goal state and progress figures into a display label.

    Rules are evaluated in order and the first match wins:
    'complete' (goal completed or progress at/over 100), 'paused',
    'stuck' (no 24h movement but some progress), 'slow' (24h movement
    strictly between 0 and 5), 'pending' (no progress yet), otherwise
    'generating'.
    """
    if goal_status == 'completed' or progress_pct >= 100:
        label = 'complete'
    elif goal_status == 'paused':
        label = 'paused'
    elif delta_24h == 0 and progress_pct > 0:
        label = 'stuck'
    elif 0 < delta_24h < 5:
        label = 'slow'
    elif progress_pct == 0:
        label = 'pending'
    else:
        label = 'generating'
    return label

477 

478 

def _check_media_service(media_type: str) -> bool:
    """Report whether the runtime tool backing a media type is running.

    Accepts either a media type ('image', 'tts', 'music', 'video') or a
    service alias ('txt2img', 'tts_audio_suite', 'acestep', 'wan2gp',
    'ltx2'); unknown names are passed to the runtime manager unchanged.
    Any failure, including the runtime manager being unavailable, is
    reported as False.
    """
    try:
        from integrations.service_tools.runtime_manager import RuntimeToolManager

        runtime = RuntimeToolManager.get_instance()
        resolved = {
            'image': 'txt2img',
            'txt2img': 'txt2img',
            'tts': 'tts_audio_suite',
            'tts_audio_suite': 'tts_audio_suite',
            'music': 'acestep_generate',
            'acestep': 'acestep_generate',
            'video': 'wan2gp_generate',
            'wan2gp': 'wan2gp_generate',
            'ltx2': 'ltx2_generate',
        }.get(media_type, media_type)
        return runtime.is_tool_running(resolved)
    except Exception:
        return False

499 

500 

def _resolve_media_tool(media_type: str) -> str:
    """Map a media type or service alias to its runtime tool name.

    Covers the same names the service health check resolves, so a restart
    targets the same tool whose status was checked.  Unknown names are
    returned unchanged.
    """
    tool_map = {
        'image': 'txt2img',
        'txt2img': 'txt2img',
        'tts': 'tts_audio_suite',
        'tts_audio_suite': 'tts_audio_suite',
        'music': 'acestep_generate',
        'acestep': 'acestep_generate',
        'video': 'wan2gp_generate',
        'wan2gp': 'wan2gp_generate',
        'ltx2': 'ltx2_generate',
    }
    return tool_map.get(media_type, media_type)


def _restart_media_service(media_type: str) -> bool:
    """Attempt to restart a media generation service.

    Args:
        media_type: Media type ('image', 'tts', 'music', 'video') or a
            service alias such as 'acestep' or 'ltx2'.

    Returns:
        True when the runtime manager returned a non-None result from
        ensure_tool_running; False otherwise, including when the runtime
        manager is unavailable or raises.
    """
    try:
        from integrations.service_tools.runtime_manager import RuntimeToolManager
        manager = RuntimeToolManager.get_instance()
        result = manager.ensure_tool_running(_resolve_media_tool(media_type))
        return result is not None
    except Exception:
        return False