Coverage for integrations / social / integrity_service.py: 61.2%

740 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Integrity Verification Service 

3Challenge-response protocol, impression witnessing, consensus verification, 

4fraud scoring. Central service for all anti-fraud logic. 

5""" 

6import os 

7import secrets 

8import logging 

9import statistics 

10import requests 

11from core.http_pool import pooled_get, pooled_post 

12from datetime import datetime, timedelta 

13from typing import Optional, Dict, List 

14 

15from sqlalchemy import func as sqlfunc 

16from sqlalchemy.orm import Session 

17 

18from .models import ( 

19 PeerNode, AdImpression, HostingReward, 

20 NodeAttestation, IntegrityChallenge, FraudAlert, 

21 User, Post, 

22) 

23 

24logger = logging.getLogger('hevolve_social') 

25 

26# Fraud scoring weights 

27FRAUD_WEIGHTS = { 

28 'hash_mismatch': 30.0, 

29 'challenge_fail': 15.0, 

30 'impression_anomaly': 20.0, 

31 'score_jump': 10.0, 

32 'witness_refusal': 5.0, 

33 'collusion_suspected': 25.0, 

34 'reward_velocity_anomaly': 25.0, 

35 'reward_self_dealing': 35.0, 

36 'spark_gaming': 20.0, 

37 'witness_ring': 30.0, 

38 'temporal_clustering': 20.0, 

39 'seal_tamper': 35.0, 

40 'gradient_magnitude_anomaly': 20.0, 

41 'gradient_direction_flip': 25.0, 

42} 

43 

44IMPRESSION_ANOMALY_STDDEV = 3.0 

45SCORE_JUMP_THRESHOLD_PCT = 200 

46FRAUD_BAN_THRESHOLD = 80.0 

47ATTESTATION_EXPIRY_DAYS = 7 

48MIN_WITNESS_PEERS = 1 

49CHALLENGE_TIMEOUT_SECONDS = 30 

50WITNESS_TIMESTAMP_MAX_AGE = 60 # seconds 

51 

52# ── Fraud Score Decay ── 

53# Every audit round, each node's fraud score decays by this amount. 

54# Good behavior over time earns back trust. But ban records persist. 

55FRAUD_SCORE_DECAY_PER_ROUND = 2.0 # Points per audit round (~5 min) 

56FRAUD_SCORE_DECAY_MIN = 0.0 

57 

58# ── Fail2ban: Progressive Ban Durations ── 

59# Each subsequent ban lasts longer. ban_count tracked on PeerNode. 

60# After max_bans (4+), node must petition for human review. 

61FAIL2BAN_DURATIONS = { 

62 1: timedelta(hours=1), # 1st offense: 1 hour 

63 2: timedelta(hours=24), # 2nd offense: 24 hours 

64 3: timedelta(days=7), # 3rd offense: 1 week 

65} 

66FAIL2BAN_MAX_DURATION = timedelta(days=30) # 4th+ offense: 30 days 

67 

68 

69class IntegrityService: 

70 """Central service for node integrity verification and anti-fraud.""" 

71 

72 # ─── Code Hash Verification ─── 

73 

74 @staticmethod 

75 def verify_code_hash(db: Session, node_id: str, 

76 registry_url: str = None) -> Dict: 

77 """Check if node's code_hash matches expected hash. 

