diff --git a/server/WebApi.cpp b/server/WebApi.cpp index fffffe24..2b1cc2f3 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -595,7 +595,7 @@ void getStatisticJson(const function &cb) { } void addStreamProxy(const MediaTuple &tuple, const string &url, int retry_count, bool force, - const ProtocolOption &option, int rtp_type, float timeout_sec, const mINI &args, + const ProtocolOption &option, float timeout_sec, const mINI &args, const function &cb) { auto key = tuple.shortUrl(); if (s_player_proxy.find(key)) { @@ -614,10 +614,6 @@ void addStreamProxy(const MediaTuple &tuple, const string &url, int retry_count, (*player)[pr.first] = pr.second; } - // 指定RTP over TCP(播放rtsp时有效) [AUTO-TRANSLATED:1a062656] - // Specify RTP over TCP (effective when playing RTSP) - (*player)[Client::kRtpType] = rtp_type; - if (timeout_sec > 0.1f) { // 播放握手超时时间 [AUTO-TRANSLATED:5a29ae1f] // Play handshake timeout @@ -1284,7 +1280,6 @@ void installWebApi() { retry_count, allArgs["force"], option, - allArgs["rtp_type"], allArgs["timeout_sec"], args, [invoker,val,headerOut](const SockException &ex,const string &key) mutable { diff --git a/server/WebApi.h b/server/WebApi.h index f7c1e7cc..b950f588 100755 --- a/server/WebApi.h +++ b/server/WebApi.h @@ -250,7 +250,7 @@ Json::Value makeMediaSourceJson(mediakit::MediaSource &media); ApiArgsType getAllArgs(const mediakit::Parser &parser); void getStatisticJson(const std::function &cb); void addStreamProxy(const mediakit::MediaTuple &tuple, const std::string &url, int retry_count, bool force, - const mediakit::ProtocolOption &option, int rtp_type, float timeout_sec, const toolkit::mINI &args, + const mediakit::ProtocolOption &option, float timeout_sec, const toolkit::mINI &args, const std::function &cb); template diff --git a/server/WebHook.cpp b/server/WebHook.cpp index a0268b56..5447bb6b 100755 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -321,7 +321,7 @@ static void pullStreamFromOrigin(const vector &urls, size_t index, size_ option.enable_hls = option.enable_hls || (args.schema == HLS_SCHEMA); option.enable_mp4 = false; - addStreamProxy(args, url, retry_count, false, option, Rtsp::RTP_TCP, timeout_sec, mINI{}, [=](const SockException &ex, const string &key) mutable { + addStreamProxy(args, url, retry_count, false, option, timeout_sec, mINI{}, [=](const SockException &ex, const string &key) mutable { if (!ex) { return; } diff --git a/server/pyinvoker.cpp b/server/pyinvoker.cpp index fb186164..935903a0 100644 --- a/server/pyinvoker.cpp +++ b/server/pyinvoker.cpp @@ -12,7 +12,9 @@ #include "Util/util.h" #include "Util/File.h" #include "Common/Parser.h" +#include "Common/macros.h" #include "Http/HttpSession.h" +#include "Poller/EventPoller.h" #include "WebApi.h" using namespace toolkit; @@ -329,6 +331,52 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) { invoker(errMsg, accessPath, cookieLifeSecond); }); + // add_stream_proxy(vhost, app, stream, url, cb, retry_count=-1, force=False, + // rtp_type=0, timeout_sec=0, opt={}) + // opt 字典可包含 ProtocolOption 的所有字段,以及其他透传给 Player 的 key-value 参数 + m.def("add_stream_proxy", + [](const std::string &vhost, const std::string &app, const std::string &stream, + const std::string &url, const py::object &cb, + int retry_count, bool force, float timeout_sec, + const py::dict &opt) { + mINI args = to_native(opt); + ProtocolOption option; + option.load(args); + MediaTuple tuple { vhost.empty() ? DEFAULT_VHOST : vhost, app, stream, "" }; + + // 用 shared_ptr 包裹 py::object,使其析构(dec_ref)可在受控环境下执行。 + // 必须在 GIL 持有期间创建该 shared_ptr(此处仍在 GIL 内)。 + // 自定义 deleter 保证即使在非 Python 线程析构时也会先获取 GIL。 + auto cb_ptr = std::shared_ptr( + new py::object(cb), + [](py::object *p) { + // dec_ref / 析构 py::object 需要 GIL + py::gil_scoped_acquire guard; + delete p; + } + ); + + py::gil_scoped_release release; + EventPollerPool::Instance().getPoller(false)->async([=]() mutable { + addStreamProxy(tuple, url, retry_count, force, option, timeout_sec, args, + [cb_ptr](const SockException &ex, const std::string &key) { + // cb_ptr 按值捕获(shared_ptr 的拷贝,纯 C++ 操作,无需 GIL) + // inc_ref/dec_ref/调用 Python 对象均在 gil_scoped_acquire 保护下进行 + py::gil_scoped_acquire guard; + try { + (*cb_ptr)(ex ? ex.what() : "", key); + } catch (py::error_already_set &e) { + WarnL << "Python exception in add_stream_proxy callback: " << e.what(); + } + // cb_ptr 在此析构(局部副本),dec_ref 由自定义 deleter 在 GIL 下执行 + }); + }); + }, + py::arg("vhost"), py::arg("app"), py::arg("stream"), py::arg("url"), py::arg("cb"), + py::arg("retry_count") = -1, py::arg("force") = false, + py::arg("timeout_sec") = 0.0f, py::arg("opt") = py::dict() + ); + m.def("set_fastapi", [](const py::object &check_route, const py::object &submit_coro) { static void *fastapi_tag = nullptr; NoticeCenter::Instance().delListener(&fastapi_tag, Broadcast::kBroadcastHttpRequest);