Coverage for integrations / agent_lightning / tracer.py: 24.8%

133 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Agent Lightning Tracer 

3 

4Automatic tracing for agent interactions. 

5Captures prompts, tool calls, rewards, and outcomes. 

6""" 

7 

8import logging 

9import time 

10import uuid 

11from typing import Any, Dict, Optional, List 

12from datetime import datetime 

13from collections import defaultdict 

14 

15from .config import AGENT_LIGHTNING_CONFIG 

16 

logger = logging.getLogger(__name__)

# Module-wide tracing state.
# Flag flipped by enable_auto_tracing() / disable_auto_tracing().
_global_tracing_enabled: bool = False
# span_id -> Span for spans that were started and have not been ended yet;
# written by LightningTracer.start_span and popped by LightningTracer.end_span.
_active_spans: Dict[str, "Span"] = {}

22 

23 

class Span:
    """Represents a single traced interaction.

    A span records its start time when constructed, collects timestamped
    events while it is in progress, and is finalized by :meth:`end`.
    """

    def __init__(
        self,
        span_id: str,
        agent_id: str,
        span_type: str,
        context: Optional[Dict] = None
    ):
        """Create a span in the 'in_progress' state.

        Args:
            span_id: Identifier of this span.
            agent_id: Identifier of the agent the span belongs to.
            span_type: Type label (e.g., 'generate_reply', 'tool_call').
            context: Optional context data; an empty dict is used when omitted.
        """
        self.span_id = span_id
        self.agent_id = agent_id
        self.span_type = span_type
        self.context = context or {}
        self.start_time = time.time()
        self.end_time = None
        self.status = 'in_progress'
        self.events = []
        self.result = None

    def add_event(self, event_type: str, data: Dict):
        """Append an event stamped with the current time.

        Args:
            event_type: Event label stored under the 'type' key.
            data: Event payload stored under the 'data' key.
        """
        self.events.append({
            'type': event_type,
            'timestamp': time.time(),
            'data': data
        })

    def end(self, status: str, result: Any = None):
        """Finalize the span.

        Args:
            status: Final status (replaces 'in_progress').
            result: Optional result object to keep on the span.
        """
        self.end_time = time.time()
        self.status = status
        self.result = result

    def to_dict(self) -> Dict:
        """Convert the span to a plain dictionary.

        Returns:
            Dict with the span fields. 'duration' is ``end_time - start_time``
            or None while the span has not ended; 'result' is ``str(result)``
            truncated to 500 characters, or None when no result was recorded.
        """
        # Compare against None explicitly: a falsy result (0, False, '', [])
        # is still a real result and must not be reported as "no result".
        has_ended = self.end_time is not None
        has_result = self.result is not None
        return {
            'span_id': self.span_id,
            'agent_id': self.agent_id,
            'span_type': self.span_type,
            'context': self.context,
            'start_time': self.start_time,
            'end_time': self.end_time,
            'duration': self.end_time - self.start_time if has_ended else None,
            'status': self.status,
            'events': self.events,
            'result': str(self.result)[:500] if has_result else None
        }

72 

73 

class LightningTracer:
    """
    Automatic tracer for agent interactions

    Captures:
    - Prompts sent to LLM
    - Tool executions
    - Rewards and outcomes
    - Performance metrics

    When ``enabled`` is false, ``start_span`` returns None and ``end_span``
    and the ``emit_*`` methods do nothing.
    """

    def __init__(self, agent_id: str):
        """
        Create a tracer for one agent

        Args:
            agent_id: Agent identifier, also used as the span ID prefix
        """
        self.agent_id = agent_id
        self.spans = {}
        self.stats = defaultdict(int)
        self.enabled = AGENT_LIGHTNING_CONFIG['monitoring']['enabled']

        logger.info(f"LightningTracer initialized for {agent_id}")

    @staticmethod
    def _clip(value: Any, limit: int) -> str:
        """Return ``value`` as text cut to at most ``limit`` characters."""
        # Non-string payloads (e.g. a dict of tool arguments) cannot be
        # sliced directly, so coerce them to text first.
        text = value if isinstance(value, str) else str(value)
        return text[:limit]

    def _recording_span(self, span_id: str) -> Optional["Span"]:
        """Return the span to record into, or None if disabled/empty/unknown."""
        if not self.enabled or not span_id:
            return None
        return self.spans.get(span_id)

    def start_span(
        self,
        span_type: str,
        context: Optional[Dict] = None
    ) -> Optional[str]:
        """
        Start a new span

        Args:
            span_type: Type of span (e.g., 'generate_reply', 'tool_call')
            context: Optional context data

        Returns:
            Span ID, or None when tracing is disabled
        """
        if not self.enabled:
            return None

        span_id = f"{self.agent_id}_{uuid.uuid4().hex[:12]}"

        span = Span(
            span_id=span_id,
            agent_id=self.agent_id,
            span_type=span_type,
            context=context
        )

        # Register both on this tracer and in the module-level active map
        self.spans[span_id] = span
        _active_spans[span_id] = span

        self.stats['spans_started'] += 1

        logger.debug(f"Started span: {span_id} (type: {span_type})")

        return span_id

    def end_span(
        self,
        span_id: str,
        status: str,
        result: Any = None
    ):
        """
        End a span

        Args:
            span_id: Span ID
            status: Status ('success', 'error', etc.)
            result: Optional result data
        """
        if not self.enabled or not span_id:
            return

        span = self.spans.get(span_id)
        if not span:
            logger.warning(f"Span not found: {span_id}")
            return

        span.end(status, result)

        # Remove from active spans
        _active_spans.pop(span_id, None)

        self.stats['spans_completed'] += 1
        self.stats[f'spans_{status}'] += 1

        logger.debug(f"Ended span: {span_id} (status: {status})")

        # Emit to EventBus so FederatedAggregator and WorldModelBridge
        # can consume Agent Lightning traces (closes gap #337)
        try:
            from core.platform.events import emit_event
            emit_event('inference.completed', {
                'source': 'agent_lightning',
                'span_id': span.span_id,
                'agent_id': span.agent_id,
                'span_type': span.span_type,
                'duration_ms': (span.end_time - span.start_time) * 1000,
                'status': status,
                'success': status == 'success',
            })
        except Exception as exc:
            # Best-effort — never break tracing for event emission, but
            # leave a debug breadcrumb instead of failing silently.
            logger.debug(f"Event emission skipped for span {span_id}: {exc}")

        # Save span if configured
        if AGENT_LIGHTNING_CONFIG['monitoring']['save_traces']:
            self._save_span(span)

    def emit_prompt(
        self,
        span_id: str,
        prompt: str,
        context: Optional[Dict] = None
    ):
        """
        Emit prompt event

        Args:
            span_id: Span ID
            prompt: Prompt text (stored truncated to 500 characters)
            context: Optional context
        """
        span = self._recording_span(span_id)
        if not span:
            return

        span.add_event('prompt', {
            'prompt': self._clip(prompt, 500),  # Truncate long prompts
            'context': context or {}
        })

        self.stats['prompts_emitted'] += 1

    def emit_response(
        self,
        span_id: str,
        response: str,
        context: Optional[Dict] = None
    ):
        """
        Emit response event

        Args:
            span_id: Span ID
            response: Response text (stored truncated to 500 characters)
            context: Optional context
        """
        span = self._recording_span(span_id)
        if not span:
            return

        span.add_event('response', {
            'response': self._clip(response, 500),  # Truncate long responses
            'context': context or {}
        })

        self.stats['responses_emitted'] += 1

    def emit_tool_call(
        self,
        span_id: str,
        tool_name: str,
        tool_args: str,
        context: Optional[Dict] = None
    ):
        """
        Emit tool call event

        Args:
            span_id: Span ID
            tool_name: Tool name
            tool_args: Tool arguments (stored as text, truncated to 200
                characters; non-string values are converted with str())
            context: Optional context
        """
        span = self._recording_span(span_id)
        if not span:
            return

        span.add_event('tool_call', {
            'tool_name': tool_name,
            'tool_args': self._clip(tool_args, 200),
            'context': context or {}
        })

        self.stats['tool_calls_emitted'] += 1
        self.stats[f'tool_{tool_name}'] += 1

    def emit_reward(
        self,
        span_id: str,
        reward: float,
        context: Optional[Dict] = None
    ):
        """
        Emit reward event

        Args:
            span_id: Span ID
            reward: Reward value (also added to the 'total_reward' statistic)
            context: Optional context
        """
        span = self._recording_span(span_id)
        if not span:
            return

        span.add_event('reward', {
            'reward': reward,
            'context': context or {}
        })

        self.stats['rewards_emitted'] += 1
        self.stats['total_reward'] += reward

    def _save_span(self, span: "Span"):
        """Save span to storage as ``<span_id>.json``; errors are logged, not raised"""
        try:
            import os
            import json

            # Get traces path from config — resolve to user data dir (not CWD which
            # may be read-only C:\Program Files\ in installed builds)
            traces_path = AGENT_LIGHTNING_CONFIG.get('traces_path', '')
            if not traces_path or traces_path.startswith('./'):
                try:
                    from core.platform_paths import get_agent_data_dir
                    traces_path = os.path.join(get_agent_data_dir(), 'lightning_traces')
                except ImportError:
                    traces_path = os.path.join(
                        os.path.expanduser('~'), 'Documents', 'Nunba', 'data', 'lightning_traces')
            os.makedirs(traces_path, exist_ok=True)

            # Serialize before opening the file so a serialization failure
            # cannot leave a truncated JSON file behind. default=str is a
            # deliberate fallback: context/event payloads are caller-supplied
            # and may hold values json cannot encode natively.
            payload = json.dumps(span.to_dict(), indent=2, default=str)

            # Save span as JSON
            filename = os.path.join(traces_path, f"{span.span_id}.json")
            with open(filename, 'w', encoding='utf-8') as f:
                f.write(payload)

            logger.debug(f"Saved span to {filename}")

        except Exception as e:
            logger.error(f"Error saving span: {e}")

    def get_span(self, span_id: str) -> Optional["Span"]:
        """Get span by ID (None if this tracer does not hold it)"""
        return self.spans.get(span_id)

    def get_active_spans(self) -> List["Span"]:
        """Get all spans of this tracer whose status is still 'in_progress'"""
        return [s for s in self.spans.values() if s.status == 'in_progress']

    def get_statistics(self) -> Dict:
        """Get tracer statistics as a plain dict copy of the counters"""
        return dict(self.stats)

    def clear(self):
        """Clear all spans held by this tracer"""
        # Also drop this tracer's unfinished spans from the module-level
        # active map; otherwise they would stay there indefinitely and
        # get_active_span() could return a span that was already cleared.
        for span_id in self.spans:
            _active_spans.pop(span_id, None)
        self.spans.clear()
        logger.info(f"Cleared all spans for {self.agent_id}")

342 

343 

344# Global functions for auto-tracing 

345 

def enable_auto_tracing() -> None:
    """Switch the module-wide auto-tracing flag on and log the change."""
    global _global_tracing_enabled

    _global_tracing_enabled = True
    logger.info("Global auto-tracing enabled")

351 

352 

def disable_auto_tracing() -> None:
    """Switch the module-wide auto-tracing flag off and log the change."""
    global _global_tracing_enabled

    _global_tracing_enabled = False
    logger.info("Global auto-tracing disabled")

358 

359 

def is_auto_tracing_enabled() -> bool:
    """Report the current value of the module-wide auto-tracing flag."""
    tracing_on = _global_tracing_enabled
    return tracing_on

363 

364 

def get_active_span() -> Optional[Span]:
    """Return the most recently registered span that has not ended yet.

    Looks at the module-level active-span map, so the result may belong to
    any tracer. Returns None when no span is active.
    """
    pending = list(_active_spans.values())
    if pending:
        # Dict values keep insertion order, so the last one is the newest.
        return pending[-1]
    return None

371 

372 

# Names exported by ``from <this module> import *``: the tracer and span
# classes plus the module-level auto-tracing helpers.
__all__ = [
    'LightningTracer',
    'Span',
    'enable_auto_tracing',
    'disable_auto_tracing',
    'is_auto_tracing_enabled',
    'get_active_span',
]