Coverage for integrations / social / integrity_service.py: 61.2%
740 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Integrity Verification Service
3Challenge-response protocol, impression witnessing, consensus verification,
4fraud scoring. Central service for all anti-fraud logic.
5"""
6import os
7import secrets
8import logging
9import statistics
10import requests
11from core.http_pool import pooled_get, pooled_post
12from datetime import datetime, timedelta
13from typing import Optional, Dict, List
15from sqlalchemy import func as sqlfunc
16from sqlalchemy.orm import Session
18from .models import (
19 PeerNode, AdImpression, HostingReward,
20 NodeAttestation, IntegrityChallenge, FraudAlert,
21 User, Post,
22)
logger = logging.getLogger('hevolve_social')

# Fraud scoring weights: points added to a node's fraud_score per violation
# type (see increase_fraud_score). Higher = more serious offense.
FRAUD_WEIGHTS = {
    'hash_mismatch': 30.0,
    'challenge_fail': 15.0,
    'impression_anomaly': 20.0,
    'score_jump': 10.0,
    'witness_refusal': 5.0,
    'collusion_suspected': 25.0,
    'reward_velocity_anomaly': 25.0,
    'reward_self_dealing': 35.0,
    'spark_gaming': 20.0,
    'witness_ring': 30.0,
    'temporal_clustering': 20.0,
    'seal_tamper': 35.0,
    'gradient_magnitude_anomaly': 20.0,
    'gradient_direction_flip': 25.0,
}

IMPRESSION_ANOMALY_STDDEV = 3.0   # z-score above which impression volume is anomalous
SCORE_JUMP_THRESHOLD_PCT = 200    # % growth in 24h that counts as a suspicious jump
FRAUD_BAN_THRESHOLD = 80.0        # fraud_score at which a node is auto-banned
ATTESTATION_EXPIRY_DAYS = 7       # validity window for NodeAttestation records
MIN_WITNESS_PEERS = 1             # minimum peer witnesses for an impression
CHALLENGE_TIMEOUT_SECONDS = 30    # HTTP timeout for challenge round-trips
WITNESS_TIMESTAMP_MAX_AGE = 60    # seconds; older witness requests are rejected as stale

# ── Fraud Score Decay ──
# Every audit round, each node's fraud score decays by this amount.
# Good behavior over time earns back trust. But ban records persist.
FRAUD_SCORE_DECAY_PER_ROUND = 2.0  # Points per audit round (~5 min)
FRAUD_SCORE_DECAY_MIN = 0.0        # floor; scores never decay below zero

# ── Fail2ban: Progressive Ban Durations ──
# Each subsequent ban lasts longer. ban_count tracked on PeerNode.
# After max_bans (4+), node must petition for human review.
FAIL2BAN_DURATIONS = {
    1: timedelta(hours=1),    # 1st offense: 1 hour
    2: timedelta(hours=24),   # 2nd offense: 24 hours
    3: timedelta(days=7),     # 3rd offense: 1 week
}
FAIL2BAN_MAX_DURATION = timedelta(days=30)  # 4th+ offense: 30 days
69class IntegrityService:
70 """Central service for node integrity verification and anti-fraud."""
72 # ─── Code Hash Verification ───
    @staticmethod
    def verify_code_hash(db: Session, node_id: str,
                         registry_url: str = None) -> Dict:
        """Check if node's code_hash matches expected hash.

        Resolution priority: release-hash registry > master-signed manifest >
        central registry URL > this node's own locally computed hash.
        On mismatch, bumps the node's fraud score by FRAUD_WEIGHTS['hash_mismatch'].

        Returns a dict {'verified': bool, 'details': str}. Mutates
        peer.integrity_status / peer.last_attestation_at on success.
        """
        peer = db.query(PeerNode).filter_by(node_id=node_id).first()
        if not peer or not peer.code_hash:
            return {'verified': False, 'details': 'No code hash available'}

        expected = None

        # Priority 0: release hash registry (accepts any known-good release
        # hash, so peers on different versions can still verify).
        try:
            from security.release_hash_registry import get_release_hash_registry
            registry = get_release_hash_registry()
            if registry.is_known_release_hash(peer.code_hash):
                peer.integrity_status = 'verified'
                peer.last_attestation_at = datetime.utcnow()
                return {'verified': True,
                        'details': 'Code hash in release registry'}
        except Exception:
            pass  # registry unavailable — fall through to the next source

        # Primary: check against master-signed manifest
        try:
            from security.master_key import load_release_manifest, verify_release_manifest
            manifest = load_release_manifest()
            if manifest and verify_release_manifest(manifest):
                expected = manifest.get('code_hash', '')
        except Exception:
            pass  # no manifest or bad signature — try the registry URL

        # Fallback: central registry lookup by the peer's reported version
        if not expected and registry_url:
            expected = IntegrityService.fetch_expected_hash(
                registry_url, peer.code_version or peer.version)

        if not expected:
            # Last resort: compare against own code hash (assumes both nodes
            # run the same release — TODO confirm this is acceptable drift).
            try:
                from security.node_integrity import compute_code_hash
                expected = compute_code_hash()
            except Exception:
                return {'verified': False, 'details': 'Cannot compute local hash'}

        if peer.code_hash == expected:
            peer.integrity_status = 'verified'
            peer.last_attestation_at = datetime.utcnow()
            return {'verified': True, 'details': 'Code hash matches'}
        else:
            IntegrityService.increase_fraud_score(
                db, node_id, FRAUD_WEIGHTS['hash_mismatch'],
                f'Code hash mismatch: expected {expected[:16]}..., got {peer.code_hash[:16]}...',
                {'expected': expected, 'reported': peer.code_hash})
            return {'verified': False, 'details': 'Code hash mismatch'}
130 @staticmethod
131 def fetch_expected_hash(registry_url: str, version: str) -> Optional[str]:
132 """GET expected code hash from central registry."""
133 try:
134 resp = pooled_get(
135 f"{registry_url}/api/social/integrity/expected-hash",
136 params={'version': version},
137 timeout=5,
138 )
139 if resp.status_code == 200:
140 return resp.json().get('code_hash')
141 except requests.RequestException:
142 pass
143 return None
145 # ─── Challenge-Response Protocol ───
    @staticmethod
    def create_challenge(db: Session, challenger_node_id: str,
                         target_node_id: str, target_url: str,
                         challenge_type: str = 'agent_count_verify') -> Optional[Dict]:
        """Create, persist, and send a challenge to a target node.

        On HTTP 200 the response is evaluated immediately and the evaluation
        result dict is returned. On a request exception the challenge is
        marked 'timeout' and a small fraud-score penalty is applied.
        Returns the challenge's to_dict() when no evaluation happened.
        """
        nonce = secrets.token_hex(32)

        challenge_data = {'type': challenge_type, 'nonce': nonce}
        if challenge_type == 'agent_count_verify':
            # Embed the count the target previously claimed so its answer
            # can be cross-checked in evaluate_challenge_response.
            peer = db.query(PeerNode).filter_by(node_id=target_node_id).first()
            if peer:
                challenge_data['claimed_agent_count'] = peer.agent_count or 0
        elif challenge_type == 'stats_probe':
            challenge_data['requested_stats'] = ['agent_count', 'post_count']
        elif challenge_type == 'code_hash_check':
            challenge_data['request'] = 'code_hash_and_version'

        # Sign the challenge (signature appended after signing the rest;
        # assumes the verifier excludes the signature field — TODO confirm).
        try:
            from security.node_integrity import sign_json_payload, get_public_key_hex
            challenge_data['challenger_public_key'] = get_public_key_hex()
            challenge_data['signature'] = sign_json_payload(challenge_data)
        except Exception:
            pass  # best-effort: unsigned challenges are still sent

        challenge = IntegrityChallenge(
            challenger_node_id=challenger_node_id,
            target_node_id=target_node_id,
            challenge_type=challenge_type,
            challenge_nonce=nonce,
            challenge_data=challenge_data,
            status='pending',
        )
        db.add(challenge)
        db.flush()  # assigns challenge.id before it is sent over the wire

        # Send challenge to target node
        try:
            resp = pooled_post(
                f"{target_url}/api/social/integrity/challenge",
                json={'challenge_id': challenge.id, **challenge_data,
                      'challenger_node_id': challenger_node_id},
                timeout=CHALLENGE_TIMEOUT_SECONDS,
            )
            if resp.status_code == 200:
                # NOTE(review): this early return skips the
                # peer.last_challenge_at update below — confirm intended.
                response_data = resp.json()
                return IntegrityService.evaluate_challenge_response(
                    db, challenge.id,
                    response_data.get('response', {}),
                    response_data.get('signature', ''))
        except requests.RequestException:
            challenge.status = 'timeout'
            IntegrityService.increase_fraud_score(
                db, target_node_id, 5.0,
                f'Challenge timeout: {challenge_type}')

        peer = db.query(PeerNode).filter_by(node_id=target_node_id).first()
        if peer:
            peer.last_challenge_at = datetime.utcnow()
        return challenge.to_dict()
    @staticmethod
    def handle_challenge(db: Session, challenge_data: dict) -> Dict:
        """Handle an incoming challenge from a peer. Compute response from local data.

        Returns {'response': dict, 'signature': str}; the signature is empty
        when local signing is unavailable. Unknown challenge types return a
        response containing only the echoed nonce and a timestamp.
        """
        challenge_type = challenge_data.get('type', '')
        nonce = challenge_data.get('nonce', '')
        # Echo the nonce so the challenger can bind this reply to its challenge.
        response = {'nonce': nonce, 'timestamp': datetime.utcnow().isoformat()}

        if challenge_type == 'agent_count_verify':
            actual_count = db.query(sqlfunc.count(User.id)).filter_by(
                user_type='agent').scalar() or 0
            response['agent_count'] = actual_count
            # Include sample agent IDs as proof
            agents = db.query(User.id, User.username).filter_by(
                user_type='agent').limit(5).all()
            response['sample_agents'] = [
                {'id': a.id, 'username': a.username} for a in agents]

        elif challenge_type == 'stats_probe':
            response['agent_count'] = db.query(sqlfunc.count(User.id)).filter_by(
                user_type='agent').scalar() or 0
            response['post_count'] = db.query(sqlfunc.count(Post.id)).scalar() or 0

        elif challenge_type == 'code_hash_check':
            try:
                from security.node_integrity import compute_code_hash
                from integrations.social.peer_discovery import gossip
                response['code_hash'] = compute_code_hash()
                response['version'] = gossip.version
            except Exception:
                response['code_hash'] = 'unavailable'

        elif challenge_type == 'guardrail_verify':
            try:
                from security.hive_guardrails import get_guardrail_hash, compute_guardrail_hash
                response['guardrail_hash'] = get_guardrail_hash()
                # Recompute live to prove it's not cached/stale
                response['guardrail_hash_live'] = compute_guardrail_hash()
            except Exception:
                response['guardrail_hash'] = 'unavailable'

        elif challenge_type == 'impression_audit':
            # Report impressions recorded for the given ad in the last hour.
            ad_id = challenge_data.get('ad_id')
            if ad_id:
                hour_ago = datetime.utcnow() - timedelta(hours=1)
                count = db.query(sqlfunc.count(AdImpression.id)).filter(
                    AdImpression.ad_id == ad_id,
                    AdImpression.created_at >= hour_ago,
                ).scalar() or 0
                response['impression_count'] = count

        # Sign the response
        try:
            from security.node_integrity import sign_json_payload, get_public_key_hex
            response['public_key'] = get_public_key_hex()
            sig = sign_json_payload(response)
            return {'response': response, 'signature': sig}
        except Exception:
            # Signing unavailable: still answer, but unsigned.
            return {'response': response, 'signature': ''}
    @staticmethod
    def evaluate_challenge_response(db: Session, challenge_id: str,
                                    response_data: dict,
                                    response_signature: str) -> Dict:
        """Evaluate a challenge response. Verify signature and data consistency.

        Order of checks: replay guard → nonce → signature → type-specific
        comparison. Failures bump the target's fraud score; passes grant a
        small fraud-score reduction. Returns {'passed': bool, 'details': str}.
        """
        challenge = db.query(IntegrityChallenge).filter_by(id=challenge_id).first()
        if not challenge:
            return {'passed': False, 'details': 'Challenge not found'}
        # Prevent replay: reject already-evaluated challenges
        if challenge.status != 'pending':
            return {'passed': False, 'details': f'Challenge already processed (status={challenge.status})'}

        challenge.response_data = response_data
        challenge.response_signature = response_signature
        challenge.responded_at = datetime.utcnow()

        # Verify nonce
        if response_data.get('nonce') != challenge.challenge_nonce:
            challenge.status = 'failed'
            challenge.result_details = 'Nonce mismatch'
            IntegrityService.increase_fraud_score(
                db, challenge.target_node_id, FRAUD_WEIGHTS['challenge_fail'],
                'Challenge failed: nonce mismatch')
            return {'passed': False, 'details': 'Nonce mismatch'}

        # Verify signature if public key available.
        # NOTE(review): a response with no public_key skips verification
        # entirely — confirm this fall-through is intended.
        public_key = response_data.get('public_key', '')
        if public_key and response_signature:
            try:
                from security.node_integrity import verify_json_signature
                if not verify_json_signature(public_key, response_data, response_signature):
                    challenge.status = 'failed'
                    challenge.result_details = 'Invalid signature'
                    IntegrityService.increase_fraud_score(
                        db, challenge.target_node_id, FRAUD_WEIGHTS['challenge_fail'],
                        'Challenge failed: invalid signature')
                    return {'passed': False, 'details': 'Invalid signature'}
            except Exception:
                pass  # verification machinery unavailable — skip, don't fail

        # Evaluate based on challenge type
        passed = True
        details = 'OK'

        if challenge.challenge_type == 'agent_count_verify':
            claimed = (challenge.challenge_data or {}).get('claimed_agent_count', 0)
            actual = response_data.get('agent_count', 0)
            # Fail only when the actual count is below half the claimed count
            # (i.e. a 50% tolerance, not 10%).
            if claimed > 0 and actual < claimed * 0.5:
                passed = False
                details = f'Agent count mismatch: claimed {claimed}, actual {actual}'

        elif challenge.challenge_type == 'stats_probe':
            peer = db.query(PeerNode).filter_by(
                node_id=challenge.target_node_id).first()
            if peer:
                reported_agents = response_data.get('agent_count', 0)
                if peer.agent_count and reported_agents < (peer.agent_count or 0) * 0.5:
                    passed = False
                    details = f'Stats mismatch: DB has {peer.agent_count}, node reports {reported_agents}'

        elif challenge.challenge_type == 'code_hash_check':
            reported_hash = response_data.get('code_hash', '')
            peer = db.query(PeerNode).filter_by(
                node_id=challenge.target_node_id).first()
            if peer and peer.code_hash and reported_hash != peer.code_hash:
                passed = False
                details = 'Code hash changed since last exchange'

        elif challenge.challenge_type == 'guardrail_verify':
            reported_hash = response_data.get('guardrail_hash', '')
            reported_live = response_data.get('guardrail_hash_live', '')
            try:
                from security.hive_guardrails import get_guardrail_hash
                expected = get_guardrail_hash()
                if reported_hash != expected:
                    passed = False
                    details = f'Guardrail hash mismatch: expected {expected[:16]}, got {reported_hash[:16]}'
                elif reported_live and reported_live != expected:
                    passed = False
                    details = f'Guardrail live recompute mismatch — possible tampering'
                elif reported_hash != reported_live:
                    # NOTE(review): a peer that matched on guardrail_hash but
                    # omitted guardrail_hash_live ('') fails here — confirm
                    # that strictness is intended for older peers.
                    passed = False
                    details = 'Cached vs live guardrail hash mismatch — values may have drifted'
            except Exception:
                pass  # local guardrail hash unavailable — cannot judge

        challenge.status = 'passed' if passed else 'failed'
        challenge.result_details = details
        challenge.evaluated_at = datetime.utcnow()

        if not passed:
            IntegrityService.increase_fraud_score(
                db, challenge.target_node_id, FRAUD_WEIGHTS['challenge_fail'],
                f'Challenge failed: {details}')
        else:
            # Successful challenge reduces fraud score slightly
            IntegrityService.decrease_fraud_score(
                db, challenge.target_node_id, 2.0,
                f'Challenge passed: {challenge.challenge_type}')

        return {'passed': passed, 'details': details}
    # ─── Impression Witnessing ───

    # Rate limit: max witness requests per node per hour.
    # NOTE(review): per-process, in-memory only, and never pruned — entries
    # accumulate for the lifetime of the process.
    _witness_request_counts = {}  # node_id -> (count, window_start)
    _WITNESS_MAX_PER_HOUR = 100
    @staticmethod
    def request_nearest_witness(db: Session, impression_id: str,
                                ad_id: str, requesting_node_id: str) -> Optional[Dict]:
        """Find nearest active non-banned peer and request a witness attestation.

        Returns the stored attestation dict from the first peer that agrees
        to witness, or None when rate-limited or no peer responds.
        NOTE(review): despite the name, the query applies no distance/latency
        ordering — it simply tries up to 5 eligible peers in query order.
        """
        # Rate limit witness requests per node (fixed 1-hour window)
        now = datetime.utcnow()
        counts = IntegrityService._witness_request_counts
        entry = counts.get(requesting_node_id)
        if entry:
            count, window_start = entry
            if (now - window_start).total_seconds() > 3600:
                # Window expired — start a fresh one
                counts[requesting_node_id] = (1, now)
            elif count >= IntegrityService._WITNESS_MAX_PER_HOUR:
                return None  # Rate limited
            else:
                counts[requesting_node_id] = (count + 1, window_start)
        else:
            counts[requesting_node_id] = (1, now)

        peers = db.query(PeerNode).filter(
            PeerNode.status == 'active',
            PeerNode.integrity_status != 'banned',
            PeerNode.node_id != requesting_node_id,
        ).limit(5).all()

        # First successful witness wins
        for peer in peers:
            result = IntegrityService._request_witness_from_peer(
                db, impression_id, ad_id, requesting_node_id, peer)
            if result:
                return result
        return None
    @staticmethod
    def _request_witness_from_peer(db: Session, impression_id: str,
                                   ad_id: str, requesting_node_id: str,
                                   peer: PeerNode) -> Optional[Dict]:
        """Request a specific peer to witness an impression.

        POSTs a signed witness request to the peer; on a positive reply,
        stores a NodeAttestation (expiring after ATTESTATION_EXPIRY_DAYS) and
        returns its dict. Returns None on refusal, non-200, or request error.
        """
        nonce = secrets.token_hex(16)
        payload = {
            'impression_id': impression_id,
            'ad_id': ad_id,
            'node_id': requesting_node_id,
            'timestamp': datetime.utcnow().isoformat(),
            'nonce': nonce,
        }
        # Sign the request (best-effort; peers may reject unsigned requests)
        try:
            from security.node_integrity import sign_json_payload, get_public_key_hex
            payload['public_key'] = get_public_key_hex()
            payload['signature'] = sign_json_payload(payload)
        except Exception:
            pass

        try:
            resp = pooled_post(
                f"{peer.url}/api/social/integrity/witness-impression",
                json=payload,
                timeout=5,
            )
            if resp.status_code == 200:
                data = resp.json()
                if data.get('witnessed'):
                    # Store attestation signed by the witnessing peer
                    attestation = NodeAttestation(
                        attester_node_id=peer.node_id,
                        subject_node_id=requesting_node_id,
                        attestation_type='impression_witness',
                        payload_json={'impression_id': impression_id, 'ad_id': ad_id},
                        signature=data.get('signature', ''),
                        attester_public_key=data.get('public_key', peer.public_key or ''),
                        expires_at=datetime.utcnow() + timedelta(days=ATTESTATION_EXPIRY_DAYS),
                    )
                    db.add(attestation)
                    db.flush()
                    return attestation.to_dict()
        except requests.RequestException:
            pass  # unreachable peer — caller will try the next one
        return None
    @staticmethod
    def handle_witness_request(db: Session, witness_data: dict) -> Dict:
        """Handle incoming witness request from a peer node.

        Rejects banned requesters, stale timestamps, and missing/invalid
        signatures; otherwise co-signs and returns a witness response dict
        with 'witnessed': True.
        """
        requesting_node_id = witness_data.get('node_id', '')
        timestamp_str = witness_data.get('timestamp', '')
        signature = witness_data.get('signature', '')
        public_key = witness_data.get('public_key', '')

        # Check if requesting node is known and not banned
        peer = db.query(PeerNode).filter_by(node_id=requesting_node_id).first()
        if peer and peer.integrity_status == 'banned':
            return {'witnessed': False, 'reason': 'Requesting node is banned'}

        # Check timestamp freshness (anti-replay)
        if timestamp_str:
            try:
                ts = datetime.fromisoformat(timestamp_str)
                age = (datetime.utcnow() - ts).total_seconds()
                if age > WITNESS_TIMESTAMP_MAX_AGE:
                    return {'witnessed': False, 'reason': 'Stale timestamp'}
            except (ValueError, TypeError):
                pass  # unparseable timestamp is tolerated, not rejected

        # Verify signature (required — omitting signature is not allowed)
        if not signature or not public_key:
            return {'witnessed': False, 'reason': 'Missing signature or public key'}
        try:
            from security.node_integrity import verify_json_signature
            # NOTE(review): witness_data still contains its own 'signature'
            # field here — assumes verify_json_signature excludes it before
            # verifying; confirm against that helper's implementation.
            if not verify_json_signature(public_key, witness_data, signature):
                return {'witnessed': False, 'reason': 'Invalid signature'}
        except Exception:
            return {'witnessed': False, 'reason': 'Signature verification failed'}

        # Co-sign the witness
        try:
            from security.node_integrity import sign_json_payload, get_public_key_hex
            witness_response = {
                'witnessed': True,
                'impression_id': witness_data.get('impression_id'),
                'ad_id': witness_data.get('ad_id'),
                'nonce': witness_data.get('nonce'),
                'witness_timestamp': datetime.utcnow().isoformat(),
            }
            witness_response['public_key'] = get_public_key_hex()
            witness_response['signature'] = sign_json_payload(witness_response)
            return witness_response
        except Exception:
            # Signing unavailable — still witness, but unsigned
            return {'witnessed': True, 'signature': '', 'public_key': ''}
    @staticmethod
    def get_impression_witness_count(db: Session, impression_id: str) -> int:
        """Count how many peer witnesses confirmed a given impression.

        NOTE(review): matching is a substring search of impression_id inside
        the serialized payload_json; when the column type has no .contains,
        the filter degrades to a constant True (counting ALL valid
        impression_witness attestations) — confirm this fallback is intended.
        """
        return db.query(sqlfunc.count(NodeAttestation.id)).filter(
            NodeAttestation.attestation_type == 'impression_witness',
            NodeAttestation.is_valid == True,
            NodeAttestation.payload_json.contains(impression_id) if hasattr(
                NodeAttestation.payload_json, 'contains') else True,
        ).scalar() or 0
513 # ─── Score Consensus ───
515 @staticmethod
516 def probe_peer_stats(peer_url: str, target_node_id: str) -> Optional[Dict]:
517 """Query a peer for its view of a target node's stats."""
518 try:
519 resp = pooled_get(
520 f"{peer_url}/api/social/integrity/peer-stats",
521 params={'node_id': target_node_id},
522 timeout=5,
523 )
524 if resp.status_code == 200:
525 return resp.json()
526 except requests.RequestException:
527 pass
528 return None
530 @staticmethod
531 def run_consensus_check(db: Session, target_node_id: str,
532 peer_urls: List[str]) -> Dict:
533 """Query multiple peers for their view of target's stats. Flag disagreements."""
534 reports = []
535 for url in peer_urls[:5]:
536 report = IntegrityService.probe_peer_stats(url, target_node_id)
537 if report:
538 reports.append(report)
540 if len(reports) < 2:
541 return {'consensus': True, 'reports': reports, 'anomalies': [],
542 'details': 'Not enough peers for consensus'}
544 agent_counts = [r.get('agent_count', 0) for r in reports]
545 post_counts = [r.get('post_count', 0) for r in reports]
547 anomalies = []
548 if len(set(agent_counts)) > 1:
549 spread = max(agent_counts) - min(agent_counts)
550 mean = statistics.mean(agent_counts) if agent_counts else 0
551 if mean > 0 and spread / mean > 0.5:
552 anomalies.append({
553 'field': 'agent_count',
554 'values': agent_counts,
555 'spread_ratio': round(spread / mean, 2),
556 })
558 consensus = len(anomalies) == 0
559 if not consensus:
560 IntegrityService.increase_fraud_score(
561 db, target_node_id, FRAUD_WEIGHTS['score_jump'] * 0.5,
562 f'Consensus disagreement: {anomalies}',
563 {'reports': reports, 'anomalies': anomalies})
565 return {'consensus': consensus, 'reports': reports, 'anomalies': anomalies}
567 # ─── Fraud Detection ───
    @staticmethod
    def detect_impression_anomaly(db: Session, node_id: str,
                                  period_hours: int = 24) -> Optional[Dict]:
        """Compare node's impression rate to network average. Flag if >3 stddev.

        Requires at least 3 reporting nodes and 50+ impressions from the
        target to flag. Returns the created fraud-alert dict, or None.
        """
        cutoff = datetime.utcnow() - timedelta(hours=period_hours)

        # Get per-node impression counts within the window
        node_counts = db.query(
            AdImpression.node_id,
            sqlfunc.count(AdImpression.id).label('cnt'),
        ).filter(
            AdImpression.created_at >= cutoff,
            AdImpression.node_id.isnot(None),
        ).group_by(AdImpression.node_id).all()

        if len(node_counts) < 3:
            return None  # Not enough data

        counts = [nc.cnt for nc in node_counts]
        # Nodes with zero impressions in the window default to 0
        target_count = next((nc.cnt for nc in node_counts if nc.node_id == node_id), 0)

        if target_count < 50:
            return None  # Too low to flag

        mean = statistics.mean(counts)
        stddev = statistics.stdev(counts) if len(counts) > 1 else 0

        if stddev == 0:
            return None  # all nodes identical — z-score undefined

        z_score = (target_count - mean) / stddev

        # Only the high side is anomalous: under-reporting is not fraud here
        if z_score > IMPRESSION_ANOMALY_STDDEV:
            return IntegrityService._create_fraud_alert(
                db, node_id, 'impression_anomaly', 'high',
                f'Impression rate anomaly: {target_count} impressions '
                f'(z-score={z_score:.1f}, mean={mean:.1f}, stddev={stddev:.1f})',
                FRAUD_WEIGHTS['impression_anomaly'],
                {'count': target_count, 'mean': round(mean, 2),
                 'stddev': round(stddev, 2), 'z_score': round(z_score, 2)})

        return None
