Coverage for integrations / coding_agent / aider_core / coders / search_replace.py: 45.3%

371 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1#!/usr/bin/env python 

2 

3import sys 

4from pathlib import Path 

5 

6try: 

7 import git 

8except ImportError: 

9 git = None 

10 

11from diff_match_patch import diff_match_patch 

12from tqdm import tqdm 

13 

14from ..dump import dump 

15from ..utils import GitTemporaryDirectory 

16 

17 

class RelativeIndenter:
    """Convert text to and from a relative-indentation encoding.

    The encoding makes it easier to search and apply edits to pairs of
    code blocks which may differ significantly in their overall level of
    indentation, because only the *change* of indentation from one line
    to the next is recorded.

    Every source line is encoded as two lines:

    1. an "indent change" line, and
    2. the source line with its leading white space removed.

    The indent change line is

    * empty when the indentation equals that of the previous line,
    * the newly added white space when the line is indented further,
    * one outdent marker per removed column when the line is indented
      less than the previous line.

    For example, these four source lines (indented 8, 12, 12 and 8)::

                Foo
                    Bar
                    Baz
                Fob

    are encoded as the change lines ``8 spaces``, ``4 spaces``, ``empty``
    and ``4 markers``, each followed by the bare content line (``Foo``,
    ``Bar``, ``Baz``, ``Fob``).  If every source line were uniformly
    outdented, only the very first change line would differ.

    The marker is "←" unless that character occurs in the texts given to
    the constructor, in which case an unused code point is chosen.
    """

    def __init__(self, texts):
        """Choose an outdent marker that does not occur in any of *texts*.

        Args:
            texts: iterable of strings that will later be transformed.
        """
        chars = set()
        for text in texts:
            chars.update(text)

        ARROW = "←"
        if ARROW not in chars:
            self.marker = ARROW
        else:
            self.marker = self.select_unique_marker(chars)

    def select_unique_marker(self, chars):
        """Return a character that is not a member of *chars*.

        Candidates are tried from U+10FFFF downwards to U+10001.

        Raises:
            ValueError: if every candidate is already in *chars*.
        """
        for codepoint in range(0x10FFFF, 0x10000, -1):
            marker = chr(codepoint)
            if marker not in chars:
                return marker

        raise ValueError("Could not find a unique marker")

    def make_relative(self, text):
        """Transform *text* to use relative indents.

        Returns:
            The encoded text: two lines per source line (see the class
            docstring).

        Raises:
            ValueError: if *text* already contains the outdent marker.
        """
        if self.marker in text:
            raise ValueError(f"Text already contains the outdent marker: {self.marker}")

        output = []
        prev_indent = ""
        for line in text.splitlines(keepends=True):
            line_without_end = line.rstrip("\n\r")

            # Width of the leading white space; for an all-blank line this
            # is the whole line (minus its line ending).
            len_indent = len(line_without_end) - len(line_without_end.lstrip())
            indent = line[:len_indent]
            change = len_indent - len(prev_indent)
            if change > 0:
                # Only the white space added on top of the previous indent.
                cur_indent = indent[-change:]
            elif change < 0:
                cur_indent = self.marker * -change
            else:
                cur_indent = ""

            # The change goes on its own line, followed by the content line.
            output.append(cur_indent + "\n" + line[len_indent:])
            prev_indent = indent

        return "".join(output)

    def make_absolute(self, text):
        """Transform *text* from relative back to absolute indents.

        Blank content lines are emitted without indentation, so white
        space on otherwise empty source lines is not restored.

        Raises:
            ValueError: if *text* is not made of complete
                (indent change, content) line pairs, or if an outdent
                marker survives in the result.
        """
        lines = text.splitlines(keepends=True)

        # A dangling change line would otherwise surface as IndexError,
        # which callers that handle ValueError would not catch.
        if len(lines) % 2:
            raise ValueError("Error transforming text back to absolute indents")

        output = []
        prev_indent = ""
        for i in range(0, len(lines), 2):
            dent = lines[i].rstrip("\r\n")
            non_indent = lines[i + 1]

            if dent.startswith(self.marker):
                # Each marker removes one column from the previous indent.
                len_outdent = len(dent)
                cur_indent = prev_indent[:-len_outdent]
            else:
                cur_indent = prev_indent + dent

            if not non_indent.rstrip("\r\n"):
                out_line = non_indent  # don't indent a blank line
            else:
                out_line = cur_indent + non_indent

            output.append(out_line)
            prev_indent = cur_indent

        res = "".join(output)
        if self.marker in res:
            raise ValueError("Error transforming text back to absolute indents")

        return res

