Coverage for integrations / coding_agent / aider_core / waiting.py: 16.9%

118 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1#!/usr/bin/env python 

2 

3""" 

4Thread-based, killable spinner utility. 

5 

6Use it like: 

7 

8 from .waiting import WaitingSpinner 

9 

10 spinner = WaitingSpinner("Waiting for LLM") 

11 spinner.start() 

12 ... # long task 

13 spinner.stop() 

14""" 

15 

import sys
import threading
import time
from typing import Optional

19 

try:
    from rich.console import Console
except ImportError:

    class Console:
        """Minimal stand-in for ``rich.console.Console`` when rich is not installed.

        Only the two members used by the spinner are provided.
        """

        @property
        def width(self):
            """Return the terminal width in columns, using an (80, 24) fallback size."""
            from shutil import get_terminal_size

            terminal_size = get_terminal_size((80, 24))
            return terminal_size.columns

        def show_cursor(self, show=True):
            """Accept the call and do nothing; the stub cannot toggle the cursor."""
            return None

31 

32 

class Spinner:
    """
    Minimal spinner that scans a two-character marker back and forth across a line.

    The animation is pre-rendered into a list of ASCII frames. When stdout is
    a TTY and the unicode palette can be written to it, the ASCII characters
    are swapped for block glyphs.

    Nothing is drawn when stdout is not a TTY. On a TTY the spinner stays
    hidden until ``SHOW_DELAY`` seconds have passed since construction, and
    is then redrawn at most once every ``FRAME_INTERVAL`` seconds.
    """

    last_frame_idx = 0  # Class variable: lets a new instance resume where the last one stopped

    SHOW_DELAY = 0.5  # Seconds after construction before the spinner becomes visible
    FRAME_INTERVAL = 0.1  # Minimum seconds between two redraws

    def __init__(self, text: str, width: int = 7):
        """
        Create a spinner labelled with *text*.

        Args:
            text: Message shown to the right of the animation.
            width: Accepted for backward compatibility only. It is not used;
                ``self.width`` is derived from the pre-rendered frames.
        """
        self.text = text
        self.start_time = time.time()
        self.last_update = 0.0
        self.visible = False
        self.is_tty = sys.stdout.isatty()
        self.console = Console()

        # Pre-render the animation frames using pure ASCII so they will
        # always display, even on very limited terminals.
        ascii_frames = self._ascii_frames()

        self.unicode_palette = "░█"
        xlate_from, xlate_to = ("=#", self.unicode_palette)

        # If unicode is supported, swap the ASCII chars for nicer glyphs.
        if self._supports_unicode():
            translation_table = str.maketrans(xlate_from, xlate_to)
            frames = [f.translate(translation_table) for f in ascii_frames]
            self.scan_char = xlate_to[xlate_from.find("#")]
        else:
            frames = ascii_frames
            self.scan_char = "#"

        self.frames = frames
        self.frame_idx = Spinner.last_frame_idx  # Initialize from class variable
        self.width = len(frames[0]) - 2  # frame length minus the two marker characters
        self.animation_len = len(frames[0])
        self.last_display_len = 0  # Length of the last spinner line (frame + text)

    @staticmethod
    def _ascii_frames(gap: int = 8) -> list:
        """
        Build the ASCII animation: a ``#``/``=`` pair bouncing across *gap* spaces.

        Every frame is ``gap + 2`` characters long. The sequence is one
        ``#=`` frame at the left edge, then ``=#`` moving from the left edge
        to the right edge, then ``#=`` moving from the right edge back to
        one column short of the left edge (so the cycle joins up with the
        first frame).
        """
        frames = ["#=" + " " * gap]
        frames.extend(" " * pos + "=#" + " " * (gap - pos) for pos in range(gap + 1))
        frames.extend(" " * pos + "#=" + " " * (gap - pos) for pos in range(gap, 0, -1))
        return frames

    def _supports_unicode(self) -> bool:
        """
        Return True when stdout is a TTY and the unicode palette can be written to it.

        The probe writes the palette, backspaces over it, blanks it with
        spaces and backspaces again, so nothing stays visible on screen.
        """
        if not self.is_tty:
            return False
        glyph_count = len(self.unicode_palette)
        probe = (
            self.unicode_palette
            + "\b" * glyph_count
            + " " * glyph_count
            + "\b" * glyph_count
        )
        try:
            sys.stdout.write(probe)
            sys.stdout.flush()
        except Exception:
            # Deliberately broad: any failure of the probe (UnicodeEncodeError
            # included) just means we fall back to the ASCII frames.
            return False
        return True

    def _next_frame(self) -> str:
        """Return the current frame and advance the instance and class frame indices."""
        frame = self.frames[self.frame_idx]
        self.frame_idx = (self.frame_idx + 1) % len(self.frames)
        Spinner.last_frame_idx = self.frame_idx  # Update class variable
        return frame

    def step(self, text: Optional[str] = None) -> None:
        """
        Advance the animation by one frame if it is time to redraw.

        Args:
            text: When not None, replaces the message shown next to the
                animation. The message is updated even when nothing is drawn.
        """
        if text is not None:
            self.text = text

        if not self.is_tty:
            return

        now = time.time()
        if not self.visible and now - self.start_time >= self.SHOW_DELAY:
            self.visible = True
            self.last_update = 0.0  # force an immediate first draw
            self.console.show_cursor(False)

        if not self.visible or now - self.last_update < self.FRAME_INTERVAL:
            return

        self.last_update = now
        frame_str = self._next_frame()

        # Leave a two-column margin so the line never reaches the right edge;
        # clamp at zero for extremely narrow terminals.
        max_spinner_width = max(0, self.console.width - 2)

        # Truncate the line if it is too long for the console width.
        line_to_display = f"{frame_str} {self.text}"[:max_spinner_width]
        len_line_to_display = len(line_to_display)

        # Pad with spaces to erase any remnants of a longer previous line.
        padding_to_clear = " " * max(0, self.last_display_len - len_line_to_display)

        sys.stdout.write(f"\r{line_to_display}{padding_to_clear}")
        self.last_display_len = len_line_to_display

        # Move the cursor back onto the scanner character. If the count is
        # not positive (for example because the scanner character itself was
        # truncated away), "\b" * n is empty and the cursor stays at the end
        # of what was written.
        scan_char_abs_pos = frame_str.find(self.scan_char)
        total_chars_written_on_line = len_line_to_display + len(padding_to_clear)
        num_backspaces = total_chars_written_on_line - scan_char_abs_pos
        sys.stdout.write("\b" * num_backspaces)
        sys.stdout.flush()

    def end(self) -> None:
        """Erase the spinner line and show the cursor again if the spinner was visible."""
        if self.visible and self.is_tty:
            # Blank out exactly as many columns as the last drawn line used.
            sys.stdout.write("\r" + " " * self.last_display_len + "\r")
            sys.stdout.flush()
            self.console.show_cursor(True)
        self.visible = False

