Coverage for core / agent_tools.py: 31.0%

542 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2core/agent_tools.py — Canonical AutoGen tool definitions (single source of truth). 

3 

4Every core tool is defined ONCE here. Both create_recipe.py and reuse_recipe.py 

5call build_core_tool_closures() + register_core_tools() instead of duplicating 

6function bodies and registration lines. 

7 

8Pattern mirrors: 

9 - integrations/service_tools/media_agent.py → register_media_tools() 

10 - integrations/channels/memory/agent_memory_tools.py → register_autogen_tools() 

11 - integrations/agent_engine/marketing_tools.py → register_marketing_tools() 

12""" 

13import json 

14import logging 

15import os 

16import threading 

17import time 

18import uuid 

19from datetime import datetime 

20from typing import Annotated, Any, List, Optional, Tuple 

21 

22import requests 

23from json_repair import repair_json 

24 

25from core.http_pool import pooled_get, pooled_post 

26from integrations.service_tools.model_catalog import ModelType 

27 

28tool_logger = logging.getLogger('tool_execution') 

29 

30 

31# --------------------------------------------------------------------------- 

32# Generic registration helper 

33# --------------------------------------------------------------------------- 

34 

35def register_dual(helper, executor, func, name: str, description: str): 

36 """Register a single tool on both the LLM-calling and executing agents. 

37 

38 AutoGen's tool pattern pairs ``helper.register_for_llm`` with 

39 ``executor.register_for_execution`` — create_recipe.py and 

40 reuse_recipe.py repeat this pair 40+ times inline. Call sites 

41 that define a closure and register it immediately use this 

42 helper; batches that already have a ``(name, desc, func)`` list 

43 should use :func:`register_core_tools` instead. 

44 

45 Returns ``func`` unchanged so the call can be inlined after a 

46 closure definition without shadowing the name. 

47 """ 

48 helper.register_for_llm(name=name, description=description)(func) 

49 executor.register_for_execution(name=name)(func) 

50 return func 

51 

52 

53def register_core_tools(tools, helper, executor): 

54 """Register (name, desc, func) tuples on an AutoGen helper/executor pair. 

55 

56 Args: 

57 tools: list of (name, description, func) tuples from build_core_tool_closures() 

58 helper: AutoGen agent that suggests tool use (register_for_llm) 

59 executor: AutoGen agent that executes tools (register_for_execution) 

60 """ 

61 for name, desc, func in tools: 

62 register_dual(helper, executor, func, name, desc) 

63 

64 

65# --------------------------------------------------------------------------- 

66# Core tool closure factory 

67# --------------------------------------------------------------------------- 

68 

69def build_core_tool_closures(ctx): 

70 """Build session-scoped tool closures. Returns list of (name, desc, func). 

71 

72 Args: 

73 ctx: dict with session variables: 

74 user_id, prompt_id, agent_data, helper_fun, user_prompt, 

75 request_id_list, recent_file_id, scheduler, 

76 simplemem_store (optional), memory_graph (optional), 

77 log_tool_execution (decorator), send_message_to_user1 (func), 

78 retrieve_json (func), strip_json_values (func), 

79 save_conversation_db (func) 

