Coverage for integrations / channels / bridge / wamp_bridge.py: 0.0%

268 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2WAMP Bridge for Channel Chaining 

3 

4Integrates HevolveBot channel adapters with Crossbar WAMP for: 

5- Cross-channel message forwarding 

6- Unified inbox across channels 

7- Channel-to-channel relay rules 

8- Real-time message routing via pub/sub 

9 

10Uses existing Crossbar infrastructure: 

11- com.hertzai.hevolve.channel.* topics for channel events 

12- com.hertzai.hevolve.bridge.* topics for bridge control 

13""" 

14 

15from __future__ import annotations 

16 

17import asyncio 

18import json 

19import logging 

20import os 

21from dataclasses import dataclass, field 

22from datetime import datetime 

23from enum import Enum 

24from typing import Any, Callable, Dict, List, Optional, Set 

25from functools import partial 

26 

27try: 

28 from autobahn.asyncio.component import Component 

29 from autobahn.wamp.exception import ApplicationError 

30 HAS_AUTOBAHN = True 

31except ImportError: 

32 HAS_AUTOBAHN = False 

33 

34from ..base import Message, SendResult 

35 

36 

37def _get_crossbar_port() -> int: 

38 try: 

39 from core.port_registry import get_port 

40 return get_port('crossbar') 

41 except ImportError: 

42 return 8088 

43from ..registry import ChannelRegistry 

44 

45logger = logging.getLogger(__name__) 

46 

47 

48class RouteType(Enum): 

49 """Types of message routing.""" 

50 FORWARD = "forward" # Forward message to another channel 

51 MIRROR = "mirror" # Mirror to multiple channels 

52 BROADCAST = "broadcast" # Broadcast to all channels 

53 FILTER = "filter" # Forward only matching messages 

54 TRANSFORM = "transform" # Transform before forwarding 

55 

56 

57@dataclass 

58class BridgeRule: 

59 """Rule for routing messages between channels.""" 

60 id: str 

61 name: str 

62 source_channel: str # Source channel type (telegram, discord, etc.) 

63 source_chat_id: Optional[str] = None # Specific chat or None for all 

64 target_channel: str = "" # Target channel type 

65 target_chat_id: Optional[str] = None # Target chat ID 

66 route_type: RouteType = RouteType.FORWARD 

67 enabled: bool = True 

68 

69 # Filtering options 

70 filter_pattern: Optional[str] = None # Regex pattern to match 

71 filter_sender: Optional[str] = None # Specific sender ID 

72 filter_keywords: List[str] = field(default_factory=list) 

73 

74 # Transform options 

75 prefix: str = "" # Prefix to add to forwarded messages 

76 suffix: str = "" # Suffix to add 

77 include_source_info: bool = True # Include [From: channel/user] header 

78 strip_mentions: bool = False # Remove @mentions before forwarding 

79 

80 # Rate limiting 

81 rate_limit: int = 0 # Max messages per minute (0 = unlimited) 

82 cooldown_seconds: int = 0 # Cooldown between forwards 

83 

84 # Metadata 

85 created_at: datetime = field(default_factory=datetime.now) 

86 last_triggered: Optional[datetime] = None 

87 trigger_count: int = 0 

88 

89 

90@dataclass 

91class BridgeConfig: 

92 """Configuration for channel bridge.""" 

93 # WAMP connection 

94 crossbar_url: str = "" 

95 realm: str = "realm1" 

96 

97 # Topic prefixes 

98 channel_topic_prefix: str = "com.hertzai.hevolve.channel" 

99 bridge_topic_prefix: str = "com.hertzai.hevolve.bridge" 

100 

101 # Behavior 

102 enable_broadcast: bool = True 

103 enable_unified_inbox: bool = True 

104 max_forward_chain: int = 3 # Prevent infinite loops 

105 forward_timeout: float = 10.0 # Timeout for forward operations 

106 

107 # Persistence 

108 rules_file: str = "/app/data/bridge_rules.json" 

109 

110 @classmethod 

111 def from_env(cls) -> "BridgeConfig": 

112 """Create config from environment variables.""" 

113 return cls( 

114 crossbar_url=os.getenv("CBURL", f"ws://localhost:{_get_crossbar_port()}/ws"), 

115 realm=os.getenv("CBREALM", "realm1"), 

116 rules_file=os.getenv("BRIDGE_RULES_FILE", "/app/data/bridge_rules.json"), 

117 ) 

118 

119 

120class ChannelBridge: 

121 """ 

