Coverage for integrations / agent_engine / commercial_api.py: 50.5%
321 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Commercial Intelligence API Gateway — Paid intelligence-as-a-service.
4Exposes Hevolve AI capabilities as metered API endpoints.
5Revenue flows: 90% to compute providers, 10% platform sustainability.
6Free tier always available — we don't gatekeep intelligence, we sustain it.
8Service Pattern: static methods, db: Session, db.flush() not db.commit().
9Blueprint Pattern: Blueprint('commercial_api', __name__).
10"""
11import hashlib
12import hmac
13import logging
14import secrets
15import time
16from datetime import datetime, timedelta
17from functools import wraps
18from typing import Dict, List, Optional
19from core.port_registry import get_port
21from flask import Blueprint, g, jsonify, request
22from sqlalchemy.orm import Session
logger = logging.getLogger('hevolve_social')

# ═══════════════════════════════════════════════════════════════
# Tier configuration (deterministic pricing)
# ═══════════════════════════════════════════════════════════════

# Per-tier limits. `rate_limit_per_day` and `monthly_quota` are copied onto
# each key when it is created (CommercialAPIService.create_api_key), and the
# checks read them back from the key row. `priority` is not read by the code
# in this module.
TIER_CONFIG = dict(
    free=dict(rate_limit_per_day=100, monthly_quota=3_000, priority='low'),
    starter=dict(rate_limit_per_day=1_000, monthly_quota=30_000, priority='normal'),
    pro=dict(rate_limit_per_day=10_000, monthly_quota=300_000, priority='high'),
    enterprise=dict(rate_limit_per_day=100_000, monthly_quota=10_000_000,
                    priority='critical'),
)

# Credits charged per 1,000 (in + out) tokens, keyed by tier; used by
# CommercialAPIService.log_usage to compute `cost_credits`.
COST_PER_1K_TOKENS = dict(
    free=0.0,
    starter=0.5,
    pro=0.3,
    enterprise=0.2,
)
46# ═══════════════════════════════════════════════════════════════
47# Service
48# ═══════════════════════════════════════════════════════════════
class CommercialAPIService:
    """Static service for commercial API key management and usage metering.

    Every method takes an SQLAlchemy ``Session`` and only ``flush()``es or
    issues statements on it; committing or rolling back is left to the caller.
    """

    @staticmethod
    def create_api_key(db: Session, user_id: str,
                       name: str = '', tier: str = 'free') -> Dict:
        """Create a new API key for ``user_id``.

        Only the SHA-256 hex digest and the first 8 characters of the key
        are stored, so the raw key is returned ONCE (``result['raw_key']``)
        and cannot be retrieved later.

        Returns:
            The new key's ``to_dict()`` plus ``raw_key``, or
            ``{'error': ...}`` when ``tier`` is not a key of ``TIER_CONFIG``.
        """
        from integrations.social.models import CommercialAPIKey

        # isinstance guard: an unhashable tier (e.g. a JSON list from the
        # request body) would make the dict membership test raise TypeError.
        if not isinstance(tier, str) or tier not in TIER_CONFIG:
            return {'error': f'Invalid tier: {tier}. Valid: {list(TIER_CONFIG.keys())}'}

        raw_key = secrets.token_urlsafe(48)
        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
        key_prefix = raw_key[:8]

        config = TIER_CONFIG[tier]
        api_key = CommercialAPIKey(
            user_id=user_id,
            key_hash=key_hash,
            key_prefix=key_prefix,
            name=name,
            tier=tier,
            rate_limit_per_day=config['rate_limit_per_day'],
            monthly_quota=config['monthly_quota'],
            usage_reset_at=datetime.utcnow() + timedelta(days=30),
        )
        db.add(api_key)
        db.flush()

        result = api_key.to_dict()
        result['raw_key'] = raw_key  # Only returned once
        return result

    @staticmethod
    def validate_api_key(db: Session, raw_key: str) -> Optional[Dict]:
        """Look up a key by the SHA-256 of ``raw_key`` and check it is usable.

        The raw key is never stored; its hash is the lookup value, matched by
        equality in the database query (no constant-time comparison happens
        in Python). A key is usable when it exists, is active, has not
        passed ``expires_at``, and has not reached ``monthly_quota``.

        Returns:
            The key's ``to_dict()`` if usable, otherwise ``None``.
        """
        from integrations.social.models import CommercialAPIKey

        key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
        api_key = db.query(CommercialAPIKey).filter_by(
            key_hash=key_hash).first()

        if api_key is None or not api_key.is_active:
            return None
        if api_key.expires_at and api_key.expires_at < datetime.utcnow():
            return None
        # A NULL usage counter means "nothing used yet" — the same convention
        # reserve_quota / release_quota use.
        if (api_key.usage_this_month or 0) >= api_key.monthly_quota:
            return None
        return api_key.to_dict()

    @staticmethod
    def log_usage(db: Session, api_key_id: str, endpoint: str,
                  tokens_in: int = 0, tokens_out: int = 0,
                  compute_ms: int = 0, status_code: int = 200) -> Dict:
        """Write one APIUsageLog row (token counts + cost) for billing.

        Cost is ``(tokens_in + tokens_out) / 1000 * COST_PER_1K_TOKENS[tier]``
        rounded to 6 decimals; an unknown key is billed at the 'free' rate.

        NOTE: does NOT increment ``usage_this_month`` — that is reserved
        pre-execution by ``reserve_quota()`` so over-quota callers are
        rejected before any backend work runs.
        """
        from integrations.social.models import APIUsageLog, CommercialAPIKey

        api_key = db.query(CommercialAPIKey).filter_by(id=api_key_id).first()
        tier = api_key.tier if api_key else 'free'
        cost_rate = COST_PER_1K_TOKENS.get(tier, 0.0)
        total_tokens = tokens_in + tokens_out
        cost = round((total_tokens / 1000.0) * cost_rate, 6)

        log = APIUsageLog(
            api_key_id=api_key_id,
            endpoint=endpoint,
            tokens_in=tokens_in,
            tokens_out=tokens_out,
            compute_ms=compute_ms,
            cost_credits=cost,
            status_code=status_code,
        )
        db.add(log)
        db.flush()
        return log.to_dict()

    @staticmethod
    def check_rate_limit(db: Session, api_key_id: str) -> bool:
        """Return True if the key has fewer usage-log rows since UTC midnight
        than its ``rate_limit_per_day`` (False for an unknown key).

        NOTE(review): rows are written by log_usage after execution, so
        requests still in flight are not counted here.
        """
        from integrations.social.models import APIUsageLog, CommercialAPIKey

        api_key = db.query(CommercialAPIKey).filter_by(id=api_key_id).first()
        if not api_key:
            return False

        today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
        today_count = db.query(APIUsageLog).filter(
            APIUsageLog.api_key_id == api_key_id,
            APIUsageLog.created_at >= today_start,
        ).count()

        return today_count < api_key.rate_limit_per_day

    @staticmethod
    def reserve_quota(db: Session, api_key_id: str) -> bool:
        """Reserve one quota unit BEFORE executing an inference call.

        The quota check and the increment are a single conditional UPDATE
        (``SET usage = COALESCE(usage, 0) + 1 WHERE id = ? AND
        COALESCE(usage, 0) < monthly_quota``), evaluated by the database.
        Concurrent requests therefore cannot both read the same count and
        both write count+1, as a read-modify-write in Python could.

        Returns:
            True if a unit was reserved (request may proceed); False if the
            key does not exist or is already at its monthly quota — the
            caller MUST return 429 before touching the backend.
        """
        from integrations.social.models import CommercialAPIKey
        from sqlalchemy import func

        used = func.coalesce(CommercialAPIKey.usage_this_month, 0)
        matched = db.query(CommercialAPIKey).filter(
            CommercialAPIKey.id == api_key_id,
            used < CommercialAPIKey.monthly_quota,
        ).update(
            {CommercialAPIKey.usage_this_month: used + 1},
            # Keep any already-loaded CommercialAPIKey instance in this
            # session consistent with the new counter value.
            synchronize_session='fetch',
        )
        return bool(matched)

    @staticmethod
    def release_quota(db: Session, api_key_id: str) -> None:
        """Refund a reservation when the caller decides not to execute.

        Used on pre-execution validation failures AFTER reserve_quota
        succeeded (e.g. a handler rejects the request shape with 400). Not
        called on backend failures — those still count against quota.
        The decrement is a single conditional UPDATE, so it never drops the
        counter below 0 and cannot race with concurrent reservations.
        """
        from integrations.social.models import CommercialAPIKey

        db.query(CommercialAPIKey).filter(
            CommercialAPIKey.id == api_key_id,
            CommercialAPIKey.usage_this_month > 0,
        ).update(
            {CommercialAPIKey.usage_this_month: CommercialAPIKey.usage_this_month - 1},
            synchronize_session='fetch',
        )

    @staticmethod
    def get_usage_stats(db: Session, api_key_id: str, days: int = 30) -> Dict:
        """Aggregate one key's usage-log rows over the last ``days`` days.

        All figures come from one aggregate query, so they describe the same
        snapshot of the log table.

        Returns:
            ``api_key_id``, ``period_days``, ``total_calls``,
            ``total_tokens_in``, ``total_tokens_out`` and
            ``total_cost_credits`` (rounded to 4 decimals).
        """
        from integrations.social.models import APIUsageLog
        from sqlalchemy import func

        conditions = [APIUsageLog.api_key_id == api_key_id]
        try:
            conditions.append(
                APIUsageLog.created_at >= datetime.utcnow() - timedelta(days=days))
        except OverflowError:
            # `days` can come straight from a query string; a window that
            # reaches outside datetime's range means "all history" instead
            # of an unhandled 500.
            pass

        total_calls, total_tokens_in, total_tokens_out, total_cost = db.query(
            func.count(),
            func.coalesce(func.sum(APIUsageLog.tokens_in), 0),
            func.coalesce(func.sum(APIUsageLog.tokens_out), 0),
            func.coalesce(func.sum(APIUsageLog.cost_credits), 0.0),
        ).filter(*conditions).one()

        return {
            'api_key_id': api_key_id,
            'period_days': days,
            'total_calls': int(total_calls),
            'total_tokens_in': int(total_tokens_in),
            'total_tokens_out': int(total_tokens_out),
            'total_cost_credits': round(float(total_cost), 4),
        }

    @staticmethod
    def list_api_keys(db: Session, user_id: str) -> List[Dict]:
        """List all API keys for a user, newest first."""
        from integrations.social.models import CommercialAPIKey
        keys = db.query(CommercialAPIKey).filter_by(
            user_id=user_id).order_by(CommercialAPIKey.created_at.desc()).all()
        return [k.to_dict() for k in keys]

    @staticmethod
    def revoke_api_key(db: Session, api_key_id: str,
                       user_id: Optional[str] = None) -> Optional[Dict]:
        """Revoke (deactivate) an API key.

        Args:
            user_id: when given, only a key owned by this user is revoked;
                a key owned by someone else is treated as not found.

        Returns:
            The revoked key's ``to_dict()``, or None if no matching key.
        """
        from integrations.social.models import CommercialAPIKey
        query = db.query(CommercialAPIKey).filter_by(id=api_key_id)
        if user_id is not None:
            query = query.filter_by(user_id=user_id)
        key = query.first()
        if not key:
            return None
        key.is_active = False
        db.flush()
        return key.to_dict()

    @staticmethod
    def reset_monthly_quotas(db: Session) -> int:
        """Zero ``usage_this_month`` for active keys whose ``usage_reset_at``
        has passed, and push their reset date 30 days from now.

        Returns:
            Number of keys reset.
        """
        from integrations.social.models import CommercialAPIKey
        now = datetime.utcnow()
        keys = db.query(CommercialAPIKey).filter(
            CommercialAPIKey.is_active == True,  # noqa: E712 (SQL expression)
            CommercialAPIKey.usage_reset_at <= now,
        ).all()
        for k in keys:
            k.usage_this_month = 0
            k.usage_reset_at = now + timedelta(days=30)
        if keys:
            db.flush()
        return len(keys)
278# ═══════════════════════════════════════════════════════════════
279# Brute-force protection (TTLCache + lock)
280# ═══════════════════════════════════════════════════════════════
282import threading as _bf_threading
283from core.session_cache import TTLCache as _BFTTLCache
_failed_attempts_lock = _bf_threading.Lock()
# Failure counter per client IP. ttl_seconds=900 is the 15-minute window;
# presumably entries expire that long after being written — confirm against
# core.session_cache.TTLCache.
_failed_attempts = _BFTTLCache(ttl_seconds=900, max_size=10000, name='api_brute_force')


def _check_brute_force(ip: str) -> bool:
    """Report whether ``ip`` has reached 10 recorded failures (blocked)."""
    with _failed_attempts_lock:
        failures = _failed_attempts.get(ip) or 0
    return failures >= 10


def _record_failed_attempt(ip: str):
    """Add one failed authentication to ``ip``'s counter."""
    with _failed_attempts_lock:
        previous = _failed_attempts.get(ip) or 0
        _failed_attempts[ip] = previous + 1
