Coverage for integrations / web_crawler.py: 0.0%
161 statements
1"""
2Native web crawler — in-process crawl4ai, no HTTP API middleman.
4Every step is logged into a progress buffer that gets returned as part of
5the tool output, so the LangChain/autogen agent sees intermediate progress
6(connecting, rendering, extracting, word count) alongside the final content.
8Falls back to requests+BeautifulSoup if crawl4ai not installed.
10Consumers:
11- LangChain Data_Extraction_From_URL tool (hart_intelligence)
12- Google search enrichment top5_results (helper.py)
13- autogen service tools (reuse_recipe.py)
14"""

import asyncio
import logging
import time
from typing import List

logger = logging.getLogger(__name__)

# Lazy-loaded crawler instance (heavy import — Playwright/Chromium)
_crawler = None
_crawler_available = None  # None = not checked yet


def _check_available() -> bool:
    """Check if crawl4ai library is installed (cached)."""
    global _crawler_available
    if _crawler_available is not None:
        return _crawler_available
    try:
        import crawl4ai  # noqa: F401
        _crawler_available = True
    except ImportError:
        logger.info("crawl4ai not installed, using requests+BeautifulSoup fallback")
        _crawler_available = False
    return _crawler_available


class _ProgressLog:
    """Accumulates intermediate step messages the agent sees in tool output."""

    def __init__(self):
        self._lines = []
        self._start = time.time()

    def step(self, msg: str):
        elapsed = round(time.time() - self._start, 2)
        line = f"[{elapsed}s] {msg}"
        self._lines.append(line)
        logger.info(msg)

    def text(self) -> str:
        return "\n".join(self._lines)
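
# Illustrative shape of the text the agent ends up seeing from this log
# (timings and word counts below are made-up example values):
#
#   [0.0s] Using crawl4ai (in-process, JS rendering enabled)
#   [0.01s] Connecting to https://example.com...
#   [1.3s] Rendering page (timeout=30s)...
#   [3.2s] Extracted 1240 words from https://example.com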


async def _get_crawler(log: _ProgressLog):
    """Lazy-init singleton AsyncWebCrawler."""
    global _crawler
    if _crawler is not None:
        return _crawler
    log.step("Initializing browser engine (first run)...")
    from crawl4ai import AsyncWebCrawler
    _crawler = AsyncWebCrawler(
        headless=True,
        browser_type='chromium',
        verbose=False,
    )
    await _crawler.start()
    log.step("Browser engine ready")
    return _crawler
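
# Note: nothing in this module ever closes the crawler; the Chromium
# instance started above stays warm for the life of the process and is
# shared by all later crawl_url()/crawl_urls() calls.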


async def _crawl_single(url: str, timeout: int, log: _ProgressLog) -> dict:
    """Crawl one URL with intermediate progress logging."""
    log.step(f"Connecting to {url}...")
    try:
        crawler = await _get_crawler(log)
        log.step(f"Rendering page (timeout={timeout}s)...")
        result = await crawler.arun(
            url=url,
            word_count_threshold=50,
            timeout=timeout * 1000,
            bypass_cache=True,
        )
        if result.success and result.markdown:
            word_count = len(result.markdown.split())
            log.step(f"Extracted {word_count} words from {url}")
            return {
                'success': True,
                'url': url,
                'markdown': result.markdown,
                'word_count': word_count,
            }
        error = getattr(result, 'error_message', 'No content extracted')
        log.step(f"Crawl returned no content: {error}")
        return {'success': False, 'url': url, 'error': error}
    except Exception as e:
        log.step(f"Crawl error: {e}")
        return {'success': False, 'url': url, 'error': str(e)}


def _fallback_fetch(url: str, timeout: int, log: _ProgressLog) -> dict:
    """Fallback: requests + BeautifulSoup. No browser needed."""
    import requests as _req
    log.step(f"Fetching {url} (requests fallback)...")
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        resp = _req.get(url, headers=headers, timeout=timeout)
        resp.raise_for_status()
        log.step(f"HTTP {resp.status_code}, {len(resp.content)} bytes received")

        from bs4 import BeautifulSoup
        soup = BeautifulSoup(resp.text, 'html.parser')
        for tag in soup(["script", "style", "nav", "header", "footer"]):
            tag.decompose()
        text = soup.get_text()
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
        cleaned = ' '.join(c for c in chunks if c)

        if not cleaned or len(cleaned) < 50:
            log.step("Too little content after cleanup")
            return {'success': False, 'url': url, 'error': 'Too little content extracted'}

        word_count = len(cleaned.split())
        log.step(f"Extracted {word_count} words (BeautifulSoup)")
        return {'success': True, 'url': url, 'markdown': cleaned, 'word_count': word_count}
    except Exception as e:
        log.step(f"Fallback error: {e}")
        return {'success': False, 'url': url, 'error': str(e)}


def _run_async(coro):
    """Run an async coroutine from sync context."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None

    if loop and loop.is_running():
        # asyncio.run() refuses to start inside a thread that already has a
        # running loop, so hand the coroutine to a fresh loop in a worker thread.
        import concurrent.futures
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
            return pool.submit(lambda: asyncio.run(coro)).result(timeout=120)
    else:
        return asyncio.run(coro)


# ── Public API ──────────────────────────────────────────────────────


def crawl_url(url: str, timeout: int = 30) -> dict:
    """
    Crawl a single URL. Returns dict with markdown + progress log.

    Result keys: success, url, markdown, word_count, progress (str).
    """
    log = _ProgressLog()
    if _check_available():
        log.step("Using crawl4ai (in-process, JS rendering enabled)")
        try:
            result = _run_async(_crawl_single(url, timeout, log))
            result['progress'] = log.text()
            return result
        except Exception as e:
            log.step(f"crawl4ai failed: {e}, falling back to requests")
    else:
        log.step("crawl4ai not installed, using requests+BeautifulSoup")

    result = _fallback_fetch(url, timeout, log)
    result['progress'] = log.text()
    return result
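
# Sketch of the dict crawl_url() returns on success (values are placeholders,
# not output from a real crawl):
#
#   crawl_url("https://example.com")
#   -> {'success': True,
#       'url': 'https://example.com',
#       'markdown': '# Example Domain ...',
#       'word_count': 1240,
#       'progress': '[0.0s] Using crawl4ai (in-process, JS rendering enabled)\n...'}
#
# On failure the dict carries: success=False, url, error, progress.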


def crawl_urls(urls: List[str], timeout: int = 30, max_concurrent: int = 3) -> List[dict]:
    """
    Crawl multiple URLs. Returns list of result dicts, each with progress.
    """
    if not urls:
        return []

    log = _ProgressLog()
    log.step(f"Batch crawl: {len(urls)} URLs, max_concurrent={max_concurrent}")

    if _check_available():
        log.step("Using crawl4ai (in-process)")

        async def _batch():
            sem = asyncio.Semaphore(max_concurrent)

            async def _one(u):
                async with sem:
                    return await _crawl_single(u, timeout, log)

            return await asyncio.gather(*[_one(u) for u in urls])

        try:
            results = _run_async(_batch())
            success_count = sum(1 for r in results if r['success'])
            log.step(f"Batch complete: {success_count}/{len(urls)} succeeded")
            batch_progress = log.text()
            for r in results:
                r['progress'] = batch_progress
            return results
        except Exception as e:
            log.step(f"crawl4ai batch failed: {e}, falling back")
    else:
        log.step("crawl4ai not installed, sequential fallback")

    results = []
    for u in urls:
        r = _fallback_fetch(u, timeout, log)
        results.append(r)
    batch_progress = log.text()
    for r in results:
        r['progress'] = batch_progress
    return results
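
# Batch usage sketch (URLs are placeholders):
#
#   pages = crawl_urls(["https://example.com", "https://example.org"], timeout=20)
#   for page in pages:
#       # every result dict carries the same batch-wide 'progress' string
#       print(page['url'], page.get('word_count', page.get('error')))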


def crawl_url_for_agent(url: str, timeout: int = 30) -> str:
    """
    Crawl a URL and return a string for the LangChain agent.

    The agent sees every intermediate step (progress log) followed by content.
    """
    result = crawl_url(url, timeout)
    parts = []

    # Intermediate progress — agent sees each step
    if result.get('progress'):
        parts.append("--- Progress ---")
        parts.append(result['progress'])
        parts.append("--- Result ---")

    if result['success']:
        content = result['markdown']
        # Truncate for agent context window
        if len(content) > 8000:
            truncate_pos = content.rfind('.', 0, 8000)
            if truncate_pos > 6000:
                content = content[:truncate_pos + 1] + "\n[Content truncated]"
            else:
                content = content[:8000] + "\n[Content truncated]"
        parts.append(f"URL: {url}")
        parts.append(f"Words extracted: {result['word_count']}")
        parts.append(f"Content:\n{content}")
    else:
        parts.append(f"FAILED: {url}")
        parts.append(f"Error: {result['error']}")

    return "\n".join(parts)
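
# How a consumer might register crawl_url_for_agent as the LangChain
# Data_Extraction_From_URL tool. Only a sketch assuming the classic
# langchain.agents.Tool interface; the real wiring lives in hart_intelligence.
#
#   from langchain.agents import Tool
#
#   data_extraction_tool = Tool(
#       name="Data_Extraction_From_URL",
#       func=crawl_url_for_agent,
#       description="Fetch a web page and return its text plus crawl progress.",
#   )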


def crawl_urls_for_agent(urls: List[str], timeout: int = 30) -> str:
    """
    Crawl multiple URLs and return combined agent-readable output.
    Includes progress log so agent sees intermediate steps.
    """
    results = crawl_urls(urls, timeout)
    parts = []

    # Shared progress log (all results have the same batch progress)
    if results and results[0].get('progress'):
        parts.append("--- Progress ---")
        parts.append(results[0]['progress'])
        parts.append("--- Results ---")

    success_count = 0
    for r in results:
        if r['success']:
            success_count += 1
            content = r['markdown']
            if len(content) > 4000:
                truncate_pos = content.rfind('.', 0, 4000)
                if truncate_pos > 3000:
                    content = content[:truncate_pos + 1] + " [truncated]"
                else:
                    content = content[:4000] + " [truncated]"
            parts.append(f"\n## {r['url']}\nWords: {r['word_count']}\n{content}")
        else:
            parts.append(f"\n## {r['url']}\nFailed: {r['error']}")

    header = f"Crawled {success_count}/{len(urls)} URLs successfully"
    return header + "\n" + "\n".join(parts)
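

if __name__ == "__main__":
    # Ad-hoc manual check, not used by any of the consumers listed in the
    # module docstring: crawl one placeholder URL and print the agent-facing
    # output, progress log included.
    print(crawl_url_for_agent("https://example.com"))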