122 Bridge for routing messages between channels via WAMP Crossbar. 

123 

124 Enables: 

125 - Forward messages from one channel to another 

126 - Mirror messages to multiple channels 

127 - Broadcast to all channels 

128 - Filter-based routing 

129 - Transform messages during forwarding 

130 

131 Example: 

132 bridge = ChannelBridge(config, registry) 

133 await bridge.connect() 

134 

135 # Add a rule to forward Telegram messages to Discord 

136 bridge.add_rule(BridgeRule( 

137 id="tg-to-discord", 

138 name="Telegram to Discord", 

139 source_channel="telegram", 

140 target_channel="discord", 

141 target_chat_id="123456789", 

142 include_source_info=True, 

143 )) 

144 """ 

145 

146 def __init__( 

147 self, 

148 config: BridgeConfig, 

149 registry: ChannelRegistry, 

150 ): 

151 self.config = config 

152 self.registry = registry 

153 self._rules: Dict[str, BridgeRule] = {} 

154 self._component: Optional[Component] = None 

155 self._session = None 

156 self._connected = False 

157 self._forward_chain: Dict[str, int] = {} # Track forward depth 

158 self._rate_limiters: Dict[str, List[datetime]] = {} 

159 self._handlers: Dict[str, Callable] = {} 

160 

161 # Load persisted rules 

162 self._load_rules() 

163 

164 def _load_rules(self) -> None: 

165 """Load rules from persistence file.""" 

166 try: 

167 if os.path.exists(self.config.rules_file): 

168 with open(self.config.rules_file, "r") as f: 

169 data = json.load(f) 

170 for rule_data in data.get("rules", []): 

171 rule = BridgeRule( 

172 id=rule_data["id"], 

173 name=rule_data["name"], 

174 source_channel=rule_data["source_channel"], 

175 source_chat_id=rule_data.get("source_chat_id"), 

176 target_channel=rule_data["target_channel"], 

177 target_chat_id=rule_data.get("target_chat_id"), 

178 route_type=RouteType(rule_data.get("route_type", "forward")), 

179 enabled=rule_data.get("enabled", True), 

180 prefix=rule_data.get("prefix", ""), 

181 suffix=rule_data.get("suffix", ""), 

182 include_source_info=rule_data.get("include_source_info", True), 

183 ) 

184 self._rules[rule.id] = rule 

185 logger.info(f"Loaded {len(self._rules)} bridge rules") 

186 except Exception as e: 

187 logger.warning(f"Could not load bridge rules: {e}") 

188 

189 def _save_rules(self) -> None: 

190 """Save rules to persistence file.""" 

191 try: 

192 os.makedirs(os.path.dirname(self.config.rules_file), exist_ok=True) 

193 data = { 

194 "rules": [ 

195 { 

196 "id": r.id, 

197 "name": r.name, 

198 "source_channel": r.source_channel, 

199 "source_chat_id": r.source_chat_id, 

200 "target_channel": r.target_channel, 

201 "target_chat_id": r.target_chat_id, 

202 "route_type": r.route_type.value, 

203 "enabled": r.enabled, 

204 "prefix": r.prefix, 

205 "suffix": r.suffix, 

206 "include_source_info": r.include_source_info, 

207 } 

208 for r in self._rules.values() 

209 ] 

210 } 

211 with open(self.config.rules_file, "w") as f: 

212 json.dump(data, f, indent=2) 

213 except Exception as e: 

214 logger.error(f"Could not save bridge rules: {e}") 

215 

216 async def connect(self) -> bool: 

217 """Connect to Crossbar WAMP router.""" 

218 if not HAS_AUTOBAHN: 

219 logger.error("autobahn not installed, cannot connect to Crossbar") 

220 return False 

221 

222 try: 

223 self._component = Component( 

224 transports=self.config.crossbar_url, 

225 realm=self.config.realm, 

226 ) 

227 

228 @self._component.on_join 

229 async def on_join(session, details): 

230 self._session = session 

231 self._connected = True 

232 logger.info("Channel bridge connected to Crossbar") 

233 

234 # Subscribe to channel events 

235 await self._setup_subscriptions() 

236 

237 # Register RPC methods 

238 await self._register_rpcs() 

239 

240 @self._component.on_leave 

241 async def on_leave(session, details): 

242 self._connected = False 

243 logger.info("Channel bridge disconnected from Crossbar") 

244 

245 # Start component in background 

246 asyncio.create_task(self._run_component()) 

247 

248 # Wait for connection 

249 for _ in range(50): # 5 seconds timeout 

250 if self._connected: 

251 return True 

252 await asyncio.sleep(0.1) 

253 

254 return False 

255 

256 except Exception as e: 

257 logger.error(f"Failed to connect to Crossbar: {e}") 

258 return False 

259 

260 async def _run_component(self) -> None: 

261 """Run the WAMP component.""" 

262 try: 

263 from autobahn.asyncio.component import run 

264 await self._component.start() 

265 except Exception as e: 

266 logger.error(f"Component error: {e}") 

267 

268 async def _setup_subscriptions(self) -> None: 

269 """Subscribe to channel message topics.""" 

270 # Subscribe to all channel message events 

271 topic = f"{self.config.channel_topic_prefix}.message" 

272 await self._session.subscribe(self._on_channel_message, topic) 

273 logger.info(f"Subscribed to {topic}") 

274 

275 # Subscribe to bridge control topic 

276 control_topic = f"{self.config.bridge_topic_prefix}.control" 

277 await self._session.subscribe(self._on_bridge_control, control_topic) 

278 logger.info(f"Subscribed to {control_topic}") 

279 

280 async def _register_rpcs(self) -> None: 

281 """Register RPC methods for bridge control.""" 

282 prefix = self.config.bridge_topic_prefix 

283 

284 await self._session.register( 

285 self.add_rule_rpc, 

286 f"{prefix}.add_rule" 

287 ) 

288 await self._session.register( 

289 self.remove_rule_rpc, 

290 f"{prefix}.remove_rule" 

291 ) 

292 await self._session.register( 

293 self.list_rules_rpc, 

294 f"{prefix}.list_rules" 

295 ) 

296 await self._session.register( 

297 self.forward_message_rpc, 

298 f"{prefix}.forward" 

299 ) 

300 

301 logger.info("Registered bridge RPC methods") 

302 

303 async def _on_channel_message(self, message_data: Dict[str, Any]) -> None: 

304 """Handle incoming channel message event.""" 

305 try: 

306 # Parse message 

307 channel = message_data.get("channel", "") 

308 chat_id = message_data.get("chat_id", "") 

309 message_id = message_data.get("message_id", "") 

310 

311 # Check forward chain depth 

312 chain_key = f"{channel}:{chat_id}:{message_id}" 

313 depth = self._forward_chain.get(chain_key, 0) 

314 

315 if depth >= self.config.max_forward_chain: 

316 logger.warning(f"Max forward chain reached for {chain_key}") 

317 return 

318 

319 # Find matching rules 

320 for rule in self._rules.values(): 

321 if not rule.enabled: 

322 continue 

323 

324 if rule.source_channel != channel: 

325 continue 

326 

327 if rule.source_chat_id and rule.source_chat_id != chat_id: 

328 continue 

329 

330 # Check rate limit 

331 if not self._check_rate_limit(rule): 

332 continue 

333 

334 # Execute forward 

335 self._forward_chain[chain_key] = depth + 1 

336 try: 

337 await self._execute_forward(rule, message_data) 

338 finally: 

339 del self._forward_chain[chain_key] 

340 

341 except Exception as e: 

342 logger.error(f"Error handling channel message: {e}") 

343 

344 async def _on_bridge_control(self, control_data: Dict[str, Any]) -> None: 

345 """Handle bridge control messages.""" 

346 action = control_data.get("action", "") 

347 

348 if action == "reload_rules": 

349 self._load_rules() 

350 elif action == "clear_rules": 

351 self._rules.clear() 

352 self._save_rules() 

353 

354 def _check_rate_limit(self, rule: BridgeRule) -> bool: 

355 """Check if rule is within rate limit.""" 

356 if rule.rate_limit <= 0: 

357 return True 

358 

359 now = datetime.now() 

360 key = rule.id 

361 

362 if key not in self._rate_limiters: 

363 self._rate_limiters[key] = [] 

364 

365 # Clean old entries 

366 window_start = now.timestamp() - 60 

367 self._rate_limiters[key] = [ 

368 t for t in self._rate_limiters[key] 

369 if t.timestamp() > window_start 

370 ] 

371 

372 # Check limit 

373 if len(self._rate_limiters[key]) >= rule.rate_limit: 

374 return False 

375 

376 self._rate_limiters[key].append(now) 

377 return True 

378 

379 async def _execute_forward( 

380 self, 

381 rule: BridgeRule, 

382 message_data: Dict[str, Any] 

383 ) -> Optional[SendResult]: 

384 """Execute a forward operation based on rule.""" 

385 try: 

386 # Get target adapter 

387 target_adapter = self.registry.get(rule.target_channel) 

388 if not target_adapter: 

389 logger.warning(f"Target channel {rule.target_channel} not found") 

390 return None 

391 

392 # Build forwarded message 

393 text = message_data.get("text", "") 

394 

395 # Apply transforms 

396 if rule.include_source_info: 

397 source_channel = message_data.get("channel", "unknown") 

398 sender_name = message_data.get("sender_name", "unknown") 

399 text = f"[From {source_channel}/{sender_name}]\n{text}" 

400 

401 if rule.prefix: 

402 text = f"{rule.prefix}{text}" 

403 if rule.suffix: 

404 text = f"{text}{rule.suffix}" 

405 

406 # Determine target chat 

407 target_chat = rule.target_chat_id 

408 if not target_chat: 

409 # Use same chat ID if not specified 

410 target_chat = message_data.get("chat_id", "") 

411 

412 # Send to target channel 

413 result = await target_adapter.send_message( 

414 chat_id=target_chat, 

415 text=text, 

416 ) 

417 

418 # Update rule stats 

419 rule.last_triggered = datetime.now() 

420 rule.trigger_count += 1 

421 

422 # Publish forward event 

423 if self._session: 

424 await self._session.publish( 

425 f"{self.config.bridge_topic_prefix}.forwarded", 

426 { 

427 "rule_id": rule.id, 

428 "source_channel": rule.source_channel, 

429 "target_channel": rule.target_channel, 

430 "success": result.success if result else False, 

431 } 

432 ) 

433 

434 return result 

435 

436 except Exception as e: 

437 logger.error(f"Forward failed for rule {rule.id}: {e}") 

438 return None 

439 

440 # Rule Management 

441 

442 def add_rule(self, rule: BridgeRule) -> None: 

443 """Add a bridge rule.""" 

444 self._rules[rule.id] = rule 

445 self._save_rules() 

446 logger.info(f"Added bridge rule: {rule.name}") 

447 

448 def remove_rule(self, rule_id: str) -> bool: 

449 """Remove a bridge rule.""" 

450 if rule_id in self._rules: 

451 del self._rules[rule_id] 

452 self._save_rules() 

453 logger.info(f"Removed bridge rule: {rule_id}") 

454 return True 

455 return False 

456 

457 def get_rule(self, rule_id: str) -> Optional[BridgeRule]: 

458 """Get a rule by ID.""" 

459 return self._rules.get(rule_id) 

460 

461 def list_rules(self) -> List[BridgeRule]: 

462 """List all rules.""" 

463 return list(self._rules.values()) 

464 

465 def enable_rule(self, rule_id: str) -> bool: 

466 """Enable a rule.""" 

467 if rule_id in self._rules: 

468 self._rules[rule_id].enabled = True 

469 self._save_rules() 

470 return True 

471 return False 

472 

473 def disable_rule(self, rule_id: str) -> bool: 

474 """Disable a rule.""" 

475 if rule_id in self._rules: 

476 self._rules[rule_id].enabled = False 

477 self._save_rules() 

478 return True 

479 return False 

480 

481 # RPC Methods 

482 

483 async def add_rule_rpc(self, rule_data: Dict[str, Any]) -> Dict[str, Any]: 

484 """RPC method to add a rule.""" 

485 try: 

486 rule = BridgeRule( 

487 id=rule_data.get("id", str(datetime.now().timestamp())), 

488 name=rule_data["name"], 

489 source_channel=rule_data["source_channel"], 

490 source_chat_id=rule_data.get("source_chat_id"), 

491 target_channel=rule_data["target_channel"], 

492 target_chat_id=rule_data.get("target_chat_id"), 

493 route_type=RouteType(rule_data.get("route_type", "forward")), 

494 ) 

495 self.add_rule(rule) 

496 return {"success": True, "rule_id": rule.id} 

497 except Exception as e: 

498 return {"success": False, "error": str(e)} 

499 

500 async def remove_rule_rpc(self, rule_id: str) -> Dict[str, Any]: 

501 """RPC method to remove a rule.""" 

502 success = self.remove_rule(rule_id) 

503 return {"success": success} 

504 

505 async def list_rules_rpc(self) -> List[Dict[str, Any]]: 

506 """RPC method to list rules.""" 

507 return [ 

508 { 

509 "id": r.id, 

510 "name": r.name, 

511 "source_channel": r.source_channel, 

512 "target_channel": r.target_channel, 

513 "enabled": r.enabled, 

514 "trigger_count": r.trigger_count, 

515 } 

516 for r in self._rules.values() 

517 ] 

518 

519 async def forward_message_rpc( 

520 self, 

521 source_channel: str, 

522 source_chat_id: str, 

523 target_channel: str, 

524 target_chat_id: str, 

525 text: str, 

526 ) -> Dict[str, Any]: 

527 """RPC method to forward a specific message.""" 

528 try: 

529 target_adapter = self.registry.get(target_channel) 

530 if not target_adapter: 

531 return {"success": False, "error": f"Channel {target_channel} not found"} 

532 

533 result = await target_adapter.send_message( 

534 chat_id=target_chat_id, 

535 text=f"[Forwarded from {source_channel}]\n{text}", 

536 ) 

537 

538 return {"success": result.success if result else False} 

539 except Exception as e: 

540 return {"success": False, "error": str(e)} 

541 

542 # Convenience methods 

543 

544 async def forward_to_all( 

545 self, 

546 source_channel: str, 

547 message: Message, 

548 exclude_source: bool = True, 

549 ) -> Dict[str, SendResult]: 

550 """Forward a message to all connected channels.""" 

551 results = {} 

552 

553 for channel_name in self.registry.list_channels(): 

554 if exclude_source and channel_name == source_channel: 

555 continue 

556 

557 adapter = self.registry.get(channel_name) 

558 if not adapter: 

559 continue 

560 

561 try: 

562 # Would need target chat IDs from config 

563 # This is a simplified version 

564 text = f"[Broadcast from {source_channel}]\n{message.text}" 

565 # result = await adapter.send_message(chat_id=???, text=text) 

566 # results[channel_name] = result 

567 except Exception as e: 

568 logger.error(f"Broadcast to {channel_name} failed: {e}") 

569 

570 return results 

571 

572 async def publish_to_wamp( 

573 self, 

574 channel: str, 

575 message: Message, 

576 ) -> None: 

577 """Publish a channel message to WAMP for other services.""" 

578 if not self._session: 

579 return 

580 

581 topic = f"{self.config.channel_topic_prefix}.message" 

582 await self._session.publish(topic, { 

583 "channel": channel, 

584 "chat_id": message.chat_id, 

585 "message_id": message.id, 

586 "sender_id": message.sender_id, 

587 "sender_name": message.sender_name, 

588 "text": message.text, 

589 "timestamp": message.timestamp.isoformat(), 

590 }) 

591 

592 async def disconnect(self) -> None: 

593 """Disconnect from Crossbar.""" 

594 self._connected = False 

595 if self._component: 

596 try: 

597 await self._component.stop() 

598 except Exception: 

599 pass 

600 self._session = None 

601 logger.info("Channel bridge disconnected") 

602 

603 

604def create_channel_bridge( 

605 registry: ChannelRegistry, 

606 crossbar_url: Optional[str] = None, 

607 realm: Optional[str] = None, 

608) -> ChannelBridge: 

609 """Factory function to create a channel bridge. 

610 

611 Args: 

612 registry: Channel registry with adapters 

613 crossbar_url: Crossbar WebSocket URL 

614 realm: WAMP realm 

615 

616 Returns: 

617 Configured ChannelBridge instance 

618 """ 

619 config = BridgeConfig( 

620 crossbar_url=crossbar_url or os.getenv("CBURL", "ws://localhost:8088/ws"), 

621 realm=realm or os.getenv("CBREALM", "realm1"), 

622 ) 

623 return ChannelBridge(config, registry)