Coverage for integrations / agent_engine / ip_protection_tools.py: 19.5%

123 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Unified Agent Goal Engine - IP Protection Tools (Tier 2) 

3 

4Loaded ONLY when the agent is working on an ip_protection goal. 

5Follows exact same pattern as marketing_tools.py. 

6 

7Tier 1 (Default): google_search, text_2_image, delegate_to_specialist, etc. 

8Tier 2 (Category): verify_loop, draft_claims, draft_patent, check_prior_art, 

9 monitor_infringement, generate_cease_desist, get_loop_health 

10Tier 3 (Runtime): delegate_to_specialist finds agents with needed skills via A2A 

11""" 

12import json 

13import logging 

14from typing import Annotated, Optional 

15 

logger = logging.getLogger('hevolve_social')


def _to_json(payload) -> str:
    """Serialize a tool result to JSON.

    Values that ``json`` cannot encode natively (e.g. datetimes) are
    stringified instead of raising, so a successful DB write is never
    reported as a failure just because its record could not be encoded.
    """
    return json.dumps(payload, default=str)


def _failure(message) -> str:
    """Build the ``{'success': False, 'error': ...}`` envelope shared by all tools."""
    return _to_json({'success': False, 'error': str(message)})


def _split_csv(raw) -> list:
    """Split a comma-separated string into stripped, non-empty items."""
    return [part.strip() for part in (raw or '').split(',') if part.strip()]


def _current_git_commit():
    """Return the output of ``git rev-parse HEAD`` run in the current working
    directory, or ``None`` when git is unavailable or the command fails.

    Best-effort only: any failure is logged at debug level and ignored.
    """
    import subprocess

    run_kwargs = dict(capture_output=True, text=True, timeout=5)
    # Only present on Windows; prevents a console window from popping up.
    if hasattr(subprocess, 'CREATE_NO_WINDOW'):
        run_kwargs['creationflags'] = subprocess.CREATE_NO_WINDOW
    try:
        proc = subprocess.run(['git', 'rev-parse', 'HEAD'], **run_kwargs)
    except Exception as e:
        logger.debug("git commit lookup skipped: %s", e)
        return None
    if proc.returncode != 0:
        return None
    return proc.stdout.strip()


def register_ip_protection_tools(helper, assistant, user_id: str):
    """Register IP protection tools with the agent (Tier 2).

    Every tool is a thin wrapper around IPService or the world model
    bridge. Each one returns a JSON string; on any failure the JSON is
    ``{"success": false, "error": "<message>"}`` rather than a raised
    exception, so the calling agent always receives a parseable result.

    Project modules are imported lazily inside each tool, so this module
    can be loaded even when those packages are unavailable.

    Args:
        helper: AutoGen helper agent (registers for LLM)
        assistant: AutoGen assistant agent (registers for execution)
        user_id: Current user ID for ownership
    """

    def verify_self_improvement_loop(
        days: Annotated[int, "Number of days to analyze (default 30)"] = 30,
    ) -> str:
        """Verify the self-improving loop over the last ``days`` days.

        Returns the JSON-encoded result of
        ``IPService.verify_exponential_improvement``.
        """
        try:
            from integrations.social.models import get_db
            from .ip_service import IPService
            db = get_db()
            try:
                return _to_json(
                    IPService.verify_exponential_improvement(db, days=days))
            finally:
                db.close()
        except Exception as e:
            return _failure(e)

    def draft_patent_claims(
        invention_title: Annotated[str, "Title of the invention"],
        claim_areas: Annotated[str, "Comma-separated claim areas: hive_distributed_inference,self_improving_loop,ralt_skill_propagation,recipe_pattern,guardian_angel_architecture"],
        evidence_summary: Annotated[str, "Summary of evidence supporting novelty"],
    ) -> str:
        """Draft one placeholder independent claim per claim area and save
        them through ``IPService.create_patent`` as a provisional filing.

        Blank claim areas are ignored; if none remain, an error is returned
        before the database is touched.
        """
        areas = _split_csv(claim_areas)
        if not areas:
            return _failure('No claim areas provided')
        try:
            from integrations.social.models import get_db
            from .ip_service import IPService
            db = get_db()
            try:
                claims = [
                    {
                        'claim_number': number,
                        'type': 'independent',
                        'area': area,
                        'text': f'[DRAFT] A method and system for {area.replace("_", " ")}...',
                    }
                    for number, area in enumerate(areas, 1)
                ]
                patent = IPService.create_patent(
                    db,
                    title=invention_title,
                    claims=claims,
                    abstract=f'Patent application for: {invention_title}',
                    description=evidence_summary,
                    filing_type='provisional',
                    evidence=[{'type': 'verification', 'summary': evidence_summary}],
                    created_by=str(user_id),
                )
                db.commit()
                return _to_json({'success': True, 'patent': patent})
            except Exception:
                # Discard any partial write before reporting the error.
                db.rollback()
                raise
            finally:
                db.close()
        except Exception as e:
            return _failure(e)

    def draft_provisional_patent(
        patent_id: Annotated[str, "ID of existing draft patent to build into provisional"],
        inventors: Annotated[str, "Comma-separated inventor names"],
        assignee: Annotated[str, "Assignee organization name"] = 'HART AI',
    ) -> str:
        """Promote an existing patent to status 'provisional' and attach
        loop-health / verification results, inventors and assignee as its
        ``verification_metrics``.

        Returns an error when no inventor is given, the patent is not
        found, or the status update reports failure.
        """
        inventor_names = _split_csv(inventors)
        if not inventor_names:
            return _failure('At least one inventor name is required')
        try:
            from integrations.social.models import get_db
            from .ip_service import IPService
            db = get_db()
            try:
                if not IPService.get_patent(db, patent_id):
                    return _failure('Patent not found')

                # Capture loop health as verification evidence
                health = IPService.get_loop_health()
                verification = IPService.verify_exponential_improvement(db)

                updated = IPService.update_patent_status(
                    db, patent_id, 'provisional')
                if not updated:
                    # NOTE(review): a falsy return is treated as "update
                    # failed" — confirm against IPService.update_patent_status.
                    db.rollback()
                    return _failure('Patent status update failed')

                from integrations.social.models import IPPatent
                record = db.query(IPPatent).filter_by(id=patent_id).first()
                if record:
                    record.verification_metrics = {
                        'loop_health': health,
                        'verification': verification,
                        'inventors': inventor_names,
                        'assignee': assignee,
                    }
                    db.flush()
                    updated = record.to_dict()
                db.commit()
                return _to_json({'success': True, 'patent': updated})
            except Exception:
                db.rollback()
                raise
            finally:
                db.close()
        except Exception as e:
            return _failure(e)

    def check_prior_art(
        claim_text: Annotated[str, "The patent claim text to check for novelty"],
        search_scope: Annotated[str, "Search scope: domestic|worldwide"] = 'worldwide',
    ) -> str:
        """Ask the HiveMind (via the world model bridge) to assess the
        novelty of a claim.

        Only the first 1000 characters of the claim are sent in the query
        and the first 200 are echoed back in the result.
        """
        if not claim_text or not claim_text.strip():
            return _failure('claim_text must not be empty')
        try:
            # Use HiveMind for distributed prior art analysis
            from integrations.agent_engine.world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()

            collective = bridge.query_hivemind(
                f"Assess novelty of this patent claim against known distributed "
                f"computing and AI architectures. Scope: {search_scope}. "
                f"Claim: {claim_text[:1000]}",
                timeout_ms=5000,
            )

            return _to_json({
                'success': True,
                'claim_text': claim_text[:200],
                'search_scope': search_scope,
                'hivemind_analysis': collective,
                'note': 'HiveMind-assisted analysis. Manual USPTO/WIPO search '
                        'recommended for formal filing.',
            })
        except Exception as e:
            return _failure(e)

    def monitor_infringement(
        architecture_fingerprint: Annotated[str, "Description of the architecture to protect"],
        scan_targets: Annotated[str, "Comma-separated scan targets: github.com,arxiv.org,huggingface.co"] = 'github.com,arxiv.org',
    ) -> str:
        """Ask the HiveMind (via the world model bridge) to look for
        architectures similar to the given fingerprint on the scan targets.

        Blank targets are ignored; if none remain, an error is returned.
        Only the first 500 characters of the fingerprint are sent in the
        query and the first 200 are echoed back in the result.
        """
        targets = _split_csv(scan_targets)
        if not targets:
            return _failure('No scan targets provided')
        try:
            from integrations.agent_engine.world_model_bridge import get_world_model_bridge
            bridge = get_world_model_bridge()

            collective = bridge.query_hivemind(
                f"Scan for similar architectures to: {architecture_fingerprint[:500]}. "
                f"Focus on: {', '.join(targets)}. "
                f"Look for: distributed hive compute using user machines, "
                f"self-improving agent loops, RALT-like skill propagation, "
                f"recipe-based agent execution patterns.",
                timeout_ms=10000,
            )

            return _to_json({
                'success': True,
                'architecture_fingerprint': architecture_fingerprint[:200],
                'scan_targets': targets,
                'hivemind_analysis': collective,
                'note': 'Automated scan — results require human review before '
                        'legal action.',
            })
        except Exception as e:
            return _failure(e)

    def generate_cease_desist(
        infringer_name: Annotated[str, "Name of the infringing party"],
        infringer_url: Annotated[str, "URL of the infringing product/service"],
        patent_id: Annotated[str, "Patent ID being infringed"],
        evidence: Annotated[str, "Description of infringement evidence"],
    ) -> str:
        """Record a high-risk infringement, generate a DRAFT cease-and-desist
        notice for it, and mark the infringement as 'reviewed' with the
        notice attached.

        If the patent cannot be looked up, the notice refers to it as
        'Unknown'.
        """
        try:
            from integrations.social.models import get_db
            from .ip_service import IPService
            db = get_db()
            try:
                # Record infringement
                infringement = IPService.create_infringement(
                    db,
                    patent_id=patent_id,
                    infringer_name=infringer_name,
                    infringer_url=infringer_url,
                    evidence_summary=evidence,
                    risk_level='high',
                )

                # Get patent details for the notice
                patent = IPService.get_patent(db, patent_id)
                patent_title = patent['title'] if patent else 'Unknown'

                notice_text = (
                    f"CEASE AND DESIST NOTICE\n\n"
                    f"To: {infringer_name}\n"
                    f"Re: Infringement of Patent Application: {patent_title}\n\n"
                    f"We have identified that your product/service at {infringer_url} "
                    f"infringes upon our patent claims covering:\n"
                    f"- Distributed hive compute using user machines\n"
                    f"- Self-improving agent loop architecture\n"
                    f"- Reality-Anchored Latent Transfer (RALT) skill propagation\n\n"
                    f"Evidence: {evidence}\n\n"
                    f"We demand that you immediately cease and desist from all "
                    f"activities that infringe upon our intellectual property rights.\n\n"
                    f"[DRAFT — Requires legal review before sending]\n"
                )

                # Update with notice
                IPService.update_infringement_status(
                    db, infringement['id'], 'reviewed',
                    notice_type='cease_desist',
                    notice_text=notice_text,
                )

                db.commit()
                return _to_json({
                    'success': True,
                    'infringement': infringement,
                    'notice_text': notice_text,
                    'warning': 'DRAFT notice — requires legal counsel review '
                               'before sending.',
                })
            except Exception:
                # Avoid leaving a half-recorded infringement behind.
                db.rollback()
                raise
            finally:
                db.close()
        except Exception as e:
            return _failure(e)

    def get_loop_health() -> str:
        """Return the JSON-encoded result of ``IPService.get_loop_health``."""
        try:
            from .ip_service import IPService
            return _to_json(IPService.get_loop_health())
        except Exception as e:
            return _failure(e)

    def measure_moat() -> str:
        """Return the JSON-encoded result of ``IPService.measure_moat_depth``."""
        try:
            from .ip_service import IPService
            return _to_json(IPService.measure_moat_depth())
        except Exception as e:
            return _failure(e)

    # ─── Defensive IP tools ───

    def create_defensive_pub(
        title: Annotated[str, "Title of the defensive publication"],
        content: Annotated[str, "Full content to publish (hashed for proof)"],
        abstract: Annotated[str, "Brief abstract"] = '',
    ) -> str:
        """Create a timestamped defensive publication — prior art proof, not a patent.

        The current git HEAD commit is attached when it can be determined.
        """
        try:
            from integrations.social.models import get_db
            from .ip_service import IPService
            db = get_db()
        except Exception as e:
            # Setup failures are reported as JSON, like every other tool.
            return _failure(e)
        try:
            pub = IPService.create_defensive_publication(
                db, title=title, content=content,
                abstract=abstract, git_commit=_current_git_commit(),
                # str() for consistency with draft_patent_claims.
                created_by=str(user_id))
            db.commit()
            return _to_json({'success': True, 'created': True, 'publication': pub})
        except Exception as e:
            db.rollback()
            return _failure(e)
        finally:
            db.close()

    def get_provenance_record() -> str:
        """Return the JSON-encoded result of ``IPService.get_provenance_record``."""
        try:
            from integrations.social.models import get_db
            from .ip_service import IPService
            db = get_db()
            try:
                return _to_json(IPService.get_provenance_record(db))
            finally:
                db.close()
        except Exception as e:
            return _failure(e)

    # (name, description, callable) for every tool, in registration order.
    tools = [
        ('verify_self_improvement_loop',
         'Verify the self-improving loop is working with evidence and loophole detection',
         verify_self_improvement_loop),
        ('draft_patent_claims',
         'Draft formal patent claims in USPTO format for the hive architecture',
         draft_patent_claims),
        ('draft_provisional_patent',
         'Build complete USPTO provisional patent from existing draft',
         draft_provisional_patent),
        ('check_prior_art',
         'Search for prior art using HiveMind collective intelligence',
         check_prior_art),
        ('monitor_infringement',
         'Scan for infringement of hive architecture patents',
         monitor_infringement),
        ('generate_cease_desist',
         'Generate cease-and-desist notice for detected infringement',
         generate_cease_desist),
        ('get_loop_health',
         'Get real-time self-improving loop health dashboard with loopholes',
         get_loop_health),
        ('measure_moat',
         'Measure technical irreproducibility — latent dynamics moat depth vs code clone',
         measure_moat),
        ('create_defensive_pub',
         'Create a timestamped defensive publication — prior art proof for legal defence',
         create_defensive_pub),
        ('get_provenance_record',
         'Get the full provenance chain: all publications, patents, moat, and evidence',
         get_provenance_record),
    ]

    for name, desc, func in tools:
        helper.register_for_llm(name=name, description=desc)(func)
        assistant.register_for_execution(name=name)(func)

    logger.info("Registered %d IP protection tools for user %s", len(tools), user_id)

    # Register skills for A2A discovery (best-effort: skipped when the
    # internal communication module is unavailable or registration fails).
    try:
        from integrations.internal_comm.internal_agent_communication import register_agent_with_skills
        register_agent_with_skills(f"ip_protection_{user_id}", [
            {'name': 'patent_filing', 'description': 'Draft and file patent applications', 'proficiency': 0.9},
            {'name': 'prior_art_search', 'description': 'Search for prior art and assess novelty', 'proficiency': 0.8},
            {'name': 'infringement_detection', 'description': 'Monitor for IP infringement', 'proficiency': 0.8},
            {'name': 'loop_verification', 'description': 'Verify self-improving loop health', 'proficiency': 0.95},
        ])
    except Exception as e:
        logger.debug("IP protection skill registration skipped: %s", e)