Compare commits

...

8 Commits

Author SHA1 Message Date
xia-chu
f0bc7a75aa 确保断连续推功能开启后流能正常强制关闭 (#4287)
Some checks are pending
Android / build (push) Waiting to run
CodeQL / Analyze (cpp) (push) Waiting to run
CodeQL / Analyze (javascript) (push) Waiting to run
Docker / build (push) Waiting to run
Linux / build (push) Waiting to run
macOS / build (push) Waiting to run
Windows / build (push) Waiting to run
2025-06-29 18:10:00 +08:00
xia-chu
a35444f27e 优化SEI/AUD帧处理逻辑 2025-06-29 18:00:47 +08:00
xia-chu
9a7cca1ad9 paced_sender逻辑新增音视频包排序功能
用于矫正音视频包时间戳交织性
2025-06-29 18:00:04 +08:00
xia-chu
0b21ece801 优化rtp异常处理逻辑 2025-06-29 17:56:48 +08:00
xia-chu
c98137e505 优化线程安全问题 2025-06-29 17:56:10 +08:00
xia-chu
18ca4cf48f 优化音视频时间戳同步逻辑 2025-06-29 17:55:17 +08:00
xia-chu
38821b295a 优化http参数获取逻辑 2025-06-29 17:48:23 +08:00
xia-chu
86e4a494f2 Update README.md 2025-06-29 15:24:23 +08:00
13 changed files with 136 additions and 58 deletions

View File

@ -158,7 +158,7 @@
- 6、支持负载过高时转码主动降低帧率且不花屏。
- 7、支持滤镜支持添加osd文本以及logo角标等能力。
- 8、支持全GPU硬件编解码与滤镜防止显存与内存频繁拷贝。
- 9、支持视频全GPU(cuda)推理插件支持人员、车辆等目标AI识别。
- 9、支持视频全GPU(cuda)推理插件支持人员、车辆等目标AI识别支持目标跟踪支持多边形布防支持ocr支持c++/python插件快速混合开发
- JT1078部标版本
- 1、支持接收jt1078推流转其他协议自适应音视频共享seq和单独seq模式。
@ -173,6 +173,11 @@
- VP9/AV1版本
- 全面新增支持av1/vp9编码rtmp/rtsp/ts/ps/hls/mp4/fmp4等协议全面支持av1/vp9。
- 其他
- 支持s3/minio云存储内存流直接写入解决录像文件io系统瓶颈问题。
- 支持onvif设备扫描与添加拉流。
- 支持GA1400视图api。
## 编译以及测试
**编译前务必仔细参考wiki:[快速开始](https://github.com/ZLMediaKit/ZLMediaKit/wiki/%E5%BF%AB%E9%80%9F%E5%BC%80%E5%A7%8B)操作!!!**

View File

@ -153,7 +153,11 @@ bool H264Track::ready() const {
bool H264Track::inputFrame(const Frame::Ptr &frame) {
using H264FrameInternal = FrameInternal<H264FrameNoCacheAble>;
int type = H264_TYPE(frame->data()[frame->prefixSize()]);
if (type == H264Frame::NAL_AUD) {
// AUD帧丢弃
return false;
}
if ((type == H264Frame::NAL_B_P || type == H264Frame::NAL_IDR) && ready()) {
return inputFrame_l(frame);
}

View File

@ -78,42 +78,45 @@ public:
using ApiArgsType = std::map<std::string, std::string, mediakit::StrCaseCompare>;
template<typename Args, typename First>
std::string getValue(Args &args, const First &first) {
return args[first];
template<typename Args, typename Key>
std::string getValue(Args &args, const Key &key) {
auto it = args.find(key);
if (it == args.end()) {
return "";
}
return it->second;
}
template<typename First>
std::string getValue(Json::Value &args, const First &first) {
return args[first].asString();
template<typename Key>
std::string getValue(Json::Value &args, const Key &key) {
auto it = args.find(key);
if (it == args.end()) {
return "";
}
return it->second.asString();
}
template<typename First>
std::string getValue(std::string &args, const First &first) {
template<typename Key>
std::string getValue(std::string &args, const Key &key) {
return "";
}
template<typename First>
std::string getValue(const mediakit::Parser &parser, const First &first) {
auto ret = parser.getUrlArgs()[first];
template <typename Key>
std::string getValue(const mediakit::Parser &parser, const Key &key) {
auto ret = getValue(parser.getUrlArgs(), key);
if (!ret.empty()) {
return ret;
}
return parser.getHeader()[first];
return getValue(parser.getHeader(), key);
}
template<typename First>
std::string getValue(mediakit::Parser &parser, const First &first) {
return getValue((const mediakit::Parser &) parser, first);
}
template<typename Args, typename First>
std::string getValue(const mediakit::Parser &parser, Args &args, const First &first) {
auto ret = getValue(args, first);
template<typename Args, typename Key>
std::string getValue(const mediakit::Parser &parser, Args &args, const Key &key) {
auto ret = getValue(args, key);
if (!ret.empty()) {
return ret;
}
return getValue(parser, first);
return getValue(parser, key);
}
template<typename Args>
@ -177,14 +180,14 @@ void api_regist(const std::string &api_path, const std::function<void(API_ARGS_S
// Register http request parameters as http original request information asynchronous reply http api
void api_regist(const std::string &api_path, const std::function<void(API_ARGS_STRING_ASYNC)> &func);
template<typename Args, typename First>
bool checkArgs(Args &args, const First &first) {
return !args[first].empty();
template<typename Args, typename Key>
bool checkArgs(Args &args, const Key &key) {
return !args[key].empty();
}
template<typename Args, typename First, typename ...KeyTypes>
bool checkArgs(Args &args, const First &first, const KeyTypes &...keys) {
return checkArgs(args, first) && checkArgs(args, keys...);
template<typename Args, typename Key, typename ...KeyTypes>
bool checkArgs(Args &args, const Key &key, const KeyTypes &...keys) {
return checkArgs(args, key) && checkArgs(args, keys...);
}
// 检查http url中或body中或http header参数是否为空的宏 [AUTO-TRANSLATED:9de001a4]

View File

@ -62,32 +62,38 @@ public:
setCurrentStamp(frame->dts());
resetTimer(EventPoller::getCurrentPoller());
}
_cache.emplace_back(frame->dts() + _cache_ms, Frame::getCacheAbleFrame(frame));
auto &last_dts = _last_dts[frame->getTrackType()];
if (last_dts > frame->dts()) {
// 时间戳回退了,点播流?
WarnL << "Dts decrease: " << last_dts << "->" << frame->dts() << ", flush all paced sender cache: " << _cache.size();
flushCache(frame->dts());
}
_cache.emplace(frame->dts(), Frame::getCacheAbleFrame(frame));
last_dts = frame->dts();
return true;
}
private:
void onTick() {
std::lock_guard<std::recursive_mutex> lck(_mtx);
auto dst = _cache.empty() ? 0 : _cache.back().first;
auto max_dts = _cache.empty() ? 0 : _cache.rbegin()->first;
while (!_cache.empty()) {
auto &front = _cache.front();
if (getCurrentStamp() < front.first) {
auto front = _cache.begin();
if (getCurrentStamp() < front->first + _cache_ms) {
// 还没到消费时间 [AUTO-TRANSLATED:09fb4c3d]
// Not yet time to consume
break;
}
// 时间到了该消费frame了 [AUTO-TRANSLATED:2f007931]
// Time is up, it's time to consume the frame
_cb(front.second);
_cache.pop_front();
_cb(front->second);
_cache.erase(front);
}
if (_cache.empty() && dst) {
if (_cache.empty() && max_dts) {
// 消费太快,需要增加缓存大小 [AUTO-TRANSLATED:c05bfbcd]
// Consumption is too fast, need to increase cache size
setCurrentStamp(dst);
setCurrentStamp(max_dts);
_cache_ms += kMinCacheMS;
}
@ -95,15 +101,20 @@ private:
// Consumption is too slow, need to force flush data
if (_cache.size() > 25 * 5) {
WarnL << "Flush frame paced sender cache: " << _cache.size();
while (!_cache.empty()) {
auto &front = _cache.front();
_cb(front.second);
_cache.pop_front();
}
setCurrentStamp(dst);
flushCache(max_dts);
}
}
void flushCache(uint64_t dts) {
while (!_cache.empty()) {
auto front = _cache.begin();
_cb(front->second);
_cache.erase(front);
}
setCurrentStamp(dts);
_cache_ms = kMinCacheMS;
}
uint64_t getCurrentStamp() { return _ticker.elapsedTime() + _stamp_offset; }
void setCurrentStamp(uint64_t stamp) {
@ -115,11 +126,12 @@ private:
uint32_t _paced_sender_ms;
uint32_t _cache_ms = kMinCacheMS;
uint64_t _stamp_offset = 0;
uint64_t _last_dts[2] = {0, 0};
OnFrame _cb;
Ticker _ticker;
Timer::Ptr _timer;
std::recursive_mutex _mtx;
std::list<std::pair<uint64_t, Frame::Ptr>> _cache;
std::multimap<uint64_t, Frame::Ptr> _cache;
};
std::shared_ptr<MediaSinkInterface> MultiMediaSourceMuxer::makeRecorder(MediaSource &sender, Recorder::type type) {
@ -501,6 +513,18 @@ EventPoller::Ptr MultiMediaSourceMuxer::getOwnerPoller(MediaSource &sender) {
}
}
bool MultiMediaSourceMuxer::close(MediaSource &sender) {
_rtmp = nullptr;
_rtsp = nullptr;
_fmp4 = nullptr;
_ts = nullptr;
_mp4 = nullptr;
_hls = nullptr;
_hls_fmp4 = nullptr;
_rtp_sender.clear();
return true;
}
std::shared_ptr<MultiMediaSourceMuxer> MultiMediaSourceMuxer::getMuxer(MediaSource &sender) const {
return const_cast<MultiMediaSourceMuxer*>(this)->shared_from_this();
}

View File

@ -181,6 +181,12 @@ public:
* [AUTO-TRANSLATED:a4dc847e]
*/
toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override;
/**
*
* @return
*/
bool close(MediaSource &sender) override;
/**
*

View File

@ -87,13 +87,13 @@ void Stamp::setPlayBack(bool playback) {
_playback = playback;
}
void Stamp::syncTo(Stamp &other) {
_need_sync = true;
void Stamp::syncTo(Stamp &other, int count) {
_need_sync += count;
_sync_master = &other;
}
void Stamp::needSync() {
_need_sync = true;
++_need_sync;
}
void Stamp::enableRollback(bool flag) {
@ -145,25 +145,42 @@ void Stamp::revise_l(int64_t dts, int64_t pts, int64_t &dts_out, int64_t &pts_ou
// 音视频dts当前时间差 [AUTO-TRANSLATED:716468a6]
// Audio and video dts current time difference
int64_t dts_diff = _last_dts_in - _sync_master->_last_dts_in;
if (ABS(dts_diff) < 5000) {
if (ABS(dts_diff) < 5000 || _need_sync > 3) {
// 两种时间戳相差不得大于300ms
dts_diff = _relative_stamp - _sync_master->_relative_stamp;
if (dts_diff > 300) {
dts_diff = 300;
} else if (dts_diff < -300) {
dts_diff = -300;
}
// 如果绝对时间戳小于5秒那么说明他们的起始时间戳是一致的那么强制同步 [AUTO-TRANSLATED:5d11ef6a]
// If the absolute timestamp is less than 5 seconds, then it means that their starting timestamps are consistent, then force synchronization
auto target_stamp = _sync_master->_relative_stamp + dts_diff;
if (target_stamp > _relative_stamp || _enable_rollback) {
// 强制同步后,时间戳增加跳跃了,或允许回退 [AUTO-TRANSLATED:805424a9]
// After forced synchronization, the timestamp increases jump, or allows rollback
if (_relative_stamp == target_stamp) {
return;
}
TraceL << "Relative stamp changed: " << _relative_stamp << " -> " << target_stamp;
_relative_stamp = target_stamp;
} else {
// 不允许回退, 则让另外一个Track的时间戳增长 [AUTO-TRANSLATED:428e8ce2]
// Not allowed to rollback, then let the timestamp of the other Track increase
target_stamp = _relative_stamp - dts_diff;
if (_sync_master->_relative_stamp == target_stamp) {
return;
}
TraceL << "Relative stamp changed: " << _sync_master->_relative_stamp << " -> " << target_stamp;
_sync_master->_relative_stamp = target_stamp;
}
}
_need_sync = false;
_sync_master->_need_sync = false;
if (_need_sync) {
--_need_sync;
}
if (_sync_master->_need_sync) {
--_sync_master->_need_sync;
}
}
}

View File

@ -117,7 +117,7 @@ public:
* [AUTO-TRANSLATED:7ac41a76]
*/
void syncTo(Stamp &other);
void syncTo(Stamp &other, int count = 1);
/**
* 退
@ -145,7 +145,7 @@ private:
private:
bool _playback = false;
bool _need_sync = false;
int _need_sync = 0;
// 默认不允许时间戳回滚 [AUTO-TRANSLATED:0163ff03]
// Default does not allow timestamp rollback
bool _enable_rollback = false;

View File

@ -205,7 +205,11 @@ bool FrameMerger::willFlush(const Frame::Ptr &frame) const{
case mp4_nal_size:
case h264_prefix: {
if (!_have_decode_able_frame) {
if (frame->dropAble() && !_have_config_frame) {
// 遇到SEI帧且未缓存配置帧flush之前的帧
return true;
}
if (!_have_decode_able_frame && !_have_drop_able_frame) {
// 缓存中没有有效的能解码的帧所以这次不flush [AUTO-TRANSLATED:5d860722]
// There are no valid frames that can be decoded in the cache, so no flush this time.
return _frame_cache.size() > kMaxFrameCacheSize;
@ -290,6 +294,8 @@ bool FrameMerger::inputFrame(const Frame::Ptr &frame, onOutput cb, BufferLikeStr
cb(back->dts(), back->pts(), merged_frame, have_key_frame);
_frame_cache.clear();
_have_decode_able_frame = false;
_have_drop_able_frame = false;
_have_config_frame = false;
}
if (!frame) {
@ -299,6 +305,12 @@ bool FrameMerger::inputFrame(const Frame::Ptr &frame, onOutput cb, BufferLikeStr
if (frame->decodeAble()) {
_have_decode_able_frame = true;
}
if (frame->dropAble()) {
_have_drop_able_frame = true;
}
if (frame->configFrame()) {
_have_config_frame = true;
}
_cb = std::move(cb);
_frame_cache.emplace_back(Frame::getCacheAbleFrame(frame));
return true;

View File

@ -660,6 +660,8 @@ private:
private:
int _type;
bool _have_decode_able_frame = false;
bool _have_drop_able_frame = false;
bool _have_config_frame = false;
onOutput _cb;
toolkit::List<Frame::Ptr> _frame_cache;
};

View File

@ -151,7 +151,7 @@ void DecoderImp::onTrack(int index, const Track::Ptr &track) {
track->setIndex(index);
auto &ref = _tracks[index];
if (ref.first) {
WarnL << "Already existed a same track: " << index << ", codec: " << track->getCodecName();
// WarnL << "Already existed a same track: " << index << ", codec: " << track->getCodecName();
return;
}
ref.first = track;

View File

@ -56,7 +56,7 @@ ssize_t PSDecoder::input(const uint8_t *data, size_t bytes) {
const char *PSDecoder::onSearchPacketTail(const char *data, size_t len) {
try {
auto ret = ps_demuxer_input(static_cast<struct ps_demuxer_t *>(_ps_demuxer), reinterpret_cast<const uint8_t *>(data), len);
if (ret >= 0) {
if (ret >= 0 && ret <= (ssize_t)len) {
// 解析成功全部或部分 [AUTO-TRANSLATED:a8085d34]
// Parse successful, all or part
return data + ret;

View File

@ -62,7 +62,12 @@ public:
RtpProcess::Ptr getProcess() const { return _process; }
void onRecvRtp(const Socket::Ptr &sock, const Buffer::Ptr &buf, struct sockaddr *addr) {
_process->inputRtp(true, sock, buf->data(), buf->size(), addr);
try {
_process->inputRtp(true, sock, buf->data(), buf->size(), addr);
} catch (std::exception &ex) {
_process->onDetach(SockException(Err_shutdown, ex.what()));
return;
}
// 统计rtp接受情况用于发送rr包 [AUTO-TRANSLATED:bd2fbe7e]
// Count RTP reception status, used to send RR packets
auto header = (RtpHeader *)buf->data();

View File

@ -442,7 +442,7 @@ namespace RTC
}
MS_DUMP(" size: %zu bytes", this->size);
static char transactionId[25];
thread_local static char transactionId[25];
for (int i{ 0 }; i < 12; ++i)
{