Coverage for integrations / coding_agent / aider_core / waiting.py: 16.9%
118 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
#!/usr/bin/env python

"""
Thread-based, killable spinner utility.

Use it like:

    from .waiting import WaitingSpinner

    spinner = WaitingSpinner("Waiting for LLM")
    spinner.start()
    ...  # long task
    spinner.stop()
"""
16import sys
17import threading
18import time
try:
    from rich.console import Console
except ImportError:

    class Console:
        """Tiny stand-in for ``Console`` used when rich cannot be imported.

        It offers only the two members this module touches: ``width`` and
        ``show_cursor``.
        """

        @property
        def width(self):
            """Return the terminal column count (80 when it cannot be detected)."""
            import shutil

            terminal = shutil.get_terminal_size((80, 24))
            return terminal.columns

        def show_cursor(self, show=True):
            """Accept a cursor-visibility request and deliberately do nothing."""
            return None
class Spinner:
    """
    Minimal spinner that scans a single marker back and forth across a line.

    The animation is pre-rendered into a list of ASCII frames; when the
    terminal accepts the unicode palette the ASCII glyphs are swapped for
    block characters.

    Nothing is drawn when stdout is not a TTY. On a TTY the spinner stays
    hidden until 0.5 seconds have passed since construction and is then
    redrawn at most once every 0.1 seconds, however often ``step`` is called.
    """

    last_frame_idx = 0  # Class variable: lets a new spinner resume the animation

    def __init__(self, text: str, width: int = 7):
        """
        Prepare the frames and record the start time.

        Args:
            text: Label shown to the right of the animation.
            width: Accepted for backward compatibility; the frame geometry is
                fixed and this value is not consulted.
        """
        self.text = text
        self.start_time = time.time()
        self.last_update = 0.0
        self.visible = False
        self.is_tty = sys.stdout.isatty()
        self.console = Console()

        # Pre-render the animation frames using pure ASCII so they will
        # always display, even on very limited terminals. Every frame is
        # 10 characters: the two-glyph marker plus 8 spaces of travel.
        gap = 8
        ascii_frames = (
            # Start position: C1 C2 space(8)
            ["#=" + " " * gap]
            # Sweep right: space(n) C2 C1 space(8 - n) for n = 0..8
            + [" " * lead + "=#" + " " * (gap - lead) for lead in range(gap + 1)]
            # Sweep left: space(n) C1 C2 space(8 - n) for n = 8..1
            + [" " * lead + "#=" + " " * (gap - lead) for lead in range(gap, 0, -1)]
        )

        self.unicode_palette = "░█"
        xlate_from, xlate_to = ("=#", self.unicode_palette)

        # If unicode is supported, swap the ASCII chars for nicer glyphs.
        if self._supports_unicode():
            translation_table = str.maketrans(xlate_from, xlate_to)
            frames = [f.translate(translation_table) for f in ascii_frames]
            self.scan_char = xlate_to[xlate_from.find("#")]
        else:
            frames = ascii_frames
            self.scan_char = "#"

        self.frames = frames
        self.frame_idx = Spinner.last_frame_idx  # Initialize from class variable
        self.width = len(frames[0]) - 2  # frame length minus the two marker glyphs
        self.animation_len = len(frames[0])
        self.last_display_len = 0  # Length of the last spinner line (frame + text)

    def _supports_unicode(self) -> bool:
        """
        Probe whether stdout can encode the unicode palette.

        The palette is written and immediately erased again with backspaces
        and spaces. Returns False when stdout is not a TTY or the write fails.
        """
        if not self.is_tty:
            return False
        try:
            out = self.unicode_palette
            out += "\b" * len(self.unicode_palette)
            out += " " * len(self.unicode_palette)
            out += "\b" * len(self.unicode_palette)
            sys.stdout.write(out)
            sys.stdout.flush()
            return True
        except Exception:
            # Deliberately best-effort: any failure (UnicodeEncodeError
            # included) simply means we fall back to the ASCII frames.
            return False

    def _next_frame(self) -> str:
        """Return the current frame and advance (wrapping) the frame index."""
        frame = self.frames[self.frame_idx]
        self.frame_idx = (self.frame_idx + 1) % len(self.frames)
        Spinner.last_frame_idx = self.frame_idx  # Update class variable
        return frame

    def step(self, text: str = None) -> None:
        """
        Advance the animation by one frame if it is due.

        Args:
            text: Optional replacement label; ``None`` keeps the current one.
        """
        if text is not None:
            self.text = text

        if not self.is_tty:
            return

        now = time.time()
        if not self.visible and now - self.start_time >= 0.5:
            self.visible = True
            self.last_update = 0.0
            self.console.show_cursor(False)

        if not self.visible or now - self.last_update < 0.1:
            return

        self.last_update = now
        frame_str = self._next_frame()

        # Leave a two-column margin to avoid cursor wrapping at the right
        # edge; clamp at zero for extremely narrow terminals.
        max_spinner_width = max(0, self.console.width - 2)

        # Frame plus label, truncated to what fits on the line.
        line_to_display = f"{frame_str} {self.text}"[:max_spinner_width]
        len_line_to_display = len(line_to_display)

        # Pad over remnants of a longer previous line, but never beyond the
        # current width: if the terminal was narrowed since the last draw,
        # unclamped padding would spill past the edge and wrap.
        stale_len = min(self.last_display_len, max_spinner_width) - len_line_to_display
        padding_to_clear = " " * max(0, stale_len)

        # Write the spinner frame, text, and any necessary clearing spaces
        sys.stdout.write(f"\r{line_to_display}{padding_to_clear}")
        self.last_display_len = len_line_to_display

        # Move the cursor back onto the scanner character. If that character
        # was truncated away the count is non-positive, so no backspaces are
        # written and the cursor stays at the end of the line.
        scan_char_abs_pos = frame_str.find(self.scan_char)
        total_chars_written_on_line = len_line_to_display + len(padding_to_clear)
        num_backspaces = total_chars_written_on_line - scan_char_abs_pos
        sys.stdout.write("\b" * num_backspaces)
        sys.stdout.flush()

    def end(self) -> None:
        """Erase the spinner line and restore the cursor if it was shown."""
        if self.visible and self.is_tty:
            clear_len = self.last_display_len  # Use the length of the last displayed content
            sys.stdout.write("\r" + " " * clear_len + "\r")
            sys.stdout.flush()
            self.console.show_cursor(True)
        self.visible = False