80 """ 

81 # Unpack context ------------------------------------------------------- 

82 user_id = ctx['user_id'] 

83 prompt_id = ctx['prompt_id'] 

84 agent_data = ctx['agent_data'] 

85 helper_fun = ctx['helper_fun'] 

86 user_prompt = ctx['user_prompt'] 

87 request_id_list = ctx['request_id_list'] 

88 recent_file_id = ctx['recent_file_id'] 

89 scheduler = ctx['scheduler'] 

90 simplemem_store = ctx.get('simplemem_store') 

91 memory_graph = ctx.get('memory_graph') 

92 # log_tool_execution: optional decorator (create_recipe.py has it, reuse_recipe.py may not) 

93 log_tool_execution = ctx.get('log_tool_execution') or (lambda f: f) 

94 send_message_to_user1 = ctx['send_message_to_user1'] 

95 _retrieve_json = ctx['retrieve_json'] 

96 _strip_json_values = ctx['strip_json_values'] 

97 save_conversation_db = ctx['save_conversation_db'] 

98 

99 tools: List[Tuple[str, str, Any]] = [] 

100 

101 # ------------------------------------------------------------------ 

102 # 1. text_2_image 

103 # ------------------------------------------------------------------ 

104 @log_tool_execution 

105 def text_2_image(text: Annotated[str, "Text to create image"]) -> str: 

106 return helper_fun.txt2img(text) 

107 

108 tools.append(( 

109 "text_2_image", 

110 "Text to image Creator", 

111 text_2_image, 

112 )) 

113 # Alias — reuse_recipe.py main flow LLM prompts advertise `txt2img` (#510). 

114 # Same canonical closure (helper_fun.txt2img — local-first, sovereignty). 

115 tools.append(( 

116 "txt2img", 

117 "Text to image Creator (alias of text_2_image)", 

118 text_2_image, 

119 )) 

120 

121 # ------------------------------------------------------------------ 

122 # 2. get_user_camera_inp 

123 # ------------------------------------------------------------------ 

124 @log_tool_execution 

125 def get_user_camera_inp( 

126 inp: Annotated[str, "The Question to check from visual context"], 

127 ) -> str: 

128 return helper_fun.get_user_camera_inp(inp, int(user_id), request_id_list[user_prompt]) 

129 

130 tools.append(( 

131 "get_user_camera_inp", 

132 "Get user's visual information to process somethings", 

133 get_user_camera_inp, 

134 )) 

135 

136 # ------------------------------------------------------------------ 

137 # 3. save_data_in_memory 

138 # ------------------------------------------------------------------ 

139 @log_tool_execution 

140 def save_data_in_memory( 

141 key: Annotated[str, "Key path for storing data now & retrieving data later. Use dot notation for nested keys (e.g., 'user.info.name')."], 

142 value: Annotated[Optional[Any], "Value you want to store; strictly should be one of int, float, bool, json array or json object."] = None, 

143 ) -> str: 

144 """Store data with validation to prevent corruption.""" 

145 tool_logger.info('INSIDE save_data_in_memory') 

146 try: 

147 if isinstance(value, str) and (value.startswith('{') or value.startswith('[')): 

148 value = _retrieve_json(value) 

149 tool_logger.info(f"REPAIRED JSON STRING: {value}") 

150 if value is not None: 

151 json_str = json.dumps(value) 

152 validated_value = json.loads(json_str) 

153 tool_logger.info(f"VALIDATED VALUE (post JSON cycle): {validated_value}") 

154 else: 

155 validated_value = None 

156 

157 keys = key.split('.') 

158 d = agent_data.setdefault(prompt_id, {}) 

159 for k in keys[:-1]: 

160 d = d.setdefault(k, {}) 

161 d[keys[-1]] = validated_value 

162 tool_logger.info(f"VALUES STORED IN AGENT DATA: {validated_value}") 

163 tool_logger.info(f"FULL AGENT DATA AT KEY: {d}") 

164 

165 if helper_fun.save_agent_data_to_file(prompt_id, agent_data): 

166 tool_logger.info(f"[OK] Data persisted to file for prompt_id {prompt_id}") 

167 else: 

168 tool_logger.warning(f"Failed to persist data to file for prompt_id {prompt_id}") 

169 

170 try: 

171 stored_value = get_data_by_key(key) 

172 tool_logger.info(f"VERIFICATION - READ BACK VALUE: {stored_value}") 

173 if stored_value == "Key not found in stored data.": 

174 tool_logger.error(f"VERIFICATION FAILED: Data not properly stored at key {key}") 

175 except Exception as e: 

176 tool_logger.error(f"VERIFICATION ERROR: {str(e)}") 

177 

178 return f'{agent_data[prompt_id]}' 

179 except json.JSONDecodeError as je: 

180 error_msg = f"Invalid JSON structure in value: {str(je)}" 

181 tool_logger.error(error_msg) 

182 return f"Error: {error_msg} - Data not saved" 

183 except TypeError as te: 

184 error_msg = f"Type error in value: {str(te)}" 

185 tool_logger.error(error_msg) 

186 return f"Error: {error_msg} - Data not saved" 

187 except Exception as e: 

188 error_msg = f"Unexpected error saving data: {str(e)}" 

189 tool_logger.error(error_msg) 

190 return f"Error: {error_msg} - Data not saved" 

191 

192 tools.append(( 

193 "save_data_in_memory", 

194 "Use this to Store and retrieve data using key-value storage system", 

195 save_data_in_memory, 

196 )) 

197 

198 # ------------------------------------------------------------------ 

199 # 4. get_saved_metadata 

200 # ------------------------------------------------------------------ 

201 @log_tool_execution 

202 def get_saved_metadata() -> str: 

203 """Get metadata with automatic loading from persistent storage.""" 

204 if prompt_id not in agent_data or not agent_data[prompt_id]: 

205 tool_logger.info(f"Loading agent data from file for get_saved_metadata, prompt_id {prompt_id}") 

206 helper_fun.load_agent_data_from_file(prompt_id, agent_data) 

207 stripped_json = _strip_json_values(agent_data[prompt_id]) 

208 return f'{stripped_json}' 

209 

210 tools.append(( 

211 "get_saved_metadata", 

212 "Returns the schema of the json from internal memory with all keys but without actual values.", 

213 get_saved_metadata, 

214 )) 

215 

216 # ------------------------------------------------------------------ 

217 # 5. get_data_by_key 

218 # ------------------------------------------------------------------ 

219 @log_tool_execution 

220 def get_data_by_key( 

221 key: Annotated[str, "Key path for retrieving data. Use dot notation for nested keys (e.g., 'user.info.name')."], 

222 ) -> str: 

223 if prompt_id not in agent_data or not agent_data[prompt_id]: 

224 tool_logger.info(f"Loading agent data from file for prompt_id {prompt_id}") 

225 helper_fun.load_agent_data_from_file(prompt_id, agent_data) 

226 keys = key.split('.') 

227 d = agent_data.get(prompt_id, {}) 

228 try: 

229 for k in keys: 

230 d = d[k] 

231 return f'{d}' 

232 except KeyError: 

233 return "Key not found in stored data." 

234 

235 tools.append(( 

236 "get_data_by_key", 

237 "Returns all data from the internal Memory using key", 

238 get_data_by_key, 

239 )) 

240 # Alias — Helper system prompts in reuse_recipe.py advertise this name (#510). 

241 # Same closure → identical behavior under both names. Never remove a 

242 # registered tool: phantom tool fixed by adding a real registration. 

243 tools.append(( 

244 "get_data_from_memory", 

245 "Returns all data from the internal Memory using key (alias of get_data_by_key)", 

246 get_data_by_key, 

247 )) 

248 

249 # ------------------------------------------------------------------ 

250 # 6. get_user_id 

251 # ------------------------------------------------------------------ 

252 @log_tool_execution 

253 def get_user_id() -> str: 

254 tool_logger.info('INSIDE get_user_id') 

255 return f'{user_id}' 

256 

257 tools.append(( 

258 "get_user_id", 

259 "Returns the unique identifier (user_id) of the current user.", 

260 get_user_id, 

261 )) 

262 

263 # ------------------------------------------------------------------ 

264 # 7. get_prompt_id 

265 # ------------------------------------------------------------------ 

266 @log_tool_execution 

267 def get_prompt_id() -> str: 

268 tool_logger.info('INSIDE get_prompt_id') 

269 return f'{prompt_id}' 

270 

271 tools.append(( 

272 "get_prompt_id", 

273 "Returns the unique identifier (prompt_id) associated with the current prompt or conversation.", 

274 get_prompt_id, 

275 )) 

276 

277 # ------------------------------------------------------------------ 

278 # 8. Generate_video (canonical — full LTX-2 + avatar) 

279 # ------------------------------------------------------------------ 

280 @log_tool_execution 

281 def Generate_video( 

282 text: Annotated[str, "Text to be used for video generation"], 

283 avatar_id: Annotated[int, "Unique identifier for the avatar (use 0 for LTX-2 text-to-video)"], 

284 realtime: Annotated[bool, "If True, response is fast but less realistic by default it should be true; if False, response is realistic but slower"], 

285 model: Annotated[str, "Video model to use: 'avatar' for avatar-based video, 'ltx2' for LTX-2 text-to-video generation"] = "avatar", 

286 ) -> str: 

287 tool_logger.info(f'INSIDE Generate_video with model={model}') 

288 

289 # LTX-2 Text-to-Video Generation 

290 if model.lower() == "ltx2": 

291 tool_logger.info(f'Using LTX-2 for video generation: {text[:50]}...') 

292 LOCAL_COMFYUI_URL = "http://localhost:8188" 

293 LOCAL_LTX_URL = "http://localhost:5002" 

294 headers = {'Content-Type': 'application/json'} 

295 ltx_payload = { 

296 "prompt": text, 

297 "negative_prompt": "worst quality, inconsistent motion, blurry, jittery, distorted", 

298 "num_frames": 97, 

299 "width": 832, 

300 "height": 480, 

301 "num_inference_steps": 30 if realtime else 50, 

302 "guidance_scale": 3.0, 

303 "fps": 24, 

304 } 

305 

306 # Fast health probe — skip dead servers instantly (0ms vs 10s timeout) 

307 def _is_server_up(url, name): 

308 try: 

309 r = pooled_get(f"{url}/health", timeout=1.5) 

310 return r.status_code < 500 

311 except Exception: 

312 tool_logger.info(f"{name} not reachable — skipping instantly") 

313 return False 

314 

315 # Try local LTX-2 server first — only if alive 

316 if _is_server_up(LOCAL_LTX_URL, "LTX-2"): 

317 try: 

318 tool_logger.info(f"LTX-2 server is UP, generating...") 

319 response = pooled_post(f"{LOCAL_LTX_URL}/generate", json=ltx_payload, headers=headers, timeout=600) 

320 if response.status_code == 200: 

321 result = response.json() 

322 video_url = result.get('video_url') or result.get('output_url') or result.get('video_path') 

323 if video_url: 

324 tool_logger.info(f"LTX-2 video generated: {video_url}") 

325 return f"LTX-2 Video generated successfully. URL: {video_url}" 

326 except requests.exceptions.RequestException as e: 

327 tool_logger.info(f"LTX-2 generation failed: {e}") 

328 

329 # Try ComfyUI — only if alive 

330 if _is_server_up(LOCAL_COMFYUI_URL, "ComfyUI"): 

331 try: 

332 tool_logger.info(f"ComfyUI is UP, submitting workflow...") 

333 comfyui_workflow = { 

334 "prompt": { 

335 "1": {"class_type": "LTXVLoader", "inputs": {"ckpt_name": "ltx-video-2b-v0.9.safetensors"}}, 

336 "2": {"class_type": "LTXVConditioning", "inputs": {"positive": text, "negative": ltx_payload["negative_prompt"], "ltxv_model": ["1", 0]}}, 

337 "3": {"class_type": "LTXVSampler", "inputs": {"seed": int(time.time()) % 2147483647, "steps": ltx_payload["num_inference_steps"], "cfg": ltx_payload["guidance_scale"], "width": ltx_payload["width"], "height": ltx_payload["height"], "num_frames": ltx_payload["num_frames"], "ltxv_model": ["1", 0], "conditioning": ["2", 0]}}, 

338 "4": {"class_type": "LTXVDecode", "inputs": {"ltxv_model": ["1", 0], "samples": ["3", 0]}}, 

339 "5": {"class_type": "VHS_VideoCombine", "inputs": {"frame_rate": ltx_payload["fps"], "filename_prefix": "ltx2_output", "format": "video/h264-mp4", "images": ["4", 0]}}, 

340 } 

341 } 

342 response = pooled_post(f"{LOCAL_COMFYUI_URL}/prompt", json=comfyui_workflow, headers=headers, timeout=10) 

343 if response.status_code == 200: 

344 comfy_prompt_id = response.json().get('prompt_id') 

345 tool_logger.info(f"ComfyUI LTX-2 job queued: {comfy_prompt_id}") 

346 for _ in range(120): 

347 time.sleep(5) 

348 history_response = pooled_get(f"{LOCAL_COMFYUI_URL}/history/{comfy_prompt_id}") 

349 if history_response.status_code == 200: 

350 history = history_response.json() 

351 if comfy_prompt_id in history: 

352 outputs = history[comfy_prompt_id].get('outputs', {}) 

353 for node_id, output in outputs.items(): 

354 for media_key in ('gifs', 'videos'): 

355 if media_key in output: 

356 filename = output[media_key][0].get('filename') 

357 if filename: 

358 video_url = f"{LOCAL_COMFYUI_URL}/view?filename={filename}" 

359 return f"LTX-2 Video generated via ComfyUI. URL: {video_url}" 

360 return f"LTX-2 Video generation queued in ComfyUI (prompt_id: {comfy_prompt_id}). Check ComfyUI interface for output." 

361 except requests.exceptions.RequestException as e: 

362 tool_logger.info(f"ComfyUI generation failed: {e}") 

363 

364 # Local servers unavailable — try hive mesh peer with GPU 

365 try: 

366 from integrations.agent_engine.compute_config import get_compute_policy 

367 policy = get_compute_policy() 

368 if policy.get('compute_policy') != 'local_only': 

369 from integrations.agent_engine.compute_mesh_service import get_compute_mesh 

370 mesh = get_compute_mesh() 

371 result = mesh.offload_to_best_peer( 

372 model_type=ModelType.VIDEO_GEN, 

373 prompt=text, 

374 options={'model': 'ltx2', 'timeout': 300}, 

375 ) 

376 if result and 'error' not in result: 

377 video_url = result.get('response', result.get('video_url', '')) 

378 peer = result.get('offloaded_to', 'hive_peer') 

379 tool_logger.info(f"LTX-2 video generated via hive peer {peer}: {video_url}") 

380 return f"LTX-2 Video generated via hive peer. URL: {video_url}" 

381 tool_logger.info(f"Hive mesh video offload failed: {result.get('error')}") 

382 except Exception as e: 

383 tool_logger.info(f"Hive mesh offload not available: {e}") 

384 

385 return ("LTX-2 video generation failed. No local GPU, no hive peers with GPU. " 

386 "Options: (1) Pair a GPU device: hart compute pair <address>, " 

387 "(2) Set HEVOLVE_COMPUTE_POLICY=any, " 

388 "(3) Install local LTX-2 server with CUDA GPU") 

389 

390 # Default: Avatar-based video generation 

391 from core.config_cache import get_db_url 

392 database_url = get_db_url() or 'https://mailer.hertzai.com' 

393 request_id = str(uuid.uuid4()).replace("-", "")[:11] 

394 tool_logger.info(f"avtar_id: {avatar_id}:\n{text[:10]}....\n") 

395 

396 headers = {'Content-Type': 'application/json'} 

397 data = { 

398 "text": str(text), 

399 'flag_hallo': 'false', 

400 'chattts': False, 

401 'openvoice': "false", 

402 } 

403 

404 try: 

405 res = pooled_get(f"{database_url}/get_image_by_id/{avatar_id}") 

406 res = res.json() 

407 new_image_url = res["image_url"] 

408 voice_id = res.get('voice_id') 

409 except Exception: 

410 data['openvoice'] = "true" 

411 new_image_url = None 

412 voice_id = None 

413 

414 data["cartoon_image"] = "True" 

415 data["bg_url"] = 'http://stream.mcgroce.com/txt/examples_cartoon/roy_bg.jpg' 

416 data['vtoonify'] = "false" 

417 data["image_url"] = new_image_url 

418 data['im_crop'] = "false" 

419 data['remove_bg'] = "false" 

420 data['hd_video'] = "false" 

421 data['uid'] = str(request_id) 

422 data['gradient'] = "true" 

423 data['cus_bg'] = "false" 

424 data['solid_color'] = "false" 

425 data['inpainting'] = "false" 

426 data['prompt'] = "" 

427 data['gender'] = 'male' 

428 

429 timeout = 60 

430 if not realtime: 

431 timeout = 600 

432 data['chattts'] = True 

433 data['flag_hallo'] = "true" 

434 data["cartoon_image"] = "False" 

435 

436 if voice_id is not None: 

437 try: 

438 voice_sample = pooled_get(f"{database_url}/get_voice_sample_id/{voice_id}") 

439 voice_sample = voice_sample.json() 

440 data["audio_sample_url"] = voice_sample.get("voice_sample_url") 

441 data['voice_id'] = int(voice_id) if voice_id else None 

442 except Exception: 

443 data["audio_sample_url"] = None 

444 data['voice_id'] = None 

445 else: 

446 data["audio_sample_url"] = None 

447 data['voice_id'] = None 

448 

449 conv_id = save_conversation_db(text, user_id, prompt_id, database_url, request_id) 

450 data['conv_id'] = int(conv_id) 

451 data['avatar_id'] = int(avatar_id) 

452 data['timeout'] = int(timeout) 

453 

454 try: 

455 pooled_post(f"{database_url}/video_generate_save", 

456 data=json.dumps(data), headers=headers, timeout=1) 

457 except Exception: 

458 pass 

459 

460 if data['chattts'] or data['flag_hallo'] == "true": 

461 return f"Video Generation task added to queue with conv_id:{conv_id}. Ask the helper to save this conv_id in the same collection from which the story used to generate the video was retrieved, for future reference" 

462 else: 

463 return f"Video Generation completed with conv_id:{conv_id}. Ask the helper to save this conv_id in the same collection from which the story used to generate the video was retrieved, for future reference" 

464 

465 tools.append(( 

466 "Generate_video", 

467 "Generate video with text. Use model='ltx2' for AI text-to-video generation, or model='avatar' (default) for avatar-based video with voice synthesis.", 

468 Generate_video, 

469 )) 

470 

471 # ------------------------------------------------------------------ 

472 # 9. get_user_uploaded_file 

473 # ------------------------------------------------------------------ 

474 @log_tool_execution 

475 def get_user_uploaded_file() -> str: 

476 tool_logger.info('INSIDE get_user_uploaded_file') 

477 if recent_file_id[user_id]: 

478 return f'Got user uploaded file the file_id is {recent_file_id[user_id]}' 

479 return 'No file uploaded from user' 

480 

481 tools.append(( 

482 "get_user_uploaded_file", 

483 "get user's recent uploaded files", 

484 get_user_uploaded_file, 

485 )) 

486 

487 # ------------------------------------------------------------------ 

488 # 10. get_text_from_image (img2txt) 

489 # ------------------------------------------------------------------ 

490 @log_tool_execution 

491 def img2txt( 

492 image_url: Annotated[str, "image url of which you want text"], 

493 text: Annotated[str, "the details you want from image"] = 'Describe the Images & Text data in this image in detail', 

494 ) -> str: 

495 tool_logger.info('INSIDE img2txt') 

496 # SSRF protection: validate image URL before fetching 

497 try: 

498 from security.sanitize import validate_url 

499 image_url = validate_url(image_url) 

500 except (ImportError, ValueError) as e: 

501 tool_logger.warning(f"Image URL blocked by SSRF filter: {image_url} — {e}") 

502 return f"Error: URL blocked by security filter: {e}" 

503 # Try local Qwen Vision first (bundled mode), fall back to cloud 

504 from core.config_cache import get_vision_api, is_bundled 

505 url = get_vision_api() 

506 if not url: 

507 tool_logger.warning("No LLAVA_API configured — vision inference may fail on no-GPU instances") 

508 url = "http://azurekong.hertzai.com:8000/llava/image_inference" 

509 

510 if is_bundled(): 

511 # Local: use Qwen Vision via upload/vision endpoint 

512 payload = json.dumps({'image_url': image_url, 'prompt': text}) 

513 response = requests.post(url, data=payload, 

514 headers={'Content-Type': 'application/json'}, timeout=60) 

515 else: 

516 payload = {'url': image_url, 'prompt': text} 

517 response = requests.request("POST", url, headers={}, data=payload, files=[], timeout=300) 

518 if response.status_code == 200: 

519 return response.text 

520 else: 

521 return 'Not able to get this page details try later' 

522 

523 tools.append(( 

524 "get_text_from_image", 

525 "Image to Text/Question Answering from image", 

526 img2txt, 

527 )) 

528 # Alias — reuse_recipe.py main flow LLM prompts advertise `img2txt` (#510). 

529 # Same canonical closure (SSRF-validated, local Qwen Vision + cloud LLaVA fallback). 

530 tools.append(( 

531 "img2txt", 

532 "Image to Text/Question Answering from image (alias of get_text_from_image)", 

533 img2txt, 

534 )) 

535 

536 # ------------------------------------------------------------------ 

537 # 11. create_scheduled_jobs 

538 # ------------------------------------------------------------------ 

539 @log_tool_execution 

540 def create_scheduled_jobs( 

541 interval_sec: Annotated[int, "time between two Interval in seconds."], 

542 job_description: Annotated[str, "Description of the job to be performed"], 

543 cron_expression: Annotated[Optional[str], "Cron expression for scheduling. Example: '0 9 * * 1-5' (Runs at 9:00 AM, Monday to Friday). If the interval is greater than 60 seconds or it needs to be executed at a dynamic cron time this argument is Mandatory else None"] = None, 

544 ) -> str: 

545 tool_logger.info('INSIDE create_scheduled_jobs') 

546 return 'Added this schedule job in creation process will do it at the end. you can go ahead and mark this action as completed.' 

547 

548 tools.append(( 

549 "create_scheduled_jobs", 

550 "Creates time-based jobs using APScheduler to schedule jobs", 

551 create_scheduled_jobs, 

552 )) 

553 

554 # ------------------------------------------------------------------ 

555 # 12. send_message_to_user 

556 # ------------------------------------------------------------------ 

557 @log_tool_execution 

558 def send_message_to_user( 

559 text: Annotated[str, "Text you want to send to the user"], 

560 avatar_id: Annotated[Optional[str], "Unique identifier for the avatar"] = None, 

561 response_type: Annotated[Optional[str], "Response mode: 'Realistic' (slower, better quality) or 'Realtime' (faster, lower quality)"] = 'Realtime', 

562 ) -> str: 

563 tool_logger.info('INSIDE send_message_to_user') 

564 tool_logger.info(f'SENDING DATA 2 user with values text:{text}, avatar_id:{avatar_id}, response_type:{response_type}') 

565 thread = threading.Thread(target=send_message_to_user1, args=(user_id, text, '', prompt_id)) 

566 thread.start() 

567 return f'Message sent successfully to user with request_id: {request_id_list[user_prompt]}-intermediate' 

568 

569 tools.append(( 

570 "send_message_to_user", 

571 "Sends a message/information to user. You can use this if you want to ask a question", 

572 send_message_to_user, 

573 )) 

574 

575 # ------------------------------------------------------------------ 

576 # 13. send_presynthesized_video_to_user 

577 # ------------------------------------------------------------------ 

578 @log_tool_execution 

579 def send_presynthesized_video_to_user( 

580 conv_id: Annotated[str, "Conversation ID associated with the text from memory"], 

581 ) -> str: 

582 tool_logger.info('INSIDE send_presynthesized_video_to_user') 

583 tool_logger.info(f'SENDING DATA 2 user with value: conv_id:{conv_id}.') 

584 return 'Message sent successfully to user' 

585 

586 tools.append(( 

587 "send_presynthesized_video_to_user", 

588 "Sends a presynthesized message/video/dialogue to user using conv_id.", 

589 send_presynthesized_video_to_user, 

590 )) 

591 

592 # ------------------------------------------------------------------ 

593 # 14. send_message_in_seconds 

594 # ------------------------------------------------------------------ 

595 @log_tool_execution 

596 def send_message_in_seconds( 

597 text: Annotated[str, "text to send to user"], 

598 delay: Annotated[int, "time to wait in seconds before sending text"], 

599 conv_id: Annotated[Optional[int], "conv_id for this text if not available make it None"] = None, 

600 ) -> str: 

601 tool_logger.info('INSIDE send_message_in_seconds') 

602 tool_logger.info(f'with text:{text}. and waiting time: {delay} conv_id: {conv_id}') 

603 run_time = datetime.fromtimestamp(time.time() + delay) 

604 scheduler.add_job(send_message_to_user1, 'date', run_date=run_time, args=[user_id, text, '', prompt_id]) 

605 return 'Message scheduled successfully' 

606 

607 tools.append(( 

608 "send_message_in_seconds", 

609 "Sends a presynthesized message/video/dialogue to user using conv_id with a timer.", 

610 send_message_in_seconds, 

611 )) 

612 

613 # ------------------------------------------------------------------ 

614 # 15. get_chat_history 

615 # ------------------------------------------------------------------ 

616 @log_tool_execution 

617 def get_chat_history( 

618 text: Annotated[str, "Text related to which you want history"], 

619 start: Annotated[Optional[str], "start date in format %Y-%m-%dT%H:%M:%S.%fZ"] = None, 

620 end: Annotated[Optional[str], "end date in format %Y-%m-%dT%H:%M:%S.%fZ"] = None, 

621 ) -> str: 

622 tool_logger.info('INSIDE get_chat_history') 

623 return helper_fun.get_time_based_history(text, f'user_{user_id}', start, end) 

624 

625 tools.append(( 

626 "get_chat_history", 

627 "Get Chat history based on text & start & end date", 

628 get_chat_history, 

629 )) 

630 

631 # ------------------------------------------------------------------ 

632 # 16. search_visual_history 

633 # ------------------------------------------------------------------ 

634 @log_tool_execution 

635 def search_visual_history( 

636 query: Annotated[str, "What to search for in visual/screen descriptions"], 

637 minutes_back: Annotated[int, "How many minutes back to search (default 30)"] = 30, 

638 channel: Annotated[str, "Which feed: 'camera', 'screen', or 'both' (default)"] = "both", 

639 ) -> str: 

640 """Search past camera/screen descriptions. Use for questions about what happened earlier visually.""" 

641 results = helper_fun.search_visual_history(user_id, query, mins=minutes_back, channel=channel) 

642 if results: 

643 return '\n'.join(results) 

644 return "No matching visual/screen descriptions found in the given time range." 

645 

646 tools.append(( 

647 "search_visual_history", 

648 "Search past camera and screen descriptions by keyword and time range.", 

649 search_visual_history, 

650 )) 

651 

652 # ------------------------------------------------------------------ 

653 # 17. google_search 

654 # ------------------------------------------------------------------ 

655 @log_tool_execution 

656 def google_search( 

657 text: Annotated[str, "Text/Query which you want to search"], 

658 ) -> str: 

659 tool_logger.info('INSIDE google search') 

660 return helper_fun.top5_results(text) 

661 

662 tools.append(( 

663 "google_search", 

664 "web/google/bing search api tool for a given query", 

665 google_search, 

666 )) 

667 

668 # ------------------------------------------------------------------ 

669 # Conditional: SimpleMem long-term memory 

670 # ------------------------------------------------------------------ 

671 if simplemem_store is not None: 

672 from core.event_loop import get_or_create_event_loop 

673 

674 @log_tool_execution 

675 def search_long_term_memory( 

676 query: Annotated[str, "Natural language query to search long-term memory"], 

677 ) -> str: 

678 """Search compressed long-term memory using semantic retrieval.""" 

679 try: 

680 loop = get_or_create_event_loop() 

681 results = loop.run_until_complete(simplemem_store.search(query)) 

682 if results: 

683 return results[0].content 

684 return "No relevant memories found." 

685 except Exception as e: 

686 tool_logger.info(f"SimpleMem search error: {e}") 

687 return "Memory search unavailable." 

688 

689 tools.append(( 

690 "search_long_term_memory", 

691 "Search long-term memory for past conversations, facts, and context using natural language query.", 

692 search_long_term_memory, 

693 )) 

694 

695 @log_tool_execution 

696 def save_to_long_term_memory( 

697 content: Annotated[str, "The information/fact to remember long-term"], 

698 speaker: Annotated[str, "Who said this (e.g. 'User', 'Assistant', 'System')"] = "System", 

699 ) -> str: 

700 """Save important information to compressed long-term memory.""" 

701 try: 

702 loop = get_or_create_event_loop() 

703 loop.run_until_complete(simplemem_store.add(content, { 

704 "sender_name": speaker, 

705 "user_id": user_id, 

706 "prompt_id": prompt_id, 

707 })) 

708 return "Saved to long-term memory." 

709 except Exception as e: 

710 tool_logger.info(f"SimpleMem save error: {e}") 

711 return "Failed to save to long-term memory." 

712 

713 tools.append(( 

714 "save_to_long_term_memory", 

715 "Save important facts or information to long-term memory for future retrieval across sessions.", 

716 save_to_long_term_memory, 

717 )) 

718 

719 # ------------------------------------------------------------------ 

720 # Suggest_Share_Worthy_Content 

721 # ------------------------------------------------------------------ 

722 @log_tool_execution 

723 def suggest_share_worthy_content( 

724 query: Annotated[str, "Any text — not used for filtering, just provide context"] = "", 

725 ) -> str: 

726 """Find high-engagement posts that haven't been shared much and suggest sharing them.""" 