612 @staticmethod
613 def detect_score_jump(db: Session, node_id: str) -> Optional[Dict]:
614 """Flag if agent_count or post_count jumped >200% in 24h."""
615 peer = db.query(PeerNode).filter_by(node_id=node_id).first()
616 if not peer:
617 return None
619 metadata = peer.metadata_json or {}
620 prev_agents = metadata.get('_prev_agent_count', 0)
621 prev_posts = metadata.get('_prev_post_count', 0)
622 current_agents = peer.agent_count or 0
623 current_posts = peer.post_count or 0
625 alert = None
626 if prev_agents > 5 and current_agents > prev_agents * (1 + SCORE_JUMP_THRESHOLD_PCT / 100):
627 alert = IntegrityService._create_fraud_alert(
628 db, node_id, 'score_jump', 'medium',
629 f'Agent count jumped from {prev_agents} to {current_agents} '
630 f'({((current_agents / prev_agents) - 1) * 100:.0f}% increase)',
631 FRAUD_WEIGHTS['score_jump'],
632 {'prev': prev_agents, 'current': current_agents, 'field': 'agent_count'})
634 if prev_posts > 10 and current_posts > prev_posts * (1 + SCORE_JUMP_THRESHOLD_PCT / 100):
635 alert = IntegrityService._create_fraud_alert(
636 db, node_id, 'score_jump', 'medium',
637 f'Post count jumped from {prev_posts} to {current_posts}',
638 FRAUD_WEIGHTS['score_jump'],
639 {'prev': prev_posts, 'current': current_posts, 'field': 'post_count'})
641 # Store current values as previous for next check
642 if not metadata:
643 metadata = {}
644 metadata['_prev_agent_count'] = current_agents
645 metadata['_prev_post_count'] = current_posts
646 metadata['_last_score_check'] = datetime.utcnow().isoformat()
647 peer.metadata_json = metadata
649 return alert
651 @staticmethod
652 def detect_collusion(db: Session, node_id: str) -> Optional[Dict]:
653 """Detect if >80% attestations come from a single peer."""
654 cutoff = datetime.utcnow() - timedelta(days=ATTESTATION_EXPIRY_DAYS)
655 attestations = db.query(NodeAttestation).filter(
656 NodeAttestation.subject_node_id == node_id,
657 NodeAttestation.created_at >= cutoff,
658 NodeAttestation.is_valid == True,
659 ).all()
661 if len(attestations) < 5:
662 return None
664 attester_counts = {}
665 for a in attestations:
666 attester_counts[a.attester_node_id] = attester_counts.get(
667 a.attester_node_id, 0) + 1
669 total = sum(attester_counts.values())
670 for attester_id, count in attester_counts.items():
671 ratio = count / total
672 if ratio > 0.8:
673 return IntegrityService._create_fraud_alert(
674 db, node_id, 'collusion_suspected', 'high',
675 f'Collusion suspected: {count}/{total} attestations '
676 f'({ratio:.0%}) from single peer {attester_id[:8]}',
677 FRAUD_WEIGHTS['collusion_suspected'],
678 {'dominant_attester': attester_id, 'ratio': round(ratio, 3),
679 'count': count, 'total': total})
681 return None
    @staticmethod
    def run_full_audit(db: Session, node_id: str,
                       registry_url: str = None) -> Dict:
        """Run all fraud detection checks on a node, including reward hacking.

        Also applies fraud score decay and checks ban expiry — good nodes
        recover trust over time, banned nodes serve their sentence then
        return to 'suspicious' for re-evaluation.

        Several detectors called here (apply_fraud_score_decay,
        detect_reward_velocity_anomaly, detect_reward_self_dealing,
        detect_spark_gaming, detect_witness_ring, detect_temporal_clustering,
        verify_all_sealed_impressions, isolate_reward_hacker) are defined
        elsewhere in this class. Each detector returns None when nothing
        fired, so "is not None" below means "signal fired".
        """
        results = {}

        # Decay fraud scores + check ban expiry BEFORE running checks.
        # This ensures previously-banned nodes get a fair reassessment.
        results['decay'] = IntegrityService.apply_fraud_score_decay(db)

        results['code_hash'] = IntegrityService.verify_code_hash(
            db, node_id, registry_url)
        results['impression_anomaly'] = IntegrityService.detect_impression_anomaly(
            db, node_id)
        results['score_jump'] = IntegrityService.detect_score_jump(db, node_id)
        results['collusion'] = IntegrityService.detect_collusion(db, node_id)
        results['audit_dominance'] = IntegrityService.verify_audit_dominance(
            db, node_id)

        # Reward hacking detection
        results['reward_velocity'] = IntegrityService.detect_reward_velocity_anomaly(
            db, node_id)
        results['reward_self_dealing'] = IntegrityService.detect_reward_self_dealing(
            db, node_id)
        results['spark_gaming'] = IntegrityService.detect_spark_gaming(db, node_id)

        # Impression integrity (collusion + tampering)
        results['witness_ring'] = IntegrityService.detect_witness_ring(db, node_id)
        results['temporal_clustering'] = IntegrityService.detect_temporal_clustering(
            db, node_id)
        results['seal_integrity'] = IntegrityService.verify_all_sealed_impressions(
            db, node_id, limit=50)

        # Snapshot the node's current standing after all checks ran
        peer = db.query(PeerNode).filter_by(node_id=node_id).first()
        results['fraud_score'] = peer.fraud_score if peer else 0.0
        results['integrity_status'] = peer.integrity_status if peer else 'unknown'
        results['ban_count'] = peer.ban_count if peer else 0
        results['ban_until'] = peer.ban_until.isoformat() if peer and peer.ban_until else None

        # Auto-isolate if multiple reward hacking signals fire
        reward_hack_signals = sum(1 for k in (
            'reward_velocity', 'reward_self_dealing', 'spark_gaming'
        ) if results.get(k) is not None)
        if reward_hack_signals >= 2:
            results['isolation'] = IntegrityService.isolate_reward_hacker(
                db, node_id,
                f'Multiple reward hacking signals: {reward_hack_signals}/3 triggered',
                {'signals': {k: results.get(k) for k in (
                    'reward_velocity', 'reward_self_dealing', 'spark_gaming'
                ) if results.get(k)}})

        # Auto-isolate impression fraud: witness_ring + temporal_clustering together
        impression_fraud_signals = sum(1 for k in (
            'witness_ring', 'temporal_clustering',
        ) if results.get(k) is not None)
        seal_tampered = (results.get('seal_integrity') or {}).get('tampered', 0) > 0
        if impression_fraud_signals >= 2 or (impression_fraud_signals >= 1 and seal_tampered):
            results['impression_isolation'] = IntegrityService.isolate_reward_hacker(
                db, node_id,
                f'Impression fraud: {impression_fraud_signals} signals + '
                f'seal_tampered={seal_tampered}',
                {'witness_ring': results.get('witness_ring'),
                 'temporal_clustering': results.get('temporal_clustering'),
                 'seal_integrity': results.get('seal_integrity')})

        return results
