Coverage for core / agent_tools.py: 31.0%
542 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2core/agent_tools.py — Canonical AutoGen tool definitions (single source of truth).
4Every core tool is defined ONCE here. Both create_recipe.py and reuse_recipe.py
5call build_core_tool_closures() + register_core_tools() instead of duplicating
6function bodies and registration lines.
8Pattern mirrors:
9 - integrations/service_tools/media_agent.py → register_media_tools()
10 - integrations/channels/memory/agent_memory_tools.py → register_autogen_tools()
11 - integrations/agent_engine/marketing_tools.py → register_marketing_tools()
12"""
13import json
14import logging
15import os
16import threading
17import time
18import uuid
19from datetime import datetime
20from typing import Annotated, Any, List, Optional, Tuple
22import requests
23from json_repair import repair_json
25from core.http_pool import pooled_get, pooled_post
26from integrations.service_tools.model_catalog import ModelType
28tool_logger = logging.getLogger('tool_execution')
31# ---------------------------------------------------------------------------
32# Generic registration helper
33# ---------------------------------------------------------------------------
35def register_dual(helper, executor, func, name: str, description: str):
36 """Register a single tool on both the LLM-calling and executing agents.
38 AutoGen's tool pattern pairs ``helper.register_for_llm`` with
39 ``executor.register_for_execution`` — create_recipe.py and
40 reuse_recipe.py repeat this pair 40+ times inline. Call sites
41 that define a closure and register it immediately use this
42 helper; batches that already have a ``(name, desc, func)`` list
43 should use :func:`register_core_tools` instead.
45 Returns ``func`` unchanged so the call can be inlined after a
46 closure definition without shadowing the name.
47 """
48 helper.register_for_llm(name=name, description=description)(func)
49 executor.register_for_execution(name=name)(func)
50 return func
53def register_core_tools(tools, helper, executor):
54 """Register (name, desc, func) tuples on an AutoGen helper/executor pair.
56 Args:
57 tools: list of (name, description, func) tuples from build_core_tool_closures()
58 helper: AutoGen agent that suggests tool use (register_for_llm)
59 executor: AutoGen agent that executes tools (register_for_execution)
60 """
61 for name, desc, func in tools:
62 register_dual(helper, executor, func, name, desc)
65# ---------------------------------------------------------------------------
66# Core tool closure factory
67# ---------------------------------------------------------------------------
69def build_core_tool_closures(ctx):
70 """Build session-scoped tool closures. Returns list of (name, desc, func).
72 Args:
73 ctx: dict with session variables:
74 user_id, prompt_id, agent_data, helper_fun, user_prompt,
75 request_id_list, recent_file_id, scheduler,
76 simplemem_store (optional), memory_graph (optional),
77 log_tool_execution (decorator), send_message_to_user1 (func),
78 retrieve_json (func), strip_json_values (func),
79 save_conversation_db (func)
80 """
81 # Unpack context -------------------------------------------------------
82 user_id = ctx['user_id']
83 prompt_id = ctx['prompt_id']
84 agent_data = ctx['agent_data']
85 helper_fun = ctx['helper_fun']
86 user_prompt = ctx['user_prompt']
87 request_id_list = ctx['request_id_list']
88 recent_file_id = ctx['recent_file_id']
89 scheduler = ctx['scheduler']
90 simplemem_store = ctx.get('simplemem_store')
91 memory_graph = ctx.get('memory_graph')
92 # log_tool_execution: optional decorator (create_recipe.py has it, reuse_recipe.py may not)
93 log_tool_execution = ctx.get('log_tool_execution') or (lambda f: f)
94 send_message_to_user1 = ctx['send_message_to_user1']
95 _retrieve_json = ctx['retrieve_json']
96 _strip_json_values = ctx['strip_json_values']
97 save_conversation_db = ctx['save_conversation_db']
99 tools: List[Tuple[str, str, Any]] = []
101 # ------------------------------------------------------------------
102 # 1. text_2_image
103 # ------------------------------------------------------------------
104 @log_tool_execution
105 def text_2_image(text: Annotated[str, "Text to create image"]) -> str:
106 return helper_fun.txt2img(text)
108 tools.append((
109 "text_2_image",
110 "Text to image Creator",
111 text_2_image,
112 ))
113 # Alias — reuse_recipe.py main flow LLM prompts advertise `txt2img` (#510).
114 # Same canonical closure (helper_fun.txt2img — local-first, sovereignty).
115 tools.append((
116 "txt2img",
117 "Text to image Creator (alias of text_2_image)",
118 text_2_image,
119 ))
121 # ------------------------------------------------------------------
122 # 2. get_user_camera_inp
123 # ------------------------------------------------------------------
124 @log_tool_execution
125 def get_user_camera_inp(
126 inp: Annotated[str, "The Question to check from visual context"],
127 ) -> str:
128 return helper_fun.get_user_camera_inp(inp, int(user_id), request_id_list[user_prompt])
130 tools.append((
131 "get_user_camera_inp",
132 "Get user's visual information to process somethings",
133 get_user_camera_inp,
134 ))
136 # ------------------------------------------------------------------
137 # 3. save_data_in_memory
138 # ------------------------------------------------------------------
139 @log_tool_execution
140 def save_data_in_memory(
141 key: Annotated[str, "Key path for storing data now & retrieving data later. Use dot notation for nested keys (e.g., 'user.info.name')."],
142 value: Annotated[Optional[Any], "Value you want to store; strictly should be one of int, float, bool, json array or json object."] = None,
143 ) -> str:
144 """Store data with validation to prevent corruption."""
145 tool_logger.info('INSIDE save_data_in_memory')
146 try:
147 if isinstance(value, str) and (value.startswith('{') or value.startswith('[')):
148 value = _retrieve_json(value)
149 tool_logger.info(f"REPAIRED JSON STRING: {value}")
150 if value is not None:
151 json_str = json.dumps(value)
152 validated_value = json.loads(json_str)
153 tool_logger.info(f"VALIDATED VALUE (post JSON cycle): {validated_value}")
154 else:
155 validated_value = None
157 keys = key.split('.')
158 d = agent_data.setdefault(prompt_id, {})
159 for k in keys[:-1]:
160 d = d.setdefault(k, {})
161 d[keys[-1]] = validated_value
162 tool_logger.info(f"VALUES STORED IN AGENT DATA: {validated_value}")
163 tool_logger.info(f"FULL AGENT DATA AT KEY: {d}")
165 if helper_fun.save_agent_data_to_file(prompt_id, agent_data):
166 tool_logger.info(f"[OK] Data persisted to file for prompt_id {prompt_id}")
167 else:
168 tool_logger.warning(f"Failed to persist data to file for prompt_id {prompt_id}")
170 try:
171 stored_value = get_data_by_key(key)
172 tool_logger.info(f"VERIFICATION - READ BACK VALUE: {stored_value}")
173 if stored_value == "Key not found in stored data.":
174 tool_logger.error(f"VERIFICATION FAILED: Data not properly stored at key {key}")
175 except Exception as e:
176 tool_logger.error(f"VERIFICATION ERROR: {str(e)}")
178 return f'{agent_data[prompt_id]}'
179 except json.JSONDecodeError as je:
180 error_msg = f"Invalid JSON structure in value: {str(je)}"
181 tool_logger.error(error_msg)
182 return f"Error: {error_msg} - Data not saved"
183 except TypeError as te:
184 error_msg = f"Type error in value: {str(te)}"
185 tool_logger.error(error_msg)
186 return f"Error: {error_msg} - Data not saved"
187 except Exception as e:
188 error_msg = f"Unexpected error saving data: {str(e)}"
189 tool_logger.error(error_msg)
190 return f"Error: {error_msg} - Data not saved"
192 tools.append((
193 "save_data_in_memory",
194 "Use this to Store and retrieve data using key-value storage system",
195 save_data_in_memory,
196 ))
198 # ------------------------------------------------------------------
199 # 4. get_saved_metadata
200 # ------------------------------------------------------------------
201 @log_tool_execution
202 def get_saved_metadata() -> str:
203 """Get metadata with automatic loading from persistent storage."""
204 if prompt_id not in agent_data or not agent_data[prompt_id]:
205 tool_logger.info(f"Loading agent data from file for get_saved_metadata, prompt_id {prompt_id}")
206 helper_fun.load_agent_data_from_file(prompt_id, agent_data)
207 stripped_json = _strip_json_values(agent_data[prompt_id])
208 return f'{stripped_json}'
210 tools.append((
211 "get_saved_metadata",
212 "Returns the schema of the json from internal memory with all keys but without actual values.",
213 get_saved_metadata,
214 ))
216 # ------------------------------------------------------------------
217 # 5. get_data_by_key
218 # ------------------------------------------------------------------
219 @log_tool_execution
220 def get_data_by_key(
221 key: Annotated[str, "Key path for retrieving data. Use dot notation for nested keys (e.g., 'user.info.name')."],
222 ) -> str:
223 if prompt_id not in agent_data or not agent_data[prompt_id]:
224 tool_logger.info(f"Loading agent data from file for prompt_id {prompt_id}")
225 helper_fun.load_agent_data_from_file(prompt_id, agent_data)
226 keys = key.split('.')
227 d = agent_data.get(prompt_id, {})
228 try:
229 for k in keys:
230 d = d[k]
231 return f'{d}'
232 except KeyError:
233 return "Key not found in stored data."
235 tools.append((
236 "get_data_by_key",
237 "Returns all data from the internal Memory using key",
238 get_data_by_key,
239 ))
240 # Alias — Helper system prompts in reuse_recipe.py advertise this name (#510).
241 # Same closure → identical behavior under both names. Never remove a
242 # registered tool: phantom tool fixed by adding a real registration.
243 tools.append((
244 "get_data_from_memory",
245 "Returns all data from the internal Memory using key (alias of get_data_by_key)",
246 get_data_by_key,
247 ))
249 # ------------------------------------------------------------------
250 # 6. get_user_id
251 # ------------------------------------------------------------------
252 @log_tool_execution
253 def get_user_id() -> str:
254 tool_logger.info('INSIDE get_user_id')
255 return f'{user_id}'
257 tools.append((
258 "get_user_id",
259 "Returns the unique identifier (user_id) of the current user.",
260 get_user_id,
261 ))
263 # ------------------------------------------------------------------
264 # 7. get_prompt_id
265 # ------------------------------------------------------------------
266 @log_tool_execution
267 def get_prompt_id() -> str:
268 tool_logger.info('INSIDE get_prompt_id')
269 return f'{prompt_id}'
271 tools.append((
272 "get_prompt_id",
273 "Returns the unique identifier (prompt_id) associated with the current prompt or conversation.",
274 get_prompt_id,
275 ))
277 # ------------------------------------------------------------------
278 # 8. Generate_video (canonical — full LTX-2 + avatar)
279 # ------------------------------------------------------------------
280 @log_tool_execution
281 def Generate_video(
282 text: Annotated[str, "Text to be used for video generation"],
283 avatar_id: Annotated[int, "Unique identifier for the avatar (use 0 for LTX-2 text-to-video)"],
284 realtime: Annotated[bool, "If True, response is fast but less realistic by default it should be true; if False, response is realistic but slower"],
285 model: Annotated[str, "Video model to use: 'avatar' for avatar-based video, 'ltx2' for LTX-2 text-to-video generation"] = "avatar",
286 ) -> str:
287 tool_logger.info(f'INSIDE Generate_video with model={model}')
289 # LTX-2 Text-to-Video Generation
290 if model.lower() == "ltx2":
291 tool_logger.info(f'Using LTX-2 for video generation: {text[:50]}...')
292 LOCAL_COMFYUI_URL = "http://localhost:8188"
293 LOCAL_LTX_URL = "http://localhost:5002"
294 headers = {'Content-Type': 'application/json'}
295 ltx_payload = {
296 "prompt": text,
297 "negative_prompt": "worst quality, inconsistent motion, blurry, jittery, distorted",
298 "num_frames": 97,
299 "width": 832,
300 "height": 480,
301 "num_inference_steps": 30 if realtime else 50,
302 "guidance_scale": 3.0,
303 "fps": 24,
304 }
306 # Fast health probe — skip dead servers instantly (0ms vs 10s timeout)
307 def _is_server_up(url, name):
308 try:
309 r = pooled_get(f"{url}/health", timeout=1.5)
310 return r.status_code < 500
311 except Exception:
312 tool_logger.info(f"{name} not reachable — skipping instantly")
313 return False
315 # Try local LTX-2 server first — only if alive
316 if _is_server_up(LOCAL_LTX_URL, "LTX-2"):
317 try:
318 tool_logger.info(f"LTX-2 server is UP, generating...")
319 response = pooled_post(f"{LOCAL_LTX_URL}/generate", json=ltx_payload, headers=headers, timeout=600)
320 if response.status_code == 200:
321 result = response.json()
322 video_url = result.get('video_url') or result.get('output_url') or result.get('video_path')
323 if video_url:
324 tool_logger.info(f"LTX-2 video generated: {video_url}")
325 return f"LTX-2 Video generated successfully. URL: {video_url}"
326 except requests.exceptions.RequestException as e:
327 tool_logger.info(f"LTX-2 generation failed: {e}")
329 # Try ComfyUI — only if alive
330 if _is_server_up(LOCAL_COMFYUI_URL, "ComfyUI"):
331 try:
332 tool_logger.info(f"ComfyUI is UP, submitting workflow...")
333 comfyui_workflow = {
334 "prompt": {
335 "1": {"class_type": "LTXVLoader", "inputs": {"ckpt_name": "ltx-video-2b-v0.9.safetensors"}},
336 "2": {"class_type": "LTXVConditioning", "inputs": {"positive": text, "negative": ltx_payload["negative_prompt"], "ltxv_model": ["1", 0]}},
337 "3": {"class_type": "LTXVSampler", "inputs": {"seed": int(time.time()) % 2147483647, "steps": ltx_payload["num_inference_steps"], "cfg": ltx_payload["guidance_scale"], "width": ltx_payload["width"], "height": ltx_payload["height"], "num_frames": ltx_payload["num_frames"], "ltxv_model": ["1", 0], "conditioning": ["2", 0]}},
338 "4": {"class_type": "LTXVDecode", "inputs": {"ltxv_model": ["1", 0], "samples": ["3", 0]}},
339 "5": {"class_type": "VHS_VideoCombine", "inputs": {"frame_rate": ltx_payload["fps"], "filename_prefix": "ltx2_output", "format": "video/h264-mp4", "images": ["4", 0]}},
340 }
341 }
342 response = pooled_post(f"{LOCAL_COMFYUI_URL}/prompt", json=comfyui_workflow, headers=headers, timeout=10)
343 if response.status_code == 200:
344 comfy_prompt_id = response.json().get('prompt_id')
345 tool_logger.info(f"ComfyUI LTX-2 job queued: {comfy_prompt_id}")
346 for _ in range(120):
347 time.sleep(5)
348 history_response = pooled_get(f"{LOCAL_COMFYUI_URL}/history/{comfy_prompt_id}")
349 if history_response.status_code == 200:
350 history = history_response.json()
351 if comfy_prompt_id in history:
352 outputs = history[comfy_prompt_id].get('outputs', {})
353 for node_id, output in outputs.items():
354 for media_key in ('gifs', 'videos'):
355 if media_key in output:
356 filename = output[media_key][0].get('filename')
357 if filename:
358 video_url = f"{LOCAL_COMFYUI_URL}/view?filename={filename}"
359 return f"LTX-2 Video generated via ComfyUI. URL: {video_url}"
360 return f"LTX-2 Video generation queued in ComfyUI (prompt_id: {comfy_prompt_id}). Check ComfyUI interface for output."
361 except requests.exceptions.RequestException as e:
362 tool_logger.info(f"ComfyUI generation failed: {e}")
364 # Local servers unavailable — try hive mesh peer with GPU
365 try:
366 from integrations.agent_engine.compute_config import get_compute_policy
367 policy = get_compute_policy()
368 if policy.get('compute_policy') != 'local_only':
369 from integrations.agent_engine.compute_mesh_service import get_compute_mesh
370 mesh = get_compute_mesh()
371 result = mesh.offload_to_best_peer(
372 model_type=ModelType.VIDEO_GEN,
373 prompt=text,
374 options={'model': 'ltx2', 'timeout': 300},
375 )
376 if result and 'error' not in result:
377 video_url = result.get('response', result.get('video_url', ''))
378 peer = result.get('offloaded_to', 'hive_peer')
379 tool_logger.info(f"LTX-2 video generated via hive peer {peer}: {video_url}")
380 return f"LTX-2 Video generated via hive peer. URL: {video_url}"
381 tool_logger.info(f"Hive mesh video offload failed: {result.get('error')}")
382 except Exception as e:
383 tool_logger.info(f"Hive mesh offload not available: {e}")
385 return ("LTX-2 video generation failed. No local GPU, no hive peers with GPU. "
386 "Options: (1) Pair a GPU device: hart compute pair <address>, "
387 "(2) Set HEVOLVE_COMPUTE_POLICY=any, "
388 "(3) Install local LTX-2 server with CUDA GPU")
390 # Default: Avatar-based video generation
391 from core.config_cache import get_db_url
392 database_url = get_db_url() or 'https://mailer.hertzai.com'
393 request_id = str(uuid.uuid4()).replace("-", "")[:11]
394 tool_logger.info(f"avtar_id: {avatar_id}:\n{text[:10]}....\n")
396 headers = {'Content-Type': 'application/json'}
397 data = {
398 "text": str(text),
399 'flag_hallo': 'false',
400 'chattts': False,
401 'openvoice': "false",
402 }
404 try:
405 res = pooled_get(f"{database_url}/get_image_by_id/{avatar_id}")
406 res = res.json()
407 new_image_url = res["image_url"]
408 voice_id = res.get('voice_id')
409 except Exception:
410 data['openvoice'] = "true"
411 new_image_url = None
412 voice_id = None
414 data["cartoon_image"] = "True"
415 data["bg_url"] = 'http://stream.mcgroce.com/txt/examples_cartoon/roy_bg.jpg'
416 data['vtoonify'] = "false"
417 data["image_url"] = new_image_url
418 data['im_crop'] = "false"
419 data['remove_bg'] = "false"
420 data['hd_video'] = "false"
421 data['uid'] = str(request_id)
422 data['gradient'] = "true"
423 data['cus_bg'] = "false"
424 data['solid_color'] = "false"
425 data['inpainting'] = "false"
426 data['prompt'] = ""
427 data['gender'] = 'male'
429 timeout = 60
430 if not realtime:
431 timeout = 600
432 data['chattts'] = True
433 data['flag_hallo'] = "true"
434 data["cartoon_image"] = "False"
436 if voice_id is not None:
437 try:
438 voice_sample = pooled_get(f"{database_url}/get_voice_sample_id/{voice_id}")
439 voice_sample = voice_sample.json()
440 data["audio_sample_url"] = voice_sample.get("voice_sample_url")
441 data['voice_id'] = int(voice_id) if voice_id else None
442 except Exception:
443 data["audio_sample_url"] = None
444 data['voice_id'] = None
445 else:
446 data["audio_sample_url"] = None
447 data['voice_id'] = None
449 conv_id = save_conversation_db(text, user_id, prompt_id, database_url, request_id)
450 data['conv_id'] = int(conv_id)
451 data['avatar_id'] = int(avatar_id)
452 data['timeout'] = int(timeout)
454 try:
455 pooled_post(f"{database_url}/video_generate_save",
456 data=json.dumps(data), headers=headers, timeout=1)
457 except Exception:
458 pass
460 if data['chattts'] or data['flag_hallo'] == "true":
461 return f"Video Generation task added to queue with conv_id:{conv_id}. Ask the helper to save this conv_id in the same collection from which the story used to generate the video was retrieved, for future reference"
462 else:
463 return f"Video Generation completed with conv_id:{conv_id}. Ask the helper to save this conv_id in the same collection from which the story used to generate the video was retrieved, for future reference"
465 tools.append((
466 "Generate_video",
467 "Generate video with text. Use model='ltx2' for AI text-to-video generation, or model='avatar' (default) for avatar-based video with voice synthesis.",
468 Generate_video,
469 ))
471 # ------------------------------------------------------------------
472 # 9. get_user_uploaded_file
473 # ------------------------------------------------------------------
474 @log_tool_execution
475 def get_user_uploaded_file() -> str:
476 tool_logger.info('INSIDE get_user_uploaded_file')
477 if recent_file_id[user_id]:
478 return f'Got user uploaded file the file_id is {recent_file_id[user_id]}'
479 return 'No file uploaded from user'
481 tools.append((
482 "get_user_uploaded_file",
483 "get user's recent uploaded files",
484 get_user_uploaded_file,
485 ))
487 # ------------------------------------------------------------------
488 # 10. get_text_from_image (img2txt)
489 # ------------------------------------------------------------------
490 @log_tool_execution
491 def img2txt(
492 image_url: Annotated[str, "image url of which you want text"],
493 text: Annotated[str, "the details you want from image"] = 'Describe the Images & Text data in this image in detail',
494 ) -> str:
495 tool_logger.info('INSIDE img2txt')
496 # SSRF protection: validate image URL before fetching
497 try:
498 from security.sanitize import validate_url
499 image_url = validate_url(image_url)
500 except (ImportError, ValueError) as e:
501 tool_logger.warning(f"Image URL blocked by SSRF filter: {image_url} — {e}")
502 return f"Error: URL blocked by security filter: {e}"
503 # Try local Qwen Vision first (bundled mode), fall back to cloud
504 from core.config_cache import get_vision_api, is_bundled
505 url = get_vision_api()
506 if not url:
507 tool_logger.warning("No LLAVA_API configured — vision inference may fail on no-GPU instances")
508 url = "http://azurekong.hertzai.com:8000/llava/image_inference"
510 if is_bundled():
511 # Local: use Qwen Vision via upload/vision endpoint
512 payload = json.dumps({'image_url': image_url, 'prompt': text})
513 response = requests.post(url, data=payload,
514 headers={'Content-Type': 'application/json'}, timeout=60)
515 else:
516 payload = {'url': image_url, 'prompt': text}
517 response = requests.request("POST", url, headers={}, data=payload, files=[], timeout=300)
518 if response.status_code == 200:
519 return response.text
520 else:
521 return 'Not able to get this page details try later'
523 tools.append((
524 "get_text_from_image",
525 "Image to Text/Question Answering from image",
526 img2txt,
527 ))
528 # Alias — reuse_recipe.py main flow LLM prompts advertise `img2txt` (#510).
529 # Same canonical closure (SSRF-validated, local Qwen Vision + cloud LLaVA fallback).
530 tools.append((
531 "img2txt",
532 "Image to Text/Question Answering from image (alias of get_text_from_image)",
533 img2txt,
534 ))
536 # ------------------------------------------------------------------
537 # 11. create_scheduled_jobs
538 # ------------------------------------------------------------------
539 @log_tool_execution
540 def create_scheduled_jobs(
541 interval_sec: Annotated[int, "time between two Interval in seconds."],
542 job_description: Annotated[str, "Description of the job to be performed"],
543 cron_expression: Annotated[Optional[str], "Cron expression for scheduling. Example: '0 9 * * 1-5' (Runs at 9:00 AM, Monday to Friday). If the interval is greater than 60 seconds or it needs to be executed at a dynamic cron time this argument is Mandatory else None"] = None,
544 ) -> str:
545 tool_logger.info('INSIDE create_scheduled_jobs')
546 return 'Added this schedule job in creation process will do it at the end. you can go ahead and mark this action as completed.'
548 tools.append((
549 "create_scheduled_jobs",
550 "Creates time-based jobs using APScheduler to schedule jobs",
551 create_scheduled_jobs,
552 ))
554 # ------------------------------------------------------------------
555 # 12. send_message_to_user
556 # ------------------------------------------------------------------
557 @log_tool_execution
558 def send_message_to_user(
559 text: Annotated[str, "Text you want to send to the user"],
560 avatar_id: Annotated[Optional[str], "Unique identifier for the avatar"] = None,
561 response_type: Annotated[Optional[str], "Response mode: 'Realistic' (slower, better quality) or 'Realtime' (faster, lower quality)"] = 'Realtime',
562 ) -> str:
563 tool_logger.info('INSIDE send_message_to_user')
564 tool_logger.info(f'SENDING DATA 2 user with values text:{text}, avatar_id:{avatar_id}, response_type:{response_type}')
565 thread = threading.Thread(target=send_message_to_user1, args=(user_id, text, '', prompt_id))
566 thread.start()
567 return f'Message sent successfully to user with request_id: {request_id_list[user_prompt]}-intermediate'
569 tools.append((
570 "send_message_to_user",
571 "Sends a message/information to user. You can use this if you want to ask a question",
572 send_message_to_user,
573 ))
575 # ------------------------------------------------------------------
576 # 13. send_presynthesized_video_to_user
577 # ------------------------------------------------------------------
578 @log_tool_execution
579 def send_presynthesized_video_to_user(
580 conv_id: Annotated[str, "Conversation ID associated with the text from memory"],
581 ) -> str:
582 tool_logger.info('INSIDE send_presynthesized_video_to_user')
583 tool_logger.info(f'SENDING DATA 2 user with value: conv_id:{conv_id}.')
584 return 'Message sent successfully to user'
586 tools.append((
587 "send_presynthesized_video_to_user",
588 "Sends a presynthesized message/video/dialogue to user using conv_id.",
589 send_presynthesized_video_to_user,
590 ))
592 # ------------------------------------------------------------------
593 # 14. send_message_in_seconds
594 # ------------------------------------------------------------------
595 @log_tool_execution
596 def send_message_in_seconds(
597 text: Annotated[str, "text to send to user"],
598 delay: Annotated[int, "time to wait in seconds before sending text"],
599 conv_id: Annotated[Optional[int], "conv_id for this text if not available make it None"] = None,
600 ) -> str:
601 tool_logger.info('INSIDE send_message_in_seconds')
602 tool_logger.info(f'with text:{text}. and waiting time: {delay} conv_id: {conv_id}')
603 run_time = datetime.fromtimestamp(time.time() + delay)
604 scheduler.add_job(send_message_to_user1, 'date', run_date=run_time, args=[user_id, text, '', prompt_id])
605 return 'Message scheduled successfully'
607 tools.append((
608 "send_message_in_seconds",
609 "Sends a presynthesized message/video/dialogue to user using conv_id with a timer.",
610 send_message_in_seconds,
611 ))
613 # ------------------------------------------------------------------
614 # 15. get_chat_history
615 # ------------------------------------------------------------------
616 @log_tool_execution
617 def get_chat_history(
618 text: Annotated[str, "Text related to which you want history"],
619 start: Annotated[Optional[str], "start date in format %Y-%m-%dT%H:%M:%S.%fZ"] = None,
620 end: Annotated[Optional[str], "end date in format %Y-%m-%dT%H:%M:%S.%fZ"] = None,
621 ) -> str:
622 tool_logger.info('INSIDE get_chat_history')
623 return helper_fun.get_time_based_history(text, f'user_{user_id}', start, end)
625 tools.append((
626 "get_chat_history",
627 "Get Chat history based on text & start & end date",
628 get_chat_history,
629 ))
631 # ------------------------------------------------------------------
632 # 16. search_visual_history
633 # ------------------------------------------------------------------
634 @log_tool_execution
635 def search_visual_history(
636 query: Annotated[str, "What to search for in visual/screen descriptions"],
637 minutes_back: Annotated[int, "How many minutes back to search (default 30)"] = 30,
638 channel: Annotated[str, "Which feed: 'camera', 'screen', or 'both' (default)"] = "both",
639 ) -> str:
640 """Search past camera/screen descriptions. Use for questions about what happened earlier visually."""
641 results = helper_fun.search_visual_history(user_id, query, mins=minutes_back, channel=channel)
642 if results:
643 return '\n'.join(results)
644 return "No matching visual/screen descriptions found in the given time range."
646 tools.append((
647 "search_visual_history",
648 "Search past camera and screen descriptions by keyword and time range.",
649 search_visual_history,
650 ))
652 # ------------------------------------------------------------------
653 # 17. google_search
654 # ------------------------------------------------------------------
655 @log_tool_execution
656 def google_search(
657 text: Annotated[str, "Text/Query which you want to search"],
658 ) -> str:
659 tool_logger.info('INSIDE google search')
660 return helper_fun.top5_results(text)
662 tools.append((
663 "google_search",
664 "web/google/bing search api tool for a given query",
665 google_search,
666 ))
668 # ------------------------------------------------------------------
669 # Conditional: SimpleMem long-term memory
670 # ------------------------------------------------------------------
671 if simplemem_store is not None:
672 from core.event_loop import get_or_create_event_loop
674 @log_tool_execution
675 def search_long_term_memory(
676 query: Annotated[str, "Natural language query to search long-term memory"],
677 ) -> str:
678 """Search compressed long-term memory using semantic retrieval."""
679 try:
680 loop = get_or_create_event_loop()
681 results = loop.run_until_complete(simplemem_store.search(query))
682 if results:
683 return results[0].content
684 return "No relevant memories found."
685 except Exception as e:
686 tool_logger.info(f"SimpleMem search error: {e}")
687 return "Memory search unavailable."
689 tools.append((
690 "search_long_term_memory",
691 "Search long-term memory for past conversations, facts, and context using natural language query.",
692 search_long_term_memory,
693 ))
695 @log_tool_execution
696 def save_to_long_term_memory(
697 content: Annotated[str, "The information/fact to remember long-term"],
698 speaker: Annotated[str, "Who said this (e.g. 'User', 'Assistant', 'System')"] = "System",
699 ) -> str:
700 """Save important information to compressed long-term memory."""
701 try:
702 loop = get_or_create_event_loop()
703 loop.run_until_complete(simplemem_store.add(content, {
704 "sender_name": speaker,
705 "user_id": user_id,
706 "prompt_id": prompt_id,
707 }))
708 return "Saved to long-term memory."
709 except Exception as e:
710 tool_logger.info(f"SimpleMem save error: {e}")
711 return "Failed to save to long-term memory."
713 tools.append((
714 "save_to_long_term_memory",
715 "Save important facts or information to long-term memory for future retrieval across sessions.",
716 save_to_long_term_memory,
717 ))
719 # ------------------------------------------------------------------
720 # Suggest_Share_Worthy_Content
721 # ------------------------------------------------------------------
722 @log_tool_execution
723 def suggest_share_worthy_content(
724 query: Annotated[str, "Any text — not used for filtering, just provide context"] = "",
725 ) -> str:
726 """Find high-engagement posts that haven't been shared much and suggest sharing them."""
727 try:
728 from integrations.social.models import get_db, Post, ShareableLink
729 from sqlalchemy import func as sa_func
731 db = get_db()
732 try:
733 share_counts = (
734 db.query(
735 ShareableLink.resource_id,
736 sa_func.count(ShareableLink.id).label('link_count'),
737 )
738 .filter(ShareableLink.resource_type == 'post')
739 .group_by(ShareableLink.resource_id)
740 .subquery()
741 )
743 posts = (
744 db.query(Post, share_counts.c.link_count)
745 .outerjoin(share_counts, Post.id == share_counts.c.resource_id)
746 .filter(
747 Post.is_deleted == False,
748 Post.is_hidden == False,
749 Post.upvotes > 5,
750 Post.comment_count > 3,
751 )
752 .filter(
753 (share_counts.c.link_count == None) | # noqa: E711
754 (share_counts.c.link_count < 3)
755 )
756 .order_by(Post.score.desc())
757 .limit(3)
758 .all()
759 )
761 if not posts:
762 return ("No under-shared high-engagement content found right now. "
763 "Keep creating great posts and the community will notice!")
765 suggestions = []
766 for post, link_count in posts:
767 title = (post.title or post.content or '')[:80].strip()
768 shares = link_count or 0
769 suggestions.append(
770 f"- \"{title}\" ({post.upvotes} upvotes, "
771 f"{post.comment_count} comments, only {shares} shares) "
772 f"[post_id: {post.id}]"
773 )
775 header = ("These posts are resonating with the community but haven't "
776 "been shared much yet. Consider sharing them:\n")
777 return header + "\n".join(suggestions)
778 finally:
779 db.close()
780 except Exception as e:
781 tool_logger.warning(f"Suggest_Share_Worthy_Content failed: {e}")
782 return f"Could not fetch share-worthy content right now: {e}"
784 tools.append((
785 "Suggest_Share_Worthy_Content",
786 "Find high-engagement posts that deserve wider reach but haven't been shared much. "
787 "Use when the user asks about content worth sharing or to proactively suggest "
788 "share-worthy community posts.",
789 suggest_share_worthy_content,
790 ))
792 # ------------------------------------------------------------------
793 # Observe_User_Experience
794 # ------------------------------------------------------------------
795 @log_tool_execution
796 def observe_user_experience(
797 input_text: Annotated[str, "JSON string with event, page, duration_ms, outcome fields"],
798 ) -> str:
799 """Record a user experience observation for self-improvement."""
800 try:
801 data = json.loads(input_text) if input_text.startswith('{') else {'event': input_text}
802 event = data.get('event', 'interaction')
803 page = data.get('page', '')
804 duration_ms = data.get('duration_ms', 0)
805 outcome = data.get('outcome', 'recorded')
807 observation = f"User {event} on {page} ({duration_ms}ms): {outcome}"
809 if memory_graph:
810 session_key = f"{user_id}_{prompt_id}" if prompt_id else str(user_id)
811 memory_id = memory_graph.register(
812 content=observation,
813 metadata={
814 'memory_type': 'observation',
815 'source_agent': 'agent',
816 'session_id': session_key,
817 'page': page,
818 'event': event,
819 },
820 context_snapshot=f"UX observation during session {session_key}",
821 )
822 return f"Observation recorded (id: {memory_id}): {observation}"
824 return f"Observation noted: {observation}"
825 except Exception as e:
826 tool_logger.warning(f"Observe_User_Experience failed: {e}")
827 return f"Observation noted: {input_text}"
829 tools.append((
830 "Observe_User_Experience",
831 "Record a user experience observation. Input: JSON with event, page, "
832 "duration_ms, outcome. Used for self-improvement and understanding user "
833 "behavior patterns.",
834 observe_user_experience,
835 ))
837 # ------------------------------------------------------------------
838 # Self_Critique_And_Enhance
839 # ------------------------------------------------------------------
840 @log_tool_execution
841 def self_critique_and_enhance(
842 input_text: Annotated[str, "Topic or area to critique"],
843 ) -> str:
844 """Review past suggestions and outcomes to improve future behavior."""
845 try:
846 if not memory_graph:
847 return f"Self-critique on '{input_text}': Will adjust future behavior based on observations."
849 session_key = f"{user_id}_{prompt_id}" if prompt_id else str(user_id)
851 # Recall past suggestions and observations
852 suggestions = memory_graph.recall(
853 input_text or 'suggestions made outcomes', mode='semantic', top_k=10,
854 )
855 observations = memory_graph.recall(
856 'user experience observation', mode='semantic', top_k=10,
857 )
859 if not suggestions and not observations:
860 return "No past interactions to critique yet. Will observe and learn."
862 # Format findings for agent reasoning
863 critique = "Self-critique findings:\n"
864 if suggestions:
865 critique += f"Past suggestions ({len(suggestions)}):\n"
866 for s in suggestions[:5]:
867 critique += f" - {s.content[:100]}\n"
868 if observations:
869 critique += f"User observations ({len(observations)}):\n"
870 for o in observations[:5]:
871 critique += f" - {o.content[:100]}\n"
873 # Store the critique itself as an insight
874 insight = f"Self-critique on: {input_text}"
875 memory_graph.register(
876 content=insight,
877 metadata={
878 'memory_type': 'insight',
879 'source_agent': 'agent',
880 'session_id': session_key,
881 'type': 'self_critique',
882 },
883 context_snapshot=f"Self-critique during session {session_key}",
884 )
886 return critique
887 except Exception as e:
888 tool_logger.warning(f"Self_Critique_And_Enhance failed: {e}")
889 return f"Self-critique on '{input_text}': Will adjust future behavior based on observations."
891 tools.append((
892 "Self_Critique_And_Enhance",
893 "Review past agent suggestions and user behavior observations to improve "
894 "future recommendations. Input: topic or area to critique. Helps the agent "
895 "learn from its own interactions.",
896 self_critique_and_enhance,
897 ))
899 # ------------------------------------------------------------------
900 # device_control — Cross-device control via PeerLink (SAME_USER only)
901 # ------------------------------------------------------------------
902 @log_tool_execution
903 def device_control(
904 action: Annotated[str, "What to do: 'turn on light', 'check temperature', 'list files', 'run command ls -la'"],
905 device_hint: Annotated[str, "Which device: 'phone', 'desktop', 'iot hub', or empty for default"] = '',
906 ) -> str:
907 """Control a device on the user's private network via PeerLink.
909 Privacy-first: only targets the user's own devices (SAME_USER trust).
910 Uses PeerLink dispatch channel with FleetCommandService fallback.
911 """
912 try:
913 # Step 1: Find the target device via DeviceRoutingService
914 target_device = None
915 try:
916 from integrations.social.models import db_session
917 from integrations.social.device_routing_service import DeviceRoutingService
918 with db_session(commit=False) as db:
919 if device_hint:
920 # Map hint to capability or form factor
921 capability = 'general'
922 if device_hint.lower() in ('phone', 'desktop', 'tablet', 'tv', 'embedded', 'robot'):
923 # Filter by form factor
924 devices = DeviceRoutingService.get_user_device_map(db, str(user_id))
925 for d in devices:
926 if d.get('form_factor', '') == device_hint.lower():
927 target_device = d
928 break
929 if not target_device:
930 target_device = DeviceRoutingService.pick_device(
931 db, str(user_id), required_capability=capability)
932 else:
933 target_device = DeviceRoutingService.pick_device(
934 db, str(user_id), required_capability='general')
935 except Exception as e:
936 tool_logger.debug(f"Device routing lookup failed: {e}")
938 target_node_id = (target_device or {}).get('device_id', '')
940 # Step 2: Try PeerLink dispatch channel (SAME_USER trust only)
941 peerlink_sent = False
942 if target_node_id:
943 try:
944 from core.peer_link.link_manager import get_link_manager
945 from core.peer_link.link import TrustLevel
946 mgr = get_link_manager()
947 link = mgr.get_link(target_node_id)
948 if link and link.trust == TrustLevel.SAME_USER:
949 result = mgr.send(
950 target_node_id, 'dispatch',
951 {'type': 'device_control', 'action': action,
952 'user_id': str(user_id)},
953 wait_response=True, timeout=30.0,
954 )
955 if result is not None:
956 peerlink_sent = True
957 msg = result.get('message', str(result))
958 return f"Device control result: {msg}"
959 elif link and link.trust != TrustLevel.SAME_USER:
960 return ("Device control blocked: target device is not a SAME_USER "
961 "trusted device. Only your own devices can be controlled.")
962 except Exception as e:
963 tool_logger.debug(f"PeerLink dispatch failed: {e}")
965 # Step 3: Fallback to FleetCommandService
966 if not peerlink_sent:
967 try:
968 from integrations.social.models import db_session
969 from integrations.social.fleet_command import FleetCommandService
970 with db_session() as db:
971 cmd = FleetCommandService.push_command(
972 db, target_node_id or 'self',
973 'device_control',
974 {'action': action, 'device_hint': device_hint},
975 )
976 if cmd:
977 return f"Device control command queued (id={cmd.get('id', '?')}): {action}"
978 return "Device control: failed to queue command"
979 except Exception as e:
980 tool_logger.debug(f"Fleet command fallback failed: {e}")
982 # Step 4: Local execution as last resort (this IS the target device)
983 try:
984 from integrations.social.fleet_command import FleetCommandService
985 result = FleetCommandService.execute_command(
986 'device_control', {'action': action})
987 if result.get('success'):
988 return f"Device control (local): {result.get('message', 'OK')}"
989 return f"Device control failed: {result.get('message', 'Unknown error')}"
990 except Exception as e:
991 return f"Device control unavailable: {e}"
993 except Exception as e:
994 tool_logger.warning(f"device_control failed: {e}")
995 return f"Device control error: {e}"
997 tools.append((
998 "device_control",
999 "Control a device on the user's private network. Actions: turn on/off lights, "
1000 "check temperature, list files, run commands. Privacy-first: only your own devices.",
1001 device_control,
1002 ))
1004 # ------------------------------------------------------------------
1005 # data_extraction_from_url — Parity with LangChain Data_Extraction_From_URL
1006 # ------------------------------------------------------------------
1007 @log_tool_execution
1008 def data_extraction_from_url(
1009 url: Annotated[str, "The URL to extract content from"],
1010 url_type: Annotated[str, "Type of URL: 'pdf' or 'website'"] = "website",
1011 ) -> str:
1012 """Extract content from a URL (PDF or website). Uses Crawl4AI or direct parsing."""
1013 try:
1014 from threadlocal import thread_local_data as _tld
1015 _uid = _tld.get_user_id() if hasattr(_tld, 'get_user_id') else user_id
1016 _rid = _tld.get_request_id() if hasattr(_tld, 'get_request_id') else None
1018 # Try Crawl4AI service first
1019 try:
1020 from integrations.service_tools import service_tool_registry
1021 crawl_tool = service_tool_registry.get_tool('Crawl4AI')
1022 if crawl_tool:
1023 result = crawl_tool.execute(url)
1024 if result:
1025 return f"Extracted from {url}:\n{str(result)[:5000]}"
1026 except Exception:
1027 pass
1029 # Fallback: direct requests
1030 import requests as _req
1031 resp = _req.get(url, timeout=30, headers={'User-Agent': 'Mozilla/5.0'})
1032 if url_type == 'pdf':
1033 return f"PDF downloaded ({len(resp.content)} bytes). Use a PDF parser for full extraction."
1034 text = resp.text[:5000]
1035 # Strip HTML tags naively
1036 import re
1037 text = re.sub(r'<[^>]+>', ' ', text)
1038 text = re.sub(r'\s+', ' ', text).strip()
1039 return f"Extracted from {url}:\n{text[:4000]}"
1040 except Exception as e:
1041 return f"URL extraction failed: {e}"
1043 tools.append((
1044 "data_extraction_from_url",
1045 "Extract content from a URL (PDF or website). Input: URL and type ('pdf' or 'website'). "
1046 "Uses Crawl4AI for rich extraction with fallback to direct HTTP fetch.",
1047 data_extraction_from_url,
1048 ))
1050 # ------------------------------------------------------------------
1051 # get_user_details — Parity with LangChain User_details_tool
1052 # ------------------------------------------------------------------
1053 @log_tool_execution
1054 def get_user_details() -> str:
1055 """Get current user's profile details."""
1056 try:
1057 uid = user_id
1058 # Try local DB first
1059 try:
1060 from integrations.social.models import get_db, User
1061 db = get_db()
1062 try:
1063 user = db.query(User).filter_by(id=str(uid)).first()
1064 if user:
1065 return json.dumps(user.to_dict(), default=str)
1066 finally:
1067 db.close()
1068 except Exception:
1069 pass
1071 # Fallback: cloud API
1072 import requests as _req
1073 resp = _req.post(
1074 'https://azurekong.hertzai.com:8443/db/getstudent_by_user_id',
1075 json={'user_id': uid}, timeout=10,
1076 )
1077 return f"User details: {resp.text}"
1078 except Exception as e:
1079 return f"Could not fetch user details: {e}"
1081 tools.append((
1082 "get_user_details",
1083 "Get the current user's profile information (name, email, preferences, etc.). "
1084 "Use when the user asks about their profile or when you need user context.",
1085 get_user_details,
1086 ))
1088 # ------------------------------------------------------------------
1089 # request_resource — Parity with LangChain Request_Resource
1090 # ------------------------------------------------------------------
1091 @log_tool_execution
1092 def request_resource(
1093 resource_description: Annotated[str, "JSON or plain text describing the needed resource. JSON format: {\"resource_type\": \"api_key\", \"key_name\": \"GOOGLE_API_KEY\", \"label\": \"Google API Key\", \"used_by\": \"search tool\", \"description\": \"needed for web search\"}"],
1094 ) -> str:
1095 """Request an API key, credential, token, or config value that is not currently available."""
1096 try:
1097 try:
1098 req = json.loads(resource_description)
1099 except (ValueError, TypeError):
1100 req = {
1101 'resource_type': 'api_key',
1102 'key_name': 'UNKNOWN',
1103 'label': resource_description[:100],
1104 'description': resource_description,
1105 'used_by': 'Agent tool',
1106 }
1108 key_name = req.get('key_name', 'UNKNOWN')
1109 resource_type = req.get('resource_type', 'api_key')
1111 # Check env vars first
1112 env_val = os.environ.get(key_name)
1113 if env_val:
1114 return f"Resource '{key_name}' is already configured and available."
1116 # Check vault
1117 try:
1118 from desktop.ai_key_vault import AIKeyVault
1119 vault = AIKeyVault.get_instance()
1120 val = vault.get_tool_key(key_name) if resource_type != 'channel_secret' else vault.get_channel_secret(req.get('channel_type', ''), key_name)
1121 if val:
1122 os.environ[key_name] = val
1123 return f"Resource '{key_name}' loaded from vault and is now available."
1124 except Exception:
1125 pass
1127 # Track as pending and request from user
1128 try:
1129 from desktop.ai_key_vault import AIKeyVault
1130 AIKeyVault.get_instance().add_pending_request(
1131 key_name=key_name, resource_type=resource_type,
1132 channel_type=req.get('channel_type', ''),
1133 label=req.get('label', key_name),
1134 description=req.get('description', ''),
1135 used_by=req.get('used_by', 'Agent tool'),
1136 )
1137 except Exception:
1138 pass
1140 secret_request = json.dumps({
1141 '__SECRET_REQUEST__': True, 'type': resource_type,
1142 'key_name': key_name, 'label': req.get('label', key_name),
1143 'description': req.get('description', f'{key_name} is required.'),
1144 'used_by': req.get('used_by', 'Agent tool'),
1145 'channel_type': req.get('channel_type', ''),
1146 })
1147 return (
1148 f"I need the user to provide '{req.get('label', key_name)}'. "
1149 f"Required for {req.get('used_by', 'a tool')}. "
1150 f"{req.get('description', '')} "
1151 f"RESOURCE_REQUEST:{secret_request}"
1152 )
1153 except Exception as e:
1154 return f"Resource request failed: {e}"
1156 tools.append((
1157 "request_resource",
1158 "Request an API key, credential, token, or config value. Checks vault/env first, "
1159 "then prompts the user if not found. Handles: API keys (OpenAI, Google, Slack), "
1160 "OAuth tokens, channel secrets, service credentials. "
1161 "Input: JSON with resource_type, key_name, label, used_by, description.",
1162 request_resource,
1163 ))
1165 # ------------------------------------------------------------------
1166 # observe_user_experience — Parity with LangChain Observe_User_Experience
1167 # ------------------------------------------------------------------
1168 @log_tool_execution
1169 def observe_user_experience(
1170 event: Annotated[str, "What happened (e.g. 'clicked', 'scrolled', 'left page')"],
1171 page: Annotated[str, "Which page or screen"] = "",
1172 outcome: Annotated[str, "What was the result or user reaction"] = "",
1173 duration_ms: Annotated[int, "How long the interaction lasted in ms"] = 0,
1174 ) -> str:
1175 """Record a user experience observation for self-improvement."""
1176 observation = f"User {event} on {page} ({duration_ms}ms): {outcome}"
1177 try:
1178 if memory_graph:
1179 session_id = f"{user_id}_{prompt_id}" if prompt_id else str(user_id)
1180 mid = memory_graph.register(
1181 content=observation,
1182 metadata={'memory_type': 'observation', 'source_agent': 'agent',
1183 'session_id': session_id, 'page': page, 'event': event},
1184 context_snapshot=f"UX observation during session {session_id}",
1185 )
1186 return f"Observation recorded (id: {mid}): {observation}"
1187 except Exception:
1188 pass
1189 return f"Observation noted: {observation}"
1191 tools.append((
1192 "observe_user_experience",
1193 "Record a user experience observation. Use to track behavior patterns "
1194 "for self-improvement. Input: event, page, outcome, duration_ms.",
1195 observe_user_experience,
1196 ))
1198 # ------------------------------------------------------------------
1199 # self_critique_and_enhance — Parity with LangChain Self_Critique_And_Enhance
1200 # ------------------------------------------------------------------
1201 @log_tool_execution
1202 def self_critique_and_enhance(
1203 topic: Annotated[str, "Topic or area to critique (e.g. 'my recommendations', 'search quality')"] = "",
1204 ) -> str:
1205 """Review past agent suggestions and user observations to improve future behavior."""
1206 try:
1207 if not memory_graph:
1208 return "Self-critique unavailable: no memory graph for this session."
1210 suggestions = memory_graph.recall(topic or 'suggestions made outcomes', mode='semantic', top_k=10)
1211 observations = memory_graph.recall('user experience observation', mode='semantic', top_k=10)
1213 if not suggestions and not observations:
1214 return "No past interactions to critique yet. Will observe and learn."
1216 critique = "Self-critique findings:\n"
1217 if suggestions:
1218 critique += f"Past suggestions ({len(suggestions)}):\n"
1219 for s in suggestions[:5]:
1220 critique += f" - {s.content[:100]}\n"
1221 if observations:
1222 critique += f"User observations ({len(observations)}):\n"
1223 for o in observations[:5]:
1224 critique += f" - {o.content[:100]}\n"
1226 session_id = f"{user_id}_{prompt_id}" if prompt_id else str(user_id)
1227 memory_graph.register(
1228 content=f"Self-critique on: {topic}",
1229 metadata={'memory_type': 'insight', 'source_agent': 'agent',
1230 'session_id': session_id, 'type': 'self_critique'},
1231 context_snapshot=f"Self-critique during session {session_id}",
1232 )
1233 return critique
1234 except Exception as e:
1235 return f"Self-critique unavailable: {e}"
1237 tools.append((
1238 "self_critique_and_enhance",
1239 "Review past agent suggestions and user behavior observations to improve "
1240 "future recommendations. Input: topic or area to critique.",
1241 self_critique_and_enhance,
1242 ))
1244 return tools
1247def register_remote_desktop_tools_if_available(ctx, helper, executor):
1248 """Register remote desktop tools if the module is available.
1250 Gracefully skips if integrations/remote_desktop is not installed.
1251 """
1252 try:
1253 from integrations.remote_desktop.agent_tools import (
1254 build_remote_desktop_tools, register_remote_desktop_tools,
1255 )
1256 rd_tools = build_remote_desktop_tools(ctx)
1257 register_remote_desktop_tools(rd_tools, helper, executor)
1258 tool_logger.info(f"Remote desktop tools registered ({len(rd_tools)} tools)")
1259 except ImportError:
1260 pass
1261 except Exception as e:
1262 tool_logger.warning(f"Remote desktop tools registration failed: {e}")
1265def register_memory_graph_tools(memory_graph, helper, executor, user_id, user_prompt):
1266 """Register MemoryGraph provenance tools if memory_graph is available.
1268 Delegates to the existing agent_memory_tools module.
1269 """
1270 if memory_graph is None:
1271 return
1272 try:
1273 from integrations.channels.memory.agent_memory_tools import create_memory_tools, register_autogen_tools
1274 mem_tools = create_memory_tools(memory_graph, str(user_id), user_prompt)
1275 register_autogen_tools(mem_tools, executor, helper)
1276 tool_logger.info(f"MemoryGraph tools registered for {user_prompt}")
1277 except Exception as e:
1278 tool_logger.warning(f"MemoryGraph tools registration failed: {e}")