Coverage for integrations / social / feed_import.py: 33.2%

229 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2feed_import.py - RSS and Atom feed consumption for HevolveSocial 

3 

4Imports external feeds and creates social posts from them. 

5Supports RSS 2.0, Atom 1.0, and JSON Feed formats. 

6 

7Usage: 

8 from integrations.social.feed_import import FeedImporter, FeedSubscriptionService 

9 

10 importer = FeedImporter(db) 

11 items = importer.fetch_feed('https://example.com/feed.xml') 

12 importer.import_items(items, user_id=123) 

13""" 

14 

15import logging 

16import hashlib 

17import requests 

18from core.http_pool import pooled_get 

19from datetime import datetime, timezone, timedelta 

20from typing import List, Dict, Any, Optional, Tuple 

21from dataclasses import dataclass, field 

22from enum import Enum 

23import json 

24import re 

25 

26logger = logging.getLogger('hevolve_social') 

27 

# feedparser is an optional dependency: when it is missing, only JSON feeds
# can be parsed (fetch_feed raises ValueError for XML feeds in that case).
# NOTE(review): the warning text mentions "basic XML parsing", but no such
# fallback exists in this module -- see fetch_feed.
try:
    import feedparser
    FEEDPARSER_AVAILABLE = True
except ImportError:
    FEEDPARSER_AVAILABLE = False
    logger.warning("feedparser not available - using basic XML parsing")

35 

36 

class FeedFormat(Enum):
    """Supported feed formats, as detected from raw feed content."""
    RSS = 'rss'          # RSS 2.0 (<rss>/<channel> root element)
    ATOM = 'atom'        # Atom 1.0 (namespaced <feed> root element)
    JSON = 'json'        # JSON Feed (https://jsonfeed.org)
    UNKNOWN = 'unknown'  # content matched no recognised format

43 

44 

@dataclass
class FeedItem:
    """A single entry parsed out of an external feed."""
    id: str
    title: str
    content: str
    link: str
    author: str = ''
    published: Optional[datetime] = None
    updated: Optional[datetime] = None
    categories: List[str] = field(default_factory=list)
    media_urls: List[str] = field(default_factory=list)
    source_feed: str = ''       # URL of the feed this item came from
    content_hash: str = ''      # stable digest used for deduplication

    def __post_init__(self):
        """Derive a stable dedup hash when one was not supplied.

        The hash covers the link, title, and the first 500 characters of
        the content, truncated to 32 hex digits.
        """
        if self.content_hash:
            return
        fingerprint = ':'.join((self.link, self.title, self.content[:500]))
        digest = hashlib.sha256(fingerprint.encode())
        self.content_hash = digest.hexdigest()[:32]

65 

66 

@dataclass
class FeedMetadata:
    """Metadata about a feed.

    Captures feed-level fields plus the HTTP validators (ETag /
    Last-Modified) used for conditional refetching.
    """
    url: str                                 # the feed's own URL
    title: str = ''
    description: str = ''
    link: str = ''                           # the site the feed belongs to
    format: FeedFormat = FeedFormat.UNKNOWN
    last_updated: Optional[datetime] = None  # feed-declared update time, if any
    etag: str = ''                           # HTTP ETag from the last fetch
    last_modified: str = ''                  # HTTP Last-Modified from the last fetch

78 

79 

80class FeedImporter: 

81 """Fetches and parses external RSS/Atom/JSON feeds.""" 

82 

83 def __init__(self, db_session=None, timeout: int = 30): 

84 """ 

85 Initialize feed importer. 

86 

87 Args: 

88 db_session: SQLAlchemy database session (optional) 

89 timeout: Request timeout in seconds 