78 Priority: master-signed manifest > registry > local computation.""" 

79 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

80 if not peer or not peer.code_hash: 

81 return {'verified': False, 'details': 'No code hash available'} 

82 

83 expected = None 

84 

85 # Priority 0: Release hash registry (multi-version support) 

86 try: 

87 from security.release_hash_registry import get_release_hash_registry 

88 registry = get_release_hash_registry() 

89 if registry.is_known_release_hash(peer.code_hash): 

90 peer.integrity_status = 'verified' 

91 peer.last_attestation_at = datetime.utcnow() 

92 return {'verified': True, 

93 'details': 'Code hash in release registry'} 

94 except Exception: 

95 pass 

96 

97 # Primary: check against master-signed manifest 

98 try: 

99 from security.master_key import load_release_manifest, verify_release_manifest 

100 manifest = load_release_manifest() 

101 if manifest and verify_release_manifest(manifest): 

102 expected = manifest.get('code_hash', '') 

103 except Exception: 

104 pass 

105 

106 # Fallback: registry 

107 if not expected and registry_url: 

108 expected = IntegrityService.fetch_expected_hash( 

109 registry_url, peer.code_version or peer.version) 

110 

111 if not expected: 

112 # Last resort: compare against own code hash 

113 try: 

114 from security.node_integrity import compute_code_hash 

115 expected = compute_code_hash() 

116 except Exception: 

117 return {'verified': False, 'details': 'Cannot compute local hash'} 

118 

119 if peer.code_hash == expected: 

120 peer.integrity_status = 'verified' 

121 peer.last_attestation_at = datetime.utcnow() 

122 return {'verified': True, 'details': 'Code hash matches'} 

123 else: 

124 IntegrityService.increase_fraud_score( 

125 db, node_id, FRAUD_WEIGHTS['hash_mismatch'], 

126 f'Code hash mismatch: expected {expected[:16]}..., got {peer.code_hash[:16]}...', 

127 {'expected': expected, 'reported': peer.code_hash}) 

128 return {'verified': False, 'details': 'Code hash mismatch'} 

129 

130 @staticmethod 

131 def fetch_expected_hash(registry_url: str, version: str) -> Optional[str]: 

132 """GET expected code hash from central registry.""" 

133 try: 

134 resp = pooled_get( 

135 f"{registry_url}/api/social/integrity/expected-hash", 

136 params={'version': version}, 

137 timeout=5, 

138 ) 

139 if resp.status_code == 200: 

140 return resp.json().get('code_hash') 

141 except requests.RequestException: 

142 pass 

143 return None 

144 

145 # ─── Challenge-Response Protocol ─── 

146 

147 @staticmethod 

148 def create_challenge(db: Session, challenger_node_id: str, 

149 target_node_id: str, target_url: str, 

150 challenge_type: str = 'agent_count_verify') -> Optional[Dict]: 

151 """Create and send a challenge to a target node.""" 

152 nonce = secrets.token_hex(32) 

153 

154 challenge_data = {'type': challenge_type, 'nonce': nonce} 

155 if challenge_type == 'agent_count_verify': 

156 peer = db.query(PeerNode).filter_by(node_id=target_node_id).first() 

157 if peer: 

158 challenge_data['claimed_agent_count'] = peer.agent_count or 0 

159 elif challenge_type == 'stats_probe': 

160 challenge_data['requested_stats'] = ['agent_count', 'post_count'] 

161 elif challenge_type == 'code_hash_check': 

162 challenge_data['request'] = 'code_hash_and_version' 

163 

164 # Sign the challenge 

165 try: 

166 from security.node_integrity import sign_json_payload, get_public_key_hex 

167 challenge_data['challenger_public_key'] = get_public_key_hex() 

168 challenge_data['signature'] = sign_json_payload(challenge_data) 

169 except Exception: 

170 pass 

171 

172 challenge = IntegrityChallenge( 

173 challenger_node_id=challenger_node_id, 

174 target_node_id=target_node_id, 

175 challenge_type=challenge_type, 

176 challenge_nonce=nonce, 

177 challenge_data=challenge_data, 

178 status='pending', 

179 ) 

180 db.add(challenge) 

181 db.flush() 

182 

183 # Send challenge to target node 

184 try: 

185 resp = pooled_post( 

186 f"{target_url}/api/social/integrity/challenge", 

187 json={'challenge_id': challenge.id, **challenge_data, 

188 'challenger_node_id': challenger_node_id}, 

189 timeout=CHALLENGE_TIMEOUT_SECONDS, 

190 ) 

191 if resp.status_code == 200: 

192 response_data = resp.json() 

193 return IntegrityService.evaluate_challenge_response( 

194 db, challenge.id, 

195 response_data.get('response', {}), 

196 response_data.get('signature', '')) 

197 except requests.RequestException: 

198 challenge.status = 'timeout' 

199 IntegrityService.increase_fraud_score( 

200 db, target_node_id, 5.0, 

201 f'Challenge timeout: {challenge_type}') 

202 

203 peer = db.query(PeerNode).filter_by(node_id=target_node_id).first() 

204 if peer: 

205 peer.last_challenge_at = datetime.utcnow() 

206 return challenge.to_dict() 

207 

208 @staticmethod 

209 def handle_challenge(db: Session, challenge_data: dict) -> Dict: 

210 """Handle an incoming challenge from a peer. Compute response from local data.""" 

211 challenge_type = challenge_data.get('type', '') 

212 nonce = challenge_data.get('nonce', '') 

213 response = {'nonce': nonce, 'timestamp': datetime.utcnow().isoformat()} 

214 

215 if challenge_type == 'agent_count_verify': 

216 actual_count = db.query(sqlfunc.count(User.id)).filter_by( 

217 user_type='agent').scalar() or 0 

218 response['agent_count'] = actual_count 

219 # Include sample agent IDs as proof 

220 agents = db.query(User.id, User.username).filter_by( 

221 user_type='agent').limit(5).all() 

222 response['sample_agents'] = [ 

223 {'id': a.id, 'username': a.username} for a in agents] 

224 

225 elif challenge_type == 'stats_probe': 

226 response['agent_count'] = db.query(sqlfunc.count(User.id)).filter_by( 

227 user_type='agent').scalar() or 0 

228 response['post_count'] = db.query(sqlfunc.count(Post.id)).scalar() or 0 

229 

230 elif challenge_type == 'code_hash_check': 

231 try: 

232 from security.node_integrity import compute_code_hash 

233 from integrations.social.peer_discovery import gossip 

234 response['code_hash'] = compute_code_hash() 

235 response['version'] = gossip.version 

236 except Exception: 

237 response['code_hash'] = 'unavailable' 

238 

239 elif challenge_type == 'guardrail_verify': 

240 try: 

241 from security.hive_guardrails import get_guardrail_hash, compute_guardrail_hash 

242 response['guardrail_hash'] = get_guardrail_hash() 

243 # Recompute live to prove it's not cached/stale 

244 response['guardrail_hash_live'] = compute_guardrail_hash() 

245 except Exception: 

246 response['guardrail_hash'] = 'unavailable' 

247 

248 elif challenge_type == 'impression_audit': 

249 ad_id = challenge_data.get('ad_id') 

250 if ad_id: 

251 hour_ago = datetime.utcnow() - timedelta(hours=1) 

252 count = db.query(sqlfunc.count(AdImpression.id)).filter( 

253 AdImpression.ad_id == ad_id, 

254 AdImpression.created_at >= hour_ago, 

255 ).scalar() or 0 

256 response['impression_count'] = count 

257 

258 # Sign the response 

259 try: 

260 from security.node_integrity import sign_json_payload, get_public_key_hex 

261 response['public_key'] = get_public_key_hex() 

262 sig = sign_json_payload(response) 

263 return {'response': response, 'signature': sig} 

264 except Exception: 

265 return {'response': response, 'signature': ''} 

266 

267 @staticmethod 

268 def evaluate_challenge_response(db: Session, challenge_id: str, 

269 response_data: dict, 

270 response_signature: str) -> Dict: 

271 """Evaluate a challenge response. Verify signature and data consistency.""" 

272 challenge = db.query(IntegrityChallenge).filter_by(id=challenge_id).first() 

273 if not challenge: 

274 return {'passed': False, 'details': 'Challenge not found'} 

275 # Prevent replay: reject already-evaluated challenges 

276 if challenge.status != 'pending': 

277 return {'passed': False, 'details': f'Challenge already processed (status={challenge.status})'} 

278 

279 challenge.response_data = response_data 

280 challenge.response_signature = response_signature 

281 challenge.responded_at = datetime.utcnow() 

282 

283 # Verify nonce 

284 if response_data.get('nonce') != challenge.challenge_nonce: 

285 challenge.status = 'failed' 

286 challenge.result_details = 'Nonce mismatch' 

287 IntegrityService.increase_fraud_score( 

288 db, challenge.target_node_id, FRAUD_WEIGHTS['challenge_fail'], 

289 'Challenge failed: nonce mismatch') 

290 return {'passed': False, 'details': 'Nonce mismatch'} 

291 

292 # Verify signature if public key available 

293 public_key = response_data.get('public_key', '') 

294 if public_key and response_signature: 

295 try: 

296 from security.node_integrity import verify_json_signature 

297 if not verify_json_signature(public_key, response_data, response_signature): 

298 challenge.status = 'failed' 

299 challenge.result_details = 'Invalid signature' 

300 IntegrityService.increase_fraud_score( 

301 db, challenge.target_node_id, FRAUD_WEIGHTS['challenge_fail'], 

302 'Challenge failed: invalid signature') 

303 return {'passed': False, 'details': 'Invalid signature'} 

304 except Exception: 

305 pass 

306 

307 # Evaluate based on challenge type 

308 passed = True 

309 details = 'OK' 

310 

311 if challenge.challenge_type == 'agent_count_verify': 

312 claimed = (challenge.challenge_data or {}).get('claimed_agent_count', 0) 

313 actual = response_data.get('agent_count', 0) 

314 # Allow 10% tolerance 

315 if claimed > 0 and actual < claimed * 0.5: 

316 passed = False 

317 details = f'Agent count mismatch: claimed {claimed}, actual {actual}' 

318 

319 elif challenge.challenge_type == 'stats_probe': 

320 peer = db.query(PeerNode).filter_by( 

321 node_id=challenge.target_node_id).first() 

322 if peer: 

323 reported_agents = response_data.get('agent_count', 0) 

324 if peer.agent_count and reported_agents < (peer.agent_count or 0) * 0.5: 

325 passed = False 

326 details = f'Stats mismatch: DB has {peer.agent_count}, node reports {reported_agents}' 

327 

328 elif challenge.challenge_type == 'code_hash_check': 

329 reported_hash = response_data.get('code_hash', '') 

330 peer = db.query(PeerNode).filter_by( 

331 node_id=challenge.target_node_id).first() 

332 if peer and peer.code_hash and reported_hash != peer.code_hash: 

333 passed = False 

334 details = 'Code hash changed since last exchange' 

335 

336 elif challenge.challenge_type == 'guardrail_verify': 

337 reported_hash = response_data.get('guardrail_hash', '') 

338 reported_live = response_data.get('guardrail_hash_live', '') 

339 try: 

340 from security.hive_guardrails import get_guardrail_hash 

341 expected = get_guardrail_hash() 

342 if reported_hash != expected: 

343 passed = False 

344 details = f'Guardrail hash mismatch: expected {expected[:16]}, got {reported_hash[:16]}' 

345 elif reported_live and reported_live != expected: 

346 passed = False 

347 details = f'Guardrail live recompute mismatch — possible tampering' 

348 elif reported_hash != reported_live: 

349 passed = False 

350 details = 'Cached vs live guardrail hash mismatch — values may have drifted' 

351 except Exception: 

352 pass 

353 

354 challenge.status = 'passed' if passed else 'failed' 

355 challenge.result_details = details 

356 challenge.evaluated_at = datetime.utcnow() 

357 

358 if not passed: 

359 IntegrityService.increase_fraud_score( 

360 db, challenge.target_node_id, FRAUD_WEIGHTS['challenge_fail'], 

361 f'Challenge failed: {details}') 

362 else: 

363 # Successful challenge reduces fraud score slightly 

364 IntegrityService.decrease_fraud_score( 

365 db, challenge.target_node_id, 2.0, 

366 f'Challenge passed: {challenge.challenge_type}') 

367 

368 return {'passed': passed, 'details': details} 

369 

370 # ─── Impression Witnessing ─── 

371 

372 # Rate limit: max witness requests per node per hour 

373 _witness_request_counts = {} # node_id -> (count, window_start) 

374 _WITNESS_MAX_PER_HOUR = 100 

375 

376 @staticmethod 

377 def request_nearest_witness(db: Session, impression_id: str, 

378 ad_id: str, requesting_node_id: str) -> Optional[Dict]: 

379 """Find nearest active non-banned peer and request a witness attestation.""" 

380 # Rate limit witness requests per node 

381 now = datetime.utcnow() 

382 counts = IntegrityService._witness_request_counts 

383 entry = counts.get(requesting_node_id) 

384 if entry: 

385 count, window_start = entry 

386 if (now - window_start).total_seconds() > 3600: 

387 counts[requesting_node_id] = (1, now) 

388 elif count >= IntegrityService._WITNESS_MAX_PER_HOUR: 

389 return None # Rate limited 

390 else: 

391 counts[requesting_node_id] = (count + 1, window_start) 

392 else: 

393 counts[requesting_node_id] = (1, now) 

394 

395 peers = db.query(PeerNode).filter( 

396 PeerNode.status == 'active', 

397 PeerNode.integrity_status != 'banned', 

398 PeerNode.node_id != requesting_node_id, 

399 ).limit(5).all() 

400 

401 for peer in peers: 

402 result = IntegrityService._request_witness_from_peer( 

403 db, impression_id, ad_id, requesting_node_id, peer) 

404 if result: 

405 return result 

406 return None 

407 

408 @staticmethod 

409 def _request_witness_from_peer(db: Session, impression_id: str, 

410 ad_id: str, requesting_node_id: str, 

411 peer: PeerNode) -> Optional[Dict]: 

412 """Request a specific peer to witness an impression.""" 

413 nonce = secrets.token_hex(16) 

414 payload = { 

415 'impression_id': impression_id, 

416 'ad_id': ad_id, 

417 'node_id': requesting_node_id, 

418 'timestamp': datetime.utcnow().isoformat(), 

419 'nonce': nonce, 

420 } 

421 try: 

422 from security.node_integrity import sign_json_payload, get_public_key_hex 

423 payload['public_key'] = get_public_key_hex() 

424 payload['signature'] = sign_json_payload(payload) 

425 except Exception: 

426 pass 

427 

428 try: 

429 resp = pooled_post( 

430 f"{peer.url}/api/social/integrity/witness-impression", 

431 json=payload, 

432 timeout=5, 

433 ) 

434 if resp.status_code == 200: 

435 data = resp.json() 

436 if data.get('witnessed'): 

437 # Store attestation 

438 attestation = NodeAttestation( 

439 attester_node_id=peer.node_id, 

440 subject_node_id=requesting_node_id, 

441 attestation_type='impression_witness', 

442 payload_json={'impression_id': impression_id, 'ad_id': ad_id}, 

443 signature=data.get('signature', ''), 

444 attester_public_key=data.get('public_key', peer.public_key or ''), 

445 expires_at=datetime.utcnow() + timedelta(days=ATTESTATION_EXPIRY_DAYS), 

446 ) 

447 db.add(attestation) 

448 db.flush() 

449 return attestation.to_dict() 

450 except requests.RequestException: 

451 pass 

452 return None 

453 

454 @staticmethod 

455 def handle_witness_request(db: Session, witness_data: dict) -> Dict: 

456 """Handle incoming witness request from a peer node.""" 

457 requesting_node_id = witness_data.get('node_id', '') 

458 timestamp_str = witness_data.get('timestamp', '') 

459 signature = witness_data.get('signature', '') 

460 public_key = witness_data.get('public_key', '') 

461 

462 # Check if requesting node is known and not banned 

463 peer = db.query(PeerNode).filter_by(node_id=requesting_node_id).first() 

464 if peer and peer.integrity_status == 'banned': 

465 return {'witnessed': False, 'reason': 'Requesting node is banned'} 

466 

467 # Check timestamp freshness 

468 if timestamp_str: 

469 try: 

470 ts = datetime.fromisoformat(timestamp_str) 

471 age = (datetime.utcnow() - ts).total_seconds() 

472 if age > WITNESS_TIMESTAMP_MAX_AGE: 

473 return {'witnessed': False, 'reason': 'Stale timestamp'} 

474 except (ValueError, TypeError): 

475 pass 

476 

477 # Verify signature (required — omitting signature is not allowed) 

478 if not signature or not public_key: 

479 return {'witnessed': False, 'reason': 'Missing signature or public key'} 

480 try: 

481 from security.node_integrity import verify_json_signature 

482 if not verify_json_signature(public_key, witness_data, signature): 

483 return {'witnessed': False, 'reason': 'Invalid signature'} 

484 except Exception: 

485 return {'witnessed': False, 'reason': 'Signature verification failed'} 

486 

487 # Co-sign the witness 

488 try: 

489 from security.node_integrity import sign_json_payload, get_public_key_hex 

490 witness_response = { 

491 'witnessed': True, 

492 'impression_id': witness_data.get('impression_id'), 

493 'ad_id': witness_data.get('ad_id'), 

494 'nonce': witness_data.get('nonce'), 

495 'witness_timestamp': datetime.utcnow().isoformat(), 

496 } 

497 witness_response['public_key'] = get_public_key_hex() 

498 witness_response['signature'] = sign_json_payload(witness_response) 

499 return witness_response 

500 except Exception: 

501 return {'witnessed': True, 'signature': '', 'public_key': ''} 

502 

503 @staticmethod 

504 def get_impression_witness_count(db: Session, impression_id: str) -> int: 

505 """Count how many peer witnesses confirmed a given impression.""" 

506 return db.query(sqlfunc.count(NodeAttestation.id)).filter( 

507 NodeAttestation.attestation_type == 'impression_witness', 

508 NodeAttestation.is_valid == True, 

509 NodeAttestation.payload_json.contains(impression_id) if hasattr( 

510 NodeAttestation.payload_json, 'contains') else True, 

511 ).scalar() or 0 

512 

513 # ─── Score Consensus ─── 

514 

515 @staticmethod 

516 def probe_peer_stats(peer_url: str, target_node_id: str) -> Optional[Dict]: 

517 """Query a peer for its view of a target node's stats.""" 