755 # ─── Post-Update Peer Witness Verification ───
757 @staticmethod
758 def verify_post_update(db: Session, node_id: str,
759 expected_version: str = '') -> Dict:
760 """Verify a node after it reports an upgrade.
762 Challenges code_hash, verifies guardrail_hash, and records
763 attestation in NodeAttestation table. Called on next gossip round
764 after a peer announces a new version.
765 """
766 peer = db.query(PeerNode).filter_by(node_id=node_id).first()
767 if not peer:
768 return {'verified': False, 'reason': 'peer not found'}
770 results = {'node_id': node_id, 'checks': {}}
772 # 1. Code hash verification (against master-signed manifest)
773 hash_result = IntegrityService.verify_code_hash(db, node_id)
774 results['checks']['code_hash'] = hash_result
776 # 2. Guardrail hash verification via challenge
777 if peer.url:
778 try:
779 challenge_result = IntegrityService.create_challenge(
780 db, challenger_node_id='self',
781 target_node_id=node_id,
782 target_url=peer.url,
783 challenge_type='code_hash_check')
784 results['checks']['challenge'] = challenge_result or {}
785 except Exception as e:
786 results['checks']['challenge'] = {'error': str(e)}
788 # 3. Guardrail hash match check from latest gossip info
789 try:
790 from security.hive_guardrails import get_guardrail_hash
791 our_hash = get_guardrail_hash()
792 peer_hash = getattr(peer, 'guardrail_hash', None)
793 if peer_hash and peer_hash != our_hash:
794 results['checks']['guardrail_hash'] = {
795 'match': False, 'our': our_hash[:16], 'theirs': peer_hash[:16]}
796 IntegrityService.increase_fraud_score(
797 db, node_id, 10.0,
798 f'Post-update guardrail hash mismatch',
799 {'expected': our_hash[:16], 'got': peer_hash[:16]})
800 else:
801 results['checks']['guardrail_hash'] = {'match': True}
802 except ImportError:
803 results['checks']['guardrail_hash'] = {'skipped': True}
805 # 4. Record attestation
806 try:
807 attestation = NodeAttestation(
808 node_id=node_id,
809 attestation_type='post_update_verification',
810 attestation_data={
811 'version': expected_version,
812 'code_hash_verified': hash_result.get('verified', False),
813 'checks': {k: bool(v.get('verified') or v.get('match'))
814 for k, v in results['checks'].items()
815 if isinstance(v, dict)},
816 },
817 )
818 db.add(attestation)
819 db.flush()
820 results['attestation_id'] = attestation.id
821 except Exception as e:
822 results['attestation_error'] = str(e)
824 all_passed = all(
825 v.get('verified') or v.get('match') or v.get('skipped') or v.get('passed')
826 for v in results['checks'].values()
827 if isinstance(v, dict)
828 )
829 results['verified'] = all_passed
830 return results
832 # ─── Audit Compute Dominance ───
    @staticmethod
    def verify_audit_dominance(db: Session, target_node_id: str) -> Dict:
        """Verify that the compute available for auditing a node exceeds
        that node's own compute. No node should be able to outcompute its auditors.

        Principle: audit_compute > target_compute — always.
        This is enforced by compute democracy (max 5% influence) but we
        verify it explicitly here to catch edge cases.

        Uses the module-level _get_node_compute helper (defined elsewhere in
        this file) to read a peer's compute figure. Violations add 10.0 to
        the target's fraud score.
        """
        target = db.query(PeerNode).filter_by(node_id=target_node_id).first()
        if not target:
            # Unknown nodes are treated as dominated (nothing to audit)
            return {'dominant': True, 'details': 'Unknown node'}

        target_compute = _get_node_compute(target)

        # Sum compute of all active non-banned peers (excluding target)
        auditors = db.query(PeerNode).filter(
            PeerNode.status == 'active',
            PeerNode.node_id != target_node_id,
            PeerNode.integrity_status != 'banned',
        ).all()

        auditor_compute = sum(_get_node_compute(p) for p in auditors)
        auditor_count = len(auditors)

        # The audit collective must have more compute than the target
        dominant = auditor_compute > target_compute

        if not dominant and auditor_count > 0:
            # If a single node has more compute than all its auditors combined,
            # flag it — this violates compute democracy
            logger.warning(
                f"Audit dominance violation: node {target_node_id[:8]} has "
                f"{target_compute} compute vs {auditor_compute} auditor compute "
                f"({auditor_count} auditors)")
            IntegrityService.increase_fraud_score(
                db, target_node_id, 10.0,
                f'Audit dominance violation: target compute ({target_compute}) '
                f'exceeds auditor compute ({auditor_compute})',
                {'target_compute': target_compute,
                 'auditor_compute': auditor_compute,
                 'auditor_count': auditor_count})

        return {
            'dominant': dominant,
            'target_compute': target_compute,
            'auditor_compute': auditor_compute,
            'auditor_count': auditor_count,
            # max(..., 1) guards against division by zero for zero-compute targets
            'ratio': round(auditor_compute / max(target_compute, 1), 2),
        }
