Coverage for integrations / channels / agent_tools.py: 31.3%

259 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2AutoGen tools for channel operations — used by HART agents. 

3 

4Follows the same pattern as core/agent_tools.py: 

5 - build_channel_tool_closures(ctx) → list of (name, desc, func) tuples 

6 - register_channel_tools(helper, executor, ctx) → registers on autogen agents 

7 

8Allows agents to: 

91. Send messages to specific channels or broadcast to all 

102. Register/connect new channels via natural language 

113. List connected channels and their status 

124. Get current channel context (where the message came from) 

13 

14All tools reuse existing infrastructure: 

15- ChannelResponseRouter for sending 

16- AdminAPI singleton for registration 

17- UserChannelBinding for bindings 

18- thread_local_data for channel context 

19""" 

20 

21import json 

22import logging 

23from typing import Annotated, Optional 

24 

25logger = logging.getLogger(__name__) 

26 

27 

28# --------------------------------------------------------------------------- 

29# Helpers (shared by all closures) 

30# --------------------------------------------------------------------------- 

31 

32def _get_channel_context(): 

33 """Read the current channel context from thread-local storage.""" 

34 try: 

35 from threadlocal import thread_local_data 

36 return getattr(thread_local_data, 'channel_context', None) 

37 except Exception: 

38 return None 

39 

40 

41def _get_user_id_from_threadlocal(): 

42 """Get current user_id from thread-local.""" 

43 try: 

44 from threadlocal import thread_local_data 

45 return thread_local_data.get_user_id() 

46 except Exception: 

47 return None 

48 

49 

50# --------------------------------------------------------------------------- 

51# Tool closure factory 

52# --------------------------------------------------------------------------- 

53 

def build_channel_tool_closures(ctx):
    """Build session-scoped channel tool closures.

    Args:
        ctx: dict with at least 'user_id', 'prompt_id'.
            Optional: 'log_tool_execution' decorator, 'send_message_to_user1' func.
            Only 'user_id' and 'log_tool_execution' are read by this function.

    Returns:
        list of (name, description, func) tuples — same format as core/agent_tools.py

    Every tool returns a human-readable string and converts its own
    exceptions into an "Error ..." string instead of raising.
    """
    user_id = ctx.get('user_id')
    # Identity decorator when the caller did not supply a logging wrapper.
    log_tool_execution = ctx.get('log_tool_execution') or (lambda f: f)

    tools = []

    # ------------------------------------------------------------------
    # 1. send_to_channel
    # ------------------------------------------------------------------
    @log_tool_execution
    def send_to_channel(
        channel_type: Annotated[str, "Channel name (telegram, discord, slack, etc.) or 'all' to broadcast"],
        message: Annotated[str, "The message text to send"],
        chat_id: Annotated[Optional[str], "Target chat ID. Use 'all' to send to all bindings for this channel"] = "all",
    ) -> str:
        """Send a message to a specific messaging channel or broadcast to all connected channels."""
        try:
            uid = user_id or _get_user_id_from_threadlocal()

            # chat_id is Optional: an explicit None means the same as the
            # default 'all'. str() also tolerates a numeric chat id.
            target_chat = 'all' if chat_id is None else str(chat_id)

            # NOTE(review): chat_id == 'all' with a specific channel_type
            # still fans out to every channel via the router, not only to
            # that channel's bindings — confirm this is intended.
            if channel_type.lower() == 'all' or target_chat.lower() == 'all':
                from integrations.channels.response.router import get_response_router
                router = get_response_router()
                router.route_response(
                    user_id=uid,
                    response_text=message,
                    channel_context=_get_channel_context(),
                    fan_out=True,
                )
                return f"Message broadcast to all connected channels for user {uid}."

            from integrations.channels.registry import get_registry
            import asyncio
            registry = get_registry()
            loop = getattr(registry, '_loop', None)

            if loop and loop.is_running():
                # The registry coroutine must run on the registry's own loop;
                # block this thread until it finishes (max 30 s).
                future = asyncio.run_coroutine_threadsafe(
                    registry.send_to_channel(channel_type, target_chat, message),
                    loop,
                )
                result = future.result(timeout=30)
                if result.success:
                    return f"Message sent to {channel_type}:{target_chat} successfully."
                return f"Failed to send to {channel_type}: {result.error}"

            # Nothing is queued on this path, so do not claim it was.
            return (
                "Channel adapters not running. Message was not sent; "
                "try again once the channel is connected."
            )

        except Exception as e:
            logger.error("send_to_channel error: %s", e)
            return f"Error sending message: {e}"

    tools.append((
        "send_to_channel",
        "Send a message to a specific messaging channel (Telegram, Discord, Slack, WhatsApp, etc.) "
        "or broadcast to all connected channels. Use channel_type='all' to broadcast. "
        "Examples: send_to_channel('telegram', 'Task complete!', '123456') or "
        "send_to_channel('all', 'Important update for all channels.')",
        send_to_channel,
    ))

    # ------------------------------------------------------------------
    # 2. register_channel
    # ------------------------------------------------------------------
    @log_tool_execution
    def register_channel(
        channel_type: Annotated[str, "Channel to register (telegram, discord, slack, whatsapp, etc.)"],
        config_json: Annotated[str, "JSON config with required credentials, e.g. '{\"bot_token\": \"123:ABC\"}'"],
    ) -> str:
        """Register and connect a new messaging channel. Creates config, enables it, and creates a user binding."""
        try:
            channel_type = channel_type.lower().strip()

            from integrations.channels.metadata import get_channel_metadata, list_all_channels
            meta = get_channel_metadata(channel_type)
            if not meta:
                available = ', '.join(sorted(list_all_channels().keys()))
                return f"Unknown channel '{channel_type}'. Available channels: {available}"

            setup_fields = meta.get('setup_fields', []) or []

            # Accept either a JSON object or a bare pasted credential.
            # json.loads happily returns int/str for inputs such as
            # '123456' or '"abc"', so a successful parse is not enough:
            # only a dict is a usable config.
            raw_value = None
            try:
                parsed = json.loads(config_json)
            except json.JSONDecodeError:
                parsed = None
                raw_value = config_json.strip()
            else:
                if isinstance(parsed, str):
                    raw_value = parsed.strip()
                elif isinstance(parsed, (int, float)) and not isinstance(parsed, bool):
                    raw_value = config_json.strip()

            if isinstance(parsed, dict):
                config = parsed
            else:
                # If user just pasted a token, try to assign it to the first
                # USER-VISIBLE field (skip auto:True infrastructure fields
                # like WhatsApp's api_url/access_token — the user wouldn't
                # paste a WAHA URL when prompted for "WhatsApp number").
                visible = [f for f in setup_fields if not f.get('auto')]
                if raw_value is None or not visible:
                    return f"Could not parse config. Expected JSON. Required fields: {[f['key'] for f in setup_fields]}"
                config = {visible[0]['key']: raw_value}

            # Auto-fill auto:True fields from env-var defaults so the
            # user doesn't have to know about gateway infrastructure
            # (WAHA api_url for WhatsApp, etc). Order:
            #   1. config[key] explicitly supplied by caller — wins
            #   2. <CHANNEL_UPPER>_<KEY_UPPER> env var — operator override
            #      (e.g. WHATSAPP_API_URL=https://my-waha.example.com)
            #   3. setup_fields[].default — schema default
            #   4. '' if no default
            import os
            env_prefix = f"{channel_type.upper()}_"
            for f in setup_fields:
                if not f.get('auto'):
                    continue
                key = f.get('key')
                if not key or config.get(key) not in (None, ''):
                    continue
                env_val = os.getenv(env_prefix + key.upper())
                if env_val is not None:
                    config[key] = env_val
                elif 'default' in f:
                    config[key] = f['default']
                else:
                    config[key] = ''

            # Save via admin API singleton
            from integrations.channels.admin.api import get_api
            api = get_api()

            if channel_type in api._channels:
                api._channels[channel_type].update({'config': config, 'enabled': True})
            else:
                api._channels[channel_type] = {
                    'channel_type': channel_type,
                    'name': meta['display_name'],
                    'enabled': True,
                    'config': config,
                }
            api._save_config()

            # Create user binding (best-effort: a failure here is logged
            # and does not undo the saved channel config).
            uid = user_id or _get_user_id_from_threadlocal()
            if uid:
                try:
                    from integrations.social.models import get_db, UserChannelBinding
                    db = get_db()
                    try:
                        existing = db.query(UserChannelBinding).filter_by(
                            user_id=str(uid), channel_type=channel_type,
                        ).first()
                        if not existing:
                            db.add(UserChannelBinding(
                                user_id=str(uid),
                                channel_type=channel_type,
                                channel_sender_id='agent_registered',
                                auth_method=meta['auth_method'],
                                is_active=True,
                            ))
                        else:
                            existing.is_active = True
                        db.commit()
                    finally:
                        db.close()
                except Exception as e:
                    logger.debug("Binding creation during registration: %s", e)

            required_fields = [f['key'] for f in setup_fields]
            missing = [f for f in required_fields if f not in config]
            if missing:
                return (f"{meta['display_name']} registered with partial config. "
                        f"Missing: {missing}. Complete setup in the Channels page.")

            # PR P.4 — best-effort adapter probe so we surface a toast
            # the moment the credential turns out to be wrong.  Runs
            # in a daemon thread with its own event loop so:
            #   - the agent-tool return is not delayed by the probe
            #     (some adapters open long-lived sockets — sub-second
            #     to seconds depending on provider RTT);
            #   - the loop we own is closed only after the connect()
            #     coroutine actually exits, avoiding the dangling-
            #     adapter / loop-closed-mid-task class of bug;
            #   - on failure we emit a Liquid UI toast (handled by
            #     AgentOverlay's case 'toast' renderer) so the user
            #     sees actionable feedback in chat.
            #
            # The registration itself stays committed — the toast is
            # advisory, not authoritative; operator can fix in admin.
            try:
                from integrations.channels.registry import get_registry
                registry = get_registry()
                adapter = registry.get(channel_type) if registry else None
                if adapter is not None:
                    import threading as _threading
                    _probe_uid = (
                        user_id or _get_user_id_from_threadlocal() or 'system'
                    )
                    _probe_meta = meta  # capture for the thread closure

                    def _probe_in_thread():
                        import asyncio as _asyncio
                        loop = _asyncio.new_event_loop()
                        _asyncio.set_event_loop(loop)
                        try:
                            loop.run_until_complete(
                                _asyncio.wait_for(adapter.connect(), timeout=10),
                            )
                        except Exception as probe_err:
                            logger.info(
                                "register_channel: adapter probe failed "
                                "for %s: %s", channel_type, probe_err,
                            )
                            try:
                                from core.platform.service_registry import (
                                    ServiceRegistry,
                                )
                                _lui = ServiceRegistry.get('LiquidUIService')
                                if _lui:
                                    _lui.agent_ui_update(_probe_uid, {
                                        'type': 'toast',
                                        'severity': 'error',
                                        'channel': channel_type,
                                        'channel_type': channel_type,
                                        'text': (
                                            f"{_probe_meta.get('display_name') or channel_type} "
                                            f"couldn't connect: "
                                            f"{str(probe_err)[:120]}"
                                        ),
                                    })
                            except Exception as toast_err:
                                logger.debug(
                                    "Probe-failure toast emit skipped: %s",
                                    toast_err,
                                )
                        finally:
                            loop.close()

                    _threading.Thread(
                        target=_probe_in_thread,
                        name=f'channel-probe-{channel_type}',
                        daemon=True,
                    ).start()
            except Exception as e:
                logger.debug("Probe thread spawn skipped: %s", e)

            return (f"{meta['display_name']} registered and enabled! "
                    f"Auth: {meta['auth_method']}. "
                    f"Adapter will connect on restart or via the Channels page.")

        except Exception as e:
            logger.error("register_channel error: %s", e)
            return f"Error registering channel: {e}"

    tools.append((
        "register_channel",
        "Register and connect a new messaging channel. Use when the user wants to connect "
        "a Telegram bot, Discord bot, Slack app, or any of the 31 supported channels. "
        "Example: register_channel('telegram', '{\"bot_token\": \"123456:ABC-DEF\"}') or "
        "register_channel('slack', '{\"bot_token\": \"xoxb-...\", \"signing_secret\": \"...\"}').",
        register_channel,
    ))

    # ------------------------------------------------------------------
    # 3. list_channels
    # ------------------------------------------------------------------
    @log_tool_execution
    def list_channels() -> str:
        """List all connected messaging channels, their status, and user's channel bindings."""
        try:
            uid = user_id or _get_user_id_from_threadlocal()
            lines = []

            from integrations.channels.registry import get_registry
            registry = get_registry()
            status = registry.get_status()

            if status:
                lines.append("**Active Channel Adapters:**")
                for name, st in status.items():
                    state = 'Connected' if st.connected else 'Disconnected'
                    lines.append(f"- {name}: {state}")
            else:
                lines.append("No channel adapters currently running.")

            if uid:
                # Bindings are optional extra detail: a lookup failure must
                # not hide the adapter status above, but is logged so it
                # does not vanish silently.
                try:
                    from integrations.social.models import get_db, UserChannelBinding
                    db = get_db()
                    try:
                        bindings = db.query(UserChannelBinding).filter_by(
                            user_id=str(uid), is_active=True,
                        ).all()
                        if bindings:
                            lines.append("\n**Your Channel Bindings:**")
                            for b in bindings:
                                pref = ' (preferred)' if b.is_preferred else ''
                                lines.append(f"- {b.channel_type}: {b.channel_sender_id or 'linked'}{pref}")
                    finally:
                        db.close()
                except Exception as e:
                    logger.debug("list_channels binding lookup skipped: %s", e)

            # Named channel_ctx so it does not shadow the factory's ctx.
            channel_ctx = _get_channel_context()
            if channel_ctx:
                lines.append(f"\n**Current message from:** {channel_ctx.get('channel', 'unknown')} "
                             f"(sender: {channel_ctx.get('sender_name', channel_ctx.get('sender_id', 'unknown'))})")

            return '\n'.join(lines) if lines else "No channel information available."
        except Exception as e:
            logger.error("list_channels error: %s", e)
            return f"Error listing channels: {e}"

    tools.append((
        "list_channels",
        "List all connected messaging channels, their connection status, and the user's "
        "channel bindings. Use when asked about connected channels or channel status.",
        list_channels,
    ))

    # ------------------------------------------------------------------
    # 4. get_channel_context
    # ------------------------------------------------------------------
    @log_tool_execution
    def get_channel_context() -> str:
        """Get info about which channel the current message was sent from."""
        channel_ctx = _get_channel_context()
        if not channel_ctx:
            return "This message was sent from the direct web/desktop chat (no external channel)."
        return (f"Channel: {channel_ctx.get('channel', 'unknown')}\n"
                f"Sender: {channel_ctx.get('sender_name', 'unknown')} (ID: {channel_ctx.get('sender_id', 'unknown')})\n"
                f"Chat ID: {channel_ctx.get('chat_id', 'unknown')}\n"
                f"Group message: {channel_ctx.get('is_group', False)}")

    tools.append((
        "get_channel_context",
        "Get information about which messaging channel the current message was sent from. "
        "Returns channel type, sender name, chat ID, and whether it's a group message. "
        "Use to tailor responses for the originating channel.",
        get_channel_context,
    ))

    # ------------------------------------------------------------------
    # 5. send_install_link  (cross-device handoff — Phase 1)
    # ------------------------------------------------------------------
    #
    # When a user says "send Nunba to my phone" / "I want this on my work
    # laptop", the agent dispatches the install link to ONE of the user's
    # PAIRED channels.  The tool enforces three guarantees:
    #
    #   1. No cross-user spam: the destination chat_id MUST belong to a
    #      currently-active UserChannelBinding for the *caller's* user_id.
    #      Alice cannot resolve / target Bob's bindings.
    #   2. URL allowlist: if `install_link` is provided as an override,
    #      it MUST resolve to a host in `core.install_links.ALLOWED_HOSTS`
    #      (github.com / play.google.com / apps.apple.com / hevolve.ai /
    #      testflight.apple.com).  Otherwise the canonical mapping is used.
    #   3. Explicit consent: the tool description (read by the LLM) tells
    #      it to confirm the channel choice with the user FIRST.  We don't
    #      enforce this in code — the system prompt + this description do.
    #
    # See `core/install_links.py` for the canonical (device, locale) → URL
    # table.  See `tests/unit/test_install_handoff.py` for the FT/NFT
    # coverage.

    @log_tool_execution
    def send_install_link(
        channel_type: Annotated[str, "Channel to dispatch through: telegram, discord, whatsapp, slack, signal, web, email"],
        target_device: Annotated[str, "Device the user wants Nunba on: android, ios, windows, macos, linux"],
        chat_id: Annotated[Optional[str], "Specific chat_id from one of the user's bindings; if omitted, uses the user's preferred binding for that channel"] = None,
        install_link: Annotated[Optional[str], "Optional URL override; MUST be on the allowlist (github.com / play.google.com / apps.apple.com / hevolve.ai / testflight.apple.com). If omitted, the canonical link for target_device is used."] = None,
        locale: Annotated[str, "BCP-47 locale tag for localized install pages; 'default' falls back to the global URL"] = 'default',
    ) -> str:
        """Send a Nunba install link for `target_device` through `channel_type`.

        Use ONLY when the user has explicitly asked to install / set up /
        get / send Nunba on another device AND has confirmed which channel
        to use.  Never auto-dispatch — always confirm first.
        """
        try:
            from core.install_links import (
                get_install_link,
                is_allowed_install_link,
                is_supported_device,
                is_supported_install_channel,
            )

            channel_type_n = (channel_type or '').lower().strip()
            target_n = (target_device or '').lower().strip()

            if not is_supported_install_channel(channel_type_n):
                return (
                    f"Error: '{channel_type}' is not a supported install-handoff "
                    f"channel. Allowed: telegram, discord, whatsapp, slack, signal, "
                    f"web, email."
                )
            if not is_supported_device(target_n):
                return (
                    f"Error: '{target_device}' is not a supported target device. "
                    f"Allowed: android, ios, windows, macos, linux."
                )

            # Resolve the URL
            if install_link:
                if not is_allowed_install_link(install_link):
                    return (
                        "Error: install_link override is not on the allowlist. "
                        "Allowed hosts: github.com, play.google.com, "
                        "apps.apple.com, hevolve.ai, testflight.apple.com."
                    )
                url = install_link
            else:
                url = get_install_link(target_n, locale)
                if not url:
                    return (
                        f"Error: no canonical install link configured for "
                        f"target_device={target_n}, locale={locale}."
                    )

            # Resolve the destination chat_id from the caller's bindings
            # only.  Cross-user lookups are impossible by construction:
            # we filter by user_id == caller.
            uid = user_id or _get_user_id_from_threadlocal()
            if not uid:
                return (
                    "Error: cannot identify the requesting user; refusing "
                    "to dispatch install link without an authenticated "
                    "session."
                )

            resolved_chat_id = chat_id
            if not resolved_chat_id:
                try:
                    from integrations.social.models import (
                        get_db, UserChannelBinding,
                    )
                    db = get_db()
                    try:
                        q = db.query(UserChannelBinding).filter_by(
                            user_id=str(uid),
                            channel_type=channel_type_n,
                            is_active=True,
                        )
                        # Prefer the explicitly-flagged preferred binding
                        binding = q.filter_by(is_preferred=True).first() or q.first()
                        if not binding:
                            return (
                                f"You don't have a paired {channel_type_n} "
                                f"yet. Open the Channels page to connect "
                                f"one, then I can send the install link there."
                            )
                        resolved_chat_id = (
                            binding.channel_chat_id
                            or binding.channel_sender_id
                        )
                    finally:
                        db.close()
                except Exception as e:
                    logger.error("send_install_link binding lookup error: %s", e)
                    return (
                        f"Error: could not resolve a {channel_type_n} "
                        f"binding for the requesting user."
                    )
            else:
                # Caller passed an explicit chat_id — verify it belongs to
                # this user, NOT to someone else (no-spam guarantee).
                try:
                    from integrations.social.models import (
                        get_db, UserChannelBinding,
                    )
                    db = get_db()
                    try:
                        owns = db.query(UserChannelBinding).filter_by(
                            user_id=str(uid),
                            channel_type=channel_type_n,
                            is_active=True,
                        ).filter(
                            (UserChannelBinding.channel_chat_id == resolved_chat_id)
                            | (UserChannelBinding.channel_sender_id == resolved_chat_id)
                        ).first()
                        if not owns:
                            return (
                                f"Refusing to send: chat_id {resolved_chat_id} "
                                f"is not bound to your account on "
                                f"{channel_type_n}."
                            )
                    finally:
                        db.close()
                except Exception as e:
                    logger.error("send_install_link ownership check error: %s", e)
                    return f"Error: could not verify chat_id ownership: {e}"

            # A binding written by register_channel carries the placeholder
            # sender id 'agent_registered' and no real chat address; an
            # empty value is equally undeliverable.  Refuse both instead of
            # handing the adapter a destination that cannot exist.
            if not resolved_chat_id or resolved_chat_id == 'agent_registered':
                return (
                    f"Your {channel_type_n} binding has no chat address yet. "
                    f"Open the Channels page to finish pairing, then I can "
                    f"send the install link there."
                )

            # Compose the message — short, friendly, links open natively
            message = (
                f"Here's the Nunba install link for your {target_n} device:\n"
                f"{url}\n\n"
                f"Open it on the {target_n} device and follow the prompts. "
                f"Reply here if you hit any issue during setup."
            )

            # Dispatch via the registry (re-uses the same plumbing as
            # send_to_channel) so all channel adapters share one path.
            from integrations.channels.registry import get_registry
            import asyncio
            registry = get_registry()
            loop = getattr(registry, '_loop', None)

            if loop and loop.is_running():
                future = asyncio.run_coroutine_threadsafe(
                    registry.send_to_channel(
                        channel_type_n, resolved_chat_id, message,
                    ),
                    loop,
                )
                result = future.result(timeout=30)
                if getattr(result, 'success', False):
                    msg_id = (
                        getattr(result, 'message_id', None)
                        or getattr(result, 'id', None)
                        or ''
                    )
                    logger.info(
                        "send_install_link OK uid=%s ch=%s dev=%s url=%s msg=%s",
                        uid, channel_type_n, target_n, url, msg_id,
                    )
                    return (
                        f"Install link for {target_n} sent via "
                        f"{channel_type_n}."
                    )
                return (
                    f"Failed to send via {channel_type_n}: "
                    f"{getattr(result, 'error', 'unknown error')}"
                )

            # Adapter loop not running — return a graceful failure rather
            # than silently dropping.  The user / agent can retry.
            return (
                "Channel adapters are not running right now. Try again "
                "in a moment, or pick another channel."
            )
        except Exception as e:
            logger.error("send_install_link error: %s", e)
            return f"Error sending install link: {e}"

    tools.append((
        "send_install_link",
        "Send a Nunba install link to one of the user's PAIRED channels "
        "(Telegram / Discord / WhatsApp / Slack / Signal / Web / Email). "
        "Call this when the user explicitly asks to install / set up / get / "
        "send Nunba on another device. Always CONFIRM the channel and target "
        "device with the user before calling — never auto-dispatch. "
        "target_device must be one of: android, ios, windows, macos, linux. "
        "Example: send_install_link('telegram', 'android') sends the Play "
        "Store link via the user's preferred Telegram binding. The tool "
        "refuses to send to a chat_id that is not bound to the requesting "
        "user (no cross-user spam) and refuses install_link overrides that "
        "are not on the host allowlist (no phishing-URL injection).",
        send_install_link,
    ))

    # ------------------------------------------------------------------
    # disconnect_channel  (PR P.5)
    # ------------------------------------------------------------------
    @log_tool_execution
    def disconnect_channel(
        channel_type: Annotated[str, "Channel to disconnect (telegram, discord, slack, ...)"],
    ) -> str:
        """Disconnect the user's binding for a channel.  Marks the
        UserChannelBinding row inactive (same row the register_channel
        path created) — the adapter stops being used for this user but
        the channel-wide config and other users' bindings stay intact.
        Single owner of the binding lifecycle: register_channel writes,
        disconnect_channel reverses.
        """
        try:
            channel_type = channel_type.lower().strip()
            from integrations.channels.metadata import get_channel_metadata
            meta = get_channel_metadata(channel_type)
            if not meta:
                return f"Unknown channel '{channel_type}'."
            uid = user_id or _get_user_id_from_threadlocal()
            if not uid:
                return "Could not determine the current user."
            from integrations.social.models import get_db, UserChannelBinding
            db = get_db()
            try:
                row = db.query(UserChannelBinding).filter_by(
                    user_id=str(uid), channel_type=channel_type, is_active=True,
                ).first()
                if not row:
                    return (
                        f"No active {meta['display_name']} binding to disconnect."
                    )
                row.is_active = False
                db.commit()
            finally:
                db.close()
            # User-visible toast confirming the action.
            try:
                from core.platform.service_registry import ServiceRegistry
                _lui = ServiceRegistry.get('LiquidUIService')
                if _lui:
                    _lui.agent_ui_update(uid, {
                        'type': 'toast', 'severity': 'info',
                        'channel': channel_type, 'channel_type': channel_type,
                        'text': f"{meta['display_name']} disconnected.",
                    })
            except Exception as e:
                logger.debug("disconnect toast emit skipped: %s", e)
            return (
                f"{meta['display_name']} disconnected. Run "
                f"reconnect_channel('{channel_type}') to bring it back."
            )
        except Exception as e:
            logger.error("disconnect_channel error: %s", e)
            return f"Error disconnecting channel: {e}"

    tools.append((
        "disconnect_channel",
        "Disconnect the user's existing binding for a channel.  Use when "
        "the user wants to stop using a previously connected channel "
        "(Telegram, Discord, WhatsApp, etc.).  Reversible via "
        "reconnect_channel.  Example: disconnect_channel('telegram').",
        disconnect_channel,
    ))

    # ------------------------------------------------------------------
    # reconnect_channel  (PR P.6)
    # ------------------------------------------------------------------
    @log_tool_execution
    def reconnect_channel(
        channel_type: Annotated[str, "Channel to reconnect"],
    ) -> str:
        """Re-activate a previously disconnected binding (or trigger a
        fresh connection flow if no inactive binding exists).  Single
        flow: if an inactive binding exists, flip is_active back to True.
        Otherwise re-emit the form / qr_pair / oauth_link prompt the
        user originally went through — same Connect_Channel pipeline,
        no parallel re-onboarding code path.
        """
        try:
            channel_type = channel_type.lower().strip()
            from integrations.channels.metadata import get_channel_metadata
            meta = get_channel_metadata(channel_type)
            if not meta:
                return f"Unknown channel '{channel_type}'."
            uid = user_id or _get_user_id_from_threadlocal()
            if not uid:
                return "Could not determine the current user."
            from integrations.social.models import get_db, UserChannelBinding
            db = get_db()
            try:
                candidates = db.query(UserChannelBinding).filter_by(
                    user_id=str(uid), channel_type=channel_type,
                ).all()
                # Scan every binding for this channel: taking only the
                # first row could land on an active binding and overlook
                # an inactive one.  Falsy is_active (False or NULL) counts
                # as inactive.
                row = next((r for r in candidates if not r.is_active), None)
                if row is not None:
                    row.is_active = True
                    db.commit()
                    return (
                        f"{meta['display_name']} reconnected (existing binding "
                        f"reactivated). The adapter will start using it on "
                        f"the next message tick."
                    )
            finally:
                db.close()
            # No prior binding (or already active) — bounce the user
            # through the standard Connect_Channel onboarding so they
            # can re-paste a token / scan a QR / click OAuth, exactly
            # like a first-time setup.  No parallel onboarding path.
            return (
                f"No inactive {meta['display_name']} binding found. "
                f"Run Connect_Channel('{channel_type}') to start a fresh setup."
            )
        except Exception as e:
            logger.error("reconnect_channel error: %s", e)
            return f"Error reconnecting channel: {e}"

    tools.append((
        "reconnect_channel",
        "Re-enable a previously disconnected channel binding.  If the "
        "binding row exists but is inactive, this flips it back on.  "
        "Otherwise the user is bounced through the standard "
        "Connect_Channel flow (form / QR / OAuth, depending on the "
        "channel) to re-establish credentials.  Example: "
        "reconnect_channel('discord').",
        reconnect_channel,
    ))

    return tools

743 

744 

745# --------------------------------------------------------------------------- 

746# Registration helper (mirrors core/agent_tools.register_core_tools) 

747# --------------------------------------------------------------------------- 

748 

def register_channel_tools(helper, executor, ctx=None):
    """Attach the channel tools to an AutoGen helper/executor pair.

    Args:
        helper: AutoGen agent that suggests tool use (register_for_llm)
        executor: AutoGen agent that executes tools (register_for_execution)
        ctx: optional dict with 'user_id', 'prompt_id', 'log_tool_execution'.
            When omitted, a fresh dict is created and seeded with the
            thread-local user id if one is available.
    """
    if ctx is None:
        ctx = {}
        # No context supplied: fall back to the thread-local user id.
        threadlocal_uid = _get_user_id_from_threadlocal()
        if threadlocal_uid:
            ctx['user_id'] = threadlocal_uid

    channel_tools = build_channel_tool_closures(ctx)

    # Registration itself is delegated to the shared core helper.
    from core.agent_tools import register_core_tools
    register_core_tools(channel_tools, helper, executor)