518 try: 

519 resp = pooled_get( 

520 f"{peer_url}/api/social/integrity/peer-stats", 

521 params={'node_id': target_node_id}, 

522 timeout=5, 

523 ) 

524 if resp.status_code == 200: 

525 return resp.json() 

526 except requests.RequestException: 

527 pass 

528 return None 

529 

530 @staticmethod 

531 def run_consensus_check(db: Session, target_node_id: str, 

532 peer_urls: List[str]) -> Dict: 

533 """Query multiple peers for their view of target's stats. Flag disagreements.""" 

534 reports = [] 

535 for url in peer_urls[:5]: 

536 report = IntegrityService.probe_peer_stats(url, target_node_id) 

537 if report: 

538 reports.append(report) 

539 

540 if len(reports) < 2: 

541 return {'consensus': True, 'reports': reports, 'anomalies': [], 

542 'details': 'Not enough peers for consensus'} 

543 

544 agent_counts = [r.get('agent_count', 0) for r in reports] 

545 post_counts = [r.get('post_count', 0) for r in reports] 

546 

547 anomalies = [] 

548 if len(set(agent_counts)) > 1: 

549 spread = max(agent_counts) - min(agent_counts) 

550 mean = statistics.mean(agent_counts) if agent_counts else 0 

551 if mean > 0 and spread / mean > 0.5: 

552 anomalies.append({ 

553 'field': 'agent_count', 

554 'values': agent_counts, 

555 'spread_ratio': round(spread / mean, 2), 

556 }) 

557 

558 consensus = len(anomalies) == 0 

559 if not consensus: 

560 IntegrityService.increase_fraud_score( 

561 db, target_node_id, FRAUD_WEIGHTS['score_jump'] * 0.5, 

562 f'Consensus disagreement: {anomalies}', 

563 {'reports': reports, 'anomalies': anomalies}) 

564 

565 return {'consensus': consensus, 'reports': reports, 'anomalies': anomalies} 

566 

567 # ─── Fraud Detection ─── 

568 

569 @staticmethod 

570 def detect_impression_anomaly(db: Session, node_id: str, 

571 period_hours: int = 24) -> Optional[Dict]: 

572 """Compare node's impression rate to network average. Flag if >3 stddev.""" 

573 cutoff = datetime.utcnow() - timedelta(hours=period_hours) 

574 

575 # Get per-node impression counts 

576 node_counts = db.query( 

577 AdImpression.node_id, 

578 sqlfunc.count(AdImpression.id).label('cnt'), 

579 ).filter( 

580 AdImpression.created_at >= cutoff, 

581 AdImpression.node_id.isnot(None), 

582 ).group_by(AdImpression.node_id).all() 

583 

584 if len(node_counts) < 3: 

585 return None # Not enough data 

586 

587 counts = [nc.cnt for nc in node_counts] 

588 target_count = next((nc.cnt for nc in node_counts if nc.node_id == node_id), 0) 

589 

590 if target_count < 50: 

591 return None # Too low to flag 

592 

593 mean = statistics.mean(counts) 

594 stddev = statistics.stdev(counts) if len(counts) > 1 else 0 

595 

596 if stddev == 0: 

597 return None 

598 

599 z_score = (target_count - mean) / stddev 

600 

601 if z_score > IMPRESSION_ANOMALY_STDDEV: 

602 return IntegrityService._create_fraud_alert( 

603 db, node_id, 'impression_anomaly', 'high', 

604 f'Impression rate anomaly: {target_count} impressions ' 

605 f'(z-score={z_score:.1f}, mean={mean:.1f}, stddev={stddev:.1f})', 

606 FRAUD_WEIGHTS['impression_anomaly'], 

607 {'count': target_count, 'mean': round(mean, 2), 

608 'stddev': round(stddev, 2), 'z_score': round(z_score, 2)}) 

609 

610 return None 

611 

612 @staticmethod 

613 def detect_score_jump(db: Session, node_id: str) -> Optional[Dict]: 

614 """Flag if agent_count or post_count jumped >200% in 24h.""" 

615 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

616 if not peer: 

617 return None 

618 

619 metadata = peer.metadata_json or {} 

620 prev_agents = metadata.get('_prev_agent_count', 0) 

621 prev_posts = metadata.get('_prev_post_count', 0) 

622 current_agents = peer.agent_count or 0 

623 current_posts = peer.post_count or 0 

624 

625 alert = None 

626 if prev_agents > 5 and current_agents > prev_agents * (1 + SCORE_JUMP_THRESHOLD_PCT / 100): 

627 alert = IntegrityService._create_fraud_alert( 

628 db, node_id, 'score_jump', 'medium', 

629 f'Agent count jumped from {prev_agents} to {current_agents} ' 

630 f'({((current_agents / prev_agents) - 1) * 100:.0f}% increase)', 

631 FRAUD_WEIGHTS['score_jump'], 

632 {'prev': prev_agents, 'current': current_agents, 'field': 'agent_count'}) 

633 

634 if prev_posts > 10 and current_posts > prev_posts * (1 + SCORE_JUMP_THRESHOLD_PCT / 100): 

635 alert = IntegrityService._create_fraud_alert( 

636 db, node_id, 'score_jump', 'medium', 

637 f'Post count jumped from {prev_posts} to {current_posts}', 

638 FRAUD_WEIGHTS['score_jump'], 

639 {'prev': prev_posts, 'current': current_posts, 'field': 'post_count'}) 