885 @staticmethod
886 def get_audit_coverage(db: Session) -> Dict:
887 """Network-wide audit dominance report. Returns nodes where
888 audit compute does NOT exceed their own compute."""
889 active_peers = db.query(PeerNode).filter(
890 PeerNode.status == 'active',
891 PeerNode.integrity_status != 'banned',
892 ).all()
894 total_compute = sum(_get_node_compute(p) for p in active_peers)
895 violations = []
897 for peer in active_peers:
898 peer_compute = _get_node_compute(peer)
899 # Auditors = everyone else
900 auditor_compute = total_compute - peer_compute
901 if peer_compute > 0 and auditor_compute <= peer_compute:
902 violations.append({
903 'node_id': peer.node_id,
904 'node_name': peer.name,
905 'compute': peer_compute,
906 'auditor_compute': auditor_compute,
907 'ratio': round(auditor_compute / max(peer_compute, 1), 2),
908 })
910 return {
911 'total_nodes': len(active_peers),
912 'total_compute': total_compute,
913 'violations': violations,
914 'all_dominant': len(violations) == 0,
915 }
917 # ─── Fraud Score Management ───
919 @staticmethod
920 def increase_fraud_score(db: Session, node_id: str, delta: float,
921 reason: str, evidence: dict = None) -> float:
922 """Increase a node's fraud_score. Auto-bans at threshold. Creates FraudAlert."""
923 peer = db.query(PeerNode).filter_by(node_id=node_id).first()
924 if not peer:
925 return 0.0
927 peer.fraud_score = min((peer.fraud_score or 0) + delta, 100.0)
929 # Determine severity
930 severity = 'low'
931 if delta >= 20:
932 severity = 'high'
933 elif delta >= 10:
934 severity = 'medium'
935 if peer.fraud_score >= FRAUD_BAN_THRESHOLD:
936 severity = 'critical'
938 # Create fraud alert
939 alert = FraudAlert(
940 node_id=node_id,
941 alert_type=_determine_alert_type(reason),
942 severity=severity,
943 description=reason,
944 evidence_json=evidence or {},
945 fraud_score_delta=delta,
946 )
947 db.add(alert)
949 # Auto-ban at threshold — fail2ban progressive duration
950 if peer.fraud_score >= FRAUD_BAN_THRESHOLD:
951 IntegrityService._apply_fail2ban(db, peer, reason)
952 elif peer.fraud_score >= 40:
953 peer.integrity_status = 'suspicious'
955 db.flush()
956 return peer.fraud_score
958 @staticmethod
959 def _apply_fail2ban(db: Session, peer: 'PeerNode', reason: str):
960 """Apply fail2ban progressive ban. Each offense = longer ban.
962 1st ban: 1 hour, 2nd: 24 hours, 3rd: 7 days, 4th+: 30 days.
963 ban_count persists across unbans — history is never erased.
964 """
965 peer.ban_count = (peer.ban_count or 0) + 1
966 peer.integrity_status = 'banned'
968 duration = FAIL2BAN_DURATIONS.get(peer.ban_count, FAIL2BAN_MAX_DURATION)
969 peer.ban_until = datetime.utcnow() + duration
971 logger.warning(
972 f"Node {peer.node_id[:8]} fail2ban: offense #{peer.ban_count}, "
973 f"banned until {peer.ban_until.isoformat()}, "
974 f"fraud_score={peer.fraud_score}, reason={reason}")
976 @staticmethod
977 def decrease_fraud_score(db: Session, node_id: str, delta: float,
978 reason: str) -> float:
979 """Decrease fraud_score (e.g., after successful verification)."""
980 peer = db.query(PeerNode).filter_by(node_id=node_id).first()
981 if not peer:
982 return 0.0
984 peer.fraud_score = max((peer.fraud_score or 0) - delta, 0.0)
986 # Upgrade status if score dropped below threshold
987 if peer.fraud_score < 40 and peer.integrity_status == 'suspicious':
988 peer.integrity_status = 'verified'
990 db.flush()
991 return peer.fraud_score
993 @staticmethod
994 def apply_fraud_score_decay(db: Session) -> Dict:
995 """Decay all nodes' fraud scores by a small amount each audit round.
997 Good behavior over time earns back trust. Called every audit round
998 (typically every ~5 minutes via AgentDaemon integrity tick).
1000 Also checks ban expiry: if a banned node's ban_until has passed,
1001 move to 'suspicious' (not 'verified' — they must prove themselves).
1003 Returns summary of decay effects.
1004 """
1005 now = datetime.utcnow()
1006 decayed = 0
1007 unbanned = 0
1009 # Decay all non-zero fraud scores
1010 peers_with_score = db.query(PeerNode).filter(
1011 PeerNode.fraud_score > FRAUD_SCORE_DECAY_MIN
1012 ).all()
1014 for peer in peers_with_score:
1015 old_score = peer.fraud_score or 0
1016 peer.fraud_score = max(old_score - FRAUD_SCORE_DECAY_PER_ROUND, 0.0)
1017 if peer.fraud_score != old_score:
1018 decayed += 1
1020 # If score drops below suspicious threshold, upgrade
1021 if (peer.fraud_score < 40 and
1022 peer.integrity_status == 'suspicious'):
1023 peer.integrity_status = 'verified'
1025 # Check ban expiry (fail2ban timer)
1026 banned_peers = db.query(PeerNode).filter(
1027 PeerNode.integrity_status == 'banned',
1028 PeerNode.ban_until != None, # noqa: E711 (SQLAlchemy)
1029 PeerNode.ban_until <= now,
1030 ).all()
1032 for peer in banned_peers:
1033 peer.integrity_status = 'suspicious'
1034 peer.fraud_score = min(peer.fraud_score or 0, 50.0)
1035 peer.ban_until = None
1036 unbanned += 1
1037 logger.info(
1038 f"Node {peer.node_id[:8]} ban expired (offense #{peer.ban_count}). "
1039 f"Status → suspicious. Will be re-audited.")
1041 if decayed or unbanned:
1042 db.flush()
1044 return {
1045 'decayed_count': decayed,
1046 'unbanned_count': unbanned,
1047 'total_with_score': len(peers_with_score),
1048 }
1050 @staticmethod
1051 def ban_node(db: Session, node_id: str, reason: str):
1052 """Set integrity_status='banned' with fail2ban progression."""
1053 peer = db.query(PeerNode).filter_by(node_id=node_id).first()
1054 if peer:
1055 peer.fraud_score = 100.0
1056 IntegrityService._apply_fail2ban(db, peer, reason)
1057 alert = FraudAlert(
1058 node_id=node_id, alert_type='manual_ban',
1059 severity='critical', description=reason,
1060 )
1061 db.add(alert)
1062 db.flush()
1064 @staticmethod
1065 def unban_node(db: Session, node_id: str, admin_user_id: str):
1066 """Admin action to unban a node."""
1067 peer = db.query(PeerNode).filter_by(node_id=node_id).first()
1068 if peer:
1069 peer.integrity_status = 'unverified'
1070 peer.fraud_score = 0.0
1071 alert = FraudAlert(
1072 node_id=node_id, alert_type='unban',
1073 severity='low',
1074 description=f'Unbanned by admin {admin_user_id}',
1075 reviewed_by=admin_user_id,
1076 reviewed_at=datetime.utcnow(),
1077 status='dismissed',
1078 )
1079 db.add(alert)
1080 db.flush()
1082 # ─── Registry (Central Trust Anchor) ───
    @staticmethod
    def register_with_registry(registry_url: str, node_id: str,
                               public_key_hex: str, version: str) -> bool:
        """POST to registry to register this node's public key.

        The payload (node_id, public key, version, code hash, optional
        release-manifest fields) is signed *before* the 'signature' key is
        added, so the registry verifies the signature over the same field
        set. Returns True only on an HTTP 200 response; any failure
        (import error, signing error, network error) returns False.
        """
        try:
            from security.node_integrity import compute_code_hash, sign_json_payload
            payload = {
                'node_id': node_id,
                'public_key': public_key_hex,
                'version': version,
                'code_hash': compute_code_hash(),
            }
            # Include release manifest info if available
            try:
                from security.master_key import load_release_manifest
                manifest = load_release_manifest()
                if manifest:
                    payload['release_version'] = manifest.get('version', '')
                    payload['release_manifest_signature'] = manifest.get('master_signature', '')
            except Exception:
                # Manifest is optional — registration proceeds without it.
                pass
            # Sign last: the signature must cover every field added above.
            payload['signature'] = sign_json_payload(payload)
            resp = pooled_post(
                f"{registry_url}/api/social/integrity/register-node",
                json=payload,
                timeout=10,
            )
            return resp.status_code == 200
        except Exception:
            # Best-effort: registration failure is reported, never raised.
            return False