class WaitingSpinner:
    """
    Background spinner that can be started/stopped safely.

    The spinner is stepped from a daemon thread every ``delay`` seconds.
    An instance may be started again after it has been stopped, and can be
    used as a context manager more than once.
    """

    def __init__(self, text: str = "Waiting for LLM", delay: float = 0.15):
        """
        Args:
            text: Label passed to the underlying ``Spinner``.
            delay: Seconds between animation steps; also the longest that
                ``stop`` waits for the worker thread to exit.
        """
        self.spinner = Spinner(text)
        self.delay = delay
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._spin, daemon=True)

    def _spin(self):
        """Worker loop: step the spinner until a stop is requested."""
        while not self._stop_event.is_set():
            self.spinner.step()
            # Wait on the event rather than sleeping so that stop() wakes the
            # worker immediately instead of after a full delay.
            self._stop_event.wait(self.delay)
        self.spinner.end()

    def start(self):
        """Start the spinner in a background thread (no-op if already running)."""
        if self._thread.is_alive():
            return
        if self._thread.ident is not None:
            # A Thread object can be started only once; after a previous
            # run has finished we need a fresh one.
            self._thread = threading.Thread(target=self._spin, daemon=True)
        # Reset any earlier stop request so the new run actually spins.
        self._stop_event.clear()
        self._thread.start()

    def stop(self):
        """Request the spinner to stop and wait briefly for the thread to exit."""
        self._stop_event.set()
        if self._thread.is_alive():
            self._thread.join(timeout=self.delay)
        # Clear the line even if the worker has not finished yet.
        self.spinner.end()

    # Allow use as a context-manager
    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()
def main():
    """Demo entry point: drive a Spinner by hand for 100 ticks of 0.15s each."""
    demo = Spinner("Running spinner...")
    remaining = 100
    try:
        while remaining > 0:
            time.sleep(0.15)
            demo.step()
            remaining -= 1
        print("Success!")
    except KeyboardInterrupt:
        # Ctrl-C during the demo is an expected way to leave it.
        print("\nInterrupted by user.")
    finally:
        # Always erase the spinner line, however the loop ended.
        demo.end()


if __name__ == "__main__":
    main()