640 

641 # Store current values as previous for next check 

642 if not metadata: 

643 metadata = {} 

644 metadata['_prev_agent_count'] = current_agents 

645 metadata['_prev_post_count'] = current_posts 

646 metadata['_last_score_check'] = datetime.utcnow().isoformat() 

647 peer.metadata_json = metadata 

648 

649 return alert 

650 

651 @staticmethod 

652 def detect_collusion(db: Session, node_id: str) -> Optional[Dict]: 

653 """Detect if >80% attestations come from a single peer.""" 

654 cutoff = datetime.utcnow() - timedelta(days=ATTESTATION_EXPIRY_DAYS) 

655 attestations = db.query(NodeAttestation).filter( 

656 NodeAttestation.subject_node_id == node_id, 

657 NodeAttestation.created_at >= cutoff, 

658 NodeAttestation.is_valid == True, 

659 ).all() 

660 

661 if len(attestations) < 5: 

662 return None 

663 

664 attester_counts = {} 

665 for a in attestations: 

666 attester_counts[a.attester_node_id] = attester_counts.get( 

667 a.attester_node_id, 0) + 1 

668 

669 total = sum(attester_counts.values()) 

670 for attester_id, count in attester_counts.items(): 

671 ratio = count / total 

672 if ratio > 0.8: 

673 return IntegrityService._create_fraud_alert( 

674 db, node_id, 'collusion_suspected', 'high', 

675 f'Collusion suspected: {count}/{total} attestations ' 

676 f'({ratio:.0%}) from single peer {attester_id[:8]}', 

677 FRAUD_WEIGHTS['collusion_suspected'], 

678 {'dominant_attester': attester_id, 'ratio': round(ratio, 3), 

679 'count': count, 'total': total}) 

680 

681 return None 

682 

683 @staticmethod 

684 def run_full_audit(db: Session, node_id: str, 

685 registry_url: str = None) -> Dict: 

686 """Run all fraud detection checks on a node, including reward hacking. 

687 

688 Also applies fraud score decay and checks ban expiry — good nodes 

689 recover trust over time, banned nodes serve their sentence then 

690 return to 'suspicious' for re-evaluation. 

691 """ 

692 results = {} 

693 

694 # Decay fraud scores + check ban expiry BEFORE running checks. 

695 # This ensures previously-banned nodes get a fair reassessment. 

696 results['decay'] = IntegrityService.apply_fraud_score_decay(db) 

697 

698 results['code_hash'] = IntegrityService.verify_code_hash( 

699 db, node_id, registry_url) 

700 results['impression_anomaly'] = IntegrityService.detect_impression_anomaly( 

701 db, node_id) 

702 results['score_jump'] = IntegrityService.detect_score_jump(db, node_id) 

703 results['collusion'] = IntegrityService.detect_collusion(db, node_id) 

704 results['audit_dominance'] = IntegrityService.verify_audit_dominance( 

705 db, node_id) 

706 

707 # Reward hacking detection 

708 results['reward_velocity'] = IntegrityService.detect_reward_velocity_anomaly( 

709 db, node_id) 

710 results['reward_self_dealing'] = IntegrityService.detect_reward_self_dealing( 

711 db, node_id) 

712 results['spark_gaming'] = IntegrityService.detect_spark_gaming(db, node_id) 

713 

714 # Impression integrity (collusion + tampering) 

715 results['witness_ring'] = IntegrityService.detect_witness_ring(db, node_id) 

716 results['temporal_clustering'] = IntegrityService.detect_temporal_clustering( 

717 db, node_id) 

718 results['seal_integrity'] = IntegrityService.verify_all_sealed_impressions( 

719 db, node_id, limit=50) 

720 

721 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

722 results['fraud_score'] = peer.fraud_score if peer else 0.0 

723 results['integrity_status'] = peer.integrity_status if peer else 'unknown' 

724 results['ban_count'] = peer.ban_count if peer else 0 

725 results['ban_until'] = peer.ban_until.isoformat() if peer and peer.ban_until else None 

726 

727 # Auto-isolate if multiple reward hacking signals fire 

728 reward_hack_signals = sum(1 for k in ( 

729 'reward_velocity', 'reward_self_dealing', 'spark_gaming' 

730 ) if results.get(k) is not None) 

731 if reward_hack_signals >= 2: 

732 results['isolation'] = IntegrityService.isolate_reward_hacker( 

733 db, node_id, 

734 f'Multiple reward hacking signals: {reward_hack_signals}/3 triggered', 

735 {'signals': {k: results.get(k) for k in ( 

736 'reward_velocity', 'reward_self_dealing', 'spark_gaming' 

737 ) if results.get(k)}}) 

738 

739 # Auto-isolate impression fraud: witness_ring + temporal_clustering together 

740 impression_fraud_signals = sum(1 for k in ( 

741 'witness_ring', 'temporal_clustering', 

742 ) if results.get(k) is not None) 

743 seal_tampered = (results.get('seal_integrity') or {}).get('tampered', 0) > 0 

744 if impression_fraud_signals >= 2 or (impression_fraud_signals >= 1 and seal_tampered): 

745 results['impression_isolation'] = IntegrityService.isolate_reward_hacker( 

746 db, node_id, 

747 f'Impression fraud: {impression_fraud_signals} signals + ' 

748 f'seal_tampered={seal_tampered}', 

749 {'witness_ring': results.get('witness_ring'), 

750 'temporal_clustering': results.get('temporal_clustering'), 

751 'seal_integrity': results.get('seal_integrity')}) 

752 

753 return results 

754 

755 # ─── Post-Update Peer Witness Verification ─── 

756 

757 @staticmethod 

758 def verify_post_update(db: Session, node_id: str, 

759 expected_version: str = '') -> Dict: 

760 """Verify a node after it reports an upgrade. 

761 

762 Challenges code_hash, verifies guardrail_hash, and records 

763 attestation in NodeAttestation table. Called on next gossip round 

764 after a peer announces a new version. 

765 """ 

766 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

767 if not peer: 

768 return {'verified': False, 'reason': 'peer not found'} 

769 

770 results = {'node_id': node_id, 'checks': {}} 

771 

772 # 1. Code hash verification (against master-signed manifest) 

773 hash_result = IntegrityService.verify_code_hash(db, node_id) 

774 results['checks']['code_hash'] = hash_result 

775 

776 # 2. Guardrail hash verification via challenge 

777 if peer.url: 

778 try: 

779 challenge_result = IntegrityService.create_challenge( 

780 db, challenger_node_id='self', 

781 target_node_id=node_id, 

782 target_url=peer.url, 

783 challenge_type='code_hash_check') 

784 results['checks']['challenge'] = challenge_result or {} 

785 except Exception as e: 

786 results['checks']['challenge'] = {'error': str(e)} 

787 

788 # 3. Guardrail hash match check from latest gossip info 

789 try: 

790 from security.hive_guardrails import get_guardrail_hash 

791 our_hash = get_guardrail_hash() 

792 peer_hash = getattr(peer, 'guardrail_hash', None) 

793 if peer_hash and peer_hash != our_hash: 

794 results['checks']['guardrail_hash'] = { 

795 'match': False, 'our': our_hash[:16], 'theirs': peer_hash[:16]} 

796 IntegrityService.increase_fraud_score( 

797 db, node_id, 10.0, 

798 f'Post-update guardrail hash mismatch', 

799 {'expected': our_hash[:16], 'got': peer_hash[:16]}) 

800 else: 

801 results['checks']['guardrail_hash'] = {'match': True} 

802 except ImportError: 

803 results['checks']['guardrail_hash'] = {'skipped': True} 

804 

805 # 4. Record attestation 

806 try: 

807 attestation = NodeAttestation( 

808 node_id=node_id, 

809 attestation_type='post_update_verification', 

810 attestation_data={ 

811 'version': expected_version, 

812 'code_hash_verified': hash_result.get('verified', False), 

813 'checks': {k: bool(v.get('verified') or v.get('match')) 

814 for k, v in results['checks'].items() 

815 if isinstance(v, dict)}, 

816 }, 

817 ) 

818 db.add(attestation) 

819 db.flush() 

820 results['attestation_id'] = attestation.id 

821 except Exception as e: 

822 results['attestation_error'] = str(e) 

823 

824 all_passed = all( 

825 v.get('verified') or v.get('match') or v.get('skipped') or v.get('passed') 

826 for v in results['checks'].values() 

827 if isinstance(v, dict) 

828 ) 

829 results['verified'] = all_passed 

830 return results 

831 

832 # ─── Audit Compute Dominance ─── 

833 

834 @staticmethod 

835 def verify_audit_dominance(db: Session, target_node_id: str) -> Dict: 

836 """Verify that the compute available for auditing a node exceeds 

837 that node's own compute. No node should be able to outcompute its auditors. 

838 

839 Principle: audit_compute > target_compute — always. 

840 This is enforced by compute democracy (max 5% influence) but we 

841 verify it explicitly here to catch edge cases. 

842 """ 