172 

173 

# Notation: S = search text, R = replace text, O = original text.
# The patches are created to change S->R,
# so all the patch offsets are relative to S.
# But O has a lot more content, so those offsets are very wrong for O.

177# 

178# But patch_apply() seems to imply that once patch N is located, 

179# then it adjusts the offset of the next patch. 

180# 

181# This is great, because once we sync up after a big gap the nearby 

182# patches are close to being located right. 

183# Except when indentation has been changed by GPT. 

184# 

185# It would help to use the diff trick to build map_S_offset_to_O_offset(). 

186# Then update all the S offsets in the S->R patches to be O offsets. 

187# Do we also need to update the R offsets? 

188# 

189# What if this gets funky/wrong? 

190# 

191 

192 

def map_patches(texts, patches, debug):
    """Rewrite the start offsets of *patches* using a search->original diff.

    Args:
        texts: ``(search_text, replace_text, original_text)``.
        patches: patch objects whose ``start1``/``start2`` are updated in place.
        debug: when true, write ``tmp.html`` and print diagnostics.

    Returns:
        The same *patches* sequence, after the in-place update.
    """
    search_text, _replace_text, original_text = texts

    differ = diff_match_patch()
    differ.Diff_Timeout = 5

    search_to_original = differ.diff_main(search_text, original_text)

    if debug:
        Path("tmp.html").write_text(differ.diff_prettyHtml(search_to_original))

        dump(len(search_text))
        dump(len(original_text))

    for patch in patches:
        old_start1, old_start2 = patch.start1, patch.start2

        # Translate both offsets through the same search->original diff.
        patch.start1 = differ.diff_xIndex(search_to_original, old_start1)
        patch.start2 = differ.diff_xIndex(search_to_original, old_start2)

        if not debug:
            continue

        print()
        print(old_start1, repr(search_text[old_start1 : old_start1 + 50]))
        print(patch.start1, repr(original_text[patch.start1 : patch.start1 + 50]))
        print(patch.diffs)
        print()

    return patches

227 

228 

# Small sample text whose lines are labelled with their indentation depth.
# It is not referenced by any other code visible in this file.
example = """Left
Left
    4 in
    4 in
        8 in
    4 in
Left
"""

237 

238 

def relative_indent(texts):
    """Encode every text in *texts* with relative indentation.

    Returns:
        ``(indenter, encoded_texts)`` where *indenter* is the
        ``RelativeIndenter`` built from *texts* and *encoded_texts* is a
        list with ``make_relative`` applied to each text.
    """
    indenter = RelativeIndenter(texts)
    encoded = [indenter.make_relative(text) for text in texts]
    return indenter, encoded

244 

245 

# Number of newline characters added to each end of a text by line_pad().
line_padding = 100


def line_pad(text):
    """Return *text* wrapped in ``line_padding`` newlines on both sides."""
    newlines = "\n" * line_padding
    return "".join((newlines, text, newlines))


def line_unpad(text):
    """Remove the padding added by ``line_pad``.

    Returns:
        *text* without its first and last ``line_padding`` characters, or
        ``None`` when those characters are not exclusively newlines.
    """
    edges = text[:line_padding] + text[-line_padding:]
    if set(edges) == {"\n"}:
        return text[line_padding:-line_padding]
    return None

258 

259 