300# ═══════════════════════════════════════════════════════════════
301# Auth decorator for API key endpoints
302# ═══════════════════════════════════════════════════════════════
304def _seconds_until_daily_reset() -> int:
305 """Return seconds until the next UTC midnight — when the daily rate
306 limit window rolls over. Used for the Retry-After header on 429s."""
307 now = datetime.utcnow()
308 tomorrow = (now + timedelta(days=1)).replace(
309 hour=0, minute=0, second=0, microsecond=0)
310 return max(1, int((tomorrow - now).total_seconds()))
313def _seconds_until_monthly_reset(api_key: Optional[dict]) -> int:
314 """Return seconds until `usage_reset_at` — when monthly quota rolls
315 over. Falls back to 24h if the key does not carry a reset timestamp."""
316 reset_raw = (api_key or {}).get('usage_reset_at') if api_key else None
317 if not reset_raw:
318 return 86400
319 try:
320 # SQLAlchemy normally returns a datetime; to_dict may serialize it
321 if isinstance(reset_raw, datetime):
322 reset_dt = reset_raw
323 else:
324 reset_dt = datetime.fromisoformat(str(reset_raw).replace('Z', ''))
325 delta = reset_dt - datetime.utcnow()
326 return max(1, int(delta.total_seconds()))
327 except (ValueError, TypeError):
328 return 86400
def _rate_limit_response(error: str, retry_after: int,
                         usage_meta: Optional[dict] = None):
    """Return a JSON 429 response carrying a ``Retry-After`` header.

    The body is ``{'success': False, 'error', 'retry_after'}`` plus
    ``'usage'`` when ``usage_meta`` is non-empty. Every rate-limit / quota
    rejection goes through here so they all share one shape.
    """
    extra = {'usage': usage_meta} if usage_meta else {}
    resp = jsonify({
        'success': False,
        'error': error,
        'retry_after': retry_after,
        **extra,
    })
    resp.status_code = 429
    resp.headers['Retry-After'] = str(retry_after)
    return resp