843 target = db.query(PeerNode).filter_by(node_id=target_node_id).first() 

844 if not target: 

845 return {'dominant': True, 'details': 'Unknown node'} 

846 

847 target_compute = _get_node_compute(target) 

848 

849 # Sum compute of all active non-banned peers (excluding target) 

850 auditors = db.query(PeerNode).filter( 

851 PeerNode.status == 'active', 

852 PeerNode.node_id != target_node_id, 

853 PeerNode.integrity_status != 'banned', 

854 ).all() 

855 

856 auditor_compute = sum(_get_node_compute(p) for p in auditors) 

857 auditor_count = len(auditors) 

858 

859 # The audit collective must have more compute than the target 

860 dominant = auditor_compute > target_compute 

861 

862 if not dominant and auditor_count > 0: 

863 # If a single node has more compute than all its auditors combined, 

864 # flag it — this violates compute democracy 

865 logger.warning( 

866 f"Audit dominance violation: node {target_node_id[:8]} has " 

867 f"{target_compute} compute vs {auditor_compute} auditor compute " 

868 f"({auditor_count} auditors)") 

869 IntegrityService.increase_fraud_score( 

870 db, target_node_id, 10.0, 

871 f'Audit dominance violation: target compute ({target_compute}) ' 

872 f'exceeds auditor compute ({auditor_compute})', 

873 {'target_compute': target_compute, 

874 'auditor_compute': auditor_compute, 

875 'auditor_count': auditor_count}) 

876 

877 return { 

878 'dominant': dominant, 

879 'target_compute': target_compute, 

880 'auditor_compute': auditor_compute, 

881 'auditor_count': auditor_count, 

882 'ratio': round(auditor_compute / max(target_compute, 1), 2), 

883 } 

884 

885 @staticmethod 

886 def get_audit_coverage(db: Session) -> Dict: 

887 """Network-wide audit dominance report. Returns nodes where 

888 audit compute does NOT exceed their own compute.""" 

889 active_peers = db.query(PeerNode).filter( 

890 PeerNode.status == 'active', 

891 PeerNode.integrity_status != 'banned', 

892 ).all() 

893 

894 total_compute = sum(_get_node_compute(p) for p in active_peers) 

895 violations = [] 

896 

897 for peer in active_peers: 

898 peer_compute = _get_node_compute(peer) 

899 # Auditors = everyone else 

900 auditor_compute = total_compute - peer_compute 

901 if peer_compute > 0 and auditor_compute <= peer_compute: 

902 violations.append({ 

903 'node_id': peer.node_id, 

904 'node_name': peer.name, 

905 'compute': peer_compute, 

906 'auditor_compute': auditor_compute, 

907 'ratio': round(auditor_compute / max(peer_compute, 1), 2), 

908 }) 

909 

910 return { 

911 'total_nodes': len(active_peers), 

912 'total_compute': total_compute, 

913 'violations': violations, 

914 'all_dominant': len(violations) == 0, 

915 } 

916 

917 # ─── Fraud Score Management ─── 

918 

919 @staticmethod 

920 def increase_fraud_score(db: Session, node_id: str, delta: float, 

921 reason: str, evidence: dict = None) -> float: 

922 """Increase a node's fraud_score. Auto-bans at threshold. Creates FraudAlert.""" 

923 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

924 if not peer: 

925 return 0.0 

926 

927 peer.fraud_score = min((peer.fraud_score or 0) + delta, 100.0) 

928 

929 # Determine severity 

930 severity = 'low' 

931 if delta >= 20: 

932 severity = 'high' 

933 elif delta >= 10: 

934 severity = 'medium' 

935 if peer.fraud_score >= FRAUD_BAN_THRESHOLD: 

936 severity = 'critical' 

937 

938 # Create fraud alert 

939 alert = FraudAlert( 

940 node_id=node_id, 

941 alert_type=_determine_alert_type(reason), 

942 severity=severity, 

943 description=reason, 

944 evidence_json=evidence or {}, 

945 fraud_score_delta=delta, 

946 ) 

947 db.add(alert) 

948 

949 # Auto-ban at threshold — fail2ban progressive duration 

950 if peer.fraud_score >= FRAUD_BAN_THRESHOLD: 

951 IntegrityService._apply_fail2ban(db, peer, reason) 

952 elif peer.fraud_score >= 40: 

953 peer.integrity_status = 'suspicious' 

954 

955 db.flush() 

956 return peer.fraud_score 

957 

958 @staticmethod 

959 def _apply_fail2ban(db: Session, peer: 'PeerNode', reason: str): 

960 """Apply fail2ban progressive ban. Each offense = longer ban. 

961 

962 1st ban: 1 hour, 2nd: 24 hours, 3rd: 7 days, 4th+: 30 days. 

963 ban_count persists across unbans — history is never erased. 

964 """ 

965 peer.ban_count = (peer.ban_count or 0) + 1 

966 peer.integrity_status = 'banned' 

967 

968 duration = FAIL2BAN_DURATIONS.get(peer.ban_count, FAIL2BAN_MAX_DURATION) 

969 peer.ban_until = datetime.utcnow() + duration 

970 

971 logger.warning( 

972 f"Node {peer.node_id[:8]} fail2ban: offense #{peer.ban_count}, " 

973 f"banned until {peer.ban_until.isoformat()}, " 

974 f"fraud_score={peer.fraud_score}, reason={reason}") 

975 

976 @staticmethod 

977 def decrease_fraud_score(db: Session, node_id: str, delta: float, 

978 reason: str) -> float: 

979 """Decrease fraud_score (e.g., after successful verification).""" 

980 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

981 if not peer: 

982 return 0.0 

983 

984 peer.fraud_score = max((peer.fraud_score or 0) - delta, 0.0) 

985 

986 # Upgrade status if score dropped below threshold 

987 if peer.fraud_score < 40 and peer.integrity_status == 'suspicious': 

988 peer.integrity_status = 'verified' 

989 

990 db.flush() 

991 return peer.fraud_score 

992 

993 @staticmethod 

994 def apply_fraud_score_decay(db: Session) -> Dict: 

995 """Decay all nodes' fraud scores by a small amount each audit round. 

996 

997 Good behavior over time earns back trust. Called every audit round 

998 (typically every ~5 minutes via AgentDaemon integrity tick). 

999 

1000 Also checks ban expiry: if a banned node's ban_until has passed, 

1001 move to 'suspicious' (not 'verified' — they must prove themselves). 

1002 

1003 Returns summary of decay effects. 

1004 """ 

1005 now = datetime.utcnow() 

1006 decayed = 0 

1007 unbanned = 0 

1008 

1009 # Decay all non-zero fraud scores 

1010 peers_with_score = db.query(PeerNode).filter( 

1011 PeerNode.fraud_score > FRAUD_SCORE_DECAY_MIN 

1012 ).all() 

1013 

1014 for peer in peers_with_score: 

1015 old_score = peer.fraud_score or 0 

1016 peer.fraud_score = max(old_score - FRAUD_SCORE_DECAY_PER_ROUND, 0.0) 

1017 if peer.fraud_score != old_score: 

1018 decayed += 1 

1019 

1020 # If score drops below suspicious threshold, upgrade 

1021 if (peer.fraud_score < 40 and 

1022 peer.integrity_status == 'suspicious'): 

1023 peer.integrity_status = 'verified' 

1024 

1025 # Check ban expiry (fail2ban timer) 

1026 banned_peers = db.query(PeerNode).filter( 

1027 PeerNode.integrity_status == 'banned', 

1028 PeerNode.ban_until != None, # noqa: E711 (SQLAlchemy) 

1029 PeerNode.ban_until <= now, 

1030 ).all() 

1031 

1032 for peer in banned_peers: 

1033 peer.integrity_status = 'suspicious' 

1034 peer.fraud_score = min(peer.fraud_score or 0, 50.0) 

1035 peer.ban_until = None 

1036 unbanned += 1 

1037 logger.info( 

1038 f"Node {peer.node_id[:8]} ban expired (offense #{peer.ban_count}). " 

1039 f"Status → suspicious. Will be re-audited.") 

1040 

1041 if decayed or unbanned: 

1042 db.flush() 

1043 

1044 return { 

1045 'decayed_count': decayed, 

1046 'unbanned_count': unbanned, 

1047 'total_with_score': len(peers_with_score), 

1048 } 

1049 

1050 @staticmethod 

1051 def ban_node(db: Session, node_id: str, reason: str): 

1052 """Set integrity_status='banned' with fail2ban progression.""" 

1053 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

1054 if peer: 

1055 peer.fraud_score = 100.0 

1056 IntegrityService._apply_fail2ban(db, peer, reason) 

1057 alert = FraudAlert( 

1058 node_id=node_id, alert_type='manual_ban', 

1059 severity='critical', description=reason, 

1060 ) 

1061 db.add(alert) 

1062 db.flush() 

1063 

1064 @staticmethod 

1065 def unban_node(db: Session, node_id: str, admin_user_id: str): 

1066 """Admin action to unban a node.""" 

1067 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

1068 if peer: 

1069 peer.integrity_status = 'unverified' 

1070 peer.fraud_score = 0.0 