90 """ 

91 self.db = db_session 

92 self.timeout = timeout 

93 self.user_agent = 'HARTSocial/1.0 (+https://hevolve.ai)' 

94 

95 def _detect_format(self, content: str) -> FeedFormat: 

96 """Detect feed format from content.""" 

97 content_lower = content.strip().lower() 

98 

99 if content_lower.startswith('{'): 

100 try: 

101 data = json.loads(content) 

102 if 'version' in data and 'jsonfeed' in str(data.get('version', '')).lower(): 

103 return FeedFormat.JSON 

104 if 'items' in data or 'entries' in data: 

105 return FeedFormat.JSON 

106 except Exception: 

107 pass 

108 

109 if '<feed' in content_lower and 'xmlns' in content_lower: 

110 return FeedFormat.ATOM 

111 

112 if '<rss' in content_lower or '<channel>' in content_lower: 

113 return FeedFormat.RSS 

114 

115 return FeedFormat.UNKNOWN 

116 

117 def _extract_images_from_content(self, content: str) -> List[str]: 

118 """Extract image URLs from HTML content.""" 

119 images = [] 

120 # Find img tags 

121 img_pattern = r'<img[^>]+src=["\']([^"\']+)["\']' 

122 images.extend(re.findall(img_pattern, content, re.IGNORECASE)) 

123 

124 # Find media:content or enclosure 

125 media_pattern = r'<media:content[^>]+url=["\']([^"\']+)["\']' 

126 images.extend(re.findall(media_pattern, content, re.IGNORECASE)) 

127 

128 enclosure_pattern = r'<enclosure[^>]+url=["\']([^"\']+)["\']' 

129 images.extend(re.findall(enclosure_pattern, content, re.IGNORECASE)) 

130 

131 return list(set(images)) 

132 

133 def _parse_datetime(self, date_str: str) -> Optional[datetime]: 

134 """Parse various datetime formats.""" 

135 if not date_str: 

136 return None 

137 

138 formats = [ 

139 '%Y-%m-%dT%H:%M:%S%z', 

140 '%Y-%m-%dT%H:%M:%SZ', 

141 '%Y-%m-%dT%H:%M:%S.%f%z', 

142 '%Y-%m-%dT%H:%M:%S.%fZ', 

143 '%a, %d %b %Y %H:%M:%S %z', 

144 '%a, %d %b %Y %H:%M:%S %Z', 

145 '%Y-%m-%d %H:%M:%S', 

146 '%Y-%m-%d', 

147 ] 

148 

149 date_str = date_str.strip() 

150 

151 # Handle common timezone abbreviations 

152 date_str = date_str.replace('GMT', '+0000').replace('UTC', '+0000') 

153 

154 for fmt in formats: 

155 try: 

156 dt = datetime.strptime(date_str, fmt) 

157 if dt.tzinfo is None: 

158 dt = dt.replace(tzinfo=timezone.utc) 

159 return dt 

160 except ValueError: 

161 continue 

162 

163 # Try feedparser's date parsing if available 

164 if FEEDPARSER_AVAILABLE: 

165 try: 

166 parsed = feedparser._parse_date(date_str) 

167 if parsed: 

168 return datetime(*parsed[:6], tzinfo=timezone.utc) 

169 except Exception: 

170 pass 

171 

172 logger.warning(f"Could not parse date: {date_str}") 

173 return None 

174 

175 def _parse_with_feedparser(self, content: str, url: str) -> Tuple[FeedMetadata, List[FeedItem]]: 

176 """Parse feed using feedparser library.""" 

177 feed = feedparser.parse(content) 

178 

179 # Extract metadata 

180 metadata = FeedMetadata( 

181 url=url, 

182 title=feed.feed.get('title', ''), 

183 description=feed.feed.get('description', feed.feed.get('subtitle', '')), 

184 link=feed.feed.get('link', ''), 

185 format=FeedFormat.RSS if feed.version.startswith('rss') else FeedFormat.ATOM, 

186 last_updated=self._parse_datetime(feed.feed.get('updated', feed.feed.get('published', ''))) 

187 ) 

188 

189 # Extract items 

190 items = [] 

191 for entry in feed.entries: 

192 # Get content 

193 content = '' 

194 if entry.get('content'): 

195 content = entry.content[0].get('value', '') 

196 elif entry.get('summary'): 

197 content = entry.summary 

198 elif entry.get('description'): 

199 content = entry.description 

200 

201 # Extract images 

202 media_urls = self._extract_images_from_content(content) 

203 if entry.get('media_content'): 

204 for media in entry.media_content: 

205 if media.get('url'): 

206 media_urls.append(media['url']) 

207 if entry.get('enclosures'): 

208 for enc in entry.enclosures: 

209 if enc.get('href'): 

210 media_urls.append(enc['href']) 

211 

212 # Get author 

213 author = '' 

214 if entry.get('author'): 

215 author = entry.author 

216 elif entry.get('authors'): 

217 author = entry.authors[0].get('name', '') if entry.authors else '' 

218 

219 # Categories 

220 categories = [tag.get('term', tag) if isinstance(tag, dict) else str(tag) 

221 for tag in entry.get('tags', [])] 

222 

223 item = FeedItem( 

224 id=entry.get('id', entry.get('link', '')), 

225 title=entry.get('title', ''), 

226 content=content, 

227 link=entry.get('link', ''), 

228 author=author, 

229 published=self._parse_datetime(entry.get('published', '')), 

230 updated=self._parse_datetime(entry.get('updated', '')), 

231 categories=categories, 

232 media_urls=list(set(media_urls)), 

233 source_feed=url 

234 ) 

235 items.append(item) 

236 

237 return metadata, items 

238 

239 def _parse_json_feed(self, content: str, url: str) -> Tuple[FeedMetadata, List[FeedItem]]: 

240 """Parse JSON Feed format.""" 

241 data = json.loads(content) 

242 

243 metadata = FeedMetadata( 

244 url=url, 

245 title=data.get('title', ''), 

246 description=data.get('description', ''), 

247 link=data.get('home_page_url', ''), 

248 format=FeedFormat.JSON 

249 ) 

250 

251 items = [] 

252 for entry in data.get('items', []): 

253 # Get content 

254 content = entry.get('content_html', entry.get('content_text', '')) 

255 

256 # Media 

257 media_urls = [] 

258 for attachment in entry.get('attachments', []): 

259 if attachment.get('url'): 

260 media_urls.append(attachment['url']) 

261 if entry.get('image'): 

262 media_urls.append(entry['image']) 

263 

264 # Author 

265 author = '' 

266 authors = entry.get('authors', [entry.get('author')] if entry.get('author') else []) 

267 if authors and authors[0]: 

268 author = authors[0].get('name', '') if isinstance(authors[0], dict) else str(authors[0]) 

269 

270 item = FeedItem( 

271 id=entry.get('id', entry.get('url', '')), 

272 title=entry.get('title', ''), 

273 content=content, 

274 link=entry.get('url', ''), 

275 author=author, 

276 published=self._parse_datetime(entry.get('date_published', '')), 

277 updated=self._parse_datetime(entry.get('date_modified', '')), 

278 categories=entry.get('tags', []), 

279 media_urls=list(set(media_urls)), 

280 source_feed=url 

281 ) 

282 items.append(item) 

283 

284 return metadata, items 

285 

286 def fetch_feed(self, url: str, etag: str = None, 

287 last_modified: str = None) -> Tuple[FeedMetadata, List[FeedItem], bool]: 

288 """ 