def dmp_apply(texts, remap=True):
    """Apply the search->replace character diff to the original text.

    Args:
        texts: ``(search_text, replace_text, original_text)``.
        remap: when true, use the stricter matching settings and pass the
            patches through ``map_patches`` before applying them.

    Returns:
        The patched original text, or ``None`` if any patch failed to apply.
    """
    debug = False
    # debug = True

    search_text, replace_text, original_text = texts

    dmp = diff_match_patch()
    dmp.Diff_Timeout = 5
    # dmp.Diff_EditCost = 16

    # (threshold, distance, max bits, patch margin) for each mode.
    if remap:
        threshold, distance, max_bits, margin = 0.95, 500, 128, 32
    else:
        threshold, distance, max_bits, margin = 0.5, 100_000, 32, 8
    dmp.Match_Threshold = threshold
    dmp.Match_Distance = distance
    dmp.Match_MaxBits = max_bits
    dmp.Patch_Margin = margin

    diff = dmp.diff_main(search_text, replace_text, None)
    dmp.diff_cleanupSemantic(diff)
    dmp.diff_cleanupEfficiency(diff)

    patches = dmp.patch_make(search_text, diff)

    if debug:
        Path("tmp.search_replace_diff.html").write_text(dmp.diff_prettyHtml(diff))

        for op_and_text in diff:
            print(op_and_text[0], repr(op_and_text[1]))

        for patch in patches:
            offset = patch.start1
            print()
            print(offset, repr(search_text[offset : offset + 10]))
            print(offset, repr(replace_text[offset : offset + 10]))
            print(patch.diffs)

    if remap:
        patches = map_patches(texts, patches, debug)

    patches_text = dmp.patch_toText(patches)

    new_text, success = dmp.patch_apply(patches, original_text)

    # patch_apply reports one boolean per patch.
    all_success = False not in success

    if debug:
        print(patches_text)

        dump(success)
        dump(all_success)

    return new_text if all_success else None

327 

328 

def lines_to_chars(lines, mapping):
    """Expand each character of *lines* into ``mapping[ord(character)]``.

    Returns:
        The concatenation of the looked-up strings, in order.
    """
    return "".join(mapping[ord(char)] for char in lines)

336 

337 

def dmp_lines_apply(texts):
    """Apply the search->replace diff to the original text, line by line.

    All three texts are encoded together so that each distinct line maps to
    one character; the diff and patch are then computed on those characters.

    Args:
        texts: ``(search_text, replace_text, original_text)``; every text
            must end with a newline.

    Returns:
        The patched original text, or ``None`` if any patch failed to apply.
    """
    debug = False
    # debug = True

    for t in texts:
        assert t.endswith("\n"), t

    search_text, replace_text, original_text = texts

    dmp = diff_match_patch()
    dmp.Diff_Timeout = 5
    # dmp.Diff_EditCost = 16

    dmp.Match_Threshold = 0.1
    dmp.Match_Distance = 100_000
    dmp.Match_MaxBits = 32
    dmp.Patch_Margin = 1

    # Encode everything in one call so the three texts share one mapping.
    combined = search_text + replace_text + original_text
    all_lines, _, mapping = dmp.diff_linesToChars(combined, "")
    assert len(all_lines) == len(combined.splitlines())

    search_num = len(search_text.splitlines())
    replace_num = len(replace_text.splitlines())
    original_num = len(original_text.splitlines())

    # Slice the encoded string back into its three parts.
    replace_end = search_num + replace_num
    search_lines = all_lines[:search_num]
    replace_lines = all_lines[search_num:replace_end]
    original_lines = all_lines[replace_end:]

    assert len(search_lines) == search_num
    assert len(replace_lines) == replace_num
    assert len(original_lines) == original_num

    encoded_diff = dmp.diff_main(search_lines, replace_lines, None)
    dmp.diff_cleanupSemantic(encoded_diff)
    dmp.diff_cleanupEfficiency(encoded_diff)

    patches = dmp.patch_make(search_lines, encoded_diff)

    if debug:
        diff = list(encoded_diff)
        dmp.diff_charsToLines(diff, mapping)
        Path("tmp.search_replace_diff.html").write_text(dmp.diff_prettyHtml(diff))

        for op_and_text in diff:
            print(op_and_text[0], repr(op_and_text[1]))

    new_lines, success = dmp.patch_apply(patches, original_lines)
    new_text = lines_to_chars(new_lines, mapping)

    all_success = False not in success

    if debug:
        dump(success)
        dump(all_success)

    return new_text if all_success else None

404 

405 

