diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 1916ed06..37962db3 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -245,6 +245,10 @@ extern std::vector getBlockTypeSize(); extern uint64_t getTotalMemBlockByType(int type); extern uint64_t getThisThreadMemBlockByType(int type) ; +namespace mediakit { +extern ThreadPool &getMP4Thread(); +extern ThreadPool &getHlsThread(); +} static void *web_api_tag = nullptr; static inline void addHttpListener(){ @@ -334,7 +338,7 @@ public: return _map.erase(key); } - size_t size() { + size_t size() { std::lock_guard lck(_mtx); return _map.size(); } @@ -587,7 +591,7 @@ void getStatisticJson(const function &cb) { val["totalMemBlockTypeCount"] = str; } - auto thread_size = EventPollerPool::Instance().getExecutorSize() + WorkThreadPool::Instance().getExecutorSize(); + auto thread_size = 2 + EventPollerPool::Instance().getExecutorSize() + WorkThreadPool::Instance().getExecutorSize(); std::shared_ptr > thread_mem_info = std::make_shared >(thread_size); shared_ptr finished(nullptr, [thread_mem_info, cb, obj](void *) { @@ -626,6 +630,8 @@ void getStatisticJson(const function &cb) { }; EventPollerPool::Instance().for_each(lam1); WorkThreadPool::Instance().for_each(lam1); + lam0(getMP4Thread()); + lam0(getHlsThread()); #else cb(*obj); #endif @@ -749,7 +755,7 @@ void addStreamPusherProxy(const string &schema, * Install api interface * All apis support GET and POST methods * POST method parameters support application/json and application/x-www-form-urlencoded methods - + * [AUTO-TRANSLATED:62e68c43] */ void installWebApi() { diff --git a/src/Record/HlsRecorder.cpp b/src/Record/HlsRecorder.cpp new file mode 100644 index 00000000..8aab997f --- /dev/null +++ b/src/Record/HlsRecorder.cpp @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "HlsRecorder.h" +using namespace toolkit; + +namespace mediakit { + +ThreadPool &getHlsThread() { + static ThreadPool ret(1, ThreadPool::PRIORITY_LOWEST, true); + static onceToken s_token([]() { + ret.async([]() { + setThreadName("hls thread"); + }); + }); + return ret; +} + +}//namespace mediakit diff --git a/src/Record/HlsRecorder.h b/src/Record/HlsRecorder.h index 1810aec5..9ae5c333 100644 --- a/src/Record/HlsRecorder.h +++ b/src/Record/HlsRecorder.h @@ -15,9 +15,12 @@ #include "MPEG.h" #include "MP4Muxer.h" #include "Common/config.h" +#include "Thread/ThreadPool.h" namespace mediakit { +toolkit::ThreadPool& getHlsThread(); + template class HlsRecorderBase : public MediaSourceEventInterceptor, public Muxer, public std::enable_shared_from_this > { public: @@ -58,6 +61,15 @@ public: } bool inputFrame(const Frame::Ptr &frame) override { + auto ptr = this->shared_from_this(); + auto cached_frame = Frame::getCacheAbleFrame(frame); + getHlsThread().async([ptr, cached_frame]() { + ptr->inputFrame_l(cached_frame); + }); + return true; + } + + bool inputFrame_l(const Frame::Ptr &frame) { if (_clear_cache && _option.hls_demand) { _clear_cache = false; // 清空旧的m3u8索引文件于ts切片 [AUTO-TRANSLATED:a4ce0664] diff --git a/src/Record/MP4Recorder.cpp b/src/Record/MP4Recorder.cpp index 7576ba6a..76a17cb5 100644 --- a/src/Record/MP4Recorder.cpp +++ b/src/Record/MP4Recorder.cpp @@ -22,6 +22,16 @@ using namespace toolkit; namespace mediakit { +ThreadPool &getMP4Thread() { + static ThreadPool ret(1, ThreadPool::PRIORITY_LOWEST, true); + static onceToken s_token([]() { + ret.async([]() { + setThreadName("mp4 thread"); + }); + }); + return ret; +} + MP4Recorder::MP4Recorder(const MediaTuple &tuple, const string &path, size_t max_second) { _folder_path = path; // ///record 业务逻辑////// [AUTO-TRANSLATED:2e78931a] @@ -78,7 +88,7 @@ void MP4Recorder::asyncClose() { auto full_path = _full_path; auto info = _info; TraceL << "Start close tmp mp4 file: " << full_path_tmp; - WorkThreadPool::Instance().getExecutor()->async([muxer, full_path_tmp, full_path, info]() mutable { + getMP4Thread().async([muxer, full_path_tmp, full_path, info]() mutable { info.time_len = muxer->getDuration() / 1000.0f; // 关闭mp4可能非常耗时,所以要放在后台线程执行 [AUTO-TRANSLATED:a7378a11] // Closing mp4 can be very time-consuming, so it should be executed in the background thread @@ -131,7 +141,7 @@ bool MP4Recorder::inputFrame(const Frame::Ptr &frame) { // In the case of b-frames, the dts timestamp may regress _last_dts = MIN(frame->dts(), _last_dts); } - + auto duration = 5u; // 默认至少一帧5ms if (frame->dts() > 0 && frame->dts() > _last_dts) { duration = MAX(duration, frame->dts() - _last_dts); @@ -153,7 +163,12 @@ bool MP4Recorder::inputFrame(const Frame::Ptr &frame) { if (_muxer) { // 生成mp4文件 [AUTO-TRANSLATED:76a8d77c] // Generate mp4 file - return _muxer->inputFrame(frame); + auto muxer = _muxer; + auto cached_frame = Frame::getCacheAbleFrame(frame); + getMP4Thread().async([muxer, cached_frame]() { + return muxer->inputFrame(cached_frame); + }); + return true; } return false; }