新增add_stream_proxy python接口
Some checks are pending
Android / build (push) Waiting to run
CodeQL / Analyze (cpp) (push) Waiting to run
CodeQL / Analyze (javascript) (push) Waiting to run
Docker / build (push) Waiting to run
Linux / build (push) Waiting to run
Linux_Python / build (push) Waiting to run
macOS / build (push) Waiting to run
macOS_Python / build (push) Waiting to run
Windows / build (push) Waiting to run
Windows_Python / build (push) Waiting to run

This commit is contained in:
xia-chu 2026-03-18 17:48:36 +08:00
parent 1bc40690c9
commit e1d3c21529
4 changed files with 51 additions and 8 deletions

View File

@ -595,7 +595,7 @@ void getStatisticJson(const function<void(Value &val)> &cb) {
}
void addStreamProxy(const MediaTuple &tuple, const string &url, int retry_count, bool force,
const ProtocolOption &option, int rtp_type, float timeout_sec, const mINI &args,
const ProtocolOption &option, float timeout_sec, const mINI &args,
const function<void(const SockException &ex, const string &key)> &cb) {
auto key = tuple.shortUrl();
if (s_player_proxy.find(key)) {
@ -614,10 +614,6 @@ void addStreamProxy(const MediaTuple &tuple, const string &url, int retry_count,
(*player)[pr.first] = pr.second;
}
// 指定RTP over TCP(播放rtsp时有效) [AUTO-TRANSLATED:1a062656]
// Specify RTP over TCP (effective when playing RTSP)
(*player)[Client::kRtpType] = rtp_type;
if (timeout_sec > 0.1f) {
// 播放握手超时时间 [AUTO-TRANSLATED:5a29ae1f]
// Play handshake timeout
@ -1284,7 +1280,6 @@ void installWebApi() {
retry_count,
allArgs["force"],
option,
allArgs["rtp_type"],
allArgs["timeout_sec"],
args,
[invoker,val,headerOut](const SockException &ex,const string &key) mutable {

View File

@ -250,7 +250,7 @@ Json::Value makeMediaSourceJson(mediakit::MediaSource &media);
ApiArgsType getAllArgs(const mediakit::Parser &parser);
void getStatisticJson(const std::function<void(Json::Value &val)> &cb);
void addStreamProxy(const mediakit::MediaTuple &tuple, const std::string &url, int retry_count, bool force,
const mediakit::ProtocolOption &option, int rtp_type, float timeout_sec, const toolkit::mINI &args,
const mediakit::ProtocolOption &option, float timeout_sec, const toolkit::mINI &args,
const std::function<void(const toolkit::SockException &ex, const std::string &key)> &cb);
template <typename Type>

View File

@ -321,7 +321,7 @@ static void pullStreamFromOrigin(const vector<string> &urls, size_t index, size_
option.enable_hls = option.enable_hls || (args.schema == HLS_SCHEMA);
option.enable_mp4 = false;
addStreamProxy(args, url, retry_count, false, option, Rtsp::RTP_TCP, timeout_sec, mINI{}, [=](const SockException &ex, const string &key) mutable {
addStreamProxy(args, url, retry_count, false, option, timeout_sec, mINI{}, [=](const SockException &ex, const string &key) mutable {
if (!ex) {
return;
}

View File

@ -12,7 +12,9 @@
#include "Util/util.h"
#include "Util/File.h"
#include "Common/Parser.h"
#include "Common/macros.h"
#include "Http/HttpSession.h"
#include "Poller/EventPoller.h"
#include "WebApi.h"
using namespace toolkit;
@ -329,6 +331,52 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) {
invoker(errMsg, accessPath, cookieLifeSecond);
});
// add_stream_proxy(vhost, app, stream, url, cb, retry_count=-1, force=False,
// rtp_type=0, timeout_sec=0, opt={})
// opt 字典可包含 ProtocolOption 的所有字段,以及其他透传给 Player 的 key-value 参数
m.def("add_stream_proxy",
[](const std::string &vhost, const std::string &app, const std::string &stream,
const std::string &url, const py::object &cb,
int retry_count, bool force, float timeout_sec,
const py::dict &opt) {
mINI args = to_native(opt);
ProtocolOption option;
option.load(args);
MediaTuple tuple { vhost.empty() ? DEFAULT_VHOST : vhost, app, stream, "" };
// 用 shared_ptr 包裹 py::object使其析构dec_ref可在受控环境下执行。
// 必须在 GIL 持有期间创建该 shared_ptr此处仍在 GIL 内)。
// 自定义 deleter 保证即使在非 Python 线程析构时也会先获取 GIL。
auto cb_ptr = std::shared_ptr<py::object>(
new py::object(cb),
[](py::object *p) {
// dec_ref / 析构 py::object 需要 GIL
py::gil_scoped_acquire guard;
delete p;
}
);
py::gil_scoped_release release;
EventPollerPool::Instance().getPoller(false)->async([=]() mutable {
addStreamProxy(tuple, url, retry_count, force, option, timeout_sec, args,
[cb_ptr](const SockException &ex, const std::string &key) {
// cb_ptr 按值捕获shared_ptr 的拷贝,纯 C++ 操作,无需 GIL
// inc_ref/dec_ref/调用 Python 对象均在 gil_scoped_acquire 保护下进行
py::gil_scoped_acquire guard;
try {
(*cb_ptr)(ex ? ex.what() : "", key);
} catch (py::error_already_set &e) {
WarnL << "Python exception in add_stream_proxy callback: " << e.what();
}
// cb_ptr 在此析构局部副本dec_ref 由自定义 deleter 在 GIL 下执行
});
});
},
py::arg("vhost"), py::arg("app"), py::arg("stream"), py::arg("url"), py::arg("cb"),
py::arg("retry_count") = -1, py::arg("force") = false,
py::arg("timeout_sec") = 0.0f, py::arg("opt") = py::dict()
);
m.def("set_fastapi", [](const py::object &check_route, const py::object &submit_coro) {
static void *fastapi_tag = nullptr;
NoticeCenter::Instance().delListener(&fastapi_tag, Broadcast::kBroadcastHttpRequest);