def diff_lines(search_text, replace_text):
    """Build a line-level diff of two texts.

    Returns:
        A list of lines, each prefixed with ``-`` (only in *search_text*),
        ``+`` (only in *replace_text*) or a space (common to both).
    """
    dmp = diff_match_patch()
    dmp.Diff_Timeout = 5
    # dmp.Diff_EditCost = 16
    search_chars, replace_chars, line_array = dmp.diff_linesToChars(search_text, replace_text)

    char_diff = dmp.diff_main(search_chars, replace_chars, None)
    dmp.diff_cleanupSemantic(char_diff)
    dmp.diff_cleanupEfficiency(char_diff)

    # Decode the per-line characters back into the real line text.
    line_diff = list(char_diff)
    dmp.diff_charsToLines(line_diff, line_array)

    udiff = []
    for op, chunk in line_diff:
        prefix = "-" if op < 0 else "+" if op > 0 else " "
        udiff.extend(prefix + line for line in chunk.splitlines(keepends=True))

    return udiff

432 

433 

def search_and_replace(texts):
    """Replace every literal occurrence of the search text in the original.

    Args:
        texts: ``(search_text, replace_text, original_text)``.

    Returns:
        The original text with all occurrences replaced, or ``None`` when
        the search text does not occur at all.
    """
    search_text, replace_text, original_text = texts

    # Multiple occurrences are allowed; uniqueness is not enforced here.
    if search_text not in original_text:
        return None

    return original_text.replace(search_text, replace_text)

446 

447 

def git_cherry_pick_osr_onto_o(texts):
    """Apply search->replace to the original text via ``git cherry-pick``.

    A throwaway repository gets the commits O -> S -> R (original, search,
    replace); the R commit is then cherry-picked onto O.

    Args:
        texts: ``(search_text, replace_text, original_text)``.

    Returns:
        The content of the file after a successful cherry-pick, or ``None``
        when GitPython is unavailable or the cherry-pick fails.
    """
    search_text, replace_text, original_text = texts

    if git is None:
        # The GitPython import is optional at module level; report "no
        # result" instead of failing with AttributeError on ``git.Repo``.
        return None

    with GitTemporaryDirectory() as dname:
        repo = git.Repo(dname)

        fname = Path(dname) / "file.txt"

        def commit(text, message):
            # Explicit UTF-8 so non-ASCII text (such as the relative-indent
            # marker) does not depend on the locale's default encoding.
            fname.write_text(text, encoding="utf-8")
            repo.git.add(str(fname))
            repo.git.commit("-m", message)
            return repo.head.commit.hexsha

        # Make O->S->R
        original_hash = commit(original_text, "original")
        commit(search_text, "search")
        replace_hash = commit(replace_text, "replace")

        # go back to O
        repo.git.checkout(original_hash)

        # cherry pick R onto original
        try:
            repo.git.cherry_pick(replace_hash, "--minimal")
        except (git.exc.ODBError, git.exc.GitError):
            # merge conflicts!
            return None

        return fname.read_text(encoding="utf-8")

483 

484 

def git_cherry_pick_sr_onto_so(texts):
    """Apply search->replace to the original text via ``git cherry-pick``.

    A throwaway repository gets the commits S -> R (search, replace) and,
    branching from S, S -> O (search, original); the R commit is then
    cherry-picked onto O.

    Args:
        texts: ``(search_text, replace_text, original_text)``.

    Returns:
        The content of the file after a successful cherry-pick, or ``None``
        when GitPython is unavailable or the cherry-pick fails.
    """
    search_text, replace_text, original_text = texts

    if git is None:
        # The GitPython import is optional at module level; report "no
        # result" instead of failing with AttributeError on ``git.Repo``.
        return None

    with GitTemporaryDirectory() as dname:
        repo = git.Repo(dname)

        fname = Path(dname) / "file.txt"

        def commit(text, message):
            # Explicit UTF-8 so non-ASCII text (such as the relative-indent
            # marker) does not depend on the locale's default encoding.
            fname.write_text(text, encoding="utf-8")
            repo.git.add(str(fname))
            repo.git.commit("-m", message)
            return repo.head.commit.hexsha

        search_hash = commit(search_text, "search")

        # make search->replace
        replace_hash = commit(replace_text, "replace")

        # go back to search,
        repo.git.checkout(search_hash)

        # make search->original
        commit(original_text, "original")

        # cherry pick replace onto original
        try:
            repo.git.cherry_pick(replace_hash, "--minimal")
        except (git.exc.ODBError, git.exc.GitError):
            # merge conflicts!
            return None

        return fname.read_text(encoding="utf-8")

522 

523 

class SearchTextNotUnique(ValueError):
    """Error type for a search text that is not unique in the original text."""

