新增流健康度探针功能
Some checks are pending
Android / build (push) Waiting to run
CodeQL / Analyze (cpp) (push) Waiting to run
CodeQL / Analyze (javascript) (push) Waiting to run
Docker / build (push) Waiting to run
DockerPy / build (push) Waiting to run
Linux / build (push) Waiting to run
Linux_Python / build (push) Waiting to run
macOS / build (push) Waiting to run
macOS_Python / build (push) Waiting to run
Windows / build (push) Waiting to run
Windows_Python / build (push) Waiting to run

This commit is contained in:
xia-chu 2026-05-03 13:34:18 +08:00
parent 6f9531c5fa
commit c440c45ce4
4 changed files with 129 additions and 0 deletions

View File

@ -3279,6 +3279,57 @@
}
},
"response": []
},
{
"name": "添加探针(addProbe)",
"request": {
"method": "GET",
"header": [],
"url": {
"raw": "{{ZLMediaKit_URL}}/index/api/addProbe?secret={{ZLMediaKit_secret}}&vhost={{defaultVhost}}&app=live&stream=test&probe_ms=5000",
"host": [
"{{ZLMediaKit_URL}}"
],
"path": [
"index",
"api",
"addProbe"
],
"query": [
{
"key": "secret",
"value": "{{ZLMediaKit_secret}}",
"description": "api操作密钥(配置文件配置)",
"type": "text"
},
{
"key": "vhost",
"value": "{{defaultVhost}}",
"description": "流的虚拟主机例如__defaultVhost__",
"type": "text"
},
{
"key": "app",
"value": "live",
"description": "流的应用名例如live",
"type": "text"
},
{
"key": "stream",
"value": "test",
"description": "流的id名例如test",
"type": "text"
},
{
"key": "probe_ms",
"value": "5000",
"description": "探针时长,单位毫秒",
"type": "text"
}
]
}
},
"response": []
}
],
"event": [

View File

@ -2651,6 +2651,39 @@ void installWebApi() {
invoker(200, headerOut, val.toStyledString());
});
});
api_regist("/index/api/addProbe", [](API_ARGS_MAP_ASYNC) {
CHECK_SECRET();
CHECK_ARGS("vhost", "app", "stream", "probe_ms");
std::string vhost = allArgs["vhost"];
std::string app = allArgs["app"];
std::string stream = allArgs["stream"];
uint32_t probe_ms = allArgs["probe_ms"];
auto src = MediaSource::find(vhost, app, stream);
if (!src) {
throw ApiRetException("can not find the stream", API::NotFound);
}
src->getOwnerPoller()->async([=]() mutable {
src->getMuxer()->addProbe(probe_ms, [=](const std::list<FrameInfo> &info_list) mutable {
for (const auto &info : info_list) {
Json::Value item;
item["codec"] = getCodecName(info.codec_id);
item["track_type"] = getTrackString(getTrackType(info.codec_id));
item["dts"] = (Json::Int64)info.dts;
item["pts"] = (Json::Int64)info.pts;
item["recv_stamp"] = (Json::Int64)info.recv_stamp;
item["frame_size"] = (Json::UInt)info.frame_size;
item["index"] = info.index;
item["key_frame"] = info.key_frame;
item["config_frame"] = info.config_frame;
val["data"].append(std::move(item));
}
invoker(200, headerOut, val.toStyledString());
});
});
});
}
void unInstallWebApi(){

View File

@ -818,7 +818,36 @@ void MultiMediaSourceMuxer::resetTracks() {
}
}
void MultiMediaSourceMuxer::addProbe(uint32_t probe_ms, const std::function<void(const std::list<FrameInfo> &info_list)> &cb) {
CHECK(getOwnerPoller(MediaSource::NullMediaSource())->isCurrentThread());
auto info_list = std::make_shared<std::list<FrameInfo>>();
Ticker ticker;
_on_frame = [info_list, ticker](const Frame::Ptr &frame) mutable {
FrameInfo info;
info.codec_id = frame->getCodecId();
info.dts = frame->dts();
info.pts = frame->pts();
info.recv_stamp = ticker.createdTime();
info.frame_size = frame->size();
info.index = frame->getIndex();
info.key_frame = frame->keyFrame();
info.config_frame = frame->configFrame();
info_list->emplace_back(info);
};
std::weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
getOwnerPoller(MediaSource::NullMediaSource())->doDelayTask(probe_ms, [weak_self, cb, info_list]() {
if (auto strong_self = weak_self.lock()) {
strong_self->_on_frame = nullptr;
}
cb(*info_list);
return 0;
});
}
bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in) {
if (_on_frame) {
_on_frame(frame_in);
}
auto frame = frame_in;
if (_option.modify_stamp != ProtocolOption::kModifyStampOff) {
// 时间戳不采用原始的绝对时间戳 [AUTO-TRANSLATED:8beb3bf7]

View File

@ -25,6 +25,18 @@
namespace mediakit {
struct FrameInfo {
CodecId codec_id = CodecInvalid;
int64_t dts = 0;
int64_t pts = 0;
int64_t recv_stamp = 0;
size_t frame_size = 0;
int index = 0;
bool key_frame = false;
bool config_frame = false;
};
class MultiMediaSourceMuxer : public MediaSourceEventInterceptor, public MediaSink, public toolkit::noncopyable, public std::enable_shared_from_this<MultiMediaSourceMuxer>{
public:
using Ptr = std::shared_ptr<MultiMediaSourceMuxer>;
@ -188,6 +200,9 @@ public:
#if defined(ENABLE_RTPPROXY)
void forEachRtpSender(const std::function<void(const std::string &ssrc, const RtpSender &sender)> &cb) const;
#endif // ENABLE_RTPPROXY
void addProbe(uint32_t probe_ms, const std::function<void(const std::list<FrameInfo> &info_list)> &cb);
protected:
/////////////////////////////////MediaSink override/////////////////////////////////
@ -231,6 +246,7 @@ private:
bool _create_in_poller = false;
bool _video_key_pos = false;
float _dur_sec;
std::function<void(const Frame::Ptr &frame)> _on_frame;
std::shared_ptr<class FramePacedSender> _paced_sender;
MediaTuple _tuple;
ProtocolOption _option;