def require_api_key(f):
    """Decorator: require a valid ``X-API-Key`` header; sets ``g.api_key``.

    Checks, in order:

    1. Per-IP brute-force block -> 429 with Retry-After 900 s.
    2. Missing header -> 401.
    3. ``validate_api_key``. A key that exists, is active and unexpired but
       has used up its monthly quota gets the monthly-quota 429 with
       Retry-After. Any other rejection is a 401. Only keys whose hash
       matches no stored key count toward the brute-force limit, so a
       legitimate client that runs out of quota (or keeps sending a revoked
       key) does not lock out its IP.
    4. Daily rate limit -> 429 with Retry-After until the next UTC midnight.
    5. Pre-execution ``reserve_quota`` -> 429 if it fails, BEFORE the wrapped
       view (and any inference backend) runs.

    No ``log_usage()`` row is written on any 429 path, because the backend
    was never invoked.

    On success the view runs with ``g.api_key`` (the key's ``to_dict()``)
    and ``g.api_db`` (the open session). The session is committed after the
    view returns, rolled back if it raises, and always closed.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        from integrations.social.models import CommercialAPIKey, get_db

        if _check_brute_force(request.remote_addr):
            # Brute-force window = 15 min; surface that as Retry-After.
            return _rate_limit_response('Too many failed attempts', 900)

        raw_key = request.headers.get('X-API-Key', '')
        if not raw_key:
            return jsonify({'success': False, 'error': 'Missing X-API-Key header'}), 401

        db = get_db()
        try:
            key_data = CommercialAPIService.validate_api_key(db, raw_key)
            if not key_data:
                # Re-look-up by hash to tell a guessed key apart from a real
                # key that validate_api_key rejected (revoked/expired/over quota).
                key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
                known = db.query(CommercialAPIKey).filter_by(
                    key_hash=key_hash).first()
                if known is None:
                    _record_failed_attempt(request.remote_addr)
                elif (known.is_active
                      and not (known.expires_at
                               and known.expires_at < datetime.utcnow())
                      and (known.usage_this_month or 0) >= known.monthly_quota):
                    known_data = known.to_dict()
                    return _rate_limit_response(
                        'Monthly quota exceeded',
                        _seconds_until_monthly_reset(known_data),
                        usage_meta={
                            'tier': known_data.get('tier', 'free'),
                            'monthly_quota': known_data.get('monthly_quota'),
                            'usage_this_month': known_data.get('usage_this_month'),
                        },
                    )
                return jsonify({'success': False,
                                'error': 'Invalid, expired, or quota-exceeded API key'}), 401

            if not CommercialAPIService.check_rate_limit(db, key_data['id']):
                return _rate_limit_response(
                    'Daily rate limit exceeded',
                    _seconds_until_daily_reset(),
                    usage_meta={
                        'tier': key_data.get('tier', 'free'),
                        'rate_limit_per_day': key_data.get('rate_limit_per_day'),
                    },
                )

            # Pre-execution quota reservation: a request that cannot reserve a
            # unit is rejected before the view (and any GPU work) runs.
            if not CommercialAPIService.reserve_quota(db, key_data['id']):
                # The validate/rate checks above only read; committing just
                # ends the transaction cleanly before returning.
                db.commit()
                return _rate_limit_response(
                    'Monthly quota exceeded',
                    _seconds_until_monthly_reset(key_data),
                    usage_meta={
                        'tier': key_data.get('tier', 'free'),
                        'monthly_quota': key_data.get('monthly_quota'),
                        'usage_this_month': key_data.get('usage_this_month'),
                    },
                )

            g.api_key = key_data
            g.api_db = db
            result = f(*args, **kwargs)
            db.commit()
            return result
        except Exception:
            db.rollback()
            raise
        finally:
            db.close()

    return decorated
420# ═══════════════════════════════════════════════════════════════
421# Blueprint
422# ═══════════════════════════════════════════════════════════════
# Blueprint carrying every /api/v1/intelligence/* route defined below.
commercial_api_bp = Blueprint(name='commercial_api', import_name=__name__)
@commercial_api_bp.route('/api/v1/intelligence/chat', methods=['POST'])
@require_api_key
def intelligence_chat():
    """Metered intelligence-as-a-service chat endpoint.

    Body: JSON object with a non-empty string ``prompt``. The prompt is
    forwarded to the local backend ``/chat`` service and usage is logged.

    A malformed request gets 400 and its quota unit (reserved by
    @require_api_key) is refunded, since nothing was executed. If the
    backend call fails, the endpoint still answers 200 with a placeholder
    text, and the reserved unit stays consumed.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        data = {}
    prompt = data.get('prompt', '')
    if not isinstance(prompt, str) or not prompt:
        # Rejected before any inference ran: refund the reservation. The
        # decorator commits after this handler returns.
        CommercialAPIService.release_quota(g.api_db, g.api_key['id'])
        return jsonify({'success': False, 'error': 'prompt required'}), 400

    t0 = time.time()
    response_text = ''
    # "Tokens" are approximated as whitespace-separated word counts.
    tokens_in = len(prompt.split())
    tokens_out = 0

    try:
        from core.http_pool import pooled_post
        result = pooled_post(f'http://localhost:{get_port("backend")}/chat', json={
            'user_id': g.api_key['user_id'],
            'prompt_id': 'api_intelligence',
            'prompt': prompt,
            'create_agent': False,
        }, timeout=120)
        resp_data = result.json() if hasattr(result, 'json') else {}
        response_text = resp_data.get('response', '')
        tokens_out = len(response_text.split())
    except Exception as e:
        logger.warning("Intelligence endpoint error: %s", e)
        response_text = 'Intelligence service temporarily unavailable'

    elapsed_ms = int((time.time() - t0) * 1000)

    CommercialAPIService.log_usage(
        g.api_db, g.api_key['id'], '/v1/intelligence/chat',
        tokens_in=tokens_in, tokens_out=tokens_out,
        compute_ms=elapsed_ms)

    return jsonify({
        'success': True,
        'response': response_text,
        'usage': {'tokens_in': tokens_in, 'tokens_out': tokens_out,
                  'compute_ms': elapsed_ms},
    })