727 try: 

728 from integrations.social.models import get_db, Post, ShareableLink 

729 from sqlalchemy import func as sa_func 

730 

731 db = get_db() 

732 try: 

733 share_counts = ( 

734 db.query( 

735 ShareableLink.resource_id, 

736 sa_func.count(ShareableLink.id).label('link_count'), 

737 ) 

738 .filter(ShareableLink.resource_type == 'post') 

739 .group_by(ShareableLink.resource_id) 

740 .subquery() 

741 ) 

742 

743 posts = ( 

744 db.query(Post, share_counts.c.link_count) 

745 .outerjoin(share_counts, Post.id == share_counts.c.resource_id) 

746 .filter( 

747 Post.is_deleted == False, 

748 Post.is_hidden == False, 

749 Post.upvotes > 5, 

750 Post.comment_count > 3, 

751 ) 

752 .filter( 

753 (share_counts.c.link_count == None) | # noqa: E711 

754 (share_counts.c.link_count < 3) 

755 ) 

756 .order_by(Post.score.desc()) 

757 .limit(3) 

758 .all() 

759 ) 

760 

761 if not posts: 

762 return ("No under-shared high-engagement content found right now. " 

763 "Keep creating great posts and the community will notice!") 

764 

765 suggestions = [] 

766 for post, link_count in posts: 

