# integrations/web_crawler.py

1""" 

2Native web crawler — in-process crawl4ai, no HTTP API middleman. 

3 

4Every step is logged into a progress buffer that gets returned as part of 

5the tool output, so the LangChain/autogen agent sees intermediate progress 

6(connecting, rendering, extracting, word count) alongside the final content. 

7 

8Falls back to requests+BeautifulSoup if crawl4ai not installed. 

9 

10Consumers: 

11- LangChain Data_Extraction_From_URL tool (hart_intelligence) 

12- Google search enrichment top5_results (helper.py) 

13- autogen service tools (reuse_recipe.py) 

14""" 

import asyncio
import logging
import threading
import time
from typing import List

logger = logging.getLogger(__name__)

# Lazy-loaded crawler instance (heavy import — Playwright/Chromium)
_crawler = None
_crawler_available = None  # None = not checked yet


def _check_available() -> bool:
    """Check if crawl4ai library is installed (cached)."""
    global _crawler_available
    if _crawler_available is not None:
        return _crawler_available
    try:
        import crawl4ai  # noqa: F401
        _crawler_available = True
    except ImportError:
        logger.info("crawl4ai not installed, using requests+BeautifulSoup fallback")
        _crawler_available = False
    return _crawler_available


class _ProgressLog:
    """Accumulates intermediate step messages the agent sees in tool output."""

    def __init__(self):
        self._lines = []
        self._start = time.time()

    def step(self, msg: str):
        elapsed = round(time.time() - self._start, 2)
        line = f"[{elapsed}s] {msg}"
        self._lines.append(line)
        logger.info(msg)

    def text(self) -> str:
        return "\n".join(self._lines)
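# A populated log renders like this (timings illustrative):
#   [0.01s] Connecting to https://example.com...
#   [2.35s] Extracted 842 words from https://example.com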

async def _get_crawler(log: _ProgressLog):
    """Lazy-init singleton AsyncWebCrawler."""
    global _crawler
    if _crawler is not None:
        return _crawler
    log.step("Initializing browser engine (first run)...")
    from crawl4ai import AsyncWebCrawler
    _crawler = AsyncWebCrawler(
        headless=True,
        browser_type='chromium',
        verbose=False,
    )
    await _crawler.start()
    log.step("Browser engine ready")
    return _crawler


async def _crawl_single(url: str, timeout: int, log: _ProgressLog) -> dict:
    """Crawl one URL with intermediate progress logging."""
    log.step(f"Connecting to {url}...")
    try:
        crawler = await _get_crawler(log)
        log.step(f"Rendering page (timeout={timeout}s)...")
        result = await crawler.arun(
            url=url,
            word_count_threshold=50,
            timeout=timeout * 1000,  # seconds -> milliseconds
            bypass_cache=True,
        )
        if result.success and result.markdown:
            word_count = len(result.markdown.split())
            log.step(f"Extracted {word_count} words from {url}")
            return {
                'success': True,
                'url': url,
                'markdown': result.markdown,
                'word_count': word_count,
            }
        error = getattr(result, 'error_message', None) or 'No content extracted'
        log.step(f"Crawl returned no content: {error}")
        return {'success': False, 'url': url, 'error': error}
    except Exception as e:
        log.step(f"Crawl error: {e}")
        return {'success': False, 'url': url, 'error': str(e)}


def _fallback_fetch(url: str, timeout: int, log: _ProgressLog) -> dict:
    """Fallback: requests + BeautifulSoup. No browser needed."""
    import requests as _req
    log.step(f"Fetching {url} (requests fallback)...")
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        resp = _req.get(url, headers=headers, timeout=timeout)
        resp.raise_for_status()
        log.step(f"HTTP {resp.status_code}, {len(resp.content)} bytes received")

        from bs4 import BeautifulSoup
        soup = BeautifulSoup(resp.text, 'html.parser')
        for tag in soup(["script", "style", "nav", "header", "footer"]):
            tag.decompose()
        text = soup.get_text()
        # Collapse all runs of whitespace into single spaces between words
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        cleaned = ' '.join(c for c in chunks if c)
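        # e.g. "  Foo\n\n  Bar   baz " -> "Foo Bar baz"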

        if not cleaned or len(cleaned) < 50:
            log.step("Too little content after cleanup")
            return {'success': False, 'url': url, 'error': 'Too little content extracted'}

        word_count = len(cleaned.split())
        log.step(f"Extracted {word_count} words (BeautifulSoup)")
        return {'success': True, 'url': url, 'markdown': cleaned, 'word_count': word_count}
    except Exception as e:
        log.step(f"Fallback error: {e}")
        return {'success': False, 'url': url, 'error': str(e)}


_loop = None  # dedicated background event loop, started on first use


def _get_loop():
    """Start (once) and return a persistent background event loop."""
    global _loop
    if _loop is None:
        _loop = asyncio.new_event_loop()
        threading.Thread(target=_loop.run_forever, daemon=True).start()
    return _loop


def _run_async(coro):
    """
    Run an async coroutine from sync (or async) context.

    Everything is scheduled onto one long-lived background loop: the cached
    AsyncWebCrawler is bound to the loop that started it, so running each
    call in a throwaway asyncio.run() loop would break crawler reuse.
    """
    return asyncio.run_coroutine_threadsafe(coro, _get_loop()).result(timeout=120)


# ── Public API ──────────────────────────────────────────────────────


def crawl_url(url: str, timeout: int = 30) -> dict:
    """
    Crawl a single URL. Returns dict with markdown + progress log.

    Result keys: success, url, markdown, word_count, progress (str).
    """
    log = _ProgressLog()
    if _check_available():
        log.step("Using crawl4ai (in-process, JS rendering enabled)")
        try:
            result = _run_async(_crawl_single(url, timeout, log))
            result['progress'] = log.text()
            return result
        except Exception as e:
            log.step(f"crawl4ai failed: {e}, falling back to requests")
    else:
        log.step("crawl4ai not installed, using requests+BeautifulSoup")

    result = _fallback_fetch(url, timeout, log)
    result['progress'] = log.text()
    return result
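# Illustrative return value (values vary per page):
#   {'success': True, 'url': 'https://example.com',
#    'markdown': '...page text...', 'word_count': 842,
#    'progress': '[0.01s] Using crawl4ai ...\n[2.35s] Extracted 842 words ...'}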

176 

177 

178def crawl_urls(urls: List[str], timeout: int = 30, max_concurrent: int = 3) -> List[dict]: 

179 """ 

180 Crawl multiple URLs. Returns list of result dicts, each with progress. 

181 """ 

182 if not urls: 

183 return [] 

184 

185 log = _ProgressLog() 

186 log.step(f"Batch crawl: {len(urls)} URLs, max_concurrent={max_concurrent}") 

187 

188 if _check_available(): 

189 log.step("Using crawl4ai (in-process)") 

190 

191 async def _batch(): 

192 sem = asyncio.Semaphore(max_concurrent) 

193 async def _one(u): 

194 async with sem: 

195 return await _crawl_single(u, timeout, log) 

196 return await asyncio.gather(*[_one(u) for u in urls]) 

197 

198 try: 

199 results = _run_async(_batch()) 

200 success_count = sum(1 for r in results if r['success']) 

201 log.step(f"Batch complete: {success_count}/{len(urls)} succeeded") 

202 batch_progress = log.text() 

203 for r in results: 

204 r['progress'] = batch_progress 

205 return results 

206 except Exception as e: 

207 log.step(f"crawl4ai batch failed: {e}, falling back") 

208 else: 

209 log.step("crawl4ai not installed, sequential fallback") 

210 

211 results = [] 

212 for u in urls: 

213 r = _fallback_fetch(u, timeout, log) 

214 results.append(r) 

215 batch_progress = log.text() 

216 for r in results: 

217 r['progress'] = batch_progress 

218 return results 

219 

220 

221def crawl_url_for_agent(url: str, timeout: int = 30) -> str: 

222 """ 

223 Crawl a URL and return a string for the LangChain agent. 

224 

225 The agent sees every intermediate step (progress log) followed by content. 

226 """ 

227 result = crawl_url(url, timeout) 

228 parts = [] 

229 

230 # Intermediate progress — agent sees each step 

231 if result.get('progress'): 

232 parts.append("--- Progress ---") 

233 parts.append(result['progress']) 

234 parts.append("--- Result ---") 

235 

236 if result['success']: 

237 content = result['markdown'] 

238 # Truncate for agent context window 

239 if len(content) > 8000: 

240 truncate_pos = content.rfind('.', 0, 8000) 

241 if truncate_pos > 6000: 

242 content = content[:truncate_pos + 1] + "\n[Content truncated]" 

243 else: 

244 content = content[:8000] + "\n[Content truncated]" 

245 parts.append(f"URL: {url}") 

246 parts.append(f"Words extracted: {result['word_count']}") 

247 parts.append(f"Content:\n{content}") 

248 else: 

249 parts.append(f"FAILED: {url}") 

250 parts.append(f"Error: {result['error']}") 

251 

252 return "\n".join(parts) 
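# Illustrative agent-facing output:
#   --- Progress ---
#   [0.01s] Using crawl4ai (in-process, JS rendering enabled)
#   [2.35s] Extracted 842 words from https://example.com
#   --- Result ---
#   URL: https://example.com
#   Words extracted: 842
#   Content:
#   ...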

def crawl_urls_for_agent(urls: List[str], timeout: int = 30) -> str:
    """
    Crawl multiple URLs and return combined agent-readable output.
    Includes progress log so the agent sees intermediate steps.
    """
    results = crawl_urls(urls, timeout)
    parts = []

    # Shared progress log (all results carry the same batch progress)
    if results and results[0].get('progress'):
        parts.append("--- Progress ---")
        parts.append(results[0]['progress'])
        parts.append("--- Results ---")

    success_count = 0
    for r in results:
        if r['success']:
            success_count += 1
            content = r['markdown']
            # Tighter per-URL budget than the single-URL variant (4000 vs 8000)
            if len(content) > 4000:
                truncate_pos = content.rfind('.', 0, 4000)
                if truncate_pos > 3000:
                    content = content[:truncate_pos + 1] + " [truncated]"
                else:
                    content = content[:4000] + " [truncated]"
            parts.append(f"\n## {r['url']}\nWords: {r['word_count']}\n{content}")
        else:
            parts.append(f"\n## {r['url']}\nFailed: {r['error']}")

    header = f"Crawled {success_count}/{len(urls)} URLs successfully"
    return header + "\n" + "\n".join(parts)
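

# Minimal manual smoke test (illustrative; the URL is a placeholder).
if __name__ == "__main__":
    print(crawl_url_for_agent("https://example.com"))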