289 Fetch and parse a feed from URL. 

290 

291 Args: 

292 url: Feed URL 

293 etag: Previous ETag for conditional requests 

294 last_modified: Previous Last-Modified for conditional requests 

295 

296 Returns: 

297 Tuple of (metadata, items, was_modified) 

298 """ 

299 headers = {'User-Agent': self.user_agent} 

300 if etag: 

301 headers['If-None-Match'] = etag 

302 if last_modified: 

303 headers['If-Modified-Since'] = last_modified 

304 

305 try: 

306 response = pooled_get(url, headers=headers, timeout=self.timeout) 

307 

308 # Handle 304 Not Modified 

309 if response.status_code == 304: 

310 return FeedMetadata(url=url), [], False 

311 

312 response.raise_for_status() 

313 content = response.text 

314 

315 # Detect format 

316 feed_format = self._detect_format(content) 

317 

318 if feed_format == FeedFormat.JSON: 

319 metadata, items = self._parse_json_feed(content, url) 

320 elif FEEDPARSER_AVAILABLE: 

321 metadata, items = self._parse_with_feedparser(content, url) 

322 else: 

323 raise ValueError("feedparser not available and non-JSON feed detected") 

324 

325 # Store conditional request headers 

326 metadata.etag = response.headers.get('ETag', '') 

327 metadata.last_modified = response.headers.get('Last-Modified', '') 

328 

329 return metadata, items, True 

330 

331 except requests.RequestException as e: 

332 logger.error(f"Error fetching feed {url}: {e}") 

333 raise 

334 except Exception as e: 

335 logger.error(f"Error parsing feed {url}: {e}") 

336 raise 

337 

338 def import_items(self, items: List[FeedItem], user_id: int, 

339 community_id: int = None, auto_tag: bool = True) -> List[int]: 

340 """ 