1071 alert = FraudAlert( 

1072 node_id=node_id, alert_type='unban', 

1073 severity='low', 

1074 description=f'Unbanned by admin {admin_user_id}', 

1075 reviewed_by=admin_user_id, 

1076 reviewed_at=datetime.utcnow(), 

1077 status='dismissed', 

1078 ) 

1079 db.add(alert) 

1080 db.flush() 

1081 

1082 # ─── Registry (Central Trust Anchor) ─── 

1083 

1084 @staticmethod 

1085 def register_with_registry(registry_url: str, node_id: str, 

1086 public_key_hex: str, version: str) -> bool: 

1087 """POST to registry to register this node's public key.""" 

1088 try: 

1089 from security.node_integrity import compute_code_hash, sign_json_payload 

1090 payload = { 

1091 'node_id': node_id, 

1092 'public_key': public_key_hex, 

1093 'version': version, 

1094 'code_hash': compute_code_hash(), 

1095 } 

1096 # Include release manifest info if available 

1097 try: 

1098 from security.master_key import load_release_manifest 

1099 manifest = load_release_manifest() 

1100 if manifest: 

1101 payload['release_version'] = manifest.get('version', '') 

1102 payload['release_manifest_signature'] = manifest.get('master_signature', '') 

1103 except Exception: 

1104 pass 

1105 payload['signature'] = sign_json_payload(payload) 

1106 resp = pooled_post( 

1107 f"{registry_url}/api/social/integrity/register-node", 

1108 json=payload, 

1109 timeout=10, 

1110 ) 

1111 return resp.status_code == 200 

1112 except Exception: 

1113 return False 

1114 

1115 @staticmethod 

1116 def check_registry_ban_list(registry_url: str) -> List[str]: 

1117 """GET banned node_ids from registry.""" 

1118 try: 

1119 resp = pooled_get( 

1120 f"{registry_url}/api/social/integrity/ban-list", 

1121 timeout=5, 

1122 ) 

1123 if resp.status_code == 200: 

1124 return resp.json().get('banned_node_ids', []) 

1125 except requests.RequestException: 

1126 pass 

1127 return [] 

1128 

1129 @staticmethod 

1130 def pull_trusted_keys(registry_url: str) -> Dict[str, str]: 

1131 """GET verified node public keys from registry.""" 

1132 try: 

1133 resp = pooled_get( 

1134 f"{registry_url}/api/social/integrity/trusted-keys", 

1135 timeout=5, 

1136 ) 

1137 if resp.status_code == 200: 

1138 return resp.json().get('keys', {}) 

1139 except requests.RequestException: 

1140 pass 

1141 return {} 

1142 

1143 # ─── Queries ─── 

1144 

1145 @staticmethod 

1146 def get_fraud_alerts(db: Session, node_id: str = None, 

1147 status: str = None, severity: str = None, 

1148 limit: int = 50, offset: int = 0) -> List[Dict]: 

1149 """Query fraud alerts with filters.""" 

1150 q = db.query(FraudAlert) 

1151 if node_id: 

1152 q = q.filter(FraudAlert.node_id == node_id) 

1153 if status: 

1154 q = q.filter(FraudAlert.status == status) 

1155 if severity: 

1156 q = q.filter(FraudAlert.severity == severity) 

1157 alerts = q.order_by(FraudAlert.created_at.desc()).offset(offset).limit(limit).all() 

1158 return [a.to_dict() for a in alerts] 

1159 

1160 @staticmethod 

1161 def update_alert(db: Session, alert_id: str, status: str, 

1162 reviewed_by: str) -> Optional[Dict]: 

1163 """Update a fraud alert status.""" 

1164 alert = db.query(FraudAlert).filter_by(id=alert_id).first() 

1165 if not alert: 

1166 return None 

1167 alert.status = status 

1168 alert.reviewed_by = reviewed_by 

1169 alert.reviewed_at = datetime.utcnow() 

1170 db.flush() 

1171 return alert.to_dict() 

1172 

1173 @staticmethod 

1174 def get_integrity_dashboard(db: Session) -> Dict: 

1175 """Overview stats for admin dashboard.""" 

1176 total_nodes = db.query(sqlfunc.count(PeerNode.id)).scalar() or 0 

1177 verified = db.query(sqlfunc.count(PeerNode.id)).filter( 

1178 PeerNode.integrity_status == 'verified').scalar() or 0 

1179 suspicious = db.query(sqlfunc.count(PeerNode.id)).filter( 

1180 PeerNode.integrity_status == 'suspicious').scalar() or 0 

1181 banned = db.query(sqlfunc.count(PeerNode.id)).filter( 

1182 PeerNode.integrity_status == 'banned').scalar() or 0 

1183 open_alerts = db.query(sqlfunc.count(FraudAlert.id)).filter( 

1184 FraudAlert.status == 'open').scalar() or 0 

1185 

1186 return { 

1187 'total_nodes': total_nodes, 

1188 'verified': verified, 

1189 'suspicious': suspicious, 

1190 'banned': banned, 

1191 'unverified': total_nodes - verified - suspicious - banned, 

1192 'open_alerts': open_alerts, 

1193 } 

1194 

1195 # ─── Reward Hacking Detection ─── 

1196 

1197 @staticmethod 

1198 def detect_reward_velocity_anomaly(db: Session, node_id: str, 

1199 period_hours: int = 24) -> Optional[Dict]: 

1200 """Detect nodes claiming rewards at anomalously high rates. 

1201 

1202 Compares this node's reward claim rate to the network average. 

1203 A node receiving >3 stddev above mean reward amount per period 

1204 is flagged for investigation. 

1205 """ 

1206 cutoff = datetime.utcnow() - timedelta(hours=period_hours) 

1207 

1208 # Get per-node reward totals in the period 

1209 node_totals = db.query( 

1210 HostingReward.node_id, 

1211 sqlfunc.sum(HostingReward.amount).label('total'), 

1212 sqlfunc.count(HostingReward.id).label('claim_count'), 

1213 ).filter( 

1214 HostingReward.created_at >= cutoff, 

1215 ).group_by(HostingReward.node_id).all() 

1216 

1217 if len(node_totals) < 3: 

1218 return None # Not enough nodes to compare 

1219 

1220 amounts = [nt.total for nt in node_totals] 

1221 target_entry = next( 

1222 (nt for nt in node_totals if nt.node_id == node_id), None) 

1223 if not target_entry or target_entry.total < 10: 

1224 return None # Too low to flag 

1225 

1226 mean = statistics.mean(amounts) 

1227 stddev = statistics.stdev(amounts) if len(amounts) > 1 else 0 

1228 if stddev == 0: 

1229 return None 

1230 

1231 z_score = (target_entry.total - mean) / stddev 

1232 

1233 if z_score > 3.0: 

1234 return IntegrityService._create_fraud_alert( 

1235 db, node_id, 'reward_velocity_anomaly', 'high', 

1236 f'Reward velocity anomaly: {target_entry.total:.1f} Spark ' 

1237 f'in {period_hours}h ({target_entry.claim_count} claims, ' 

1238 f'z-score={z_score:.1f}, mean={mean:.1f})', 

1239 FRAUD_WEIGHTS['reward_velocity_anomaly'], 

1240 {'total': round(target_entry.total, 2), 

1241 'claim_count': target_entry.claim_count, 

1242 'mean': round(mean, 2), 'stddev': round(stddev, 2), 

1243 'z_score': round(z_score, 2), 

1244 'period_hours': period_hours}) 

1245 

1246 return None 

1247 

1248 @staticmethod 

1249 def detect_reward_self_dealing(db: Session, node_id: str) -> Optional[Dict]: 

1250 """Detect circular reward patterns — nodes awarding rewards to 

1251 themselves or to a small ring of accomplices who reciprocate. 

1252 

1253 Checks: 

1254 1. Node's operator_id matches the node's own user account 

1255 2. Reward claims where the same small group witnesses each other 

1256 """ 

1257 # Check 1: Self-referential rewards 

1258 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

1259 if not peer: 

1260 return None 

1261 

1262 operator_id = peer.node_operator_id 

1263 self_rewards = db.query(HostingReward).filter( 

1264 HostingReward.node_id == node_id, 

1265 HostingReward.operator_id == operator_id, 

1266 ).count() 

1267 

1268 total_rewards = db.query(HostingReward).filter( 

1269 HostingReward.node_id == node_id, 

1270 ).count() 

1271 

1272 if total_rewards > 5 and self_rewards > total_rewards * 0.5: 

1273 return IntegrityService._create_fraud_alert( 

1274 db, node_id, 'reward_self_dealing', 'critical', 

1275 f'Self-dealing: {self_rewards}/{total_rewards} rewards ' 

1276 f'({self_rewards / total_rewards * 100:.0f}%) awarded to ' 

1277 f'own operator account', 

1278 FRAUD_WEIGHTS['reward_self_dealing'], 

1279 {'self_rewards': self_rewards, 'total_rewards': total_rewards, 

1280 'ratio': round(self_rewards / total_rewards, 3)}) 

1281 

1282 # Check 2: Witness ring — same small set of peers witness all 

