diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index a5312d73..87dfceed 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -329,6 +329,13 @@ bool RtpProcess::close(mediakit::MediaSource &sender) { return true; } +bool RtpProcess::pause(MediaSource &sender, bool pause) { + if (_sock) { + _sock->enableRecv(!pause); + } + return static_cast(_sock); +} + toolkit::EventPoller::Ptr RtpProcess::getOwnerPoller(MediaSource &sender) { if (_sock) { return _sock->getPoller(); diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 08b91e5e..46a3a4b1 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -115,8 +115,9 @@ protected: std::shared_ptr getOriginSock(MediaSource &sender) const override; toolkit::EventPoller::Ptr getOwnerPoller(MediaSource &sender) override; float getLossRate(MediaSource &sender, TrackType type) override; - Ptr getRtpProcess(mediakit::MediaSource &sender) const override; - bool close(mediakit::MediaSource &sender) override; + Ptr getRtpProcess(MediaSource &sender) const override; + bool close(MediaSource &sender) override; + bool pause(MediaSource &sender, bool pause) override; private: RtpProcess(const MediaTuple &tuple); diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index 3ae929e7..d89df47a 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -36,15 +36,7 @@ RtpSender::~RtpSender() { } void RtpSender::startSend(const MediaSourceEvent &sender, const MediaSourceEvent::SendRtpArgs &args, const function &cb){ - auto origin_socket = sender.getOriginSock(MediaSource::NullMediaSource()); - _origin_socket = dynamic_pointer_cast(origin_socket); - if (!_origin_socket) { - auto process = dynamic_pointer_cast(origin_socket); - if (process) { - _origin_socket = process->getSock(); - } - } - + _muxer = sender.getMuxer(MediaSource::NullMediaSource()); _args = args; if (!_interface) { // 重连时不重新创建对象 [AUTO-TRANSLATED:b788cd5d] @@ -323,12 +315,18 @@ void RtpSender::onConnect() { }); } - if (_socket_rtp->sockType() == toolkit::SockNum::Sock_TCP && _origin_socket) { + if (_socket_rtp->sockType() == toolkit::SockNum::Sock_TCP) { // rtp 端口是TCP端口,转发速度应当控制收流速度 - auto origin_socket = _origin_socket; - _socket_rtp->setOnFlush([origin_socket]() { - origin_socket->enableRecv(true); - return true; + std::weak_ptr weak_self = shared_from_this(); + _socket_rtp->setOnFlush([weak_self]() { + if (auto strong_self = weak_self.lock()) { + auto muxer = strong_self->_muxer.lock(); + if (muxer) { + muxer->pause(MediaSource::NullMediaSource(), false); + } + return true; + } + return false; }); } InfoL << "startSend rtp success: " << _socket_rtp->get_peer_ip() << ":" << _socket_rtp->get_peer_port() << ", data_type: " << _args.data_type << ", con_type: " << _args.con_type; @@ -451,8 +449,11 @@ void RtpSender::onFlushRtpList(shared_ptr> rtp_list) { } default: CHECK(0); } - if (_args.enable_origin_recv_limit && _socket_rtp->sockType() == toolkit::SockNum::Sock_TCP && _socket_rtp->isSocketBusy() && _origin_socket) { - _origin_socket->enableRecv(false); + if (_args.enable_origin_recv_limit && _socket_rtp->sockType() == toolkit::SockNum::Sock_TCP && _socket_rtp->isSocketBusy()) { + auto muxer = _muxer.lock(); + if (muxer) { + muxer->pause(MediaSource::NullMediaSource(), true); + } } }); }; diff --git a/src/Rtp/RtpSender.h b/src/Rtp/RtpSender.h index 8ed3eb80..d800c2ac 100644 --- a/src/Rtp/RtpSender.h +++ b/src/Rtp/RtpSender.h @@ -116,7 +116,6 @@ private: private: bool _is_connect = false; - toolkit::Socket::Ptr _origin_socket; MediaSourceEvent::SendRtpArgs _args; toolkit::Socket::Ptr _socket_rtp; toolkit::Socket::Ptr _socket_rtcp; @@ -127,6 +126,7 @@ private: toolkit::Ticker _rtcp_recv_ticker; std::shared_ptr _rtp_session; std::function _on_close; + std::weak_ptr _muxer; }; }//namespace mediakit