341 Import feed items as social posts. 

342 

343 Args: 

344 items: List of FeedItem objects 

345 user_id: User ID to attribute posts to 

346 community_id: Optional community to post to 

347 auto_tag: Whether to auto-generate tags 

348 

349 Returns: 

350 List of created post IDs 

351 """ 

352 if not self.db: 

353 raise ValueError("Database session required for importing") 

354 

355 try: 

356 from .models import Post, get_db 

357 from .services import PostService 

358 

359 created_ids = [] 

360 

361 for item in items: 

362 # Check for duplicates by content hash 

363 existing = self.db.query(Post).filter( 

364 Post.content.contains(item.content_hash) 

365 ).first() 

366 

367 if existing: 

368 logger.debug(f"Skipping duplicate item: {item.title}") 

369 continue 

370 

371 # Build post content 

372 content = f"**{item.title}**\n\n{item.content}" 

373 if item.link: 

374 content += f"\n\n[Source]({item.link})" 

375 

376 # Add content hash for future dedup 

377 content += f"\n\n<!-- feed_hash:{item.content_hash} -->" 

378 

379 # Prepare tags 

380 tags = list(item.categories) 

381 if auto_tag and item.source_feed: 

382 # Add source domain as tag 

383 from urllib.parse import urlparse 

384 domain = urlparse(item.source_feed).netloc 

385 if domain: 

386 tags.append(f"via:{domain.replace('www.', '')}") 

387 

388 # Create post 

389 try: 

390 post = PostService.create_post( 

391 self.db, 

392 author_id=user_id, 

393 content=content, 

394 tags=tags[:10], # Limit tags 

395 media_urls=item.media_urls[:5], # Limit media 

396 community_id=community_id, 

397 post_type='link' if item.link else 'text' 

398 ) 

399 self.db.commit() 

400 created_ids.append(post.id) 

401 logger.info(f"Imported feed item as post {post.id}: {item.title[:50]}") 

402 except Exception as e: 

403 self.db.rollback() 

404 logger.error(f"Error creating post from feed item: {e}") 

405 

406 return created_ids 

407 

408 except ImportError as e: 

409 logger.error(f"Cannot import - missing models: {e}") 

410 raise 

411 

412 

413class FeedSubscriptionService: 

414 """Manages feed subscriptions for users.""" 

415 

416 def __init__(self, db_session): 

417 self.db = db_session 

418 self.importer = FeedImporter(db_session) 

419 

420 def subscribe(self, user_id: int, feed_url: str, 

421 community_id: int = None, auto_import: bool = True) -> Dict[str, Any]: 

422 """ 

423 Subscribe a user to a feed. 

424 

425 Args: 

426 user_id: User ID 