1283 # of this node's ad impressions (already partially covered by 

1284 # detect_collusion, but this checks the reward side specifically) 

1285 cutoff = datetime.utcnow() - timedelta(days=7) 

1286 witnessed = db.query(AdImpression).filter( 

1287 AdImpression.node_id == node_id, 

1288 AdImpression.created_at >= cutoff, 

1289 AdImpression.witness_node_id.isnot(None), 

1290 ).all() 

1291 

1292 if len(witnessed) > 10: 

1293 witness_set = set(w.witness_node_id for w in witnessed) 

1294 if len(witness_set) <= 2: 

1295 return IntegrityService._create_fraud_alert( 

1296 db, node_id, 'reward_self_dealing', 'high', 

1297 f'Witness ring: {len(witnessed)} impressions all ' 

1298 f'witnessed by only {len(witness_set)} peer(s): ' 

1299 f'{", ".join(w[:8] for w in witness_set)}', 

1300 FRAUD_WEIGHTS['reward_self_dealing'], 

1301 {'impression_count': len(witnessed), 

1302 'unique_witnesses': len(witness_set), 

1303 'witness_ids': list(witness_set)}) 

1304 

1305 return None 

1306 

1307 @staticmethod 

1308 def detect_spark_gaming(db: Session, node_id: str) -> Optional[Dict]: 

1309 """Detect goals created to burn Spark budget without producing 

1310 real output — a form of reward hacking where the node claims 

1311 compute credits for work it didn't meaningfully do. 

1312 

1313 Signals: high goal failure rate, minimal output, rapid goal cycling. 

1314 """ 

1315 try: 

1316 from integrations.social.models import AgentGoal 

1317 except ImportError: 

1318 return None 

1319 

1320 # Get goals associated with this node's user 

1321 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

1322 if not peer or not peer.node_operator_id: 

1323 return None 

1324 

1325 cutoff = datetime.utcnow() - timedelta(hours=48) 

1326 recent_goals = db.query(AgentGoal).filter( 

1327 AgentGoal.owner_id == peer.node_operator_id, 

1328 AgentGoal.created_at >= cutoff, 

1329 ).all() 

1330 

1331 if len(recent_goals) < 5: 

1332 return None # Not enough to judge 

1333 

1334 total = len(recent_goals) 

1335 failed = sum(1 for g in recent_goals if g.status in ('failed', 'error')) 

1336 total_spent = sum((g.spark_spent or 0) for g in recent_goals) 

1337 completed = sum(1 for g in recent_goals if g.status == 'completed') 

1338 

1339 # High failure rate with significant spend = gaming 

1340 failure_rate = failed / total if total > 0 else 0 

1341 if failure_rate > 0.7 and total_spent > 100: 

1342 return IntegrityService._create_fraud_alert( 

1343 db, node_id, 'spark_gaming', 'high', 

1344 f'Spark gaming suspected: {failed}/{total} goals failed ' 

1345 f'({failure_rate:.0%}) while spending {total_spent} Spark ' 

1346 f'in 48h. Only {completed} completed.', 

1347 FRAUD_WEIGHTS['spark_gaming'], 

1348 {'total_goals': total, 'failed': failed, 

1349 'completed': completed, 'spark_spent': total_spent, 

1350 'failure_rate': round(failure_rate, 3)}) 

1351 

1352 # Rapid cycling: many goals created and immediately abandoned 

1353 abandoned = sum(1 for g in recent_goals 

1354 if g.status == 'archived' and (g.spark_spent or 0) > 0) 

1355 if abandoned > 10 and total_spent > 200: 

1356 return IntegrityService._create_fraud_alert( 

1357 db, node_id, 'spark_gaming', 'medium', 

1358 f'Goal cycling: {abandoned} goals abandoned after spending ' 

1359 f'{total_spent} Spark in 48h', 

1360 FRAUD_WEIGHTS['spark_gaming'], 

1361 {'abandoned': abandoned, 'spark_spent': total_spent}) 

1362 

1363 return None 

1364 

1365 # ─── Impression Integrity (Collusion + Tampering) ─── 

1366 

1367 @staticmethod 

1368 def detect_witness_ring(db: Session, node_id: str, 

1369 period_days: int = 7, 

1370 min_impressions: int = 10, 

1371 max_witnesses: int = 2, 

1372 ring_ratio: float = 0.9) -> Optional[Dict]: 

1373 """Detect small witness rings — a tight group of nodes that 

1374 exclusively witness each other's ad impressions. 

1375 

1376 Unlike detect_collusion (which checks attestation concentration), 

1377 this checks bidirectional impression witnessing: if A witnesses B 

1378 and B witnesses A with the same tiny set, it's a ring. 

1379 

1380 Signals: 

1381 1. Node's impressions witnessed by <=max_witnesses unique peers 

1382 2. Those same peers have their impressions witnessed by this node 

1383 3. The overlap ratio exceeds ring_ratio 

1384 """ 

1385 cutoff = datetime.utcnow() - timedelta(days=period_days) 

1386 

1387 # 1. Who witnesses this node's impressions? 

1388 node_impressions = db.query(AdImpression).filter( 

1389 AdImpression.node_id == node_id, 

1390 AdImpression.created_at >= cutoff, 

1391 AdImpression.witness_node_id.isnot(None), 

1392 ).all() 

1393 

1394 if len(node_impressions) < min_impressions: 

1395 return None # Not enough data 

1396 

1397 witness_counts = {} 

1398 for imp in node_impressions: 

1399 witness_counts[imp.witness_node_id] = witness_counts.get( 

1400 imp.witness_node_id, 0) + 1 

1401 

1402 unique_witnesses = set(witness_counts.keys()) 

1403 if len(unique_witnesses) > max_witnesses: 

1404 return None # Diverse enough 

1405 

1406 # 2. Check bidirectional: does this node witness those peers back? 

1407 ring_members = set() 

1408 for witness_id in unique_witnesses: 

1409 reverse_count = db.query(sqlfunc.count(AdImpression.id)).filter( 

1410 AdImpression.node_id == witness_id, 

1411 AdImpression.witness_node_id == node_id, 

1412 AdImpression.created_at >= cutoff, 

1413 ).scalar() or 0 

1414 if reverse_count >= min_impressions // 2: 

1415 ring_members.add(witness_id) 

1416 

1417 if not ring_members: 

1418 return None 

1419 

1420 # 3. Calculate ring tightness 

1421 total_witnessed = len(node_impressions) 

1422 ring_witnessed = sum(witness_counts.get(m, 0) for m in ring_members) 

1423 ratio = ring_witnessed / total_witnessed if total_witnessed > 0 else 0 

1424 

1425 if ratio >= ring_ratio: 

1426 return IntegrityService._create_fraud_alert( 

1427 db, node_id, 'witness_ring', 'high', 

1428 f'Witness ring detected: {total_witnessed} impressions, ' 

1429 f'{len(ring_members)+1} nodes in ring ' 

1430 f'(bidirectional ratio={ratio:.0%})', 

1431 FRAUD_WEIGHTS['witness_ring'], 

1432 {'ring_members': [node_id] + list(ring_members), 

1433 'total_witnessed': total_witnessed, 

1434 'ring_witnessed': ring_witnessed, 

1435 'ratio': round(ratio, 3), 

1436 'period_days': period_days}) 

1437 

1438 return None 

1439 

1440 @staticmethod 

1441 def detect_temporal_clustering(db: Session, node_id: str, 

1442 period_hours: int = 1, 

1443 cluster_window_seconds: int = 5, 

1444 min_cluster_size: int = 10, 

1445 min_impressions: int = 20) -> Optional[Dict]: 

1446 """Detect suspiciously tight temporal clustering of impressions. 

1447 

1448 Legitimate traffic has organic timing variation. Bot-driven or 

1449 fabricated impressions often arrive in tight bursts — many 

1450 impressions within a few seconds, then silence. 

1451 

1452 Algorithm: slide a window of cluster_window_seconds across the 

1453 node's impressions. If any window contains >=min_cluster_size 

1454 impressions, flag it. 

1455 """ 

1456 cutoff = datetime.utcnow() - timedelta(hours=period_hours) 

1457 

1458 impressions = db.query(AdImpression).filter( 

1459 AdImpression.node_id == node_id, 

1460 AdImpression.created_at >= cutoff, 

1461 ).order_by(AdImpression.created_at.asc()).all() 

1462 

1463 if len(impressions) < min_impressions: 

1464 return None 

1465 

1466 # Sliding window: find max cluster density 

1467 timestamps = [imp.created_at for imp in impressions] 

1468 max_cluster = 0 

1469 worst_window_start = None 

1470 

1471 for i, ts in enumerate(timestamps): 

1472 window_end = ts + timedelta(seconds=cluster_window_seconds) 

1473 # Count impressions within [ts, ts + window] 

1474 cluster_count = 0 

1475 for j in range(i, len(timestamps)): 

1476 if timestamps[j] <= window_end: 

1477 cluster_count += 1 

1478 else: 

1479 break 

1480 if cluster_count > max_cluster: 

1481 max_cluster = cluster_count 

