Coverage for integrations / social / feed_import.py: 33.2%
229 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2feed_import.py - RSS and Atom feed consumption for HevolveSocial
4Imports external feeds and creates social posts from them.
5Supports RSS 2.0, Atom 1.0, and JSON Feed formats.
7Usage:
8 from integrations.social.feed_import import FeedImporter, FeedSubscriptionService
10 importer = FeedImporter(db)
11 items = importer.fetch_feed('https://example.com/feed.xml')
12 importer.import_items(items, user_id=123)
13"""
15import logging
16import hashlib
17import requests
18from core.http_pool import pooled_get
19from datetime import datetime, timezone, timedelta
20from typing import List, Dict, Any, Optional, Tuple
21from dataclasses import dataclass, field
22from enum import Enum
23import json
24import re
26logger = logging.getLogger('hevolve_social')
28# Try to import feedparser (optional dependency)
29try:
30 import feedparser
31 FEEDPARSER_AVAILABLE = True
32except ImportError:
33 FEEDPARSER_AVAILABLE = False
34 logger.warning("feedparser not available - using basic XML parsing")
class FeedFormat(Enum):
    """Feed formats the importer can recognize."""

    RSS = 'rss'          # RSS 2.0
    ATOM = 'atom'        # Atom 1.0
    JSON = 'json'        # JSON Feed
    UNKNOWN = 'unknown'  # unrecognized payload
@dataclass
class FeedItem:
    """A single entry parsed out of an external feed."""

    id: str
    title: str
    content: str
    link: str
    author: str = ''
    published: Optional[datetime] = None
    updated: Optional[datetime] = None
    categories: List[str] = field(default_factory=list)
    media_urls: List[str] = field(default_factory=list)
    source_feed: str = ''
    content_hash: str = ''

    def __post_init__(self):
        """Derive a stable deduplication hash unless one was supplied."""
        if self.content_hash:
            return
        digest_input = f"{self.link}:{self.title}:{self.content[:500]}"
        self.content_hash = hashlib.sha256(digest_input.encode()).hexdigest()[:32]
@dataclass
class FeedMetadata:
    """Feed-level information captured when a feed is fetched."""

    url: str                                  # URL the feed was fetched from
    title: str = ''
    description: str = ''
    link: str = ''                            # the feed's HTML home page
    format: FeedFormat = FeedFormat.UNKNOWN   # detected feed format
    last_updated: Optional[datetime] = None
    etag: str = ''                            # ETag header for conditional refetch
    last_modified: str = ''                   # Last-Modified header for conditional refetch
class FeedImporter:
    """Fetches and parses external RSS/Atom/JSON feeds.

    JSON Feed documents are parsed directly; RSS/Atom documents require the
    optional ``feedparser`` dependency. HTTP requests go through the shared
    ``pooled_get`` connection pool and support conditional (ETag /
    If-Modified-Since) fetching.
    """

    def __init__(self, db_session=None, timeout: int = 30):
        """
        Initialize feed importer.

        Args:
            db_session: SQLAlchemy database session (optional; only needed
                for import_items)
            timeout: Request timeout in seconds
        """
        self.db = db_session
        self.timeout = timeout
        self.user_agent = 'HARTSocial/1.0 (+https://hevolve.ai)'

    def _detect_format(self, content: str) -> FeedFormat:
        """Best-effort detection of the feed format from raw response text."""
        content_lower = content.strip().lower()

        if content_lower.startswith('{'):
            try:
                data = json.loads(content)
                # JSON Feed declares its version as a URL such as
                # https://jsonfeed.org/version/1.1
                if 'version' in data and 'jsonfeed' in str(data.get('version', '')).lower():
                    return FeedFormat.JSON
                if 'items' in data or 'entries' in data:
                    return FeedFormat.JSON
            except Exception:
                pass  # looked like JSON but is not; fall through to XML checks

        if '<feed' in content_lower and 'xmlns' in content_lower:
            return FeedFormat.ATOM

        if '<rss' in content_lower or '<channel>' in content_lower:
            return FeedFormat.RSS

        return FeedFormat.UNKNOWN

    def _extract_images_from_content(self, content: str) -> List[str]:
        """Extract media URLs (img / media:content / enclosure) from markup.

        Returns:
            De-duplicated URLs preserving first-seen order.
        """
        images: List[str] = []
        for pattern in (
            r'<img[^>]+src=["\']([^"\']+)["\']',
            r'<media:content[^>]+url=["\']([^"\']+)["\']',
            r'<enclosure[^>]+url=["\']([^"\']+)["\']',
        ):
            images.extend(re.findall(pattern, content, re.IGNORECASE))

        # dict.fromkeys de-duplicates while keeping a deterministic order;
        # list(set(...)) would shuffle results between runs.
        return list(dict.fromkeys(images))

    def _parse_datetime(self, date_str: str) -> Optional[datetime]:
        """Parse common ISO 8601 / RFC 822 datetime strings.

        Returns:
            A timezone-aware datetime (naive inputs are assumed UTC), or
            None when the string is empty or unparseable.
        """
        if not date_str:
            return None

        formats = [
            '%Y-%m-%dT%H:%M:%S%z',
            '%Y-%m-%dT%H:%M:%SZ',
            '%Y-%m-%dT%H:%M:%S.%f%z',
            '%Y-%m-%dT%H:%M:%S.%fZ',
            '%a, %d %b %Y %H:%M:%S %z',
            '%a, %d %b %Y %H:%M:%S %Z',
            '%Y-%m-%d %H:%M:%S',
            '%Y-%m-%d',
        ]

        date_str = date_str.strip()
        # Normalize common textual zone names so the %z formats can match.
        date_str = date_str.replace('GMT', '+0000').replace('UTC', '+0000')

        for fmt in formats:
            try:
                dt = datetime.strptime(date_str, fmt)
            except ValueError:
                continue
            if dt.tzinfo is None:
                # Treat naive timestamps as UTC so comparisons stay safe.
                dt = dt.replace(tzinfo=timezone.utc)
            return dt

        # Last resort: feedparser handles many more legacy formats.
        # NOTE(review): _parse_date is not public API and may move between
        # feedparser releases — confirm against the pinned version.
        if FEEDPARSER_AVAILABLE:
            try:
                parsed = feedparser._parse_date(date_str)
                if parsed:
                    return datetime(*parsed[:6], tzinfo=timezone.utc)
            except Exception:
                pass

        logger.warning(f"Could not parse date: {date_str}")
        return None

    def _parse_with_feedparser(self, content: str, url: str) -> Tuple[FeedMetadata, List[FeedItem]]:
        """Parse an RSS/Atom document using the feedparser library.

        Args:
            content: Raw feed document text
            url: Feed URL (recorded on the metadata and every item)

        Returns:
            Tuple of (FeedMetadata, list of FeedItem)
        """
        feed = feedparser.parse(content)

        # feedparser reports e.g. 'rss20' or 'atom10'; it can be '' or None
        # for malformed input, so guard before startswith.
        version = feed.version or ''
        metadata = FeedMetadata(
            url=url,
            title=feed.feed.get('title', ''),
            description=feed.feed.get('description', feed.feed.get('subtitle', '')),
            link=feed.feed.get('link', ''),
            format=FeedFormat.RSS if version.startswith('rss') else FeedFormat.ATOM,
            last_updated=self._parse_datetime(feed.feed.get('updated', feed.feed.get('published', '')))
        )

        items = []
        for entry in feed.entries:
            # Prefer full content, then summary, then description.
            # (Named entry_content so it doesn't shadow the raw feed text.)
            entry_content = ''
            if entry.get('content'):
                entry_content = entry.content[0].get('value', '')
            elif entry.get('summary'):
                entry_content = entry.summary
            elif entry.get('description'):
                entry_content = entry.description

            # Collect media from inline HTML plus structured media elements.
            media_urls = self._extract_images_from_content(entry_content)
            if entry.get('media_content'):
                for media in entry.media_content:
                    if media.get('url'):
                        media_urls.append(media['url'])
            if entry.get('enclosures'):
                for enc in entry.enclosures:
                    if enc.get('href'):
                        media_urls.append(enc['href'])

            author = ''
            if entry.get('author'):
                author = entry.author
            elif entry.get('authors'):
                author = entry.authors[0].get('name', '') if entry.authors else ''

            # Tags may be dicts ({'term': ...}) or plain strings.
            categories = [tag.get('term', tag) if isinstance(tag, dict) else str(tag)
                          for tag in entry.get('tags', [])]

            item = FeedItem(
                id=entry.get('id', entry.get('link', '')),
                title=entry.get('title', ''),
                content=entry_content,
                link=entry.get('link', ''),
                author=author,
                published=self._parse_datetime(entry.get('published', '')),
                updated=self._parse_datetime(entry.get('updated', '')),
                categories=categories,
                media_urls=list(dict.fromkeys(media_urls)),  # ordered dedupe
                source_feed=url
            )
            items.append(item)

        return metadata, items

    def _parse_json_feed(self, content: str, url: str) -> Tuple[FeedMetadata, List[FeedItem]]:
        """Parse a JSON Feed (https://jsonfeed.org) document.

        Raises:
            json.JSONDecodeError: If content is not valid JSON
        """
        data = json.loads(content)

        metadata = FeedMetadata(
            url=url,
            title=data.get('title', ''),
            description=data.get('description', ''),
            link=data.get('home_page_url', ''),
            format=FeedFormat.JSON
        )

        items = []
        for entry in data.get('items', []):
            # JSON Feed allows either HTML or plain-text content.
            entry_content = entry.get('content_html', entry.get('content_text', ''))

            media_urls = []
            for attachment in entry.get('attachments', []):
                if attachment.get('url'):
                    media_urls.append(attachment['url'])
            if entry.get('image'):
                media_urls.append(entry['image'])

            # JSON Feed 1.1 uses 'authors'; 1.0 used a single 'author' object.
            author = ''
            authors = entry.get('authors', [entry.get('author')] if entry.get('author') else [])
            if authors and authors[0]:
                author = authors[0].get('name', '') if isinstance(authors[0], dict) else str(authors[0])

            item = FeedItem(
                id=entry.get('id', entry.get('url', '')),
                title=entry.get('title', ''),
                content=entry_content,
                link=entry.get('url', ''),
                author=author,
                published=self._parse_datetime(entry.get('date_published', '')),
                updated=self._parse_datetime(entry.get('date_modified', '')),
                categories=entry.get('tags', []),
                media_urls=list(dict.fromkeys(media_urls)),  # ordered dedupe
                source_feed=url
            )
            items.append(item)

        return metadata, items

    def fetch_feed(self, url: str, etag: Optional[str] = None,
                   last_modified: Optional[str] = None) -> Tuple[FeedMetadata, List[FeedItem], bool]:
        """
        Fetch and parse a feed from URL.

        Args:
            url: Feed URL
            etag: Previous ETag for conditional requests
            last_modified: Previous Last-Modified for conditional requests

        Returns:
            Tuple of (metadata, items, was_modified). was_modified is False
            when the server answered 304 Not Modified; metadata and items
            are then empty placeholders.

        Raises:
            requests.RequestException: On network/HTTP errors
            ValueError: When a non-JSON feed is seen without feedparser
        """
        headers = {'User-Agent': self.user_agent}
        if etag:
            headers['If-None-Match'] = etag
        if last_modified:
            headers['If-Modified-Since'] = last_modified

        try:
            response = pooled_get(url, headers=headers, timeout=self.timeout)

            # 304: nothing changed since the caller's validators.
            if response.status_code == 304:
                return FeedMetadata(url=url), [], False

            response.raise_for_status()
            content = response.text

            feed_format = self._detect_format(content)

            if feed_format == FeedFormat.JSON:
                metadata, items = self._parse_json_feed(content, url)
            elif FEEDPARSER_AVAILABLE:
                metadata, items = self._parse_with_feedparser(content, url)
            else:
                raise ValueError("feedparser not available and non-JSON feed detected")

            # Remember validators so the next fetch can be conditional.
            metadata.etag = response.headers.get('ETag', '')
            metadata.last_modified = response.headers.get('Last-Modified', '')

            return metadata, items, True

        except requests.RequestException as e:
            logger.error(f"Error fetching feed {url}: {e}")
            raise
        except Exception as e:
            logger.error(f"Error parsing feed {url}: {e}")
            raise

    def import_items(self, items: List[FeedItem], user_id: int,
                     community_id: int = None, auto_tag: bool = True) -> List[int]:
        """
        Import feed items as social posts.

        Items are deduplicated via the content hash embedded (as an HTML
        comment) in previously imported posts. Failures on individual items
        are logged and rolled back without aborting the whole batch.

        Args:
            items: List of FeedItem objects
            user_id: User ID to attribute posts to
            community_id: Optional community to post to
            auto_tag: Whether to add a via:<domain> tag from the source feed

        Returns:
            List of created post IDs

        Raises:
            ValueError: If no database session was supplied
            ImportError: If the social models/services are unavailable
        """
        if not self.db:
            raise ValueError("Database session required for importing")

        from urllib.parse import urlparse  # hoisted out of the per-item loop

        try:
            from .models import Post, get_db
            from .services import PostService

            created_ids = []

            for item in items:
                # Dedup: earlier imports embed the hash in the post body.
                existing = self.db.query(Post).filter(
                    Post.content.contains(item.content_hash)
                ).first()

                if existing:
                    logger.debug(f"Skipping duplicate item: {item.title}")
                    continue

                # Build post content: title, body, source link.
                content = f"**{item.title}**\n\n{item.content}"
                if item.link:
                    content += f"\n\n[Source]({item.link})"

                # Embed the hash so future imports can detect this item.
                content += f"\n\n<!-- feed_hash:{item.content_hash} -->"

                tags = list(item.categories)
                if auto_tag and item.source_feed:
                    # Add the source domain as a provenance tag.
                    domain = urlparse(item.source_feed).netloc
                    if domain:
                        tags.append(f"via:{domain.replace('www.', '')}")

                try:
                    post = PostService.create_post(
                        self.db,
                        author_id=user_id,
                        content=content,
                        tags=tags[:10],  # Limit tags
                        media_urls=item.media_urls[:5],  # Limit media
                        community_id=community_id,
                        post_type='link' if item.link else 'text'
                    )
                    self.db.commit()
                    created_ids.append(post.id)
                    logger.info(f"Imported feed item as post {post.id}: {item.title[:50]}")
                except Exception as e:
                    # Per-item failure: roll back and continue with the rest.
                    self.db.rollback()
                    logger.error(f"Error creating post from feed item: {e}")

            return created_ids

        except ImportError as e:
            logger.error(f"Cannot import - missing models: {e}")
            raise
class FeedSubscriptionService:
    """Manages feed subscriptions for users.

    Subscriptions are plain dicts for now; a dedicated FeedSubscription
    model would replace them in production.
    """

    def __init__(self, db_session):
        self.db = db_session
        self.importer = FeedImporter(db_session)

    def subscribe(self, user_id: int, feed_url: str,
                  community_id: int = None, auto_import: bool = True) -> Dict[str, Any]:
        """
        Subscribe a user to a feed.

        Args:
            user_id: User ID
            feed_url: Feed URL to subscribe to
            community_id: Optional community to post imported items to
            auto_import: Whether to automatically import new items

        Returns:
            Subscription details dict; on failure, a dict with 'error' and
            status 'failed'.
        """
        try:
            # Fetch once up front to validate the feed is reachable/parseable.
            metadata, feed_items, _ = self.importer.fetch_feed(feed_url)

            checked_at = datetime.now(timezone.utc).isoformat()
            subscription = {
                'user_id': user_id,
                'feed_url': feed_url,
                'feed_title': metadata.title,
                'community_id': community_id,
                'auto_import': auto_import,
                'etag': metadata.etag,
                'last_modified': metadata.last_modified,
                'last_checked': checked_at,
                'item_count': len(feed_items),
                'status': 'active'
            }

            logger.info(f"User {user_id} subscribed to feed: {feed_url}")
            return subscription

        except Exception as e:
            logger.error(f"Error subscribing to feed: {e}")
            return {'error': str(e), 'status': 'failed'}

    def check_feed(self, subscription: Dict[str, Any]) -> List[FeedItem]:
        """
        Check a subscription for new items.

        Mutates the subscription dict in place: validators and last_checked
        on success; status/last_error on failure.

        Args:
            subscription: Subscription details dict

        Returns:
            List of new FeedItem objects (empty when unmodified or on error)
        """
        try:
            metadata, feed_items, changed = self.importer.fetch_feed(
                subscription['feed_url'],
                etag=subscription.get('etag'),
                last_modified=subscription.get('last_modified')
            )

            if not changed:
                return []

            # Carry the new validators forward for the next conditional fetch.
            subscription['etag'] = metadata.etag
            subscription['last_modified'] = metadata.last_modified
            subscription['last_checked'] = datetime.now(timezone.utc).isoformat()
            return feed_items

        except Exception as e:
            logger.error(f"Error checking feed: {e}")
            subscription['status'] = 'error'
            subscription['last_error'] = str(e)
            return []

    def import_new_items(self, subscription: Dict[str, Any]) -> int:
        """
        Import new items from a subscription.

        Args:
            subscription: Subscription details

        Returns:
            Number of items imported
        """
        fresh = self.check_feed(subscription)
        if not fresh:
            return 0

        created = self.importer.import_items(
            fresh,
            user_id=subscription['user_id'],
            community_id=subscription.get('community_id')
        )
        return len(created)
516# Convenience functions
def fetch_and_parse_feed(url: str) -> Tuple[FeedMetadata, List[FeedItem]]:
    """Fetch and parse a feed without database access."""
    meta, feed_items, _modified = FeedImporter().fetch_feed(url)
    return meta, feed_items
def preview_feed(url: str, limit: int = 5) -> Dict[str, Any]:
    """Preview a feed's contents.

    Returns a dict with 'success': True plus metadata and up to ``limit``
    item summaries, or 'success': False with an 'error' message.
    """
    try:
        metadata, items = fetch_and_parse_feed(url)

        head = [
            {
                'title': entry.title,
                'link': entry.link,
                'author': entry.author,
                'published': entry.published.isoformat() if entry.published else None,
                'categories': entry.categories
            }
            for entry in items[:limit]
        ]

        return {
            'success': True,
            'metadata': {
                'title': metadata.title,
                'description': metadata.description,
                'link': metadata.link,
                'format': metadata.format.value
            },
            'item_count': len(items),
            'preview_items': head
        }
    except Exception as e:
        return {'success': False, 'error': str(e)}