1115 @staticmethod
1116 def check_registry_ban_list(registry_url: str) -> List[str]:
1117 """GET banned node_ids from registry."""
1118 try:
1119 resp = pooled_get(
1120 f"{registry_url}/api/social/integrity/ban-list",
1121 timeout=5,
1122 )
1123 if resp.status_code == 200:
1124 return resp.json().get('banned_node_ids', [])
1125 except requests.RequestException:
1126 pass
1127 return []
1129 @staticmethod
1130 def pull_trusted_keys(registry_url: str) -> Dict[str, str]:
1131 """GET verified node public keys from registry."""
1132 try:
1133 resp = pooled_get(
1134 f"{registry_url}/api/social/integrity/trusted-keys",
1135 timeout=5,
1136 )
1137 if resp.status_code == 200:
1138 return resp.json().get('keys', {})
1139 except requests.RequestException:
1140 pass
1141 return {}
1143 # ─── Queries ───
1145 @staticmethod
1146 def get_fraud_alerts(db: Session, node_id: str = None,
1147 status: str = None, severity: str = None,
1148 limit: int = 50, offset: int = 0) -> List[Dict]:
1149 """Query fraud alerts with filters."""
1150 q = db.query(FraudAlert)
1151 if node_id:
1152 q = q.filter(FraudAlert.node_id == node_id)
1153 if status:
1154 q = q.filter(FraudAlert.status == status)
1155 if severity:
1156 q = q.filter(FraudAlert.severity == severity)
1157 alerts = q.order_by(FraudAlert.created_at.desc()).offset(offset).limit(limit).all()
1158 return [a.to_dict() for a in alerts]
1160 @staticmethod
1161 def update_alert(db: Session, alert_id: str, status: str,
1162 reviewed_by: str) -> Optional[Dict]:
1163 """Update a fraud alert status."""
1164 alert = db.query(FraudAlert).filter_by(id=alert_id).first()
1165 if not alert:
1166 return None
1167 alert.status = status
1168 alert.reviewed_by = reviewed_by
1169 alert.reviewed_at = datetime.utcnow()
1170 db.flush()
1171 return alert.to_dict()
1173 @staticmethod
1174 def get_integrity_dashboard(db: Session) -> Dict:
1175 """Overview stats for admin dashboard."""
1176 total_nodes = db.query(sqlfunc.count(PeerNode.id)).scalar() or 0
1177 verified = db.query(sqlfunc.count(PeerNode.id)).filter(
1178 PeerNode.integrity_status == 'verified').scalar() or 0
1179 suspicious = db.query(sqlfunc.count(PeerNode.id)).filter(
1180 PeerNode.integrity_status == 'suspicious').scalar() or 0
1181 banned = db.query(sqlfunc.count(PeerNode.id)).filter(
1182 PeerNode.integrity_status == 'banned').scalar() or 0
1183 open_alerts = db.query(sqlfunc.count(FraudAlert.id)).filter(
1184 FraudAlert.status == 'open').scalar() or 0
1186 return {
1187 'total_nodes': total_nodes,
1188 'verified': verified,
1189 'suspicious': suspicious,
1190 'banned': banned,
1191 'unverified': total_nodes - verified - suspicious - banned,
1192 'open_alerts': open_alerts,
1193 }
1195 # ─── Reward Hacking Detection ───
1197 @staticmethod
1198 def detect_reward_velocity_anomaly(db: Session, node_id: str,
1199 period_hours: int = 24) -> Optional[Dict]:
1200 """Detect nodes claiming rewards at anomalously high rates.
1202 Compares this node's reward claim rate to the network average.
1203 A node receiving >3 stddev above mean reward amount per period
1204 is flagged for investigation.
1205 """
1206 cutoff = datetime.utcnow() - timedelta(hours=period_hours)
1208 # Get per-node reward totals in the period
1209 node_totals = db.query(
1210 HostingReward.node_id,
1211 sqlfunc.sum(HostingReward.amount).label('total'),
1212 sqlfunc.count(HostingReward.id).label('claim_count'),
1213 ).filter(
1214 HostingReward.created_at >= cutoff,
1215 ).group_by(HostingReward.node_id).all()
1217 if len(node_totals) < 3:
1218 return None # Not enough nodes to compare
1220 amounts = [nt.total for nt in node_totals]
1221 target_entry = next(
1222 (nt for nt in node_totals if nt.node_id == node_id), None)
1223 if not target_entry or target_entry.total < 10:
1224 return None # Too low to flag
1226 mean = statistics.mean(amounts)
1227 stddev = statistics.stdev(amounts) if len(amounts) > 1 else 0
1228 if stddev == 0:
1229 return None
1231 z_score = (target_entry.total - mean) / stddev
1233 if z_score > 3.0:
1234 return IntegrityService._create_fraud_alert(
1235 db, node_id, 'reward_velocity_anomaly', 'high',
1236 f'Reward velocity anomaly: {target_entry.total:.1f} Spark '
1237 f'in {period_hours}h ({target_entry.claim_count} claims, '
1238 f'z-score={z_score:.1f}, mean={mean:.1f})',
1239 FRAUD_WEIGHTS['reward_velocity_anomaly'],
1240 {'total': round(target_entry.total, 2),
1241 'claim_count': target_entry.claim_count,
1242 'mean': round(mean, 2), 'stddev': round(stddev, 2),
1243 'z_score': round(z_score, 2),
1244 'period_hours': period_hours})
1246 return None
    @staticmethod
    def detect_reward_self_dealing(db: Session, node_id: str) -> Optional[Dict]:
        """Detect circular reward patterns — nodes awarding rewards to
        themselves or to a small ring of accomplices who reciprocate.

        Checks:
        1. Node's operator_id matches the node's own user account
           (>50% of rewards self-directed, with more than 5 total)
        2. Reward claims where the same small group witnesses each other
           (>10 witnessed impressions in 7 days with <=2 unique witnesses)

        Returns the fraud-alert summary from the first check that fires,
        or None when neither trips (or the node is unknown).
        """
        # Check 1: Self-referential rewards
        peer = db.query(PeerNode).filter_by(node_id=node_id).first()
        if not peer:
            return None

        operator_id = peer.node_operator_id
        self_rewards = db.query(HostingReward).filter(
            HostingReward.node_id == node_id,
            HostingReward.operator_id == operator_id,
        ).count()

        # All-time totals — no time window on the self-dealing ratio.
        total_rewards = db.query(HostingReward).filter(
            HostingReward.node_id == node_id,
        ).count()

        if total_rewards > 5 and self_rewards > total_rewards * 0.5:
            return IntegrityService._create_fraud_alert(
                db, node_id, 'reward_self_dealing', 'critical',
                f'Self-dealing: {self_rewards}/{total_rewards} rewards '
                f'({self_rewards / total_rewards * 100:.0f}%) awarded to '
                f'own operator account',
                FRAUD_WEIGHTS['reward_self_dealing'],
                {'self_rewards': self_rewards, 'total_rewards': total_rewards,
                 'ratio': round(self_rewards / total_rewards, 3)})

        # Check 2: Witness ring — same small set of peers witness all
        # of this node's ad impressions (already partially covered by
        # detect_collusion, but this checks the reward side specifically)
        cutoff = datetime.utcnow() - timedelta(days=7)
        witnessed = db.query(AdImpression).filter(
            AdImpression.node_id == node_id,
            AdImpression.created_at >= cutoff,
            AdImpression.witness_node_id.isnot(None),
        ).all()

        if len(witnessed) > 10:
            witness_set = set(w.witness_node_id for w in witnessed)
            if len(witness_set) <= 2:
                return IntegrityService._create_fraud_alert(
                    db, node_id, 'reward_self_dealing', 'high',
                    f'Witness ring: {len(witnessed)} impressions all '
                    f'witnessed by only {len(witness_set)} peer(s): '
                    f'{", ".join(w[:8] for w in witness_set)}',
                    FRAUD_WEIGHTS['reward_self_dealing'],
                    {'impression_count': len(witnessed),
                     'unique_witnesses': len(witness_set),
                     'witness_ids': list(witness_set)})

        return None