767 title = (post.title or post.content or '')[:80].strip() 

768 shares = link_count or 0 

769 suggestions.append( 

770 f"- \"{title}\" ({post.upvotes} upvotes, " 

771 f"{post.comment_count} comments, only {shares} shares) " 

772 f"[post_id: {post.id}]" 

773 ) 

774 

775 header = ("These posts are resonating with the community but haven't " 

776 "been shared much yet. Consider sharing them:\n") 

777 return header + "\n".join(suggestions) 

778 finally: 

779 db.close() 

780 except Exception as e: 

781 tool_logger.warning(f"Suggest_Share_Worthy_Content failed: {e}") 

782 return f"Could not fetch share-worthy content right now: {e}" 

783 

784 tools.append(( 

785 "Suggest_Share_Worthy_Content", 

786 "Find high-engagement posts that deserve wider reach but haven't been shared much. " 

787 "Use when the user asks about content worth sharing or to proactively suggest " 

788 "share-worthy community posts.", 

789 suggest_share_worthy_content, 

790 )) 

791 

792 # ------------------------------------------------------------------ 

793 # Observe_User_Experience 

794 # ------------------------------------------------------------------ 

795 @log_tool_execution 

796 def observe_user_experience( 

797 input_text: Annotated[str, "JSON string with event, page, duration_ms, outcome fields"], 

798 ) -> str: 