@commercial_api_bp.route('/api/v1/intelligence/analyze', methods=['POST'])
@require_api_key
def intelligence_analyze():
    """Document/data analysis via the local backend ``/chat`` service.

    Body: JSON object with a non-empty string ``document`` (only its first
    5000 characters are sent) and an optional ``question``.

    A malformed request gets 400 and its quota unit (reserved by
    @require_api_key) is refunded. If the backend call fails, the endpoint
    still answers 200 with a placeholder analysis.
    """
    data = request.get_json() or {}
    if not isinstance(data, dict):
        data = {}
    document = data.get('document', '')
    question = data.get('question', 'Analyze this document')
    if not isinstance(document, str) or not document:
        # Rejected before any inference ran: refund the reservation.
        CommercialAPIService.release_quota(g.api_db, g.api_key['id'])
        return jsonify({'success': False, 'error': 'document required'}), 400

    t0 = time.time()
    prompt = f"Analyze the following document and answer: {question}\n\n{document[:5000]}"
    # "Tokens" are approximated as whitespace-separated word counts.
    tokens_in = len(prompt.split())
    tokens_out = 0

    try:
        from core.http_pool import pooled_post
        result = pooled_post(f'http://localhost:{get_port("backend")}/chat', json={
            'user_id': g.api_key['user_id'],
            'prompt_id': 'api_analyze',
            'prompt': prompt,
            'create_agent': False,
        }, timeout=120)
        resp = result.json() if hasattr(result, 'json') else {}
        response_text = resp.get('response', '')
        tokens_out = len(response_text.split())
    except Exception as e:
        logger.warning("Analysis endpoint error: %s", e)
        response_text = 'Analysis service temporarily unavailable'
        tokens_out = 0

    elapsed_ms = int((time.time() - t0) * 1000)
    CommercialAPIService.log_usage(
        g.api_db, g.api_key['id'], '/v1/intelligence/analyze',
        tokens_in=tokens_in, tokens_out=tokens_out, compute_ms=elapsed_ms)

    return jsonify({'success': True, 'analysis': response_text,
                    'usage': {'tokens_in': tokens_in, 'tokens_out': tokens_out,
                              'compute_ms': elapsed_ms}})