1307 @staticmethod
1308 def detect_spark_gaming(db: Session, node_id: str) -> Optional[Dict]:
1309 """Detect goals created to burn Spark budget without producing
1310 real output — a form of reward hacking where the node claims
1311 compute credits for work it didn't meaningfully do.
1313 Signals: high goal failure rate, minimal output, rapid goal cycling.
1314 """
1315 try:
1316 from integrations.social.models import AgentGoal
1317 except ImportError:
1318 return None
1320 # Get goals associated with this node's user
1321 peer = db.query(PeerNode).filter_by(node_id=node_id).first()
1322 if not peer or not peer.node_operator_id:
1323 return None
1325 cutoff = datetime.utcnow() - timedelta(hours=48)
1326 recent_goals = db.query(AgentGoal).filter(
1327 AgentGoal.owner_id == peer.node_operator_id,
1328 AgentGoal.created_at >= cutoff,
1329 ).all()
1331 if len(recent_goals) < 5:
1332 return None # Not enough to judge
1334 total = len(recent_goals)
1335 failed = sum(1 for g in recent_goals if g.status in ('failed', 'error'))
1336 total_spent = sum((g.spark_spent or 0) for g in recent_goals)
1337 completed = sum(1 for g in recent_goals if g.status == 'completed')
1339 # High failure rate with significant spend = gaming
1340 failure_rate = failed / total if total > 0 else 0
1341 if failure_rate > 0.7 and total_spent > 100:
1342 return IntegrityService._create_fraud_alert(
1343 db, node_id, 'spark_gaming', 'high',
1344 f'Spark gaming suspected: {failed}/{total} goals failed '
1345 f'({failure_rate:.0%}) while spending {total_spent} Spark '
1346 f'in 48h. Only {completed} completed.',
1347 FRAUD_WEIGHTS['spark_gaming'],
1348 {'total_goals': total, 'failed': failed,
1349 'completed': completed, 'spark_spent': total_spent,
1350 'failure_rate': round(failure_rate, 3)})
1352 # Rapid cycling: many goals created and immediately abandoned
1353 abandoned = sum(1 for g in recent_goals
1354 if g.status == 'archived' and (g.spark_spent or 0) > 0)
1355 if abandoned > 10 and total_spent > 200:
1356 return IntegrityService._create_fraud_alert(
1357 db, node_id, 'spark_gaming', 'medium',
1358 f'Goal cycling: {abandoned} goals abandoned after spending '
1359 f'{total_spent} Spark in 48h',
1360 FRAUD_WEIGHTS['spark_gaming'],
1361 {'abandoned': abandoned, 'spark_spent': total_spent})
1363 return None
1365 # ─── Impression Integrity (Collusion + Tampering) ───
    @staticmethod
    def detect_witness_ring(db: Session, node_id: str,
                            period_days: int = 7,
                            min_impressions: int = 10,
                            max_witnesses: int = 2,
                            ring_ratio: float = 0.9) -> Optional[Dict]:
        """Detect small witness rings — a tight group of nodes that
        exclusively witness each other's ad impressions.

        Unlike detect_collusion (which checks attestation concentration),
        this checks bidirectional impression witnessing: if A witnesses B
        and B witnesses A with the same tiny set, it's a ring.

        Signals:
        1. Node's impressions witnessed by <=max_witnesses unique peers
        2. Those same peers have their impressions witnessed by this node
        3. The overlap ratio exceeds ring_ratio

        Returns the 'witness_ring' alert summary, or None when there is
        insufficient data or the witnessing pattern is diverse enough.
        """
        cutoff = datetime.utcnow() - timedelta(days=period_days)

        # 1. Who witnesses this node's impressions?
        node_impressions = db.query(AdImpression).filter(
            AdImpression.node_id == node_id,
            AdImpression.created_at >= cutoff,
            AdImpression.witness_node_id.isnot(None),
        ).all()

        if len(node_impressions) < min_impressions:
            return None  # Not enough data

        # Tally impressions per witnessing peer.
        witness_counts = {}
        for imp in node_impressions:
            witness_counts[imp.witness_node_id] = witness_counts.get(
                imp.witness_node_id, 0) + 1

        unique_witnesses = set(witness_counts.keys())
        if len(unique_witnesses) > max_witnesses:
            return None  # Diverse enough

        # 2. Check bidirectional: does this node witness those peers back?
        ring_members = set()
        for witness_id in unique_witnesses:
            reverse_count = db.query(sqlfunc.count(AdImpression.id)).filter(
                AdImpression.node_id == witness_id,
                AdImpression.witness_node_id == node_id,
                AdImpression.created_at >= cutoff,
            ).scalar() or 0
            # Half the forward threshold is enough to count as reciprocal.
            if reverse_count >= min_impressions // 2:
                ring_members.add(witness_id)

        if not ring_members:
            return None

        # 3. Calculate ring tightness
        total_witnessed = len(node_impressions)
        ring_witnessed = sum(witness_counts.get(m, 0) for m in ring_members)
        ratio = ring_witnessed / total_witnessed if total_witnessed > 0 else 0

        if ratio >= ring_ratio:
            return IntegrityService._create_fraud_alert(
                db, node_id, 'witness_ring', 'high',
                f'Witness ring detected: {total_witnessed} impressions, '
                f'{len(ring_members)+1} nodes in ring '
                f'(bidirectional ratio={ratio:.0%})',
                FRAUD_WEIGHTS['witness_ring'],
                {'ring_members': [node_id] + list(ring_members),
                 'total_witnessed': total_witnessed,
                 'ring_witnessed': ring_witnessed,
                 'ratio': round(ratio, 3),
                 'period_days': period_days})

        return None