799 """Record a user experience observation for self-improvement.""" 

800 try: 

801 data = json.loads(input_text) if input_text.startswith('{') else {'event': input_text} 

802 event = data.get('event', 'interaction') 

803 page = data.get('page', '') 

804 duration_ms = data.get('duration_ms', 0) 

805 outcome = data.get('outcome', 'recorded') 

806 

807 observation = f"User {event} on {page} ({duration_ms}ms): {outcome}" 

808 

809 if memory_graph: 

810 session_key = f"{user_id}_{prompt_id}" if prompt_id else str(user_id) 

811 memory_id = memory_graph.register( 

812 content=observation, 

813 metadata={ 

814 'memory_type': 'observation', 

815 'source_agent': 'agent', 

816 'session_id': session_key, 

817 'page': page, 

818 'event': event, 

819 }, 

820 context_snapshot=f"UX observation during session {session_key}", 

821 ) 

822 return f"Observation recorded (id: {memory_id}): {observation}" 

823 

824 return f"Observation noted: {observation}" 

825 except Exception as e: 

826 tool_logger.warning(f"Observe_User_Experience failed: {e}") 

827 return f"Observation noted: {input_text}" 

828 

829 tools.append(( 

830 "Observe_User_Experience", 

831 "Record a user experience observation. Input: JSON with event, page, " 

832 "duration_ms, outcome. Used for self-improvement and understanding user " 

833 "behavior patterns.", 

834 observe_user_experience, 

835 )) 

