Coverage for integrations / social / api_channels.py: 27.3%
143 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2User-facing Channel Bindings API — /api/social/channels
4Provides endpoints for:
5- Channel catalog (metadata, capabilities)
6- User channel bindings (CRUD, preferred)
7- Pairing (code generation + QR, verification)
8- Presence (adapter status, heartbeat)
9- Unified conversation history
10"""
12import base64
13import io
14import logging
15from datetime import datetime
17from flask import Blueprint, request, jsonify, g
19from .auth import require_auth
20from .models import (
21 get_db, UserChannelBinding, ConversationEntry, ChannelPresence,
22)
24logger = logging.getLogger(__name__)
26channel_user_bp = Blueprint('channel_user', __name__, url_prefix='/api/social/channels')
29# ── Catalog ────────────────────────────────────────────────────
31@channel_user_bp.route('/catalog', methods=['GET'])
32def get_catalog():
33 """Return the full channel metadata catalog (public, no auth)."""
34 from integrations.channels.metadata import list_all_channels
35 catalog = list_all_channels()
36 return jsonify({'success': True, 'data': catalog})
39@channel_user_bp.route('/catalog/<channel_type>', methods=['GET'])
40def get_catalog_channel(channel_type):
41 """Return metadata for a single channel."""
42 from integrations.channels.metadata import get_channel_metadata
43 meta = get_channel_metadata(channel_type)
44 if not meta:
45 return jsonify({'success': False, 'error': f'Unknown channel: {channel_type}'}), 404
46 return jsonify({'success': True, 'data': meta})
49# ── Bindings ───────────────────────────────────────────────────
51@channel_user_bp.route('/bindings', methods=['GET'])
52@require_auth
53def list_bindings():
54 """List current user's channel bindings."""
55 bindings = g.db.query(UserChannelBinding).filter_by(
56 user_id=g.user_id,
57 ).order_by(UserChannelBinding.created_at.desc()).all()
58 return jsonify({'success': True, 'data': [b.to_dict() for b in bindings]})
61@channel_user_bp.route('/bindings', methods=['POST'])
62@require_auth
63def create_binding():
64 """Create a new channel binding for the current user."""
65 data = request.get_json(silent=True) or {}
66 channel_type = data.get('channel_type')
67 if not channel_type:
68 return jsonify({'success': False, 'error': 'channel_type is required'}), 400
70 from integrations.channels.metadata import get_channel_metadata
71 if not get_channel_metadata(channel_type):
72 return jsonify({'success': False, 'error': f'Unknown channel: {channel_type}'}), 400
74 sender_id = data.get('channel_sender_id', '')
75 chat_id = data.get('channel_chat_id', '')
77 # Check for existing binding
78 existing = g.db.query(UserChannelBinding).filter_by(
79 user_id=g.user_id,
80 channel_type=channel_type,
81 channel_sender_id=sender_id,
82 ).first()
84 if existing:
85 existing.is_active = True
86 existing.channel_chat_id = chat_id or existing.channel_chat_id
87 existing.metadata_json = data.get('metadata', existing.metadata_json)
88 existing.auth_method = data.get('auth_method', existing.auth_method)
89 binding = existing
90 else:
91 binding = UserChannelBinding(
92 user_id=g.user_id,
93 channel_type=channel_type,
94 channel_sender_id=sender_id,
95 channel_chat_id=chat_id,
96 auth_method=data.get('auth_method'),
97 metadata_json=data.get('metadata'),
98 is_active=True,
99 is_preferred=False,
100 )
101 g.db.add(binding)
103 g.db.flush()
104 return jsonify({'success': True, 'data': binding.to_dict()}), 201
107@channel_user_bp.route('/bindings/<int:binding_id>', methods=['DELETE'])
108@require_auth
109def remove_binding(binding_id):
110 """Remove a channel binding (soft-delete: set is_active=False)."""
111 binding = g.db.query(UserChannelBinding).filter_by(
112 id=binding_id, user_id=g.user_id,
113 ).first()
114 if not binding:
115 return jsonify({'success': False, 'error': 'Binding not found'}), 404
117 g.db.delete(binding)
118 return jsonify({'success': True, 'data': {'deleted': binding_id}})
121@channel_user_bp.route('/bindings/<int:binding_id>/preferred', methods=['PUT'])
122@require_auth
123def set_preferred(binding_id):
124 """Set a binding as the preferred reply channel (unsets others)."""
125 binding = g.db.query(UserChannelBinding).filter_by(
126 id=binding_id, user_id=g.user_id,
127 ).first()
128 if not binding:
129 return jsonify({'success': False, 'error': 'Binding not found'}), 404
131 # Unset all other preferred for this user
132 g.db.query(UserChannelBinding).filter(
133 UserChannelBinding.user_id == g.user_id,
134 UserChannelBinding.id != binding_id,
135 ).update({'is_preferred': False})
137 binding.is_preferred = True
138 return jsonify({'success': True, 'data': binding.to_dict()})
141# ── Pairing ────────────────────────────────────────────────────
143@channel_user_bp.route('/pair/generate', methods=['POST'])
144@require_auth
145def generate_pair_code():
146 """Generate a pairing code + QR code for cross-device linking."""
147 try:
148 from integrations.channels.security import PairingManager
149 pm = PairingManager()
150 code = pm.generate_pairing_code(
151 user_id=int(g.user_id) if g.user_id.isdigit() else hash(g.user_id) % 100000,
152 prompt_id=0,
153 expiry_minutes=15,
154 )
156 # Generate QR as base64 PNG
157 qr_data_url = _generate_qr_data_url(f'hevolve://pair?code={code}')
159 return jsonify({
160 'success': True,
161 'data': {
162 'code': code,
163 'qr_data_url': qr_data_url,
164 'expires_in_seconds': 900,
165 },
166 })
167 except Exception as e:
168 logger.error("Failed to generate pairing code: %s", e)
169 return jsonify({'success': False, 'error': 'Failed to generate pairing code'}), 500
172@channel_user_bp.route('/pair/verify', methods=['POST'])
173@require_auth
174def verify_pair_code():
175 """Verify a pairing code and create a UserChannelBinding."""
176 data = request.get_json(silent=True) or {}
177 code = data.get('code')
178 channel_type = data.get('channel', 'mobile')
179 sender_id = data.get('sender_id', '')
181 if not code:
182 return jsonify({'success': False, 'error': 'code is required'}), 400
184 try:
185 from integrations.channels.security import PairingManager
186 pm = PairingManager()
187 result = pm.verify_pairing(channel_type, sender_id, code)
189 if result is None:
190 return jsonify({'success': False, 'error': 'Invalid or expired pairing code'}), 400
192 # Create binding
193 binding = UserChannelBinding(
194 user_id=g.user_id,
195 channel_type=channel_type,
196 channel_sender_id=sender_id,
197 auth_method='pairing',
198 is_active=True,
199 is_preferred=False,
200 )
201 g.db.add(binding)
202 g.db.flush()
204 return jsonify({'success': True, 'data': binding.to_dict()})
205 except Exception as e:
206 logger.error("Pairing verification failed: %s", e)
207 return jsonify({'success': False, 'error': 'Verification failed'}), 500
210# ── Presence ───────────────────────────────────────────────────
212@channel_user_bp.route('/presence', methods=['GET'])
213def get_presence():
214 """Get all channel adapter statuses (public)."""
215 db = get_db()
216 try:
217 presences = db.query(ChannelPresence).all()
218 return jsonify({'success': True, 'data': [p.to_dict() for p in presences]})
219 finally:
220 db.close()
223@channel_user_bp.route('/presence/heartbeat', methods=['POST'])
224def post_heartbeat():
225 """Adapter reports heartbeat — upserts ChannelPresence row."""
226 data = request.get_json(silent=True) or {}
227 channel_type = data.get('channel_type')
228 status = data.get('status', 'online')
230 if not channel_type:
231 return jsonify({'success': False, 'error': 'channel_type required'}), 400
233 db = get_db()
234 try:
235 existing = db.query(ChannelPresence).filter_by(channel_type=channel_type).first()
236 if existing:
237 existing.status = status
238 existing.last_heartbeat = datetime.utcnow()
239 existing.error_message = data.get('error_message')
240 else:
241 presence = ChannelPresence(
242 channel_type=channel_type,
243 status=status,
244 last_heartbeat=datetime.utcnow(),
245 error_message=data.get('error_message'),
246 )
247 db.add(presence)
248 db.commit()
249 return jsonify({'success': True})
250 except Exception as e:
251 db.rollback()
252 return jsonify({'success': False, 'error': str(e)}), 500
253 finally:
254 db.close()
257# ── Conversation History ───────────────────────────────────────
259@channel_user_bp.route('/conversations', methods=['GET'])
260@require_auth
261def get_conversations():
262 """Paginated unified conversation history for the current user."""
263 page = request.args.get('page', 1, type=int)
264 per_page = min(request.args.get('per_page', 50, type=int), 100)
265 channel_filter = request.args.get('channel_type')
267 query = g.db.query(ConversationEntry).filter_by(user_id=g.user_id)
268 if channel_filter:
269 query = query.filter_by(channel_type=channel_filter)
271 total = query.count()
272 entries = query.order_by(
273 ConversationEntry.created_at.desc()
274 ).offset((page - 1) * per_page).limit(per_page).all()
276 return jsonify({
277 'success': True,
278 'data': [e.to_dict() for e in entries],
279 'pagination': {
280 'page': page,
281 'per_page': per_page,
282 'total': total,
283 'pages': (total + per_page - 1) // per_page,
284 },
285 })
288# ── Helpers ────────────────────────────────────────────────────
290def _generate_qr_data_url(data: str) -> str:
291 """Generate a QR code as a base64 data URL (PNG)."""
292 try:
293 import qrcode
294 qr = qrcode.QRCode(version=1, box_size=8, border=2)
295 qr.add_data(data)
296 qr.make(fit=True)
297 img = qr.make_image(fill_color='black', back_color='white')
298 buf = io.BytesIO()
299 img.save(buf, format='PNG')
300 b64 = base64.b64encode(buf.getvalue()).decode('ascii')
301 return f'data:image/png;base64,{b64}'
302 except ImportError:
303 logger.warning("qrcode package not installed — QR generation unavailable")
304 return ''
305 except Exception as e:
306 logger.warning("QR generation failed: %s", e)
307 return ''