@commercial_api_bp.route('/api/v1/intelligence/generate', methods=['POST'])
@require_api_key
def intelligence_generate():
    """Media generation (image/audio/video) via the media agent.

    Body: JSON object with a non-empty string ``prompt`` and an optional
    ``modality`` (default 'image'), passed through to ``generate_media``.
    A string result from the agent is parsed as JSON.

    A malformed request gets 400 and its quota unit (reserved by
    @require_api_key) is refunded. Agent failures still answer 200 with an
    ``error`` entry in ``result``.
    """
    import json

    data = request.get_json() or {}
    if not isinstance(data, dict):
        data = {}
    modality = data.get('modality', 'image')
    prompt_text = data.get('prompt', '')
    if not isinstance(prompt_text, str) or not prompt_text:
        # Rejected before any generation ran: refund the reservation.
        CommercialAPIService.release_quota(g.api_db, g.api_key['id'])
        return jsonify({'success': False, 'error': 'prompt required'}), 400

    t0 = time.time()

    try:
        from integrations.service_tools.media_agent import generate_media
        result_json = generate_media(
            context=prompt_text,
            output_modality=modality,
            input_text=prompt_text,
        )
        result = json.loads(result_json) if isinstance(result_json, str) else result_json
    except Exception as e:
        logger.warning("Generate endpoint error: %s", e)
        result = {'error': 'Generation service temporarily unavailable'}

    elapsed_ms = int((time.time() - t0) * 1000)
    # "Tokens" are approximated as whitespace-separated word counts.
    tokens_in = len(prompt_text.split())
    CommercialAPIService.log_usage(
        g.api_db, g.api_key['id'], '/v1/intelligence/generate',
        tokens_in=tokens_in, compute_ms=elapsed_ms)

    return jsonify({'success': True, 'result': result,
                    'usage': {'tokens_in': tokens_in, 'compute_ms': elapsed_ms}})
