Coverage for integrations/agent_engine/ip_service.py: 71.6%

285 statements  

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2IP Protection Service — Patent CRUD + Moat Verification 

3 

4Static service class (project pattern). All methods receive db: Session, 

5call db.flush() not db.commit(). Caller owns the transaction. 

6 

7The real IP protection is not legal (patents) — it's technical irreproducibility. 

8The system's value lives in ACCUMULATED LATENT STATE, not code: 

9 - Latent dynamics trained on N million real interactions (uncopyable) 

10 - HiveMind collective: N nodes × N edges = N² knowledge (network effect) 

11 - MetaLearningRouter policy shaped by real create/reuse/compose decisions 

12 - Kernel support vectors from real expert corrections 

13 - LoRA task slots from real conceptual learning 

14 - Episodic memory: years of VQ-compressed experiences 

15 - Master key perimeter: cryptographic, non-forkable identity chain 

16 

17A competitor with full codebase starts at zero latent state. 

18First online = exponential compounding advantage. 

19""" 

import glob
import logging
import os
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from sqlalchemy.orm import Session

logger = logging.getLogger('hevolve_social')


class IPService:
    """Static service for IP protection operations."""

    @staticmethod
    def create_patent(db: Session, title: str, claims: list,
                      abstract: str = '', description: str = '',
                      filing_type: str = 'provisional',
                      verification_metrics: dict = None,
                      evidence: list = None,
                      goal_id: str = None,
                      created_by: str = None) -> Dict:
        """Create a patent draft."""
        from integrations.social.models import IPPatent

        patent = IPPatent(
            title=title,
            claims_json=claims or [],
            abstract=abstract,
            description=description,
            filing_type=filing_type,
            verification_metrics=verification_metrics or {},
            evidence_json=evidence or [],
            goal_id=goal_id,
            created_by=created_by,
            status='draft',
        )
        db.add(patent)
        db.flush()
        return patent.to_dict()

    @staticmethod
    def get_patent(db: Session, patent_id: str) -> Optional[Dict]:
        """Get a single patent."""
        from integrations.social.models import IPPatent

        patent = db.query(IPPatent).filter_by(id=patent_id).first()
        return patent.to_dict() if patent else None

    @staticmethod
    def list_patents(db: Session, status: str = None) -> List[Dict]:
        """List patents with optional status filter."""
        from integrations.social.models import IPPatent

        q = db.query(IPPatent)
        if status:
            q = q.filter_by(status=status)
        return [p.to_dict() for p in q.order_by(IPPatent.created_at.desc()).all()]

    @staticmethod
    def update_patent_status(db: Session, patent_id: str, status: str,
                             application_number: str = None,
                             patent_number: str = None) -> Optional[Dict]:
        """Update patent status and optional filing details."""
        from integrations.social.models import IPPatent

        patent = db.query(IPPatent).filter_by(id=patent_id).first()
        if not patent:
            return None
        patent.status = status
        if application_number:
            patent.application_number = application_number
        if patent_number:
            patent.patent_number = patent_number
        if status == 'filed' and not patent.filing_date:
            patent.filing_date = datetime.utcnow()
        db.flush()
        return patent.to_dict()

    @staticmethod
    def create_infringement(db: Session, patent_id: str,
                            infringer_name: str, infringer_url: str = '',
                            evidence_summary: str = '',
                            risk_level: str = 'low') -> Dict:
        """Record a detected infringement."""
        from integrations.social.models import IPInfringement

        infringement = IPInfringement(
            patent_id=patent_id,
            infringer_name=infringer_name,
            infringer_url=infringer_url,
            evidence_summary=evidence_summary,
            risk_level=risk_level,
            status='detected',
        )
        db.add(infringement)
        db.flush()
        return infringement.to_dict()

    @staticmethod
    def update_infringement_status(db: Session, infringement_id: str,
                                   status: str, notice_type: str = None,
                                   notice_text: str = None) -> Optional[Dict]:
        """Update infringement status and optional notice details."""
        from integrations.social.models import IPInfringement

        inf = db.query(IPInfringement).filter_by(id=infringement_id).first()
        if not inf:
            return None
        inf.status = status
        if notice_type:
            inf.notice_type = notice_type
        if notice_text:
            inf.notice_text = notice_text
        if status == 'notice_sent' and not inf.notice_sent_at:
            inf.notice_sent_at = datetime.utcnow()
        db.flush()
        return inf.to_dict()

    @staticmethod
    def list_infringements(db: Session, patent_id: str = None,
                           status: str = None) -> List[Dict]:
        """List infringements with optional filters."""
        from integrations.social.models import IPInfringement

        q = db.query(IPInfringement)
        if patent_id:
            q = q.filter_by(patent_id=patent_id)
        if status:
            q = q.filter_by(status=status)
        return [i.to_dict() for i in q.order_by(IPInfringement.created_at.desc()).all()]

    # ─── Flywheel verification ───

    @staticmethod
    def get_loop_health() -> Dict:
        """Aggregate self-improving loop metrics from all live sources.

        Checks 5 flywheel components:
        1. World model (HevolveAI) health + learning stats
        2. Agent goal completion rates
        3. RALT skill propagation stats
        4. Recipe reuse adoption rate
        5. HiveMind connected agents

        FLYWHEEL LOOPHOLE OWNERSHIP (each loophole has a responsible agent):
        - Cold start → HiveMind bootstrap (tensor fusion gives instant collective knowledge)
        - Single-node → Marketing Agent metric (grow network = more nodes = more learning)
        - Feedback staleness → Coding Agent (identifies queue bottlenecks in code review)
        - Recipe drift → Coding Agent (version-aware recipe validation during review)
        - Gossip partition → Guardrails Agent (monitors network health deterministically)
        - Guardrail drift → Guardrails Agent (dedicated guardrail integrity monitor)

        ARCHITECTURE PRINCIPLE: Deterministic intelligence interleaved with probabilistic.
        Every probabilistic (LLM) decision has a deterministic gate:
        - Guardrails are deterministic (regex, hash, threshold) wrapping LLM output
        - Recipe reuse is deterministic (exact replay) after LLM-generated CREATE
        - RALT topology verification is deterministic before probabilistic skill fusion
        - Circuit breaker is a deterministic halt on anomaly detection
        """
        result = {
            'world_model': {'healthy': False},
            'agent_performance': {'total_goals': 0, 'completed': 0, 'success_rate': 0.0},
            'ralt_propagation': {'total_distributed': 0, 'total_blocked': 0},
            'recipe_adoption': {'total_recipes': 0, 'reuse_rate': 0.0},
            'hivemind_agents': [],
            'loop_verified': False,
            'improvement_rate': 0.0,
            'flywheel_loopholes': [],
        }

        # Loophole ownership map — which agent is responsible for fixing each
        loophole_owners = {
            'cold_start': 'hivemind',          # HiveMind bootstrap gives instant knowledge
            'single_node': 'marketing',        # Marketing grows the node network
            'feedback_staleness': 'coding',    # Coding agent fixes flush pipeline
            'recipe_drift': 'coding',          # Coding agent validates recipe freshness
            'guardrail_drift': 'guardrails',   # Guardrails agent monitors integrity
            'gossip_partition': 'guardrails',  # Guardrails agent monitors network health
        }

        # 1. World model health
        try:
            from integrations.agent_engine.world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()
            health = bridge.check_health()
            stats = bridge.get_learning_stats()
            bridge_stats = bridge.get_stats()
            result['world_model'] = {
                'healthy': health.get('healthy', False),
                'learning': stats.get('learning', {}),
                'bridge': bridge_stats,
            }
            # Loophole: queue backup → coding agent should fix flush pipeline
            queue_size = bridge_stats.get('queue_size', 0)
            if queue_size > 500:
                result['flywheel_loopholes'].append({
                    'type': 'feedback_staleness',
                    'owner': loophole_owners['feedback_staleness'],
                    'severity': 'high',
                    'message': f'Experience queue backing up ({queue_size} items)',
                    'remediation': 'Coding agent: optimize _flush_to_world_model batch size or add workers',
                })
        except Exception:
            result['flywheel_loopholes'].append({
                'type': 'cold_start',
                'owner': loophole_owners['cold_start'],
                'severity': 'critical',
                'message': 'World model bridge unavailable — no learning happening',
                'remediation': 'HiveMind bootstrap: connect to collective for instant knowledge',
            })

        # 2. Agent performance
        try:
            from integrations.social.models import get_db, AgentGoal
            db = get_db()
            try:
                total = db.query(AgentGoal).count()
                completed = db.query(AgentGoal).filter_by(status='completed').count()
                result['agent_performance'] = {
                    'total_goals': total,
                    'completed': completed,
                    'success_rate': round(completed / total, 3) if total > 0 else 0.0,
                }
                if total < 100:
                    result['flywheel_loopholes'].append({
                        'type': 'single_node',
                        'owner': loophole_owners['single_node'],
                        'severity': 'medium',
                        'message': f'Insufficient goal volume ({total}) — need 100+',
                        'remediation': 'Marketing agent: grow user base to increase goal throughput',
                    })
            finally:
                db.close()
        except Exception:
            pass

        # 3. RALT propagation
        try:
            from integrations.agent_engine.world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()
            bs = bridge.get_stats()
            distributed = bs.get('total_skills_distributed', 0)
            blocked = bs.get('total_skills_blocked', 0)
            result['ralt_propagation'] = {
                'total_distributed': distributed,
                'total_blocked': blocked,
            }
            if distributed == 0:
                result['flywheel_loopholes'].append({
                    'type': 'cold_start',
                    'owner': loophole_owners['cold_start'],
                    'severity': 'high',
                    'message': 'Zero RALT skills distributed — hive is not learning from peers',
                    'remediation': 'HiveMind bootstrap: first node seeds RALT from collective tensor fusion',
                })
            if blocked > distributed and distributed > 0:
                result['flywheel_loopholes'].append({
                    'type': 'guardrail_drift',
                    'owner': loophole_owners['guardrail_drift'],
                    'severity': 'medium',
                    'message': f'More skills blocked ({blocked}) than distributed ({distributed})',
                    'remediation': 'Guardrails agent: review filter thresholds — '
                                   'deterministic gates may be too restrictive',
                })
        except Exception:
            pass

        # 4. Recipe adoption
        try:
            prompts_dir = os.path.join(
                os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
                'prompts')
            if os.path.isdir(prompts_dir):
                all_prompts = glob.glob(os.path.join(prompts_dir, '*.json'))
                recipes = [f for f in all_prompts if '_recipe.json' in f]
                total_prompts = len([f for f in all_prompts
                                     if '_recipe.json' not in f
                                     and not f.endswith('_recipe.json')])
                reuse_rate = round(len(recipes) / total_prompts, 3) if total_prompts > 0 else 0.0
                result['recipe_adoption'] = {
                    'total_recipes': len(recipes),
                    'total_prompts': total_prompts,
                    'reuse_rate': reuse_rate,
                }
                if reuse_rate < 0.6 and total_prompts > 10:
                    result['flywheel_loopholes'].append({
                        'type': 'recipe_drift',
                        'owner': loophole_owners['recipe_drift'],
                        'severity': 'medium',
                        'message': f'Recipe reuse rate {reuse_rate:.0%} below 60% threshold',
                        'remediation': 'Coding agent: add recipe versioning — deterministic '
                                       'staleness check before probabilistic re-creation',
                    })
        except Exception:
            pass

        # 5. HiveMind agents
        try:
            from integrations.agent_engine.world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()
            agents = bridge.get_hivemind_agents()
            result['hivemind_agents'] = agents
            if len(agents) < 3:
                result['flywheel_loopholes'].append({
                    'type': 'single_node',
                    'owner': loophole_owners['single_node'],
                    'severity': 'high',
                    'message': f'Only {len(agents)} HiveMind agents connected — need 3+',
                    'remediation': 'Marketing agent: grow node count; HiveMind bootstraps '
                                   'new nodes with collective knowledge instantly',
                })
        except Exception:
            pass

        return result

    @staticmethod
    def measure_moat_depth() -> Dict:
        """Quantify technical irreproducibility — how far ahead of a code-clone.

        The moat is not code (copyable). The moat is accumulated latent state:
        - Latent dynamics trained on real interactions
        - HiveMind collective knowledge (N² network effect)
        - MetaRouter policy (REINFORCE-trained on real decisions)
        - Kernel support vectors (real expert corrections)
        - Episodic memory (years of compressed experiences)
        - Recipe library (real CREATE→REUSE chains)
        - Master key chain (cryptographic identity, non-forkable)

        A competitor starting today with identical code starts at zero.
        This method measures how many zero-state dimensions they'd need to fill.
        """
        moat = {
            'latent_interactions': 0,        # Total interactions training latent state
            'hivemind_nodes': 0,             # N nodes → N² knowledge edges
            'hivemind_knowledge_edges': 0,
            'meta_router_decisions': 0,      # REINFORCE training samples
            'kernel_corrections': 0,         # Expert corrections (instant, no gradient)
            'episodic_experiences': 0,       # VQ-compressed episodes
            'recipe_count': 0,               # CREATE→REUSE recipes
            'master_key_verified_nodes': 0,
            'moat_score': 0.0,               # Composite irreproducibility score
            'competitor_catch_up_estimate': 'unknown',
        }

        # 1. Latent interactions (world model bridge stats)
        try:
            from integrations.agent_engine.world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()
            stats = bridge.get_stats()
            recorded = stats.get('total_recorded', 0)
            flushed = stats.get('total_flushed', 0)
            corrections = stats.get('total_corrections', 0)
            hivemind_queries = stats.get('total_hivemind_queries', 0)
            moat['latent_interactions'] = recorded
            moat['kernel_corrections'] = corrections
            moat['meta_router_decisions'] = flushed  # Each flush = training data
        except Exception:
            pass

        # 2. HiveMind network effect
        try:
            from integrations.agent_engine.world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()
            agents = bridge.get_hivemind_agents()
            n = len(agents)
            moat['hivemind_nodes'] = n
            moat['hivemind_knowledge_edges'] = n * (n - 1) // 2  # N choose 2
        except Exception:
            pass

        # 3. Agent goals completed (each = latent state improvement)
        try:
            from integrations.social.models import get_db, AgentGoal
            db = get_db()
            try:
                moat['episodic_experiences'] = db.query(AgentGoal).count()
            finally:
                db.close()
        except Exception:
            pass

        # 4. Recipe library
        try:
            prompts_dir = os.path.join(
                os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
                'prompts')
            if os.path.isdir(prompts_dir):
                recipes = glob.glob(os.path.join(prompts_dir, '*_recipe.json'))
                moat['recipe_count'] = len(recipes)
        except Exception:
            pass

        # 5. Master key verified nodes
        try:
            from integrations.social.models import get_db, PeerNode
            db = get_db()
            try:
                verified = db.query(PeerNode).filter_by(
                    master_key_verified=True).count()
                moat['master_key_verified_nodes'] = verified
            finally:
                db.close()
        except Exception:
            pass

        # Composite moat score (logarithmic — diminishing returns but always growing)
        import math
        score = 0.0
        score += math.log1p(moat['latent_interactions']) * 10        # Each interaction deepens latent state
        score += math.log1p(moat['hivemind_knowledge_edges']) * 20   # Network effect most valuable
        score += math.log1p(moat['kernel_corrections']) * 15         # Expert corrections rare & valuable
        score += math.log1p(moat['recipe_count']) * 5                # Recipes = deterministic speedup
        score += math.log1p(moat['master_key_verified_nodes']) * 10  # Non-forkable trust chain
        moat['moat_score'] = round(score, 2)
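        # Worked example (illustrative numbers, not measured values): 10,000 latent
        # interactions, 10 nodes (45 edges), 50 kernel corrections, 20 recipes and
        # 10 verified keys give roughly
        #     10*ln(10001) + 20*ln(46) + 15*ln(51) + 5*ln(21) + 10*ln(11)
        #     ≈ 92.1 + 76.6 + 59.0 + 15.2 + 24.0 ≈ 267.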

        # Catch-up estimate
        interactions = moat['latent_interactions']
        nodes = moat['hivemind_nodes']
        if interactions > 1_000_000 and nodes > 100:
            moat['competitor_catch_up_estimate'] = 'practically impossible'
        elif interactions > 100_000 and nodes > 10:
            moat['competitor_catch_up_estimate'] = 'years'
        elif interactions > 10_000:
            moat['competitor_catch_up_estimate'] = 'months'
        elif interactions > 1_000:
            moat['competitor_catch_up_estimate'] = 'weeks'
        else:
            moat['competitor_catch_up_estimate'] = 'moat still shallow — grow network'

        return moat

    @staticmethod
    def verify_exponential_improvement(db: Session, days: int = 30) -> Dict:
        """Check if the self-improving loop shows genuine improvement.

        Verification criteria (all 5 must pass for verified=True):
        1. World model is healthy and responding
        2. Agent task success rate > 50% over 100+ goals
        3. At least 3 RALT skills distributed across the hive
        4. Recipe reuse rate >= 60%
        5. No critical or high-severity flywheel loopholes

        Returns {verified: bool, metrics: {...}, evidence: [...], loopholes: [...]}
        """
        health = IPService.get_loop_health()
        evidence = []
        checks_passed = 0
        total_checks = 5

        # Check 1: World model healthy
        wm_healthy = health['world_model'].get('healthy', False)
        if wm_healthy:
            checks_passed += 1
            evidence.append('World model (HevolveAI) is healthy and auto-learning')
        else:
            evidence.append('FAIL: World model not healthy or unreachable')

        # Check 2: Agent success rate
        perf = health['agent_performance']
        success_rate = perf.get('success_rate', 0)
        total_goals = perf.get('total_goals', 0)
        if total_goals >= 100 and success_rate > 0.5:
            checks_passed += 1
            evidence.append(
                f'Agent success rate {success_rate:.1%} over {total_goals} goals')
        else:
            evidence.append(
                f'FAIL: Need 100+ goals with >50% success '
                f'(have {total_goals} goals, {success_rate:.1%} rate)')

        # Check 3: RALT propagation
        ralt = health['ralt_propagation']
        if ralt.get('total_distributed', 0) >= 3:
            checks_passed += 1
            evidence.append(
                f'RALT distributed {ralt["total_distributed"]} skills across hive')
        else:
            evidence.append(
                f'FAIL: Only {ralt.get("total_distributed", 0)} RALT skills '
                f'distributed (need 3+)')

        # Check 4: Recipe reuse
        recipe = health['recipe_adoption']
        reuse_rate = recipe.get('reuse_rate', 0)
        if reuse_rate >= 0.6:
            checks_passed += 1
            evidence.append(f'Recipe reuse rate {reuse_rate:.0%}')
        else:
            evidence.append(
                f'FAIL: Recipe reuse rate {reuse_rate:.0%} below 60% threshold')

        # Check 5: No critical loopholes (severity 'critical' or 'high')
        loopholes = health.get('flywheel_loopholes', [])
        critical_loopholes = [l for l in loopholes
                              if isinstance(l, dict) and
                              l.get('severity') in ('critical', 'high')]
        if not critical_loopholes:
            checks_passed += 1
            evidence.append('No critical flywheel loopholes detected')
        else:
            first = critical_loopholes[0]
            owner = first.get('owner', '?')
            msg = first.get('message', '')[:80]
            evidence.append(
                f'FAIL: {len(critical_loopholes)} critical loopholes — '
                f'first: [{owner}] {msg}')

        verified = checks_passed == total_checks
        improvement_rate = (checks_passed / total_checks) * 100

        return {
            'verified': verified,
            'checks_passed': checks_passed,
            'total_checks': total_checks,
            'improvement_rate': improvement_rate,
            'metrics': {
                'world_model_healthy': wm_healthy,
                'agent_success_rate': success_rate,
                'total_goals': total_goals,
                'ralt_distributed': ralt.get('total_distributed', 0),
                'recipe_reuse_rate': reuse_rate,
                'hivemind_agents': len(health.get('hivemind_agents', [])),
            },
            'evidence': evidence,
            'loopholes': loopholes,
        }

    # ─── Defensive IP (prior art proof, not patents) ───

    @staticmethod
    def create_defensive_publication(db: Session, title: str, content: str,
                                     abstract: str = '',
                                     git_commit: str = None,
                                     created_by: str = None) -> Dict:
        """Create a timestamped defensive publication (prior art proof).

        NOT a patent — evidence of prior invention:
        - SHA-256 hash of content (proves exact content existed at timestamp)
        - Git commit hash (ties to specific codebase state)
        - Code snapshot hash (ties to full project state)
        - Node signature (cryptographic non-repudiation)
        - Moat score snapshot (cumulative latent state at time of publication)

        If anyone files a patent on something we published first, THIS is prior art.
        """
        import hashlib
        from integrations.social.models import DefensivePublication

        content_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()
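        # Verification sketch (illustrative; not part of the stored record): a third
        # party holding the original text can recompute
        #     hashlib.sha256(original_text.encode('utf-8')).hexdigest()
        # and compare it to the stored content_hash; a match proves this exact content
        # existed no later than the recorded publication timestamp.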

        # Code snapshot hash
        code_hash = None
        try:
            from security.node_integrity import compute_code_hash
            code_hash = compute_code_hash()
        except Exception:
            pass

        # Node signature
        node_key = None
        sig_hex = None
        try:
            from security.node_integrity import get_public_key_hex, sign_message
            node_key = get_public_key_hex()
            sig_hex = sign_message(content_hash.encode('utf-8')).hex()
        except Exception:
            pass

        # Snapshot moat depth
        moat_score = 0.0
        try:
            moat = IPService.measure_moat_depth()
            moat_score = moat.get('moat_score', 0.0)
        except Exception:
            pass

        # Snapshot verification
        verification = {}
        try:
            verification = IPService.verify_exponential_improvement(db)
        except Exception:
            pass

        pub = DefensivePublication(
            title=title,
            abstract=abstract,
            content_hash=content_hash,
            git_commit_hash=git_commit,
            code_snapshot_hash=code_hash,
            signed_by_node_key=node_key,
            signature_hex=sig_hex,
            moat_score_at_publication=moat_score,
            verification_snapshot=verification,
            created_by=created_by,
        )
        db.add(pub)
        db.flush()
        return pub.to_dict()

    @staticmethod
    def list_defensive_publications(db: Session) -> List[Dict]:
        """List all defensive publications, most recent first."""
        from integrations.social.models import DefensivePublication
        pubs = db.query(DefensivePublication).order_by(
            DefensivePublication.publication_date.desc()).all()
        return [p.to_dict() for p in pubs]

    @staticmethod
    def get_provenance_record(db: Session) -> Dict:
        """Generate comprehensive provenance chain for the entire platform.

        Aggregates: defensive publications, patents, moat measurements,
        code hashes, and verification snapshots — a single cryptographic
        chain of evidence for legal defence.
        """
        from integrations.social.models import DefensivePublication

        pubs = db.query(DefensivePublication).order_by(
            DefensivePublication.publication_date.asc()).all()

        patents = IPService.list_patents(db)

        moat = {}
        try:
            moat = IPService.measure_moat_depth()
        except Exception:
            pass

        code_hash = None
        try:
            from security.node_integrity import compute_code_hash
            code_hash = compute_code_hash()
        except Exception:
            pass

        verification = {}
        try:
            verification = IPService.verify_exponential_improvement(db)
        except Exception:
            pass

        return {
            'generated_at': datetime.utcnow().isoformat(),
            'code_snapshot_hash': code_hash,
            'moat_depth': moat,
            'verification': verification,
            'defensive_publications': [p.to_dict() for p in pubs],
            'patents': patents,
            'total_publications': len(pubs),
            'total_patents': len(patents),
            'evidence_chain': [
                {
                    'type': 'defensive_publication',
                    'id': p.id,
                    'content_hash': p.content_hash,
                    'timestamp': p.publication_date.isoformat() if p.publication_date else None,
                    'signature': p.signature_hex,
                }
                for p in pubs
            ],
        }

    @staticmethod
    def check_intelligence_milestone(db: Session,
                                     consecutive_days_required: int = 14,
                                     min_catch_up: str = 'months') -> Dict:
        """Check if the critical intelligence threshold has been reached.

        Auto-patent filing trigger. All 3 conditions must be met:
        1. verify_exponential_improvement() returns verified=True
        2. Moat catch_up_estimate >= min_catch_up
        3. At least N consecutive verified defensive publications

        The philosophy: we tread carefully. No premature filing.
        File only when the hive has proven itself over sustained time.
        """
        from integrations.social.models import DefensivePublication

        verification = IPService.verify_exponential_improvement(db)

        moat = {}
        catch_up = 'unknown'
        try:
            moat = IPService.measure_moat_depth()
            catch_up = moat.get('competitor_catch_up_estimate', 'unknown')
        except Exception:
            pass

        # Count consecutive verified publications (most recent first)
        pubs = db.query(DefensivePublication).order_by(
            DefensivePublication.publication_date.desc()
        ).limit(consecutive_days_required).all()

        consecutive_verified = 0
        for p in pubs:
            snap = p.verification_snapshot or {}
            if snap.get('verified', False):
                consecutive_verified += 1
            else:
                break

        catch_up_levels = [
            'moat still shallow — grow network',
            'weeks', 'months', 'years', 'practically impossible',
        ]
        min_idx = catch_up_levels.index(min_catch_up) if min_catch_up in catch_up_levels else 2
        cur_idx = catch_up_levels.index(catch_up) if catch_up in catch_up_levels else 0

        triggered = (
            verification.get('verified', False)
            and consecutive_verified >= consecutive_days_required
            and cur_idx >= min_idx
        )

        return {
            'triggered': triggered,
            'consecutive_verified': consecutive_verified,
            'consecutive_required': consecutive_days_required,
            'moat_catch_up': catch_up,
            'min_catch_up_required': min_catch_up,
            'current_verification': verification,
            'moat_score': moat.get('moat_score', 0.0),
        }
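

# Gate sketch (illustrative; not part of the service). An auto-filing job could use
# the milestone check above as its gate; get_db() is assumed from this module's own
# internal imports, and the patent fields are placeholders.
#
#     db = get_db()
#     try:
#         milestone = IPService.check_intelligence_milestone(db)
#         if milestone['triggered']:
#             IPService.create_patent(db, title='Self-improving loop',
#                                     claims=['claim 1'],
#                                     verification_metrics=milestone['current_verification'])
#             db.commit()
#     finally:
#         db.close()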