同步代码
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled

This commit is contained in:
xia-chu 2026-01-21 22:16:11 +08:00
parent 788e34d848
commit 7534a70f34
11 changed files with 181 additions and 45 deletions

View File

@ -115,8 +115,6 @@ static void responseApi(int code, const string &msg, const HttpSession::HttpResp
responseApi(res, invoker);
}
static ApiArgsType getAllArgs(const Parser &parser);
static HttpApi toApi(const function<void(API_ARGS_MAP_ASYNC)> &cb) {
return [cb](const Parser &parser, const HttpSession::HttpResponseInvoker &invoker, SockInfo &sender) {
GET_CONFIG(string, charSet, Http::kCharSet);
@ -215,7 +213,7 @@ void api_regist(const string &api_path, const function<void(API_ARGS_STRING_ASYN
// 获取HTTP请求中url参数、content参数 [AUTO-TRANSLATED:d161a1e1]
// Get URL parameters and content parameters from the HTTP request
static ApiArgsType getAllArgs(const Parser &parser) {
ApiArgsType getAllArgs(const Parser &parser) {
ApiArgsType allArgs;
if (parser["Content-Type"].find("application/x-www-form-urlencoded") == 0) {
auto contentArgs = parser.parseArgs(parser.content());

View File

@ -238,6 +238,7 @@ uint16_t openRtpServer(uint16_t local_port, const mediakit::MediaTuple &tuple, i
#endif
Json::Value makeMediaSourceJson(mediakit::MediaSource &media);
ApiArgsType getAllArgs(const mediakit::Parser &parser);
void getStatisticJson(const std::function<void(Json::Value &val)> &cb);
void addStreamProxy(const mediakit::MediaTuple &tuple, const std::string &url, int retry_count,
const mediakit::ProtocolOption &option, int rtp_type, float timeout_sec, const toolkit::mINI &args,

View File

@ -18,6 +18,7 @@
#include "Http/HttpRequester.h"
#include "Network/Session.h"
#include "Rtsp/RtspSession.h"
#include "Player/PlayerProxy.h"
#include "WebHook.h"
#include "WebApi.h"
@ -500,10 +501,6 @@ void installWebHook() {
// 监听rtsp、rtmp源注册或注销事件 [AUTO-TRANSLATED:6396afa8]
// Listen to rtsp, rtmp source registration or deregistration events
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastMediaChanged, [](BroadcastMediaChangedArgs) {
GET_CONFIG(string, hook_stream_changed, Hook::kOnStreamChanged);
if (!hook_enable || hook_stream_changed.empty()) {
return;
}
GET_CONFIG_FUNC(std::set<std::string>, stream_changed_set, Hook::kStreamChangedSchemas, [](const std::string &str) {
std::set<std::string> ret;
auto vec = split(str, "/");
@ -520,6 +517,15 @@ void installWebHook() {
// This protocol registration deregistration event is ignored
return;
}
#if defined(ENABLE_PYTHON)
if (PythonInvoker::Instance().on_media_changed(bRegist, sender)) {
return;
}
#endif
GET_CONFIG(string, hook_stream_changed, Hook::kOnStreamChanged);
if (!hook_enable || hook_stream_changed.empty()) {
return;
}
ArgsType body;
if (bRegist) {
@ -799,6 +805,14 @@ void installWebHook() {
do_http_hook(rtp_server_timeout, body);
});
NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastPlayerProxyFailed, [](BroadcastPlayerProxyFailedArgs) {
#if defined(ENABLE_PYTHON)
if (PythonInvoker::Instance().on_player_proxy_failed(sender, ex)) {
return;
}
#endif
});
// 汇报服务器重新启动 [AUTO-TRANSLATED:bd7d83df]
// Report server restart
reportServerStarted();

View File

@ -273,6 +273,16 @@ int start_main(int argc,char *argv[]) {
}
#endif //! defined(_WIN32)
// 设置poller线程数和cpu亲和性,该函数必须在使用ZLToolKit网络相关对象之前调用才能生效 [AUTO-TRANSLATED:7f03a1e5]
// Set the number of poller threads and CPU affinity. This function must be called before using ZLToolKit network related objects to take effect.
// 如果需要调用getSnap和addFFmpegSource接口可以关闭cpu亲和性 [AUTO-TRANSLATED:7629f7bc]
// If you need to call the getSnap and addFFmpegSource interfaces, you can turn off CPU affinity
EventPollerPool::setPoolSize(threads);
WorkThreadPool::setPoolSize(threads);
EventPollerPool::enableCpuAffinity(affinity);
WorkThreadPool::enableCpuAffinity(affinity);
// 开启崩溃捕获等 [AUTO-TRANSLATED:9c7c759c]
// Enable crash capture, etc.
System::systemSetup();
@ -329,15 +339,6 @@ int start_main(int argc,char *argv[]) {
uint16_t httpsPort = mINI::Instance()[Http::kSSLPort];
uint16_t rtpPort = mINI::Instance()[RtpProxy::kPort];
// 设置poller线程数和cpu亲和性,该函数必须在使用ZLToolKit网络相关对象之前调用才能生效 [AUTO-TRANSLATED:7f03a1e5]
// Set the number of poller threads and CPU affinity. This function must be called before using ZLToolKit network related objects to take effect.
// 如果需要调用getSnap和addFFmpegSource接口可以关闭cpu亲和性 [AUTO-TRANSLATED:7629f7bc]
// If you need to call the getSnap and addFFmpegSource interfaces, you can turn off CPU affinity
EventPollerPool::setPoolSize(threads);
WorkThreadPool::setPoolSize(threads);
EventPollerPool::enableCpuAffinity(affinity);
// 简单的telnet服务器可用于服务器调试但是不能使用23端口否则telnet上了莫名其妙的现象 [AUTO-TRANSLATED:f9324c6e]
// Simple telnet server, can be used for server debugging, but cannot use port 23, otherwise telnet will have inexplicable phenomena
// 测试方法:telnet 127.0.0.1 9000 [AUTO-TRANSLATED:de0ac883]
@ -508,9 +509,11 @@ int start_main(int argc,char *argv[]) {
#endif
#if defined(ENABLE_PYTHON)
// 初始化python解释器
auto &ref = PythonInvoker::Instance();
auto py_plugin = mINI::Instance()[Python::kPlugin];
if (!py_plugin.empty()) {
PythonInvoker::Instance().load(py_plugin);
ref.load(py_plugin);
}
#endif
sem.wait();
@ -519,6 +522,10 @@ int start_main(int argc,char *argv[]) {
unInstallWebHook();
onProcessExited();
#if defined(ENABLE_PYTHON)
PythonInvoker::release();
#endif
// 休眠1秒再退出防止资源释放顺序错误 [AUTO-TRANSLATED:1b11a74f]
// sleep for 1 second before exiting, to prevent resource release order errors
InfoL << "程序退出中,请等待...";

View File

@ -7,7 +7,10 @@
#include <iostream>
#include <string>
#include <type_traits>
#include "WebApi.h"
#include "WebHook.h"
#include "Util/util.h"
#include "Util/File.h"
#include "Common/Parser.h"
#include "Http/HttpSession.h"
@ -70,6 +73,11 @@ py::dict to_python(const SockInfo &info) {
return jsonToPython(json);
}
template <typename T>
std::shared_ptr<T> to_python2(const T &t) {
return std::shared_ptr<T>(const_cast<T *>(&t), py::nodelete());
}
template <typename T>
T &to_native(const py::capsule &cap) {
static auto name_str = toolkit::demangle(typeid(T).name());
@ -110,6 +118,22 @@ void handle_http_request(const py::object &check_route, const py::object &submit
}
consumed = true;
// http api被python拦截了再api统一鉴权
try {
auto args = getAllArgs(parser);
auto allArgs = ArgsMap(parser, args);
GET_CONFIG(std::string, api_secret, API::kSecret);
CHECK_SECRET(); // 检测secret
} catch (std::exception &ex) {
Json::Value val;
val["code"] = API::Exception;
val["msg"] = ex.what();
HttpSession::KeyValue headerOut;
headerOut["Content-Type"] = "application/json";
invoker(200, headerOut, val.toStyledString());
return;
}
StrCaseMap resp_headers;
std::string resp_body;
int status = 500;
@ -152,11 +176,18 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) {
}
return "";
});
m.def("get_full_path", [](const std::string &path, const std::string &current_path) -> std::string {
py::gil_scoped_release release;
return File::absolutePath(path, current_path);
});
m.def("set_config", [](const std::string &key, const std::string &value) -> bool {
py::gil_scoped_release release;
mINI::Instance()[key]= value;
return true;
});
m.def("update_config", []() {
NOTICE_EMIT(BroadcastReloadConfigArgs, Broadcast::kBroadcastReloadConfig);
mINI::Instance().dumpFile(g_ini_file);
@ -171,6 +202,7 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) {
auto &invoker = to_native<Broadcast::PublishAuthInvoker>(cap);
invoker(err, option);
});
m.def("auth_invoker_do", [](const py::capsule &cap, const std::string &err) {
// 执行c++代码时释放gil锁
py::gil_scoped_release release;
@ -186,6 +218,46 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) {
});
});
py::enum_<TrackType>(m, "TrackType")
.value("Invalid", TrackInvalid)
.value("Video", TrackVideo)
.value("Audio", TrackAudio)
.value("Title", TrackTitle)
.value("Application", TrackApplication)
.export_values();
py::class_<MediaSource, MediaSource::Ptr>(m, "MediaSource")
.def("getSchema", &MediaSource::getSchema)
.def("getUrl", &MediaSource::getUrl)
.def("getMediaTuple", &MediaSource::getMediaTuple)
.def("getTimeStamp", &MediaSource::getTimeStamp)
.def("setTimeStamp", &MediaSource::setTimeStamp)
.def("getBytesSpeed", &MediaSource::getBytesSpeed)
.def("getTotalBytes", &MediaSource::getTotalBytes)
.def("getCreateStamp", &MediaSource::getCreateStamp)
.def("getAliveSecond", &MediaSource::getAliveSecond)
.def("readerCount", &MediaSource::readerCount)
.def("totalReaderCount", &MediaSource::totalReaderCount)
.def("getOriginType", &MediaSource::getOriginType)
.def("getOriginUrl", &MediaSource::getOriginUrl)
.def("getOriginSock", &MediaSource::getOriginSock)
.def("seekTo", &MediaSource::seekTo)
.def("pause", &MediaSource::pause)
.def("speed", &MediaSource::speed)
.def("close", &MediaSource::close)
.def("setupRecord", &MediaSource::setupRecord)
.def("isRecording", &MediaSource::isRecording)
.def("stopSendRtp", &MediaSource::stopSendRtp)
.def("getLossRate", &MediaSource::getLossRate);
py::class_<MediaTuple, std::shared_ptr<MediaTuple>>(m, "MediaTuple")
.def_readwrite("vhost", &MediaTuple::vhost)
.def_readwrite("app", &MediaTuple::app)
.def_readwrite("stream", &MediaTuple::stream)
.def_readwrite("params", &MediaTuple::params)
.def("shortUrl", &MediaTuple::shortUrl);
py::class_<SockException, std::shared_ptr<SockException>>(m, "SockException").def("what", &SockException::what).def("code", &SockException::getErrCode);
}
namespace mediakit {
@ -215,9 +287,18 @@ bool set_python_path() {
return true;
}
static std::shared_ptr<PythonInvoker> g_instance;
PythonInvoker &PythonInvoker::Instance() {
static std::shared_ptr<PythonInvoker> instance(new PythonInvoker);
return *instance;
static toolkit::onceToken s_token([]() {
g_instance.reset(new PythonInvoker);
});
return *g_instance;
}
void PythonInvoker::release() {
g_instance = nullptr;
}
PythonInvoker::PythonInvoker() {
@ -244,32 +325,34 @@ PythonInvoker::~PythonInvoker() {
}
_on_exit = py::object();
_on_publish = py::object();
_on_play = py::object();
_on_flow_report = py::object();
_on_reload_config = py::object();
_on_media_changed = py::object();
_on_player_proxy_failed = py::object();
_module = py::module();
}
delete _rel;
delete _interpreter;
}
#define GET_FUNC(instance, name) \
if (hasattr(instance, #name)) { \
_##name = instance.attr(#name); \
}
void PythonInvoker::load(const std::string &module_name) {
try {
py::gil_scoped_acquire gil; // 加锁
_module = py::module::import(module_name.c_str());
if (hasattr(_module, "on_exit")) {
_on_exit = _module.attr("on_exit");
}
if (hasattr(_module, "on_publish")) {
_on_publish = _module.attr("on_publish");
}
if (hasattr(_module, "on_play")) {
_on_play = _module.attr("on_play");
}
if (hasattr(_module, "on_flow_report")) {
_on_flow_report = _module.attr("on_flow_report");
}
if (hasattr(_module, "on_reload_config")) {
_on_reload_config = _module.attr("on_reload_config");
}
GET_FUNC(_module, on_exit);
GET_FUNC(_module, on_publish);
GET_FUNC(_module, on_play);
GET_FUNC(_module, on_flow_report);
GET_FUNC(_module, on_reload_config);
GET_FUNC(_module, on_media_changed);
GET_FUNC(_module, on_player_proxy_failed);
if (hasattr(_module, "on_start")) {
py::object on_start = _module.attr("on_start");
if (on_start) {
@ -305,6 +388,22 @@ bool PythonInvoker::on_flow_report(BroadcastFlowReportArgs) const {
return _on_flow_report(to_python(args), totalBytes, totalDuration, isPlayer, to_python(sender)).cast<bool>();
}
bool PythonInvoker::on_media_changed(BroadcastMediaChangedArgs) const {
py::gil_scoped_acquire gil; // 确保在 Python 调用期间持有 GIL
if (!_on_media_changed) {
return false;
}
return _on_media_changed(bRegist, to_python2(sender)).cast<bool>();
}
bool PythonInvoker::on_player_proxy_failed(BroadcastPlayerProxyFailedArgs) const {
py::gil_scoped_acquire gil; // 确保在 Python 调用期间持有 GIL
if (!_on_player_proxy_failed) {
return false;
}
return _on_player_proxy_failed(sender.getUrl(), to_python2(sender.getMediaTuple()), to_python2(ex)).cast<bool>();
}
} // namespace mediakit
#endif

View File

@ -11,6 +11,7 @@
#include "Util/logger.h"
#include "Common/config.h"
#include "Common/MediaSource.h"
#include "Player/PlayerProxy.h"
namespace py = pybind11;
@ -21,11 +22,14 @@ public:
~PythonInvoker();
static PythonInvoker& Instance();
static void release();
void load(const std::string &module_name);
bool on_publish(BroadcastMediaPublishArgs) const;
bool on_play(BroadcastMediaPlayedArgs) const;
bool on_flow_report(BroadcastFlowReportArgs) const;
bool on_media_changed(BroadcastMediaChangedArgs) const;
bool on_player_proxy_failed(BroadcastPlayerProxyFailedArgs) const;
private:
PythonInvoker();
@ -46,6 +50,10 @@ private:
py::object _on_flow_report;
// 配置文件热更新回调
py::object _on_reload_config;
// 媒体注册注销
py::object _on_media_changed;
// 拉流代理失败
py::object _on_player_proxy_failed;
};
} // namespace mediakit

View File

@ -81,6 +81,7 @@ const string kBroadcastRtcSctpClosed = "kBroadcastRtcSctpClosed";
const string kBroadcastRtcSctpSend = "kBroadcastRtcSctpSend";
const string kBroadcastRtcSctpReceived = "kBroadcastRtcSctpReceived";
const string kBroadcastPlayerCountChanged = "kBroadcastPlayerCountChanged";
const string kBroadcastPlayerProxyFailed = "kBroadcastPlayerProxyFailed";
} // namespace Broadcast

View File

@ -161,6 +161,9 @@ extern const std::string kBroadcastRtcSctpReceived;
extern const std::string kBroadcastPlayerCountChanged;
#define BroadcastPlayerCountChangedArgs const MediaTuple& args, const int& count
extern const std::string kBroadcastPlayerProxyFailed;
#define BroadcastPlayerProxyFailedArgs const PlayerProxy& sender, const toolkit::SockException &ex
#define ReloadConfigTag ((void *)(0xFF))
#define RELOAD_KEY(arg, key) \
do { \

View File

@ -110,7 +110,9 @@ void PlayerProxy::play(const string &strUrlTmp) {
if (!strongSelf) {
return;
}
if (err) {
NOTICE_EMIT(BroadcastPlayerProxyFailedArgs, Broadcast::kBroadcastPlayerProxyFailed, *strongSelf, err);
}
if (strongSelf->_on_play) {
strongSelf->_on_play(err);
strongSelf->_on_play = nullptr;
@ -146,6 +148,9 @@ void PlayerProxy::play(const string &strUrlTmp) {
if (!strongSelf) {
return;
}
if (err) {
NOTICE_EMIT(BroadcastPlayerProxyFailedArgs, Broadcast::kBroadcastPlayerProxyFailed, *strongSelf, err);
}
// 注销直接拉流代理产生的流:#532 [AUTO-TRANSLATED:c6343a3b]
// Unregister the stream generated by the direct stream proxy: #532

View File

@ -18,8 +18,7 @@
namespace mediakit {
struct StreamInfo
{
struct StreamInfo {
TrackType codec_type;
std::string codec_name;
int bitrate;
@ -30,8 +29,7 @@ struct StreamInfo
int video_height;
float video_fps;
StreamInfo()
{
StreamInfo() {
codec_type = TrackInvalid;
codec_name = "none";
bitrate = -1;
@ -44,14 +42,12 @@ struct StreamInfo
}
};
struct TranslationInfo
{
struct TranslationInfo {
std::vector<StreamInfo> stream_info;
int byte_speed;
uint64_t start_time_stamp;
TranslationInfo()
{
TranslationInfo() {
byte_speed = -1;
start_time_stamp = 0;
}

View File

@ -19,7 +19,11 @@ using namespace toolkit;
namespace mediakit {
MP4Muxer::~MP4Muxer() {
closeMP4();
try {
closeMP4();
} catch (std::exception &e) {
WarnL << e.what();
}
}
void MP4Muxer::openMP4(const string &file) {