@commercial_api_bp.route('/api/v1/intelligence/hivemind', methods=['GET'])
@require_api_key
def intelligence_hivemind():
    """Query collective knowledge via HiveMind (``?query=...``).

    A missing query gets 400 and its quota unit (reserved by
    @require_api_key) is refunded. Bridge failures still answer 200 with an
    ``error`` entry in ``result``.
    """
    query = request.args.get('query', '')
    if not query:
        # Rejected before any work ran: refund the reservation.
        CommercialAPIService.release_quota(g.api_db, g.api_key['id'])
        return jsonify({'success': False, 'error': 'query parameter required'}), 400

    t0 = time.time()
    result = {}

    try:
        from integrations.agent_engine.world_model_bridge import get_world_model_bridge
        bridge = get_world_model_bridge()
        result = bridge.query_hivemind(query)
    except Exception as e:
        logger.warning("HiveMind endpoint error: %s", e)
        result = {'error': 'HiveMind service temporarily unavailable'}

    elapsed_ms = int((time.time() - t0) * 1000)
    CommercialAPIService.log_usage(
        g.api_db, g.api_key['id'], '/v1/intelligence/hivemind',
        tokens_in=len(query.split()), compute_ms=elapsed_ms)

    return jsonify({'success': True, 'result': result,
                    'usage': {'compute_ms': elapsed_ms}})