1440 @staticmethod
1441 def detect_temporal_clustering(db: Session, node_id: str,
1442 period_hours: int = 1,
1443 cluster_window_seconds: int = 5,
1444 min_cluster_size: int = 10,
1445 min_impressions: int = 20) -> Optional[Dict]:
1446 """Detect suspiciously tight temporal clustering of impressions.
1448 Legitimate traffic has organic timing variation. Bot-driven or
1449 fabricated impressions often arrive in tight bursts — many
1450 impressions within a few seconds, then silence.
1452 Algorithm: slide a window of cluster_window_seconds across the
1453 node's impressions. If any window contains >=min_cluster_size
1454 impressions, flag it.
1455 """
1456 cutoff = datetime.utcnow() - timedelta(hours=period_hours)
1458 impressions = db.query(AdImpression).filter(
1459 AdImpression.node_id == node_id,
1460 AdImpression.created_at >= cutoff,
1461 ).order_by(AdImpression.created_at.asc()).all()
1463 if len(impressions) < min_impressions:
1464 return None
1466 # Sliding window: find max cluster density
1467 timestamps = [imp.created_at for imp in impressions]
1468 max_cluster = 0
1469 worst_window_start = None
1471 for i, ts in enumerate(timestamps):
1472 window_end = ts + timedelta(seconds=cluster_window_seconds)
1473 # Count impressions within [ts, ts + window]
1474 cluster_count = 0
1475 for j in range(i, len(timestamps)):
1476 if timestamps[j] <= window_end:
1477 cluster_count += 1
1478 else:
1479 break
1480 if cluster_count > max_cluster:
1481 max_cluster = cluster_count
1482 worst_window_start = ts
1484 if max_cluster >= min_cluster_size:
1485 return IntegrityService._create_fraud_alert(
1486 db, node_id, 'temporal_clustering', 'medium',
1487 f'Temporal clustering: {max_cluster} impressions in '
1488 f'{cluster_window_seconds}s window '
1489 f'(total={len(impressions)} in {period_hours}h)',
1490 FRAUD_WEIGHTS['temporal_clustering'],
1491 {'max_cluster': max_cluster,
1492 'window_seconds': cluster_window_seconds,
1493 'total_impressions': len(impressions),
1494 'worst_window_start': worst_window_start.isoformat()
1495 if worst_window_start else None,
1496 'period_hours': period_hours})
1498 return None
    @staticmethod
    def verify_impression_seal(db: Session, impression_id: str) -> Dict:
        """Verify that a sealed impression's hash matches its current data.

        Once an impression is sealed (witnessed + hashed), its data should
        be immutable. If the recomputed hash doesn't match sealed_hash,
        the impression has been tampered with post-seal and a critical
        'seal_tamper' fraud alert is raised against the hosting node.

        Returns: {'valid': bool, 'details': str, 'impression_id': str}
        (plus sealed/recomputed hashes on a mismatch).
        """
        imp = db.query(AdImpression).filter_by(id=impression_id).first()
        if not imp:
            return {'valid': False, 'details': 'Impression not found',
                    'impression_id': impression_id}

        # Unsealed impressions vacuously pass — nothing to verify yet.
        if not imp.sealed_hash:
            return {'valid': True, 'details': 'Not sealed (no hash to verify)',
                    'impression_id': impression_id}

        # Recompute and compare
        # NOTE(review): compute_seal_hash is accessed without parentheses —
        # assumes it is a @property (or similar descriptor) on AdImpression.
        # If it is a plain method, this binds the method object rather than
        # a hash string and every sealed impression would be reported as
        # tampered. Confirm against the AdImpression model definition.
        current_hash = imp.compute_seal_hash
        if current_hash == imp.sealed_hash:
            return {'valid': True, 'details': 'Seal intact',
                    'impression_id': impression_id}

        # Tampered — raise fraud alert on the node
        if imp.node_id:
            IntegrityService._create_fraud_alert(
                db, imp.node_id, 'seal_tamper', 'critical',
                f'Impression seal tampered: id={impression_id[:16]}, '
                f'expected={imp.sealed_hash[:16]}..., '
                f'got={current_hash[:16]}...',
                FRAUD_WEIGHTS['seal_tamper'],
                {'impression_id': impression_id,
                 'sealed_hash': imp.sealed_hash,
                 'recomputed_hash': current_hash})

        return {'valid': False, 'details': 'Seal hash mismatch — tampered',
                'impression_id': impression_id,
                'sealed_hash': imp.sealed_hash,
                'recomputed_hash': current_hash}
