Coverage for integrations / channels / plugins / http_server.py: 0.0%
192 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HTTP Server for Plugin System.
4Provides a lightweight HTTP server that plugins can use to register
5custom routes and endpoints.
6"""
8import logging
9import json
10import threading
11from http.server import HTTPServer, BaseHTTPRequestHandler
12from typing import Any, Callable, Dict, List, Optional, Tuple
13from dataclasses import dataclass, field
14from urllib.parse import urlparse, parse_qs
15from enum import Enum
17logger = logging.getLogger(__name__)
class HTTPMethod(Enum):
    """Closed set of HTTP verbs a plugin route can be registered for.

    Each member's value is the verb exactly as it appears on the request
    line, so ``member.value`` can be compared with an incoming method string.
    """

    # Declaration order is also the iteration order of the enum.
    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    DELETE = "DELETE"
    PATCH = "PATCH"
    OPTIONS = "OPTIONS"
    HEAD = "HEAD"
@dataclass
class Route:
    """A single registered endpoint: a path/method pair bound to a handler.

    ``plugin_name`` records which plugin registered the route and
    ``description`` is optional free text about it.
    """

    path: str
    method: HTTPMethod
    handler: Callable
    plugin_name: str
    description: str = ""

    def matches(self, path: str, method: str) -> bool:
        """Return True when *path* and *method* both equal this route's.

        The comparison is exact: the path must be string-equal to
        ``self.path`` and the method string must equal the enum member's
        value.
        """
        if self.path != path:
            return False
        return self.method.value == method
@dataclass
class Request:
    """Plain container for the parts of an incoming HTTP request.

    ``query_params`` maps each query key to the list of values supplied for
    it; ``body`` holds the raw request payload, or None when there is none.
    """

    method: str
    path: str
    query_params: Dict[str, List[str]]
    headers: Dict[str, str]
    body: Optional[bytes] = None

    def json(self) -> Any:
        """Decode the body as UTF-8 and parse it as JSON.

        Returns:
            The parsed value, or None when the body is missing or empty.
        """
        raw = self.body
        if not raw:
            # Covers both ``None`` and ``b""``.
            return None
        text = raw.decode('utf-8')
        return json.loads(text)
@dataclass
class Response:
    """Plain container for what a handler wants sent back to the client.

    Defaults describe an empty ``200`` response typed as
    ``application/json`` with no extra headers.
    """

    status_code: int = 200
    body: Any = None
    headers: Dict[str, str] = field(default_factory=dict)
    content_type: str = "application/json"

    def to_bytes(self) -> bytes:
        """Serialize ``body`` into the bytes to put on the wire.

        Returns:
            ``b""`` for None, the body itself when it is already bytes, the
            UTF-8 encoding when it is a string, and otherwise the UTF-8
            encoded ``json.dumps`` of the body.
        """
        payload = self.body
        if payload is None:
            return b""
        if isinstance(payload, bytes):
            return payload
        # Strings are sent verbatim; anything else is rendered as JSON text.
        text = payload if isinstance(payload, str) else json.dumps(payload)
        return text.encode('utf-8')
class PluginHTTPServer:
    """
    HTTP server for plugin routes.

    Allows plugins to register custom HTTP endpoints and handles
    routing requests to the appropriate handlers.

    Requests arriving over the socket and requests passed to
    ``handle_request`` go through the same pipeline: the registered
    middleware (first added runs first), then route lookup, then the
    route's handler.
    """

    def __init__(self, host: str = "127.0.0.1", port: int = 8080):
        """
        Create a server that is not yet listening.

        Args:
            host: Address to bind to when ``start`` is called.
            port: TCP port to bind to when ``start`` is called.
        """
        self._host = host
        self._port = port
        self._routes: List[Route] = []
        self._server: Optional[HTTPServer] = None
        self._server_thread: Optional[threading.Thread] = None
        self._running = False
        self._middleware: List[Callable] = []

    @property
    def host(self) -> str:
        """Return the server host."""
        return self._host

    @property
    def port(self) -> int:
        """Return the server port."""
        return self._port

    @property
    def is_running(self) -> bool:
        """Return whether the server is running."""
        return self._running

    @property
    def routes(self) -> List[Route]:
        """Return all registered routes (a copy; mutating it has no effect)."""
        return self._routes.copy()

    def register_route(self, path: str, method: HTTPMethod, handler: Callable,
                       plugin_name: str, description: str = "") -> bool:
        """
        Register a new route.

        Args:
            path: The URL path for the route.
            method: The HTTP method.
            handler: The handler function; it is called with the Request.
            plugin_name: The name of the plugin registering the route.
            description: Optional description of the route.

        Returns:
            True if registration was successful, False if a route with the
            same path and method already exists.
        """
        if any(r.path == path and r.method == method for r in self._routes):
            logger.warning(f"Route {method.value} {path} already registered")
            return False

        self._routes.append(Route(
            path=path,
            method=method,
            handler=handler,
            plugin_name=plugin_name,
            description=description,
        ))
        logger.info(f"Route registered: {method.value} {path} by {plugin_name}")
        return True

    def unregister_route(self, path: str, method: HTTPMethod) -> bool:
        """
        Unregister a specific route.

        Args:
            path: The URL path of the route.
            method: The HTTP method.

        Returns:
            True if unregistration was successful, False if no such route
            is registered.
        """
        for index, route in enumerate(self._routes):
            if route.path == path and route.method == method:
                # Safe to delete mid-iteration because we return immediately.
                del self._routes[index]
                logger.info(f"Route unregistered: {method.value} {path}")
                return True

        logger.warning(f"Route {method.value} {path} not found")
        return False

    def unregister_routes(self, plugin_name: str) -> int:
        """
        Unregister all routes for a plugin.

        Args:
            plugin_name: The name of the plugin.

        Returns:
            Number of routes unregistered.
        """
        kept = [r for r in self._routes if r.plugin_name != plugin_name]
        count = len(self._routes) - len(kept)
        # Slice-assign so the list object itself is preserved (single pass
        # instead of one O(n) ``remove`` per matching route).
        self._routes[:] = kept

        if count > 0:
            logger.info(f"Unregistered {count} routes for plugin {plugin_name}")
        return count

    def add_middleware(self, middleware: Callable) -> None:
        """
        Add middleware to the request pipeline.

        Middleware runs in the order it was added; each one decides whether
        to call ``next(request)`` to continue down the pipeline.

        Args:
            middleware: A callable that takes (request, next) and returns a response.
        """
        self._middleware.append(middleware)

    def _find_route(self, path: str, method: str) -> Optional[Route]:
        """Find a matching route for the given path and method."""
        for route in self._routes:
            if route.matches(path, method):
                return route
        return None

    @staticmethod
    def _to_response(result: Any) -> Response:
        """Coerce a handler/middleware return value into a Response."""
        if isinstance(result, Response):
            return result
        if isinstance(result, (dict, list)):
            return Response(body=result)
        return Response(body=str(result))

    @staticmethod
    def _bind_middleware(middleware: Callable, call_next: Callable) -> Callable:
        """Return a one-argument callable running *middleware* around *call_next*."""
        # A dedicated factory avoids the late-binding closure pitfall that a
        # lambda created inside the chaining loop would have.
        def step(request: Request) -> Any:
            return middleware(request, call_next)
        return step

    def _invoke_route(self, request: Request) -> Response:
        """Innermost pipeline stage: look up the route and call its handler."""
        route = self._find_route(request.path, request.method)
        if route is None:
            return Response(
                status_code=404,
                body={"error": "Not Found", "path": request.path}
            )
        return self._to_response(route.handler(request))

    def _dispatch(self, request: Request) -> Response:
        """
        Run *request* through middleware and routing and return the Response.

        Any exception raised by a middleware or a handler is logged and
        turned into a 500 response, so this method does not propagate
        ``Exception`` subclasses to the caller.
        """
        try:
            call_next: Callable = self._invoke_route
            # Wrap from the inside out so the first middleware added ends up
            # outermost, i.e. sees the request first.
            for middleware in self._middleware[::-1]:
                call_next = self._bind_middleware(middleware, call_next)
            return self._to_response(call_next(request))
        except Exception as e:
            logger.exception(f"Error handling request: {e}")
            return Response(
                status_code=500,
                body={"error": "Internal Server Error", "message": str(e)}
            )

    def _create_handler(self):
        """Create the HTTP request handler class bound to this server."""
        server = self

        class RequestHandler(BaseHTTPRequestHandler):
            # ``format`` mirrors the base-class parameter name.
            def log_message(self, format, *args):
                logger.debug(f"HTTP: {format % args}")

            def _handle_request(self, method: str):
                parsed = urlparse(self.path)

                # A non-numeric Content-Length is a client error; answer it
                # instead of letting ValueError kill the request silently.
                try:
                    content_length = int(self.headers.get('Content-Length', 0))
                except ValueError:
                    # The body (if any) was not consumed, so do not reuse
                    # this connection.
                    self.close_connection = True
                    self._send_response(Response(
                        status_code=400,
                        body={"error": "Bad Request",
                              "message": "Invalid Content-Length header"}
                    ))
                    return

                body = self.rfile.read(content_length) if content_length > 0 else None

                request = Request(
                    method=method,
                    path=parsed.path,
                    query_params=parse_qs(parsed.query),
                    headers=dict(self.headers.items()),
                    body=body
                )
                self._send_response(server._dispatch(request))

            def _send_response(self, response: Response):
                # Serialize BEFORE writing the status line: once headers are
                # on the wire a serialization failure can no longer be
                # reported as a clean 500.
                try:
                    payload = response.to_bytes()
                except (TypeError, ValueError) as e:
                    logger.exception(f"Error serializing response body: {e}")
                    response = Response(
                        status_code=500,
                        body={"error": "Internal Server Error", "message": str(e)}
                    )
                    payload = response.to_bytes()

                status = response.status_code
                # 1xx, 204 and 304 responses never carry a body.
                bodiless_status = status < 200 or status in (204, 304)
                supplied = {str(name).lower() for name in response.headers}

                self.send_response(status)
                # An explicit Content-Type in ``headers`` wins over the
                # ``content_type`` field so the header is not sent twice.
                if 'content-type' not in supplied:
                    self.send_header('Content-Type', response.content_type)
                for key, value in response.headers.items():
                    self.send_header(key, value)
                if not bodiless_status and 'content-length' not in supplied:
                    self.send_header('Content-Length', str(len(payload)))
                self.end_headers()

                # HEAD gets the same headers as GET but no body.
                if self.command != 'HEAD' and not bodiless_status:
                    self.wfile.write(payload)

            def do_GET(self):
                self._handle_request("GET")

            def do_POST(self):
                self._handle_request("POST")

            def do_PUT(self):
                self._handle_request("PUT")

            def do_DELETE(self):
                self._handle_request("DELETE")

            def do_PATCH(self):
                self._handle_request("PATCH")

            def do_OPTIONS(self):
                self._handle_request("OPTIONS")

            def do_HEAD(self):
                self._handle_request("HEAD")

        return RequestHandler

    def start(self, blocking: bool = False) -> bool:
        """
        Start the HTTP server.

        Args:
            blocking: If True, run in the current thread (blocking) and
                return once serving has ended.
                If False, run in a background daemon thread.

        Returns:
            True if server started successfully, False if it was already
            running or could not be started.
        """
        if self._running:
            logger.warning("Server is already running")
            return False

        try:
            handler_class = self._create_handler()
            server = HTTPServer((self._host, self._port), handler_class)
        except Exception as e:
            logger.exception(f"Failed to start HTTP server: {e}")
            self._running = False
            return False

        self._server = server
        self._running = True
        logger.info(f"HTTP server starting on {self._host}:{self._port}")

        if blocking:
            try:
                server.serve_forever()
                return True
            except Exception as e:
                logger.exception(f"Failed to start HTTP server: {e}")
                return False
            finally:
                # Reached after stop() from another thread and also on
                # KeyboardInterrupt: release the socket and reset state so
                # the server can be started again.
                server.server_close()
                if self._server is server:
                    self._server = None
                self._running = False

        try:
            thread = threading.Thread(target=server.serve_forever, daemon=True)
            self._server_thread = thread
            thread.start()
            return True
        except Exception as e:
            logger.exception(f"Failed to start HTTP server: {e}")
            # The socket is already bound; close it so the port is not leaked.
            server.server_close()
            self._server = None
            self._server_thread = None
            self._running = False
            return False

    def stop(self) -> bool:
        """
        Stop the HTTP server.

        Must be called from a thread other than the one running the serve
        loop (so not from inside a route handler): ``shutdown()`` waits for
        that loop to finish.

        Returns:
            True if server stopped successfully, False if it was not
            running or an error occurred while stopping.
        """
        if not self._running:
            logger.warning("Server is not running")
            return False

        try:
            # Work on local references: a blocking start() may clear the
            # attributes concurrently once its serve loop returns.
            server = self._server
            if server is not None:
                server.shutdown()
                server.server_close()
                self._server = None

            thread = self._server_thread
            if thread is not None:
                thread.join(timeout=5)
                self._server_thread = None

            self._running = False
            logger.info("HTTP server stopped")
            return True
        except Exception as e:
            logger.exception(f"Error stopping HTTP server: {e}")
            return False

    def list_routes(self) -> List[Dict[str, str]]:
        """
        List all registered routes.

        Returns:
            List of route information, one dict per route with the keys
            ``path``, ``method``, ``plugin`` and ``description``.
        """
        return [
            {
                "path": route.path,
                "method": route.method.value,
                "plugin": route.plugin_name,
                "description": route.description
            }
            for route in self._routes
        ]

    def handle_request(self, request: Request) -> Response:
        """
        Handle a request directly (for testing).

        Uses the same middleware/routing pipeline as requests received over
        the socket, without needing the server to be started.

        Args:
            request: The request to handle.

        Returns:
            The response: 404 when no route matches, 500 when a middleware
            or handler raises, otherwise the handler's (coerced) result.
        """
        return self._dispatch(request)