Coverage for integrations / agent_engine / ip_protection_tools.py: 19.5%
123 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Unified Agent Goal Engine - IP Protection Tools (Tier 2)
4Loaded ONLY when the agent is working on an ip_protection goal.
5Follows exact same pattern as marketing_tools.py.
7Tier 1 (Default): google_search, text_2_image, delegate_to_specialist, etc.
8Tier 2 (Category): verify_self_improvement_loop, draft_patent_claims, draft_provisional_patent, check_prior_art,
9 monitor_infringement, generate_cease_desist, get_loop_health, measure_moat, create_defensive_pub, get_provenance_record
10Tier 3 (Runtime): delegate_to_specialist finds agents with needed skills via A2A
11"""
12import json
13import logging
14from typing import Annotated, Optional
16logger = logging.getLogger('hevolve_social')
def _tool_error(exc) -> str:
    """Serialize a failure as the JSON error payload shared by the tools."""
    return json.dumps({'success': False, 'error': str(exc)})


def _split_csv(raw) -> list:
    """Split a comma-separated string into stripped items, dropping blanks."""
    return [part.strip() for part in (raw or '').split(',') if part.strip()]


def _git_head_commit():
    """Return the output of ``git rev-parse HEAD``, or None if unavailable.

    Best-effort: any failure (git missing, timeout, non-zero exit) yields
    None so the caller can still proceed without a commit reference.
    """
    import subprocess

    try:
        run_kwargs = dict(capture_output=True, text=True, timeout=5)
        # Only present on Windows builds of the subprocess module.
        if hasattr(subprocess, 'CREATE_NO_WINDOW'):
            run_kwargs['creationflags'] = subprocess.CREATE_NO_WINDOW
        proc = subprocess.run(['git', 'rev-parse', 'HEAD'], **run_kwargs)
    except Exception as e:
        logger.debug("Could not resolve git HEAD: %s", e)
        return None
    return proc.stdout.strip() if proc.returncode == 0 else None


def register_ip_protection_tools(helper, assistant, user_id: str):
    """Register IP protection tools with the agent (Tier 2).

    These wrap IPService + world model bridge — no new logic, just tool
    interfaces that let the agent verify the flywheel, draft patents,
    check prior art, and monitor infringement.

    Every tool returns a JSON string. Any exception raised inside a tool
    (including a failed project import) is reported as
    ``{"success": false, "error": "<message>"}`` rather than propagated.
    Project imports are done inside each tool, within its ``try``.

    Args:
        helper: AutoGen helper agent (registers for LLM)
        assistant: AutoGen assistant agent (registers for execution)
        user_id: Current user ID for ownership
    """

    def verify_self_improvement_loop(
        days: Annotated[int, "Number of days to analyze (default 30)"] = 30,
    ) -> str:
        """Verify the self-improving loop is working: world model health,
        agent success rates, RALT propagation, recipe reuse, HiveMind agents.
        Returns verification status with evidence and detected loopholes."""
        try:
            from integrations.social.models import get_db
            from .ip_service import IPService
            db = get_db()
            try:
                result = IPService.verify_exponential_improvement(db, days=days)
                return json.dumps(result)
            finally:
                db.close()
        except Exception as e:
            return _tool_error(e)

    def draft_patent_claims(
        invention_title: Annotated[str, "Title of the invention"],
        claim_areas: Annotated[str, "Comma-separated claim areas: hive_distributed_inference,self_improving_loop,ralt_skill_propagation,recipe_pattern,guardian_angel_architecture"],
        evidence_summary: Annotated[str, "Summary of evidence supporting novelty"],
    ) -> str:
        """Draft formal patent claims in USPTO format. Saves as IPPatent
        with status='draft'. Returns the created patent record.

        Blank entries in ``claim_areas`` are ignored; if nothing remains the
        call fails before any database session is opened."""
        areas = _split_csv(claim_areas)
        if not areas:
            return _tool_error(
                'claim_areas must list at least one non-empty claim area')
        try:
            from integrations.social.models import get_db
            from .ip_service import IPService
            db = get_db()
            try:
                # One placeholder independent claim per requested area.
                claims = [
                    {
                        'claim_number': number,
                        'type': 'independent',
                        'area': area,
                        'text': f'[DRAFT] A method and system for {area.replace("_", " ")}...',
                    }
                    for number, area in enumerate(areas, 1)
                ]
                result = IPService.create_patent(
                    db,
                    title=invention_title,
                    claims=claims,
                    abstract=f'Patent application for: {invention_title}',
                    description=evidence_summary,
                    filing_type='provisional',
                    evidence=[{'type': 'verification', 'summary': evidence_summary}],
                    created_by=str(user_id),
                )
                db.commit()
                return json.dumps({'success': True, 'patent': result})
            finally:
                db.close()
        except Exception as e:
            return _tool_error(e)

    def draft_provisional_patent(
        patent_id: Annotated[str, "ID of existing draft patent to build into provisional"],
        inventors: Annotated[str, "Comma-separated inventor names"],
        assignee: Annotated[str, "Assignee organization name"] = 'HART AI',
    ) -> str:
        """Build complete USPTO provisional patent application from a draft.
        Updates the patent with full provisional structure.

        Fails with an error payload when the patent does not exist or when
        the status update reports no result."""
        try:
            from integrations.social.models import get_db
            from .ip_service import IPService
            db = get_db()
            try:
                if not IPService.get_patent(db, patent_id):
                    return _tool_error('Patent not found')

                # Capture loop health as verification evidence
                health = IPService.get_loop_health()
                verification = IPService.verify_exponential_improvement(db)

                updated = IPService.update_patent_status(
                    db, patent_id, 'provisional')
                if not updated:
                    # Do not claim success when nothing was updated.
                    return _tool_error('Patent status update failed')

                from integrations.social.models import IPPatent
                record = db.query(IPPatent).filter_by(id=patent_id).first()
                if record:
                    record.verification_metrics = {
                        'loop_health': health,
                        'verification': verification,
                        'inventors': _split_csv(inventors),
                        'assignee': assignee,
                    }
                    db.flush()
                    updated = record.to_dict()
                db.commit()
                return json.dumps({'success': True, 'patent': updated})
            finally:
                db.close()
        except Exception as e:
            return _tool_error(e)

    def check_prior_art(
        claim_text: Annotated[str, "The patent claim text to check for novelty"],
        search_scope: Annotated[str, "Search scope: domestic|worldwide"] = 'worldwide',
    ) -> str:
        """Search for prior art using HiveMind collective thinking.
        Queries the hive for distributed knowledge about similar patents
        and architectures. Returns novelty assessment."""
        try:
            # Use HiveMind for distributed prior art analysis
            from integrations.agent_engine.world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()

            # The claim is truncated to 1000 chars in the prompt and to
            # 200 chars in the echoed result.
            collective = bridge.query_hivemind(
                f"Assess novelty of this patent claim against known distributed "
                f"computing and AI architectures. Scope: {search_scope}. "
                f"Claim: {claim_text[:1000]}",
                timeout_ms=5000,
            )

            return json.dumps({
                'claim_text': claim_text[:200],
                'search_scope': search_scope,
                'hivemind_analysis': collective,
                'note': 'HiveMind-assisted analysis. Manual USPTO/WIPO search '
                        'recommended for formal filing.',
            })
        except Exception as e:
            return _tool_error(e)

    def monitor_infringement(
        architecture_fingerprint: Annotated[str, "Description of the architecture to protect"],
        scan_targets: Annotated[str, "Comma-separated scan targets: github.com,arxiv.org,huggingface.co"] = 'github.com,arxiv.org',
    ) -> str:
        """Scan for potential infringement of the hive architecture patents.
        Uses HiveMind for distributed scanning across connected agents.
        Returns matches with risk levels.

        Blank entries in ``scan_targets`` are ignored."""
        try:
            from integrations.agent_engine.world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()

            targets = _split_csv(scan_targets)
            collective = bridge.query_hivemind(
                f"Scan for similar architectures to: {architecture_fingerprint[:500]}. "
                f"Focus on: {', '.join(targets)}. "
                f"Look for: distributed hive compute using user machines, "
                f"self-improving agent loops, RALT-like skill propagation, "
                f"recipe-based agent execution patterns.",
                timeout_ms=10000,
            )

            return json.dumps({
                'architecture_fingerprint': architecture_fingerprint[:200],
                'scan_targets': targets,
                'hivemind_analysis': collective,
                'note': 'Automated scan — results require human review before '
                        'legal action.',
            })
        except Exception as e:
            return _tool_error(e)

    def generate_cease_desist(
        infringer_name: Annotated[str, "Name of the infringing party"],
        infringer_url: Annotated[str, "URL of the infringing product/service"],
        patent_id: Annotated[str, "Patent ID being infringed"],
        evidence: Annotated[str, "Description of infringement evidence"],
    ) -> str:
        """Generate a cease-and-desist notice and record the infringement.
        Creates an IPInfringement record linked to the patent."""
        try:
            from integrations.social.models import get_db
            from .ip_service import IPService
            db = get_db()
            try:
                # Record infringement
                infringement = IPService.create_infringement(
                    db,
                    patent_id=patent_id,
                    infringer_name=infringer_name,
                    infringer_url=infringer_url,
                    evidence_summary=evidence,
                    risk_level='high',
                )

                # Get patent details for the notice; a missing patent is
                # tolerated and rendered as 'Unknown'.
                patent = IPService.get_patent(db, patent_id)
                patent_title = patent['title'] if patent else 'Unknown'

                notice_text = (
                    f"CEASE AND DESIST NOTICE\n\n"
                    f"To: {infringer_name}\n"
                    f"Re: Infringement of Patent Application: {patent_title}\n\n"
                    f"We have identified that your product/service at {infringer_url} "
                    f"infringes upon our patent claims covering:\n"
                    f"- Distributed hive compute using user machines\n"
                    f"- Self-improving agent loop architecture\n"
                    f"- Reality-Anchored Latent Transfer (RALT) skill propagation\n\n"
                    f"Evidence: {evidence}\n\n"
                    f"We demand that you immediately cease and desist from all "
                    f"activities that infringe upon our intellectual property rights.\n\n"
                    f"[DRAFT — Requires legal review before sending]\n"
                )

                # Update with notice
                IPService.update_infringement_status(
                    db, infringement['id'], 'reviewed',
                    notice_type='cease_desist',
                    notice_text=notice_text,
                )

                db.commit()
                return json.dumps({
                    'success': True,
                    'infringement': infringement,
                    'notice_text': notice_text,
                    'warning': 'DRAFT notice — requires legal counsel review '
                               'before sending.',
                })
            finally:
                db.close()
        except Exception as e:
            return _tool_error(e)

    def get_loop_health() -> str:
        """Get real-time dashboard of the self-improving loop:
        world model stats, agent performance, RALT propagation,
        recipe adoption, HiveMind agents, and detected flywheel loopholes."""
        try:
            from .ip_service import IPService
            return json.dumps(IPService.get_loop_health())
        except Exception as e:
            return _tool_error(e)

    def measure_moat() -> str:
        """Measure technical irreproducibility — how far ahead of a code clone.
        The moat is accumulated latent state, not code:
        - Latent dynamics trained on real interactions (uncopyable)
        - HiveMind N² network effect (more nodes = exponentially more knowledge)
        - MetaRouter REINFORCE policy (trained on real decisions)
        - Kernel support vectors (real expert corrections)
        - Episodic memory (years of compressed experiences)
        - Master key chain (cryptographic, non-forkable)
        Returns moat score and competitor catch-up estimate."""
        try:
            from .ip_service import IPService
            return json.dumps(IPService.measure_moat_depth())
        except Exception as e:
            return _tool_error(e)

    # ─── Defensive IP tools ───

    def create_defensive_pub(
        title: Annotated[str, "Title of the defensive publication"],
        content: Annotated[str, "Full content to publish (hashed for proof)"],
        abstract: Annotated[str, "Brief abstract"] = '',
    ) -> str:
        """Create a timestamped defensive publication — prior art proof, not a patent.

        The current git HEAD hash is attached when it can be resolved. The
        session is rolled back if creation or commit fails."""
        try:
            from integrations.social.models import get_db
            from .ip_service import IPService
            db = get_db()
            try:
                pub = IPService.create_defensive_publication(
                    db, title=title, content=content,
                    abstract=abstract, git_commit=_git_head_commit(),
                    created_by=str(user_id))
                db.commit()
                return json.dumps(
                    {'success': True, 'created': True, 'publication': pub})
            except Exception:
                db.rollback()
                raise  # reported by the outer handler as a JSON error
            finally:
                db.close()
        except Exception as e:
            return _tool_error(e)

    def get_provenance_record() -> str:
        """Get the full provenance chain — all defensive publications, patents, and evidence."""
        try:
            from integrations.social.models import get_db
            from .ip_service import IPService
            db = get_db()
            try:
                record = IPService.get_provenance_record(db)
                # default=str: values json cannot encode are stringified.
                return json.dumps(record, default=str)
            finally:
                db.close()
        except Exception as e:
            return _tool_error(e)

    # (tool name, LLM-facing description, callable) — registration order.
    tools = [
        ('verify_self_improvement_loop',
         'Verify the self-improving loop is working with evidence and loophole detection',
         verify_self_improvement_loop),
        ('draft_patent_claims',
         'Draft formal patent claims in USPTO format for the hive architecture',
         draft_patent_claims),
        ('draft_provisional_patent',
         'Build complete USPTO provisional patent from existing draft',
         draft_provisional_patent),
        ('check_prior_art',
         'Search for prior art using HiveMind collective intelligence',
         check_prior_art),
        ('monitor_infringement',
         'Scan for infringement of hive architecture patents',
         monitor_infringement),
        ('generate_cease_desist',
         'Generate cease-and-desist notice for detected infringement',
         generate_cease_desist),
        ('get_loop_health',
         'Get real-time self-improving loop health dashboard with loopholes',
         get_loop_health),
        ('measure_moat',
         'Measure technical irreproducibility — latent dynamics moat depth vs code clone',
         measure_moat),
        ('create_defensive_pub',
         'Create a timestamped defensive publication — prior art proof for legal defence',
         create_defensive_pub),
        ('get_provenance_record',
         'Get the full provenance chain: all publications, patents, moat, and evidence',
         get_provenance_record),
    ]

    for name, desc, func in tools:
        helper.register_for_llm(name=name, description=desc)(func)
        assistant.register_for_execution(name=name)(func)

    logger.info("Registered %d IP protection tools for user %s",
                len(tools), user_id)

    # Register skills for A2A discovery (optional: skipped on any failure)
    try:
        from integrations.internal_comm.internal_agent_communication import register_agent_with_skills
        register_agent_with_skills(f"ip_protection_{user_id}", [
            {'name': 'patent_filing', 'description': 'Draft and file patent applications', 'proficiency': 0.9},
            {'name': 'prior_art_search', 'description': 'Search for prior art and assess novelty', 'proficiency': 0.8},
            {'name': 'infringement_detection', 'description': 'Monitor for IP infringement', 'proficiency': 0.8},
            {'name': 'loop_verification', 'description': 'Verify self-improving loop health', 'proficiency': 0.95},
        ])
    except Exception as e:
        logger.debug("IP protection skill registration skipped: %s", e)