836 

837 # ------------------------------------------------------------------ 

838 # Self_Critique_And_Enhance 

839 # ------------------------------------------------------------------ 

840 @log_tool_execution 

841 def self_critique_and_enhance( 

842 input_text: Annotated[str, "Topic or area to critique"], 

843 ) -> str: 

844 """Review past suggestions and outcomes to improve future behavior.""" 

845 try: 

846 if not memory_graph: 

847 return f"Self-critique on '{input_text}': Will adjust future behavior based on observations." 

848 

849 session_key = f"{user_id}_{prompt_id}" if prompt_id else str(user_id) 

850 

851 # Recall past suggestions and observations 

852 suggestions = memory_graph.recall( 

853 input_text or 'suggestions made outcomes', mode='semantic', top_k=10, 

854 ) 

855 observations = memory_graph.recall( 

856 'user experience observation', mode='semantic', top_k=10, 

857 ) 

858 

859 if not suggestions and not observations: 

860 return "No past interactions to critique yet. Will observe and learn." 

861 

862 # Format findings for agent reasoning 

863 critique = "Self-critique findings:\n" 

864 if suggestions: 

865 critique += f"Past suggestions ({len(suggestions)}):\n" 

866 for s in suggestions[:5]: 

867 critique += f" - {s.content[:100]}\n" 

868 if observations: 

869 critique += f"User observations ({len(observations)}):\n" 

870 for o in observations[:5]: 

871 critique += f" - {o.content[:100]}\n" 

872 

873 # Store the critique itself as an insight 

874 insight = f"Self-critique on: {input_text}" 

875 memory_graph.register( 

876 content=insight, 

877 metadata={ 

878 'memory_type': 'insight', 

879 'source_agent': 'agent', 

880 'session_id': session_key, 

881 'type': 'self_critique', 

882 }, 

883 context_snapshot=f"Self-critique during session {session_key}", 

884 ) 

885 

886 return critique 

887 except Exception as e: 

888 tool_logger.warning(f"Self_Critique_And_Enhance failed: {e}") 

889 return f"Self-critique on '{input_text}': Will adjust future behavior based on observations." 

890 

891 tools.append(( 

892 "Self_Critique_And_Enhance", 

893 "Review past agent suggestions and user behavior observations to improve " 

894 "future recommendations. Input: topic or area to critique. Helps the agent " 

895 "learn from its own interactions.", 

896 self_critique_and_enhance, 

897 )) 

898 

899 # ------------------------------------------------------------------ 

900 # device_control — Cross-device control via PeerLink (SAME_USER only) 

901 # ------------------------------------------------------------------ 

902 @log_tool_execution 

903 def device_control( 

904 action: Annotated[str, "What to do: 'turn on light', 'check temperature', 'list files', 'run command ls -la'"], 

905 device_hint: Annotated[str, "Which device: 'phone', 'desktop', 'iot hub', or empty for default"] = '', 

906 ) -> str: 

907 """Control a device on the user's private network via PeerLink. 

908 

909 Privacy-first: only targets the user's own devices (SAME_USER trust). 

910 Uses PeerLink dispatch channel with FleetCommandService fallback. 

911 """ 

912 try: 

913 # Step 1: Find the target device via DeviceRoutingService 

914 target_device = None 

915 try: 

916 from integrations.social.models import db_session 

917 from integrations.social.device_routing_service import DeviceRoutingService 

918 with db_session(commit=False) as db: 

919 if device_hint: 

920 # Map hint to capability or form factor 

921 capability = 'general' 

922 if device_hint.lower() in ('phone', 'desktop', 'tablet', 'tv', 'embedded', 'robot'): 

923 # Filter by form factor 

924 devices = DeviceRoutingService.get_user_device_map(db, str(user_id)) 

925 for d in devices: 

926 if d.get('form_factor', '') == device_hint.lower(): 

927 target_device = d 

928 break 

929 if not target_device: 

930 target_device = DeviceRoutingService.pick_device( 

931 db, str(user_id), required_capability=capability) 

932 else: 

933 target_device = DeviceRoutingService.pick_device( 

934 db, str(user_id), required_capability='general') 

935 except Exception as e: 

936 tool_logger.debug(f"Device routing lookup failed: {e}") 

937 

938 target_node_id = (target_device or {}).get('device_id', '') 

939 

940 # Step 2: Try PeerLink dispatch channel (SAME_USER trust only) 

941 peerlink_sent = False 

942 if target_node_id: 

943 try: 

944 from core.peer_link.link_manager import get_link_manager 

945 from core.peer_link.link import TrustLevel 

946 mgr = get_link_manager() 

947 link = mgr.get_link(target_node_id) 

948 if link and link.trust == TrustLevel.SAME_USER: 

949 result = mgr.send( 

950 target_node_id, 'dispatch', 

951 {'type': 'device_control', 'action': action, 

952 'user_id': str(user_id)}, 

953 wait_response=True, timeout=30.0, 

954 ) 

955 if result is not None: 

956 peerlink_sent = True 

957 msg = result.get('message', str(result)) 

958 return f"Device control result: {msg}" 

959 elif link and link.trust != TrustLevel.SAME_USER: 