427 feed_url: Feed URL to subscribe to 

428 community_id: Optional community to post imported items to 

429 auto_import: Whether to automatically import new items 

430 

431 Returns: 

432 Subscription details 

433 """ 

434 try: 

435 # Validate feed 

436 metadata, items, _ = self.importer.fetch_feed(feed_url) 

437 

438 # Store subscription (using a simple JSON approach for now) 

439 # In production, this would use a FeedSubscription model 

440 subscription = { 

441 'user_id': user_id, 

442 'feed_url': feed_url, 

443 'feed_title': metadata.title, 

444 'community_id': community_id, 

445 'auto_import': auto_import, 

446 'etag': metadata.etag, 

447 'last_modified': metadata.last_modified, 

448 'last_checked': datetime.now(timezone.utc).isoformat(), 

449 'item_count': len(items), 

450 'status': 'active' 

451 } 

452 

453 logger.info(f"User {user_id} subscribed to feed: {feed_url}") 

454 return subscription 

455 

456 except Exception as e: 

457 logger.error(f"Error subscribing to feed: {e}") 

458 return {'error': str(e), 'status': 'failed'} 

459 

460 def check_feed(self, subscription: Dict[str, Any]) -> List[FeedItem]: 

461 """ 

462 Check a subscription for new items. 

463 

464 Args: 

465 subscription: Subscription details dict 

466 

467 Returns: 

468 List of new FeedItem objects 

469 """ 

470 try: 

471 metadata, items, was_modified = self.importer.fetch_feed( 

472 subscription['feed_url'], 

473 etag=subscription.get('etag'), 

474 last_modified=subscription.get('last_modified') 

475 ) 

476 

477 if not was_modified: 

478 return [] 

479 

480 # Update subscription 

481 subscription['etag'] = metadata.etag 

482 subscription['last_modified'] = metadata.last_modified 

483 subscription['last_checked'] = datetime.now(timezone.utc).isoformat() 

484 

485 return items 

486 

487 except Exception as e: 

488 logger.error(f"Error checking feed: {e}") 

489 subscription['status'] = 'error' 

490 subscription['last_error'] = str(e) 

491 return [] 

492 

493 def import_new_items(self, subscription: Dict[str, Any]) -> int: 

494 """ 

495 Import new items from a subscription. 

496 

497 Args: 

498 subscription: Subscription details 

499 

500 Returns: 

501 Number of items imported 

502 """ 

503 items = self.check_feed(subscription) 

504 if not items: 

505 return 0 

506 

507 created_ids = self.importer.import_items( 

508 items, 

509 user_id=subscription['user_id'], 

510 community_id=subscription.get('community_id') 

511 ) 

512 

513 return len(created_ids) 

514 

515 

516# Convenience functions 

def fetch_and_parse_feed(url: str) -> Tuple[FeedMetadata, List[FeedItem]]:
    """Fetch and parse a feed without database access.

    Convenience wrapper around FeedImporter.fetch_feed that drops the
    was_modified flag.
    """
    metadata, items, _modified = FeedImporter().fetch_feed(url)
    return metadata, items

522 

523 

def preview_feed(url: str, limit: int = 5) -> Dict[str, Any]:
    """Preview a feed's contents.

    Returns a summary dict with 'success': True plus feed metadata and
    up to `limit` preview items, or 'success': False with an 'error'
    message when fetching/parsing fails.
    """
    try:
        metadata, items = fetch_and_parse_feed(url)
        head = [
            {
                'title': entry.title,
                'link': entry.link,
                'author': entry.author,
                'published': entry.published.isoformat() if entry.published else None,
                'categories': entry.categories
            }
            for entry in items[:limit]
        ]
        return {
            'success': True,
            'metadata': {
                'title': metadata.title,
                'description': metadata.description,
                'link': metadata.link,
                'format': metadata.format.value
            },
            'item_count': len(items),
            'preview_items': head
        }
    except Exception as e:
        return {'success': False, 'error': str(e)}