From c0a93f3c8f4c326281aa996ee59b060f5f1cb2c1 Mon Sep 17 00:00:00 2001 From: baigao-X <1007668733@qq.com> Date: Fri, 28 Feb 2025 12:46:43 +0800 Subject: [PATCH] Fix SrtCaller Crash problem (#4171) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1.Fix the crash problem when SrtPlayer reveives an Ack packet 2.Remove SrtCaller's Check on streamid format to make it compatible with other SRT streaming service。 3.Modify the coding format: replace tab to space --- src/Srt/SrtCaller.cpp | 296 ++++++++++++++---------------- src/Srt/SrtCaller.h | 396 ++++++++++++++++++++--------------------- src/Srt/SrtPlayer.cpp | 338 +++++++++++++++++------------------ src/Srt/SrtPlayer.h | 130 +++++++------- src/Srt/SrtPlayerImp.h | 102 +++++------ src/Srt/SrtPusher.cpp | 232 ++++++++++++------------ src/Srt/SrtPusher.h | 118 ++++++------ 7 files changed, 793 insertions(+), 819 deletions(-) diff --git a/src/Srt/SrtCaller.cpp b/src/Srt/SrtCaller.cpp index 16289fdf..3a1134b6 100644 --- a/src/Srt/SrtCaller.cpp +++ b/src/Srt/SrtCaller.cpp @@ -21,57 +21,38 @@ using namespace SRT; namespace mediakit { +//zlm play format //srt://127.0.0.1:9000?streamid=#!::r=live/test //srt://127.0.0.1:9000?streamid=#!::r=live/test,h=__defaultVhost__ +//zlm push format +//srt://127.0.0.1:9000?streamid=#!::r=live/test,m=publish +//srt://127.0.0.1:9000?streamid=#!::r=live/test,h=__defaultVhost__,m=publish void SrtUrl::parse(const string &strUrl) { - //DebugL << "url: " << strUrl; + //DebugL << "url: " << strUrl; _full_url = strUrl; auto url = strUrl; auto ip = findSubString(url.data(), "://", "?"); splitUrl(ip, _host, _port); - auto _params = findSubString(url.data(), "?" , NULL); + auto _params = findSubString(url.data(), "?" , NULL); auto kv = Parser::parseArgs(_params); auto it = kv.find("streamid"); - if (it != kv.end()) { - auto streamid = it->second; - if (!toolkit::start_with(streamid, "#!::")) { - return; - } - std::string real_streamid = streamid.substr(4); + if (it != kv.end()) { + auto streamid = it->second; + if (!toolkit::start_with(streamid, "#!::")) { + return; + } + _streamid = streamid; + } - auto params = Parser::parseArgs(real_streamid, ",", "="); - - for (auto iit : params) { - if (iit.first == "h") { - _vhost = iit.second; - } else if (iit.first == "r") { - auto tmps = toolkit::split(iit.second, "/"); - if (tmps.size() < 2) { - continue; - } - _app = tmps[0]; - _stream = tmps[1]; - } else { - //nop - } - } - - if (_vhost.empty()) { - _vhost = DEFAULT_VHOST; - } - } - - //TraceL << "ip: " << ip; - //TraceL << "_host: " << _host; - //TraceL << "_port: " << _port; - //TraceL << "_params: " << _params; - //TraceL << "_vhost: " << _vhost; - //TraceL << "_app: " << _app; - //TraceL << "_stream: " << _stream; - return; + //TraceL << "ip: " << ip; + //TraceL << "_host: " << _host; + //TraceL << "_port: " << _port; + //TraceL << "_params: " << _params; + //TraceL << "_streamid: " << _streamid; + return; } @@ -79,10 +60,10 @@ void SrtUrl::parse(const string &strUrl) { SrtCaller::SrtCaller(const toolkit::EventPoller::Ptr &poller) { _poller = poller ? std::move(poller) : EventPollerPool::Instance().getPoller(); _start_timestamp = SteadyClock::now(); - _socket_id = generateSocketId(); + _socket_id = generateSocketId(); - /* _init_seq_number = generateInitSeq(); */ - _init_seq_number = 0; + /* _init_seq_number = generateInitSeq(); */ + _init_seq_number = 0; _last_pkt_seq = _init_seq_number - 1; _pkt_recv_rate_context = std::make_shared(_start_timestamp); @@ -93,16 +74,16 @@ SrtCaller::SrtCaller(const toolkit::EventPoller::Ptr &poller) { } SrtCaller::~SrtCaller(void) { - DebugL; + DebugL; } void SrtCaller::onConnect() { - //DebugL; + //DebugL; - auto peer_addr = SockUtil::make_sockaddr(_url._host.c_str(), (_url._port)); - _socket = Socket::createSocket(_poller, false); + auto peer_addr = SockUtil::make_sockaddr(_url._host.c_str(), (_url._port)); + _socket = Socket::createSocket(_poller, false); _socket->bindUdpSock(0, SockUtil::is_ipv4(_url._host.data()) ? "0.0.0.0" : "::"); - _socket->bindPeerAddr((struct sockaddr *)&peer_addr, 0, true); + _socket->bindPeerAddr((struct sockaddr *)&peer_addr, 0, true); weak_ptr weak_self = shared_from_this(); _socket->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable { @@ -110,10 +91,10 @@ void SrtCaller::onConnect() { if (!strong_self) { return; } - strong_self->inputSockData((uint8_t*)buf->data(), buf->size(), addr); + strong_self->inputSockData((uint8_t*)buf->data(), buf->size(), addr); }); - doHandshake(); + doHandshake(); } void SrtCaller::onResult(const SockException &ex) { @@ -138,7 +119,7 @@ void SrtCaller::onResult(const SockException &ex) { void SrtCaller::onHandShakeFinished() { DebugL; - _is_handleshake_finished = true; + _is_handleshake_finished = true; if (_handleshake_timer) { _handleshake_timer.reset(); } @@ -165,7 +146,7 @@ void SrtCaller::onHandShakeFinished() { void SrtCaller::onSRTData(DataPacket::Ptr pkt) { InfoL; if (!isPlayer()) { - WarnL << "this is not a player data ignore"; + WarnL << "this is not a player data ignore"; return; } } @@ -215,7 +196,7 @@ void SrtCaller::onSendTSData(const Buffer::Ptr &buffer, bool flush) { } void SrtCaller::inputSockData(uint8_t *buf, int len, struct sockaddr *addr) { - //TraceL << hexdump((void*)buf, len); + //TraceL << hexdump((void*)buf, len); using srt_control_handler = void (SrtCaller::*)(uint8_t * buf, int len, struct sockaddr *addr); static std::unordered_map s_control_functions; @@ -277,16 +258,16 @@ void SrtCaller::doHandshake() { _crypto = std::make_shared(getPassphrase()); } - sendHandshakeInduction(); + sendHandshakeInduction(); return; } void SrtCaller::sendHandshakeInduction() { - DebugL; + DebugL; _induction_ts = SteadyClock::now(); - SRT::HandshakePacket::Ptr req = std::make_shared(); - req->timestamp = DurationCountMicroseconds(_induction_ts - _start_timestamp); + SRT::HandshakePacket::Ptr req = std::make_shared(); + req->timestamp = DurationCountMicroseconds(_induction_ts - _start_timestamp); req->dst_socket_id = 0; req->version = 4; @@ -299,11 +280,11 @@ void SrtCaller::sendHandshakeInduction() { req->srt_socket_id = _socket_id; req->syn_cookie = 0; - auto dataSenderAddr = SockUtil::make_sockaddr(_url._host.c_str(), _url._port); - req->assignPeerIPBE(&dataSenderAddr); + auto dataSenderAddr = SockUtil::make_sockaddr(_url._host.c_str(), _url._port); + req->assignPeerIPBE(&dataSenderAddr); req->storeToData(); - _handleshake_req = req; - sendControlPacket(req, true); + _handleshake_req = req; + sendControlPacket(req, true); std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); _handleshake_timer = std::make_shared(0.2, [weak_self]()->bool{ @@ -323,10 +304,10 @@ void SrtCaller::sendHandshakeInduction() { } void SrtCaller::sendHandshakeConclusion() { - DebugL; + DebugL; - SRT::HandshakePacket::Ptr req = std::make_shared(); - req->timestamp = DurationCountMicroseconds(_now - _start_timestamp); + SRT::HandshakePacket::Ptr req = std::make_shared(); + req->timestamp = DurationCountMicroseconds(_now - _start_timestamp); req->dst_socket_id = 0; req->version = 5; @@ -345,13 +326,13 @@ void SrtCaller::sendHandshakeConclusion() { req->srt_socket_id = _socket_id; req->syn_cookie = _sync_cookie; - auto addr = SockUtil::make_sockaddr(_url._host.c_str(), _url._port); - req->assignPeerIPBE(&addr); + auto addr = SockUtil::make_sockaddr(_url._host.c_str(), _url._port); + req->assignPeerIPBE(&addr); - HSExtMessage::Ptr ext = std::make_shared(); - ext->extension_type = HSExt::SRT_CMD_HSREQ; - ext->srt_version = srtVersion(1, 5, 0); - ext->srt_flag = 0xbf; + HSExtMessage::Ptr ext = std::make_shared(); + ext->extension_type = HSExt::SRT_CMD_HSREQ; + ext->srt_version = srtVersion(1, 5, 0); + ext->srt_flag = 0xbf; // if set latency, use set value _delay = getLatency(); @@ -364,13 +345,13 @@ void SrtCaller::sendHandshakeConclusion() { } } - ext->recv_tsbpd_delay = _delay; - ext->send_tsbpd_delay = _delay; - req->ext_list.push_back(std::move(ext)); + ext->recv_tsbpd_delay = _delay; + ext->send_tsbpd_delay = _delay; + req->ext_list.push_back(std::move(ext)); - HSExtStreamID::Ptr extStreamId = std::make_shared(); - extStreamId->streamid = generateStreamId(); - req->ext_list.push_back(std::move(extStreamId)); + HSExtStreamID::Ptr extStreamId = std::make_shared(); + extStreamId->streamid = generateStreamId(); + req->ext_list.push_back(std::move(extStreamId)); if (_crypto) { HSExtKeyMaterial::Ptr keyMaterial = _crypto->generateKeyMaterialExt(HSExt::SRT_CMD_KMREQ); @@ -378,8 +359,8 @@ void SrtCaller::sendHandshakeConclusion() { } req->storeToData(); - _handleshake_req = req; - sendControlPacket(req); + _handleshake_req = req; + sendControlPacket(req); return; } @@ -491,7 +472,7 @@ void SrtCaller::sendMsgDropReq(uint32_t first, uint32_t last) { void SrtCaller::sendKeepLivePacket() { auto now = SteadyClock::now(); - SRT::KeepLivePacket::Ptr req = std::make_shared(); + SRT::KeepLivePacket::Ptr req = std::make_shared(); req->timestamp = SRT::DurationCountMicroseconds(now - _start_timestamp); req->dst_socket_id = _peer_socket_id; req->storeToData(); @@ -510,7 +491,7 @@ void SrtCaller::sendShutDown() { } void SrtCaller::tryAnnounceKeyMaterial() { - //TraceL; + //TraceL; if (!_crypto) { return; @@ -546,9 +527,9 @@ void SrtCaller::tryAnnounceKeyMaterial() { } void SrtCaller::sendControlPacket(SRT::ControlPacket::Ptr pkt, bool flush) { - //TraceL; + //TraceL; sendPacket(pkt, flush); - return; + return; } void SrtCaller::sendDataPacket(SRT::DataPacket::Ptr pkt, char *buf, int len, bool flush) { @@ -571,22 +552,22 @@ void SrtCaller::sendDataPacket(SRT::DataPacket::Ptr pkt, char *buf, int len, boo pkt->storeToData((uint8_t *)data, size); sendPacket(pkt, flush); _send_buf->inputPacket(pkt); - return; + return; } void SrtCaller::sendPacket(Buffer::Ptr pkt, bool flush) { - //TraceL << pkt->size(); + //TraceL << pkt->size(); auto tmp = _packet_pool.obtain2(); tmp->assign(pkt->data(), pkt->size()); - _socket->send(std::move(tmp), nullptr, 0, flush); + _socket->send(std::move(tmp), nullptr, 0, flush); _send_ticker.resetTime(); - return; + return; } void SrtCaller::handleHandshake(uint8_t *buf, int len, struct sockaddr *addr) { - //DebugL; - SRT::HandshakePacket pkt; + //DebugL; + SRT::HandshakePacket pkt; if(!pkt.loadFromData(buf, len)){ WarnL<< "is not vaild HandshakePacket"; return; @@ -610,96 +591,96 @@ void SrtCaller::handleHandshake(uint8_t *buf, int len, struct sockaddr *addr) { } void SrtCaller::handleHandshakeInduction(SRT::HandshakePacket &pkt, struct sockaddr *addr) { - DebugL; + DebugL; - if (!_handleshake_req) { - WarnL << "must Induction Phase for handleshake"; - return; - } + if (!_handleshake_req) { + WarnL << "must Induction Phase for handleshake"; + return; + } - if (_handleshake_req->handshake_type == HandshakePacket::HS_TYPE_CONCLUSION) { - WarnL << "should be Conclusion Phase for handleshake "; - return; - } else if (_handleshake_req->handshake_type != HandshakePacket::HS_TYPE_INDUCTION) { - WarnL <<"not reach this"; - return; - } + if (_handleshake_req->handshake_type == HandshakePacket::HS_TYPE_CONCLUSION) { + WarnL << "should be Conclusion Phase for handleshake "; + return; + } else if (_handleshake_req->handshake_type != HandshakePacket::HS_TYPE_INDUCTION) { + WarnL <<"not reach this"; + return; + } - // Induction Phase + // Induction Phase if (pkt.version != 5) { - WarnL << "not support handleshake version: " << pkt.version; - return; - } + WarnL << "not support handleshake version: " << pkt.version; + return; + } - if (pkt.extension_field != 0x4A17) { - WarnL << "not match SRT MAGIC"; - return; - } + if (pkt.extension_field != 0x4A17) { + WarnL << "not match SRT MAGIC"; + return; + } - if (pkt.dst_socket_id != _handleshake_req->srt_socket_id) { - WarnL << "not match _socket_id"; - return; - } + if (pkt.dst_socket_id != _handleshake_req->srt_socket_id) { + WarnL << "not match _socket_id"; + return; + } // TODO: encryption_field - _sync_cookie = pkt.syn_cookie; + _sync_cookie = pkt.syn_cookie; _mtu = std::min(pkt.mtu, _mtu); _max_flow_window_size = std::min(pkt.max_flow_window_size, _max_flow_window_size); - sendHandshakeConclusion(); + sendHandshakeConclusion(); return; } void SrtCaller::handleHandshakeConclusion(SRT::HandshakePacket &pkt, struct sockaddr *addr) { - DebugL; + DebugL; - if (!_handleshake_req) { - WarnL << "must Conclusion Phase for handleshake "; - return; - } + if (!_handleshake_req) { + WarnL << "must Conclusion Phase for handleshake "; + return; + } - if (_handleshake_req->handshake_type == HandshakePacket::HS_TYPE_INDUCTION) { - WarnL << "should be Conclusion Phase for handleshake "; - return; - } else if (_handleshake_req->handshake_type != HandshakePacket::HS_TYPE_CONCLUSION) { - WarnL <<"not reach this"; - return; - } + if (_handleshake_req->handshake_type == HandshakePacket::HS_TYPE_INDUCTION) { + WarnL << "should be Conclusion Phase for handleshake "; + return; + } else if (_handleshake_req->handshake_type != HandshakePacket::HS_TYPE_CONCLUSION) { + WarnL <<"not reach this"; + return; + } - // Conclusion Phase + // Conclusion Phase if (pkt.version != 5) { - WarnL << "not support handleshake version: " << pkt.version; - return; - } + WarnL << "not support handleshake version: " << pkt.version; + return; + } - if (pkt.dst_socket_id != _handleshake_req->srt_socket_id) { - WarnL << "not match _socket_id"; - return; - } + if (pkt.dst_socket_id != _handleshake_req->srt_socket_id) { + WarnL << "not match _socket_id"; + return; + } // TODO: encryption_field - _peer_socket_id = pkt.srt_socket_id; + _peer_socket_id = pkt.srt_socket_id; - HSExtMessage::Ptr resp; + HSExtMessage::Ptr resp; HSExtKeyMaterial::Ptr keyMaterial; - for (auto& ext : pkt.ext_list) { - if (!resp) { - resp = std::dynamic_pointer_cast(ext); - } + for (auto& ext : pkt.ext_list) { + if (!resp) { + resp = std::dynamic_pointer_cast(ext); + } if (!keyMaterial) { keyMaterial = std::dynamic_pointer_cast(ext); } - } + } - if (resp) { + if (resp) { _delay = std::max(_delay, resp->recv_tsbpd_delay); - //DebugL << "flag " << resp->srt_flag; - //DebugL << "recv_tsbpd_delay " << resp->recv_tsbpd_delay; - //DebugL << "send_tsbpd_delay " << resp->send_tsbpd_delay; - } + //DebugL << "flag " << resp->srt_flag; + //DebugL << "recv_tsbpd_delay " << resp->recv_tsbpd_delay; + //DebugL << "send_tsbpd_delay " << resp->send_tsbpd_delay; + } if (keyMaterial && _crypto) { _crypto->loadFromKeyMaterial(keyMaterial); @@ -715,7 +696,7 @@ void SrtCaller::handleHandshakeConclusion(SRT::HandshakePacket &pkt, struct sock } onHandShakeFinished(); - return; + return; } void SrtCaller::handleACK(uint8_t *buf, int len, struct sockaddr *addr) { @@ -730,7 +711,9 @@ void SrtCaller::handleACK(uint8_t *buf, int len, struct sockaddr *addr) { pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); pkt->ack_number = ack.ack_number; pkt->storeToData(); - _send_buf->drop(ack.last_ack_pkt_seq_number); + if (_send_buf) { + _send_buf->drop(ack.last_ack_pkt_seq_number); + } sendControlPacket(pkt, true); // TraceL<<"ack number "<(); - pkt->loadFromData(buf, len); + //TraceL; + DataPacket::Ptr pkt = std::make_shared(); + pkt->loadFromData(buf, len); if (_crypto) { auto payload = _crypto->decrypt(pkt, pkt->payloadData(), pkt->payloadSize()); @@ -906,10 +889,10 @@ void SrtCaller::handleDataPacket(uint8_t *buf, int len, struct sockaddr *addr) { pkt->reloadPayload((uint8_t*)payload->data(), payload->size()); } - _estimated_link_capacity_context->inputPacket(_now, pkt); + _estimated_link_capacity_context->inputPacket(_now, pkt); - std::list list; - _recv_buf->inputPacket(pkt, list); + std::list list; + _recv_buf->inputPacket(pkt, list); for (auto& data : list) { if (_last_pkt_seq + 1 != data->packet_seq_number) { TraceL << "pkt lost " << _last_pkt_seq + 1 << "->" << data->packet_seq_number; @@ -1008,14 +991,7 @@ float SrtCaller::getTimeOutSec() { }; std::string SrtCaller::generateStreamId() { - auto streamId = "#!::r=" + _url._app + "/" + _url._stream; - if (_url._vhost != DEFAULT_VHOST) { - streamId += ",h=" +_url._vhost; - } - if (!isPlayer()) { - streamId += ",m=publish"; - } - return streamId; + return _url._streamid; }; uint32_t SrtCaller::generateSocketId() { diff --git a/src/Srt/SrtCaller.h b/src/Srt/SrtCaller.h index 84aa8089..08b20ff7 100644 --- a/src/Srt/SrtCaller.h +++ b/src/Srt/SrtCaller.h @@ -1,199 +1,197 @@ -/* - * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. - * - * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). - * - * Use of this source code is governed by MIT-like license that can be found in the - * LICENSE file in the root of the source tree. All contributing project authors - * may be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef ZLMEDIAKIT_SRTCALLER_H -#define ZLMEDIAKIT_SRTCALLER_H - -//srt -#include "srt/Packet.hpp" -#include "srt/Crypto.hpp" -#include "srt/PacketQueue.hpp" -#include "srt/PacketSendQueue.hpp" -#include "srt/Statistic.hpp" - -#include "Poller/EventPoller.h" -#include "Network/Socket.h" -#include "Poller/Timer.h" -#include "Util/TimeTicker.h" -#include "Common/MultiMediaSourceMuxer.h" -#include "Rtp/Decoder.h" -#include "TS/TSMediaSource.h" -#include -#include - - -namespace mediakit { - -// 解析srt 信令url的工具类 -class SrtUrl { -public: - std::string _full_url; - std::string _params; - std::string _host; - uint16_t _port; - std::string _vhost; - std::string _app; - std::string _stream; - -public: - void parse(const std::string &url); -}; - -// 实现了webrtc代理拉流功能 -class SrtCaller : public std::enable_shared_from_this{ -public: - using Ptr = std::shared_ptr; - - using SteadyClock = std::chrono::steady_clock; - using TimePoint = std::chrono::time_point; - - SrtCaller(const toolkit::EventPoller::Ptr &poller); - virtual ~SrtCaller(); - - const toolkit::EventPoller::Ptr &getPoller() const {return _poller;} - - virtual void inputSockData(uint8_t *buf, int len, struct sockaddr *addr); - virtual void onSendTSData(const SRT::Buffer::Ptr &buffer, bool flush); - -protected: - - virtual void onConnect(); - virtual void onHandShakeFinished(); - virtual void onResult(const toolkit::SockException &ex); - - virtual void onSRTData(SRT::DataPacket::Ptr pkt); - - virtual uint16_t getLatency() = 0; - virtual int getLatencyMul(); - virtual int getPktBufSize(); - virtual float getTimeOutSec(); - - virtual bool isPlayer() = 0; - -private: - void doHandshake(); - - void sendHandshakeInduction(); - void sendHandshakeConclusion(); - void sendACKPacket(); - void sendLightACKPacket(); - void sendNAKPacket(std::list &lost_list); - void sendMsgDropReq(uint32_t first, uint32_t last); - void sendKeepLivePacket(); - void sendShutDown(); - void tryAnnounceKeyMaterial(); - void sendControlPacket(SRT::ControlPacket::Ptr pkt, bool flush = true); - void sendDataPacket(SRT::DataPacket::Ptr pkt, char *buf, int len, bool flush = false); - void sendPacket(toolkit::Buffer::Ptr pkt, bool flush); - - void handleHandshake(uint8_t *buf, int len, struct sockaddr *addr); - void handleHandshakeInduction(SRT::HandshakePacket &pkt, struct sockaddr *addr); - void handleHandshakeConclusion(SRT::HandshakePacket &pkt, struct sockaddr *addr); - void handleACK(uint8_t *buf, int len, struct sockaddr *addr); - void handleACKACK(uint8_t *buf, int len, struct sockaddr *addr); - void handleNAK(uint8_t *buf, int len, struct sockaddr *addr); - void handleDropReq(uint8_t *buf, int len, struct sockaddr *addr); - void handleKeeplive(uint8_t *buf, int len, struct sockaddr *addr); - void handleShutDown(uint8_t *buf, int len, struct sockaddr *addr); - void handlePeerError(uint8_t *buf, int len, struct sockaddr *addr); - void handleCongestionWarning(uint8_t *buf, int len, struct sockaddr *addr); - void handleUserDefinedType(uint8_t *buf, int len, struct sockaddr *addr); - void handleDataPacket(uint8_t *buf, int len, struct sockaddr *addr); - void handleKeyMaterialReqPacket(uint8_t *buf, int len, struct sockaddr *addr); - void handleKeyMaterialRspPacket(uint8_t *buf, int len, struct sockaddr *addr); - - void checkAndSendAckNak(); - void createTimerForCheckAlive(); - - std::string generateStreamId(); - uint32_t generateSocketId(); - int32_t generateInitSeq(); - size_t getPayloadSize(); - - virtual std::string getPassphrase() = 0; - -protected: - SrtUrl _url; - toolkit::EventPoller::Ptr _poller; - - bool _is_handleshake_finished = false; - -private: - toolkit::Socket::Ptr _socket; - - TimePoint _now; - TimePoint _start_timestamp; - // for calculate rtt for delay - TimePoint _induction_ts; - - //the initial value of RTT is 100 milliseconds - //RTTVar is 50 milliseconds - uint32_t _rtt = 100 * 1000; - uint32_t _rtt_variance = 50 * 1000; - - //local - uint32_t _socket_id = 0; - uint32_t _init_seq_number = 0; - uint32_t _mtu = 1500; - uint32_t _max_flow_window_size = 8192; - uint16_t _delay = 120; - - //peer - uint32_t _sync_cookie = 0; - uint32_t _peer_socket_id; - - // for handshake - SRT::Timer::Ptr _handleshake_timer; - SRT::HandshakePacket::Ptr _handleshake_req; - - // for keeplive - SRT::Ticker _send_ticker; - SRT::Timer::Ptr _keeplive_timer; - - // for alive - SRT::Ticker _alive_ticker; - SRT::Timer::Ptr _alive_timer; - - // for recv - SRT::PacketQueueInterface::Ptr _recv_buf; - uint32_t _last_pkt_seq = 0; - - // Ack - SRT::UTicker _ack_ticker; - uint32_t _last_ack_pkt_seq = 0; - uint32_t _light_ack_pkt_count = 0; - uint32_t _ack_number_count = 0; - std::map _ack_send_timestamp; - // Full Ack - // Link Capacity and Receiving Rate Estimation - std::shared_ptr _pkt_recv_rate_context; - std::shared_ptr _estimated_link_capacity_context; - - // Nak - SRT::UTicker _nak_ticker; - - //for Send - SRT::PacketSendQueue::Ptr _send_buf; - SRT::ResourcePool _packet_pool; - uint32_t _send_packet_seq_number = 0; - uint32_t _send_msg_number = 1; - - //AckAck - uint32_t _last_recv_ackack_seq_num = 0; - - // for encryption - SRT::Crypto::Ptr _crypto; - SRT::Timer::Ptr _announce_timer; - SRT::KeyMaterialPacket::Ptr _announce_req; -}; - -} /* namespace mediakit */ -#endif /* ZLMEDIAKIT_SRTCALLER_H */ - +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_SRTCALLER_H +#define ZLMEDIAKIT_SRTCALLER_H + +//srt +#include "srt/Packet.hpp" +#include "srt/Crypto.hpp" +#include "srt/PacketQueue.hpp" +#include "srt/PacketSendQueue.hpp" +#include "srt/Statistic.hpp" + +#include "Poller/EventPoller.h" +#include "Network/Socket.h" +#include "Poller/Timer.h" +#include "Util/TimeTicker.h" +#include "Common/MultiMediaSourceMuxer.h" +#include "Rtp/Decoder.h" +#include "TS/TSMediaSource.h" +#include +#include + + +namespace mediakit { + +// 解析srt 信令url的工具类 +class SrtUrl { +public: + std::string _full_url; + std::string _params; + std::string _host; + uint16_t _port; + std::string _streamid; + +public: + void parse(const std::string &url); +}; + +// 实现了webrtc代理拉流功能 +class SrtCaller : public std::enable_shared_from_this{ +public: + using Ptr = std::shared_ptr; + + using SteadyClock = std::chrono::steady_clock; + using TimePoint = std::chrono::time_point; + + SrtCaller(const toolkit::EventPoller::Ptr &poller); + virtual ~SrtCaller(); + + const toolkit::EventPoller::Ptr &getPoller() const {return _poller;} + + virtual void inputSockData(uint8_t *buf, int len, struct sockaddr *addr); + virtual void onSendTSData(const SRT::Buffer::Ptr &buffer, bool flush); + +protected: + + virtual void onConnect(); + virtual void onHandShakeFinished(); + virtual void onResult(const toolkit::SockException &ex); + + virtual void onSRTData(SRT::DataPacket::Ptr pkt); + + virtual uint16_t getLatency() = 0; + virtual int getLatencyMul(); + virtual int getPktBufSize(); + virtual float getTimeOutSec(); + + virtual bool isPlayer() = 0; + +private: + void doHandshake(); + + void sendHandshakeInduction(); + void sendHandshakeConclusion(); + void sendACKPacket(); + void sendLightACKPacket(); + void sendNAKPacket(std::list &lost_list); + void sendMsgDropReq(uint32_t first, uint32_t last); + void sendKeepLivePacket(); + void sendShutDown(); + void tryAnnounceKeyMaterial(); + void sendControlPacket(SRT::ControlPacket::Ptr pkt, bool flush = true); + void sendDataPacket(SRT::DataPacket::Ptr pkt, char *buf, int len, bool flush = false); + void sendPacket(toolkit::Buffer::Ptr pkt, bool flush); + + void handleHandshake(uint8_t *buf, int len, struct sockaddr *addr); + void handleHandshakeInduction(SRT::HandshakePacket &pkt, struct sockaddr *addr); + void handleHandshakeConclusion(SRT::HandshakePacket &pkt, struct sockaddr *addr); + void handleACK(uint8_t *buf, int len, struct sockaddr *addr); + void handleACKACK(uint8_t *buf, int len, struct sockaddr *addr); + void handleNAK(uint8_t *buf, int len, struct sockaddr *addr); + void handleDropReq(uint8_t *buf, int len, struct sockaddr *addr); + void handleKeeplive(uint8_t *buf, int len, struct sockaddr *addr); + void handleShutDown(uint8_t *buf, int len, struct sockaddr *addr); + void handlePeerError(uint8_t *buf, int len, struct sockaddr *addr); + void handleCongestionWarning(uint8_t *buf, int len, struct sockaddr *addr); + void handleUserDefinedType(uint8_t *buf, int len, struct sockaddr *addr); + void handleDataPacket(uint8_t *buf, int len, struct sockaddr *addr); + void handleKeyMaterialReqPacket(uint8_t *buf, int len, struct sockaddr *addr); + void handleKeyMaterialRspPacket(uint8_t *buf, int len, struct sockaddr *addr); + + void checkAndSendAckNak(); + void createTimerForCheckAlive(); + + std::string generateStreamId(); + uint32_t generateSocketId(); + int32_t generateInitSeq(); + size_t getPayloadSize(); + + virtual std::string getPassphrase() = 0; + +protected: + SrtUrl _url; + toolkit::EventPoller::Ptr _poller; + + bool _is_handleshake_finished = false; + +private: + toolkit::Socket::Ptr _socket; + + TimePoint _now; + TimePoint _start_timestamp; + // for calculate rtt for delay + TimePoint _induction_ts; + + //the initial value of RTT is 100 milliseconds + //RTTVar is 50 milliseconds + uint32_t _rtt = 100 * 1000; + uint32_t _rtt_variance = 50 * 1000; + + //local + uint32_t _socket_id = 0; + uint32_t _init_seq_number = 0; + uint32_t _mtu = 1500; + uint32_t _max_flow_window_size = 8192; + uint16_t _delay = 120; + + //peer + uint32_t _sync_cookie = 0; + uint32_t _peer_socket_id; + + // for handshake + SRT::Timer::Ptr _handleshake_timer; + SRT::HandshakePacket::Ptr _handleshake_req; + + // for keeplive + SRT::Ticker _send_ticker; + SRT::Timer::Ptr _keeplive_timer; + + // for alive + SRT::Ticker _alive_ticker; + SRT::Timer::Ptr _alive_timer; + + // for recv + SRT::PacketQueueInterface::Ptr _recv_buf; + uint32_t _last_pkt_seq = 0; + + // Ack + SRT::UTicker _ack_ticker; + uint32_t _last_ack_pkt_seq = 0; + uint32_t _light_ack_pkt_count = 0; + uint32_t _ack_number_count = 0; + std::map _ack_send_timestamp; + // Full Ack + // Link Capacity and Receiving Rate Estimation + std::shared_ptr _pkt_recv_rate_context; + std::shared_ptr _estimated_link_capacity_context; + + // Nak + SRT::UTicker _nak_ticker; + + //for Send + SRT::PacketSendQueue::Ptr _send_buf; + SRT::ResourcePool _packet_pool; + uint32_t _send_packet_seq_number = 0; + uint32_t _send_msg_number = 1; + + //AckAck + uint32_t _last_recv_ackack_seq_num = 0; + + // for encryption + SRT::Crypto::Ptr _crypto; + SRT::Timer::Ptr _announce_timer; + SRT::KeyMaterialPacket::Ptr _announce_req; +}; + +} /* namespace mediakit */ +#endif /* ZLMEDIAKIT_SRTCALLER_H */ + diff --git a/src/Srt/SrtPlayer.cpp b/src/Srt/SrtPlayer.cpp index ea20cd77..ebb7676e 100644 --- a/src/Srt/SrtPlayer.cpp +++ b/src/Srt/SrtPlayer.cpp @@ -1,169 +1,169 @@ -/* - * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. - * - * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). - * - * Use of this source code is governed by MIT-like license that can be found in the - * LICENSE file in the root of the source tree. All contributing project authors - * may be found in the AUTHORS file in the root of the source tree. - */ - -#include "SrtPlayer.h" -#include "SrtPlayerImp.h" -#include "Common/config.h" -#include "Http/HlsPlayer.h" - -using namespace toolkit; -using namespace std; - -namespace mediakit { - - -SrtPlayer::SrtPlayer(const EventPoller::Ptr &poller) - : SrtCaller(poller) { - DebugL; -} - -SrtPlayer::~SrtPlayer(void) { - DebugL; -} - -void SrtPlayer::play(const string &strUrl) { - DebugL; - try { - _url.parse(strUrl); - } catch (std::exception &ex) { - onResult(SockException(Err_other, StrPrinter << "illegal srt url:" << ex.what())); - return; - } - onConnect(); - return; -} - -void SrtPlayer::teardown() { - SrtCaller::onResult(SockException(Err_other, StrPrinter << "teardown: " << _url._full_url)); -} - -void SrtPlayer::pause(bool bPause) { - DebugL; -} - -void SrtPlayer::speed(float speed) { - DebugL; -} - -void SrtPlayer::onHandShakeFinished() { - SrtCaller::onHandShakeFinished(); - onResult(SockException(Err_success, "srt play success")); -} - -void SrtPlayer::onResult(const SockException &ex) { - SrtCaller::onResult(ex); - - if (!ex) { - // 播放成功 - onPlayResult(ex); - _benchmark_mode = (*this)[Client::kBenchmarkMode].as(); - - // 播放成功,恢复数据包接收超时定时器 - _recv_ticker.resetTime(); - auto timeout = getTimeOutSec(); - //读取配置文件 - weak_ptr weakSelf = static_pointer_cast(shared_from_this()); - // 创建rtp数据接收超时检测定时器 - _check_timer = std::make_shared(timeout /2, - [weakSelf, timeout]() { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { - return false; - } - if (strongSelf->_recv_ticker.elapsedTime() > timeout * 1000) { - // 接收媒体数据包超时 - strongSelf->onResult(SockException(Err_timeout, "receive srt media data timeout:" + strongSelf->_url._full_url)); - return false; - } - - return true; - }, getPoller()); - } else { - WarnL << ex.getErrCode() << " " << ex.what(); - if (ex.getErrCode() == Err_shutdown) { - // 主动shutdown的,不触发回调 - return; - } - if (!_is_handleshake_finished) { - onPlayResult(ex); - } else { - onShutdown(ex); - } - } - return; -} - - -void SrtPlayer::onSRTData(SRT::DataPacket::Ptr pkt) { - _recv_ticker.resetTime(); -} - -uint16_t SrtPlayer::getLatency() { - auto latency = (*this)[Client::kLatency].as(); - return (uint16_t)latency ; -} - -float SrtPlayer::getTimeOutSec() { - auto timeoutMS = (*this)[Client::kTimeoutMS].as(); - return (float)timeoutMS / (float)1000; -} - -std::string SrtPlayer::getPassphrase() { - auto passPhrase = (*this)[Client::kPassPhrase].as(); - return passPhrase; -} - -/////////////////////////////////////////////////// -// SrtPlayerImp - -void SrtPlayerImp::onPlayResult(const toolkit::SockException &ex) { - if (ex) { - Super::onPlayResult(ex); - } - //success result only occur when addTrackCompleted - return; -} - -std::vector SrtPlayerImp::getTracks(bool ready /*= true*/) const { - return _demuxer ? static_pointer_cast(_demuxer)->getTracks(ready) : Super::getTracks(ready); -} - -void SrtPlayerImp::addTrackCompleted() { - Super::onPlayResult(toolkit::SockException(toolkit::Err_success, "play success")); -} - -void SrtPlayerImp::onSRTData(SRT::DataPacket::Ptr pkt) { - SrtPlayer::onSRTData(pkt); - - if (_benchmark_mode) { - return; - } - - auto strong_self = shared_from_this(); - if (!_demuxer) { - auto demuxer = std::make_shared(); - demuxer->start(getPoller(), this); - _demuxer = std::move(demuxer); - } - - if (!_decoder && _demuxer) { - _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get()); - } - - if (_decoder && _demuxer) { - _decoder->input(reinterpret_cast(pkt->payloadData()), pkt->payloadSize()); - } - - return; -} - - -} /* namespace mediakit */ - +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "SrtPlayer.h" +#include "SrtPlayerImp.h" +#include "Common/config.h" +#include "Http/HlsPlayer.h" + +using namespace toolkit; +using namespace std; + +namespace mediakit { + + +SrtPlayer::SrtPlayer(const EventPoller::Ptr &poller) + : SrtCaller(poller) { + DebugL; +} + +SrtPlayer::~SrtPlayer(void) { + DebugL; +} + +void SrtPlayer::play(const string &strUrl) { + DebugL; + try { + _url.parse(strUrl); + } catch (std::exception &ex) { + onResult(SockException(Err_other, StrPrinter << "illegal srt url:" << ex.what())); + return; + } + onConnect(); + return; +} + +void SrtPlayer::teardown() { + SrtCaller::onResult(SockException(Err_other, StrPrinter << "teardown: " << _url._full_url)); +} + +void SrtPlayer::pause(bool bPause) { + DebugL; +} + +void SrtPlayer::speed(float speed) { + DebugL; +} + +void SrtPlayer::onHandShakeFinished() { + SrtCaller::onHandShakeFinished(); + onResult(SockException(Err_success, "srt play success")); +} + +void SrtPlayer::onResult(const SockException &ex) { + SrtCaller::onResult(ex); + + if (!ex) { + // 播放成功 + onPlayResult(ex); + _benchmark_mode = (*this)[Client::kBenchmarkMode].as(); + + // 播放成功,恢复数据包接收超时定时器 + _recv_ticker.resetTime(); + auto timeout = getTimeOutSec(); + //读取配置文件 + weak_ptr weakSelf = static_pointer_cast(shared_from_this()); + // 创建rtp数据接收超时检测定时器 + _check_timer = std::make_shared(timeout /2, + [weakSelf, timeout]() { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { + return false; + } + if (strongSelf->_recv_ticker.elapsedTime() > timeout * 1000) { + // 接收媒体数据包超时 + strongSelf->onResult(SockException(Err_timeout, "receive srt media data timeout:" + strongSelf->_url._full_url)); + return false; + } + + return true; + }, getPoller()); + } else { + WarnL << ex.getErrCode() << " " << ex.what(); + if (ex.getErrCode() == Err_shutdown) { + // 主动shutdown的,不触发回调 + return; + } + if (!_is_handleshake_finished) { + onPlayResult(ex); + } else { + onShutdown(ex); + } + } + return; +} + + +void SrtPlayer::onSRTData(SRT::DataPacket::Ptr pkt) { + _recv_ticker.resetTime(); +} + +uint16_t SrtPlayer::getLatency() { + auto latency = (*this)[Client::kLatency].as(); + return (uint16_t)latency ; +} + +float SrtPlayer::getTimeOutSec() { + auto timeoutMS = (*this)[Client::kTimeoutMS].as(); + return (float)timeoutMS / (float)1000; +} + +std::string SrtPlayer::getPassphrase() { + auto passPhrase = (*this)[Client::kPassPhrase].as(); + return passPhrase; +} + +/////////////////////////////////////////////////// +// SrtPlayerImp + +void SrtPlayerImp::onPlayResult(const toolkit::SockException &ex) { + if (ex) { + Super::onPlayResult(ex); + } + //success result only occur when addTrackCompleted + return; +} + +std::vector SrtPlayerImp::getTracks(bool ready /*= true*/) const { + return _demuxer ? static_pointer_cast(_demuxer)->getTracks(ready) : Super::getTracks(ready); +} + +void SrtPlayerImp::addTrackCompleted() { + Super::onPlayResult(toolkit::SockException(toolkit::Err_success, "play success")); +} + +void SrtPlayerImp::onSRTData(SRT::DataPacket::Ptr pkt) { + SrtPlayer::onSRTData(pkt); + + if (_benchmark_mode) { + return; + } + + auto strong_self = shared_from_this(); + if (!_demuxer) { + auto demuxer = std::make_shared(); + demuxer->start(getPoller(), this); + _demuxer = std::move(demuxer); + } + + if (!_decoder && _demuxer) { + _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get()); + } + + if (_decoder && _demuxer) { + _decoder->input(reinterpret_cast(pkt->payloadData()), pkt->payloadSize()); + } + + return; +} + + +} /* namespace mediakit */ + diff --git a/src/Srt/SrtPlayer.h b/src/Srt/SrtPlayer.h index 23d206ca..710ad237 100644 --- a/src/Srt/SrtPlayer.h +++ b/src/Srt/SrtPlayer.h @@ -1,65 +1,65 @@ -/* - * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. - * - * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). - * - * Use of this source code is governed by MIT-like license that can be found in the - * LICENSE file in the root of the source tree. All contributing project authors - * may be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef ZLMEDIAKIT_SRTPLAYER_H -#define ZLMEDIAKIT_SRTPLAYER_H - -#include "Network/Socket.h" -#include "Player/PlayerBase.h" -#include "Poller/Timer.h" -#include "Util/TimeTicker.h" -#include "srt/SrtTransport.hpp" -#include "Http/HttpRequester.h" -#include -#include -#include "SrtCaller.h" - -namespace mediakit { - - -// 实现了srt代理拉流功能 -class SrtPlayer - : public PlayerBase , public SrtCaller { -public: - using Ptr = std::shared_ptr; - - SrtPlayer(const toolkit::EventPoller::Ptr &poller); - ~SrtPlayer() override; - - //// PlayerBase override//// - void play(const std::string &strUrl) override; - void teardown() override; - void pause(bool pause) override; - void speed(float speed) override; - -protected: - - //// SrtCaller override//// - void onHandShakeFinished() override; - void onSRTData(SRT::DataPacket::Ptr pkt) override; - void onResult(const toolkit::SockException &ex) override; - - bool isPlayer() override {return true;} - - uint16_t getLatency() override; - float getTimeOutSec() override; - std::string getPassphrase() override; - -protected: - //是否为性能测试模式 - bool _benchmark_mode = false; - - //超时功能实现 - toolkit::Ticker _recv_ticker; - std::shared_ptr _check_timer; -}; - -} /* namespace mediakit */ -#endif /* ZLMEDIAKIT_SRTPLAYER_H */ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_SRTPLAYER_H +#define ZLMEDIAKIT_SRTPLAYER_H + +#include "Network/Socket.h" +#include "Player/PlayerBase.h" +#include "Poller/Timer.h" +#include "Util/TimeTicker.h" +#include "srt/SrtTransport.hpp" +#include "Http/HttpRequester.h" +#include +#include +#include "SrtCaller.h" + +namespace mediakit { + + +// 实现了srt代理拉流功能 +class SrtPlayer + : public PlayerBase , public SrtCaller { +public: + using Ptr = std::shared_ptr; + + SrtPlayer(const toolkit::EventPoller::Ptr &poller); + ~SrtPlayer() override; + + //// PlayerBase override//// + void play(const std::string &strUrl) override; + void teardown() override; + void pause(bool pause) override; + void speed(float speed) override; + +protected: + + //// SrtCaller override//// + void onHandShakeFinished() override; + void onSRTData(SRT::DataPacket::Ptr pkt) override; + void onResult(const toolkit::SockException &ex) override; + + bool isPlayer() override {return true;} + + uint16_t getLatency() override; + float getTimeOutSec() override; + std::string getPassphrase() override; + +protected: + //是否为性能测试模式 + bool _benchmark_mode = false; + + //超时功能实现 + toolkit::Ticker _recv_ticker; + std::shared_ptr _check_timer; +}; + +} /* namespace mediakit */ +#endif /* ZLMEDIAKIT_SRTPLAYER_H */ diff --git a/src/Srt/SrtPlayerImp.h b/src/Srt/SrtPlayerImp.h index 0828fe5e..620612b6 100644 --- a/src/Srt/SrtPlayerImp.h +++ b/src/Srt/SrtPlayerImp.h @@ -1,51 +1,51 @@ -/* - * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. - * - * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). - * - * Use of this source code is governed by MIT-like license that can be found in the - * LICENSE file in the root of the source tree. All contributing project authors - * may be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef ZLMEDIAKIT_SRtPLAYERIMP_H -#define ZLMEDIAKIT_SRtPLAYERIMP_H - -#include "SrtPlayer.h" - -namespace mediakit { - -class SrtPlayerImp - : public PlayerImp - , private TrackListener { -public: - using Ptr = std::shared_ptr; - using Super = PlayerImp; - - SrtPlayerImp(const toolkit::EventPoller::Ptr &poller) : Super(poller) {} - ~SrtPlayerImp() override { DebugL; } - -private: - //// SrtPlayer override//// - void onSRTData(SRT::DataPacket::Ptr pkt) override; - - //// PlayerBase override//// - void onPlayResult(const toolkit::SockException &ex) override; - std::vector getTracks(bool ready = true) const override; - -private: - //// TrackListener override//// - bool addTrack(const Track::Ptr &track) override { return true; } - void addTrackCompleted() override; - -private: - // for player - DecoderImp::Ptr _decoder; - MediaSinkInterface::Ptr _demuxer; - - // for pusher - TSMediaSource::RingType::RingReader::Ptr _ts_reader; -}; - -} /* namespace mediakit */ -#endif /* ZLMEDIAKIT_SRtPLAYERIMP_H */ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_SRtPLAYERIMP_H +#define ZLMEDIAKIT_SRtPLAYERIMP_H + +#include "SrtPlayer.h" + +namespace mediakit { + +class SrtPlayerImp + : public PlayerImp + , private TrackListener { +public: + using Ptr = std::shared_ptr; + using Super = PlayerImp; + + SrtPlayerImp(const toolkit::EventPoller::Ptr &poller) : Super(poller) {} + ~SrtPlayerImp() override { DebugL; } + +private: + //// SrtPlayer override//// + void onSRTData(SRT::DataPacket::Ptr pkt) override; + + //// PlayerBase override//// + void onPlayResult(const toolkit::SockException &ex) override; + std::vector getTracks(bool ready = true) const override; + +private: + //// TrackListener override//// + bool addTrack(const Track::Ptr &track) override { return true; } + void addTrackCompleted() override; + +private: + // for player + DecoderImp::Ptr _decoder; + MediaSinkInterface::Ptr _demuxer; + + // for pusher + TSMediaSource::RingType::RingReader::Ptr _ts_reader; +}; + +} /* namespace mediakit */ +#endif /* ZLMEDIAKIT_SRtPLAYERIMP_H */ diff --git a/src/Srt/SrtPusher.cpp b/src/Srt/SrtPusher.cpp index 73e2e501..9cbffed0 100644 --- a/src/Srt/SrtPusher.cpp +++ b/src/Srt/SrtPusher.cpp @@ -1,116 +1,116 @@ -/* - * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. - * - * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). - * - * Use of this source code is governed by MIT-like license that can be found in the - * LICENSE file in the root of the source tree. All contributing project authors - * may be found in the AUTHORS file in the root of the source tree. - */ - -#include "SrtPusher.h" -#include "Common/config.h" - -using namespace toolkit; -using namespace std; -namespace mediakit { - -SrtPusher::SrtPusher(const EventPoller::Ptr &poller, const TSMediaSource::Ptr &src) : SrtCaller(poller) { - _push_src = src; - DebugL; -} - -SrtPusher::~SrtPusher(void) { - DebugL; -} - -void SrtPusher::publish(const string &strUrl) { - DebugL; - try { - _url.parse(strUrl); - } catch (std::exception &ex) { - onResult(SockException(Err_other, StrPrinter << "illegal srt url:" << ex.what())); - return; - } - onConnect(); - return; -} - -void SrtPusher::teardown() { - SrtCaller::onResult(SockException(Err_other, StrPrinter << "teardown: " << _url._full_url)); -} - -void SrtPusher::onHandShakeFinished() { - SrtCaller::onHandShakeFinished(); - onResult(SockException(Err_success, "srt push success")); - doPublish(); -} - -void SrtPusher::onResult(const SockException &ex) { - SrtCaller::onResult(ex); - - if (!ex) { - onPublishResult(ex); - } else { - WarnL << ex.getErrCode() << " " << ex.what(); - if (ex.getErrCode() == Err_shutdown) { - // 主动shutdown的,不触发回调 - return; - } - if (!_is_handleshake_finished) { - onPublishResult(ex); - } else { - onShutdown(ex); - } - } - return; -} - -uint16_t SrtPusher::getLatency() { - auto latency = (*this)[Client::kLatency].as(); - return (uint16_t)latency ; -} - -float SrtPusher::getTimeOutSec() { - auto timeoutMS = (*this)[Client::kTimeoutMS].as(); - return (float)timeoutMS / (float)1000; -} - -std::string SrtPusher::getPassphrase() { - auto passPhrase = (*this)[Client::kPassPhrase].as(); - return passPhrase; -} - -void SrtPusher::doPublish() { - auto src = _push_src.lock(); - if (!src) { - onResult(SockException(Err_eof, "the media source was released")); - return; - } - // 异步查找直播流 - std::weak_ptr weak_self = static_pointer_cast(shared_from_this()); - _ts_reader = src->getRing()->attach(getPoller()); - _ts_reader->setDetachCB([weak_self]() { - auto strong_self = weak_self.lock(); - if (!strong_self) { - // 本对象已经销毁 - return; - } - strong_self->onShutdown(SockException(Err_shutdown)); - }); - _ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) { - auto strong_self = weak_self.lock(); - if (!strong_self) { - // 本对象已经销毁 - return; - } - size_t i = 0; - auto size = ts_list->size(); - ts_list->for_each([&](const TSPacket::Ptr &ts) { - strong_self->onSendTSData(ts, ++i == size); - }); - }); -} - -} /* namespace mediakit */ - +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "SrtPusher.h" +#include "Common/config.h" + +using namespace toolkit; +using namespace std; +namespace mediakit { + +SrtPusher::SrtPusher(const EventPoller::Ptr &poller, const TSMediaSource::Ptr &src) : SrtCaller(poller) { + _push_src = src; + DebugL; +} + +SrtPusher::~SrtPusher(void) { + DebugL; +} + +void SrtPusher::publish(const string &strUrl) { + DebugL; + try { + _url.parse(strUrl); + } catch (std::exception &ex) { + onResult(SockException(Err_other, StrPrinter << "illegal srt url:" << ex.what())); + return; + } + onConnect(); + return; +} + +void SrtPusher::teardown() { + SrtCaller::onResult(SockException(Err_other, StrPrinter << "teardown: " << _url._full_url)); +} + +void SrtPusher::onHandShakeFinished() { + SrtCaller::onHandShakeFinished(); + onResult(SockException(Err_success, "srt push success")); + doPublish(); +} + +void SrtPusher::onResult(const SockException &ex) { + SrtCaller::onResult(ex); + + if (!ex) { + onPublishResult(ex); + } else { + WarnL << ex.getErrCode() << " " << ex.what(); + if (ex.getErrCode() == Err_shutdown) { + // 主动shutdown的,不触发回调 + return; + } + if (!_is_handleshake_finished) { + onPublishResult(ex); + } else { + onShutdown(ex); + } + } + return; +} + +uint16_t SrtPusher::getLatency() { + auto latency = (*this)[Client::kLatency].as(); + return (uint16_t)latency ; +} + +float SrtPusher::getTimeOutSec() { + auto timeoutMS = (*this)[Client::kTimeoutMS].as(); + return (float)timeoutMS / (float)1000; +} + +std::string SrtPusher::getPassphrase() { + auto passPhrase = (*this)[Client::kPassPhrase].as(); + return passPhrase; +} + +void SrtPusher::doPublish() { + auto src = _push_src.lock(); + if (!src) { + onResult(SockException(Err_eof, "the media source was released")); + return; + } + // 异步查找直播流 + std::weak_ptr weak_self = static_pointer_cast(shared_from_this()); + _ts_reader = src->getRing()->attach(getPoller()); + _ts_reader->setDetachCB([weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { + // 本对象已经销毁 + return; + } + strong_self->onShutdown(SockException(Err_shutdown)); + }); + _ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + // 本对象已经销毁 + return; + } + size_t i = 0; + auto size = ts_list->size(); + ts_list->for_each([&](const TSPacket::Ptr &ts) { + strong_self->onSendTSData(ts, ++i == size); + }); + }); +} + +} /* namespace mediakit */ + diff --git a/src/Srt/SrtPusher.h b/src/Srt/SrtPusher.h index 727b59f3..c7c973cc 100644 --- a/src/Srt/SrtPusher.h +++ b/src/Srt/SrtPusher.h @@ -1,59 +1,59 @@ -/* - * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. - * - * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). - * - * Use of this source code is governed by MIT-like license that can be found in the - * LICENSE file in the root of the source tree. All contributing project authors - * may be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef ZLMEDIAKIT_SRTPUSHER_H -#define ZLMEDIAKIT_SRTPUSHER_H - -#include "Network/Socket.h" -#include "Pusher/PusherBase.h" -#include "Poller/Timer.h" -#include "Util/TimeTicker.h" -#include "srt/SrtTransport.hpp" -#include "Http/HttpRequester.h" -#include -#include -#include "SrtCaller.h" - -namespace mediakit { - -// 实现了srt代理推流功能 -class SrtPusher - : public PusherBase , public SrtCaller { -public: - using Ptr = std::shared_ptr; - - SrtPusher(const toolkit::EventPoller::Ptr &poller,const TSMediaSource::Ptr &src); - ~SrtPusher() override; - - //// PusherBase override//// - void publish(const std::string &url) override; - void teardown() override; - - void doPublish(); -protected: - - //// SrtCaller override//// - void onHandShakeFinished() override; - void onResult(const toolkit::SockException &ex) override; - - bool isPlayer() override {return false;} - uint16_t getLatency() override; - float getTimeOutSec() override; - std::string getPassphrase() override; - -protected: - std::weak_ptr _push_src; - TSMediaSource::RingType::RingReader::Ptr _ts_reader; -}; - -using SrtPusherImp = PusherImp; - -} /* namespace mediakit */ -#endif /* ZLMEDIAKIT_SRTPUSHER_H */ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_SRTPUSHER_H +#define ZLMEDIAKIT_SRTPUSHER_H + +#include "Network/Socket.h" +#include "Pusher/PusherBase.h" +#include "Poller/Timer.h" +#include "Util/TimeTicker.h" +#include "srt/SrtTransport.hpp" +#include "Http/HttpRequester.h" +#include +#include +#include "SrtCaller.h" + +namespace mediakit { + +// 实现了srt代理推流功能 +class SrtPusher + : public PusherBase , public SrtCaller { +public: + using Ptr = std::shared_ptr; + + SrtPusher(const toolkit::EventPoller::Ptr &poller,const TSMediaSource::Ptr &src); + ~SrtPusher() override; + + //// PusherBase override//// + void publish(const std::string &url) override; + void teardown() override; + + void doPublish(); +protected: + + //// SrtCaller override//// + void onHandShakeFinished() override; + void onResult(const toolkit::SockException &ex) override; + + bool isPlayer() override {return false;} + uint16_t getLatency() override; + float getTimeOutSec() override; + std::string getPassphrase() override; + +protected: + std::weak_ptr _push_src; + TSMediaSource::RingType::RingReader::Ptr _ts_reader; +}; + +using SrtPusherImp = PusherImp; + +} /* namespace mediakit */ +#endif /* ZLMEDIAKIT_SRTPUSHER_H */