960 return ("Device control blocked: target device is not a SAME_USER " 

961 "trusted device. Only your own devices can be controlled.") 

962 except Exception as e: 

963 tool_logger.debug(f"PeerLink dispatch failed: {e}") 

964 

965 # Step 3: Fallback to FleetCommandService 

966 if not peerlink_sent: 

967 try: 

968 from integrations.social.models import db_session 

969 from integrations.social.fleet_command import FleetCommandService 

970 with db_session() as db: 

971 cmd = FleetCommandService.push_command( 

972 db, target_node_id or 'self', 

973 'device_control', 

974 {'action': action, 'device_hint': device_hint}, 

975 ) 

976 if cmd: 

977 return f"Device control command queued (id={cmd.get('id', '?')}): {action}" 

978 return "Device control: failed to queue command" 

979 except Exception as e: 

980 tool_logger.debug(f"Fleet command fallback failed: {e}") 

981 

982 # Step 4: Local execution as last resort (this IS the target device) 

983 try: 

984 from integrations.social.fleet_command import FleetCommandService 

985 result = FleetCommandService.execute_command( 

986 'device_control', {'action': action}) 

987 if result.get('success'): 

988 return f"Device control (local): {result.get('message', 'OK')}" 

989 return f"Device control failed: {result.get('message', 'Unknown error')}" 

990 except Exception as e: 

991 return f"Device control unavailable: {e}" 

992 

993 except Exception as e: 

994 tool_logger.warning(f"device_control failed: {e}") 

995 return f"Device control error: {e}" 

996 

997 tools.append(( 

998 "device_control", 

999 "Control a device on the user's private network. Actions: turn on/off lights, " 

1000 "check temperature, list files, run commands. Privacy-first: only your own devices.", 

1001 device_control, 

1002 )) 

1003 

1004 # ------------------------------------------------------------------ 

1005 # data_extraction_from_url — Parity with LangChain Data_Extraction_From_URL 

1006 # ------------------------------------------------------------------ 

1007 @log_tool_execution 

1008 def data_extraction_from_url( 

1009 url: Annotated[str, "The URL to extract content from"], 

1010 url_type: Annotated[str, "Type of URL: 'pdf' or 'website'"] = "website", 

1011 ) -> str: 

1012 """Extract content from a URL (PDF or website). Uses Crawl4AI or direct parsing.""" 

1013 try: 

1014 from threadlocal import thread_local_data as _tld 

1015 _uid = _tld.get_user_id() if hasattr(_tld, 'get_user_id') else user_id 

1016 _rid = _tld.get_request_id() if hasattr(_tld, 'get_request_id') else None 

1017 

1018 # Try Crawl4AI service first 

1019 try: 

1020 from integrations.service_tools import service_tool_registry 

1021 crawl_tool = service_tool_registry.get_tool('Crawl4AI') 

1022 if crawl_tool: 

1023 result = crawl_tool.execute(url) 

1024 if result: 

1025 return f"Extracted from {url}:\n{str(result)[:5000]}" 

1026 except Exception: 

1027 pass 

1028 

1029 # Fallback: direct requests 

1030 import requests as _req 

1031 resp = _req.get(url, timeout=30, headers={'User-Agent': 'Mozilla/5.0'}) 

1032 if url_type == 'pdf': 

1033 return f"PDF downloaded ({len(resp.content)} bytes). Use a PDF parser for full extraction." 

1034 text = resp.text[:5000] 

1035 # Strip HTML tags naively 

1036 import re 

1037 text = re.sub(r'<[^>]+>', ' ', text) 

1038 text = re.sub(r'\s+', ' ', text).strip() 

1039 return f"Extracted from {url}:\n{text[:4000]}" 

1040 except Exception as e: 

1041 return f"URL extraction failed: {e}" 

1042 

1043 tools.append(( 

1044 "data_extraction_from_url", 

1045 "Extract content from a URL (PDF or website). Input: URL and type ('pdf' or 'website'). " 

1046 "Uses Crawl4AI for rich extraction with fallback to direct HTTP fetch.", 

1047 data_extraction_from_url, 

1048 )) 

1049 

1050 # ------------------------------------------------------------------ 

1051 # get_user_details — Parity with LangChain User_details_tool 

1052 # ------------------------------------------------------------------ 

1053 @log_tool_execution 

1054 def get_user_details() -> str: 

1055 """Get current user's profile details.""" 

1056 try: 

1057 uid = user_id 

1058 # Try local DB first 

1059 try: 

1060 from integrations.social.models import get_db, User 

1061 db = get_db() 

1062 try: 

1063 user = db.query(User).filter_by(id=str(uid)).first() 

1064 if user: 

1065 return json.dumps(user.to_dict(), default=str) 

1066 finally: 

1067 db.close() 

1068 except Exception: 

1069 pass 

1070 

1071 # Fallback: cloud API 

1072 import requests as _req 

1073 resp = _req.post( 

1074 'https://azurekong.hertzai.com:8443/db/getstudent_by_user_id', 

1075 json={'user_id': uid}, timeout=10, 

1076 ) 

1077 return f"User details: {resp.text}" 

1078 except Exception as e: 

1079 return f"Could not fetch user details: {e}" 

1080 

1081 tools.append(( 

1082 "get_user_details", 

1083 "Get the current user's profile information (name, email, preferences, etc.). " 

1084 "Use when the user asks about their profile or when you need user context.", 

1085 get_user_details, 

1086 )) 

1087 

1088 # ------------------------------------------------------------------ 

1089 # request_resource — Parity with LangChain Request_Resource 

1090 # ------------------------------------------------------------------ 

1091 @log_tool_execution 

1092 def request_resource( 

1093 resource_description: Annotated[str, "JSON or plain text describing the needed resource. JSON format: {\"resource_type\": \"api_key\", \"key_name\": \"GOOGLE_API_KEY\", \"label\": \"Google API Key\", \"used_by\": \"search tool\", \"description\": \"needed for web search\"}"], 

1094 ) -> str: 

1095 """Request an API key, credential, token, or config value that is not currently available.""" 

1096 try: 

1097 try: 

1098 req = json.loads(resource_description) 

1099 except (ValueError, TypeError): 

1100 req = { 

1101 'resource_type': 'api_key', 

1102 'key_name': 'UNKNOWN', 

1103 'label': resource_description[:100], 

1104 'description': resource_description, 

1105 'used_by': 'Agent tool', 

1106 } 

1107 

1108 key_name = req.get('key_name', 'UNKNOWN') 

1109 resource_type = req.get('resource_type', 'api_key') 

1110 

1111 # Check env vars first 

1112 env_val = os.environ.get(key_name) 

1113 if env_val: 

1114 return f"Resource '{key_name}' is already configured and available." 

1115 

1116 # Check vault 

1117 try: 

1118 from desktop.ai_key_vault import AIKeyVault 

1119 vault = AIKeyVault.get_instance() 

1120 val = vault.get_tool_key(key_name) if resource_type != 'channel_secret' else vault.get_channel_secret(req.get('channel_type', ''), key_name) 

1121 if val: 

1122 os.environ[key_name] = val 

1123 return f"Resource '{key_name}' loaded from vault and is now available." 