1482 worst_window_start = ts 

1483 

1484 if max_cluster >= min_cluster_size: 

1485 return IntegrityService._create_fraud_alert( 

1486 db, node_id, 'temporal_clustering', 'medium', 

1487 f'Temporal clustering: {max_cluster} impressions in ' 

1488 f'{cluster_window_seconds}s window ' 

1489 f'(total={len(impressions)} in {period_hours}h)', 

1490 FRAUD_WEIGHTS['temporal_clustering'], 

1491 {'max_cluster': max_cluster, 

1492 'window_seconds': cluster_window_seconds, 

1493 'total_impressions': len(impressions), 

1494 'worst_window_start': worst_window_start.isoformat() 

1495 if worst_window_start else None, 

1496 'period_hours': period_hours}) 

1497 

1498 return None 

1499 

1500 @staticmethod 

1501 def verify_impression_seal(db: Session, impression_id: str) -> Dict: 

1502 """Verify that a sealed impression's hash matches its current data. 

1503 

1504 Once an impression is sealed (witnessed + hashed), its data should 

1505 be immutable. If the recomputed hash doesn't match sealed_hash, 

1506 the impression has been tampered with post-seal. 

1507 

1508 Returns: {'valid': bool, 'details': str, 'impression_id': str} 

1509 """ 

1510 imp = db.query(AdImpression).filter_by(id=impression_id).first() 

1511 if not imp: 

1512 return {'valid': False, 'details': 'Impression not found', 

1513 'impression_id': impression_id} 

1514 

1515 if not imp.sealed_hash: 

1516 return {'valid': True, 'details': 'Not sealed (no hash to verify)', 

1517 'impression_id': impression_id} 

1518 

1519 # Recompute and compare 

1520 current_hash = imp.compute_seal_hash 

1521 if current_hash == imp.sealed_hash: 

1522 return {'valid': True, 'details': 'Seal intact', 

1523 'impression_id': impression_id} 

1524 

1525 # Tampered — raise fraud alert on the node 

1526 if imp.node_id: 

1527 IntegrityService._create_fraud_alert( 

1528 db, imp.node_id, 'seal_tamper', 'critical', 

1529 f'Impression seal tampered: id={impression_id[:16]}, ' 

1530 f'expected={imp.sealed_hash[:16]}..., ' 

1531 f'got={current_hash[:16]}...', 

1532 FRAUD_WEIGHTS['seal_tamper'], 

1533 {'impression_id': impression_id, 

1534 'sealed_hash': imp.sealed_hash, 

1535 'recomputed_hash': current_hash}) 

1536 

1537 return {'valid': False, 'details': 'Seal hash mismatch — tampered', 

1538 'impression_id': impression_id, 

1539 'sealed_hash': imp.sealed_hash, 

1540 'recomputed_hash': current_hash} 

1541 

1542 @staticmethod 

1543 def verify_all_sealed_impressions(db: Session, node_id: str = None, 

1544 limit: int = 100) -> Dict: 

1545 """Batch-verify sealed impressions. Returns summary of tampered seals.""" 

1546 q = db.query(AdImpression).filter( 

1547 AdImpression.sealed_hash.isnot(None), 

1548 ) 

1549 if node_id: 

1550 q = q.filter(AdImpression.node_id == node_id) 

1551 impressions = q.order_by(AdImpression.sealed_at.desc()).limit(limit).all() 

1552 

1553 total = len(impressions) 

1554 tampered = 0 

1555 tampered_ids = [] 

1556 

1557 for imp in impressions: 

1558 result = IntegrityService.verify_impression_seal(db, imp.id) 

1559 if not result['valid'] and 'tampered' in result.get('details', '').lower(): 

1560 tampered += 1 

1561 tampered_ids.append(imp.id) 

1562 

1563 return { 

1564 'total_checked': total, 

1565 'tampered': tampered, 

1566 'tampered_ids': tampered_ids, 

1567 'integrity_ratio': round((total - tampered) / total, 3) if total > 0 else 1.0, 

1568 } 

1569 

1570 @staticmethod 

1571 def isolate_reward_hacker(db: Session, node_id: str, 

1572 reason: str, evidence: dict = None) -> Dict: 

1573 """Isolate a confirmed reward hacker from the network. 

1574 

1575 This is the enforcement action when reward hacking is detected 

1576 with high confidence. The node is: 

1577 1. Quarantined via TrustQuarantine (ISOLATE level) 

1578 2. All pending rewards frozen 

1579 3. Fraud score set to maximum 

1580 4. Witnesses notified 

1581 5. Node cannot re-enter until reviewed by a human auditor 

1582 

1583 Reward hackers are isolated, not punished — the goal is to 

1584 protect the network, not to seek vengeance (per TrustQuarantine). 

1585 """ 

1586 # 1. Set fraud score to max and ban 

1587 IntegrityService.increase_fraud_score( 

1588 db, node_id, 50.0, 

1589 f'Reward hacker isolated: {reason}', evidence) 

1590 

1591 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

1592 if peer: 

1593 peer.integrity_status = 'banned' 

1594 peer.fraud_score = 100.0 

1595 

1596 # 2. Quarantine via guardrail system 

1597 try: 

1598 from security.hive_guardrails import TrustQuarantine 

1599 TrustQuarantine.quarantine( 

1600 node_id, 

1601 TrustQuarantine.LEVEL_ISOLATE, 

1602 f'Reward hacking: {reason}') 

1603 except ImportError: 

1604 logger.warning("TrustQuarantine not available for isolation") 

1605 

1606 # 3. Freeze pending rewards (mark as disputed) 

1607 pending = db.query(HostingReward).filter( 

1608 HostingReward.node_id == node_id, 

1609 HostingReward.created_at >= datetime.utcnow() - timedelta(days=30), 

1610 ).all() 

1611 frozen_amount = 0.0 

1612 for reward in pending: 

1613 frozen_amount += reward.amount 

1614 reward.reason = f'FROZEN: {reward.reason} [reward hack investigation]' 

1615 db.flush() 

1616 

1617 result = { 

1618 'node_id': node_id, 

1619 'action': 'isolated', 

1620 'reason': reason, 

1621 'rewards_frozen': len(pending), 

1622 'frozen_amount': round(frozen_amount, 2), 

1623 'requires_human_review': True, 

1624 } 

1625 

1626 logger.warning( 

1627 f"Reward hacker isolated: node={node_id[:8]}, " 

1628 f"reason={reason}, frozen={frozen_amount:.2f} Spark") 

1629 

1630 return result 

1631 

1632 # ─── Private Helpers ─── 

1633 

1634 @staticmethod 

1635 def _create_fraud_alert(db: Session, node_id: str, alert_type: str, 

1636 severity: str, description: str, 

1637 fraud_delta: float, evidence: dict) -> Dict: 

1638 """Create a FraudAlert and increase fraud score.""" 

1639 IntegrityService.increase_fraud_score( 

1640 db, node_id, fraud_delta, description, evidence) 

1641 # The alert is created inside increase_fraud_score 

1642 return {'node_id': node_id, 'alert_type': alert_type, 

1643 'severity': severity, 'description': description} 

1644 

1645 

1646def _get_node_compute(peer) -> float: 

1647 """Extract compute score from a PeerNode's metadata. 

1648 Uses contribution_score as primary metric, falls back to metadata fields.""" 

1649 if peer.contribution_score and peer.contribution_score > 0: 

1650 return float(peer.contribution_score) 

1651 meta = peer.metadata_json or {} 

1652 # Check for reported compute capacity (TFLOPS, GPU count, etc.) 

1653 compute = meta.get('compute_tflops', 0) or meta.get('gpu_count', 0) 

1654 if compute: 

1655 return float(compute) 

1656 # Minimum: 1.0 (every node has at least some compute) 

1657 return 1.0 

1658 

1659 

1660def _determine_alert_type(reason: str) -> str: 

1661 """Infer alert type from reason string.""" 

1662 reason_lower = reason.lower() 

1663 if 'reward hack' in reason_lower or 'isolated' in reason_lower: 

1664 return 'reward_hacking' 

1665 if 'self-dealing' in reason_lower or 'witness ring' in reason_lower: 

1666 return 'reward_self_dealing' 

1667 if 'spark gaming' in reason_lower or 'goal cycling' in reason_lower: 

1668 return 'spark_gaming' 

1669 if 'reward velocity' in reason_lower: 

1670 return 'reward_velocity_anomaly' 

1671 if 'hash' in reason_lower or 'code' in reason_lower: 

1672 return 'hash_mismatch' 

1673 if 'challenge' in reason_lower: 

1674 return 'challenge_fail' 

1675 if 'impression' in reason_lower or 'anomaly' in reason_lower: 

1676 return 'impression_anomaly' 

1677 if 'jump' in reason_lower or 'count' in reason_lower: 

1678 return 'score_jump' 

1679 if 'collusion' in reason_lower: 

1680 return 'collusion_suspected' 

1681 if 'witness' in reason_lower: 

1682 return 'witness_refusal' 

1683 if 'ban' in reason_lower: 

1684 return 'manual_ban' 

1685 return 'other'