Coverage for integrations / channels / plugins / http_server.py: 0.0%

192 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HTTP Server for Plugin System. 

3 

4Provides a lightweight HTTP server that plugins can use to register 

5custom routes and endpoints. 

6""" 

7 

8import logging 

9import json 

10import threading 

11from http.server import HTTPServer, BaseHTTPRequestHandler 

12from typing import Any, Callable, Dict, List, Optional, Tuple 

13from dataclasses import dataclass, field 

14from urllib.parse import urlparse, parse_qs 

15from enum import Enum 

16 

# Module-level logger; PluginHTTPServer writes its route-registration,
# lifecycle and request-error messages through it.
logger = logging.getLogger(__name__)

18 

19 

class HTTPMethod(Enum):
    """HTTP methods a route can be registered for.

    Each member's value is the upper-case method token; ``Route.matches``
    compares that value against the method string of an incoming request.
    """

    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    DELETE = "DELETE"
    PATCH = "PATCH"
    OPTIONS = "OPTIONS"
    HEAD = "HEAD"

29 

30 

@dataclass
class Route:
    """A single registered HTTP endpoint.

    Attributes:
        path: Exact URL path the route answers (no patterns or wildcards;
            matching is plain string equality).
        method: HTTP method the route answers.
        handler: Callable invoked with the incoming request object when the
            route is selected.
        plugin_name: Name of the plugin that registered the route.
        description: Optional human-readable description.
    """

    path: str
    method: HTTPMethod
    handler: Callable
    plugin_name: str
    description: str = ""

    def matches(self, path: str, method: str) -> bool:
        """Return True when *path* and *method* select this route.

        Args:
            path: Request path, compared for exact equality with ``self.path``.
            method: Request method string, compared for exact (case-sensitive)
                equality with the value of ``self.method``.
        """
        return self.path == path and self.method.value == method

43 

44 

@dataclass
class Request:
    """An incoming HTTP request as handed to route handlers.

    Attributes:
        method: HTTP method string (e.g. ``"GET"``).
        path: URL path without the query string.
        query_params: Query parameters, each name mapped to the list of its
            values.
        headers: Request headers as a plain name-to-value mapping.
        body: Raw request body, or None when no body was received.
    """

    method: str
    path: str
    query_params: Dict[str, List[str]]
    headers: Dict[str, str]
    body: Optional[bytes] = None

    def json(self) -> Any:
        """Decode the body as UTF-8 and parse it as JSON.

        Returns:
            The parsed JSON value, or None when the body is missing or empty.

        Raises:
            ValueError: If the body is not valid UTF-8 or not valid JSON
                (``UnicodeDecodeError`` and ``json.JSONDecodeError`` are both
                subclasses of ``ValueError``).
        """
        if not self.body:
            return None
        return json.loads(self.body.decode("utf-8"))

59 

60 

@dataclass
class Response:
    """An HTTP response produced by a route handler.

    Attributes:
        status_code: HTTP status code (defaults to 200).
        body: Response payload; see ``to_bytes`` for how each type is encoded.
        headers: Extra response headers.
        content_type: Value sent in the ``Content-Type`` header.
    """

    status_code: int = 200
    body: Any = None
    headers: Dict[str, str] = field(default_factory=dict)
    content_type: str = "application/json"

    def to_bytes(self) -> bytes:
        """Encode the body for the wire.

        Returns:
            ``b""`` for a None body; bytes-like bodies unchanged (as ``bytes``);
            ``str`` bodies UTF-8 encoded; anything else serialized with
            ``json.dumps`` and UTF-8 encoded.

        Raises:
            TypeError: If the body is not bytes/str and is not JSON
                serializable.
        """
        payload = self.body
        if payload is None:
            return b""
        if isinstance(payload, bytes):
            return payload
        if isinstance(payload, (bytearray, memoryview)):
            # Other binary buffers are raw payloads too, not JSON documents.
            return bytes(payload)
        if isinstance(payload, str):
            return payload.encode("utf-8")
        return json.dumps(payload).encode("utf-8")

78 

79 

class PluginHTTPServer:
    """
    HTTP server for plugin routes.

    Allows plugins to register custom HTTP endpoints and handles
    routing requests to the appropriate handlers.

    Requests arriving over the socket and requests passed directly to
    ``handle_request`` go through the same pipeline: registered middleware
    first (in registration order), then the matching route handler.
    """

    def __init__(self, host: str = "127.0.0.1", port: int = 8080):
        """
        Create a server; no socket is opened until ``start`` is called.

        Args:
            host: Address to bind to.
            port: TCP port to bind to.
        """
        self._host = host
        self._port = port
        self._routes: List[Route] = []
        self._server: Optional[HTTPServer] = None
        self._server_thread: Optional[threading.Thread] = None
        self._running = False
        self._middleware: List[Callable] = []

    @property
    def host(self) -> str:
        """Return the server host."""
        return self._host

    @property
    def port(self) -> int:
        """Return the server port."""
        return self._port

    @property
    def is_running(self) -> bool:
        """Return whether the server is running."""
        return self._running

    @property
    def routes(self) -> List[Route]:
        """Return all registered routes (a copy; mutating it has no effect)."""
        return self._routes.copy()

    def register_route(self, path: str, method: HTTPMethod, handler: Callable,
                       plugin_name: str, description: str = "") -> bool:
        """
        Register a new route.

        Args:
            path: The URL path for the route.
            method: The HTTP method.
            handler: The handler function; it is called with the Request.
            plugin_name: The name of the plugin registering the route.
            description: Optional description of the route.

        Returns:
            True if registration was successful, False if a route with the
            same path and method already exists.
        """
        if any(r.path == path and r.method == method for r in self._routes):
            logger.warning("Route %s %s already registered", method.value, path)
            return False

        self._routes.append(Route(
            path=path,
            method=method,
            handler=handler,
            plugin_name=plugin_name,
            description=description,
        ))
        logger.info("Route registered: %s %s by %s", method.value, path, plugin_name)
        return True

    def unregister_route(self, path: str, method: HTTPMethod) -> bool:
        """
        Unregister a specific route.

        Args:
            path: The URL path of the route.
            method: The HTTP method.

        Returns:
            True if unregistration was successful, False if no such route
            is registered.
        """
        for index, route in enumerate(self._routes):
            if route.path == path and route.method == method:
                # Returning right after the deletion keeps the loop from
                # continuing over a list that has just been modified.
                del self._routes[index]
                logger.info("Route unregistered: %s %s", method.value, path)
                return True

        logger.warning("Route %s %s not found", method.value, path)
        return False

    def unregister_routes(self, plugin_name: str) -> int:
        """
        Unregister all routes for a plugin.

        Args:
            plugin_name: The name of the plugin.

        Returns:
            Number of routes unregistered.
        """
        # Single filtering pass instead of repeated list.remove() calls.
        remaining = [r for r in self._routes if r.plugin_name != plugin_name]
        count = len(self._routes) - len(remaining)
        self._routes = remaining

        if count > 0:
            logger.info("Unregistered %s routes for plugin %s", count, plugin_name)
        return count

    def add_middleware(self, middleware: Callable) -> None:
        """
        Add middleware to the request pipeline.

        Middleware runs in the order it was added; the first one added is the
        outermost. Each is called as ``middleware(request, next)``, where
        ``next(request)`` runs the rest of the pipeline and returns its
        Response.

        Args:
            middleware: A callable that takes (request, next) and returns a response.
        """
        self._middleware.append(middleware)

    def _find_route(self, path: str, method: str) -> Optional[Route]:
        """Find the first registered route matching the given path and method."""
        return next((r for r in self._routes if r.matches(path, method)), None)

    @staticmethod
    def _to_response(result: Any) -> Response:
        """Normalize a handler or middleware return value into a Response."""
        if isinstance(result, Response):
            return result
        if isinstance(result, (dict, list)):
            return Response(body=result)
        return Response(body=str(result))

    @staticmethod
    def _bind_middleware(middleware: Callable, next_handler: Callable) -> Callable:
        """Return a one-argument callable running *middleware* before *next_handler*."""
        def call(request: Request) -> Any:
            return middleware(request, next_handler)
        return call

    def _dispatch(self, request: Request) -> Response:
        """Route *request* to its handler; 404 Response when nothing matches."""
        route = self._find_route(request.path, request.method)
        if route is None:
            return Response(
                status_code=404,
                body={"error": "Not Found", "path": request.path},
            )
        return self._to_response(route.handler(request))

    def _create_handler(self) -> type:
        """Create the HTTP request handler class bound to this server."""
        server = self

        class RequestHandler(BaseHTTPRequestHandler):
            def log_message(self, format, *args):
                # Send http.server's access log through the module logger
                # instead of stderr.
                logger.debug("HTTP: %s", format % args)

            def _handle_request(self, method: str):
                parsed = urlparse(self.path)

                # A malformed Content-Length must produce a 400, not an
                # unhandled exception inside the connection handler.
                try:
                    content_length = int(self.headers.get('Content-Length', 0))
                except ValueError:
                    self._send_response(Response(
                        status_code=400,
                        body={"error": "Bad Request",
                              "message": "Invalid Content-Length header"},
                    ))
                    return

                body = self.rfile.read(content_length) if content_length > 0 else None

                request = Request(
                    method=method,
                    path=parsed.path,
                    query_params=parse_qs(parsed.query),
                    headers=dict(self.headers.items()),
                    body=body,
                )

                # Same pipeline as direct calls: middleware, routing, error
                # handling all live in PluginHTTPServer.handle_request.
                self._send_response(
                    server.handle_request(request),
                    head_only=(method == "HEAD"),
                )

            def _send_response(self, response: Response, head_only: bool = False):
                # Serialize before emitting anything so a body that cannot be
                # encoded becomes a clean 500 rather than a half-sent response.
                try:
                    payload = response.to_bytes()
                except Exception as e:
                    logger.exception("Error serializing response: %s", e)
                    response = Response(
                        status_code=500,
                        body={"error": "Internal Server Error", "message": str(e)},
                    )
                    payload = response.to_bytes()

                self.send_response(response.status_code)
                self.send_header('Content-Type', response.content_type)
                for key, value in response.headers.items():
                    self.send_header(key, value)
                self.end_headers()
                # A response to HEAD carries headers only.
                if not head_only:
                    self.wfile.write(payload)

            def do_GET(self):
                self._handle_request("GET")

            def do_POST(self):
                self._handle_request("POST")

            def do_PUT(self):
                self._handle_request("PUT")

            def do_DELETE(self):
                self._handle_request("DELETE")

            def do_PATCH(self):
                self._handle_request("PATCH")

            def do_OPTIONS(self):
                self._handle_request("OPTIONS")

            def do_HEAD(self):
                self._handle_request("HEAD")

        return RequestHandler

    def start(self, blocking: bool = False) -> bool:
        """
        Start the HTTP server.

        Args:
            blocking: If True, run in the current thread (blocking).
                     If False, run in a background thread.

        Returns:
            True if server started successfully (in blocking mode: once the
            serve loop has ended normally). False if it was already running
            or failed to start.
        """
        if self._running:
            logger.warning("Server is already running")
            return False

        try:
            handler_class = self._create_handler()
            self._server = HTTPServer((self._host, self._port), handler_class)
            self._running = True

            logger.info("HTTP server starting on %s:%s", self._host, self._port)

            if blocking:
                self._server.serve_forever()
            else:
                self._server_thread = threading.Thread(
                    target=self._server.serve_forever,
                    daemon=True,
                )
                self._server_thread.start()

            return True
        except Exception as e:
            logger.exception("Failed to start HTTP server: %s", e)
            # Release the listening socket if it was already bound so a
            # failed start does not leak it.
            failed_server = self._server
            self._server = None
            self._server_thread = None
            self._running = False
            if failed_server is not None:
                failed_server.server_close()
            return False

    def stop(self) -> bool:
        """
        Stop the HTTP server.

        Returns:
            True if server stopped successfully, False if it was not running
            or an error occurred while stopping.
        """
        if not self._running:
            logger.warning("Server is not running")
            return False

        try:
            if self._server:
                # shutdown() ends the serve loop; server_close() releases
                # the listening socket.
                self._server.shutdown()
                self._server.server_close()
                self._server = None

            if self._server_thread:
                self._server_thread.join(timeout=5)
                self._server_thread = None

            self._running = False
            logger.info("HTTP server stopped")
            return True
        except Exception as e:
            logger.exception("Error stopping HTTP server: %s", e)
            return False

    def list_routes(self) -> List[Dict[str, str]]:
        """
        List all registered routes.

        Returns:
            One dict per route with the keys "path", "method", "plugin"
            and "description".
        """
        return [
            {
                "path": route.path,
                "method": route.method.value,
                "plugin": route.plugin_name,
                "description": route.description,
            }
            for route in self._routes
        ]

    def handle_request(self, request: Request) -> Response:
        """
        Run a request through the middleware chain and its route handler.

        Used by the socket handler for live traffic and callable directly
        (e.g. from tests) without starting the server.

        Args:
            request: The request to handle.

        Returns:
            The response: the handler's (or a middleware's) result normalized
            to a Response, a 404 Response when no route matches, or a 500
            Response when a handler or middleware raises.
        """
        # Wrap from the innermost outwards so the first middleware added
        # ends up outermost.
        pipeline: Callable = self._dispatch
        for middleware in reversed(self._middleware):
            pipeline = self._bind_middleware(middleware, pipeline)

        try:
            return self._to_response(pipeline(request))
        except Exception as e:
            logger.exception("Error handling request: %s", e)
            return Response(
                status_code=500,
                body={"error": "Internal Server Error", "message": str(e)},
            )