Compare commits

...

23 Commits

Author SHA1 Message Date
ziyue
df41d4d410 hls/mp4录制放置在后台线程
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
2025-05-31 11:29:24 +08:00
PioLing
2bbd177d5a
Add win32 core dump (#4291)
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
单元测试代码
int main() {
    System::systemSetup();
    int *p = nullptr;
    *p = 1; // 强制崩溃
    return 0;
}

崩溃后直接用 vs2019(2022) 打开 dmp 文件。
菜单选择 File → Open → File...,选择你的 .dmp 文件。
提示你“选择符号和源码”,若有源码和 pdb/symbol 文件能看到更详细的堆栈。
2025-05-31 11:26:00 +08:00
xiongguangjie
a050f38cc9
提升浏览器webrtc g711音频播放效果 (#4280 #4282)
For chrome (136.0.7103.93)
2025-05-11 11:18:34 +08:00
xia-chu
7a7f618a73 修复srtp编译失败导致webrtc特性打开失败问题 2025-05-09 20:25:42 +08:00
PioLing
7b1f8fedac
Add network traffic statistics (#4239)
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
Co-authored-by: xiongguangjie <xiong_panda@163.com>
Co-authored-by: xia-chu <771730766@qq.com>
2025-05-02 16:23:25 +08:00
PioLing
ab14adb94d
Add loadMP4File respond "durations" (#4262 #4260)
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
2025-04-26 19:58:48 +08:00
xia-chu
444aeceacc rtp推流时打印ssrc
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
2025-04-20 14:54:20 +08:00
xia-chu
dd1e8ef430 web hook 忽略返回值为null的值 2025-04-20 14:50:56 +08:00
xiongguangjie
97b81ea179
update ci workflow (#4257)
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
action image update for
https://github.com/actions/runner-images/issues/11101
2025-04-18 09:54:31 +08:00
PioLing
beff8c0527
修复编译失败的问题 (#4249) #4252
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
2025-04-15 09:48:00 +08:00
xia-chu
41bdfa0755 addStreamProxy、addStreamPusherProxy接口确保线程负载均衡
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
2025-04-13 19:42:30 +08:00
xia-chu
21610f5ddf 修复bom头导致的编译问题 2025-04-13 19:42:30 +08:00
Wayne Chen
dfca520857
修复http代理的两个问题,原http代理功能提交(#2988):d593267 (#4219)
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
1. CONNECT请求添加Host字段, 解决400 bad request问题;
    2. HLS拉取第2个ts分片时,重新设置http代理,解决第2个分片及后续分片未走代理的问题。
2025-04-10 15:09:46 +08:00
guohuachan
8ff1459789
doc: update readme.md (#4238)
zlmediakit exporter 已经完善,整体完成度较高。
并且 grafana_demo 服务器更换了更大带宽的服务器,访问更稳定。

希望能加入合作项目,感谢感谢☺️
2025-04-10 15:07:09 +08:00
ibranch7
56fe66da7c
修复http文件下载指定目录时mmap导致崩溃的bug (#4213)
Some checks are pending
Android / build (push) Waiting to run
CodeQL / Analyze (cpp) (push) Waiting to run
CodeQL / Analyze (javascript) (push) Waiting to run
Docker / build (push) Waiting to run
Linux / build (push) Waiting to run
macOS / build (push) Waiting to run
Windows / build (push) Waiting to run
在`HttpBody.cpp`中,`getSharedMmap`函数直接尝试对所有路径进行mmap操作,没有进行文件类型检查
2025-04-09 21:35:28 +08:00
xiongguangjie
026e74d624
修复addFFmpegSource接口线程检测失败相关bug (#4225 #4233) 2025-04-09 21:33:12 +08:00
xiongguangjie
fd4f00dd63
修复非法rtp数据导致空指针崩溃问题 (#4226)
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
2025-04-07 20:50:03 +08:00
PioLing
3351aedeee
优化mp4与flv录制相关代码 (#4206)
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
2025-03-29 20:39:14 +08:00
custompal
f0204ea3b3
播放器(reader)setGetInfoCB时统一使用Session对象 (#4195)
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
Co-authored-by: custompal <chenrengen@gosuncn.com>
2025-03-21 15:22:48 +08:00
KkemChen
fd89e0d801
feat: VideoStack支持等比缩放 (#4191)
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
feat: VideoStack支持等比缩放 (目前以宏VIDEOSTACK_KEEP_ASPECT_RATIO
方式控制开启,后续考虑是否添加到配置文件中来控制)
2025-03-19 09:42:57 +08:00
PioLing
d078446a91
兼容某些不规范的rtsp流 (#4188)
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
2025-03-13 16:22:35 +08:00
夏楚
83afaff142
Update README.md
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
2025-03-09 12:11:58 +08:00
xia-chu
11d1b849ec startSendRtp、startRecord接口支持缓存多个gop (#4180)
Some checks failed
Android / build (push) Has been cancelled
CodeQL / Analyze (cpp) (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
Docker / build (push) Has been cancelled
Linux / build (push) Has been cancelled
macOS / build (push) Has been cancelled
Windows / build (push) Has been cancelled
2025-03-07 23:40:50 +08:00
77 changed files with 798 additions and 201 deletions

View File

@ -2,7 +2,7 @@ name: Android
on: [push, pull_request]
jobs:
build:
runs-on: ubuntu-20.04
runs-on: ubuntu-24.04
steps:
- name: 下载源码

View File

@ -5,7 +5,7 @@ on: [push, pull_request]
jobs:
analyze:
name: Analyze
runs-on: ubuntu-20.04
runs-on: ubuntu-24.04
permissions:
actions: read
contents: read
@ -43,7 +43,7 @@ jobs:
with:
repository: cisco/libsrtp
fetch-depth: 1
ref: v2.3.0
ref: v2.7.0
path: 3rdpart/libsrtp
- name: 编译 SRTP

View File

@ -15,7 +15,7 @@ env:
jobs:
build:
runs-on: ubuntu-20.04
runs-on: ubuntu-24.04
permissions:
contents: read
packages: write

View File

@ -6,7 +6,7 @@ on:
jobs:
issue_lint:
runs-on: ubuntu-22.04
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v3

View File

@ -5,7 +5,7 @@ on: [push, pull_request]
jobs:
build:
runs-on: ubuntu-20.04
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v1

View File

@ -4,7 +4,7 @@ on: [pull_request]
jobs:
check:
runs-on: ubuntu-20.04
runs-on: ubuntu-24.04
steps:
- uses: actions/checkout@v2
with:

@ -1 +1 @@
Subproject commit 0eb87b2eba56e783be702778337e54f74e59789e
Subproject commit 8f25d13f49e016858fae88f1045786ce26611873

View File

@ -47,7 +47,7 @@
## 功能清单
### 功能一览
<img width="800" alt="功能一览" src="https://github.com/ZLMediaKit/ZLMediaKit/assets/11495632/481ea769-5b27-495e-bf7d-31191e6af9d2">
<img width="749" alt="image" src="https://github.com/user-attachments/assets/8cf5911b-4603-4aa0-8e24-0acb0c616a82" />
- RTSP[S]
- RTSP[S] 服务器支持RTMP/MP4/HLS转RTSP[S],支持亚马逊echo show这样的设备
@ -146,7 +146,7 @@
- 支持按需解复用、转协议当有人观看时才开启转协议降低cpu占用率
- 支持溯源模式的集群部署溯源方式支持rtsp/rtmp/hls/http-ts, 边沿站支持hls, 源站支持多个(采用round robin方式溯源)
- rtsp/rtmp/webrtc推流异常断开后可以在超时时间内重连推流播放器无感知
## 闭源专业版
在最新开源代码的基础,新增以下闭源专业版,详询邮箱:<1213642868@qq.com>
- 转码版本
@ -228,6 +228,9 @@ bash build_docker_images.sh
- [java sdk](https://github.com/lidaofu-hub/j_zlm_sdk) 本项目c sdk完整java包装库
- [c# sdk](https://github.com/malegend/ZLMediaKit.Autogen) 本项目c sdk完整c#包装库
- [metaRTC](https://github.com/metartc/metaRTC) 全国产纯c webrtc sdk
- 监控与运维
- [ZLMediaKit_exporter](https://github.com/guohuachan/ZLMediaKit_exporter) 一个用于采集 ZLMediaKit 核心指标的 Prometheus Exporter搭配 Grafana 即可快速构建实时监控面板
- 其他项目(已停止更新)
- [NodeJS实现的GB28181平台](https://gitee.com/hfwudao/GB28181_Node_Http)

View File

@ -404,6 +404,8 @@ bash build_docker_images.sh
- [GB28181 player implemented in C++](https://github.com/any12345com/BXC_gb28181Player)
- [Android RTCPlayer](https://github.com/leo94666/RTCPlayer)
- Monitor
- [Prometheus Exporter for ZLMediaKit](https://github.com/guohuachan/ZLMediaKit_exporter)
## License

View File

@ -1,4 +1,4 @@
#include <atomic>
#include <atomic>
static int test()
{

View File

@ -332,8 +332,8 @@ h265_pt=99
ps_pt=96
#rtp opus 负载的pt
opus_pt=100
#RtpSender相关功能是否提前开启gop缓存优化级联秒开体验默认开启
#如果不调用startSendRtp相关接口可以置0节省内存
#startSendRtp、startRecord相关功能是否提前开启gop缓存优化级联秒开体验默认开启, 并缓存1个GOP
#如果不调用startSendRtp、startRecord后相关接口可以置0节省内存如果缓存多个gop可以加大该参数
gop_cache=1
#国标发送g711 rtp 打包时每个包的语音时长是多少默认是100 ms范围为20~180ms (gb28181-2016c.2.4规定)

View File

@ -38,7 +38,8 @@ bool G711RtpEncoder::inputFrame(const Frame::Ptr &frame) {
_buffer.append(ptr, size);
while (_buffer.size() >= _pkt_bytes) {
RtpCodec::inputRtp(getRtpInfo().makeRtp(TrackAudio, _buffer.data(), _pkt_bytes, false, in_pts), false);
auto tmp = (in_pts+_pkt_dur_ms-1)/_pkt_dur_ms*_pkt_dur_ms;
RtpCodec::inputRtp(getRtpInfo().makeRtp(TrackAudio, _buffer.data(), _pkt_bytes, false, tmp), false);
in_pts += _pkt_dur_ms;
_buffer.erase(0, _pkt_bytes);
}

View File

@ -1,5 +1,5 @@
#ifdef APSTUDIO_INVOKED
#error This file is not editable by Visual C++.
#ifdef APSTUDIO_INVOKED
#error "This file is not editable by Visual C++."
#endif //APSTUDIO_INVOKED
#include "winres.h"

View File

@ -341,10 +341,14 @@ void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) {
setDelegate(listener);
muxer->setDelegate(shared_from_this());
if (_enable_hls) {
src->setupRecord(Recorder::type_hls, true, "", 0);
src->getOwnerPoller()->async([=]() mutable {
src->setupRecord(Recorder::type_hls, true, "", 0);
});
}
if (_enable_mp4) {
src->setupRecord(Recorder::type_mp4, true, "", 0);
src->getOwnerPoller()->async([=]() mutable {
src->setupRecord(Recorder::type_mp4, true, "", 0);
});
}
}
}

View File

@ -15,6 +15,12 @@
#if !defined(ANDROID)
#include <execinfo.h>
#endif//!defined(ANDROID)
#else
#include <fcntl.h>
#include <io.h>
#include <Windows.h>
#include <DbgHelp.h>
#pragma comment(lib, "DbgHelp.lib")
#endif//!defined(_WIN32)
#include <cstdlib>
@ -213,6 +219,48 @@ void System::systemSetup(){
// Ignore the hang up signal
signal(SIGHUP, SIG_IGN);
#endif// ANDROID
#else
// 避免系统弹窗导致程序阻塞,适合无界面或后台服务场景。
SetErrorMode(SEM_FAILCRITICALERRORS | SEM_NOGPFAULTERRORBOX | SEM_NOOPENFILEERRORBOX);
#if !defined(__MINGW32__)
// 将assert和error时错误输出
_CrtSetReportMode(_CRT_ASSERT, _CRTDBG_MODE_DEBUG);
_CrtSetReportMode(_CRT_ERROR, _CRTDBG_MODE_DEBUG);
#endif
_setmode(0, _O_BINARY);
_setmode(1, _O_BINARY);
_setmode(2, _O_BINARY);
setvbuf(stdout, NULL, _IONBF, 0);
setvbuf(stderr, NULL, _IONBF, 0);
std::ios_base::sync_with_stdio(false);
// 注册crash自动生成dump等价core dump
SetUnhandledExceptionFilter([](EXCEPTION_POINTERS *pException) -> LONG {
// 生成 dump 文件名,带时间戳
char dumpPath[MAX_PATH];
std::time_t t = std::time(nullptr);
std::tm tm;
#ifdef _MSC_VER
localtime_s(&tm, &t);
#else
tm = *std::localtime(&t);
#endif
std::strftime(dumpPath, sizeof(dumpPath), "crash_%Y%m%d_%H%M%S.dmp", &tm);
HANDLE hFile = CreateFileA(dumpPath, GENERIC_WRITE, 0, nullptr, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, nullptr);
if (hFile != INVALID_HANDLE_VALUE) {
MINIDUMP_EXCEPTION_INFORMATION mdei;
mdei.ThreadId = GetCurrentThreadId();
mdei.ExceptionPointers = pException;
mdei.ClientPointers = FALSE;
MiniDumpWriteDump(GetCurrentProcess(), GetCurrentProcessId(), hFile, MiniDumpNormal, &mdei, nullptr, nullptr);
CloseHandle(hFile);
}
return EXCEPTION_EXECUTE_HANDLER;
});
#endif//!defined(_WIN32)
}

View File

@ -21,6 +21,14 @@
#define RGB_TO_U(R, G, B) (((-26 * (R) - 87 * (G) + 112 * (B) + 128) >> 8) + 128)
#define RGB_TO_V(R, G, B) (((112 * (R) - 102 * (G) - 10 * (B) + 128) >> 8) + 128)
static void fill_yuv_func(const mediakit::FFmpegFrame::Ptr &frame, int y, int u, int v) {
const auto& yuv = frame->get();
memset(yuv->data[0], y, yuv->linesize[0] * yuv->height);
memset(yuv->data[1], u, yuv->linesize[1] * ((yuv->height + 1) / 2));
memset(yuv->data[2], v, yuv->linesize[2] * ((yuv->height + 1) / 2));
}
INSTANCE_IMP(VideoStackManager)
Param::~Param() {
@ -31,6 +39,13 @@ Param::~Param() {
Channel::Channel(const std::string& id, int width, int height, AVPixelFormat pixfmt)
: _id(id), _width(width), _height(height), _pixfmt(pixfmt) {
#if defined(VIDEOSTACK_KEEP_ASPECT_RATIO)
_keepAspectRatio = true;
#else
_keepAspectRatio = false;
#endif
_lastWidht = 0;
_lastHeight = 0;
_tmp = std::make_shared<mediakit::FFmpegFrame>();
_tmp->get()->width = _width;
@ -39,14 +54,9 @@ Channel::Channel(const std::string& id, int width, int height, AVPixelFormat pix
av_frame_get_buffer(_tmp->get(), 32);
memset(_tmp->get()->data[0], 0, _tmp->get()->linesize[0] * _height);
memset(_tmp->get()->data[1], 0, _tmp->get()->linesize[1] * _height / 2);
memset(_tmp->get()->data[2], 0, _tmp->get()->linesize[2] * _height / 2);
auto frame = VideoStackManager::Instance().getBgImg();
_sws = std::make_shared<mediakit::FFmpegSws>(_pixfmt, _width, _height);
_tmp = _sws->inputFrame(frame);
resizeFrame(frame);
}
void Channel::addParam(const std::weak_ptr<Param>& p) {
@ -60,8 +70,7 @@ void Channel::onFrame(const mediakit::FFmpegFrame::Ptr& frame) {
_poller->async([weakSelf, frame]() {
auto self = weakSelf.lock();
if (!self) { return; }
self->_tmp = self->_sws->inputFrame(frame);
self->resizeFrame(frame);
self->forEachParam([self](const Param::Ptr& p) { self->fillBuffer(p); });
});
}
@ -110,6 +119,78 @@ void Channel::copyData(const mediakit::FFmpegFrame::Ptr& buf, const Param::Ptr&
default: WarnL << "No support pixformat: " << av_get_pix_fmt_name(p->pixfmt); break;
}
}
void Channel::resizeFrame(const mediakit::FFmpegFrame::Ptr &frame) {
if (_keepAspectRatio) {
resizeFrameImplWithAspectRatio(frame);
} else {
resizeFrameImplWithoutAspectRatio(frame);
}
}
void Channel::resizeFrameImplWithAspectRatio(const mediakit::FFmpegFrame::Ptr &frame) {
int srcWidth = frame->get()->width;
int srcHeight = frame->get()->height;
if (srcWidth <= 0 || srcHeight <= 0) {
return;
}
// 当新frame宽高变化时重新初始化sws
if (srcWidth != _lastWidht || srcHeight != _lastHeight) {
_lastWidht = srcWidth;
_lastHeight = srcHeight;
fill_yuv_func(_tmp, 16, 128, 128);
int dstWidth = _width;
int dstHeight = _height;
float srcAspectRatio = static_cast<float>(srcWidth) / srcHeight;
float dstAspectRatio = static_cast<float>(dstWidth) / dstHeight;
int scaledWidth, scaledHeight;
if (srcAspectRatio > dstAspectRatio) {
scaledWidth = dstWidth;
scaledHeight = static_cast<int>(dstWidth / srcAspectRatio);
} else {
scaledHeight = dstHeight;
scaledWidth = static_cast<int>(dstHeight * srcAspectRatio);
}
_offsetX = (dstWidth - scaledWidth) / 2;
_offsetY = (dstHeight - scaledHeight) / 2;
_sws = std::make_shared<mediakit::FFmpegSws>(_pixfmt, scaledWidth, scaledHeight);
}
auto scaledFrame = _sws->inputFrame(frame);
int copyWidth = ((_width) < (scaledFrame->get()->width) ? (_width) : (scaledFrame->get()->width));
int copyHeight = ((_height) < (scaledFrame->get()->height) ? (_height) : (scaledFrame->get()->height));
for (int i = 0; i < copyHeight; i++) {
memcpy(
_tmp->get()->data[0] + (i + _offsetY) * _tmp->get()->linesize[0] + _offsetX, scaledFrame->get()->data[0] + i * scaledFrame->get()->linesize[0],
copyWidth);
}
for (int i = 0; i < (copyHeight + 1) / 2; i++) {
memcpy(
_tmp->get()->data[1] + (i + _offsetY / 2) * _tmp->get()->linesize[1] + _offsetX / 2,
scaledFrame->get()->data[1] + i * scaledFrame->get()->linesize[1], copyWidth / 2);
memcpy(
_tmp->get()->data[2] + (i + _offsetY / 2) * _tmp->get()->linesize[2] + _offsetX / 2,
scaledFrame->get()->data[2] + i * scaledFrame->get()->linesize[2], copyWidth / 2);
}
}
void Channel::resizeFrameImplWithoutAspectRatio(const mediakit::FFmpegFrame::Ptr &frame) {
if (!_sws) {
fill_yuv_func(_tmp, 16, 128, 128);
_sws = std::make_shared<mediakit::FFmpegSws>(_pixfmt, _width, _height);
}
_tmp = _sws->inputFrame(frame);
}
void StackPlayer::addChannel(const std::weak_ptr<Channel>& chn) {
std::lock_guard<std::recursive_mutex> lock(_mx);
_channels.push_back(chn);
@ -300,9 +381,7 @@ void VideoStack::initBgColor() {
double U = RGB_TO_U(R, G, B);
double V = RGB_TO_V(R, G, B);
memset(_buffer->get()->data[0], Y, _buffer->get()->linesize[0] * _height);
memset(_buffer->get()->data[1], U, _buffer->get()->linesize[1] * _height / 2);
memset(_buffer->get()->data[2], V, _buffer->get()->linesize[2] * _height / 2);
fill_yuv_func(_buffer, Y, U, V);
}
Channel::Ptr VideoStackManager::getChannel(const std::string& id, int width, int height,

View File

@ -62,12 +62,24 @@ protected:
void copyData(const mediakit::FFmpegFrame::Ptr& buf, const Param::Ptr& p);
void resizeFrame(const mediakit::FFmpegFrame::Ptr &frame);
void resizeFrameImplWithAspectRatio(const mediakit::FFmpegFrame::Ptr &frame);
void resizeFrameImplWithoutAspectRatio(const mediakit::FFmpegFrame::Ptr &frame);
private:
std::string _id;
int _width;
int _height;
AVPixelFormat _pixfmt;
int _lastWidht;
int _lastHeight;
bool _keepAspectRatio;
int _offsetX;
int _offsetY;
mediakit::FFmpegFrame::Ptr _tmp;
std::recursive_mutex _mx;

View File

@ -245,6 +245,10 @@ extern std::vector<size_t> getBlockTypeSize();
extern uint64_t getTotalMemBlockByType(int type);
extern uint64_t getThisThreadMemBlockByType(int type) ;
namespace mediakit {
extern ThreadPool &getMP4Thread();
extern ThreadPool &getHlsThread();
}
static void *web_api_tag = nullptr;
static inline void addHttpListener(){
@ -334,7 +338,7 @@ public:
return _map.erase(key);
}
size_t size() {
size_t size() {
std::lock_guard<std::recursive_mutex> lck(_mtx);
return _map.size();
}
@ -425,7 +429,10 @@ Value ToJson(const PusherProxy::Ptr& p) {
item["url"] = p->getUrl();
item["status"] = p->getStatus();
item["liveSecs"] = p->getLiveSecs();
item["rePublishCount"] = p->getRePublishCount();
item["rePublishCount"] = p->getRePublishCount();
item["bytesSpeed"] = (Json::UInt64) p->getSendSpeed();
item["totalBytes"] =(Json::UInt64) p->getSendTotalBytes();
if (auto src = p->getSrc()) {
dumpMediaTuple(src->getMediaTuple(), item["src"]);
}
@ -439,6 +446,9 @@ Value ToJson(const PlayerProxy::Ptr& p) {
item["liveSecs"] = p->getLiveSecs();
item["rePullCount"] = p->getRePullCount();
item["totalReaderCount"] = p->totalReaderCount();
item["bytesSpeed"] = (Json::UInt64) p->getRecvSpeed();
item["totalBytes"] = (Json::UInt64) p->getRecvTotalBytes();
dumpMediaTuple(p->getMediaTuple(), item["src"]);
return item;
}
@ -449,7 +459,8 @@ Value makeMediaSourceJson(MediaSource &media){
dumpMediaTuple(media.getMediaTuple(), item);
item["createStamp"] = (Json::UInt64) media.getCreateStamp();
item["aliveSecond"] = (Json::UInt64) media.getAliveSecond();
item["bytesSpeed"] = media.getBytesSpeed();
item["bytesSpeed"] = (Json::UInt64) media.getBytesSpeed();
item["totalBytes"] = (Json::UInt64) media.getTotalBytes();
item["readerCount"] = media.readerCount();
item["totalReaderCount"] = media.totalReaderCount();
item["originType"] = (int) media.getOriginType();
@ -587,7 +598,7 @@ void getStatisticJson(const function<void(Value &val)> &cb) {
val["totalMemBlockTypeCount"] = str;
}
auto thread_size = EventPollerPool::Instance().getExecutorSize() + WorkThreadPool::Instance().getExecutorSize();
auto thread_size = 2 + EventPollerPool::Instance().getExecutorSize() + WorkThreadPool::Instance().getExecutorSize();
std::shared_ptr<vector<Value> > thread_mem_info = std::make_shared<vector<Value> >(thread_size);
shared_ptr<void> finished(nullptr, [thread_mem_info, cb, obj](void *) {
@ -626,6 +637,8 @@ void getStatisticJson(const function<void(Value &val)> &cb) {
};
EventPollerPool::Instance().for_each(lam1);
WorkThreadPool::Instance().for_each(lam1);
lam0(getMP4Thread());
lam0(getHlsThread());
#else
cb(*obj);
#endif
@ -749,7 +762,7 @@ void addStreamPusherProxy(const string &schema,
* Install api interface
* All apis support GET and POST methods
* POST method parameters support application/json and application/x-www-form-urlencoded methods
* [AUTO-TRANSLATED:62e68c43]
*/
void installWebApi() {
@ -1009,9 +1022,9 @@ void installWebApi() {
},
[](toolkit::Any &&info) -> toolkit::Any {
auto obj = std::make_shared<Value>();
auto &sock = info.get<SockInfo>();
fillSockInfo(*obj, &sock);
(*obj)["typeid"] = toolkit::demangle(typeid(sock).name());
auto &session = info.get<Session>();
fillSockInfo(*obj, &session);
(*obj)["typeid"] = toolkit::demangle(typeid(session).name());
toolkit::Any ret;
ret.set(obj);
return ret;
@ -1191,25 +1204,27 @@ void installWebApi() {
auto dst_url = allArgs["dst_url"];
auto retry_count = allArgs["retry_count"].empty() ? -1 : allArgs["retry_count"].as<int>();
addStreamPusherProxy(allArgs["schema"],
allArgs["vhost"],
allArgs["app"],
allArgs["stream"],
allArgs["dst_url"],
retry_count,
allArgs["rtp_type"],
allArgs["timeout_sec"],
args,
[invoker, val, headerOut, dst_url](const SockException &ex, const string &key) mutable {
if (ex) {
val["code"] = API::OtherFailed;
val["msg"] = ex.what();
} else {
val["data"]["key"] = key;
InfoL << "Publish success, please play with player:" << dst_url;
}
invoker(200, headerOut, val.toStyledString());
});
EventPollerPool::Instance().getPoller(false)->async([=]() mutable {
addStreamPusherProxy(allArgs["schema"],
allArgs["vhost"],
allArgs["app"],
allArgs["stream"],
allArgs["dst_url"],
retry_count,
allArgs["rtp_type"],
allArgs["timeout_sec"],
args,
[invoker, val, headerOut, dst_url](const SockException &ex, const string &key) mutable {
if (ex) {
val["code"] = API::OtherFailed;
val["msg"] = ex.what();
} else {
val["data"]["key"] = key;
InfoL << "Publish success, please play with player:" << dst_url;
}
invoker(200, headerOut, val.toStyledString());
});
});
});
// 关闭推流代理 [AUTO-TRANSLATED:91602b75]
@ -1258,22 +1273,24 @@ void installWebApi() {
vhost = allArgs["vhost"];
}
auto tuple = MediaTuple { vhost, allArgs["app"], allArgs["stream"], "" };
addStreamProxy(tuple,
allArgs["url"],
retry_count,
option,
allArgs["rtp_type"],
allArgs["timeout_sec"],
args,
[invoker,val,headerOut](const SockException &ex,const string &key) mutable{
if (ex) {
val["code"] = API::OtherFailed;
val["msg"] = ex.what();
} else {
val["data"]["key"] = key;
}
invoker(200, headerOut, val.toStyledString());
});
EventPollerPool::Instance().getPoller(false)->async([=]() mutable {
addStreamProxy(tuple,
allArgs["url"],
retry_count,
option,
allArgs["rtp_type"],
allArgs["timeout_sec"],
args,
[invoker,val,headerOut](const SockException &ex,const string &key) mutable {
if (ex) {
val["code"] = API::OtherFailed;
val["msg"] = ex.what();
} else {
val["data"]["key"] = key;
}
invoker(200, headerOut, val.toStyledString());
});
});
});
// 关闭拉流代理 [AUTO-TRANSLATED:5204f128]
@ -1670,8 +1687,10 @@ void installWebApi() {
CHECK(muxer, "get muxer from media source failed");
src->getOwnerPoller()->async([=]() mutable {
muxer->forEachRtpSender([&](const std::string &ssrc) mutable {
muxer->forEachRtpSender([&](const std::string &ssrc, const RtpSender &sender) mutable {
val["data"].append(ssrc);
val["bytesSpeed"] = (Json::UInt64)sender.getSendSpeed();
val["totalBytes"] = (Json::UInt64)sender.getSendTotalBytes();
});
invoker(200, headerOut, val.toStyledString());
});
@ -2203,6 +2222,7 @@ void installWebApi() {
// sample_ms设置为0从配置文件加载file_repeat可以指定如果配置文件也指定循环解复用那么强制开启 [AUTO-TRANSLATED:23e826b4]
// sample_ms is set to 0, loaded from the configuration file; file_repeat can be specified, if the configuration file also specifies loop demultiplexing, then force it to be enabled
reader->startReadMP4(0, true, allArgs["file_repeat"]);
val["data"]["duration_ms"] = (Json::UInt64)reader->getDemuxer()->getDurationMS();
});
#endif

View File

@ -339,6 +339,10 @@ static mINI jsonToMini(const Value &obj) {
mINI ret;
if (obj.isObject()) {
for (auto it = obj.begin(); it != obj.end(); ++it) {
if (it->isNull()) {
// 忽略null修复wvp传null覆盖Protocol配置的问题
continue;
}
try {
auto str = (*it).asString();
ret[it.name()] = std::move(str);

View File

@ -41,9 +41,7 @@
#include "ZLMVersion.h"
#endif
#if !defined(_WIN32)
#include "System.h"
#endif//!defined(_WIN32)
using namespace std;
using namespace toolkit;
@ -259,10 +257,11 @@ int start_main(int argc,char *argv[]) {
// Start daemon process
System::startDaemon(kill_parent_if_failed);
}
#endif //! defined(_WIN32)
// 开启崩溃捕获等 [AUTO-TRANSLATED:9c7c759c]
// Enable crash capture, etc.
System::systemSetup();
#endif//!defined(_WIN32)
// 启动异步日志线程 [AUTO-TRANSLATED:c93cc6f4]
// Start asynchronous log thread

View File

@ -110,13 +110,20 @@ std::shared_ptr<void> MediaSource::getOwnership() {
});
}
int MediaSource::getBytesSpeed(TrackType type){
size_t MediaSource::getBytesSpeed(TrackType type) {
if(type == TrackInvalid || type == TrackMax){
return _speed[TrackVideo].getSpeed() + _speed[TrackAudio].getSpeed();
}
return _speed[type].getSpeed();
}
size_t MediaSource::getTotalBytes(TrackType type) {
if (type == TrackInvalid || type == TrackMax) {
return _speed[TrackVideo].getTotalBytes() + _speed[TrackAudio].getTotalBytes();
}
return _speed[type].getTotalBytes();
}
uint64_t MediaSource::getAliveSecond() const {
// 使用Ticker对象获取存活时间的目的是防止修改系统时间导致回退 [AUTO-TRANSLATED:68474061]
// The purpose of using the Ticker object to obtain the survival time is to prevent the modification of the system time from causing a rollback

View File

@ -442,7 +442,9 @@ public:
// 获取数据速率单位bytes/s [AUTO-TRANSLATED:c70465c1]
// Get data rate, unit bytes/s
int getBytesSpeed(TrackType type = TrackInvalid);
size_t getBytesSpeed(TrackType type = TrackInvalid);
size_t getTotalBytes(TrackType type = TrackInvalid);
// 获取流创建GMT unix时间戳单位秒 [AUTO-TRANSLATED:0bbe145e]
// Get the stream creation GMT unix timestamp, unit seconds
uint64_t getCreateStamp() const { return _create_stamp; }

View File

@ -183,9 +183,12 @@ std::string MultiMediaSourceMuxer::shortUrl() const {
return _tuple.shortUrl();
}
void MultiMediaSourceMuxer::forEachRtpSender(const std::function<void(const std::string &ssrc)> &cb) const {
void MultiMediaSourceMuxer::forEachRtpSender(const std::function<void(const std::string &ssrc, const RtpSender &sender)> &cb) const {
for (auto &pr : _rtp_sender) {
cb(pr.first);
auto sender = std::get<1>(pr.second).lock();
if (sender) {
cb(pr.first, *sender);
}
}
}
@ -400,7 +403,7 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type
void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceEvent::SendRtpArgs &args, const std::function<void(uint16_t, const toolkit::SockException &)> cb) {
#if defined(ENABLE_RTPPROXY)
createGopCacheIfNeed();
createGopCacheIfNeed(1);
auto ring = _ring;
auto ssrc = args.ssrc;
@ -443,10 +446,11 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceE
// 可能归属线程发生变更 [AUTO-TRANSLATED:2b379e30]
// The owning thread may change
strong_self->getOwnerPoller(MediaSource::NullMediaSource())->async([=]() {
if(!ssrc_multi_send) {
if (!ssrc_multi_send) {
strong_self->_rtp_sender.erase(ssrc);
}
strong_self->_rtp_sender.emplace(ssrc,reader);
std::weak_ptr<RtpSender> sender = rtp_sender;
strong_self->_rtp_sender.emplace(ssrc, make_tuple(reader, sender));
});
});
#else
@ -576,9 +580,9 @@ void MultiMediaSourceMuxer::onAllTrackReady() {
}
#if defined(ENABLE_RTPPROXY)
GET_CONFIG(bool, gop_cache, RtpProxy::kGopCache);
if (gop_cache) {
createGopCacheIfNeed();
GET_CONFIG(size_t, gop_cache, RtpProxy::kGopCache);
if (gop_cache > 0) {
createGopCacheIfNeed(gop_cache);
}
#endif
@ -593,7 +597,7 @@ void MultiMediaSourceMuxer::onAllTrackReady() {
InfoL << "stream: " << shortUrl() << " , codec info: " << getTrackInfoStr(this);
}
void MultiMediaSourceMuxer::createGopCacheIfNeed() {
void MultiMediaSourceMuxer::createGopCacheIfNeed(size_t gop_count) {
if (_ring) {
return;
}
@ -607,7 +611,7 @@ void MultiMediaSourceMuxer::createGopCacheIfNeed() {
strong_self->onReaderChanged(*src, strong_self->totalReaderCount());
});
}
});
}, gop_count);
}
void MultiMediaSourceMuxer::resetTracks() {

View File

@ -194,7 +194,7 @@ public:
const MediaTuple &getMediaTuple() const;
std::string shortUrl() const;
void forEachRtpSender(const std::function<void(const std::string &ssrc)> &cb) const;
void forEachRtpSender(const std::function<void(const std::string &ssrc, const RtpSender &sender)> &cb) const;
protected:
/////////////////////////////////MediaSink override/////////////////////////////////
@ -231,7 +231,7 @@ protected:
bool onTrackFrame_l(const Frame::Ptr &frame);
private:
void createGopCacheIfNeed();
void createGopCacheIfNeed(size_t gop_count);
std::shared_ptr<MediaSinkInterface> makeRecorder(MediaSource &sender, Recorder::type type);
private:
@ -245,7 +245,7 @@ private:
toolkit::Ticker _last_check;
std::unordered_map<int, Stamp> _stamps;
std::weak_ptr<Listener> _track_listener;
std::unordered_multimap<std::string, RingType::RingReader::Ptr> _rtp_sender;
std::unordered_multimap<std::string, std::tuple<RingType::RingReader::Ptr, std::weak_ptr<RtpSender>>> _rtp_sender;
FMP4MediaSourceMuxer::Ptr _fmp4;
RtmpMediaSourceMuxer::Ptr _rtmp;
RtspMediaSourceMuxer::Ptr _rtsp;

View File

@ -553,8 +553,8 @@ extern const std::string kPSPT;
// rtp server opus 的pt [AUTO-TRANSLATED:9f91f85a]
// Rtp server opus pt
extern const std::string kOpusPT;
// RtpSender相关功能是否提前开启gop缓存优化级联秒开体验默认开启 [AUTO-TRANSLATED:40c37c77]
// Whether to enable gop cache optimization cascade second-open experience for RtpSender related functions, enabled by default
// startSendRtp、startRecord相关功能是否提前开启gop缓存优化级联秒开体验默认开启, 并缓存1个GOP [AUTO-TRANSLATED:40c37c77]
// Whether to enable gop cache optimization cascade second-open experience for startSendRtp/startRecord related functions, enabled by default, and cached 1 GOP
extern const std::string kGopCache;
// 国标发送g711 rtp 打包时每个包的语音时长是多少默认是100 ms范围为20~180ms (gb28181-2016c.2.4规定) [AUTO-TRANSLATED:3b3916a3]
// When sending g711 rtp packets in national standard, what is the duration of each packet, the default is 100 ms, the range is 20~180ms (gb28181-2016, c.2.4),

View File

@ -1,4 +1,4 @@
/*
/*
* Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit).

View File

@ -133,6 +133,10 @@ void HlsPlayer::fetchSegment() {
if (!(*this)[Client::kNetAdapter].empty()) {
_http_ts_player->setNetAdapter((*this)[Client::kNetAdapter]);
}
} else {
// 每次请求新的ts片段时重置HttpTSPlayer状态
_http_ts_player->clear();
_http_ts_player->setProxyUrl((*this)[Client::kProxyUrl]);
}
Ticker ticker;
@ -278,6 +282,7 @@ void HlsPlayer::onResponseHeader(const string &status, const HttpClient::HttpHea
void HlsPlayer::onResponseBody(const char *buf, size_t size) {
_m3u8.append(buf, size);
_recvtotalbytes += getRecvTotalBytes();
}
void HlsPlayer::onResponseCompleted(const SockException &ex) {
@ -353,6 +358,13 @@ void HlsPlayer::playDelay(float delay_sec) {
}, getPoller()));
}
size_t HlsPlayer::getRecvSpeed() {
return TcpClient::getRecvSpeed() + (_http_ts_player ? _http_ts_player->getRecvSpeed() : 0);
}
size_t HlsPlayer::getRecvTotalBytes() {
return TcpClient::getRecvTotalBytes() + (_http_ts_player ? _http_ts_player->getRecvTotalBytes() : 0);
}
//////////////////////////////////////////////////////////////////////////
void HlsDemuxer::start(const EventPoller::Ptr &poller, TrackListener *listener) {
@ -476,6 +488,7 @@ void HlsPlayerImp::onPacket(const char *data, size_t len) {
if (_decoder && _demuxer) {
_decoder->input((uint8_t *) data, len);
}
_recvtotalbytes += HlsPlayer::getRecvTotalBytes();
}
void HlsPlayerImp::addTrackCompleted() {
@ -527,4 +540,11 @@ vector<Track::Ptr> HlsPlayerImp::getTracks(bool ready) const {
return static_pointer_cast<HlsDemuxer>(_demuxer)->getTracks(ready);
}
size_t HlsPlayerImp::getRecvSpeed() {
return PlayerImp<HlsPlayer, PlayerBase>::getRecvSpeed();
}
size_t HlsPlayerImp::getRecvTotalBytes() {
return _recvtotalbytes;
}
}//namespace mediakit

View File

@ -49,7 +49,7 @@ private:
std::deque<std::pair<int64_t, std::function<void()> > > _frame_cache;
};
class HlsPlayer : public HttpClientImp , public PlayerBase , public HlsParser{
class HlsPlayer: public HttpClientImp, public PlayerBase, public HlsParser {
public:
HlsPlayer(const toolkit::EventPoller::Ptr &poller);
@ -73,6 +73,9 @@ public:
*/
void teardown() override;
size_t getRecvSpeed() override;
size_t getRecvTotalBytes() override;
protected:
/**
* ts包
@ -127,12 +130,17 @@ private:
int _timeout_multiple = MIN_TIMEOUT_MULTIPLE;
int _try_fetch_index_times = 0;
int _ts_download_failed_count = 0;
protected:
size_t _recvtotalbytes = 0;
};
class HlsPlayerImp : public PlayerImp<HlsPlayer, PlayerBase>, private TrackListener {
class HlsPlayerImp final: public PlayerImp<HlsPlayer, PlayerBase>, private TrackListener {
public:
using Ptr = std::shared_ptr<HlsPlayerImp>;
HlsPlayerImp(const toolkit::EventPoller::Ptr &poller = nullptr);
size_t getRecvSpeed() override;
size_t getRecvTotalBytes() override;
private:
//// HlsPlayer override////

View File

@ -194,6 +194,13 @@ static std::shared_ptr<char> getSharedMmap(const string &file_path, int64_t &fil
}
HttpFileBody::HttpFileBody(const string &file_path, bool use_mmap) {
// 判断是否为目录避免对目录进行mmap操作导致程序崩溃。
if (File::is_dir(file_path)) {
_read_to = -1;
return;
}
if (use_mmap ) {
_map_addr = getSharedMmap(file_path, _read_to);
}

View File

@ -103,6 +103,8 @@ void HttpClient::clear() {
_user_set_header.clear();
_body.reset();
_method.clear();
// 重置代理连接状态
_proxy_connected = false;
clearResponse();
}
@ -182,6 +184,8 @@ void HttpClient::onConnect_l(const SockException &ex) {
_path.clear();
} else {
printer << "CONNECT " << _last_host << " HTTP/1.1\r\n";
printer << "Host: " << _last_host << "\r\n";
printer << "User-Agent: " << kServerName << "\r\n";
printer << "Proxy-Connection: keep-alive\r\n";
if (!_proxy_auth.empty()) {
printer << "Proxy-Authorization: Basic " << _proxy_auth << "\r\n";
@ -482,9 +486,14 @@ void HttpClient::setProxyUrl(string proxy_url) {
}
bool HttpClient::checkProxyConnected(const char *data, size_t len) {
auto ret = strstr(data, "HTTP/1.1 200 Connection established");
_proxy_connected = ret != nullptr;
return _proxy_connected;
string response(data, len);
if (response.find("HTTP/1.1 200") != string::npos || response.find("HTTP/1.0 200") != string::npos) {
_proxy_connected = true;
return true;
}
_proxy_connected = false;
return false;
}
void HttpClient::setAllowResendRequest(bool allow) {

View File

@ -315,37 +315,6 @@ static bool emitHlsPlayed(const Parser &parser, const MediaInfo &media_info, con
return flag;
}
class SockInfoImp : public SockInfo{
public:
using Ptr = std::shared_ptr<SockInfoImp>;
string get_local_ip() override {
return _local_ip;
}
uint16_t get_local_port() override {
return _local_port;
}
string get_peer_ip() override {
return _peer_ip;
}
uint16_t get_peer_port() override {
return _peer_port;
}
string getIdentifier() const override {
return _identifier;
}
string _local_ip;
string _peer_ip;
string _identifier;
uint16_t _local_port;
uint16_t _peer_port;
};
/**
* http客户端是否有权限访问文件的逻辑步骤
* 1http请求头查找cookie3
@ -415,17 +384,18 @@ static void canAccessPath(Session &sender, const Parser &parser, const MediaInfo
bool is_hls = media_info.schema == HLS_SCHEMA || media_info.schema == HLS_FMP4_SCHEMA;
SockInfoImp::Ptr info = std::make_shared<SockInfoImp>();
info->_identifier = sender.getIdentifier();
info->_peer_ip = sender.get_peer_ip();
info->_peer_port = sender.get_peer_port();
info->_local_ip = sender.get_local_ip();
info->_local_port = sender.get_local_port();
weak_ptr<Session> weak_session = static_pointer_cast<Session>(sender.shared_from_this());
// 该用户从来未获取过cookie这个时候我们广播是否允许该用户访问该http目录 [AUTO-TRANSLATED:8f4b3dd2]
// This user has never obtained a cookie, at this time we broadcast whether to allow this user to access this http directory
HttpSession::HttpAccessPathInvoker accessPathInvoker = [callback, uid, path, is_dir, is_hls, media_info, info]
HttpSession::HttpAccessPathInvoker accessPathInvoker = [callback, uid, path, is_dir, is_hls, media_info, weak_session]
(const string &err_msg, const string &cookie_path_in, int life_second) {
auto strong_session = weak_session.lock();
if (!strong_session) {
// http客户端已经断开不需要回复 [AUTO-TRANSLATED:9a252e21]
// The http client has disconnected and does not need to reply
return;
}
HttpServerCookie::Ptr cookie;
if (life_second) {
// 本次鉴权设置了有效期我们把鉴权结果缓存在cookie中 [AUTO-TRANSLATED:5a12f48e]
@ -447,7 +417,7 @@ static void canAccessPath(Session &sender, const Parser &parser, const MediaInfo
if (is_hls) {
// hls相关信息 [AUTO-TRANSLATED:37893a71]
// hls related information
attach->_hls_data = std::make_shared<HlsCookieData>(media_info, info);
attach->_hls_data = std::make_shared<HlsCookieData>(media_info, strong_session);
}
toolkit::Any any;
any.set(std::move(attach));

View File

@ -394,7 +394,7 @@ bool HttpSession::checkLiveStreamFMP4(const function<void()> &cb) {
_fmp4_reader = fmp4_src->getRing()->attach(getPoller());
_fmp4_reader->setGetInfoCB([weak_self]() {
Any ret;
ret.set(static_pointer_cast<SockInfo>(weak_self.lock()));
ret.set(static_pointer_cast<Session>(weak_self.lock()));
return ret;
});
_fmp4_reader->setDetachCB([weak_self]() {
@ -444,7 +444,7 @@ bool HttpSession::checkLiveStreamTS(const function<void()> &cb) {
_ts_reader = ts_src->getRing()->attach(getPoller());
_ts_reader->setGetInfoCB([weak_self]() {
Any ret;
ret.set(static_pointer_cast<SockInfo>(weak_self.lock()));
ret.set(static_pointer_cast<Session>(weak_self.lock()));
return ret;
});
_ts_reader->setDetachCB([weak_self]() {

View File

@ -57,4 +57,11 @@ void TsPlayer::onResponseBody(const char *buf, size_t size) {
}
}
size_t TsPlayer::getRecvSpeed() {
return TcpClient::getRecvSpeed();
}
size_t TsPlayer::getRecvTotalBytes() {
return TcpClient::getRecvTotalBytes();
}
} // namespace mediakit

View File

@ -28,6 +28,9 @@ public:
*/
void play(const std::string &url) override;
size_t getRecvSpeed() override;
size_t getRecvTotalBytes() override;
/**
*
* Stop playing

View File

@ -21,6 +21,8 @@ public:
using Ptr = std::shared_ptr<TsPlayerImp>;
TsPlayerImp(const toolkit::EventPoller::Ptr &poller = nullptr);
size_t getRecvSpeed() override;
size_t getRecvTotalBytes() override;
private:
//// TsPlayer override////

View File

@ -77,4 +77,11 @@ vector<Track::Ptr> TsPlayerImp::getTracks(bool ready) const {
return static_pointer_cast<HlsDemuxer>(_demuxer)->getTracks(ready);
}
size_t TsPlayerImp::getRecvSpeed() {
return TcpClient::getRecvSpeed();
}
size_t TsPlayerImp::getRecvTotalBytes() {
return TcpClient::getRecvTotalBytes();
}
}//namespace mediakit

View File

@ -165,6 +165,10 @@ public:
* [AUTO-TRANSLATED:8fb31d43]
*/
virtual void setOnResume(const std::function<void()> &cb) = 0;
virtual size_t getRecvSpeed() { return 0; }
virtual size_t getRecvTotalBytes() { return 0; }
protected:
virtual void onResume() = 0;
@ -256,6 +260,14 @@ public:
_on_resume = cb;
}
size_t getRecvSpeed() override {
return _delegate ? _delegate->getRecvSpeed() : Parent::getRecvSpeed();
}
size_t getRecvTotalBytes() override {
return _delegate ? _delegate->getRecvTotalBytes() : Parent::getRecvTotalBytes();
}
protected:
void onShutdown(const toolkit::SockException &ex) override {
if (_on_shutdown) {

View File

@ -35,6 +35,7 @@ public:
void setOnCreateSocket(toolkit::Socket::onCreateSocket cb);
std::shared_ptr<MediaSource> getSrc() { return _src.lock(); }
const std::string& getUrl() const { return _url; }
private:
std::weak_ptr<MediaSource> _src;
toolkit::EventPoller::Ptr _poller;

View File

@ -67,6 +67,9 @@ public:
*/
virtual void setOnShutdown(const Event &cb) = 0;
virtual size_t getSendSpeed() { return 0; }
virtual size_t getSendTotalBytes() { return 0; }
protected:
virtual void onShutdown(const toolkit::SockException &ex) = 0;
virtual void onPublishResult(const toolkit::SockException &ex) = 0;
@ -133,6 +136,14 @@ public:
_on_shutdown = cb;
}
size_t getSendSpeed() override {
return _delegate ? _delegate->getSendSpeed() : Parent::getSendSpeed();
}
size_t getSendTotalBytes() override {
return _delegate ? _delegate->getSendTotalBytes() : Parent::getSendTotalBytes();
}
protected:
void onShutdown(const toolkit::SockException &ex) override {
if (_on_shutdown) {

View File

@ -15,9 +15,37 @@ using namespace toolkit;
namespace mediakit {
HlsCookieData::HlsCookieData(const MediaInfo &info, const std::shared_ptr<SockInfo> &sock_info) {
class SockInfoImp : public SockInfo {
public:
using Ptr = std::shared_ptr<SockInfoImp>;
std::string get_local_ip() override { return _local_ip; }
uint16_t get_local_port() override { return _local_port; }
std::string get_peer_ip() override { return _peer_ip; }
uint16_t get_peer_port() override { return _peer_port; }
std::string getIdentifier() const override { return _identifier; }
std::string _local_ip;
std::string _peer_ip;
std::string _identifier;
uint16_t _local_port;
uint16_t _peer_port;
};
HlsCookieData::HlsCookieData(const MediaInfo &info, const std::shared_ptr<Session> &session) {
_info = info;
auto sock_info = std::make_shared<SockInfoImp>();
sock_info->_identifier = session->getIdentifier();
sock_info->_peer_ip = session->get_peer_ip();
sock_info->_peer_port = session->get_peer_port();
sock_info->_local_ip = session->get_local_ip();
sock_info->_local_port = session->get_local_port();
_sock_info = sock_info;
_session = session;
_added = std::make_shared<bool>(false);
addReaderCount();
}
@ -34,10 +62,10 @@ void HlsCookieData::addReaderCount() {
// HlsMediaSource has been destroyed
*added = false;
});
auto info = _sock_info;
_ring_reader->setGetInfoCB([info]() {
std::weak_ptr<Session> weak_session = _session;
_ring_reader->setGetInfoCB([weak_session]() {
Any ret;
ret.set(info);
ret.set(std::static_pointer_cast<Session>(weak_session.lock()));
return ret;
});
}

View File

@ -14,6 +14,7 @@
#include "Common/MediaSource.h"
#include "Util/TimeTicker.h"
#include "Util/RingBuffer.h"
#include "Network/Session.h"
#include <atomic>
namespace mediakit {
@ -89,7 +90,7 @@ class HlsCookieData {
public:
using Ptr = std::shared_ptr<HlsCookieData>;
HlsCookieData(const MediaInfo &info, const std::shared_ptr<toolkit::SockInfo> &sock_info);
HlsCookieData(const MediaInfo &info, const std::shared_ptr<toolkit::Session> &session);
~HlsCookieData();
void addByteUsage(size_t bytes);
@ -106,6 +107,7 @@ private:
toolkit::Ticker _ticker;
std::weak_ptr<HlsMediaSource> _src;
std::shared_ptr<toolkit::SockInfo> _sock_info;
std::weak_ptr<toolkit::Session> _session;
HlsMediaSource::RingType::RingReader::Ptr _ring_reader;
};

View File

@ -0,0 +1,26 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "HlsRecorder.h"
using namespace toolkit;
namespace mediakit {
ThreadPool &getHlsThread() {
static ThreadPool ret(1, ThreadPool::PRIORITY_LOWEST, true);
static onceToken s_token([]() {
ret.async([]() {
setThreadName("hls thread");
});
});
return ret;
}
}//namespace mediakit

View File

@ -15,9 +15,12 @@
#include "MPEG.h"
#include "MP4Muxer.h"
#include "Common/config.h"
#include "Thread/ThreadPool.h"
namespace mediakit {
toolkit::ThreadPool& getHlsThread();
template <typename Muxer>
class HlsRecorderBase : public MediaSourceEventInterceptor, public Muxer, public std::enable_shared_from_this<HlsRecorderBase<Muxer> > {
public:
@ -58,6 +61,15 @@ public:
}
bool inputFrame(const Frame::Ptr &frame) override {
auto ptr = this->shared_from_this();
auto cached_frame = Frame::getCacheAbleFrame(frame);
getHlsThread().async([ptr, cached_frame]() {
ptr->inputFrame_l(cached_frame);
});
return true;
}
bool inputFrame_l(const Frame::Ptr &frame) {
if (_clear_cache && _option.hls_demand) {
_clear_cache = false;
// 清空旧的m3u8索引文件于ts切片 [AUTO-TRANSLATED:a4ce0664]
@ -87,8 +99,12 @@ protected:
class HlsRecorder final : public HlsRecorderBase<MpegMuxer> {
public:
using Ptr = std::shared_ptr<HlsRecorder>;
template <typename ...ARGS>
HlsRecorder(ARGS && ...args) : HlsRecorderBase<MpegMuxer>(false, std::forward<ARGS>(args)...) {}
template <typename... ARGS>
static Ptr create(ARGS &&...args) {
return Ptr(new HlsRecorder(std::forward<ARGS>(args)...), [](HlsRecorder *ptr) { getHlsThread().async([ptr]() { delete ptr; }); });
}
~HlsRecorder() override {
try {
this->flush();
@ -98,6 +114,9 @@ public:
}
private:
template <typename ...ARGS>
HlsRecorder(ARGS && ...args) : HlsRecorderBase<MpegMuxer>(false, std::forward<ARGS>(args)...) {}
void onWrite(std::shared_ptr<toolkit::Buffer> buffer, uint64_t timestamp, bool key_pos) override {
if (!buffer) {
// reset tracks
@ -111,8 +130,12 @@ private:
class HlsFMP4Recorder final : public HlsRecorderBase<MP4MuxerMemory> {
public:
using Ptr = std::shared_ptr<HlsFMP4Recorder>;
template <typename ...ARGS>
HlsFMP4Recorder(ARGS && ...args) : HlsRecorderBase<MP4MuxerMemory>(true, std::forward<ARGS>(args)...) {}
template <typename... ARGS>
static Ptr create(ARGS &&...args) {
return Ptr(new HlsFMP4Recorder(std::forward<ARGS>(args)...), [](HlsFMP4Recorder *ptr) { getHlsThread().async([ptr]() { delete ptr; }); });
}
~HlsFMP4Recorder() override {
try {
this->flush();
@ -122,12 +145,18 @@ public:
}
void addTrackCompleted() override {
HlsRecorderBase<MP4MuxerMemory>::addTrackCompleted();
auto data = getInitSegment();
_hls->inputInitSegment(data.data(), data.size());
auto ptr = std::static_pointer_cast<HlsFMP4Recorder>(this->shared_from_this());
getHlsThread().async([ptr]() {
ptr->HlsRecorderBase<MP4MuxerMemory>::addTrackCompleted();
auto data = ptr->getInitSegment();
ptr->_hls->inputInitSegment(data.data(), data.size());
});
}
private:
template <typename ...ARGS>
HlsFMP4Recorder(ARGS && ...args) : HlsRecorderBase<MP4MuxerMemory>(true, std::forward<ARGS>(args)...) {}
void onSegmentData(std::string buffer, uint64_t timestamp, bool key_pos) override {
if (buffer.empty()) {
// reset tracks

View File

@ -138,7 +138,7 @@ Frame::Ptr MP4Demuxer::readFrame(bool &keyFrame, bool &eof) {
}
}
Frame::Ptr MP4Demuxer::makeFrame(uint32_t track_id, const Buffer::Ptr &buf, int64_t pts, int64_t dts) {
Frame::Ptr MP4Demuxer::makeFrame(uint32_t track_id, Buffer::Ptr buf, int64_t pts, int64_t dts) {
auto it = _tracks.find(track_id);
if (it == _tracks.end()) {
return nullptr;
@ -198,11 +198,11 @@ void MultiMP4Demuxer::openMP4(const string &files_string) {
std::vector<std::string> files;
if (File::is_dir(files_string)) {
File::scanDir(files_string, [&](const string &path, bool is_dir) {
if (!is_dir) {
if (!is_dir && end_with(path, ".mp4")) {
files.emplace_back(path);
}
return true;
});
}, true);
std::sort(files.begin(), files.end());
} else {
files = split(files_string, ";");

View File

@ -96,7 +96,7 @@ private:
int getAllTracks();
void onVideoTrack(uint32_t track_id, uint8_t object, int width, int height, const void *extra, size_t bytes);
void onAudioTrack(uint32_t track_id, uint8_t object, int channel_count, int bit_per_sample, int sample_rate, const void *extra, size_t bytes);
Frame::Ptr makeFrame(uint32_t track_id, const toolkit::Buffer::Ptr &buf, int64_t pts, int64_t dts);
Frame::Ptr makeFrame(uint32_t track_id, toolkit::Buffer::Ptr buf, int64_t pts, int64_t dts);
private:
MP4FileDisk::Ptr _mp4_file;

View File

@ -22,8 +22,21 @@ using namespace toolkit;
namespace mediakit {
ThreadPool &getMP4Thread() {
static ThreadPool ret(1, ThreadPool::PRIORITY_LOWEST, true);
static onceToken s_token([]() {
ret.async([]() {
setThreadName("mp4 thread");
});
});
return ret;
}
MP4Recorder::Ptr MP4Recorder::create(const MediaTuple &tuple, const std::string &path, size_t max_second) {
return Ptr(new MP4Recorder(tuple, path, max_second), [](MP4Recorder *ptr) { getMP4Thread().async([ptr]() { delete ptr; }); });
}
MP4Recorder::MP4Recorder(const MediaTuple &tuple, const string &path, size_t max_second) {
_folder_path = path;
// ///record 业务逻辑////// [AUTO-TRANSLATED:2e78931a]
// ///record Business Logic//////
static_cast<MediaTuple &>(_info) = tuple;
@ -44,9 +57,9 @@ MP4Recorder::~MP4Recorder() {
void MP4Recorder::createFile() {
closeFile();
auto date = getTimeStr("%Y-%m-%d");
auto file_name = getTimeStr("%H-%M-%S") + "-" + std::to_string(_file_index++) + ".mp4";
auto full_path = _folder_path + date + "/" + file_name;
auto full_path_tmp = _folder_path + date + "/." + file_name;
auto file_name = date + "-" + getTimeStr("%H-%M-%S") + "-" + std::to_string(_file_index++) + ".mp4";
auto full_path = _info.folder + date + "/" + file_name;
auto full_path_tmp = _info.folder + date + "/." + file_name;
// ///record 业务逻辑////// [AUTO-TRANSLATED:2e78931a]
// ///record Business Logic//////
@ -66,7 +79,6 @@ void MP4Recorder::createFile() {
_muxer->addTrack(track);
}
_full_path_tmp = full_path_tmp;
_full_path = full_path;
} catch (std::exception &ex) {
WarnL << ex.what();
}
@ -75,10 +87,9 @@ void MP4Recorder::createFile() {
void MP4Recorder::asyncClose() {
auto muxer = _muxer;
auto full_path_tmp = _full_path_tmp;
auto full_path = _full_path;
auto info = _info;
TraceL << "Start close tmp mp4 file: " << full_path_tmp;
WorkThreadPool::Instance().getExecutor()->async([muxer, full_path_tmp, full_path, info]() mutable {
getMP4Thread().async([muxer, full_path_tmp, info]() mutable {
info.time_len = muxer->getDuration() / 1000.0f;
// 关闭mp4可能非常耗时所以要放在后台线程执行 [AUTO-TRANSLATED:a7378a11]
// Closing mp4 can be very time-consuming, so it should be executed in the background thread
@ -97,9 +108,9 @@ void MP4Recorder::asyncClose() {
}
// 临时文件名改成正式文件名防止mp4未完成时被访问 [AUTO-TRANSLATED:541a6f00]
// Change the temporary file name to the official file name to prevent access to the mp4 before it is completed
rename(full_path_tmp.data(), full_path.data());
rename(full_path_tmp.data(), info.file_path.data());
}
TraceL << "Emit mp4 record event: " << full_path;
TraceL << "Emit mp4 record event: " << info.file_path;
// 触发mp4录制切片生成事件 [AUTO-TRANSLATED:9959dcd4]
// Trigger mp4 recording slice generation event
NOTICE_EMIT(BroadcastRecordMP4Args, Broadcast::kBroadcastRecordMP4, info);
@ -131,7 +142,7 @@ bool MP4Recorder::inputFrame(const Frame::Ptr &frame) {
// In the case of b-frames, the dts timestamp may regress
_last_dts = MIN(frame->dts(), _last_dts);
}
auto duration = 5u; // 默认至少一帧5ms
if (frame->dts() > 0 && frame->dts() > _last_dts) {
duration = MAX(duration, frame->dts() - _last_dts);
@ -153,7 +164,12 @@ bool MP4Recorder::inputFrame(const Frame::Ptr &frame) {
if (_muxer) {
// 生成mp4文件 [AUTO-TRANSLATED:76a8d77c]
// Generate mp4 file
return _muxer->inputFrame(frame);
auto muxer = _muxer;
auto cached_frame = Frame::getCacheAbleFrame(frame);
getMP4Thread().async([muxer, cached_frame]() {
return muxer->inputFrame(cached_frame);
});
return true;
}
return false;
}

View File

@ -26,7 +26,7 @@ class MP4Recorder final : public MediaSinkInterface {
public:
using Ptr = std::shared_ptr<MP4Recorder>;
MP4Recorder(const MediaTuple &tuple, const std::string &path, size_t max_second);
static Ptr create(const MediaTuple &tuple, const std::string &path, size_t max_second);
~MP4Recorder() override;
/**
@ -63,6 +63,8 @@ public:
bool addTrack(const Track::Ptr & track) override;
private:
MP4Recorder(const MediaTuple &tuple, const std::string &path, size_t max_second);
void createFile();
void closeFile();
void asyncClose();
@ -71,9 +73,7 @@ private:
bool _have_video = false;
size_t _max_second;
uint64_t _last_dts = 0;
uint64_t _file_index = 0;
std::string _folder_path;
std::string _full_path;
std::atomic<uint64_t> _file_index { 0 };
std::string _full_path_tmp;
RecordInfo _info;
MP4Muxer::Ptr _muxer;

View File

@ -78,7 +78,7 @@ std::shared_ptr<MediaSinkInterface> Recorder::createRecorder(type type, const Me
#if defined(ENABLE_HLS)
auto path = Recorder::getRecordPath(type, tuple, option.hls_save_path);
GET_CONFIG(bool, enable_vhost, General::kEnableVhost);
auto ret = std::make_shared<HlsRecorder>(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option);
auto ret = HlsRecorder::create(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option);
ret->setMediaSource(tuple);
return ret;
#else
@ -89,7 +89,7 @@ std::shared_ptr<MediaSinkInterface> Recorder::createRecorder(type type, const Me
case Recorder::type_mp4: {
#if defined(ENABLE_MP4)
auto path = Recorder::getRecordPath(type, tuple, option.mp4_save_path);
return std::make_shared<MP4Recorder>(tuple, path, option.mp4_max_second);
return MP4Recorder::create(tuple, path, option.mp4_max_second);
#else
throw std::invalid_argument("mp4相关功能未打开请开启ENABLE_MP4宏后编译再测试");
#endif
@ -99,7 +99,7 @@ std::shared_ptr<MediaSinkInterface> Recorder::createRecorder(type type, const Me
#if defined(ENABLE_MP4)
auto path = Recorder::getRecordPath(type, tuple, option.hls_save_path);
GET_CONFIG(bool, enable_vhost, General::kEnableVhost);
auto ret = std::make_shared<HlsFMP4Recorder>(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option);
auto ret = HlsFMP4Recorder::create(path, enable_vhost ? string(VHOST_KEY) + "=" + tuple.vhost : "", option);
ret->setMediaSource(tuple);
return ret;
#else

View File

@ -13,7 +13,6 @@
#include "Rtmp/utils.h"
#include "Http/HttpSession.h"
#define FILE_BUF_SIZE (64 * 1024)
using namespace std;
using namespace toolkit;
@ -49,7 +48,7 @@ void FlvMuxer::start(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr
_ring_reader = media->getRing()->attach(poller);
_ring_reader->setGetInfoCB([weak_self]() {
Any ret;
ret.set(dynamic_pointer_cast<SockInfo>(weak_self.lock()));
ret.set(dynamic_pointer_cast<Session>(weak_self.lock()));
return ret;
});
_ring_reader->setDetachCB([weak_self]() {
@ -168,11 +167,12 @@ void FlvRecorder::startRecord(const EventPoller::Ptr &poller, const string &vhos
void FlvRecorder::startRecord(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr &media,
const string &file_path) {
GET_CONFIG(uint32_t, flvBufSize, Record::kFileBufSize);
stop();
lock_guard<recursive_mutex> lck(_file_mtx);
// 开辟文件写缓存 [AUTO-TRANSLATED:22d1c17f]
// Allocate file write cache.
std::shared_ptr<char> fileBuf(new char[FILE_BUF_SIZE], [](char *ptr) {
std::shared_ptr<char> fileBuf(new char[flvBufSize], [](char *ptr) {
if (ptr) {
delete[] ptr;
}
@ -191,7 +191,7 @@ void FlvRecorder::startRecord(const EventPoller::Ptr &poller, const RtmpMediaSou
// 设置文件写缓存 [AUTO-TRANSLATED:a767e55c]
// Set the file write cache.
setvbuf(_file.get(), fileBuf.get(), _IOFBF, FILE_BUF_SIZE);
setvbuf(_file.get(), fileBuf.get(), _IOFBF, flvBufSize);
start(poller, media);
}

View File

@ -76,4 +76,12 @@ void FlvPlayer::onRecvRtmpPacket(RtmpPacket::Ptr packet) {
onRtmpPacket(std::move(packet));
}
size_t FlvPlayer::getRecvSpeed() {
return TcpClient::getRecvSpeed();
}
size_t FlvPlayer::getRecvTotalBytes() {
return TcpClient::getRecvTotalBytes();
}
}//mediakit

View File

@ -23,6 +23,8 @@ public:
void play(const std::string &url) override;
void teardown() override;
size_t getRecvSpeed() override;
size_t getRecvTotalBytes() override;
protected:
void onResponseHeader(const std::string &status, const HttpHeader &header) override;

View File

@ -452,4 +452,12 @@ void RtmpPlayer::seekToMilliSecond(uint32_t seekMS){
});
}
size_t RtmpPlayer::getRecvSpeed() {
return TcpClient::getRecvSpeed();
}
size_t RtmpPlayer::getRecvTotalBytes() {
return TcpClient::getRecvTotalBytes();
}
} /* namespace mediakit */

View File

@ -37,6 +37,9 @@ public:
void speed(float speed) override;
void teardown() override;
size_t getRecvSpeed() override;
size_t getRecvTotalBytes() override;
protected:
virtual bool onMetadata(const AMFValue &val) = 0;
virtual void onRtmpPacket(RtmpPacket::Ptr chunk_data) = 0;

View File

@ -332,6 +332,12 @@ void RtmpPusher::onRtmpChunk(RtmpPacket::Ptr packet) {
}
}
size_t RtmpPusher::getSendSpeed() {
return TcpClient::getSendSpeed();
}
size_t RtmpPusher::getSendTotalBytes() {
return TcpClient::getSendTotalBytes();
}
} /* namespace mediakit */

View File

@ -27,6 +27,9 @@ public:
void publish(const std::string &url) override ;
void teardown() override;
size_t getSendSpeed() override;
size_t getSendTotalBytes() override;
protected:
//for Tcpclient override
void onRecv(const toolkit::Buffer::Ptr &buf) override;

View File

@ -308,7 +308,7 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr
weak_ptr<RtmpSession> weak_self = static_pointer_cast<RtmpSession>(shared_from_this());
_ring_reader->setGetInfoCB([weak_self]() {
Any ret;
ret.set(static_pointer_cast<SockInfo>(weak_self.lock()));
ret.set(static_pointer_cast<Session>(weak_self.lock()));
return ret;
});
_ring_reader->setReadCB([weak_self](const RtmpMediaSource::RingDataType &pkt) {

View File

@ -13,7 +13,7 @@
#include "RawEncoder.h"
#include "Extension/Factory.h"
#include "Rtsp/RtspMuxer.h"
#include "Common//config.h"
#include "Common/config.h"
using namespace toolkit;

View File

@ -107,6 +107,7 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data
if (!_auth_err.empty()) {
throw toolkit::SockException(toolkit::Err_other, _auth_err);
}
auto header = (RtpHeader *) data;
if (_sock != sock) {
// 第一次运行本函数 [AUTO-TRANSLATED:a1d7ac17]
// First time running this function
@ -114,7 +115,7 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data
_sock = sock;
_addr.reset(new sockaddr_storage(*((sockaddr_storage *)addr)));
if (first) {
emitOnPublish();
emitOnPublish(ntohl(header->ssrc));
_cache_ticker.resetTime();
}
}
@ -131,7 +132,6 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data
_process = std::make_shared<GB28181Process>(_media_info, this);
}
auto header = (RtpHeader *) data;
onRtp(ntohs(header->seq), ntohl(header->stamp), 0/*不发送sr,所以可以设置为0*/ , 90000/*ps/ts流时间戳按照90K采样率*/, len);
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
@ -271,15 +271,15 @@ string RtpProcess::getIdentifier() const {
return _media_info.stream;
}
void RtpProcess::emitOnPublish() {
void RtpProcess::emitOnPublish(uint32_t ssrc) {
weak_ptr<RtpProcess> weak_self = shared_from_this();
Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, const ProtocolOption &option) {
Broadcast::PublishAuthInvoker invoker = [weak_self, ssrc](const string &err, const ProtocolOption &option) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
auto poller = strong_self->getOwnerPoller(MediaSource::NullMediaSource());
poller->async([weak_self, err, option]() {
poller->async([weak_self, err, option, ssrc]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
@ -293,7 +293,7 @@ void RtpProcess::emitOnPublish() {
}
strong_self->_muxer->setMediaListener(strong_self);
strong_self->doCachedFunc();
InfoP(strong_self) << "允许RTP推流";
InfoP(strong_self) << "允许RTP推流ssrc: " << printSSRC(ssrc);
} else {
strong_self->_auth_err = err;
WarnP(strong_self) << "禁止RTP推流:" << err;

View File

@ -122,7 +122,7 @@ protected:
private:
RtpProcess(const MediaTuple &tuple);
void emitOnPublish();
void emitOnPublish(uint32_t ssrc);
void doCachedFunc();
bool alive();
void onManager();

View File

@ -457,5 +457,49 @@ void RtpSender::setOnClose(std::function<void(const toolkit::SockException &ex)>
_on_close = std::move(on_close);
}
size_t RtpSender::getSendSpeed() const {
size_t ret = 0;
if (_socket_rtp) {
ret += _socket_rtp->getSendSpeed();
}
if (_socket_rtcp) {
ret += _socket_rtcp->getSendSpeed();
}
return ret;
}
size_t RtpSender::getRecvSpeed() const {
size_t ret = 0;
if (_socket_rtp) {
ret += _socket_rtp->getRecvSpeed();
}
if (_socket_rtcp) {
ret += _socket_rtcp->getRecvSpeed();
}
return ret;
}
size_t RtpSender::getRecvTotalBytes() const {
size_t ret = 0;
if (_socket_rtp) {
ret += _socket_rtp->getRecvTotalBytes();
}
if (_socket_rtcp) {
ret += _socket_rtcp->getRecvTotalBytes();
}
return ret;
}
size_t RtpSender::getSendTotalBytes() const {
size_t ret = 0;
if (_socket_rtp) {
ret += _socket_rtp->getSendTotalBytes();
}
if (_socket_rtcp) {
ret += _socket_rtcp->getSendTotalBytes();
}
return ret;
}
} // namespace mediakit
#endif // defined(ENABLE_RTPPROXY)

View File

@ -94,6 +94,11 @@ public:
*/
void setOnClose(std::function<void(const toolkit::SockException &ex)> on_close);
size_t getSendSpeed() const;
size_t getRecvSpeed() const;
size_t getRecvTotalBytes() const;
size_t getSendTotalBytes() const;
private:
// 合并写输出 [AUTO-TRANSLATED:23544836]
// Merge write output

View File

@ -188,8 +188,14 @@ private:
}
iterator popIterator(iterator it) {
output(it->first, std::move(it->second));
return _pkt_sort_cache_map.erase(it);
try {
output(it->first, std::move(it->second));
return _pkt_sort_cache_map.erase(it);
} catch (...) {
// 防止抛异常未移除迭代器导致rtp包为空
_pkt_sort_cache_map.erase(it);
throw;
}
}
void output(SEQ seq, T packet) {

View File

@ -265,7 +265,7 @@ void RtspPlayer::sendSetup(unsigned int track_idx) {
case Rtsp::RTP_TCP: {
sendRtspRequest(
"SETUP", control_url,
{ "Transport", StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2 << "-" << track->_type * 2 + 1 << ";mode=play" });
{ "Transport", StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track_idx * 2 << "-" << track_idx * 2 + 1 << ";mode=play" });
} break;
case Rtsp::RTP_MULTICAST: {
sendRtspRequest("SETUP", control_url, { "Transport", "RTP/AVP;multicast;mode=play" });
@ -552,7 +552,9 @@ void RtspPlayer::onRtpPacket(const char *data, size_t len) {
int trackIdx = -1;
uint8_t interleaved = data[1];
if (interleaved % 2 == 0) {
trackIdx = getTrackIndexByInterleaved(interleaved);
CHECK(len > RtpPacket::kRtpHeaderSize + RtpPacket::kRtpTcpHeaderSize);
RtpHeader *header = (RtpHeader *)(data + RtpPacket::kRtpTcpHeaderSize);
trackIdx = getTrackIndexByPT(header->pt);
if (trackIdx == -1) {
return;
}
@ -798,12 +800,25 @@ void RtspPlayer::onPlayResult_l(const SockException &ex, bool handshake_done) {
};
// 创建rtp数据接收超时检测定时器 [AUTO-TRANSLATED:edbffc19]
// Create RTP data receive timeout detection timer
_rtp_check_timer = std::make_shared<Timer>(timeoutMS / 2000.0f, lam, getPoller());
_rtp_check_timer = std::make_shared<Timer>(timeoutMS / 2000.0f, std::move(lam), getPoller());
} else {
sendTeardown();
}
}
int RtspPlayer::getTrackIndexByPT(int pt) const {
for (size_t i = 0; i < _sdp_track.size(); ++i) {
if (_sdp_track[i]->_pt == pt) {
return i;
}
}
if (_sdp_track.size() == 1) {
return 0;
}
WarnL << "no such track with pt:" << pt;
return -1;
}
int RtspPlayer::getTrackIndexByInterleaved(int interleaved) const {
for (size_t i = 0; i < _sdp_track.size(); ++i) {
if (_sdp_track[i]->_interleaved == interleaved) {
@ -829,6 +844,36 @@ int RtspPlayer::getTrackIndexByTrackType(TrackType track_type) const {
throw SockException(Err_other, StrPrinter << "no such track with type:" << getTrackString(track_type));
}
size_t RtspPlayer::getRecvSpeed() {
size_t ret = TcpClient::getRecvSpeed();
for (auto &rtp : _rtp_sock) {
if (rtp) {
ret += rtp->getRecvSpeed();
}
}
for (auto &rtcp : _rtcp_sock) {
if (rtcp) {
ret += rtcp->getRecvSpeed();
}
}
return ret;
}
size_t RtspPlayer::getRecvTotalBytes() {
size_t ret = TcpClient::getRecvTotalBytes();
for (auto &rtp : _rtp_sock) {
if (rtp) {
ret += rtp->getRecvTotalBytes();
}
}
for (auto &rtcp : _rtcp_sock) {
if (rtcp) {
ret += rtcp->getRecvTotalBytes();
}
}
return ret;
}
///////////////////////////////////////////////////
// RtspPlayerImp
float RtspPlayerImp::getDuration() const {

View File

@ -39,6 +39,9 @@ public:
void teardown() override;
float getPacketLossRate(TrackType type) const override;
size_t getRecvSpeed() override;
size_t getRecvTotalBytes() override;
protected:
// 派生类回调函数 [AUTO-TRANSLATED:61e20903]
// Derived class callback function
@ -117,6 +120,7 @@ protected:
private:
void onPlayResult_l(const toolkit::SockException &ex , bool handshake_done);
int getTrackIndexByPT(int pt) const;
int getTrackIndexByInterleaved(int interleaved) const;
int getTrackIndexByTrackType(TrackType track_type) const;
@ -155,9 +159,10 @@ private:
std::string _play_url;
// rtsp开始倍速 [AUTO-TRANSLATED:9ab84508]
// Rtsp start speed
float _speed= 0.0f;
float _speed = 0.0f;
std::vector<SdpTrack::Ptr> _sdp_track;
std::function<void(const Parser&)> _on_response;
protected:
// RTP端口,trackid idx 为数组下标 [AUTO-TRANSLATED:77c186bb]
// RTP port, trackid idx is the array subscript
toolkit::Socket::Ptr _rtp_sock[2];
@ -165,6 +170,7 @@ private:
// RTCP port, trackid idx is the array subscript
toolkit::Socket::Ptr _rtcp_sock[2];
private:
// rtsp鉴权相关 [AUTO-TRANSLATED:947dc6a3]
// Rtsp authentication related
std::string _md5_nonce;
@ -174,8 +180,10 @@ private:
uint32_t _cseq_send = 1;
std::string _content_base;
std::string _control_url;
protected:
Rtsp::eRtpType _rtp_type = Rtsp::RTP_TCP;
private:
// 当前rtp时间戳 [AUTO-TRANSLATED:410f2691]
// Current rtp timestamp
uint32_t _stamp[2] = {0, 0};

View File

@ -59,6 +59,36 @@ public:
std::vector<Track::Ptr> getTracks(bool ready = true) const override;
size_t getRecvSpeed() override {
size_t ret = TcpClient::getRecvSpeed();
for (auto &rtp : _rtp_sock) {
if (rtp) {
ret += rtp->getRecvSpeed();
}
}
for (auto &rtcp : _rtcp_sock) {
if (rtcp) {
ret += rtcp->getRecvSpeed();
}
}
return ret;
}
size_t getRecvTotalBytes() override {
size_t ret = TcpClient::getRecvTotalBytes();
for (auto &rtp : _rtp_sock) {
if (rtp) {
ret += rtp->getRecvTotalBytes();
}
}
for (auto &rtcp : _rtcp_sock) {
if (rtcp) {
ret += rtcp->getRecvTotalBytes();
}
}
return ret;
}
private:
// 派生类回调函数 [AUTO-TRANSLATED:61e20903]
// Derived class callback function

View File

@ -277,8 +277,8 @@ void RtspPusher::sendSetup(unsigned int track_idx) {
switch (_rtp_type) {
case Rtsp::RTP_TCP: {
sendRtspRequest("SETUP", control_url, {"Transport",
StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track->_type * 2
<< "-" << track->_type * 2 + 1 << ";mode=record"});
StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track_idx * 2
<< "-" << track_idx * 2 + 1 << ";mode=record"});
}
break;
case Rtsp::RTP_UDP: {
@ -595,5 +595,34 @@ void RtspPusher::sendRtspRequest(const string &cmd, const string &url,const StrC
SockSender::send(std::move(printer));
}
size_t RtspPusher::getSendSpeed() {
size_t ret = TcpClient::getSendSpeed();
for (auto &rtp : _rtp_sock) {
if (rtp) {
ret += rtp->getSendSpeed();
}
}
for (auto &rtcp : _rtcp_sock) {
if (rtcp) {
ret += rtcp->getSendSpeed();
}
}
return ret;
}
size_t RtspPusher::getSendTotalBytes() {
size_t ret = TcpClient::getSendTotalBytes();
for (auto &rtp : _rtp_sock) {
if (rtp) {
ret += rtp->getSendTotalBytes();
}
}
for (auto &rtcp : _rtcp_sock) {
if (rtcp) {
ret += rtcp->getSendTotalBytes();
}
}
return ret;
}
} /* namespace mediakit */

View File

@ -30,6 +30,8 @@ public:
~RtspPusher() override;
void publish(const std::string &url) override;
void teardown() override;
size_t getSendSpeed() override;
size_t getSendTotalBytes() override;
protected:
//for Tcpclient override

View File

@ -167,7 +167,9 @@ void RtspSession::onWholeRtspPacket(Parser &parser) {
void RtspSession::onRtpPacket(const char *data, size_t len) {
uint8_t interleaved = data[1];
if (interleaved % 2 == 0) {
auto track_idx = getTrackIndexByInterleaved(interleaved);
CHECK(len > RtpPacket::kRtpHeaderSize + RtpPacket::kRtpTcpHeaderSize);
RtpHeader *header = (RtpHeader *)(data + RtpPacket::kRtpTcpHeaderSize);
auto track_idx = getTrackIndexByPT(header->pt);
handleOneRtp(track_idx, _sdp_track[track_idx]->_type, _sdp_track[track_idx]->_samplerate, (uint8_t *) data + RtpPacket::kRtpTcpHeaderSize, len - RtpPacket::kRtpTcpHeaderSize);
} else {
auto track_idx = getTrackIndexByInterleaved(interleaved - 1);
@ -861,7 +863,7 @@ void RtspSession::handleReq_Play(const Parser &parser) {
_play_reader = play_src->getRing()->attach(getPoller(), use_gop);
_play_reader->setGetInfoCB([weak_self]() {
Any ret;
ret.set(static_pointer_cast<SockInfo>(weak_self.lock()));
ret.set(static_pointer_cast<Session>(weak_self.lock()));
return ret;
});
_play_reader->setDetachCB([weak_self]() {
@ -1124,6 +1126,18 @@ bool RtspSession::sendRtspResponse(const string &res_code, const std::initialize
return sendRtspResponse(res_code,header_map,sdp,protocol);
}
int RtspSession::getTrackIndexByPT(int pt) const {
for (size_t i = 0; i < _sdp_track.size(); ++i) {
if (pt == _sdp_track[i]->_pt) {
return i;
}
}
if (_sdp_track.size() == 1) {
return 0;
}
throw SockException(Err_shutdown, StrPrinter << "no such track with pt:" << pt);
}
int RtspSession::getTrackIndexByTrackType(TrackType type) {
for (size_t i = 0; i < _sdp_track.size(); ++i) {
if (type == _sdp_track[i]->_type) {

View File

@ -153,6 +153,7 @@ private:
void send_NotAcceptable();
// 获取track下标 [AUTO-TRANSLATED:36d0b2c2]
// Get the track index
int getTrackIndexByPT(int pt) const;
int getTrackIndexByTrackType(TrackType type);
int getTrackIndexByControlUrl(const std::string &control_url);
int getTrackIndexByInterleaved(int interleaved);

View File

@ -82,7 +82,7 @@ void SrtCaller::onConnect() {
auto peer_addr = SockUtil::make_sockaddr(_url._host.c_str(), (_url._port));
_socket = Socket::createSocket(_poller, false);
_socket->bindUdpSock(0, SockUtil::is_ipv4(_url._host.data()) ? "0.0.0.0" : "::");
_socket->bindUdpSock(0, SockUtil::is_ipv4(_url._host.data()) ? "0.0.0.0" : "::");
_socket->bindPeerAddr((struct sockaddr *)&peer_addr, 0, true);
weak_ptr<SrtCaller> weak_self = shared_from_this();
@ -1018,6 +1018,21 @@ size_t SrtCaller::getPayloadSize() {
return ret;
}
size_t SrtCaller::getRecvSpeed() const {
return _socket ? _socket->getRecvSpeed() : 0;
}
size_t SrtCaller::getRecvTotalBytes() const {
return _socket ? _socket->getRecvTotalBytes() : 0;
}
size_t SrtCaller::getSendSpeed() const {
return _socket ? _socket->getSendSpeed() : 0;
}
size_t SrtCaller::getSendTotalBytes() const {
return _socket ? _socket->getSendTotalBytes() : 0;
}
} /* namespace mediakit */

View File

@ -60,6 +60,11 @@ public:
virtual void inputSockData(uint8_t *buf, int len, struct sockaddr *addr);
virtual void onSendTSData(const SRT::Buffer::Ptr &buffer, bool flush);
size_t getRecvSpeed() const;
size_t getRecvTotalBytes() const;
size_t getSendSpeed() const;
size_t getSendTotalBytes() const;
protected:
virtual void onConnect();

View File

@ -120,6 +120,14 @@ std::string SrtPlayer::getPassphrase() {
return passPhrase;
}
size_t SrtPlayer::getRecvSpeed() {
return SrtCaller::getRecvSpeed();
}
size_t SrtPlayer::getRecvTotalBytes() {
return SrtCaller::getRecvTotalBytes();
}
///////////////////////////////////////////////////
// SrtPlayerImp
@ -164,6 +172,5 @@ void SrtPlayerImp::onSRTData(SRT::DataPacket::Ptr pkt) {
return;
}
} /* namespace mediakit */

View File

@ -38,6 +38,8 @@ public:
void teardown() override;
void pause(bool pause) override;
void speed(float speed) override;
size_t getRecvSpeed() override;
size_t getRecvTotalBytes() override;
protected:

View File

@ -112,5 +112,13 @@ void SrtPusher::doPublish() {
});
}
size_t SrtPusher::getSendSpeed() {
return SrtCaller::getSendSpeed();
}
size_t SrtPusher::getSendTotalBytes() {
return SrtCaller::getSendTotalBytes();
}
} /* namespace mediakit */

View File

@ -51,6 +51,9 @@ protected:
protected:
std::weak_ptr<TSMediaSource> _push_src;
TSMediaSource::RingType::RingReader::Ptr _ts_reader;
size_t getSendSpeed() override;
size_t getSendTotalBytes() override;
};
using SrtPusherImp = PusherImp<SrtPusher, PusherBase>;

View File

@ -253,7 +253,7 @@ void SrtTransportImp::doPlay() {
weak_ptr<Session> weak_session = strong_self->getSession();
strong_self->_ts_reader->setGetInfoCB([weak_session]() {
Any ret;
ret.set(static_pointer_cast<SockInfo>(weak_session.lock()));
ret.set(static_pointer_cast<Session>(weak_session.lock()));
return ret;
});
strong_self->_ts_reader->setDetachCB([weak_self]() {

View File

@ -205,7 +205,7 @@ void WebRtcPlayer::onStartWebRTC() {
weak_ptr<Session> weak_session = static_pointer_cast<Session>(getSession());
_reader->setGetInfoCB([weak_session]() {
Any ret;
ret.set(static_pointer_cast<SockInfo>(weak_session.lock()));
ret.set(static_pointer_cast<Session>(weak_session.lock()));
return ret;
});
_reader->setReadCB([weak_self](const RtspMediaSource::RingDataType &pkt) {