新增更新流代理的功能,支持修改已有拉流代理的URL和参数
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
Linux_Python / build (push) Has been cancelled
macOS / build (push) Has been cancelled
macOS_Python / build (push) Has been cancelled
Windows / build (push) Has been cancelled
Windows_Python / build (push) Has been cancelled

This commit is contained in:
xia-chu 2026-03-20 21:59:55 +08:00
parent 0f704cca47
commit 899d9653a4
5 changed files with 61 additions and 25 deletions

View File

@ -592,6 +592,17 @@ void getStatisticJson(const function<void(Value &val)> &cb) {
#endif #endif
} }
void updateStreamProxy(const mediakit::MediaTuple &tuple, const std::string &url, const toolkit::mINI &args) {
auto key = tuple.shortUrl();
auto player = s_player_proxy.find(key);
if (!player) {
throw std::runtime_error("proxy player not found: " + key);
}
player->getPoller()->async([url, args, player]() {
player->update(url, args);
});
}
void addStreamProxy(const MediaTuple &tuple, const string &url, int retry_count, bool force, void addStreamProxy(const MediaTuple &tuple, const string &url, int retry_count, bool force,
const ProtocolOption &option, float timeout_sec, const mINI &args, const ProtocolOption &option, float timeout_sec, const mINI &args,
const function<void(const SockException &ex, const string &key)> &cb) { const function<void(const SockException &ex, const string &key)> &cb) {

View File

@ -252,6 +252,8 @@ void addStreamProxy(const mediakit::MediaTuple &tuple, const std::string &url, i
const mediakit::ProtocolOption &option, float timeout_sec, const toolkit::mINI &args, const mediakit::ProtocolOption &option, float timeout_sec, const toolkit::mINI &args,
const std::function<void(const toolkit::SockException &ex, const std::string &key)> &cb); const std::function<void(const toolkit::SockException &ex, const std::string &key)> &cb);
void updateStreamProxy(const mediakit::MediaTuple &tuple, const std::string &url, const toolkit::mINI &args);
template <typename Type> template <typename Type>
class ServiceController { class ServiceController {
public: public:

View File

@ -374,6 +374,20 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) {
py::arg("timeout_sec") = 0.0f, py::arg("opt") = py::dict() py::arg("timeout_sec") = 0.0f, py::arg("opt") = py::dict()
); );
// update_stream_proxy(vhost, app, stream, url, opt={})
// 更新已有拉流代理的 url 和参数,流不存在时抛出异常
m.def("update_stream_proxy",
[](const std::string &vhost, const std::string &app, const std::string &stream,
const std::string &url, const py::dict &opt) {
mINI args = to_native(opt);
MediaTuple tuple { vhost.empty() ? DEFAULT_VHOST : vhost, app, stream, "" };
py::gil_scoped_release release;
updateStreamProxy(tuple, url, args);
},
py::arg("vhost"), py::arg("app"), py::arg("stream"), py::arg("url"),
py::arg("opt") = py::dict()
);
m.def("set_fastapi", [](const py::object &check_route, const py::object &submit_coro) { m.def("set_fastapi", [](const py::object &check_route, const py::object &submit_coro) {
static void *fastapi_tag = nullptr; static void *fastapi_tag = nullptr;
NoticeCenter::Instance().delListener(&fastapi_tag, Broadcast::kBroadcastHttpRequest); NoticeCenter::Instance().delListener(&fastapi_tag, Broadcast::kBroadcastHttpRequest);

View File

@ -42,6 +42,16 @@ PlayerProxy::PlayerProxy(
(*this)[Client::kWaitTrackReady] = false; (*this)[Client::kWaitTrackReady] = false;
} }
void PlayerProxy::update(const std::string &url, const toolkit::mINI &args) {
CHECK(getPoller()->isCurrentThread());
_pull_url = url;
this->mINI::clear();
(*this)[Client::kWaitTrackReady] = false;
for (auto &pr : args) {
(*this)[pr.first] = pr.second;
}
}
void PlayerProxy::setPlayCallbackOnce(function<void(const SockException &ex)> cb) { void PlayerProxy::setPlayCallbackOnce(function<void(const SockException &ex)> cb) {
_on_play = std::move(cb); _on_play = std::move(cb);
} }
@ -99,11 +109,12 @@ static int getMaxTrackSize(const std::string &url) {
return 2; return 2;
} }
void PlayerProxy::play(const string &strUrlTmp) { void PlayerProxy::play(const string &url) {
_option.max_track = getMaxTrackSize(strUrlTmp); _pull_url = url;
_option.max_track = getMaxTrackSize(_pull_url);
weak_ptr<PlayerProxy> weakSelf = shared_from_this(); weak_ptr<PlayerProxy> weakSelf = shared_from_this();
std::shared_ptr<int> piFailedCnt(new int(0)); // 连续播放失败次数 std::shared_ptr<int> piFailedCnt(new int(0)); // 连续播放失败次数
setOnPlayResult([weakSelf, strUrlTmp, piFailedCnt](const SockException &err) { setOnPlayResult([weakSelf, piFailedCnt](const SockException &err) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if (!strongSelf) { if (!strongSelf) {
return; return;
@ -131,20 +142,20 @@ void PlayerProxy::play(const string &strUrlTmp) {
strongSelf->setTranslationInfo(); strongSelf->setTranslationInfo();
strongSelf->_on_connect(strongSelf->_transtalion_info); strongSelf->_on_connect(strongSelf->_transtalion_info);
InfoL << "play " << strUrlTmp << " success"; InfoL << "play " << strongSelf->_pull_url << " success";
strongSelf->_status = std::make_shared<std::string>("playing"); strongSelf->_status = std::make_shared<std::string>("playing");
} else if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { } else if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) {
// 播放失败,延时重试播放 [AUTO-TRANSLATED:d7537c9c] // 播放失败,延时重试播放 [AUTO-TRANSLATED:d7537c9c]
// Play failed, retry playing with delay // Play failed, retry playing with delay
strongSelf->_on_disconnect(); strongSelf->_on_disconnect();
strongSelf->rePlay(strUrlTmp, (*piFailedCnt)++); strongSelf->rePlay((*piFailedCnt)++);
} else { } else {
// 达到了最大重试次数,回调关闭 [AUTO-TRANSLATED:610f31f3] // 达到了最大重试次数,回调关闭 [AUTO-TRANSLATED:610f31f3]
// Reached the maximum number of retries, callback to close // Reached the maximum number of retries, callback to close
strongSelf->_on_close(err); strongSelf->_on_close(err);
} }
}); });
setOnShutdown([weakSelf, strUrlTmp, piFailedCnt](const SockException &err) { setOnShutdown([weakSelf, piFailedCnt](const SockException &err) {
auto strongSelf = weakSelf.lock(); auto strongSelf = weakSelf.lock();
if (!strongSelf) { if (!strongSelf) {
return; return;
@ -188,7 +199,7 @@ void PlayerProxy::play(const string &strUrlTmp) {
// Play interrupted abnormally, retry playing with delay // Play interrupted abnormally, retry playing with delay
if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) { if (*piFailedCnt < strongSelf->_retry_count || strongSelf->_retry_count < 0) {
strongSelf->_repull_count++; strongSelf->_repull_count++;
strongSelf->rePlay(strUrlTmp, (*piFailedCnt)++); strongSelf->rePlay((*piFailedCnt)++);
} else { } else {
// 达到了最大重试次数,回调关闭 [AUTO-TRANSLATED:610f31f3] // 达到了最大重试次数,回调关闭 [AUTO-TRANSLATED:610f31f3]
// Reached the maximum number of retries, callback to close // Reached the maximum number of retries, callback to close
@ -197,14 +208,13 @@ void PlayerProxy::play(const string &strUrlTmp) {
}); });
try { try {
_status = std::make_shared<std::string>("connecting"); _status = std::make_shared<std::string>("connecting");
MediaPlayer::play(strUrlTmp); MediaPlayer::play(_pull_url );
} catch (std::exception &ex) { } catch (std::exception &ex) {
_status = std::make_shared<std::string>(std::string("play failed: ") + ex.what()); _status = std::make_shared<std::string>(std::string("play failed: ") + ex.what());
ErrorL << ex.what(); ErrorL << ex.what();
onPlayResult(SockException(Err_other, ex.what())); onPlayResult(SockException(Err_other, ex.what()));
return; return;
} }
_pull_url = strUrlTmp;
setDirectProxy(); setDirectProxy();
} }
@ -244,24 +254,21 @@ PlayerProxy::~PlayerProxy() {
} }
} }
void PlayerProxy::rePlay(const string &strUrl, int iFailedCnt) { void PlayerProxy::rePlay(int iFailedCnt) {
auto iDelay = MAX(_reconnect_delay_min * 1000, MIN(iFailedCnt * _reconnect_delay_step * 1000, _reconnect_delay_max * 1000)); auto iDelay = MAX(_reconnect_delay_min * 1000, MIN(iFailedCnt * _reconnect_delay_step * 1000, _reconnect_delay_max * 1000));
weak_ptr<PlayerProxy> weakSelf = shared_from_this(); weak_ptr<PlayerProxy> weakSelf = shared_from_this();
_timer = std::make_shared<Timer>( _timer = std::make_shared<Timer>(iDelay / 1000.0f, [weakSelf, iFailedCnt]() {
iDelay / 1000.0f, // 播放失败次数越多,则延时越长 [AUTO-TRANSLATED:5af39264]
[weakSelf, strUrl, iFailedCnt]() { // The more times the playback fails, the longer the delay
// 播放失败次数越多,则延时越长 [AUTO-TRANSLATED:5af39264] auto strongPlayer = weakSelf.lock();
// The more times the playback fails, the longer the delay if (!strongPlayer) {
auto strongPlayer = weakSelf.lock();
if (!strongPlayer) {
return false;
}
WarnL << "重试播放[" << iFailedCnt << "]:" << strUrl;
strongPlayer->MediaPlayer::play(strUrl);
strongPlayer->setDirectProxy();
return false; return false;
}, }
getPoller()); WarnL << "重试播放[" << iFailedCnt << "]:" << strongPlayer->_pull_url;
strongPlayer->MediaPlayer::play(strongPlayer->_pull_url);
strongPlayer->setDirectProxy();
return false;
}, getPoller());
} }
bool PlayerProxy::close(MediaSource &sender) { bool PlayerProxy::close(MediaSource &sender) {

View File

@ -140,6 +140,8 @@ public:
const MediaTuple& getMediaTuple() const { return _tuple; } const MediaTuple& getMediaTuple() const { return _tuple; }
const ProtocolOption& getOption() const { return _option; } const ProtocolOption& getOption() const { return _option; }
void update(const std::string &url, const toolkit::mINI &args);
private: private:
// MediaSourceEvent override // MediaSourceEvent override
bool close(MediaSource &sender) override; bool close(MediaSource &sender) override;
@ -150,7 +152,7 @@ private:
float getLossRate(MediaSource &sender, TrackType type) override; float getLossRate(MediaSource &sender, TrackType type) override;
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
void rePlay(const std::string &strUrl, int iFailedCnt); void rePlay(int iFailedCnt);
void onPlaySuccess(); void onPlaySuccess();
void setDirectProxy(); void setDirectProxy();
void setTranslationInfo(); void setTranslationInfo();