@commercial_api_bp.route('/api/v1/intelligence/usage', methods=['GET'])
@require_api_key
def intelligence_usage():
    """Report aggregated usage for the API key making this request.

    ``?days=N`` selects the look-back window (default 30; a non-integer
    value falls back to the default). Like every @require_api_key route,
    the call itself reserves one unit of monthly quota.
    """
    window_days = request.args.get('days', 30, type=int)
    return jsonify({
        'success': True,
        'data': CommercialAPIService.get_usage_stats(
            g.api_db, g.api_key['id'], days=window_days),
    })
584# ─── Key management (JWT auth, not API key) ───
@commercial_api_bp.route('/api/v1/intelligence/keys', methods=['POST'])
def create_key():
    """Create a new API key for the JWT-authenticated user.

    Requires ``Authorization: Bearer <token>``. Optional JSON object body:
    ``name`` and ``tier`` (default 'free'). Returns 201 with the key record,
    including ``raw_key``, which is shown only this once. Returns 400 for an
    invalid tier and 401 for a missing/invalid token.
    """
    from integrations.social.auth import _get_user_from_token

    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return jsonify({'success': False, 'error': 'Authorization required'}), 401

    token = auth_header[7:]
    user, db = _get_user_from_token(token)
    if not user:
        if db:
            db.close()
        return jsonify({'success': False, 'error': 'Invalid token'}), 401

    try:
        data = request.get_json() or {}
        # A JSON array/scalar body would make .get() raise; treat it as empty.
        if not isinstance(data, dict):
            data = {}
        result = CommercialAPIService.create_api_key(
            db, str(user.id),
            name=data.get('name', ''),
            tier=data.get('tier', 'free'),
        )
        if 'error' in result:
            return jsonify({'success': False, 'error': result['error']}), 400
        db.commit()
        return jsonify({'success': True, 'api_key': result}), 201
    finally:
        db.close()
