Files
snippets/serial_mjpeg_http_streamer.py
2026-04-06 15:14:06 +08:00

396 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import sys
import threading
import time
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from typing import Optional
import serial
JPEG_SOI = b"\xff\xd8" # Start Of Image
JPEG_EOI = b"\xff\xd9" # End Of Image
BLANK_JPEG = (
b"\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00"
b"\x01\x00\x00\xff\xdb\x00C\x00\x08\x06\x06\x07\x06\x05\x08"
b"\x07\x07\x07\t\t\x08\n\x0c\x14\r\x0c\x0b\x0b\x0c\x19\x12"
b'\x13\x0f\x14\x1d\x1a\x1f\x1e\x1d\x1a\x1c\x1c $.\x27 ",#'
b"\x1c\x1c(7telestreet.telestreet1c\x01\t\t\t\x0c\x0b\x0c"
b"\x18\r\r\x1821\x1c!222222222222222222222222222222222222"
b"22222222222222\xff\xc0\x00\x0b\x08\x00\x01\x00\x01\x01\x01"
b"\x11\x00\xff\xc4\x00\x1f\x00\x00\x01\x05\x01\x01\x01\x01\x01"
b"\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06"
b"\x07\x08\t\n\x0b\xff\xc4\x00\xb5\x10\x00\x02\x01\x03\x03\x02"
b"\x04\x03\x05\x05\x04\x04\x00\x00\x01}\x01\x02\x03\x00\x04"
b'\x11\x05\x12!1A\x06\x13Qa\x07"q\x142\x81\x91\xa1\x08#B\xb1'
b"\xc1\x15R\xd1\xf0$3br\x82\t\n\x16\x17\x18\x19\x1a%&\x27()*"
b"456789:CDEFGHIJSTUVWXYZcdefghijstuvwxyz\x83\x84\x85\x86\x87"
b"\x88\x89\x8a\x92\x93\x94\x95\x96\x97\x98\x99\x9a\xa2\xa3\xa4"
b"\xa5\xa6\xa7\xa8\xa9\xaa\xb2\xb3\xb4\xb5\xb6\xb7\xb8\xb9\xba"
b"\xc2\xc3\xc4\xc5\xc6\xc7\xc8\xc9\xca\xd2\xd3\xd4\xd5\xd6\xd7"
b"\xd8\xd9\xda\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xf1\xf2"
b"\xf3\xf4\xf5\xf6\xf7\xf8\xf9\xfa\xff\xda\x00\x08\x01\x01\x00"
b"\x00?\x00\xfb\xd5\xff\xd9"
)
class FrameBuffer:
def __init__(self):
self._lock = threading.Lock()
self._frame: Optional[bytes] = None
self._status: str = "waiting for data"
def update_frame(self, frame: bytes) -> None:
with self._lock:
self._frame = frame
self._status = "streaming"
def set_status(self, status: str) -> None:
with self._lock:
self._status = status
def get_frame(self) -> Optional[bytes]:
with self._lock:
return self._frame
def get_status(self) -> str:
with self._lock:
return self._status
#Serial Helpers
def validate_serial_port(port: str, baudrate: int, timeout: float = 1.0) -> bool:
try:
test_ser = serial.Serial(port=port, baudrate=baudrate, timeout=timeout)
# Try to read a small amount of data to verify it's working
test_ser.read(1)
test_ser.close()
return True
except serial.SerialException as e:
print(f"[Validation] Failed to validate {port}: {e}", file=sys.stderr)
return False
except Exception as e:
print(f"[Validation] Unexpected error validating {port}: {e}", file=sys.stderr)
return False
def try_open_serial(
port: str,
baudrate: int,
timeout: float,
reconnect_delay: float,
frame_buffer: FrameBuffer,
verbose: bool = False,
) -> Optional[serial.Serial]:
try:
ser = serial.Serial(port=port, baudrate=baudrate, timeout=timeout)
if verbose:
print(f"[Serial] Opened {port} at {baudrate} baud")
frame_buffer.set_status("connected")
return ser
except serial.SerialException as e:
if verbose:
print(f"[Serial] Could not open {port}: {e}")
frame_buffer.set_status("disconnected")
time.sleep(reconnect_delay)
return None
def serial_mjpeg_reader(
port: str,
baudrate: int,
timeout: float,
frame_buffer: FrameBuffer,
chunk_size: int,
max_buffer: int,
reconnect_delay: float,
verbose: bool = False,
) -> None:
ser: Optional[serial.Serial] = None
buffer = bytearray()
while True:
# Ensure serial is open
if ser is None or not ser.is_open:
frame_buffer.set_status("connecting")
ser = try_open_serial(
port, baudrate, timeout, reconnect_delay, frame_buffer, verbose
)
if ser is None:
# Failed to open; try again in next loop iteration
continue
buffer.clear()
try:
# Normal read loop
chunk = ser.read(chunk_size)
if not chunk:
# Timeout or no data; small sleep to avoid busy-wait
time.sleep(0.01)
continue
buffer.extend(chunk)
# Prevent unbounded buffer growth
if len(buffer) > max_buffer:
buffer[:] = buffer[-max_buffer // 2 :]
while True:
start_idx = buffer.find(JPEG_SOI)
if start_idx == -1:
if len(buffer) > max_buffer:
buffer.clear()
break
end_idx = buffer.find(JPEG_EOI, start_idx + 2)
if end_idx == -1:
# Have SOI, but no complete JPEG yet
if start_idx > 0:
del buffer[:start_idx]
break
frame_data = buffer[start_idx : end_idx + 2]
del buffer[: end_idx + 2]
frame_buffer.update_frame(bytes(frame_data))
if verbose:
print(f"[Serial] Frame: {len(frame_data)} bytes")
except (serial.SerialException, OSError) as e:
# Port disconnected or error occurred
print(f"[Serial] Error / disconnect: {e}", file=sys.stderr)
frame_buffer.set_status("disconnected")
try:
if ser and ser.is_open:
ser.close()
except Exception:
pass
ser = None
# Short pause before trying to reopen
time.sleep(reconnect_delay)
except Exception as e:
# Unexpected errors; log and retry
print(f"[Serial] Unexpected error: {e}", file=sys.stderr)
time.sleep(1)
#HTTP Handler
class MJPEGRequestHandler(BaseHTTPRequestHandler):
# Set at server initialization
frame_buffer: FrameBuffer = None # type: ignore[assignment]
boundary: str = "frame"
frame_interval: float = 0.05
verbose: bool = False
def log_message(self, format: str, *args) -> None:
if self.verbose:
super().log_message(format, *args)
def do_GET(self):
if self.path != self.server.stream_path: # type: ignore[attr-defined]
# Optionally expose a basic status page at "/"
if self.path == "/":
self.send_response(200)
self.send_header("Content-Type", "text/plain; charset=utf-8")
self.end_headers()
status = self.frame_buffer.get_status()
self.wfile.write(
f"Serial MJPEG HTTP Streamer\nStatus: {status}\n".encode("utf-8")
)
return
self.send_error(404, "Not Found")
return
self.send_response(200)
self.send_header(
"Content-Type", f"multipart/x-mixed-replace; boundary={self.boundary}"
)
self.send_header("Cache-Control", "no-cache, no-store, must-revalidate")
self.send_header("Pragma", "no-cache")
self.send_header("Expires", "0")
self.end_headers()
if self.verbose:
print(f"[HTTP] Client connected: {self.client_address}")
try:
while True:
frame = self.frame_buffer.get_frame()
status = self.frame_buffer.get_status()
if frame is None or status != "streaming":
frame = BLANK_JPEG
self.wfile.write(b"--" + self.boundary.encode("ascii") + b"\r\n")
self.wfile.write(b"Content-Type: image/jpeg\r\n")
self.wfile.write(f"Content-Length: {len(frame)}\r\n".encode("ascii"))
self.wfile.write(b"\r\n")
self.wfile.write(frame)
self.wfile.write(b"\r\n")
time.sleep(self.frame_interval)
except (BrokenPipeError, ConnectionResetError):
if self.verbose:
print(f"[HTTP] Client disconnected: {self.client_address}")
except Exception as e:
print(f"[HTTP] Streaming error: {e}", file=sys.stderr)
class MJPEGHTTPServer(ThreadingHTTPServer):
def __init__(self, server_address, RequestHandlerClass, stream_path: str):
super().__init__(server_address, RequestHandlerClass)
self.stream_path = stream_path
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Stream an MJPEG serial feed over HTTP (auto-reconnect)."
)
# Serial options
parser.add_argument(
"--serial-port",
"-p",
required=True,
help="Serial port (e.g. COM3, /dev/ttyUSB0, /dev/ttyACM0)",
)
parser.add_argument(
"--baudrate",
"-b",
type=int,
default=115200,
help="Serial baud rate (default: 115200)",
)
parser.add_argument(
"--timeout",
type=float,
default=0.1,
help="Serial read timeout in seconds (default: 0.1)",
)
parser.add_argument(
"--chunk-size",
type=int,
default=4096,
help="Bytes to read from serial per call (default: 4096)",
)
parser.add_argument(
"--max-buffer",
type=int,
default=1_000_000,
help="Maximum serial buffer size in bytes (default: 1,000,000)",
)
parser.add_argument(
"--reconnect-delay",
type=float,
default=2.0,
help="Seconds to wait before retrying serial open after an error (default: 2.0)",
)
# HTTP options
parser.add_argument(
"--http-host",
"-H",
default="0.0.0.0",
help="HTTP bind address (default: 0.0.0.0)",
)
parser.add_argument(
"--http-port", "-P", type=int, default=8080, help="HTTP port (default: 8080)"
)
parser.add_argument(
"--path",
default="/stream",
help="HTTP path for MJPEG stream (default: /stream)",
)
parser.add_argument(
"--boundary",
default="frame",
help="Multipart boundary string (default: 'frame')",
)
parser.add_argument(
"--fps",
type=float,
default=20.0,
help="Approximate frames per second to send (default: 20.0)",
)
# Misc
parser.add_argument(
"-v", "--verbose", action="store_true", help="Enable verbose logging"
)
return parser.parse_args()
def main():
args = parse_args()
# Validate serial port is accessible before starting
print(f"[Main] Validating serial port {args.serial_port}...")
if not validate_serial_port(args.serial_port, args.baudrate):
print(
f"[Main] Error: Serial port {args.serial_port} is not accessible.",
file=sys.stderr,
)
print(
"[Main] Please check that the device is connected and you have permission to access it.",
file=sys.stderr,
)
sys.exit(1)
print(f"[Main] Serial port {args.serial_port} validated successfully")
frame_buffer = FrameBuffer()
frame_buffer.set_status("initializing")
# Start serial reader in background; it handles reconnects itself
reader_thread = threading.Thread(
target=serial_mjpeg_reader,
args=(
args.serial_port,
args.baudrate,
args.timeout,
frame_buffer,
args.chunk_size,
args.max_buffer,
args.reconnect_delay,
args.verbose,
),
daemon=True,
)
reader_thread.start()
print(
f"[Main] Serial reader thread started for {args.serial_port} "
f"({args.baudrate} baud). Auto-reconnect enabled."
)
# Configure HTTP handler
MJPEGRequestHandler.frame_buffer = frame_buffer
MJPEGRequestHandler.boundary = args.boundary
MJPEGRequestHandler.frame_interval = 1.0 / max(args.fps, 0.1)
MJPEGRequestHandler.verbose = args.verbose
server_address = (args.http_host, args.http_port)
httpd = MJPEGHTTPServer(server_address, MJPEGRequestHandler, args.path)
print(
f"[Main] HTTP MJPEG server running on "
f"http://{args.http_host}:{args.http_port}{args.path}"
)
print("[Main] Status endpoint at http://%s:%d/" % (args.http_host, args.http_port))
print("[Main] Press Ctrl+C to quit")
try:
httpd.serve_forever()
except KeyboardInterrupt:
print("\n[Main] Shutting down HTTP server...")
finally:
httpd.server_close()
print("[Main] Exiting")
if __name__ == "__main__":
main()