From b0cf40d28166da67706eceface47beae6b7be811 Mon Sep 17 00:00:00 2001 From: xia-chu <771730766@qq.com> Date: Wed, 3 Dec 2025 10:44:38 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=9B=9E=E8=B0=83http?= =?UTF-8?q?=E8=AF=B7=E6=B1=82=E5=88=B0fastapi?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 1 + python/mk_plugin.py | 57 +++++++++++++++++++++++++++++++++++++++-- server/pyinvoker.cpp | 60 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index b72e1fe4..2843d878 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -572,6 +572,7 @@ file(COPY "${CMAKE_CURRENT_SOURCE_DIR}/conf/config.ini" DESTINATION ${EXECUTABLE file(COPY "${CMAKE_CURRENT_SOURCE_DIR}/default.pem" DESTINATION ${EXECUTABLE_OUTPUT_PATH}) if (ENABLE_PYTHON) file(COPY "${CMAKE_CURRENT_SOURCE_DIR}/python" DESTINATION ${EXECUTABLE_OUTPUT_PATH}) + file(COPY "${CMAKE_CURRENT_SOURCE_DIR}/python/StreamUI/frontend" DESTINATION "${EXECUTABLE_OUTPUT_PATH}/www/StreamUI") endif () # 拷贝VideoStack 无视频流时默认填充的背景图片 diff --git a/python/mk_plugin.py b/python/mk_plugin.py index 01289776..101ca823 100644 --- a/python/mk_plugin.py +++ b/python/mk_plugin.py @@ -1,10 +1,63 @@ import mk_logger import mk_loader +import asyncio +import threading +from StreamUI.backend.main import app +from starlette.routing import Match + +def start_background_loop(loop): + asyncio.set_event_loop(loop) + loop.run_forever() + +loop = asyncio.new_event_loop() +threading.Thread(target=start_background_loop, args=(loop,), daemon=True).start() + +def submit_coro(scope, body, send): + async def run(): + # 包装 send 函数,确保它总是可等待的 + async def async_send(message): + # 调用原始的 send 函数,它现在应该返回一个协程 + result = send(message) + if result is not None: + await result + + async def receive(): + return { + "type": "http.request", + "body": body, + "more_body": False, + } + + try: + await app(scope, receive, async_send) + except Exception as e: + mk_logger.log_warn(f"FastAPI failed: {e}") + # 发送错误响应 + await async_send({ + "type": "http.response.start", + "status": 500, + "headers": [(b"content-type", b"text/plain")], + }) + await async_send({ + "type": "http.response.body", + "body": b"Internal Server Error", + "more_body": False, + }) + return asyncio.run_coroutine_threadsafe(run(), loop) + +def check_route(scope) -> bool: + for route in app.routes: + if hasattr(route, "matches"): + match, _ = route.matches(scope) + if match == Match.FULL: + return True + return False def on_start(): mk_logger.log_info(f"on_start, secret: {mk_loader.get_config('api.secret')}") - mk_loader.set_config('api.secret', "new_secret_from_python") - mk_loader.update_config() + # mk_loader.set_config('api.secret', "new_secret_from_python") + # mk_loader.update_config() + mk_loader.set_fastapi(check_route, submit_coro) def on_exit(): mk_logger.log_info("on_exit") diff --git a/server/pyinvoker.cpp b/server/pyinvoker.cpp index efc22c69..7e425382 100644 --- a/server/pyinvoker.cpp +++ b/server/pyinvoker.cpp @@ -8,6 +8,8 @@ #include #include #include "WebHook.h" +#include "Common/Parser.h" +#include "Http/HttpSession.h" using namespace toolkit; using namespace mediakit; @@ -87,6 +89,55 @@ mINI to_native(const py::dict &opt) { return ret; } +void handle_http_request(const py::object &check_route, const py::object &submit_coro, const Parser &parser, const HttpSession::HttpResponseInvoker &invoker, bool &consumed, toolkit::SockInfo &sender) { + py::gil_scoped_acquire guard; + + py::dict scope; + scope["type"] = "http"; + scope["http_version"] = "1.1"; + scope["method"] = parser.method(); + scope["path"] = parser.url(); + scope["query_string"] = parser.params(); + py::list hdrs; + for (auto &kv : parser.getHeader()) { + hdrs.append(py::make_tuple(py::bytes(kv.first), py::bytes(kv.second))); + } + scope["headers"] = hdrs; + + bool ok = check_route(scope).cast(); + if (!ok) { + return; + } + consumed = true; + + StrCaseMap resp_headers; + std::string resp_body; + int status = 500; + auto send = py::cpp_function([invoker, status, resp_body, resp_headers](const py::dict &msg) mutable { + auto type = msg["type"].cast(); + if (type == "http.response.start") { + status = msg["status"].cast(); + for (auto tup : msg["headers"].cast()) { + auto t = tup.cast(); + resp_headers[t[0].cast()] = t[1].cast(); + } + return; + } + + if (type == "http.response.body") { + resp_body += msg["body"].cast(); + // 💥 只在 more_body=False 时回调 + bool more = msg.contains("more_body") && msg["more_body"].cast(); + if (!more) { + invoker(status, resp_headers, resp_body); + } + } + }); + + submit_coro(scope, py::bytes(parser.content()), send); +} + + PYBIND11_EMBEDDED_MODULE(mk_loader, m) { m.def("log", [](int lev, const char *file, int line, const char *func, const char *content) { py::gil_scoped_release release; @@ -126,6 +177,15 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) { auto &invoker = to_native(cap); invoker(err); }); + + m.def("set_fastapi", [](const py::object &check_route, const py::object &submit_coro) { + static void *fastapi_tag = nullptr; + NoticeCenter::Instance().delListener(&fastapi_tag, Broadcast::kBroadcastHttpRequest); + NoticeCenter::Instance().addListener(&fastapi_tag, Broadcast::kBroadcastHttpRequest, [check_route, submit_coro](BroadcastHttpRequestArgs) { + handle_http_request(check_route, submit_coro, parser, invoker, consumed, sender); + }); + }); + } namespace mediakit {