Coverage for integrations / agent_engine / news_tools.py: 95.6%
90 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2News Agent Tools — AutoGen tools for news curation and push notifications.
4Tier 2 tools (agent_engine context). Same registration pattern as revenue_tools.py.
5Uses existing FeedImporter, NotificationService, and feed_engine infrastructure.
6"""
7import json
8import logging
9from typing import Annotated, Optional
11logger = logging.getLogger('hevolve_social')
14def register_news_tools(helper, assistant, user_id: str):
15 """Register news curation and push notification tools with an AutoGen agent."""
17 def fetch_news_feeds(
18 feed_urls: Annotated[str, "Comma-separated RSS/Atom feed URLs to fetch"],
19 max_items: Annotated[int, "Maximum items to return per feed"] = 10,
20 ) -> str:
21 """Fetch and parse RSS/Atom/JSON feeds. Returns titles, links, categories."""
22 try:
23 from integrations.social.feed_import import FeedImporter
25 importer = FeedImporter()
26 all_items = []
27 errors = []
29 for url in feed_urls.split(','):
30 url = url.strip()
31 if not url:
32 continue
33 try:
34 metadata, items, _ = importer.fetch_feed(url)
35 for item in items[:max_items]:
36 all_items.append({
37 'title': item.title,
38 'link': item.link,
39 'author': item.author,
40 'published': item.published.isoformat() if item.published else None,
41 'categories': item.categories,
42 'source': metadata.title or url,
43 'content_preview': item.content[:200] if item.content else '',
44 })
45 except Exception as e:
46 errors.append({'url': url, 'error': str(e)})
48 return json.dumps({
49 'items': all_items,
50 'total': len(all_items),
51 'errors': errors,
52 })
53 except Exception as e:
54 return json.dumps({'error': str(e)})
56 def subscribe_news_feed(
57 feed_url: Annotated[str, "RSS/Atom/JSON feed URL to subscribe to"],
58 categories: Annotated[str, "Comma-separated category tags for this feed"] = '',
59 ) -> str:
60 """Subscribe to a new RSS/Atom feed for ongoing monitoring."""
61 try:
62 from integrations.social.feed_import import FeedSubscriptionService
63 from integrations.social.models import get_db
65 db = get_db()
66 try:
67 svc = FeedSubscriptionService(db)
68 result = svc.subscribe(
69 user_id=int(user_id) if user_id.isdigit() else 0,
70 feed_url=feed_url.strip(),
71 auto_import=True,
72 )
73 if categories:
74 result['categories'] = [c.strip() for c in categories.split(',')]
75 return json.dumps(result)
76 finally:
77 db.close()
78 except Exception as e:
79 return json.dumps({'error': str(e)})
81 def send_news_notification(
82 title: Annotated[str, "Notification title (news headline)"],
83 message: Annotated[str, "Notification body (summary + source attribution)"],
84 source_url: Annotated[str, "Link to the original article"],
85 scope: Annotated[str, "Target scope: all, regional, or a specific user_id"] = 'all',
86 category: Annotated[str, "News category tag"] = 'news',
87 ) -> str:
88 """Push a curated news item as notification to users."""
89 try:
90 from integrations.social.models import get_db, User, Notification
91 from integrations.social.services import NotificationService
92 from sqlalchemy import func
94 db = get_db()
95 try:
96 # Determine target users
97 if scope == 'all':
98 target_users = db.query(User.id).filter(
99 User.is_active == True # noqa: E712
100 ).limit(1000).all()
101 target_ids = [str(u.id) for u in target_users]
102 elif scope == 'regional':
103 # For regional: broadcast to all active users
104 # Region filtering will be refined when regional user assignment exists
105 target_users = db.query(User.id).filter(
106 User.is_active == True # noqa: E712
107 ).limit(500).all()
108 target_ids = [str(u.id) for u in target_users]
109 else:
110 # Specific user_id
111 target_ids = [scope]
113 sent_count = 0
114 full_message = f"{message}\n\nSource: {source_url}"
116 for uid in target_ids:
117 try:
118 NotificationService.create(
119 db,
120 user_id=uid,
121 type=f'news_{category}',
122 source_user_id=user_id,
123 target_type='news',
124 target_id=source_url[:64],
125 message=f"{title}\n{full_message}",
126 )
127 sent_count += 1
128 except Exception:
129 continue
131 if sent_count:
132 db.commit()
134 return json.dumps({
135 'success': True,
136 'sent_count': sent_count,
137 'scope': scope,
138 'category': category,
139 'title': title,
140 })
141 finally:
142 db.close()
143 except Exception as e:
144 return json.dumps({'error': str(e)})
146 def get_trending_news(
147 limit: Annotated[int, "Maximum number of trending items to return"] = 10,
148 ) -> str:
149 """Get trending/hot news items from already-imported feed posts."""
150 try:
151 from integrations.social.models import get_db
152 from integrations.social.feed_engine import get_trending_feed
154 db = get_db()
155 try:
156 posts = get_trending_feed(db, limit=limit)
157 items = []
158 for post in posts:
159 p = post.to_dict() if hasattr(post, 'to_dict') else {}
160 items.append({
161 'id': p.get('id'),
162 'title': p.get('title', ''),
163 'content_preview': (p.get('content', '') or '')[:200],
164 'author_id': p.get('author_id'),
165 'vote_count': p.get('vote_count', 0),
166 'comment_count': p.get('comment_count', 0),
167 'created_at': p.get('created_at'),
168 })
169 return json.dumps({'trending': items, 'count': len(items)})
170 finally:
171 db.close()
172 except Exception as e:
173 return json.dumps({'error': str(e)})
175 def get_news_metrics(
176 days: Annotated[int, "Number of days to look back"] = 7,
177 scope: Annotated[str, "Filter by notification type prefix: news_world, news_local, or all"] = 'all',
178 ) -> str:
179 """Get news notification delivery stats (sent count, read rate)."""
180 try:
181 from integrations.social.models import get_db, Notification
182 from sqlalchemy import func
183 from datetime import datetime, timedelta
185 db = get_db()
186 try:
187 cutoff = datetime.utcnow() - timedelta(days=days)
188 query = db.query(Notification).filter(
189 Notification.created_at >= cutoff,
190 Notification.type.like('news_%'),
191 )
192 if scope != 'all':
193 query = query.filter(Notification.type == scope)
195 total = query.count()
196 read_count = query.filter(Notification.is_read == True).count() # noqa: E712
198 # Group by type
199 type_counts = db.query(
200 Notification.type,
201 func.count(Notification.id),
202 ).filter(
203 Notification.created_at >= cutoff,
204 Notification.type.like('news_%'),
205 ).group_by(Notification.type).all()
207 return json.dumps({
208 'period_days': days,
209 'total_sent': total,
210 'total_read': read_count,
211 'read_rate': round(read_count / max(total, 1), 3),
212 'by_type': {t: c for t, c in type_counts},
213 })
214 finally:
215 db.close()
216 except Exception as e:
217 return json.dumps({'error': str(e)})
219 tools = [
220 ('fetch_news_feeds',
221 'Fetch and parse RSS/Atom feeds, returning titles, links, and categories',
222 fetch_news_feeds),
223 ('subscribe_news_feed',
224 'Subscribe to a new RSS/Atom feed URL for ongoing monitoring',
225 subscribe_news_feed),
226 ('send_news_notification',
227 'Push a curated news item as notification to users (all, regional, or specific user)',
228 send_news_notification),
229 ('get_trending_news',
230 'Get trending/hot news items from imported feed posts',
231 get_trending_news),
232 ('get_news_metrics',
233 'Get news notification delivery stats: sent count, read rate, by category',
234 get_news_metrics),
235 ]
237 for name, desc, func in tools:
238 helper.register_for_llm(name=name, description=desc)(func)
239 assistant.register_for_execution(name=name)(func)
241 logger.info(f"Registered {len(tools)} news tools for user {user_id}")