@commercial_api_bp.route('/api/v1/intelligence/keys', methods=['GET'])
def list_keys():
    """Return every API key owned by the JWT-authenticated user.

    Requires ``Authorization: Bearer <token>``; answers 401 when the header
    is absent/malformed or the token does not resolve to a user.
    """
    header = request.headers.get('Authorization', '')
    if header[:7] != 'Bearer ':
        return jsonify({'success': False, 'error': 'Authorization required'}), 401

    from integrations.social.auth import _get_user_from_token
    user, db = _get_user_from_token(header[len('Bearer '):])
    if not user:
        if db:
            db.close()
        return jsonify({'success': False, 'error': 'Invalid token'}), 401

    try:
        owned = CommercialAPIService.list_api_keys(db, str(user.id))
        return jsonify({'success': True, 'keys': owned})
    finally:
        db.close()
@commercial_api_bp.route('/api/v1/intelligence/keys/<key_id>', methods=['DELETE'])
def revoke_key(key_id):
    """Revoke one of the JWT-authenticated user's own API keys.

    Requires ``Authorization: Bearer <token>``. Only keys owned by the
    caller can be revoked. A key id belonging to another user gets the same
    404 as a nonexistent id, so callers cannot revoke or probe other users'
    keys.
    """
    auth_header = request.headers.get('Authorization', '')
    if not auth_header.startswith('Bearer '):
        return jsonify({'success': False, 'error': 'Authorization required'}), 401

    from integrations.social.auth import _get_user_from_token
    token = auth_header[7:]
    user, db = _get_user_from_token(token)
    if not user:
        if db:
            db.close()
        return jsonify({'success': False, 'error': 'Invalid token'}), 401

    try:
        # Ownership check: list_api_keys filters by the same user_id value
        # that create_key stores (str(user.id)).
        owned_ids = {
            str(k.get('id'))
            for k in CommercialAPIService.list_api_keys(db, str(user.id))
        }
        if str(key_id) not in owned_ids:
            return jsonify({'success': False, 'error': 'Key not found'}), 404

        result = CommercialAPIService.revoke_api_key(db, key_id)
        if not result:
            return jsonify({'success': False, 'error': 'Key not found'}), 404
        db.commit()
        return jsonify({'success': True, 'key': result})
    finally:
        db.close()