Coverage for integrations/agent_engine/news_tools.py: 95.6%

90 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2News Agent Tools — AutoGen tools for news curation and push notifications. 

3 

4Tier 2 tools (agent_engine context). Same registration pattern as revenue_tools.py. 

5Uses existing FeedImporter, NotificationService, and feed_engine infrastructure. 

6""" 

7import json 

8import logging 

9from typing import Annotated, Optional 

10 

11logger = logging.getLogger('hevolve_social') 

12 

13 

def register_news_tools(helper, assistant, user_id: str):
    """Register news curation and push notification tools with an AutoGen agent.

    Builds five tool closures and registers each one twice: with
    ``helper.register_for_llm(name=..., description=...)`` and with
    ``assistant.register_for_execution(name=...)``.

    Every tool returns a JSON string. Failures are never raised to the
    caller; they are logged and reported as ``{"error": "<message>"}``.
    Project modules are imported inside each tool, so an import problem
    surfaces as that tool's error payload instead of breaking registration.

    Args:
        helper: Object whose ``register_for_llm`` returns a decorator.
        assistant: Object whose ``register_for_execution`` returns a decorator.
        user_id: Id of the user the agent acts for. Used as the subscribing
            user in ``subscribe_news_feed`` and as ``source_user_id`` of the
            notifications created by ``send_news_notification``.
    """

    def _tool_failure(tool_name: str, exc: Exception) -> str:
        """Log the active exception and return the shared error payload."""
        # Must be called from inside an ``except`` block so the traceback
        # is attached to the log record.
        logger.exception("News tool %s failed", tool_name)
        return json.dumps({'error': str(exc)})

    def fetch_news_feeds(
        feed_urls: Annotated[str, "Comma-separated RSS/Atom feed URLs to fetch"],
        max_items: Annotated[int, "Maximum items to return per feed"] = 10,
    ) -> str:
        """Fetch and parse RSS/Atom/JSON feeds. Returns titles, links, categories.

        Returns JSON with ``items`` (one entry per feed item), ``total`` and
        ``errors`` (one ``{"url", "error"}`` entry per feed that failed).
        A failing feed does not abort the remaining feeds.
        """
        try:
            urls = [part.strip() for part in feed_urls.split(',') if part.strip()]
            # A negative slice bound would silently drop items from the END
            # of each feed instead of limiting it, so clamp at zero.
            per_feed_limit = max(int(max_items), 0)
            if not urls:
                return json.dumps({'items': [], 'total': 0, 'errors': []})

            from integrations.social.feed_import import FeedImporter

            importer = FeedImporter()
            all_items = []
            errors = []

            for url in urls:
                try:
                    metadata, items, _ = importer.fetch_feed(url)
                    for item in items[:per_feed_limit]:
                        all_items.append({
                            'title': item.title,
                            'link': item.link,
                            'author': item.author,
                            'published': item.published.isoformat() if item.published else None,
                            'categories': item.categories,
                            'source': metadata.title or url,
                            'content_preview': item.content[:200] if item.content else '',
                        })
                except Exception as e:
                    # Best effort per feed: record the failure and move on.
                    logger.warning("Failed to fetch news feed %s: %s", url, e)
                    errors.append({'url': url, 'error': str(e)})

            return json.dumps({
                'items': all_items,
                'total': len(all_items),
                'errors': errors,
            })
        except Exception as e:
            return _tool_failure('fetch_news_feeds', e)

    def subscribe_news_feed(
        feed_url: Annotated[str, "RSS/Atom/JSON feed URL to subscribe to"],
        categories: Annotated[str, "Comma-separated category tags for this feed"] = '',
    ) -> str:
        """Subscribe to a new RSS/Atom feed for ongoing monitoring.

        Returns the subscription service's result as JSON, with a
        ``categories`` list added when at least one non-blank tag was given.
        """
        try:
            url = feed_url.strip()
            if not url:
                return json.dumps({'error': 'feed_url is required'})

            from integrations.social.feed_import import FeedSubscriptionService
            from integrations.social.models import get_db

            # ``str()`` tolerates an int id; ``isdecimal`` (unlike ``isdigit``)
            # only accepts characters that ``int()`` can actually parse.
            # NOTE(review): non-numeric ids still fall back to 0 as before;
            # confirm that is the intended owner for such subscriptions.
            owner = str(user_id)
            subscriber_id = int(owner) if owner.isdecimal() else 0
            tags = [tag.strip() for tag in categories.split(',') if tag.strip()]

            db = get_db()
            try:
                result = FeedSubscriptionService(db).subscribe(
                    user_id=subscriber_id,
                    feed_url=url,
                    auto_import=True,
                )
                if tags:
                    result['categories'] = tags
                return json.dumps(result)
            finally:
                db.close()
        except Exception as e:
            return _tool_failure('subscribe_news_feed', e)

    def send_news_notification(
        title: Annotated[str, "Notification title (news headline)"],
        message: Annotated[str, "Notification body (summary + source attribution)"],
        source_url: Annotated[str, "Link to the original article"],
        scope: Annotated[str, "Target scope: all, regional, or a specific user_id"] = 'all',
        category: Annotated[str, "News category tag"] = 'news',
    ) -> str:
        """Push a curated news item as notification to users.

        Returns JSON with ``success``, ``sent_count``, ``failed_count`` and
        the echoed ``scope``, ``category`` and ``title``. Delivery is best
        effort per recipient: a failure is logged and counted, and the
        remaining recipients are still attempted.
        """
        try:
            target_scope = scope.strip()
            if not target_scope:
                return json.dumps({'error': 'scope is required'})

            from integrations.social.models import get_db, User
            from integrations.social.services import NotificationService

            # Broadcast scopes and the maximum number of recipients for each.
            # "regional" currently targets active users without any region
            # filter; it only uses a smaller cap until regional user
            # assignment exists.
            broadcast_limits = {'all': 1000, 'regional': 500}

            db = get_db()
            try:
                recipient_cap = broadcast_limits.get(target_scope)
                if recipient_cap is not None:
                    rows = db.query(User.id).filter(
                        User.is_active == True  # noqa: E712
                    ).limit(recipient_cap).all()
                    target_ids = [str(row.id) for row in rows]
                else:
                    # Anything else is treated as one specific user id.
                    target_ids = [target_scope]

                body = f"{title}\n{message}\n\nSource: {source_url}"
                notification_type = f'news_{category}'
                sent_count = 0
                failed_count = 0

                for uid in target_ids:
                    try:
                        NotificationService.create(
                            db,
                            user_id=uid,
                            type=notification_type,
                            source_user_id=user_id,
                            target_type='news',
                            # Only the first 64 characters of the URL are
                            # passed as the target id.
                            target_id=source_url[:64],
                            message=body,
                        )
                        sent_count += 1
                    except Exception as e:
                        failed_count += 1
                        logger.warning(
                            "News notification to user %s failed: %s", uid, e)

                if sent_count:
                    db.commit()

                return json.dumps({
                    'success': True,
                    'sent_count': sent_count,
                    'failed_count': failed_count,
                    'scope': target_scope,
                    'category': category,
                    'title': title,
                })
            finally:
                db.close()
        except Exception as e:
            return _tool_failure('send_news_notification', e)

    def get_trending_news(
        limit: Annotated[int, "Maximum number of trending items to return"] = 10,
    ) -> str:
        """Get trending/hot news items from already-imported feed posts.

        Returns JSON with ``trending`` (a list of post summaries) and
        ``count``. Posts without a ``to_dict`` method produce an entry of
        defaults.
        """
        try:
            from integrations.social.models import get_db
            from integrations.social.feed_engine import get_trending_feed

            db = get_db()
            try:
                items = []
                for post in get_trending_feed(db, limit=limit):
                    data = post.to_dict() if hasattr(post, 'to_dict') else {}
                    items.append({
                        'id': data.get('id'),
                        'title': data.get('title', ''),
                        'content_preview': (data.get('content', '') or '')[:200],
                        'author_id': data.get('author_id'),
                        'vote_count': data.get('vote_count', 0),
                        'comment_count': data.get('comment_count', 0),
                        'created_at': data.get('created_at'),
                    })
                return json.dumps({'trending': items, 'count': len(items)})
            finally:
                db.close()
        except Exception as e:
            return _tool_failure('get_trending_news', e)

    def get_news_metrics(
        days: Annotated[int, "Number of days to look back"] = 7,
        scope: Annotated[str, "Filter by notification type prefix: news_world, news_local, or all"] = 'all',
    ) -> str:
        """Get news notification delivery stats (sent count, read rate).

        Returns JSON with ``period_days``, ``total_sent``, ``total_read``,
        ``read_rate`` (rounded to 3 places, 0 when nothing was sent) and
        ``by_type``. Totals and the per-type breakdown use the same filters.
        """
        try:
            from integrations.social.models import get_db, Notification
            from sqlalchemy import func
            from datetime import datetime, timedelta, timezone

            db = get_db()
            try:
                # Naive UTC, matching what ``datetime.utcnow()`` produced,
                # without calling the deprecated function.
                now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
                cutoff = now_utc - timedelta(days=days)

                criteria = [
                    Notification.created_at >= cutoff,
                    # "_" is a single-character wildcard in LIKE; escape it
                    # so only types that literally start with "news_" match.
                    Notification.type.like('news/_%', escape='/'),
                ]
                if scope != 'all':
                    criteria.append(Notification.type == scope)

                matching = db.query(Notification).filter(*criteria)
                total = matching.count()
                read_count = matching.filter(
                    Notification.is_read == True  # noqa: E712
                ).count()

                # Per-type breakdown over the same rows as the totals.
                type_counts = db.query(
                    Notification.type,
                    func.count(Notification.id),
                ).filter(*criteria).group_by(Notification.type).all()

                return json.dumps({
                    'period_days': days,
                    'total_sent': total,
                    'total_read': read_count,
                    'read_rate': round(read_count / max(total, 1), 3),
                    'by_type': {kind: count for kind, count in type_counts},
                })
            finally:
                db.close()
        except Exception as e:
            return _tool_failure('get_news_metrics', e)

    tools = [
        ('fetch_news_feeds',
         'Fetch and parse RSS/Atom feeds, returning titles, links, and categories',
         fetch_news_feeds),
        ('subscribe_news_feed',
         'Subscribe to a new RSS/Atom feed URL for ongoing monitoring',
         subscribe_news_feed),
        ('send_news_notification',
         'Push a curated news item as notification to users (all, regional, or specific user)',
         send_news_notification),
        ('get_trending_news',
         'Get trending/hot news items from imported feed posts',
         get_trending_news),
        ('get_news_metrics',
         'Get news notification delivery stats: sent count, read rate, by category',
         get_news_metrics),
    ]

    # The same callable is registered on both sides: the helper advertises
    # it to the LLM, the assistant executes it.
    for name, desc, tool_fn in tools:
        helper.register_for_llm(name=name, description=desc)(tool_fn)
        assistant.register_for_execution(name=name)(tool_fn)

    logger.info("Registered %d news tools for user %s", len(tools), user_id)