diff --git a/server/WebApi.cpp b/server/WebApi.cpp index a822114a..49588b60 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -115,8 +115,6 @@ static void responseApi(int code, const string &msg, const HttpSession::HttpResp responseApi(res, invoker); } -static ApiArgsType getAllArgs(const Parser &parser); - static HttpApi toApi(const function &cb) { return [cb](const Parser &parser, const HttpSession::HttpResponseInvoker &invoker, SockInfo &sender) { GET_CONFIG(string, charSet, Http::kCharSet); @@ -215,7 +213,7 @@ void api_regist(const string &api_path, const function &cb); void addStreamProxy(const mediakit::MediaTuple &tuple, const std::string &url, int retry_count, const mediakit::ProtocolOption &option, int rtp_type, float timeout_sec, const toolkit::mINI &args, diff --git a/server/WebHook.cpp b/server/WebHook.cpp index d2498ffd..35642771 100755 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -18,6 +18,7 @@ #include "Http/HttpRequester.h" #include "Network/Session.h" #include "Rtsp/RtspSession.h" +#include "Player/PlayerProxy.h" #include "WebHook.h" #include "WebApi.h" @@ -500,10 +501,6 @@ void installWebHook() { // 监听rtsp、rtmp源注册或注销事件 [AUTO-TRANSLATED:6396afa8] // Listen to rtsp, rtmp source registration or deregistration events NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastMediaChanged, [](BroadcastMediaChangedArgs) { - GET_CONFIG(string, hook_stream_changed, Hook::kOnStreamChanged); - if (!hook_enable || hook_stream_changed.empty()) { - return; - } GET_CONFIG_FUNC(std::set, stream_changed_set, Hook::kStreamChangedSchemas, [](const std::string &str) { std::set ret; auto vec = split(str, "/"); @@ -520,6 +517,15 @@ void installWebHook() { // This protocol registration deregistration event is ignored return; } +#if defined(ENABLE_PYTHON) + if (PythonInvoker::Instance().on_media_changed(bRegist, sender)) { + return; + } +#endif + GET_CONFIG(string, hook_stream_changed, Hook::kOnStreamChanged); + if (!hook_enable || hook_stream_changed.empty()) { + return; + } ArgsType body; if (bRegist) { @@ -799,6 +805,14 @@ void installWebHook() { do_http_hook(rtp_server_timeout, body); }); + NoticeCenter::Instance().addListener(&web_hook_tag, Broadcast::kBroadcastPlayerProxyFailed, [](BroadcastPlayerProxyFailedArgs) { +#if defined(ENABLE_PYTHON) + if (PythonInvoker::Instance().on_player_proxy_failed(sender, ex)) { + return; + } +#endif + }); + // 汇报服务器重新启动 [AUTO-TRANSLATED:bd7d83df] // Report server restart reportServerStarted(); diff --git a/server/main.cpp b/server/main.cpp index a6013bc9..6e1102a6 100644 --- a/server/main.cpp +++ b/server/main.cpp @@ -273,6 +273,16 @@ int start_main(int argc,char *argv[]) { } #endif //! defined(_WIN32) + // 设置poller线程数和cpu亲和性,该函数必须在使用ZLToolKit网络相关对象之前调用才能生效 [AUTO-TRANSLATED:7f03a1e5] + // Set the number of poller threads and CPU affinity. This function must be called before using ZLToolKit network related objects to take effect. + // 如果需要调用getSnap和addFFmpegSource接口,可以关闭cpu亲和性 [AUTO-TRANSLATED:7629f7bc] + // If you need to call the getSnap and addFFmpegSource interfaces, you can turn off CPU affinity + + EventPollerPool::setPoolSize(threads); + WorkThreadPool::setPoolSize(threads); + EventPollerPool::enableCpuAffinity(affinity); + WorkThreadPool::enableCpuAffinity(affinity); + // 开启崩溃捕获等 [AUTO-TRANSLATED:9c7c759c] // Enable crash capture, etc. System::systemSetup(); @@ -329,15 +339,6 @@ int start_main(int argc,char *argv[]) { uint16_t httpsPort = mINI::Instance()[Http::kSSLPort]; uint16_t rtpPort = mINI::Instance()[RtpProxy::kPort]; - // 设置poller线程数和cpu亲和性,该函数必须在使用ZLToolKit网络相关对象之前调用才能生效 [AUTO-TRANSLATED:7f03a1e5] - // Set the number of poller threads and CPU affinity. This function must be called before using ZLToolKit network related objects to take effect. - // 如果需要调用getSnap和addFFmpegSource接口,可以关闭cpu亲和性 [AUTO-TRANSLATED:7629f7bc] - // If you need to call the getSnap and addFFmpegSource interfaces, you can turn off CPU affinity - - EventPollerPool::setPoolSize(threads); - WorkThreadPool::setPoolSize(threads); - EventPollerPool::enableCpuAffinity(affinity); - // 简单的telnet服务器,可用于服务器调试,但是不能使用23端口,否则telnet上了莫名其妙的现象 [AUTO-TRANSLATED:f9324c6e] // Simple telnet server, can be used for server debugging, but cannot use port 23, otherwise telnet will have inexplicable phenomena // 测试方法:telnet 127.0.0.1 9000 [AUTO-TRANSLATED:de0ac883] @@ -508,9 +509,11 @@ int start_main(int argc,char *argv[]) { #endif #if defined(ENABLE_PYTHON) + // 初始化python解释器 + auto &ref = PythonInvoker::Instance(); auto py_plugin = mINI::Instance()[Python::kPlugin]; if (!py_plugin.empty()) { - PythonInvoker::Instance().load(py_plugin); + ref.load(py_plugin); } #endif sem.wait(); @@ -519,6 +522,10 @@ int start_main(int argc,char *argv[]) { unInstallWebHook(); onProcessExited(); +#if defined(ENABLE_PYTHON) + PythonInvoker::release(); +#endif + // 休眠1秒再退出,防止资源释放顺序错误 [AUTO-TRANSLATED:1b11a74f] // sleep for 1 second before exiting, to prevent resource release order errors InfoL << "程序退出中,请等待..."; diff --git a/server/pyinvoker.cpp b/server/pyinvoker.cpp index cee87800..5efe0a7e 100644 --- a/server/pyinvoker.cpp +++ b/server/pyinvoker.cpp @@ -7,7 +7,10 @@ #include #include #include +#include "WebApi.h" #include "WebHook.h" +#include "Util/util.h" +#include "Util/File.h" #include "Common/Parser.h" #include "Http/HttpSession.h" @@ -70,6 +73,11 @@ py::dict to_python(const SockInfo &info) { return jsonToPython(json); } +template +std::shared_ptr to_python2(const T &t) { + return std::shared_ptr(const_cast(&t), py::nodelete()); +} + template T &to_native(const py::capsule &cap) { static auto name_str = toolkit::demangle(typeid(T).name()); @@ -110,6 +118,22 @@ void handle_http_request(const py::object &check_route, const py::object &submit } consumed = true; + // http api被python拦截了,再api统一鉴权 + try { + auto args = getAllArgs(parser); + auto allArgs = ArgsMap(parser, args); + GET_CONFIG(std::string, api_secret, API::kSecret); + CHECK_SECRET(); // 检测secret + } catch (std::exception &ex) { + Json::Value val; + val["code"] = API::Exception; + val["msg"] = ex.what(); + HttpSession::KeyValue headerOut; + headerOut["Content-Type"] = "application/json"; + invoker(200, headerOut, val.toStyledString()); + return; + } + StrCaseMap resp_headers; std::string resp_body; int status = 500; @@ -152,11 +176,18 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) { } return ""; }); + + m.def("get_full_path", [](const std::string &path, const std::string ¤t_path) -> std::string { + py::gil_scoped_release release; + return File::absolutePath(path, current_path); + }); + m.def("set_config", [](const std::string &key, const std::string &value) -> bool { py::gil_scoped_release release; mINI::Instance()[key]= value; return true; }); + m.def("update_config", []() { NOTICE_EMIT(BroadcastReloadConfigArgs, Broadcast::kBroadcastReloadConfig); mINI::Instance().dumpFile(g_ini_file); @@ -171,6 +202,7 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) { auto &invoker = to_native(cap); invoker(err, option); }); + m.def("auth_invoker_do", [](const py::capsule &cap, const std::string &err) { // 执行c++代码时释放gil锁 py::gil_scoped_release release; @@ -186,6 +218,46 @@ PYBIND11_EMBEDDED_MODULE(mk_loader, m) { }); }); + py::enum_(m, "TrackType") + .value("Invalid", TrackInvalid) + .value("Video", TrackVideo) + .value("Audio", TrackAudio) + .value("Title", TrackTitle) + .value("Application", TrackApplication) + .export_values(); + + py::class_(m, "MediaSource") + .def("getSchema", &MediaSource::getSchema) + .def("getUrl", &MediaSource::getUrl) + .def("getMediaTuple", &MediaSource::getMediaTuple) + .def("getTimeStamp", &MediaSource::getTimeStamp) + .def("setTimeStamp", &MediaSource::setTimeStamp) + .def("getBytesSpeed", &MediaSource::getBytesSpeed) + .def("getTotalBytes", &MediaSource::getTotalBytes) + .def("getCreateStamp", &MediaSource::getCreateStamp) + .def("getAliveSecond", &MediaSource::getAliveSecond) + .def("readerCount", &MediaSource::readerCount) + .def("totalReaderCount", &MediaSource::totalReaderCount) + .def("getOriginType", &MediaSource::getOriginType) + .def("getOriginUrl", &MediaSource::getOriginUrl) + .def("getOriginSock", &MediaSource::getOriginSock) + .def("seekTo", &MediaSource::seekTo) + .def("pause", &MediaSource::pause) + .def("speed", &MediaSource::speed) + .def("close", &MediaSource::close) + .def("setupRecord", &MediaSource::setupRecord) + .def("isRecording", &MediaSource::isRecording) + .def("stopSendRtp", &MediaSource::stopSendRtp) + .def("getLossRate", &MediaSource::getLossRate); + + py::class_>(m, "MediaTuple") + .def_readwrite("vhost", &MediaTuple::vhost) + .def_readwrite("app", &MediaTuple::app) + .def_readwrite("stream", &MediaTuple::stream) + .def_readwrite("params", &MediaTuple::params) + .def("shortUrl", &MediaTuple::shortUrl); + + py::class_>(m, "SockException").def("what", &SockException::what).def("code", &SockException::getErrCode); } namespace mediakit { @@ -215,9 +287,18 @@ bool set_python_path() { return true; } +static std::shared_ptr g_instance; + PythonInvoker &PythonInvoker::Instance() { - static std::shared_ptr instance(new PythonInvoker); - return *instance; + static toolkit::onceToken s_token([]() { + g_instance.reset(new PythonInvoker); + }); + + return *g_instance; +} + +void PythonInvoker::release() { + g_instance = nullptr; } PythonInvoker::PythonInvoker() { @@ -244,32 +325,34 @@ PythonInvoker::~PythonInvoker() { } _on_exit = py::object(); _on_publish = py::object(); + _on_play = py::object(); + _on_flow_report = py::object(); + _on_reload_config = py::object(); + _on_media_changed = py::object(); + _on_player_proxy_failed = py::object(); _module = py::module(); } - delete _rel; delete _interpreter; } +#define GET_FUNC(instance, name) \ + if (hasattr(instance, #name)) { \ + _##name = instance.attr(#name); \ + } + void PythonInvoker::load(const std::string &module_name) { try { py::gil_scoped_acquire gil; // 加锁 _module = py::module::import(module_name.c_str()); - if (hasattr(_module, "on_exit")) { - _on_exit = _module.attr("on_exit"); - } - if (hasattr(_module, "on_publish")) { - _on_publish = _module.attr("on_publish"); - } - if (hasattr(_module, "on_play")) { - _on_play = _module.attr("on_play"); - } - if (hasattr(_module, "on_flow_report")) { - _on_flow_report = _module.attr("on_flow_report"); - } - if (hasattr(_module, "on_reload_config")) { - _on_reload_config = _module.attr("on_reload_config"); - } + GET_FUNC(_module, on_exit); + GET_FUNC(_module, on_publish); + GET_FUNC(_module, on_play); + GET_FUNC(_module, on_flow_report); + GET_FUNC(_module, on_reload_config); + GET_FUNC(_module, on_media_changed); + GET_FUNC(_module, on_player_proxy_failed); + if (hasattr(_module, "on_start")) { py::object on_start = _module.attr("on_start"); if (on_start) { @@ -305,6 +388,22 @@ bool PythonInvoker::on_flow_report(BroadcastFlowReportArgs) const { return _on_flow_report(to_python(args), totalBytes, totalDuration, isPlayer, to_python(sender)).cast(); } +bool PythonInvoker::on_media_changed(BroadcastMediaChangedArgs) const { + py::gil_scoped_acquire gil; // 确保在 Python 调用期间持有 GIL + if (!_on_media_changed) { + return false; + } + return _on_media_changed(bRegist, to_python2(sender)).cast(); +} + +bool PythonInvoker::on_player_proxy_failed(BroadcastPlayerProxyFailedArgs) const { + py::gil_scoped_acquire gil; // 确保在 Python 调用期间持有 GIL + if (!_on_player_proxy_failed) { + return false; + } + return _on_player_proxy_failed(sender.getUrl(), to_python2(sender.getMediaTuple()), to_python2(ex)).cast(); +} + } // namespace mediakit #endif diff --git a/server/pyinvoker.h b/server/pyinvoker.h index 9b9741fb..f5d87941 100644 --- a/server/pyinvoker.h +++ b/server/pyinvoker.h @@ -11,6 +11,7 @@ #include "Util/logger.h" #include "Common/config.h" #include "Common/MediaSource.h" +#include "Player/PlayerProxy.h" namespace py = pybind11; @@ -21,11 +22,14 @@ public: ~PythonInvoker(); static PythonInvoker& Instance(); + static void release(); void load(const std::string &module_name); bool on_publish(BroadcastMediaPublishArgs) const; bool on_play(BroadcastMediaPlayedArgs) const; bool on_flow_report(BroadcastFlowReportArgs) const; + bool on_media_changed(BroadcastMediaChangedArgs) const; + bool on_player_proxy_failed(BroadcastPlayerProxyFailedArgs) const; private: PythonInvoker(); @@ -46,6 +50,10 @@ private: py::object _on_flow_report; // 配置文件热更新回调 py::object _on_reload_config; + // 媒体注册注销 + py::object _on_media_changed; + // 拉流代理失败 + py::object _on_player_proxy_failed; }; } // namespace mediakit diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 22e95822..a3714e7b 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -81,6 +81,7 @@ const string kBroadcastRtcSctpClosed = "kBroadcastRtcSctpClosed"; const string kBroadcastRtcSctpSend = "kBroadcastRtcSctpSend"; const string kBroadcastRtcSctpReceived = "kBroadcastRtcSctpReceived"; const string kBroadcastPlayerCountChanged = "kBroadcastPlayerCountChanged"; +const string kBroadcastPlayerProxyFailed = "kBroadcastPlayerProxyFailed"; } // namespace Broadcast diff --git a/src/Common/config.h b/src/Common/config.h index a26757cf..9f75cfe0 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -161,6 +161,9 @@ extern const std::string kBroadcastRtcSctpReceived; extern const std::string kBroadcastPlayerCountChanged; #define BroadcastPlayerCountChangedArgs const MediaTuple& args, const int& count +extern const std::string kBroadcastPlayerProxyFailed; +#define BroadcastPlayerProxyFailedArgs const PlayerProxy& sender, const toolkit::SockException &ex + #define ReloadConfigTag ((void *)(0xFF)) #define RELOAD_KEY(arg, key) \ do { \ diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 58e4850c..0d465eb2 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -110,7 +110,9 @@ void PlayerProxy::play(const string &strUrlTmp) { if (!strongSelf) { return; } - + if (err) { + NOTICE_EMIT(BroadcastPlayerProxyFailedArgs, Broadcast::kBroadcastPlayerProxyFailed, *strongSelf, err); + } if (strongSelf->_on_play) { strongSelf->_on_play(err); strongSelf->_on_play = nullptr; @@ -146,6 +148,9 @@ void PlayerProxy::play(const string &strUrlTmp) { if (!strongSelf) { return; } + if (err) { + NOTICE_EMIT(BroadcastPlayerProxyFailedArgs, Broadcast::kBroadcastPlayerProxyFailed, *strongSelf, err); + } // 注销直接拉流代理产生的流:#532 [AUTO-TRANSLATED:c6343a3b] // Unregister the stream generated by the direct stream proxy: #532 diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index cc67ad94..5c8759f7 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -18,8 +18,7 @@ namespace mediakit { -struct StreamInfo -{ +struct StreamInfo { TrackType codec_type; std::string codec_name; int bitrate; @@ -30,8 +29,7 @@ struct StreamInfo int video_height; float video_fps; - StreamInfo() - { + StreamInfo() { codec_type = TrackInvalid; codec_name = "none"; bitrate = -1; @@ -44,14 +42,12 @@ struct StreamInfo } }; -struct TranslationInfo -{ +struct TranslationInfo { std::vector stream_info; int byte_speed; uint64_t start_time_stamp; - TranslationInfo() - { + TranslationInfo() { byte_speed = -1; start_time_stamp = 0; } diff --git a/src/Record/MP4Muxer.cpp b/src/Record/MP4Muxer.cpp index 8fd0567e..35dec7fd 100644 --- a/src/Record/MP4Muxer.cpp +++ b/src/Record/MP4Muxer.cpp @@ -19,7 +19,11 @@ using namespace toolkit; namespace mediakit { MP4Muxer::~MP4Muxer() { - closeMP4(); + try { + closeMP4(); + } catch (std::exception &e) { + WarnL << e.what(); + } } void MP4Muxer::openMP4(const string &file) {