526 

527 

# Preprocessing combinations tried for a strategy.  Each tuple is unpacked by
# try_strategy() as (strip_blank_lines, relative_indent, reverse_lines).
# The variants with reverse_lines enabled are currently disabled.
all_preprocs = [
    # (strip_blank_lines, relative_indent, reverse_lines)
    (False, False, False),
    (True, False, False),
    (False, True, False),
    (True, True, False),
    # (False, False, True),
    # (True, False, True),
    # (False, True, True),
    # (True, True, True),
]

# The subset of combinations that have relative_indent enabled.
always_relative_indent = [
    (False, True, False),
    (True, True, False),
    # (False, True, True),
    # (True, True, True),
]

# (strategy function, preprocessing combinations) pairs, in the order they
# are attempted by flexible_search_and_replace().
editblock_strategies = [
    (search_and_replace, all_preprocs),
    (git_cherry_pick_osr_onto_o, all_preprocs),
    (dmp_lines_apply, all_preprocs),
]

# NOTE(review): these are 2-tuples, while try_strategy() unpacks three values
# from a preproc; this list is not used by any code visible in this file.
never_relative = [
    (False, False),
    (True, False),
]

# Same strategy sequence as editblock_strategies.
udiff_strategies = [
    (search_and_replace, all_preprocs),
    (git_cherry_pick_osr_onto_o, all_preprocs),
    (dmp_lines_apply, all_preprocs),
]

563 

564 

def flexible_search_and_replace(texts, strategies):
    """Run search/replace strategies in order until one produces a result.

    The strategies are expected to start with the most literal reading of
    the search text and progress to more flexible methods that tolerate
    differences between the search text and the original text.

    Args:
        texts: ``(search_text, replace_text, original_text)``.
        strategies: iterable of ``(strategy, preprocs)`` pairs; every
            preproc of a strategy is tried before moving to the next one.

    Returns:
        The first truthy result, or ``None`` if nothing succeeded.
    """
    attempts = (
        (strategy, preproc) for strategy, preprocs in strategies for preproc in preprocs
    )
    for strategy, preproc in attempts:
        result = try_strategy(texts, strategy, preproc)
        if result:
            return result
    return None

578 

579 

def reverse_lines(text):
    """Return *text* with its lines in reverse order, line endings kept."""
    return "".join(reversed(text.splitlines(keepends=True)))

584 

585 

def try_strategy(texts, strategy, preproc):
    """Run one strategy on *texts* after the requested preprocessing.

    Args:
        texts: ``(search_text, replace_text, original_text)``.
        strategy: callable taking the (preprocessed) texts.
        preproc: ``(strip_blank_lines, relative_indent, reverse_lines)``
            flags selecting the preprocessing steps.

    Returns:
        The strategy result with the preprocessing undone, the falsy
        strategy result unchanged, or ``None`` when converting back from
        relative indentation raises ``ValueError``.
    """
    strip_blanks, use_relative, use_reverse = preproc
    indenter = None

    if strip_blanks:
        texts = strip_blank_lines(texts)
    if use_relative:
        indenter, texts = relative_indent(texts)
    if use_reverse:
        texts = [reverse_lines(text) for text in texts]

    result = strategy(texts)
    if not result:
        return result

    # Undo the preprocessing in the opposite order it was applied.
    if use_reverse:
        result = reverse_lines(result)

    if use_relative:
        try:
            result = indenter.make_absolute(result)
        except ValueError:
            return None

    return result

609 

610 

def strip_blank_lines(texts):
    """Strip leading and trailing newlines from each text.

    Returns:
        A list of the texts, each ending with exactly one newline.
    """
    stripped = []
    for text in texts:
        stripped.append(text.strip("\n") + "\n")
    return stripped

615 

616 

def read_text(fname):
    """Return the text content of the file at *fname*."""
    return Path(fname).read_text()

620 

621 

