Coverage for integrations / agent_engine / ip_service.py: 71.6%
285 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2IP Protection Service — Patent CRUD + Moat Verification
4Static service class (project pattern). All methods receive db: Session,
5call db.flush() not db.commit(). Caller owns the transaction.
7The real IP protection is not legal (patents) — it's technical irreproducibility.
8The system's value lives in ACCUMULATED LATENT STATE, not code:
9 - Latent dynamics trained on N million real interactions (uncopyable)
10 - HiveMind collective: N nodes × N edges = N² knowledge (network effect)
11 - MetaLearningRouter policy shaped by real create/reuse/compose decisions
12 - Kernel support vectors from real expert corrections
13 - LoRA task slots from real conceptual learning
14 - Episodic memory: years of VQ-compressed experiences
15 - Master key perimeter: cryptographic, non-forkable identity chain
17A competitor with full codebase starts at zero latent state.
18First online = exponential compounding advantage.
19"""
20import glob
21import logging
22import os
23from datetime import datetime, timedelta
24from typing import Dict, List, Optional
26from sqlalchemy.orm import Session
28logger = logging.getLogger('hevolve_social')
31class IPService:
32 """Static service for IP protection operations."""
34 @staticmethod
35 def create_patent(db: Session, title: str, claims: list,
36 abstract: str = '', description: str = '',
37 filing_type: str = 'provisional',
38 verification_metrics: dict = None,
39 evidence: list = None,
40 goal_id: str = None,
41 created_by: str = None) -> Dict:
42 """Create a patent draft."""
43 from integrations.social.models import IPPatent
45 patent = IPPatent(
46 title=title,
47 claims_json=claims or [],
48 abstract=abstract,
49 description=description,
50 filing_type=filing_type,
51 verification_metrics=verification_metrics or {},
52 evidence_json=evidence or [],
53 goal_id=goal_id,
54 created_by=created_by,
55 status='draft',
56 )
57 db.add(patent)
58 db.flush()
59 return patent.to_dict()
61 @staticmethod
62 def get_patent(db: Session, patent_id: str) -> Optional[Dict]:
63 """Get a single patent."""
64 from integrations.social.models import IPPatent
66 patent = db.query(IPPatent).filter_by(id=patent_id).first()
67 return patent.to_dict() if patent else None
69 @staticmethod
70 def list_patents(db: Session, status: str = None) -> List[Dict]:
71 """List patents with optional status filter."""
72 from integrations.social.models import IPPatent
74 q = db.query(IPPatent)
75 if status:
76 q = q.filter_by(status=status)
77 return [p.to_dict() for p in q.order_by(IPPatent.created_at.desc()).all()]
79 @staticmethod
80 def update_patent_status(db: Session, patent_id: str, status: str,
81 application_number: str = None,
82 patent_number: str = None) -> Optional[Dict]:
83 """Update patent status and optional filing details."""
84 from integrations.social.models import IPPatent
86 patent = db.query(IPPatent).filter_by(id=patent_id).first()
87 if not patent:
88 return None
89 patent.status = status
90 if application_number:
91 patent.application_number = application_number
92 if patent_number:
93 patent.patent_number = patent_number
94 if status == 'filed' and not patent.filing_date:
95 patent.filing_date = datetime.utcnow()
96 db.flush()
97 return patent.to_dict()
99 @staticmethod
100 def create_infringement(db: Session, patent_id: str,
101 infringer_name: str, infringer_url: str = '',
102 evidence_summary: str = '',
103 risk_level: str = 'low') -> Dict:
104 """Record a detected infringement."""
105 from integrations.social.models import IPInfringement
107 infringement = IPInfringement(
108 patent_id=patent_id,
109 infringer_name=infringer_name,
110 infringer_url=infringer_url,
111 evidence_summary=evidence_summary,
112 risk_level=risk_level,
113 status='detected',
114 )
115 db.add(infringement)
116 db.flush()
117 return infringement.to_dict()
119 @staticmethod
120 def update_infringement_status(db: Session, infringement_id: str,
121 status: str, notice_type: str = None,
122 notice_text: str = None) -> Optional[Dict]:
123 """Update infringement status and optional notice details."""
124 from integrations.social.models import IPInfringement
126 inf = db.query(IPInfringement).filter_by(id=infringement_id).first()
127 if not inf:
128 return None
129 inf.status = status
130 if notice_type:
131 inf.notice_type = notice_type
132 if notice_text:
133 inf.notice_text = notice_text
134 if status == 'notice_sent' and not inf.notice_sent_at:
135 inf.notice_sent_at = datetime.utcnow()
136 db.flush()
137 return inf.to_dict()
139 @staticmethod
140 def list_infringements(db: Session, patent_id: str = None,
141 status: str = None) -> List[Dict]:
142 """List infringements with optional filters."""
143 from integrations.social.models import IPInfringement
145 q = db.query(IPInfringement)
146 if patent_id:
147 q = q.filter_by(patent_id=patent_id)
148 if status:
149 q = q.filter_by(status=status)
150 return [i.to_dict() for i in q.order_by(IPInfringement.created_at.desc()).all()]
152 # ─── Flywheel verification ───
154 @staticmethod
155 def get_loop_health() -> Dict:
156 """Aggregate self-improving loop metrics from all live sources.
158 Checks 5 flywheel components:
159 1. World model (HevolveAI) health + learning stats
160 2. Agent goal completion rates
161 3. RALT skill propagation stats
162 4. Recipe reuse adoption rate
163 5. HiveMind connected agents
165 FLYWHEEL LOOPHOLE OWNERSHIP (each loophole has a responsible agent):
166 - Cold start → HiveMind bootstrap (tensor fusion gives instant collective knowledge)
167 - Single-node → Marketing Agent metric (grow network = more nodes = more learning)
168 - Feedback staleness → Coding Agent (identifies queue bottlenecks in code review)
169 - Recipe drift → Coding Agent (version-aware recipe validation during review)
170 - Gossip partition → Guardrails Agent (monitors network health deterministically)
171 - Guardrail drift → Guardrails Agent (dedicated guardrail integrity monitor)
173 ARCHITECTURE PRINCIPLE: Deterministic intelligence interleaved with probabilistic.
174 Every probabilistic (LLM) decision has a deterministic gate:
175 - Guardrails are deterministic (regex, hash, threshold) wrapping LLM output
176 - Recipe reuse is deterministic (exact replay) after LLM-generated CREATE
177 - RALT topology verification is deterministic before probabilistic skill fusion
178 - Circuit breaker is deterministic halt on anomaly detection
179 """
180 result = {
181 'world_model': {'healthy': False},
182 'agent_performance': {'total_goals': 0, 'completed': 0, 'success_rate': 0.0},
183 'ralt_propagation': {'total_distributed': 0, 'total_blocked': 0},
184 'recipe_adoption': {'total_recipes': 0, 'reuse_rate': 0.0},
185 'hivemind_agents': [],
186 'loop_verified': False,
187 'improvement_rate': 0.0,
188 'flywheel_loopholes': [],
189 }
191 # Loophole ownership map — which agent is responsible for fixing each
192 loophole_owners = {
193 'cold_start': 'hivemind', # HiveMind bootstrap gives instant knowledge
194 'single_node': 'marketing', # Marketing grows the node network
195 'feedback_staleness': 'coding', # Coding agent fixes flush pipeline
196 'recipe_drift': 'coding', # Coding agent validates recipe freshness
197 'guardrail_drift': 'guardrails', # Guardrails agent monitors integrity
198 'gossip_partition': 'guardrails', # Guardrails agent monitors network health
199 }
201 # 1. World model health
202 try:
203 from integrations.agent_engine.world_model_bridge import get_world_model_bridge
204 bridge = get_world_model_bridge()
205 health = bridge.check_health()
206 stats = bridge.get_learning_stats()
207 bridge_stats = bridge.get_stats()
208 result['world_model'] = {
209 'healthy': health.get('healthy', False),
210 'learning': stats.get('learning', {}),
211 'bridge': bridge_stats,
212 }
213 # Loophole: queue backup → coding agent should fix flush pipeline
214 queue_size = bridge_stats.get('queue_size', 0)
215 if queue_size > 500:
216 result['flywheel_loopholes'].append({
217 'type': 'feedback_staleness',
218 'owner': loophole_owners['feedback_staleness'],
219 'severity': 'high',
220 'message': f'Experience queue backing up ({queue_size} items)',
221 'remediation': 'Coding agent: optimize _flush_to_world_model batch size or add workers',
222 })
223 except Exception:
224 result['flywheel_loopholes'].append({
225 'type': 'cold_start',
226 'owner': loophole_owners['cold_start'],
227 'severity': 'critical',
228 'message': 'World model bridge unavailable — no learning happening',
229 'remediation': 'HiveMind bootstrap: connect to collective for instant knowledge',
230 })
232 # 2. Agent performance
233 try:
234 from integrations.social.models import get_db, AgentGoal
235 db = get_db()
236 try:
237 total = db.query(AgentGoal).count()
238 completed = db.query(AgentGoal).filter_by(status='completed').count()
239 result['agent_performance'] = {
240 'total_goals': total,
241 'completed': completed,
242 'success_rate': round(completed / total, 3) if total > 0 else 0.0,
243 }
244 if total < 100:
245 result['flywheel_loopholes'].append({
246 'type': 'single_node',
247 'owner': loophole_owners['single_node'],
248 'severity': 'medium',
249 'message': f'Insufficient goal volume ({total}) — need 100+',
250 'remediation': 'Marketing agent: grow user base to increase goal throughput',
251 })
252 finally:
253 db.close()
254 except Exception:
255 pass
257 # 3. RALT propagation
258 try:
259 from integrations.agent_engine.world_model_bridge import get_world_model_bridge
260 bridge = get_world_model_bridge()
261 bs = bridge.get_stats()
262 distributed = bs.get('total_skills_distributed', 0)
263 blocked = bs.get('total_skills_blocked', 0)
264 result['ralt_propagation'] = {
265 'total_distributed': distributed,
266 'total_blocked': blocked,
267 }
268 if distributed == 0:
269 result['flywheel_loopholes'].append({
270 'type': 'cold_start',
271 'owner': loophole_owners['cold_start'],
272 'severity': 'high',
273 'message': 'Zero RALT skills distributed — hive is not learning from peers',
274 'remediation': 'HiveMind bootstrap: first node seeds RALT from collective tensor fusion',
275 })
276 if blocked > distributed and distributed > 0:
277 result['flywheel_loopholes'].append({
278 'type': 'guardrail_drift',
279 'owner': loophole_owners['guardrail_drift'],
280 'severity': 'medium',
281 'message': f'More skills blocked ({blocked}) than distributed ({distributed})',
282 'remediation': 'Guardrails agent: review filter thresholds — '
283 'deterministic gates may be too restrictive',
284 })
285 except Exception:
286 pass
288 # 4. Recipe adoption
289 try:
290 prompts_dir = os.path.join(
291 os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
292 'prompts')
293 if os.path.isdir(prompts_dir):
294 all_prompts = glob.glob(os.path.join(prompts_dir, '*.json'))
295 recipes = [f for f in all_prompts if '_recipe.json' in f]
296 total_prompts = len([f for f in all_prompts
297 if '_recipe.json' not in f
298 and not f.endswith('_recipe.json')])
299 reuse_rate = round(len(recipes) / total_prompts, 3) if total_prompts > 0 else 0.0
300 result['recipe_adoption'] = {
301 'total_recipes': len(recipes),
302 'total_prompts': total_prompts,
303 'reuse_rate': reuse_rate,
304 }
305 if reuse_rate < 0.6 and total_prompts > 10:
306 result['flywheel_loopholes'].append({
307 'type': 'recipe_drift',
308 'owner': loophole_owners['recipe_drift'],
309 'severity': 'medium',
310 'message': f'Recipe reuse rate {reuse_rate:.0%} below 60% threshold',
311 'remediation': 'Coding agent: add recipe versioning — deterministic '
312 'staleness check before probabilistic re-creation',
313 })
314 except Exception:
315 pass
317 # 5. HiveMind agents
318 try:
319 from integrations.agent_engine.world_model_bridge import get_world_model_bridge
320 bridge = get_world_model_bridge()
321 agents = bridge.get_hivemind_agents()
322 result['hivemind_agents'] = agents
323 if len(agents) < 3:
324 result['flywheel_loopholes'].append({
325 'type': 'single_node',
326 'owner': loophole_owners['single_node'],
327 'severity': 'high',
328 'message': f'Only {len(agents)} HiveMind agents connected — need 3+',
329 'remediation': 'Marketing agent: grow node count; HiveMind bootstraps '
330 'new nodes with collective knowledge instantly',
331 })
332 except Exception:
333 pass
335 return result
337 @staticmethod
338 def measure_moat_depth() -> Dict:
339 """Quantify technical irreproducibility — how far ahead of a code-clone.
341 The moat is not code (copyable). The moat is accumulated latent state:
342 - Latent dynamics trained on real interactions
343 - HiveMind collective knowledge (N² network effect)
344 - MetaRouter policy (REINFORCE-trained on real decisions)
345 - Kernel support vectors (real expert corrections)
346 - Episodic memory (years of compressed experiences)
347 - Recipe library (real CREATE→REUSE chains)
348 - Master key chain (cryptographic identity, non-forkable)
350 A competitor starting today with identical code starts at zero.
351 This method measures how many zero-state dimensions they'd need to fill.
352 """
353 moat = {
354 'latent_interactions': 0, # Total interactions training latent state
355 'hivemind_nodes': 0, # N nodes → N² knowledge edges
356 'hivemind_knowledge_edges': 0,
357 'meta_router_decisions': 0, # REINFORCE training samples
358 'kernel_corrections': 0, # Expert corrections (instant, no gradient)
359 'episodic_experiences': 0, # VQ-compressed episodes
360 'recipe_count': 0, # CREATE→REUSE recipes
361 'master_key_verified_nodes': 0,
362 'moat_score': 0.0, # Composite irreproducibility score
363 'competitor_catch_up_estimate': 'unknown',
364 }
366 # 1. Latent interactions (world model bridge stats)
367 try:
368 from integrations.agent_engine.world_model_bridge import get_world_model_bridge
369 bridge = get_world_model_bridge()
370 stats = bridge.get_stats()
371 recorded = stats.get('total_recorded', 0)
372 flushed = stats.get('total_flushed', 0)
373 corrections = stats.get('total_corrections', 0)
374 hivemind_queries = stats.get('total_hivemind_queries', 0)
375 moat['latent_interactions'] = recorded
376 moat['kernel_corrections'] = corrections
377 moat['meta_router_decisions'] = flushed # Each flush = training data
378 except Exception:
379 pass
381 # 2. HiveMind network effect
382 try:
383 from integrations.agent_engine.world_model_bridge import get_world_model_bridge
384 bridge = get_world_model_bridge()
385 agents = bridge.get_hivemind_agents()
386 n = len(agents)
387 moat['hivemind_nodes'] = n
388 moat['hivemind_knowledge_edges'] = n * (n - 1) // 2 # N choose 2
389 except Exception:
390 pass
392 # 3. Agent goals completed (each = latent state improvement)
393 try:
394 from integrations.social.models import get_db, AgentGoal
395 db = get_db()
396 try:
397 moat['episodic_experiences'] = db.query(AgentGoal).count()
398 finally:
399 db.close()
400 except Exception:
401 pass
403 # 4. Recipe library
404 try:
405 prompts_dir = os.path.join(
406 os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
407 'prompts')
408 if os.path.isdir(prompts_dir):
409 recipes = glob.glob(os.path.join(prompts_dir, '*_recipe.json'))
410 moat['recipe_count'] = len(recipes)
411 except Exception:
412 pass
414 # 5. Master key verified nodes
415 try:
416 from integrations.social.models import get_db, PeerNode
417 db = get_db()
418 try:
419 verified = db.query(PeerNode).filter_by(
420 master_key_verified=True).count()
421 moat['master_key_verified_nodes'] = verified
422 finally:
423 db.close()
424 except Exception:
425 pass
427 # Composite moat score (logarithmic — diminishing returns but always growing)
428 import math
429 score = 0.0
430 score += math.log1p(moat['latent_interactions']) * 10 # Each interaction deepens latent
431 score += math.log1p(moat['hivemind_knowledge_edges']) * 20 # Network effect most valuable
432 score += math.log1p(moat['kernel_corrections']) * 15 # Expert corrections rare & valuable
433 score += math.log1p(moat['recipe_count']) * 5 # Recipes = deterministic speedup
434 score += math.log1p(moat['master_key_verified_nodes']) * 10 # Non-forkable trust chain
435 moat['moat_score'] = round(score, 2)
437 # Catch-up estimate
438 interactions = moat['latent_interactions']
439 nodes = moat['hivemind_nodes']
440 if interactions > 1_000_000 and nodes > 100:
441 moat['competitor_catch_up_estimate'] = 'practically impossible'
442 elif interactions > 100_000 and nodes > 10:
443 moat['competitor_catch_up_estimate'] = 'years'
444 elif interactions > 10_000:
445 moat['competitor_catch_up_estimate'] = 'months'
446 elif interactions > 1_000:
447 moat['competitor_catch_up_estimate'] = 'weeks'
448 else:
449 moat['competitor_catch_up_estimate'] = 'moat still shallow — grow network'
451 return moat
453 @staticmethod
454 def verify_exponential_improvement(db: Session, days: int = 30) -> Dict:
455 """Check if the self-improving loop shows genuine improvement.
457 Verification criteria (all 5 must pass for verified=True):
458 1. World model is healthy and responding
459 2. Agent task success rate > 50% over 100+ goals
460 3. RALT skills propagated to >3 nodes
461 4. Recipe reuse rate > 60%
462 5. No critical flywheel loopholes
464 Returns {verified: bool, metrics: {...}, evidence: [...], loopholes: [...]}
465 """
466 health = IPService.get_loop_health()
467 evidence = []
468 checks_passed = 0
469 total_checks = 5
471 # Check 1: World model healthy
472 wm_healthy = health['world_model'].get('healthy', False)
473 if wm_healthy:
474 checks_passed += 1
475 evidence.append('World model (HevolveAI) is healthy and auto-learning')
476 else:
477 evidence.append('FAIL: World model not healthy or unreachable')
479 # Check 2: Agent success rate
480 perf = health['agent_performance']
481 success_rate = perf.get('success_rate', 0)
482 total_goals = perf.get('total_goals', 0)
483 if total_goals >= 100 and success_rate > 0.5:
484 checks_passed += 1
485 evidence.append(
486 f'Agent success rate {success_rate:.1%} over {total_goals} goals')
487 else:
488 evidence.append(
489 f'FAIL: Need 100+ goals with >50% success '
490 f'(have {total_goals} goals, {success_rate:.1%} rate)')
492 # Check 3: RALT propagation
493 ralt = health['ralt_propagation']
494 if ralt.get('total_distributed', 0) >= 3:
495 checks_passed += 1
496 evidence.append(
497 f'RALT distributed {ralt["total_distributed"]} skills across hive')
498 else:
499 evidence.append(
500 f'FAIL: Only {ralt.get("total_distributed", 0)} RALT skills '
501 f'distributed (need 3+)')
503 # Check 4: Recipe reuse
504 recipe = health['recipe_adoption']
505 reuse_rate = recipe.get('reuse_rate', 0)
506 if reuse_rate >= 0.6:
507 checks_passed += 1
508 evidence.append(f'Recipe reuse rate {reuse_rate:.0%}')
509 else:
510 evidence.append(
511 f'FAIL: Recipe reuse rate {reuse_rate:.0%} below 60% threshold')
513 # Check 5: No critical loopholes (severity='critical' or 'high')
514 loopholes = health.get('flywheel_loopholes', [])
515 critical_loopholes = [l for l in loopholes
516 if isinstance(l, dict) and
517 l.get('severity') in ('critical', 'high')]
518 if not critical_loopholes:
519 checks_passed += 1
520 evidence.append('No critical flywheel loopholes detected')
521 else:
522 first = critical_loopholes[0]
523 owner = first.get('owner', '?')
524 msg = first.get('message', '')[:80]
525 evidence.append(
526 f'FAIL: {len(critical_loopholes)} critical loopholes — '
527 f'first: [{owner}] {msg}')
529 verified = checks_passed == total_checks
530 improvement_rate = (checks_passed / total_checks) * 100
532 return {
533 'verified': verified,
534 'checks_passed': checks_passed,
535 'total_checks': total_checks,
536 'improvement_rate': improvement_rate,
537 'metrics': {
538 'world_model_healthy': wm_healthy,
539 'agent_success_rate': success_rate,
540 'total_goals': total_goals,
541 'ralt_distributed': ralt.get('total_distributed', 0),
542 'recipe_reuse_rate': reuse_rate,
543 'hivemind_agents': len(health.get('hivemind_agents', [])),
544 },
545 'evidence': evidence,
546 'loopholes': loopholes,
547 }
549 # ─── Defensive IP (prior art proof, not patents) ───
551 @staticmethod
552 def create_defensive_publication(db: Session, title: str, content: str,
553 abstract: str = '',
554 git_commit: str = None,
555 created_by: str = None) -> Dict:
556 """Create a timestamped defensive publication (prior art proof).
558 NOT a patent — evidence of prior invention:
559 - SHA-256 hash of content (proves exact content existed at timestamp)
560 - Git commit hash (ties to specific codebase state)
561 - Code snapshot hash (ties to full project state)
562 - Node signature (cryptographic non-repudiation)
563 - Moat score snapshot (cumulative latent state at time)
565 If anyone files a patent on something we published first, THIS is prior art.
566 """
567 import hashlib
568 from integrations.social.models import DefensivePublication
570 content_hash = hashlib.sha256(content.encode('utf-8')).hexdigest()
572 # Code snapshot hash
573 code_hash = None
574 try:
575 from security.node_integrity import compute_code_hash
576 code_hash = compute_code_hash()
577 except Exception:
578 pass
580 # Node signature
581 node_key = None
582 sig_hex = None
583 try:
584 from security.node_integrity import get_public_key_hex, sign_message
585 node_key = get_public_key_hex()
586 sig_hex = sign_message(content_hash.encode('utf-8')).hex()
587 except Exception:
588 pass
590 # Snapshot moat depth
591 moat_score = 0.0
592 try:
593 moat = IPService.measure_moat_depth()
594 moat_score = moat.get('moat_score', 0.0)
595 except Exception:
596 pass
598 # Snapshot verification
599 verification = {}
600 try:
601 verification = IPService.verify_exponential_improvement(db)
602 except Exception:
603 pass
605 pub = DefensivePublication(
606 title=title,
607 abstract=abstract,
608 content_hash=content_hash,
609 git_commit_hash=git_commit,
610 code_snapshot_hash=code_hash,
611 signed_by_node_key=node_key,
612 signature_hex=sig_hex,
613 moat_score_at_publication=moat_score,
614 verification_snapshot=verification,
615 created_by=created_by,
616 )
617 db.add(pub)
618 db.flush()
619 return pub.to_dict()
621 @staticmethod
622 def list_defensive_publications(db: Session) -> List[Dict]:
623 """List all defensive publications in chronological order."""
624 from integrations.social.models import DefensivePublication
625 pubs = db.query(DefensivePublication).order_by(
626 DefensivePublication.publication_date.desc()).all()
627 return [p.to_dict() for p in pubs]
629 @staticmethod
630 def get_provenance_record(db: Session) -> Dict:
631 """Generate comprehensive provenance chain for the entire platform.
633 Aggregates: defensive publications, patents, moat measurements,
634 code hashes, and verification snapshots — a single cryptographic
635 chain of evidence for legal defence.
636 """
637 from integrations.social.models import DefensivePublication
639 pubs = db.query(DefensivePublication).order_by(
640 DefensivePublication.publication_date.asc()).all()
642 patents = IPService.list_patents(db)
644 moat = {}
645 try:
646 moat = IPService.measure_moat_depth()
647 except Exception:
648 pass
650 code_hash = None
651 try:
652 from security.node_integrity import compute_code_hash
653 code_hash = compute_code_hash()
654 except Exception:
655 pass
657 verification = {}
658 try:
659 verification = IPService.verify_exponential_improvement(db)
660 except Exception:
661 pass
663 return {
664 'generated_at': datetime.utcnow().isoformat(),
665 'code_snapshot_hash': code_hash,
666 'moat_depth': moat,
667 'verification': verification,
668 'defensive_publications': [p.to_dict() for p in pubs],
669 'patents': patents,
670 'total_publications': len(pubs),
671 'total_patents': len(patents),
672 'evidence_chain': [
673 {
674 'type': 'defensive_publication',
675 'id': p.id,
676 'content_hash': p.content_hash,
677 'timestamp': p.publication_date.isoformat() if p.publication_date else None,
678 'signature': p.signature_hex,
679 }
680 for p in pubs
681 ],
682 }
684 @staticmethod
685 def check_intelligence_milestone(db: Session,
686 consecutive_days_required: int = 14,
687 min_catch_up: str = 'months') -> Dict:
688 """Check if critical intelligence threshold has been reached.
690 Auto-patent filing trigger. All 3 conditions must be met:
691 1. verify_exponential_improvement() returns verified=True
692 2. moat catch_up_estimate >= min_catch_up
693 3. At least N consecutive verified defensive publications
695 The philosophy: we tread carefully. No premature filing.
696 File only when the hive has proven itself over sustained time.
697 """
698 from integrations.social.models import DefensivePublication
700 verification = IPService.verify_exponential_improvement(db)
702 moat = {}
703 catch_up = 'unknown'
704 try:
705 moat = IPService.measure_moat_depth()
706 catch_up = moat.get('competitor_catch_up_estimate', 'unknown')
707 except Exception:
708 pass
710 # Count consecutive verified publications (most recent first)
711 pubs = db.query(DefensivePublication).order_by(
712 DefensivePublication.publication_date.desc()
713 ).limit(consecutive_days_required).all()
715 consecutive_verified = 0
716 for p in pubs:
717 snap = p.verification_snapshot or {}
718 if snap.get('verified', False):
719 consecutive_verified += 1
720 else:
721 break
723 catch_up_levels = [
724 'moat still shallow — grow network',
725 'weeks', 'months', 'years', 'practically impossible',
726 ]
727 min_idx = catch_up_levels.index(min_catch_up) if min_catch_up in catch_up_levels else 2
728 cur_idx = catch_up_levels.index(catch_up) if catch_up in catch_up_levels else 0
730 triggered = (
731 verification.get('verified', False)
732 and consecutive_verified >= consecutive_days_required
733 and cur_idx >= min_idx
734 )
736 return {
737 'triggered': triggered,
738 'consecutive_verified': consecutive_verified,
739 'consecutive_required': consecutive_days_required,
740 'moat_catch_up': catch_up,
741 'min_catch_up_required': min_catch_up,
742 'current_verification': verification,
743 'moat_score': moat.get('moat_score', 0.0),
744 }