1542 @staticmethod
1543 def verify_all_sealed_impressions(db: Session, node_id: str = None,
1544 limit: int = 100) -> Dict:
1545 """Batch-verify sealed impressions. Returns summary of tampered seals."""
1546 q = db.query(AdImpression).filter(
1547 AdImpression.sealed_hash.isnot(None),
1548 )
1549 if node_id:
1550 q = q.filter(AdImpression.node_id == node_id)
1551 impressions = q.order_by(AdImpression.sealed_at.desc()).limit(limit).all()
1553 total = len(impressions)
1554 tampered = 0
1555 tampered_ids = []
1557 for imp in impressions:
1558 result = IntegrityService.verify_impression_seal(db, imp.id)
1559 if not result['valid'] and 'tampered' in result.get('details', '').lower():
1560 tampered += 1
1561 tampered_ids.append(imp.id)
1563 return {
1564 'total_checked': total,
1565 'tampered': tampered,
1566 'tampered_ids': tampered_ids,
1567 'integrity_ratio': round((total - tampered) / total, 3) if total > 0 else 1.0,
1568 }
    @staticmethod
    def isolate_reward_hacker(db: Session, node_id: str,
                              reason: str, evidence: dict = None) -> Dict:
        """Isolate a confirmed reward hacker from the network.

        This is the enforcement action when reward hacking is detected
        with high confidence. The node is:
        1. Quarantined via TrustQuarantine (ISOLATE level)
        2. All pending rewards frozen
        3. Fraud score set to maximum
        4. Witnesses notified
        5. Node cannot re-enter until reviewed by a human auditor

        Reward hackers are isolated, not punished — the goal is to
        protect the network, not to seek vengeance (per TrustQuarantine).

        Returns a summary dict: node_id, action, reason, reward-freeze
        counts/amount, and requires_human_review=True.
        """
        # 1. Set fraud score to max and ban
        # (increase_fraud_score also writes the FraudAlert and triggers
        # fail2ban progression at the ban threshold.)
        IntegrityService.increase_fraud_score(
            db, node_id, 50.0,
            f'Reward hacker isolated: {reason}', evidence)

        peer = db.query(PeerNode).filter_by(node_id=node_id).first()
        if peer:
            peer.integrity_status = 'banned'
            peer.fraud_score = 100.0

        # 2. Quarantine via guardrail system
        try:
            from security.hive_guardrails import TrustQuarantine
            TrustQuarantine.quarantine(
                node_id,
                TrustQuarantine.LEVEL_ISOLATE,
                f'Reward hacking: {reason}')
        except ImportError:
            # Isolation still proceeds locally even without the guardrail.
            logger.warning("TrustQuarantine not available for isolation")

        # 3. Freeze pending rewards (mark as disputed)
        # NOTE(review): this selects ALL rewards from the last 30 days,
        # not only unpaid ones — confirm whether HostingReward carries a
        # status column that should be filtered here.
        pending = db.query(HostingReward).filter(
            HostingReward.node_id == node_id,
            HostingReward.created_at >= datetime.utcnow() - timedelta(days=30),
        ).all()
        frozen_amount = 0.0
        for reward in pending:
            frozen_amount += reward.amount
            reward.reason = f'FROZEN: {reward.reason} [reward hack investigation]'
        db.flush()

        result = {
            'node_id': node_id,
            'action': 'isolated',
            'reason': reason,
            'rewards_frozen': len(pending),
            'frozen_amount': round(frozen_amount, 2),
            'requires_human_review': True,
        }

        logger.warning(
            f"Reward hacker isolated: node={node_id[:8]}, "
            f"reason={reason}, frozen={frozen_amount:.2f} Spark")

        return result
1632 # ─── Private Helpers ───
1634 @staticmethod
1635 def _create_fraud_alert(db: Session, node_id: str, alert_type: str,
1636 severity: str, description: str,
1637 fraud_delta: float, evidence: dict) -> Dict:
1638 """Create a FraudAlert and increase fraud score."""
1639 IntegrityService.increase_fraud_score(
1640 db, node_id, fraud_delta, description, evidence)
1641 # The alert is created inside increase_fraud_score
1642 return {'node_id': node_id, 'alert_type': alert_type,
1643 'severity': severity, 'description': description}
1646def _get_node_compute(peer) -> float:
1647 """Extract compute score from a PeerNode's metadata.
1648 Uses contribution_score as primary metric, falls back to metadata fields."""
1649 if peer.contribution_score and peer.contribution_score > 0:
1650 return float(peer.contribution_score)
1651 meta = peer.metadata_json or {}
1652 # Check for reported compute capacity (TFLOPS, GPU count, etc.)
1653 compute = meta.get('compute_tflops', 0) or meta.get('gpu_count', 0)
1654 if compute:
1655 return float(compute)
1656 # Minimum: 1.0 (every node has at least some compute)
1657 return 1.0
1660def _determine_alert_type(reason: str) -> str:
1661 """Infer alert type from reason string."""
1662 reason_lower = reason.lower()
1663 if 'reward hack' in reason_lower or 'isolated' in reason_lower:
1664 return 'reward_hacking'
1665 if 'self-dealing' in reason_lower or 'witness ring' in reason_lower:
1666 return 'reward_self_dealing'
1667 if 'spark gaming' in reason_lower or 'goal cycling' in reason_lower:
1668 return 'spark_gaming'
1669 if 'reward velocity' in reason_lower:
1670 return 'reward_velocity_anomaly'
1671 if 'hash' in reason_lower or 'code' in reason_lower:
1672 return 'hash_mismatch'
1673 if 'challenge' in reason_lower:
1674 return 'challenge_fail'
1675 if 'impression' in reason_lower or 'anomaly' in reason_lower:
1676 return 'impression_anomaly'
1677 if 'jump' in reason_lower or 'count' in reason_lower:
1678 return 'score_jump'
1679 if 'collusion' in reason_lower:
1680 return 'collusion_suspected'
1681 if 'witness' in reason_lower:
1682 return 'witness_refusal'
1683 if 'ban' in reason_lower:
1684 return 'manual_ban'
1685 return 'other'