1124 except Exception: 

1125 pass 

1126 

1127 # Track as pending and request from user 

1128 try: 

1129 from desktop.ai_key_vault import AIKeyVault 

1130 AIKeyVault.get_instance().add_pending_request( 

1131 key_name=key_name, resource_type=resource_type, 

1132 channel_type=req.get('channel_type', ''), 

1133 label=req.get('label', key_name), 

1134 description=req.get('description', ''), 

1135 used_by=req.get('used_by', 'Agent tool'), 

1136 ) 

1137 except Exception: 

1138 pass 

1139 

1140 secret_request = json.dumps({ 

1141 '__SECRET_REQUEST__': True, 'type': resource_type, 

1142 'key_name': key_name, 'label': req.get('label', key_name), 

1143 'description': req.get('description', f'{key_name} is required.'), 

1144 'used_by': req.get('used_by', 'Agent tool'), 

1145 'channel_type': req.get('channel_type', ''), 

1146 }) 

1147 return ( 

1148 f"I need the user to provide '{req.get('label', key_name)}'. " 

1149 f"Required for {req.get('used_by', 'a tool')}. " 

1150 f"{req.get('description', '')} " 

1151 f"RESOURCE_REQUEST:{secret_request}" 

1152 ) 

1153 except Exception as e: 

1154 return f"Resource request failed: {e}" 

1155 

1156 tools.append(( 

1157 "request_resource", 

1158 "Request an API key, credential, token, or config value. Checks vault/env first, " 

1159 "then prompts the user if not found. Handles: API keys (OpenAI, Google, Slack), " 

1160 "OAuth tokens, channel secrets, service credentials. " 

1161 "Input: JSON with resource_type, key_name, label, used_by, description.", 

1162 request_resource, 

1163 )) 

1164 

1165 # ------------------------------------------------------------------ 

1166 # observe_user_experience — Parity with LangChain Observe_User_Experience 

1167 # ------------------------------------------------------------------ 

1168 @log_tool_execution 

1169 def observe_user_experience( 

1170 event: Annotated[str, "What happened (e.g. 'clicked', 'scrolled', 'left page')"], 

1171 page: Annotated[str, "Which page or screen"] = "", 

1172 outcome: Annotated[str, "What was the result or user reaction"] = "", 

1173 duration_ms: Annotated[int, "How long the interaction lasted in ms"] = 0, 

1174 ) -> str: 

1175 """Record a user experience observation for self-improvement.""" 

1176 observation = f"User {event} on {page} ({duration_ms}ms): {outcome}" 

1177 try: 

1178 if memory_graph: 

1179 session_id = f"{user_id}_{prompt_id}" if prompt_id else str(user_id) 

1180 mid = memory_graph.register( 

1181 content=observation, 

1182 metadata={'memory_type': 'observation', 'source_agent': 'agent', 

1183 'session_id': session_id, 'page': page, 'event': event}, 

1184 context_snapshot=f"UX observation during session {session_id}", 

1185 ) 

1186 return f"Observation recorded (id: {mid}): {observation}" 

1187 except Exception: 

1188 pass 

1189 return f"Observation noted: {observation}" 

1190 

1191 tools.append(( 

1192 "observe_user_experience", 

1193 "Record a user experience observation. Use to track behavior patterns " 

1194 "for self-improvement. Input: event, page, outcome, duration_ms.", 

1195 observe_user_experience, 

1196 )) 

1197 

1198 # ------------------------------------------------------------------ 

1199 # self_critique_and_enhance — Parity with LangChain Self_Critique_And_Enhance 

1200 # ------------------------------------------------------------------ 

1201 @log_tool_execution 

1202 def self_critique_and_enhance( 

1203 topic: Annotated[str, "Topic or area to critique (e.g. 'my recommendations', 'search quality')"] = "", 

1204 ) -> str: 

1205 """Review past agent suggestions and user observations to improve future behavior.""" 

1206 try: 

1207 if not memory_graph: 

1208 return "Self-critique unavailable: no memory graph for this session." 

1209 

1210 suggestions = memory_graph.recall(topic or 'suggestions made outcomes', mode='semantic', top_k=10) 

1211 observations = memory_graph.recall('user experience observation', mode='semantic', top_k=10) 

1212 

1213 if not suggestions and not observations: 

1214 return "No past interactions to critique yet. Will observe and learn." 

1215 

1216 critique = "Self-critique findings:\n" 

1217 if suggestions: 

1218 critique += f"Past suggestions ({len(suggestions)}):\n" 

1219 for s in suggestions[:5]: 

1220 critique += f" - {s.content[:100]}\n" 

1221 if observations: 

1222 critique += f"User observations ({len(observations)}):\n" 

1223 for o in observations[:5]: 

1224 critique += f" - {o.content[:100]}\n" 

1225 

1226 session_id = f"{user_id}_{prompt_id}" if prompt_id else str(user_id) 

1227 memory_graph.register( 

1228 content=f"Self-critique on: {topic}", 

1229 metadata={'memory_type': 'insight', 'source_agent': 'agent', 

1230 'session_id': session_id, 'type': 'self_critique'}, 

1231 context_snapshot=f"Self-critique during session {session_id}", 

1232 ) 

1233 return critique 

1234 except Exception as e: 

1235 return f"Self-critique unavailable: {e}" 

1236 

1237 tools.append(( 

1238 "self_critique_and_enhance", 

1239 "Review past agent suggestions and user behavior observations to improve " 

1240 "future recommendations. Input: topic or area to critique.", 

1241 self_critique_and_enhance, 

1242 )) 

1243 

1244 return tools 

1245 

1246 

1247def register_remote_desktop_tools_if_available(ctx, helper, executor): 

1248 """Register remote desktop tools if the module is available. 

1249 

1250 Gracefully skips if integrations/remote_desktop is not installed. 

1251 """ 

1252 try: 

1253 from integrations.remote_desktop.agent_tools import ( 

1254 build_remote_desktop_tools, register_remote_desktop_tools, 

1255 ) 

1256 rd_tools = build_remote_desktop_tools(ctx) 

1257 register_remote_desktop_tools(rd_tools, helper, executor) 

1258 tool_logger.info(f"Remote desktop tools registered ({len(rd_tools)} tools)") 

1259 except ImportError: 

1260 pass 

1261 except Exception as e: 

1262 tool_logger.warning(f"Remote desktop tools registration failed: {e}") 

1263 

1264 

1265def register_memory_graph_tools(memory_graph, helper, executor, user_id, user_prompt): 

1266 """Register MemoryGraph provenance tools if memory_graph is available. 

1267 

1268 Delegates to the existing agent_memory_tools module. 

1269 """ 

1270 if memory_graph is None: 

1271 return 

1272 try: 

1273 from integrations.channels.memory.agent_memory_tools import create_memory_tools, register_autogen_tools 

1274 mem_tools = create_memory_tools(memory_graph, str(user_id), user_prompt) 

1275 register_autogen_tools(mem_tools, executor, helper) 

1276 tool_logger.info(f"MemoryGraph tools registered for {user_prompt}") 

1277 except Exception as e: 

1278 tool_logger.warning(f"MemoryGraph tools registration failed: {e}")