Coverage for integrations / agent_lightning / tracer.py: 24.8%
133 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Agent Lightning Tracer
4Automatic tracing for agent interactions.
5Captures prompts, tool calls, rewards, and outcomes.
6"""
8import logging
9import time
10import uuid
11from typing import Any, Dict, Optional, List
12from datetime import datetime
13from collections import defaultdict
15from .config import AGENT_LIGHTNING_CONFIG
# Module-level logger shared by every tracer in this file.
logger = logging.getLogger(__name__)

# Global tracing state.
# Flag toggled by enable_auto_tracing() / disable_auto_tracing().
_global_tracing_enabled: bool = False
# Registry of spans that have been started but not yet ended, keyed by span id.
# Insertion order is preserved, so the last entry is the most recently started.
_active_spans: Dict[str, "Span"] = dict()
class Span:
    """Represents a single traced interaction.

    A span starts in the ``'in_progress'`` state, accumulates timestamped
    events through :meth:`add_event`, and is closed by :meth:`end`, which
    records the end time, the final status and an optional result.
    """

    def __init__(
        self,
        span_id: str,
        agent_id: str,
        span_type: str,
        context: Optional[Dict] = None
    ):
        """Create an open span.

        Args:
            span_id: Identifier of this span.
            agent_id: Identifier of the agent the span belongs to.
            span_type: Kind of interaction being traced.
            context: Optional context data; an empty dict is used when omitted.
        """
        self.span_id = span_id
        self.agent_id = agent_id
        self.span_type = span_type
        self.context = context or {}
        self.start_time = time.time()
        self.end_time = None
        self.status = 'in_progress'
        self.events = []
        self.result = None

    def add_event(self, event_type: str, data: Dict):
        """Append a timestamped event to the span.

        Args:
            event_type: Label stored under the ``'type'`` key.
            data: Payload stored under the ``'data'`` key.
        """
        self.events.append({
            'type': event_type,
            'timestamp': time.time(),
            'data': data
        })

    def end(self, status: str, result: Any = None):
        """Close the span, recording the end time, status and result.

        Args:
            status: Final status of the span.
            result: Optional result of the traced interaction.
        """
        self.end_time = time.time()
        self.status = status
        self.result = result

    def to_dict(self) -> Dict:
        """Return the span as a dictionary.

        ``duration`` is ``None`` while the span is still open. ``result`` is
        the string form of the result truncated to 500 characters, or ``None``
        when no result was recorded.
        """
        # Compare against None explicitly: a result of 0, False or "" is a
        # real outcome and must not be reported as "no result".
        has_ended = self.end_time is not None
        has_result = self.result is not None
        return {
            'span_id': self.span_id,
            'agent_id': self.agent_id,
            'span_type': self.span_type,
            'context': self.context,
            'start_time': self.start_time,
            'end_time': self.end_time,
            'duration': self.end_time - self.start_time if has_ended else None,
            'status': self.status,
            'events': self.events,
            'result': str(self.result)[:500] if has_result else None
        }
class LightningTracer:
    """
    Automatic tracer for agent interactions

    Captures:
    - Prompts sent to LLM
    - Tool executions
    - Rewards and outcomes
    - Performance metrics

    A tracer keeps the spans it started in ``self.spans`` and running counters
    in ``self.stats``. Spans that are still open are also registered in the
    module-level ``_active_spans`` registry. When
    ``AGENT_LIGHTNING_CONFIG['monitoring']['enabled']`` is false,
    ``start_span``, ``end_span`` and the ``emit_*`` methods do nothing.
    """

    def __init__(self, agent_id: str):
        """Create a tracer for ``agent_id``, reading the enabled flag from config."""
        self.agent_id = agent_id
        self.spans = {}
        self.stats = defaultdict(int)
        self.enabled = AGENT_LIGHTNING_CONFIG['monitoring']['enabled']

        logger.info(f"LightningTracer initialized for {agent_id}")

    @staticmethod
    def _clip(value: Any, limit: int) -> str:
        """Return ``value`` as text truncated to at most ``limit`` characters.

        Non-string payloads (for example a dict of tool arguments, or None)
        are converted with ``str`` so that recording an event cannot raise.
        """
        text = value if isinstance(value, str) else str(value)
        return text[:limit]

    def start_span(
        self,
        span_type: str,
        context: Optional[Dict] = None
    ) -> Optional[str]:
        """
        Start a new span

        Args:
            span_type: Type of span (e.g., 'generate_reply', 'tool_call')
            context: Optional context data

        Returns:
            Span ID, or None when tracing is disabled
        """
        if not self.enabled:
            return None

        span_id = f"{self.agent_id}_{uuid.uuid4().hex[:12]}"

        span = Span(
            span_id=span_id,
            agent_id=self.agent_id,
            span_type=span_type,
            context=context
        )

        self.spans[span_id] = span
        _active_spans[span_id] = span

        self.stats['spans_started'] += 1

        logger.debug(f"Started span: {span_id} (type: {span_type})")

        return span_id

    def end_span(
        self,
        span_id: str,
        status: str,
        result: Any = None
    ):
        """
        End a span

        Updates the statistics, emits an 'inference.completed' event on a
        best-effort basis and, when configured, saves the span to disk.

        Args:
            span_id: Span ID
            status: Status ('success', 'error', etc.)
            result: Optional result data
        """
        if not self.enabled or not span_id:
            return

        span = self.spans.get(span_id)
        if not span:
            logger.warning(f"Span not found: {span_id}")
            return

        span.end(status, result)

        # Remove from active spans
        _active_spans.pop(span_id, None)

        self.stats['spans_completed'] += 1
        self.stats[f'spans_{status}'] += 1

        logger.debug(f"Ended span: {span_id} (status: {status})")

        # Emit to EventBus so FederatedAggregator and WorldModelBridge
        # can consume Agent Lightning traces (closes gap #337)
        try:
            from core.platform.events import emit_event
            emit_event('inference.completed', {
                'source': 'agent_lightning',
                'span_id': span.span_id,
                'agent_id': span.agent_id,
                'span_type': span.span_type,
                'duration_ms': (span.end_time - span.start_time) * 1000,
                'status': status,
                'success': status == 'success',
            })
        except Exception:
            # Best-effort: never break tracing for event emission, but leave
            # a debug trail so a missing or failing event bus is diagnosable.
            logger.debug(
                "Event emission failed for span %s", span_id, exc_info=True)

        # Save span if configured
        if AGENT_LIGHTNING_CONFIG['monitoring']['save_traces']:
            self._save_span(span)

    def emit_prompt(
        self,
        span_id: str,
        prompt: str,
        context: Optional[Dict] = None
    ):
        """
        Emit prompt event

        Args:
            span_id: Span ID
            prompt: Prompt text (stored truncated to 500 characters)
            context: Optional context
        """
        if not self.enabled or not span_id:
            return

        span = self.spans.get(span_id)
        if not span:
            return

        span.add_event('prompt', {
            'prompt': self._clip(prompt, 500),  # Truncate long prompts
            'context': context or {}
        })

        self.stats['prompts_emitted'] += 1

    def emit_response(
        self,
        span_id: str,
        response: str,
        context: Optional[Dict] = None
    ):
        """
        Emit response event

        Args:
            span_id: Span ID
            response: Response text (stored truncated to 500 characters)
            context: Optional context
        """
        if not self.enabled or not span_id:
            return

        span = self.spans.get(span_id)
        if not span:
            return

        span.add_event('response', {
            'response': self._clip(response, 500),  # Truncate long responses
            'context': context or {}
        })

        self.stats['responses_emitted'] += 1

    def emit_tool_call(
        self,
        span_id: str,
        tool_name: str,
        tool_args: str,
        context: Optional[Dict] = None
    ):
        """
        Emit tool call event

        Args:
            span_id: Span ID
            tool_name: Tool name
            tool_args: Tool arguments (stored as text, truncated to 200
                characters; non-string values are converted with ``str``)
            context: Optional context
        """
        if not self.enabled or not span_id:
            return

        span = self.spans.get(span_id)
        if not span:
            return

        span.add_event('tool_call', {
            'tool_name': tool_name,
            'tool_args': self._clip(tool_args, 200),
            'context': context or {}
        })

        self.stats['tool_calls_emitted'] += 1
        self.stats[f'tool_{tool_name}'] += 1

    def emit_reward(
        self,
        span_id: str,
        reward: float,
        context: Optional[Dict] = None
    ):
        """
        Emit reward event

        Args:
            span_id: Span ID
            reward: Reward value (also added to the 'total_reward' statistic)
            context: Optional context
        """
        if not self.enabled or not span_id:
            return

        span = self.spans.get(span_id)
        if not span:
            return

        span.add_event('reward', {
            'reward': reward,
            'context': context or {}
        })

        self.stats['rewards_emitted'] += 1
        # stats is a defaultdict(int), so a missing total starts at 0.
        self.stats['total_reward'] += reward

    def _save_span(self, span: Span):
        """Save span to storage as ``<span_id>.json``.

        Failures are logged and never propagated to the caller.
        """
        try:
            import os
            import json

            # Get traces path from config — resolve to user data dir (not CWD which
            # may be read-only C:\Program Files\ in installed builds)
            traces_path = AGENT_LIGHTNING_CONFIG.get('traces_path', '')
            if not traces_path or traces_path.startswith('./'):
                try:
                    from core.platform_paths import get_agent_data_dir
                    traces_path = os.path.join(get_agent_data_dir(), 'lightning_traces')
                except ImportError:
                    traces_path = os.path.join(
                        os.path.expanduser('~'), 'Documents', 'Nunba', 'data', 'lightning_traces')
            os.makedirs(traces_path, exist_ok=True)

            # Save span as JSON
            filename = os.path.join(traces_path, f"{span.span_id}.json")
            with open(filename, 'w', encoding='utf-8') as f:
                # default=str is deliberate: context and event payloads are
                # caller-supplied and may hold non-JSON values. Without it
                # json.dump fails part-way and leaves a truncated file.
                json.dump(span.to_dict(), f, indent=2, default=str)

            logger.debug(f"Saved span to {filename}")

        except Exception as e:
            logger.error(f"Error saving span: {e}")

    def get_span(self, span_id: str) -> Optional[Span]:
        """Get span by ID, or None when this tracer does not hold it"""
        return self.spans.get(span_id)

    def get_active_spans(self) -> List[Span]:
        """Get all spans of this tracer whose status is still 'in_progress'"""
        return [s for s in self.spans.values() if s.status == 'in_progress']

    def get_statistics(self) -> Dict:
        """Get a plain-dict copy of the tracer statistics"""
        return dict(self.stats)

    def clear(self):
        """Clear all spans held by this tracer.

        Spans of this tracer that are still open are also removed from the
        module-level active-span registry, so discarded spans are neither
        kept alive there nor returned as the current active span.
        """
        for span_id in self.spans:
            _active_spans.pop(span_id, None)
        self.spans.clear()
        logger.info(f"Cleared all spans for {self.agent_id}")
344# Global functions for auto-tracing
def enable_auto_tracing():
    """Turn the module-wide auto-tracing flag on and log the change."""
    global _global_tracing_enabled

    _global_tracing_enabled = True
    logger.info("Global auto-tracing enabled")
def disable_auto_tracing():
    """Turn the module-wide auto-tracing flag off and log the change."""
    global _global_tracing_enabled

    _global_tracing_enabled = False
    logger.info("Global auto-tracing disabled")
def is_auto_tracing_enabled() -> bool:
    """Report the current value of the module-wide auto-tracing flag."""
    current_flag = _global_tracing_enabled
    return current_flag
def get_active_span() -> Optional[Span]:
    """Return the most recently started span that is still open, if any.

    Returns None when the active-span registry is empty.
    """
    # The registry keeps insertion order, so the last value is the newest.
    open_spans = list(_active_spans.values())
    return open_spans[-1] if open_spans else None
# Public API of this module.
__all__ = [
    # Tracing classes
    'LightningTracer', 'Span',
    # Global auto-tracing switches and lookup
    'enable_auto_tracing', 'disable_auto_tracing',
    'is_auto_tracing_enabled', 'get_active_span',
]