179 

180 

class WaitingSpinner:
    """
    Background spinner that can be started/stopped safely.

    A daemon thread calls ``Spinner.step()`` every ``delay`` seconds until
    ``stop()`` is requested. The object can be started again after it has
    been stopped, and it also works as a context manager.
    """

    # Extra seconds stop() allows the worker to finish beyond one delay period.
    _JOIN_GRACE = 0.5

    def __init__(self, text: str = "Waiting for LLM", delay: float = 0.15):
        """
        Args:
            text: Message passed to the underlying ``Spinner``.
            delay: Seconds the worker waits between two ``step()`` calls.
        """
        self.spinner = Spinner(text)
        self.delay = delay
        self._stop_event = threading.Event()
        self._thread = self._new_thread()

    def _new_thread(self):
        """Create (but do not start) a daemon worker thread running ``_spin``."""
        return threading.Thread(target=self._spin, daemon=True)

    def _spin(self):
        """Worker loop: step the spinner until a stop is requested, then clear it."""
        while not self._stop_event.is_set():
            self.spinner.step()
            # Wait on the event rather than sleeping so that a stop request
            # wakes the worker immediately instead of after a full delay.
            self._stop_event.wait(self.delay)
        self.spinner.end()

    def start(self):
        """Start the spinner in a background thread.

        Does nothing while a worker is still alive. If a previous worker has
        already finished, a fresh thread is created, because a Thread object
        can only be started once.
        """
        if self._thread.is_alive():
            return
        if self._thread.ident is not None:
            # ident is set once a thread has been started; this one already
            # ran to completion and cannot be reused.
            self._thread = self._new_thread()
        self._stop_event.clear()
        self._thread.start()

    def stop(self):
        """Request the spinner to stop and wait briefly for the thread to exit."""
        self._stop_event.set()
        if self._thread.is_alive():
            self._thread.join(timeout=self.delay + self._JOIN_GRACE)
        # Also clear from this thread in case the worker did not exit within
        # the timeout.
        self.spinner.end()

    # Allow use as a context-manager
    def __enter__(self):
        """Start the spinner and return this object."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the spinner; exceptions from the with-body are not suppressed."""
        self.stop()

215 

216 

def main():
    """Demo: step a Spinner 100 times, 0.15 seconds apart, then report the outcome."""
    demo = Spinner("Running spinner...")
    try:
        remaining = 100
        while remaining > 0:
            time.sleep(0.15)
            demo.step()
            remaining -= 1
        print("Success!")
    except KeyboardInterrupt:
        # Ctrl-C during the demo is reported, not propagated.
        print("\nInterrupted by user.")
    finally:
        # Runs on every exit path so the spinner line is always cleared.
        demo.end()


if __name__ == "__main__":
    main()