def proc(dname):
    """Run the configured strategies on one test-case directory.

    The directory must contain the files ``search``, ``replace`` and
    ``original``.  Each produced text is written to ``original.<method>``
    and compared with the file ``correct``.

    Returns:
        A list of ``(method, verdict)`` pairs where verdict is ``"pass"``,
        ``"WRONG"`` or ``"fail"``; ``None`` if an input file is missing.
    """
    dname = Path(dname)

    try:
        search_text = read_text(dname / "search")
        replace_text = read_text(dname / "replace")
        original_text = read_text(dname / "original")
    except FileNotFoundError:
        return None

    texts = search_text, replace_text, original_text

    # Other strategies (search_and_replace, git_cherry_pick_osr_onto_o,
    # git_cherry_pick_sr_onto_so, dmp_apply) are currently disabled here.
    strategies = [
        (dmp_lines_apply, all_preprocs),
    ]

    short_names = {
        "search_and_replace": "sr",
        "git_cherry_pick_osr_onto_o": "cp_o",
        "git_cherry_pick_sr_onto_so": "cp_so",
        "dmp_apply": "dmp",
        "dmp_lines_apply": "dmpl",
    }

    patched = {}
    for strategy, preprocs in strategies:
        for preproc in preprocs:
            strip_blank, rel_indent, rev_lines = preproc

            # Method label: short name, then "_" plus s/i flags, then "r".
            flags = ("s" if strip_blank else "") + ("i" if rel_indent else "")
            method = short_names[strategy.__name__]
            if flags:
                method += "_" + flags
            if rev_lines:
                method += "r"

            patched[method] = try_strategy(texts, strategy, preproc)

    results = []
    for method, res in patched.items():
        out_fname = dname / f"original.{method}"
        # Remove any output left over from a previous run.
        if out_fname.exists():
            out_fname.unlink()

        if not res:
            results.append((method, "fail"))
            continue

        out_fname.write_text(res)

        correct = (dname / "correct").read_text()
        verdict = "pass" if res == correct else "WRONG"
        results.append((method, verdict))

    return results

691 

692 

def colorize_result(result):
    """Wrap a known verdict in ANSI colour codes.

    ``pass`` gets a green, ``WRONG`` a red and ``fail`` a yellow
    background, all with black text; any other value is returned as is.
    """
    background = {"pass": 102, "WRONG": 101, "fail": 103}
    if result in background:
        return f"\033[{background[result]};30m{result}\033[0m"
    return result

700 

701 

def main(dnames):
    """Process every directory in *dnames* and print a results table.

    The table has one row per directory (sorted by decreasing number of
    ``pass`` results) and one column per method, with colourised verdicts.

    Args:
        dnames: iterable of directory names or paths; it is not modified.

    Returns:
        ``None``.
    """
    # Normalize to Path once, so the same object is used as the key for
    # counting, sorting and the matrix lookups (a raw "dir/" argument would
    # not match str(Path("dir/")) == "dir").
    directories = [Path(dname) for dname in dnames]

    all_results = []
    for dname in tqdm(directories):
        # proc() returns None when the input files are missing.
        results = proc(dname) or []
        for method, res in results:
            all_results.append((dname, method, res))

    # Collect the methods in first-seen order for the table columns.
    methods = []
    for _, method, _ in all_results:
        if method not in methods:
            methods.append(method)

    # Sort directories by decreasing number of 'pass' results
    pass_counts = {dname: 0 for dname in directories}
    for dname, _, res in all_results:
        if res == "pass":
            pass_counts[dname] += 1
    directories.sort(key=lambda dname: pass_counts[dname], reverse=True)

    # Create and populate the results matrix
    results_matrix = {dname: {method: "" for method in methods} for dname in directories}
    for dname, method, res in all_results:
        results_matrix[dname][method] = res

    # Print the header
    print("{:<20}".format("Directory"), end="")
    for method in methods:
        print("{:<9}".format(method), end="")
    print()

    # Print the rows with colorized results
    for dname in directories:
        print("{:<20}".format(dname.name), end="")
        for method in methods:
            res = results_matrix[dname][method]
            colorized_res = colorize_result(res)
            # Widen the field by the length of the invisible escape codes so
            # the visible text still occupies 9 columns.
            res_l = 9 + len(colorized_res) - len(res)
            fmt = "{:<" + str(res_l) + "}"
            print(fmt.format(colorized_res), end="")
        print()

753 

754 

if __name__ == "__main__":
    # Script entry point: every command-line argument is a test-case
    # directory.  main() has no explicit return value in this file, so
    # status is None here and sys.exit(None) exits with code 0.
    status = main(sys.argv[1:])
    sys.exit(status)