diff --git a/CMakeLists.txt b/CMakeLists.txt index b4d6cdd0..4f5c7123 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -463,6 +463,10 @@ if(ENABLE_VIDEOSTACK) endif () endif () +if(ENABLE_SRT) + update_cached_list(MK_COMPILE_DEFINITIONS ENABLE_SRT) +endif() + # ---------------------------------------------------------------------------- # Solution folders: # ---------------------------------------------------------------------------- diff --git a/conf/config.ini b/conf/config.ini index a46739dd..13d83d63 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -400,6 +400,8 @@ port=9000 latencyMul=4 #包缓存的大小 pktBufSize=8192 +#srt udp服务器的密码,为空表示不加密 +passPhrase= [rtsp] diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index b35d78e6..a3da5a65 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -522,7 +522,7 @@ "response": [] }, { - "name": "添加rtsp/rtmp/hls拉流代理(addStreamProxy)", + "name": "添加rtsp/rtmp/hls/srt拉流代理(addStreamProxy)", "request": { "method": "GET", "header": [], @@ -663,7 +663,19 @@ "value": null, "description": "无人观看时,是否直接关闭(而不是通过on_none_reader hook返回close)", "disabled": true - } + }, + { + "key": "latency", + "value": null, + "description": "srt延时, 单位毫秒", + "disabled": true + }, + { + "key": "passphrase", + "value": null, + "description": "srt拉流的密码", + "disabled": true + } ] } }, @@ -753,7 +765,7 @@ "response": [] }, { - "name": "添加rtsp/rtmp推流(addStreamPusherProxy)", + "name": "添加rtsp/rtmp/srt推流(addStreamPusherProxy)", "request": { "method": "GET", "header": [], @@ -815,7 +827,20 @@ "value": null, "description": "推流重试次数,不传此参数或传值<=0时,则无限重试", "disabled": true - } + }, + { + "key": "latency", + "value": null, + "description": "srt延时, 单位毫秒", + "disabled": true + }, + { + "key": "passphrase", + "value": null, + "description": "srt推流的密码", + "disabled": true + } + ] } }, @@ -2610,4 +2635,4 @@ "value": "__defaultVhost__" } ] -} \ No newline at end of file +} diff --git a/server/WebApi.cpp b/server/WebApi.cpp index ed3f0b28..f5071027 100755 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -685,6 +685,7 @@ void addStreamPusherProxy(const string &schema, int retry_count, int rtp_type, float timeout_sec, + const mINI &args, const function &cb) { auto key = getPusherKey(schema, vhost, app, stream, url); auto src = MediaSource::find(schema, vhost, app, stream); @@ -703,14 +704,20 @@ void addStreamPusherProxy(const string &schema, // Add push stream proxy auto pusher = s_pusher_proxy.make(key, src, retry_count); + // 先透传拷贝参数 [AUTO-TRANSLATED:22b5605e] + // First pass-through copy parameters + for (auto &pr : args) { + (*pusher)[pr.first] = pr.second; + } + // 指定RTP over TCP(播放rtsp时有效) [AUTO-TRANSLATED:1a062656] // Specify RTP over TCP (effective when playing RTSP) - pusher->emplace(Client::kRtpType, rtp_type); + (*pusher)[Client::kRtpType] = rtp_type; if (timeout_sec > 0.1f) { // 推流握手超时时间 [AUTO-TRANSLATED:00762fc1] // Push stream handshake timeout - pusher->emplace(Client::kTimeoutMS, timeout_sec * 1000); + (*pusher)[Client::kTimeoutMS] = timeout_sec * 1000; } // 开始推流,如果推流失败或者推流中止,将会自动重试若干次,默认一直重试 [AUTO-TRANSLATED:c8b95088] @@ -1174,6 +1181,12 @@ void installWebApi() { api_regist("/index/api/addStreamPusherProxy", [](API_ARGS_MAP_ASYNC) { CHECK_SECRET(); CHECK_ARGS("schema", "vhost", "app", "stream", "dst_url"); + + mINI args; + for (auto &pr : allArgs.args) { + args.emplace(pr.first, pr.second); + } + auto dst_url = allArgs["dst_url"]; auto retry_count = allArgs["retry_count"].empty() ? -1 : allArgs["retry_count"].as(); addStreamPusherProxy(allArgs["schema"], @@ -1184,6 +1197,7 @@ void installWebApi() { retry_count, allArgs["rtp_type"], allArgs["timeout_sec"], + args, [invoker, val, headerOut, dst_url](const SockException &ex, const string &key) mutable { if (ex) { val["code"] = API::OtherFailed; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e8ed4547..7ce2eb88 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -26,6 +26,14 @@ file(GLOB MediaKit_SRC_LIST ${CMAKE_CURRENT_SOURCE_DIR}/*/*.cpp ${CMAKE_CURRENT_SOURCE_DIR}/*/*.h) +if(NOT ENABLE_SRT) + file(GLOB SRT_SRC_LIST + ${CMAKE_CURRENT_SOURCE_DIR}/Srt/*.c + ${CMAKE_CURRENT_SOURCE_DIR}/Srt/*.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/Srt/*.h) + list(REMOVE_ITEM MediaKit_SRC_LIST ${SRT_SRC_LIST}) +endif() + if(USE_SOLUTION_FOLDERS AND (NOT GROUP_BY_EXPLORER)) # 在 IDE 中对文件进行分组, 源文件和头文件分开 set_file_group("${CMAKE_CURRENT_SOURCE_DIR}" ${MediaKit_SRC_LIST}) @@ -49,6 +57,7 @@ target_link_libraries(zlmediakit target_include_directories(zlmediakit PRIVATE "$" + "$" PUBLIC "$") diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 1968d5f3..6a9c4c45 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -396,6 +396,8 @@ const string kWaitTrackReady = "wait_track_ready"; const string kPlayTrack = "play_track"; const string kProxyUrl = "proxy_url"; const string kRtspSpeed = "rtsp_speed"; +const string kLatency = "latency"; +const string kPassPhrase = "passPhrase"; } // namespace Client } // namespace mediakit diff --git a/src/Common/config.h b/src/Common/config.h index 22a5d3be..f9b19219 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -624,6 +624,10 @@ extern const std::string kProxyUrl; // 设置开始rtsp倍速播放 [AUTO-TRANSLATED:5db03cad] // Set the start RTSP playback speed extern const std::string kRtspSpeed; +// Set SRT delay +extern const std::string kLatency; +// Set SRT PassPhrase +extern const std::string kPassPhrase; } // namespace Client } // namespace mediakit diff --git a/src/Player/PlayerBase.cpp b/src/Player/PlayerBase.cpp index b94aaee9..1c8cc501 100644 --- a/src/Player/PlayerBase.cpp +++ b/src/Player/PlayerBase.cpp @@ -15,6 +15,9 @@ #include "Rtmp/FlvPlayer.h" #include "Http/HlsPlayer.h" #include "Http/TsPlayerImp.h" +#ifdef ENABLE_SRT +#include "Srt/SrtPlayerImp.h" +#endif // ENABLE_SRT using namespace std; using namespace toolkit; @@ -70,6 +73,12 @@ PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &in_poller, cons } } +#ifdef ENABLE_SRT + if (strcasecmp("srt", prefix.data()) == 0) { + return PlayerBase::Ptr(new SrtPlayerImp(poller), release_func); + } +#endif//ENABLE_SRT + throw std::invalid_argument("not supported play schema:" + url_in); } @@ -78,6 +87,8 @@ PlayerBase::PlayerBase() { this->mINI::operator[](Client::kMediaTimeoutMS) = 5000; this->mINI::operator[](Client::kBeatIntervalMS) = 5000; this->mINI::operator[](Client::kWaitTrackReady) = true; + this->mINI::operator[](Client::kLatency) = 0; + this->mINI::operator[](Client::kPassPhrase) = ""; } } /* namespace mediakit */ diff --git a/src/Pusher/PusherBase.cpp b/src/Pusher/PusherBase.cpp index fe454bc0..63b2011d 100644 --- a/src/Pusher/PusherBase.cpp +++ b/src/Pusher/PusherBase.cpp @@ -12,6 +12,9 @@ #include "PusherBase.h" #include "Rtsp/RtspPusher.h" #include "Rtmp/RtmpPusher.h" +#ifdef ENABLE_SRT +#include "Srt/SrtPusher.h" +#endif // ENABLE_SRT using namespace toolkit; @@ -50,6 +53,13 @@ PusherBase::Ptr PusherBase::createPusher(const EventPoller::Ptr &in_poller, return PusherBase::Ptr(new RtmpPusherImp(poller, std::dynamic_pointer_cast(src)), release_func); } +#ifdef ENABLE_SRT + if (strcasecmp("srt", prefix.data()) == 0) { + return PusherBase::Ptr(new SrtPusherImp(poller, std::dynamic_pointer_cast(src)), release_func); + } +#endif//ENABLE_SRT + + throw std::invalid_argument("not supported push schema:" + url); } diff --git a/src/Srt/SrtCaller.cpp b/src/Srt/SrtCaller.cpp new file mode 100644 index 00000000..2a84c8c3 --- /dev/null +++ b/src/Srt/SrtCaller.cpp @@ -0,0 +1,1047 @@ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "SrtCaller.h" +#include "srt/Ack.hpp" +#include "srt/SrtTransport.hpp" +#include "Common/config.h" +#include "Common/Parser.h" +#include + +using namespace toolkit; +using namespace std; +using namespace SRT; + +namespace mediakit { + +//srt://127.0.0.1:9000?streamid=#!::r=live/test +//srt://127.0.0.1:9000?streamid=#!::r=live/test,h=__defaultVhost__ +void SrtUrl::parse(const string &strUrl) { + //DebugL << "url: " << strUrl; + _full_url = strUrl; + auto url = strUrl; + + auto ip = findSubString(url.data(), "://", "?"); + splitUrl(ip, _host, _port); + + auto _params = findSubString(url.data(), "?" , NULL); + + auto kv = Parser::parseArgs(_params); + auto it = kv.find("streamid"); + if (it != kv.end()) { + auto streamid = it->second; + if (!toolkit::start_with(streamid, "#!::")) { + return; + } + std::string real_streamid = streamid.substr(4); + + auto params = Parser::parseArgs(real_streamid, ",", "="); + + for (auto iit : params) { + if (iit.first == "h") { + _vhost = iit.second; + } else if (iit.first == "r") { + auto tmps = toolkit::split(iit.second, "/"); + if (tmps.size() < 2) { + continue; + } + _app = tmps[0]; + _stream = tmps[1]; + } else { + //nop + } + } + + if (_vhost.empty()) { + _vhost = DEFAULT_VHOST; + } + } + + //TraceL << "ip: " << ip; + //TraceL << "_host: " << _host; + //TraceL << "_port: " << _port; + //TraceL << "_params: " << _params; + //TraceL << "_vhost: " << _vhost; + //TraceL << "_app: " << _app; + //TraceL << "_stream: " << _stream; + return; +} + + +//////////// SrtCaller ////////////////////////// +SrtCaller::SrtCaller(const toolkit::EventPoller::Ptr &poller) { + _poller = poller ? std::move(poller) : EventPollerPool::Instance().getPoller(); + _start_timestamp = SteadyClock::now(); + _socket_id = generateSocketId(); + + /* _init_seq_number = generateInitSeq(); */ + _init_seq_number = 0; + + _last_pkt_seq = _init_seq_number - 1; + _pkt_recv_rate_context = std::make_shared(_start_timestamp); + _estimated_link_capacity_context = std::make_shared(_start_timestamp); + _estimated_link_capacity_context->setLastSeq(_last_pkt_seq); + + _send_packet_seq_number = _init_seq_number; +} + +SrtCaller::~SrtCaller(void) { + DebugL; +} + +void SrtCaller::onConnect() { + //DebugL; + + auto peer_addr = SockUtil::make_sockaddr(_url._host.c_str(), (_url._port)); + _socket = Socket::createSocket(_poller, false); + _socket->bindUdpSock(0); + _socket->bindPeerAddr((struct sockaddr *)&peer_addr, 0, true); + + weak_ptr weak_self = shared_from_this(); + _socket->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) mutable { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + strong_self->inputSockData((uint8_t*)buf->data(), buf->size(), addr); + }); + + doHandshake(); +} + +void SrtCaller::onResult(const SockException &ex) { + if (!ex) { + // 会话建立成功 + } else { + if (ex.getErrCode() == Err_shutdown) { + // 主动shutdown的,不触发回调 + return; + } + + if (_socket && _is_handleshake_finished) { + sendShutDown(); + } + _is_handleshake_finished = false; + _handleshake_timer.reset(); + _keeplive_timer.reset(); + _announce_timer.reset(); + } + return; +} + +void SrtCaller::onHandShakeFinished() { + DebugL; + _is_handleshake_finished = true; + if (_handleshake_timer) { + _handleshake_timer.reset(); + } + _handleshake_req = nullptr; + + std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); + _keeplive_timer = std::make_shared(0.2, [weak_self]()->bool{ + auto strong_self = weak_self.lock(); + if (!strong_self) { + return false; + } + + //Keep-alive control packets are sent after a certain timeout from the last time any packet (Control or Data) was sent. + //The default timeout for a keep-alive packet to be sent is 1 second. + if (strong_self->_send_ticker.elapsedTime() > 1000) { + strong_self->sendKeepLivePacket(); + } + return true; + }, getPoller()); + + return; +} + +void SrtCaller::onSRTData(DataPacket::Ptr pkt) { + InfoL; + if (!isPlayer()) { + WarnL << "this is not a player data ignore"; + return; + } +} + +void SrtCaller::onSendTSData(const Buffer::Ptr &buffer, bool flush) { + // TraceL; + // + DataPacket::Ptr pkt; + size_t payloadSize = getPayloadSize(); + size_t size = buffer->size(); + char *ptr = buffer->data(); + char *end = buffer->data() + size; + + + while (ptr < end && size >= payloadSize) { + pkt = std::make_shared(); + pkt->f = 0; + pkt->packet_seq_number = _send_packet_seq_number & 0x7fffffff; + _send_packet_seq_number = (_send_packet_seq_number + 1) & 0x7fffffff; + pkt->PP = 3; + pkt->O = 0; + pkt->KK = 0; + pkt->R = 0; + pkt->msg_number = _send_msg_number++; + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp); + + sendDataPacket(pkt, ptr, (int)payloadSize, flush); + ptr += payloadSize; + size -= payloadSize; + } + + if (size > 0 && ptr < end) { + pkt = std::make_shared(); + pkt->f = 0; + pkt->packet_seq_number = _send_packet_seq_number & 0x7fffffff; + _send_packet_seq_number = (_send_packet_seq_number + 1) & 0x7fffffff; + pkt->PP = 3; + pkt->O = 0; + pkt->KK = 0; + pkt->R = 0; + pkt->msg_number = _send_msg_number++; + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp); + sendDataPacket(pkt, ptr, (int)size, flush); + } +} + +void SrtCaller::inputSockData(uint8_t *buf, int len, struct sockaddr *addr) { + //TraceL << hexdump((void*)buf, len); + + using srt_control_handler = void (SrtCaller::*)(uint8_t * buf, int len, struct sockaddr *addr); + static std::unordered_map s_control_functions; + static onceToken token([]() { + s_control_functions.emplace(SRT::ControlPacket::HANDSHAKE, &SrtCaller::handleHandshake); + s_control_functions.emplace(SRT::ControlPacket::ACK, &SrtCaller::handleACK); + s_control_functions.emplace(SRT::ControlPacket::ACKACK, &SrtCaller::handleACKACK); + s_control_functions.emplace(SRT::ControlPacket::NAK, &SrtCaller::handleNAK); + s_control_functions.emplace(SRT::ControlPacket::DROPREQ, &SrtCaller::handleDropReq); + s_control_functions.emplace(SRT::ControlPacket::KEEPALIVE, &SrtCaller::handleKeeplive); + s_control_functions.emplace(SRT::ControlPacket::SHUTDOWN, &SrtCaller::handleShutDown); + s_control_functions.emplace(SRT::ControlPacket::PEERERROR, &SrtCaller::handlePeerError); + s_control_functions.emplace(SRT::ControlPacket::CONGESTIONWARNING, &SrtCaller::handleCongestionWarning); + s_control_functions.emplace(SRT::ControlPacket::USERDEFINEDTYPE, &SrtCaller::handleUserDefinedType); + }); + + _alive_ticker.resetTime(); + _now = SteadyClock::now(); + + // 处理srt数据 + if (DataPacket::isDataPacket(buf, len)) { + uint32_t socketId = DataPacket::getSocketID(buf, len); + if (isPlayer()) { + if (socketId == _socket_id) { + _pkt_recv_rate_context->inputPacket(_now, len + UDP_HDR_SIZE); + handleDataPacket(buf, len, addr); + checkAndSendAckNak(); + } + } + } else if (ControlPacket::isControlPacket(buf, len)) { + uint32_t socketId = ControlPacket::getSocketID(buf, len); + uint16_t type = ControlPacket::getControlType(buf, len); + + auto it = s_control_functions.find(type); + if (it == s_control_functions.end()) { + WarnL << " not support type ignore: " << ControlPacket::getControlType(buf, len); + return; + } else { + (this->*(it->second))(buf, len, addr); + } + + if (_is_handleshake_finished && isPlayer()){ + checkAndSendAckNak(); + } + + } else { + // not reach + WarnL << "not reach this"; + } +} + +void SrtCaller::doHandshake() { + _alive_ticker.resetTime(); + if (!_alive_timer) { + createTimerForCheckAlive(); + } + + if (!getPassphrase().empty()) { + _crypto = std::make_shared(getPassphrase()); + } + + sendHandshakeInduction(); + return; +} + +void SrtCaller::sendHandshakeInduction() { + DebugL; + _induction_ts = SteadyClock::now(); + + SRT::HandshakePacket::Ptr req = std::make_shared(); + req->timestamp = DurationCountMicroseconds(_induction_ts - _start_timestamp); + req->dst_socket_id = 0; + + req->version = 4; + req->encryption_field = 0; + req->extension_field = 0x0002; + req->initial_packet_sequence_number = _init_seq_number; + req->mtu = _mtu; + req->max_flow_window_size = _max_flow_window_size; + req->handshake_type = SRT::HandshakePacket::HS_TYPE_INDUCTION; + req->srt_socket_id = _socket_id; + req->syn_cookie = 0; + + auto dataSenderAddr = SockUtil::make_sockaddr(_url._host.c_str(), _url._port); + req->assignPeerIPBE(&dataSenderAddr); + req->storeToData(); + _handleshake_req = req; + sendControlPacket(req, true); + + std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); + _handleshake_timer = std::make_shared(0.2, [weak_self]()->bool{ + auto strong_self = weak_self.lock(); + if (!strong_self) { + return false; + } + + if (strong_self->_is_handleshake_finished) { + return false; + } + strong_self->sendControlPacket(strong_self->_handleshake_req, true); + return true; + }, getPoller()); + + return; +} + +void SrtCaller::sendHandshakeConclusion() { + DebugL; + + SRT::HandshakePacket::Ptr req = std::make_shared(); + req->timestamp = DurationCountMicroseconds(_now - _start_timestamp); + req->dst_socket_id = 0; + + req->version = 5; + req->encryption_field = SRT::HandshakePacket::NO_ENCRYPTION; + req->extension_field = HandshakePacket::HS_EXT_FILED_HSREQ | HandshakePacket::HS_EXT_FILED_CONFIG; + if (_crypto) { + //The default value is 0 (no encryption advertised). + //If neither peer advertises encryption, AES-128 is selected by default + /* req->encryption_field = SRT::HandshakePacket::AES_128; */ + req->extension_field |= HandshakePacket::HS_EXT_FILED_KMREQ; + } + req->initial_packet_sequence_number = _init_seq_number; + req->mtu = _mtu; + req->max_flow_window_size = _max_flow_window_size; + req->handshake_type = SRT::HandshakePacket::HS_TYPE_CONCLUSION; + req->srt_socket_id = _socket_id; + req->syn_cookie = _sync_cookie; + + auto addr = SockUtil::make_sockaddr(_url._host.c_str(), _url._port); + req->assignPeerIPBE(&addr); + + HSExtMessage::Ptr ext = std::make_shared(); + ext->extension_type = HSExt::SRT_CMD_HSREQ; + ext->srt_version = srtVersion(1, 5, 0); + ext->srt_flag = 0xbf; + + // if set latency, use set value + _delay = getLatency(); + if (0 == _delay) { + //The value of minimum TsbpdDelay is negotiated during the SRT handshake exchange and is equal to 120 milliseconds. + //The recommended value of TsbpdDelay is 3-4 times RTT. + _delay = DurationCountMicroseconds(_now - _induction_ts) * getLatencyMul() / 1000; + if (_delay <= 120) { + _delay = 120; + } + } + + ext->recv_tsbpd_delay = _delay; + ext->send_tsbpd_delay = _delay; + req->ext_list.push_back(std::move(ext)); + + HSExtStreamID::Ptr extStreamId = std::make_shared(); + extStreamId->streamid = generateStreamId(); + req->ext_list.push_back(std::move(extStreamId)); + + if (_crypto) { + HSExtKeyMaterial::Ptr keyMaterial = _crypto->generateKeyMaterialExt(HSExt::SRT_CMD_KMREQ); + req->ext_list.push_back(std::move(keyMaterial)); + } + + req->storeToData(); + _handleshake_req = req; + sendControlPacket(req); + + return; +} + +void SrtCaller::sendACKPacket() { + uint32_t recv_rate = 0; + + SRT::ACKPacket::Ptr pkt = std::make_shared(); + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); + pkt->ack_number = ++_ack_number_count; + pkt->last_ack_pkt_seq_number = _recv_buf->getExpectedSeq(); + pkt->rtt = _rtt; + pkt->rtt_variance = _rtt_variance; + pkt->available_buf_size = _recv_buf->getAvailableBufferSize(); + pkt->pkt_recv_rate = _pkt_recv_rate_context->getPacketRecvRate(recv_rate); + pkt->estimated_link_capacity = _estimated_link_capacity_context->getEstimatedLinkCapacity(); + pkt->recv_rate = recv_rate; + if(0){ + TraceL<pkt_recv_rate<<" pkt/s "<estimated_link_capacity<<" pkt/s (cap) "<available_buf_size<<" available buf"; + //TraceL<<_pkt_recv_rate_context->dump(); + //TraceL<<"recv estimated:"; + //TraceL<< _pkt_recv_rate_context->dump(); + //TraceL<<"recv queue:"; + //TraceL<<_recv_buf->dump(); + } + if (pkt->available_buf_size < 2) { + pkt->available_buf_size = 2; + } + pkt->storeToData(); + _ack_send_timestamp[pkt->ack_number] = _now; + _last_ack_pkt_seq = pkt->last_ack_pkt_seq_number; + sendControlPacket(pkt, true); + // TraceL<<"send ack "<dump(); + // TraceL<<_recv_buf->dump(); + return; +} + +void SrtCaller::sendLightACKPacket() { + ACKPacket::Ptr pkt = std::make_shared(); + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); + pkt->ack_number = 0; + pkt->last_ack_pkt_seq_number = _recv_buf->getExpectedSeq(); + pkt->rtt = 0; + pkt->rtt_variance = 0; + pkt->available_buf_size = 0; + pkt->pkt_recv_rate = 0; + pkt->estimated_link_capacity = 0; + pkt->recv_rate = 0; + pkt->storeToData(); + _last_ack_pkt_seq = pkt->last_ack_pkt_seq_number; + sendControlPacket(pkt, true); + TraceL << "send ack " << pkt->dump(); + return; +} + +void SrtCaller::sendNAKPacket(std::list &lost_list) { + SRT::NAKPacket::Ptr pkt = std::make_shared(); + std::list tmp; + auto size = SRT::NAKPacket::getCIFSize(lost_list); + size_t paylaod_size = getPayloadSize(); + if (size > paylaod_size) { + WarnL << "loss report cif size " << size; + size_t num = paylaod_size / 8; + + size_t msgNum = (lost_list.size() + num - 1) / num; + decltype(lost_list.begin()) cur, next; + for (size_t i = 0; i < msgNum; ++i) { + cur = lost_list.begin(); + std::advance(cur, i * num); + + if (i == msgNum - 1) { + next = lost_list.end(); + } else { + next = lost_list.begin(); + std::advance(next, (i + 1) * num); + } + tmp.assign(cur, next); + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); + pkt->lost_list = tmp; + pkt->storeToData(); + sendControlPacket(pkt, true); + } + + } else { + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); + pkt->lost_list = lost_list; + pkt->storeToData(); + sendControlPacket(pkt, true); + } + + // TraceL<<"send NAK "<dump(); + return; +} + +void SrtCaller::sendMsgDropReq(uint32_t first, uint32_t last) { + MsgDropReqPacket::Ptr pkt = std::make_shared(); + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); + pkt->first_pkt_seq_num = first; + pkt->last_pkt_seq_num = last; + pkt->storeToData(); + sendControlPacket(pkt, true); + return; +} + +void SrtCaller::sendKeepLivePacket() { + auto now = SteadyClock::now(); + SRT::KeepLivePacket::Ptr req = std::make_shared(); + req->timestamp = SRT::DurationCountMicroseconds(now - _start_timestamp); + req->dst_socket_id = _peer_socket_id; + req->storeToData(); + sendControlPacket(req, true); + return; +} + +void SrtCaller::sendShutDown() { + auto now = SteadyClock::now(); + ShutDownPacket::Ptr pkt = std::make_shared(); + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = SRT::DurationCountMicroseconds(now - _start_timestamp); + pkt->storeToData(); + sendControlPacket(pkt, true); + return; +} + +void SrtCaller::tryAnnounceKeyMaterial() { + //TraceL; + + if (!_crypto) { + return; + } + + auto pkt = _crypto->takeAwayAnnouncePacket(); + if (!pkt) { + return; + } + + auto now = SteadyClock::now(); + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = SRT::DurationCountMicroseconds(now - _start_timestamp); + pkt->storeToData(); + _announce_req = pkt; + sendControlPacket(pkt, true); + + std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); + _announce_timer = std::make_shared(0.2, [weak_self]()->bool{ + auto strong_self = weak_self.lock(); + if (!strong_self) { + return false; + } + if (!strong_self->_announce_req) { + return false; + } + + strong_self->sendControlPacket(strong_self->_announce_req, true); + return true; + }, getPoller()); + + return; +} + +void SrtCaller::sendControlPacket(SRT::ControlPacket::Ptr pkt, bool flush) { + //TraceL; + sendPacket(pkt, flush); + return; +} + +void SrtCaller::sendDataPacket(SRT::DataPacket::Ptr pkt, char *buf, int len, bool flush) { + auto data = buf; + auto size = len; + BufferLikeString::Ptr payload; + if (_crypto) { + payload = _crypto->encrypt(pkt, const_cast(buf), len); + if (!payload) { + WarnL << "encrypt pkt->packet_seq_number: " << pkt->packet_seq_number << ", timestamp: " << "pkt->timestamp " << " fail"; + return; + } + + data = payload->data(); + size = payload->size(); + + tryAnnounceKeyMaterial(); + } + + pkt->storeToData((uint8_t *)data, size); + sendPacket(pkt, flush); + _send_buf->inputPacket(pkt); + return; +} + +void SrtCaller::sendPacket(Buffer::Ptr pkt, bool flush) { + //TraceL << pkt->size(); + auto tmp = _packet_pool.obtain2(); + tmp->assign(pkt->data(), pkt->size()); + _socket->send(std::move(tmp), nullptr, 0, flush); + + _send_ticker.resetTime(); + return; +} + +void SrtCaller::handleHandshake(uint8_t *buf, int len, struct sockaddr *addr) { + //DebugL; + SRT::HandshakePacket pkt; + if(!pkt.loadFromData(buf, len)){ + WarnL<< "is not vaild HandshakePacket"; + return; + } + + if (pkt.handshake_type == SRT::HandshakePacket::HS_TYPE_INDUCTION) { + handleHandshakeInduction(pkt, addr); + } else if (pkt.handshake_type == SRT::HandshakePacket::HS_TYPE_CONCLUSION) { + handleHandshakeConclusion(pkt, addr); + } else if (pkt.isReject()){ + onResult(SockException(Err_other, StrPrinter << "handshake fail, reject resaon: " << pkt.handshake_type + << ", " << SRT::getRejectReason((SRT_REJECT_REASON)pkt.handshake_type))); + return; + } else { + WarnL << " not support handshake type = " << pkt.handshake_type; + WarnL << pkt.dump(); + } + _ack_ticker.resetTime(_now); + _nak_ticker.resetTime(_now); + return; +} + +void SrtCaller::handleHandshakeInduction(SRT::HandshakePacket &pkt, struct sockaddr *addr) { + DebugL; + + if (!_handleshake_req) { + WarnL << "must Induction Phase for handleshake"; + return; + } + + if (_handleshake_req->handshake_type == HandshakePacket::HS_TYPE_CONCLUSION) { + WarnL << "should be Conclusion Phase for handleshake "; + return; + } else if (_handleshake_req->handshake_type != HandshakePacket::HS_TYPE_INDUCTION) { + WarnL <<"not reach this"; + return; + } + + // Induction Phase + if (pkt.version != 5) { + WarnL << "not support handleshake version: " << pkt.version; + return; + } + + if (pkt.extension_field != 0x4A17) { + WarnL << "not match SRT MAGIC"; + return; + } + + if (pkt.dst_socket_id != _handleshake_req->srt_socket_id) { + WarnL << "not match _socket_id"; + return; + } + + // TODO: encryption_field + + _sync_cookie = pkt.syn_cookie; + + _mtu = std::min(pkt.mtu, _mtu); + _max_flow_window_size = std::min(pkt.max_flow_window_size, _max_flow_window_size); + sendHandshakeConclusion(); + return; +} + +void SrtCaller::handleHandshakeConclusion(SRT::HandshakePacket &pkt, struct sockaddr *addr) { + DebugL; + + if (!_handleshake_req) { + WarnL << "must Conclusion Phase for handleshake "; + return; + } + + if (_handleshake_req->handshake_type == HandshakePacket::HS_TYPE_INDUCTION) { + WarnL << "should be Conclusion Phase for handleshake "; + return; + } else if (_handleshake_req->handshake_type != HandshakePacket::HS_TYPE_CONCLUSION) { + WarnL <<"not reach this"; + return; + } + + // Conclusion Phase + if (pkt.version != 5) { + WarnL << "not support handleshake version: " << pkt.version; + return; + } + + if (pkt.dst_socket_id != _handleshake_req->srt_socket_id) { + WarnL << "not match _socket_id"; + return; + } + + // TODO: encryption_field + + _peer_socket_id = pkt.srt_socket_id; + + HSExtMessage::Ptr resp; + HSExtKeyMaterial::Ptr keyMaterial; + + for (auto& ext : pkt.ext_list) { + if (!resp) { + resp = std::dynamic_pointer_cast(ext); + } + if (!keyMaterial) { + keyMaterial = std::dynamic_pointer_cast(ext); + } + } + + if (resp) { + _delay = std::max(_delay, resp->recv_tsbpd_delay); + //DebugL << "flag " << resp->srt_flag; + //DebugL << "recv_tsbpd_delay " << resp->recv_tsbpd_delay; + //DebugL << "send_tsbpd_delay " << resp->send_tsbpd_delay; + } + + if (keyMaterial && _crypto) { + _crypto->loadFromKeyMaterial(keyMaterial); + } + + if (isPlayer()) { + //The recommended threshold value is 1.25 times the SRT latency value. + _recv_buf = std::make_shared(getPktBufSize(), _init_seq_number, _delay * 1250, resp->srt_flag); + } else { + //The recommended threshold value is 1.25 times the SRT latency value. + //Note that the SRT sender keeps packets for at least 1 second in case the latency is not high enough for a large RTT + _send_buf = std::make_shared(getPktBufSize(), std::min((uint32_t)_delay * 1250, 1000000), resp->srt_flag); + } + + onHandShakeFinished(); + return; +} + +void SrtCaller::handleACK(uint8_t *buf, int len, struct sockaddr *addr) { + // TraceL; + //Acknowledgement of Acknowledgement (ACKACK) control packets are sent to acknowledge the reception of a Full ACK + ACKPacket ack; + if (!ack.loadFromData(buf, len)) { + return; + } + ACKACKPacket::Ptr pkt = std::make_shared(); + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); + pkt->ack_number = ack.ack_number; + pkt->storeToData(); + _send_buf->drop(ack.last_ack_pkt_seq_number); + sendControlPacket(pkt, true); + // TraceL<<"ack number "<(); + pkt->loadFromData(buf, len); + + if(_ack_send_timestamp.find(pkt->ack_number) != _ack_send_timestamp.end()){ + uint32_t rtt = DurationCountMicroseconds(_now - _ack_send_timestamp[pkt->ack_number]); + _rtt_variance = (3 * _rtt_variance + abs((long)_rtt - (long)rtt)) / 4; + _rtt = (7 * rtt + _rtt) / 8; + // TraceL<<" rtt:"<<_rtt<<" rtt variance:"<<_rtt_variance; + _ack_send_timestamp.erase(pkt->ack_number); + + if(_last_recv_ackack_seq_num < pkt->ack_number){ + _last_recv_ackack_seq_num = pkt->ack_number; + }else{ + if((_last_recv_ackack_seq_num-pkt->ack_number)>(MAX_TS>>1)){ + _last_recv_ackack_seq_num = pkt->ack_number; + } + } + + if(_ack_send_timestamp.size()>1000){ + // clear data + for(auto it = _ack_send_timestamp.begin(); it != _ack_send_timestamp.end();){ + if(DurationCountMicroseconds(_now-it->second)>5e6){ + // 超过五秒没有ackack 丢弃 + it = _ack_send_timestamp.erase(it); + }else{ + it++; + } + } + } + + } + return; +} + +void SrtCaller::handleNAK(uint8_t *buf, int len, struct sockaddr *addr) { + //TraceL; + NAKPacket pkt; + pkt.loadFromData(buf, len); + bool empty = false; + bool flush = false; + + for (auto& it : pkt.lost_list) { + if (pkt.lost_list.back() == it) { + flush = true; + } + empty = true; + auto re_list = _send_buf->findPacketBySeq(it.first, it.second - 1); + for (auto& pkt : re_list) { + pkt->R = 1; + pkt->storeToHeader(); + sendPacket(pkt, flush); + empty = false; + } + if (empty) { + sendMsgDropReq(it.first, it.second - 1); + } + } + return; +} + +void SrtCaller::handleDropReq(uint8_t *buf, int len, struct sockaddr *addr) { + MsgDropReqPacket pkt; + pkt.loadFromData(buf, len); + std::list list; + // TraceL<<"drop "<drop(pkt.first_pkt_seq_num, pkt.last_pkt_seq_num, list); + //checkAndSendAckNak(); + if (list.empty()) { + return; + } + // uint32_t max_seq = 0; + for (auto& data : list) { + // max_seq = data->packet_seq_number; + if (_last_pkt_seq + 1 != data->packet_seq_number) { + TraceL << "pkt lost " << _last_pkt_seq + 1 << "->" << data->packet_seq_number; + } + _last_pkt_seq = data->packet_seq_number; + onSRTData(std::move(data)); + } + return; +} + +void SrtCaller::handleKeeplive(uint8_t *buf, int len, struct sockaddr *addr) { + // TraceL; + return; +} + +void SrtCaller::handleShutDown(uint8_t *buf, int len, struct sockaddr *addr) { + TraceL; + onResult(SockException(Err_other, "peer close connection")); + return; +} + +void SrtCaller::handlePeerError(uint8_t *buf, int len, struct sockaddr *addr) { + TraceL; + return; +} + +void SrtCaller::handleCongestionWarning(uint8_t *buf, int len, struct sockaddr *addr) { + TraceL; + return; +} + +void SrtCaller::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr *addr) { + /* TraceL; */ + + using srt_userd_defined_handler = void (SrtCaller::*)(uint8_t * buf, int len, struct sockaddr *addr); + static std::unordered_map s_userd_defined_functions; + static onceToken token([]() { + s_userd_defined_functions.emplace(SRT::HSExt::SRT_CMD_KMREQ, &SrtCaller::handleKeyMaterialReqPacket); + s_userd_defined_functions.emplace(SRT::HSExt::SRT_CMD_KMRSP, &SrtCaller::handleKeyMaterialRspPacket); + }); + + uint16_t subtype = ControlPacket::getSubType(buf, len); + auto it = s_userd_defined_functions.find(subtype); + if (it == s_userd_defined_functions.end()) { + WarnL << " not support subtype in user defined msg ignore: " << subtype; + return; + } else { + (this->*(it->second))(buf, len, addr); + } + + return; +} + +void SrtCaller::handleKeyMaterialReqPacket(uint8_t *buf, int len, struct sockaddr *addr) { + /* TraceL; */ + + if (!_crypto) { + WarnL << " not enable crypto, ignore"; + return; + } + + KeyMaterialPacket::Ptr pkt = std::make_shared(); + pkt->loadFromData(buf, len); + _crypto->loadFromKeyMaterial(pkt); + + //rsp + pkt->sub_type = SRT::HSExt::SRT_CMD_KMRSP; + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); + pkt->storeToData(); + sendControlPacket(pkt, true); + return; +} + +void SrtCaller::handleKeyMaterialRspPacket(uint8_t *buf, int len, struct sockaddr *addr) { + /* TraceL; */ + _announce_req = nullptr; + return; +} + +void SrtCaller::handleDataPacket(uint8_t *buf, int len, struct sockaddr *addr) { + //TraceL; + DataPacket::Ptr pkt = std::make_shared(); + pkt->loadFromData(buf, len); + + if (_crypto) { + auto payload = _crypto->decrypt(pkt, pkt->payloadData(), pkt->payloadSize()); + if (!payload) { + WarnL << "decrypt pkt->packet_seq_number: " << pkt->packet_seq_number << ", timestamp: " << "pkt->timestamp " << " fail"; + return; + } + + pkt->reloadPayload((uint8_t*)payload->data(), payload->size()); + } + + _estimated_link_capacity_context->inputPacket(_now, pkt); + + std::list list; + _recv_buf->inputPacket(pkt, list); + for (auto& data : list) { + if (_last_pkt_seq + 1 != data->packet_seq_number) { + TraceL << "pkt lost " << _last_pkt_seq + 1 << "->" << data->packet_seq_number; + } + _last_pkt_seq = data->packet_seq_number; + onSRTData(std::move(data)); + } + return; +} + +void SrtCaller::checkAndSendAckNak() { + //SRT Periodic NAK reports are sent with a period of (RTT + 4 * RTTVar) / 2 (so called NAKInterval), + //with a 20 milliseconds floor + auto nak_interval = (_rtt + _rtt_variance * 4) / 2; + if (nak_interval <= 20 * 1000) { + nak_interval = 20 * 1000; + } + if (_nak_ticker.elapsedTime(_now) > nak_interval) { + auto lost = _recv_buf->getLostSeq(); + if (!lost.empty()) { + sendNAKPacket(lost); + } + _nak_ticker.resetTime(_now); + } + + //A Full ACK control packet is sent every 10 ms + if (_ack_ticker.elapsedTime(_now) > 10 * 1000) { + _light_ack_pkt_count = 0; + _ack_ticker.resetTime(_now); + // send a ack per 10 ms for receiver + if(_last_ack_pkt_seq != _recv_buf->getExpectedSeq()){ + //TraceL<<"send a ack packet"; + sendACKPacket(); + } else{ + //TraceL<<" ignore repeate ack packet"; + } + } else { + //The recommendation is to send a Light ACK for every 64 packets received. + if (_light_ack_pkt_count >= 64) { + // for high bitrate stream send light ack + // TODO + sendLightACKPacket(); + TraceL << "send light ack"; + } + _light_ack_pkt_count = 0; + } + _light_ack_pkt_count++; + return; +} + +void SrtCaller::createTimerForCheckAlive(){ + std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); + auto timeoutSec = getTimeOutSec(); + _alive_timer = std::make_shared( + timeoutSec /2, + [weak_self,timeoutSec]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return false; + } + if (strong_self->_alive_ticker.elapsedTime() > timeoutSec * 1000) { + strong_self->onResult(SockException(Err_timeout, "Receive srt socket data timeout")); + return false; + } + return true; + }, getPoller()); + + return; +} + +int SrtCaller::getLatencyMul() { + GET_CONFIG(int, latencyMul, SRT::kLatencyMul); + if (latencyMul < 0) { + WarnL << "config srt " << kLatencyMul << " not vaild"; + return 4; + } + return latencyMul; +} + +int SrtCaller::getPktBufSize() { + GET_CONFIG(int, pktBufSize, SRT::kPktBufSize); + if (pktBufSize <= 0) { + WarnL << "config srt " << kPktBufSize << " not vaild"; + return 8912; + } + return pktBufSize; +} + +float SrtCaller::getTimeOutSec() { + GET_CONFIG(uint32_t, timeout, SRT::kTimeOutSec); + if (timeout <= 0) { + WarnL << "config srt " << kTimeOutSec << " not vaild"; + return 5 * 1000; + } + return (float)timeout * (float)1000; +}; + +std::string SrtCaller::generateStreamId() { + auto streamId = "#!::r=" + _url._app + "/" + _url._stream; + if (_url._vhost != DEFAULT_VHOST) { + streamId += ",h=" +_url._vhost; + } + if (!isPlayer()) { + streamId += ",m=publish"; + } + return streamId; +}; + +uint32_t SrtCaller::generateSocketId() { + // 生成一个 32 位的随机整数 + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution dist(0, UINT32_MAX); + uint32_t id = dist(mt); + + return id; +} + +int32_t SrtCaller::generateInitSeq() { + // 生成一个 32 位的随机整数 + std::random_device rd; + std::mt19937 mt(rd()); + std::uniform_int_distribution dist(0, MAX_SEQ); + int32_t id = dist(mt); + return id; +} + +size_t SrtCaller::getPayloadSize() { + size_t ret = (_mtu - 28 - 16) / 188 * 188; + return ret; +} + + +} /* namespace mediakit */ + diff --git a/src/Srt/SrtCaller.h b/src/Srt/SrtCaller.h new file mode 100644 index 00000000..84aa8089 --- /dev/null +++ b/src/Srt/SrtCaller.h @@ -0,0 +1,199 @@ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_SRTCALLER_H +#define ZLMEDIAKIT_SRTCALLER_H + +//srt +#include "srt/Packet.hpp" +#include "srt/Crypto.hpp" +#include "srt/PacketQueue.hpp" +#include "srt/PacketSendQueue.hpp" +#include "srt/Statistic.hpp" + +#include "Poller/EventPoller.h" +#include "Network/Socket.h" +#include "Poller/Timer.h" +#include "Util/TimeTicker.h" +#include "Common/MultiMediaSourceMuxer.h" +#include "Rtp/Decoder.h" +#include "TS/TSMediaSource.h" +#include +#include + + +namespace mediakit { + +// 解析srt 信令url的工具类 +class SrtUrl { +public: + std::string _full_url; + std::string _params; + std::string _host; + uint16_t _port; + std::string _vhost; + std::string _app; + std::string _stream; + +public: + void parse(const std::string &url); +}; + +// 实现了webrtc代理拉流功能 +class SrtCaller : public std::enable_shared_from_this{ +public: + using Ptr = std::shared_ptr; + + using SteadyClock = std::chrono::steady_clock; + using TimePoint = std::chrono::time_point; + + SrtCaller(const toolkit::EventPoller::Ptr &poller); + virtual ~SrtCaller(); + + const toolkit::EventPoller::Ptr &getPoller() const {return _poller;} + + virtual void inputSockData(uint8_t *buf, int len, struct sockaddr *addr); + virtual void onSendTSData(const SRT::Buffer::Ptr &buffer, bool flush); + +protected: + + virtual void onConnect(); + virtual void onHandShakeFinished(); + virtual void onResult(const toolkit::SockException &ex); + + virtual void onSRTData(SRT::DataPacket::Ptr pkt); + + virtual uint16_t getLatency() = 0; + virtual int getLatencyMul(); + virtual int getPktBufSize(); + virtual float getTimeOutSec(); + + virtual bool isPlayer() = 0; + +private: + void doHandshake(); + + void sendHandshakeInduction(); + void sendHandshakeConclusion(); + void sendACKPacket(); + void sendLightACKPacket(); + void sendNAKPacket(std::list &lost_list); + void sendMsgDropReq(uint32_t first, uint32_t last); + void sendKeepLivePacket(); + void sendShutDown(); + void tryAnnounceKeyMaterial(); + void sendControlPacket(SRT::ControlPacket::Ptr pkt, bool flush = true); + void sendDataPacket(SRT::DataPacket::Ptr pkt, char *buf, int len, bool flush = false); + void sendPacket(toolkit::Buffer::Ptr pkt, bool flush); + + void handleHandshake(uint8_t *buf, int len, struct sockaddr *addr); + void handleHandshakeInduction(SRT::HandshakePacket &pkt, struct sockaddr *addr); + void handleHandshakeConclusion(SRT::HandshakePacket &pkt, struct sockaddr *addr); + void handleACK(uint8_t *buf, int len, struct sockaddr *addr); + void handleACKACK(uint8_t *buf, int len, struct sockaddr *addr); + void handleNAK(uint8_t *buf, int len, struct sockaddr *addr); + void handleDropReq(uint8_t *buf, int len, struct sockaddr *addr); + void handleKeeplive(uint8_t *buf, int len, struct sockaddr *addr); + void handleShutDown(uint8_t *buf, int len, struct sockaddr *addr); + void handlePeerError(uint8_t *buf, int len, struct sockaddr *addr); + void handleCongestionWarning(uint8_t *buf, int len, struct sockaddr *addr); + void handleUserDefinedType(uint8_t *buf, int len, struct sockaddr *addr); + void handleDataPacket(uint8_t *buf, int len, struct sockaddr *addr); + void handleKeyMaterialReqPacket(uint8_t *buf, int len, struct sockaddr *addr); + void handleKeyMaterialRspPacket(uint8_t *buf, int len, struct sockaddr *addr); + + void checkAndSendAckNak(); + void createTimerForCheckAlive(); + + std::string generateStreamId(); + uint32_t generateSocketId(); + int32_t generateInitSeq(); + size_t getPayloadSize(); + + virtual std::string getPassphrase() = 0; + +protected: + SrtUrl _url; + toolkit::EventPoller::Ptr _poller; + + bool _is_handleshake_finished = false; + +private: + toolkit::Socket::Ptr _socket; + + TimePoint _now; + TimePoint _start_timestamp; + // for calculate rtt for delay + TimePoint _induction_ts; + + //the initial value of RTT is 100 milliseconds + //RTTVar is 50 milliseconds + uint32_t _rtt = 100 * 1000; + uint32_t _rtt_variance = 50 * 1000; + + //local + uint32_t _socket_id = 0; + uint32_t _init_seq_number = 0; + uint32_t _mtu = 1500; + uint32_t _max_flow_window_size = 8192; + uint16_t _delay = 120; + + //peer + uint32_t _sync_cookie = 0; + uint32_t _peer_socket_id; + + // for handshake + SRT::Timer::Ptr _handleshake_timer; + SRT::HandshakePacket::Ptr _handleshake_req; + + // for keeplive + SRT::Ticker _send_ticker; + SRT::Timer::Ptr _keeplive_timer; + + // for alive + SRT::Ticker _alive_ticker; + SRT::Timer::Ptr _alive_timer; + + // for recv + SRT::PacketQueueInterface::Ptr _recv_buf; + uint32_t _last_pkt_seq = 0; + + // Ack + SRT::UTicker _ack_ticker; + uint32_t _last_ack_pkt_seq = 0; + uint32_t _light_ack_pkt_count = 0; + uint32_t _ack_number_count = 0; + std::map _ack_send_timestamp; + // Full Ack + // Link Capacity and Receiving Rate Estimation + std::shared_ptr _pkt_recv_rate_context; + std::shared_ptr _estimated_link_capacity_context; + + // Nak + SRT::UTicker _nak_ticker; + + //for Send + SRT::PacketSendQueue::Ptr _send_buf; + SRT::ResourcePool _packet_pool; + uint32_t _send_packet_seq_number = 0; + uint32_t _send_msg_number = 1; + + //AckAck + uint32_t _last_recv_ackack_seq_num = 0; + + // for encryption + SRT::Crypto::Ptr _crypto; + SRT::Timer::Ptr _announce_timer; + SRT::KeyMaterialPacket::Ptr _announce_req; +}; + +} /* namespace mediakit */ +#endif /* ZLMEDIAKIT_SRTCALLER_H */ + diff --git a/src/Srt/SrtPlayer.cpp b/src/Srt/SrtPlayer.cpp new file mode 100644 index 00000000..ea20cd77 --- /dev/null +++ b/src/Srt/SrtPlayer.cpp @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "SrtPlayer.h" +#include "SrtPlayerImp.h" +#include "Common/config.h" +#include "Http/HlsPlayer.h" + +using namespace toolkit; +using namespace std; + +namespace mediakit { + + +SrtPlayer::SrtPlayer(const EventPoller::Ptr &poller) + : SrtCaller(poller) { + DebugL; +} + +SrtPlayer::~SrtPlayer(void) { + DebugL; +} + +void SrtPlayer::play(const string &strUrl) { + DebugL; + try { + _url.parse(strUrl); + } catch (std::exception &ex) { + onResult(SockException(Err_other, StrPrinter << "illegal srt url:" << ex.what())); + return; + } + onConnect(); + return; +} + +void SrtPlayer::teardown() { + SrtCaller::onResult(SockException(Err_other, StrPrinter << "teardown: " << _url._full_url)); +} + +void SrtPlayer::pause(bool bPause) { + DebugL; +} + +void SrtPlayer::speed(float speed) { + DebugL; +} + +void SrtPlayer::onHandShakeFinished() { + SrtCaller::onHandShakeFinished(); + onResult(SockException(Err_success, "srt play success")); +} + +void SrtPlayer::onResult(const SockException &ex) { + SrtCaller::onResult(ex); + + if (!ex) { + // 播放成功 + onPlayResult(ex); + _benchmark_mode = (*this)[Client::kBenchmarkMode].as(); + + // 播放成功,恢复数据包接收超时定时器 + _recv_ticker.resetTime(); + auto timeout = getTimeOutSec(); + //读取配置文件 + weak_ptr weakSelf = static_pointer_cast(shared_from_this()); + // 创建rtp数据接收超时检测定时器 + _check_timer = std::make_shared(timeout /2, + [weakSelf, timeout]() { + auto strongSelf = weakSelf.lock(); + if (!strongSelf) { + return false; + } + if (strongSelf->_recv_ticker.elapsedTime() > timeout * 1000) { + // 接收媒体数据包超时 + strongSelf->onResult(SockException(Err_timeout, "receive srt media data timeout:" + strongSelf->_url._full_url)); + return false; + } + + return true; + }, getPoller()); + } else { + WarnL << ex.getErrCode() << " " << ex.what(); + if (ex.getErrCode() == Err_shutdown) { + // 主动shutdown的,不触发回调 + return; + } + if (!_is_handleshake_finished) { + onPlayResult(ex); + } else { + onShutdown(ex); + } + } + return; +} + + +void SrtPlayer::onSRTData(SRT::DataPacket::Ptr pkt) { + _recv_ticker.resetTime(); +} + +uint16_t SrtPlayer::getLatency() { + auto latency = (*this)[Client::kLatency].as(); + return (uint16_t)latency ; +} + +float SrtPlayer::getTimeOutSec() { + auto timeoutMS = (*this)[Client::kTimeoutMS].as(); + return (float)timeoutMS / (float)1000; +} + +std::string SrtPlayer::getPassphrase() { + auto passPhrase = (*this)[Client::kPassPhrase].as(); + return passPhrase; +} + +/////////////////////////////////////////////////// +// SrtPlayerImp + +void SrtPlayerImp::onPlayResult(const toolkit::SockException &ex) { + if (ex) { + Super::onPlayResult(ex); + } + //success result only occur when addTrackCompleted + return; +} + +std::vector SrtPlayerImp::getTracks(bool ready /*= true*/) const { + return _demuxer ? static_pointer_cast(_demuxer)->getTracks(ready) : Super::getTracks(ready); +} + +void SrtPlayerImp::addTrackCompleted() { + Super::onPlayResult(toolkit::SockException(toolkit::Err_success, "play success")); +} + +void SrtPlayerImp::onSRTData(SRT::DataPacket::Ptr pkt) { + SrtPlayer::onSRTData(pkt); + + if (_benchmark_mode) { + return; + } + + auto strong_self = shared_from_this(); + if (!_demuxer) { + auto demuxer = std::make_shared(); + demuxer->start(getPoller(), this); + _demuxer = std::move(demuxer); + } + + if (!_decoder && _demuxer) { + _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _demuxer.get()); + } + + if (_decoder && _demuxer) { + _decoder->input(reinterpret_cast(pkt->payloadData()), pkt->payloadSize()); + } + + return; +} + + +} /* namespace mediakit */ + diff --git a/src/Srt/SrtPlayer.h b/src/Srt/SrtPlayer.h new file mode 100644 index 00000000..23d206ca --- /dev/null +++ b/src/Srt/SrtPlayer.h @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_SRTPLAYER_H +#define ZLMEDIAKIT_SRTPLAYER_H + +#include "Network/Socket.h" +#include "Player/PlayerBase.h" +#include "Poller/Timer.h" +#include "Util/TimeTicker.h" +#include "srt/SrtTransport.hpp" +#include "Http/HttpRequester.h" +#include +#include +#include "SrtCaller.h" + +namespace mediakit { + + +// 实现了srt代理拉流功能 +class SrtPlayer + : public PlayerBase , public SrtCaller { +public: + using Ptr = std::shared_ptr; + + SrtPlayer(const toolkit::EventPoller::Ptr &poller); + ~SrtPlayer() override; + + //// PlayerBase override//// + void play(const std::string &strUrl) override; + void teardown() override; + void pause(bool pause) override; + void speed(float speed) override; + +protected: + + //// SrtCaller override//// + void onHandShakeFinished() override; + void onSRTData(SRT::DataPacket::Ptr pkt) override; + void onResult(const toolkit::SockException &ex) override; + + bool isPlayer() override {return true;} + + uint16_t getLatency() override; + float getTimeOutSec() override; + std::string getPassphrase() override; + +protected: + //是否为性能测试模式 + bool _benchmark_mode = false; + + //超时功能实现 + toolkit::Ticker _recv_ticker; + std::shared_ptr _check_timer; +}; + +} /* namespace mediakit */ +#endif /* ZLMEDIAKIT_SRTPLAYER_H */ diff --git a/src/Srt/SrtPlayerImp.h b/src/Srt/SrtPlayerImp.h new file mode 100644 index 00000000..0828fe5e --- /dev/null +++ b/src/Srt/SrtPlayerImp.h @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_SRtPLAYERIMP_H +#define ZLMEDIAKIT_SRtPLAYERIMP_H + +#include "SrtPlayer.h" + +namespace mediakit { + +class SrtPlayerImp + : public PlayerImp + , private TrackListener { +public: + using Ptr = std::shared_ptr; + using Super = PlayerImp; + + SrtPlayerImp(const toolkit::EventPoller::Ptr &poller) : Super(poller) {} + ~SrtPlayerImp() override { DebugL; } + +private: + //// SrtPlayer override//// + void onSRTData(SRT::DataPacket::Ptr pkt) override; + + //// PlayerBase override//// + void onPlayResult(const toolkit::SockException &ex) override; + std::vector getTracks(bool ready = true) const override; + +private: + //// TrackListener override//// + bool addTrack(const Track::Ptr &track) override { return true; } + void addTrackCompleted() override; + +private: + // for player + DecoderImp::Ptr _decoder; + MediaSinkInterface::Ptr _demuxer; + + // for pusher + TSMediaSource::RingType::RingReader::Ptr _ts_reader; +}; + +} /* namespace mediakit */ +#endif /* ZLMEDIAKIT_SRtPLAYERIMP_H */ diff --git a/src/Srt/SrtPusher.cpp b/src/Srt/SrtPusher.cpp new file mode 100644 index 00000000..73e2e501 --- /dev/null +++ b/src/Srt/SrtPusher.cpp @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "SrtPusher.h" +#include "Common/config.h" + +using namespace toolkit; +using namespace std; +namespace mediakit { + +SrtPusher::SrtPusher(const EventPoller::Ptr &poller, const TSMediaSource::Ptr &src) : SrtCaller(poller) { + _push_src = src; + DebugL; +} + +SrtPusher::~SrtPusher(void) { + DebugL; +} + +void SrtPusher::publish(const string &strUrl) { + DebugL; + try { + _url.parse(strUrl); + } catch (std::exception &ex) { + onResult(SockException(Err_other, StrPrinter << "illegal srt url:" << ex.what())); + return; + } + onConnect(); + return; +} + +void SrtPusher::teardown() { + SrtCaller::onResult(SockException(Err_other, StrPrinter << "teardown: " << _url._full_url)); +} + +void SrtPusher::onHandShakeFinished() { + SrtCaller::onHandShakeFinished(); + onResult(SockException(Err_success, "srt push success")); + doPublish(); +} + +void SrtPusher::onResult(const SockException &ex) { + SrtCaller::onResult(ex); + + if (!ex) { + onPublishResult(ex); + } else { + WarnL << ex.getErrCode() << " " << ex.what(); + if (ex.getErrCode() == Err_shutdown) { + // 主动shutdown的,不触发回调 + return; + } + if (!_is_handleshake_finished) { + onPublishResult(ex); + } else { + onShutdown(ex); + } + } + return; +} + +uint16_t SrtPusher::getLatency() { + auto latency = (*this)[Client::kLatency].as(); + return (uint16_t)latency ; +} + +float SrtPusher::getTimeOutSec() { + auto timeoutMS = (*this)[Client::kTimeoutMS].as(); + return (float)timeoutMS / (float)1000; +} + +std::string SrtPusher::getPassphrase() { + auto passPhrase = (*this)[Client::kPassPhrase].as(); + return passPhrase; +} + +void SrtPusher::doPublish() { + auto src = _push_src.lock(); + if (!src) { + onResult(SockException(Err_eof, "the media source was released")); + return; + } + // 异步查找直播流 + std::weak_ptr weak_self = static_pointer_cast(shared_from_this()); + _ts_reader = src->getRing()->attach(getPoller()); + _ts_reader->setDetachCB([weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { + // 本对象已经销毁 + return; + } + strong_self->onShutdown(SockException(Err_shutdown)); + }); + _ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + // 本对象已经销毁 + return; + } + size_t i = 0; + auto size = ts_list->size(); + ts_list->for_each([&](const TSPacket::Ptr &ts) { + strong_self->onSendTSData(ts, ++i == size); + }); + }); +} + +} /* namespace mediakit */ + diff --git a/src/Srt/SrtPusher.h b/src/Srt/SrtPusher.h new file mode 100644 index 00000000..727b59f3 --- /dev/null +++ b/src/Srt/SrtPusher.h @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2016-present The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/ZLMediaKit/ZLMediaKit). + * + * Use of this source code is governed by MIT-like license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_SRTPUSHER_H +#define ZLMEDIAKIT_SRTPUSHER_H + +#include "Network/Socket.h" +#include "Pusher/PusherBase.h" +#include "Poller/Timer.h" +#include "Util/TimeTicker.h" +#include "srt/SrtTransport.hpp" +#include "Http/HttpRequester.h" +#include +#include +#include "SrtCaller.h" + +namespace mediakit { + +// 实现了srt代理推流功能 +class SrtPusher + : public PusherBase , public SrtCaller { +public: + using Ptr = std::shared_ptr; + + SrtPusher(const toolkit::EventPoller::Ptr &poller,const TSMediaSource::Ptr &src); + ~SrtPusher() override; + + //// PusherBase override//// + void publish(const std::string &url) override; + void teardown() override; + + void doPublish(); +protected: + + //// SrtCaller override//// + void onHandShakeFinished() override; + void onResult(const toolkit::SockException &ex) override; + + bool isPlayer() override {return false;} + uint16_t getLatency() override; + float getTimeOutSec() override; + std::string getPassphrase() override; + +protected: + std::weak_ptr _push_src; + TSMediaSource::RingType::RingReader::Ptr _ts_reader; +}; + +using SrtPusherImp = PusherImp; + +} /* namespace mediakit */ +#endif /* ZLMEDIAKIT_SRTPUSHER_H */ diff --git a/srt/Common.hpp b/srt/Common.hpp index ba7dc3ff..c190f53d 100644 --- a/srt/Common.hpp +++ b/srt/Common.hpp @@ -28,6 +28,10 @@ static inline int64_t DurationCountMicroseconds(SteadyClock::duration dur) { return std::chrono::duration_cast(dur).count(); } +static inline uint32_t DurationCountSeconds(SteadyClock::duration dur) { + return std::chrono::duration_cast(dur).count(); +} + static inline uint32_t loadUint32(uint8_t *ptr) { return ptr[0] << 24 | ptr[1] << 16 | ptr[2] << 8 | ptr[3]; } @@ -113,4 +117,4 @@ private: } // namespace SRT -#endif // ZLMEDIAKIT_SRT_COMMON_H \ No newline at end of file +#endif // ZLMEDIAKIT_SRT_COMMON_H diff --git a/srt/Crypto.cpp b/srt/Crypto.cpp new file mode 100644 index 00000000..40c29f8f --- /dev/null +++ b/srt/Crypto.cpp @@ -0,0 +1,507 @@ +#include +#include "Util/MD5.h" +#include "Util/logger.h" + +#include "Crypto.hpp" + +#if defined(ENABLE_OPENSSL) +#include "openssl/evp.h" +#endif + +using namespace toolkit; +using namespace std; +using namespace SRT; + +namespace SRT { + +#if defined(ENABLE_OPENSSL) +inline const EVP_CIPHER* aes_key_len_mapping_wrap_cipher(int key_len) { + switch (key_len) { + case 192/8: return EVP_aes_192_wrap(); + case 256/8: return EVP_aes_256_wrap(); + case 128/8: + default: + return EVP_aes_128_wrap(); + } +} + +inline const EVP_CIPHER* aes_key_len_mapping_ctr_cipher(int key_len) { + switch (key_len) { + case 192/8: return EVP_aes_192_ctr(); + case 256/8: return EVP_aes_256_ctr(); + case 128/8: + default: + return EVP_aes_128_ctr(); + } +} +#endif + +/** + * @brief: aes_wrap + * @param [in]: in 待warp的数据 + * @param [in]: in_len 待warp的数据长度 + * @param [out]: out warp后输出的数据 + * @param [out]: outLen 加密后输出的数据长度 + * @param [in]: key 密钥 + * @param [in]: key_len 密钥长度 + * @return : true: 成功,false: 失败 +**/ +static bool aes_wrap(const uint8_t* in, int in_len, uint8_t* out, int* outLen, uint8_t* key, int key_len) { + +#if defined(ENABLE_OPENSSL) + EVP_CIPHER_CTX* ctx = NULL; + + *outLen = 0; + + do { + if (!(ctx = EVP_CIPHER_CTX_new())) { + WarnL << "EVP_CIPHER_CTX_new fail"; + break; + } + EVP_CIPHER_CTX_set_flags(ctx, EVP_CIPHER_CTX_FLAG_WRAP_ALLOW); + + if (1 != EVP_EncryptInit_ex(ctx, aes_key_len_mapping_wrap_cipher(key_len), NULL, key, NULL)) { + WarnL << "EVP_EncryptInit_ex fail"; + break; + } + + int len1 = 0; + if (1 != EVP_EncryptUpdate(ctx, (uint8_t*)out, &len1, (uint8_t*)in, in_len)) { + WarnL << "EVP_EncryptUpdate fail"; + break; + } + + int len2 = 0; + if (1 != EVP_EncryptFinal_ex(ctx, (uint8_t*)out + len1, &len2)) { + WarnL << "EVP_EncryptFinal_ex fail"; + break; + } + + *outLen = len1 + len2; + } while (0); + + if (ctx != NULL) { + EVP_CIPHER_CTX_free(ctx); + } + + return *outLen != 0; +#else + return false; +#endif +} + +/** + * @brief: aes_unwrap + * @param [in]: in 待unwrap的数据 + * @param [in]: in_len 待unwrap的数据长度 + * @param [out]: out unwrap后输出的数据 + * @param [out]: outLen unwrap后输出的数据长度 + * @param [in]: key 密钥 + * @param [in]: key_len 密钥长度 + * @return : true: 成功,false: 失败 +**/ +static bool aes_unwrap(const uint8_t* in, int in_len, uint8_t* out, int* outLen, uint8_t* key, int key_len) { + +#if defined(ENABLE_OPENSSL) + EVP_CIPHER_CTX* ctx = NULL; + + *outLen = 0; + + do { + + if (!(ctx = EVP_CIPHER_CTX_new())) { + WarnL << "EVP_CIPHER_CTX_new fail"; + break; + } + EVP_CIPHER_CTX_set_flags(ctx, EVP_CIPHER_CTX_FLAG_WRAP_ALLOW); + + if (1 != EVP_DecryptInit_ex(ctx, aes_key_len_mapping_wrap_cipher(key_len), NULL, key, NULL)) { + WarnL << "EVP_DecryptInit_ex fail"; + break; + } + + //设置pkcs7padding + if (1 != EVP_CIPHER_CTX_set_padding(ctx, 1)) { + WarnL << "EVP_CIPHER_CTX_set_padding fail"; + break; + } + + int len1 = 0; + if (1 != EVP_DecryptUpdate(ctx, (uint8_t*)out, &len1, (uint8_t*)in, in_len)) { + WarnL << "EVP_DecryptUpdate fail"; + break; + } + + int len2 = 0; + if (1 != EVP_DecryptFinal_ex(ctx, (uint8_t*)out + len1, &len2)) { + WarnL << "EVP_DecryptFinal_ex fail"; + break; + } + + *outLen = len1 + len2; + } while (0); + + if (ctx != NULL) { + EVP_CIPHER_CTX_free(ctx); + } + + return *outLen != 0; + +#else + return false; +#endif +} + +/** + * @brief: aes ctr 加密 + * @param [in]: in 待加密的数据 + * @param [in]: in_len 待加密的数据长度 + * @param [out]: out 加密后输出的数据 + * @param [out]: outLen 加密后输出的数据长度 + * @param [in]: key 密钥 + * @param [in]: key_len 密钥长度 + * @param [in]: iv iv向量(16byte) + * @return : true: 成功,false: 失败 +**/ +static bool aes_ctr_encrypt(const uint8_t* in, int in_len, uint8_t* out, int* outLen, uint8_t* key, int key_len, uint8_t* iv) { + +#if defined(ENABLE_OPENSSL) + EVP_CIPHER_CTX* ctx = NULL; + + *outLen = 0; + + do { + if (!(ctx = EVP_CIPHER_CTX_new())) { + WarnL << "EVP_CIPHER_CTX_new fail"; + break; + } + + if (1 != EVP_EncryptInit_ex(ctx, aes_key_len_mapping_ctr_cipher(key_len), NULL, key, iv)) { + WarnL << "EVP_EncryptInit_ex fail"; + break; + } + + int len1 = 0; + if (1 != EVP_EncryptUpdate(ctx, (uint8_t*)out, &len1, (uint8_t*)in, in_len)) { + WarnL << "EVP_EncryptUpdate fail"; + break; + } + + int len2 = 0; + if (1 != EVP_EncryptFinal_ex(ctx, (uint8_t*)out + len1, &len2)) { + WarnL << "EVP_EncryptFinal_ex fail"; + break; + } + + *outLen = len1 + len2; + } while (0); + + if (ctx != NULL) { + EVP_CIPHER_CTX_free(ctx); + } + + return *outLen != 0; +#else + return false; +#endif +} + + +/** + * @brief: aes ctr 解密 + * @param [in]: in 待解密的数据 + * @param [in]: in_len 待解密的数据长度 + * @param [out]: out 解密后输出的数据 + * @param [out]: outLen 解密后输出的数据长度 + * @param [in]: key 密钥 + * @param [in]: key_len 密钥长度 + * @param [in]: iv iv向量(16byte) + * @return : true: 成功,false: 失败 +**/ +static bool aes_ctr_decrypt(const uint8_t* in, int in_len, uint8_t* out, int* outLen, uint8_t* key, int key_len, uint8_t* iv) { + +#if defined(ENABLE_OPENSSL) + EVP_CIPHER_CTX* ctx = NULL; + + *outLen = 0; + + do { + + if (!(ctx = EVP_CIPHER_CTX_new())) { + WarnL << "EVP_CIPHER_CTX_new fail"; + break; + } + + if (1 != EVP_DecryptInit_ex(ctx, aes_key_len_mapping_ctr_cipher(key_len), NULL, key, iv)) { + WarnL << "EVP_DecryptInit_ex fail"; + break; + } + + int len1 = 0; + if (1 != EVP_DecryptUpdate(ctx, (uint8_t*)out, &len1, (uint8_t*)in, in_len)) { + WarnL << "EVP_DecryptUpdate fail"; + break; + } + + int len2 = 0; + if (1 != EVP_DecryptFinal_ex(ctx, (uint8_t*)out + len1, &len2)) { + WarnL << "EVP_DecryptFinal_ex fail"; + break; + } + + *outLen = len1 + len2; + } while (0); + + if (ctx != NULL) { + EVP_CIPHER_CTX_free(ctx); + } + + return *outLen != 0; + +#else + return false; +#endif +} + + +/////////////////////////////////////////////////// +// CryptoContext +CryptoContext::CryptoContext(const std::string& passparase, uint8_t kk, KeyMaterial::Ptr packet) : + _passparase(passparase), _kk(kk) { + if (packet) { + loadFromKeyMaterial(packet); + } else { + refresh(); + } +} + +void CryptoContext::refresh() { + if (_salt.empty()) { + _salt = makeRandStr(_slen, false); + generateKEK(); + } + + _sek = makeRandStr(_klen, false); + return; +} + +std::string CryptoContext::generateWarppedKey() { + string warpped_key; + int size = (_sek.size() + 15) /16 * 16 + 8; + warpped_key.resize(size); + auto res = aes_wrap((uint8_t*)_sek.data(), _sek.size(), (uint8_t*)warpped_key.data(), &size, (uint8_t*)_kek.data(), _kek.size()); + if (!res) { + return ""; + } + warpped_key.resize(size); + return warpped_key; +} + +void CryptoContext::loadFromKeyMaterial(KeyMaterial::Ptr packet) { + + _slen = packet->_slen; + _klen = packet->_klen; + _salt = packet->_salt; + + generateKEK(); + + auto warpped_key = packet->_warpped_key; + BufferLikeString sek; + int size = warpped_key.size(); + sek.resize(size); + auto ret = aes_unwrap((uint8_t*)warpped_key.data(), warpped_key.size(), (uint8_t*)sek.data(), &size, (uint8_t*)_kek.data(), _kek.size()); + if (!ret) { + throw std::runtime_error(StrPrinter <<"warpped_key unwrap fail, password may mismatch"); + } + + sek.resize(size); + if (packet->_kk == KeyMaterial::KEY_BASED_ENCRYPTION_BOTH_SEK) { + if (_kk == KeyMaterial::KEY_BASED_ENCRYPTION_EVEN_SEK) { + _sek = sek.substr(0, _slen); + } else { + _sek = sek.substr(_slen, _slen); + } + } else { + _sek = sek; + } + return; +} + +bool CryptoContext::generateKEK() { + /** + SEK = PRNG(KLen) + Salt = PRNG(128) + KEK = PBKDF2(passphrase, LSB(64,Salt), Iter, KLen) + **/ + _kek.resize(_klen); +#if defined(ENABLE_OPENSSL) + if (PKCS5_PBKDF2_HMAC(_passparase.data(), _passparase.length(), (uint8_t*)_salt.data() + _slen - 64/8, 64 /8, _iter, EVP_sha1(), _klen, (uint8_t*)_kek.data()) != 1) { + return false; + } + return true; +#else + return false; +#endif +} + +BufferLikeString::Ptr CryptoContext::generateIv(uint32_t pkt_seq_no) { + auto iv = std::make_shared(); + iv->resize(128 /8); + + uint8_t* saltData = (uint8_t*)_salt.data(); + uint8_t* ivData = (uint8_t*)iv->data(); + memset((void*)ivData, 0, iv->size()); + memcpy((void*)(ivData + 10), (void*)&pkt_seq_no, 4); + for (size_t i = 0; i < std::min(_salt.size(), (size_t)112 /8); ++i) { + ivData[i] ^= saltData[i]; + } + return iv; +} + +/////////////////////////////////////////////////// +// AesCtrCryptoContext + +AesCtrCryptoContext::AesCtrCryptoContext(const std::string& passparase, uint8_t kk, KeyMaterial::Ptr packet) : + CryptoContext(passparase, kk, packet) { +} + +BufferLikeString::Ptr AesCtrCryptoContext::encrypt(uint32_t pkt_seq_no, const char *buf, int len) { + auto iv = generateIv(htonl(pkt_seq_no)); + auto payload = std::make_shared(); + int size = (len + 15) /16 * 16 + 8; + payload->resize(size); + auto ret = aes_ctr_encrypt((const uint8_t*)buf, len, (uint8_t*)payload->data(), &size, (uint8_t*)_sek.data(), _sek.size(), (uint8_t*)iv->data()); + if (!ret) { + return nullptr; + } + payload->resize(size); + return payload; +} + +BufferLikeString::Ptr AesCtrCryptoContext::decrypt(uint32_t pkt_seq_no, const char *buf, int len) { + auto iv = generateIv(htonl(pkt_seq_no)); + auto payload = std::make_shared(); + int size = len; + payload->resize(size); + auto ret = aes_ctr_decrypt((const uint8_t*)buf, len, (uint8_t*)payload->data(), &size, (uint8_t*)_sek.data(), _sek.size(), (uint8_t*)iv->data()); + if (!ret) { + return nullptr; + } + payload->resize(size); + return payload; +} + +/////////////////////////////////////////////////// +// Crypto + +Crypto::Crypto(const std::string& passparase) : + _passparase(passparase) { + +#ifndef ENABLE_OPENSSL + throw std::invalid_argument("openssl disable, please set ENABLE_OPENSSL when compile"); +#endif + + _ctx_pair[0] = createCtx(KeyMaterial::CIPHER_AES_CTR, _passparase, KeyMaterial::KEY_BASED_ENCRYPTION_EVEN_SEK); + _ctx_pair[1] = createCtx(KeyMaterial::CIPHER_AES_CTR, _passparase, KeyMaterial::KEY_BASED_ENCRYPTION_ODD_SEK); + _ctx_idx = 0; +} + +CryptoContext::Ptr Crypto::createCtx(int cipher, const std::string& passparase, uint8_t kk, KeyMaterial::Ptr packet) { + switch (cipher){ + case KeyMaterial::CIPHER_AES_CTR: + return std::make_shared(passparase, kk, packet); + case KeyMaterial::CIPHER_AES_ECB: + case KeyMaterial::CIPHER_AES_CBC: + case KeyMaterial::CIPHER_AES_GCM: + default: + throw std::runtime_error(StrPrinter <<"not support cipher " << cipher); + } +} + +HSExtKeyMaterial::Ptr Crypto::generateKeyMaterialExt(uint16_t extension_type) { + HSExtKeyMaterial::Ptr ext = std::make_shared(); + ext->extension_type = extension_type; + ext->_kk = _ctx_pair[_ctx_idx]->_kk; + ext->_cipher = _ctx_pair[_ctx_idx]->getCipher(); + ext->_slen = _ctx_pair[_ctx_idx]->_slen; + ext->_klen = _ctx_pair[_ctx_idx]->_klen; + ext->_salt = _ctx_pair[_ctx_idx]->_salt; + ext->_warpped_key = _ctx_pair[_ctx_idx]->generateWarppedKey(); + return ext; +} + +KeyMaterialPacket::Ptr Crypto::generateAnnouncePacket(CryptoContext::Ptr ctx) { + KeyMaterialPacket::Ptr pkt = std::make_shared(); + pkt->sub_type = HSExt::SRT_CMD_KMREQ; + pkt->_kk = ctx->_kk; + pkt->_cipher = ctx->getCipher(); + pkt->_slen = ctx->_slen; + pkt->_klen = ctx->_klen; + pkt->_salt = ctx->_salt; + pkt->_warpped_key = ctx->generateWarppedKey(); + return pkt; +} + +KeyMaterialPacket::Ptr Crypto::takeAwayAnnouncePacket() { + auto pkt = _re_announce_pkt; + _re_announce_pkt = nullptr; + return pkt; +} + +bool Crypto::loadFromKeyMaterial(KeyMaterial::Ptr packet) { + try { + if (packet->_kk == KeyMaterial::KEY_BASED_ENCRYPTION_EVEN_SEK) { + _ctx_pair[0] = createCtx(packet->_cipher, _passparase, packet->_kk, packet); + } else if (packet->_kk == KeyMaterial::KEY_BASED_ENCRYPTION_ODD_SEK) { + _ctx_pair[1] = createCtx(packet->_cipher, _passparase, packet->_kk, packet); + } else if (packet->_kk == KeyMaterial::KEY_BASED_ENCRYPTION_BOTH_SEK) { + _ctx_pair[0] = createCtx(packet->_cipher, _passparase, KeyMaterial::KEY_BASED_ENCRYPTION_EVEN_SEK, packet); + _ctx_pair[1] = createCtx(packet->_cipher, _passparase, KeyMaterial::KEY_BASED_ENCRYPTION_ODD_SEK, packet); + } + } catch (std::exception &ex) { + WarnL << ex.what(); + return false; + } + return true; +} + +BufferLikeString::Ptr Crypto::encrypt(DataPacket::Ptr pkt, const char *buf, int len) { + _pkt_count++; + + //refresh + if (_pkt_count == _re_announcement_period) { + auto ctx = createCtx(KeyMaterial::CIPHER_AES_CTR, _passparase, _ctx_pair[!_ctx_idx]->_kk); + _ctx_pair[!_ctx_idx] = ctx; + _re_announce_pkt = generateAnnouncePacket(ctx); + } + + if (_pkt_count > _refresh_period) { + _pkt_count = 0; + _ctx_idx = !_ctx_idx; + } + + pkt->KK = _ctx_pair[_ctx_idx]->_kk; + return _ctx_pair[_ctx_idx]->encrypt(pkt->packet_seq_number, buf, len); +} + +BufferLikeString::Ptr Crypto::decrypt(DataPacket::Ptr pkt, const char *buf, int len) { + CryptoContext::Ptr _ctx; + if (pkt->KK == KeyMaterial::KEY_BASED_ENCRYPTION_NO_SEK) { + auto payload = std::make_shared(); + payload->assign(buf, len); + return payload; + } else if (pkt->KK == KeyMaterial::KEY_BASED_ENCRYPTION_EVEN_SEK) { + _ctx = _ctx_pair[0]; + } else if (pkt->KK == KeyMaterial::KEY_BASED_ENCRYPTION_ODD_SEK) { + _ctx = _ctx_pair[1]; + } + + if (!_ctx) { + WarnL << "not has effective KeyMaterial with kk: " << pkt->KK; + return nullptr; + } + + return _ctx->decrypt(pkt->packet_seq_number, buf, len); +} + +} // namespace SRT diff --git a/srt/Crypto.hpp b/srt/Crypto.hpp new file mode 100644 index 00000000..02931ec1 --- /dev/null +++ b/srt/Crypto.hpp @@ -0,0 +1,102 @@ +#ifndef ZLMEDIAKIT_SRT_CRYPTO_H +#define ZLMEDIAKIT_SRT_CRYPTO_H +#include +#include + +#include "Network/Buffer.h" +#include "Network/sockutil.h" +#include "Util/logger.h" + +#include "Common.hpp" +#include "HSExt.hpp" +#include "Packet.hpp" + +namespace SRT { + +class CryptoContext : public std::enable_shared_from_this { +public: + using Ptr = std::shared_ptr; + CryptoContext(const std::string& passparase, uint8_t kk, KeyMaterial::Ptr packet = nullptr); + virtual ~CryptoContext() = default; + + virtual void refresh(); + virtual std::string generateWarppedKey(); + + virtual BufferLikeString::Ptr encrypt(uint32_t pkt_seq_no, const char *buf, int len) = 0; + virtual BufferLikeString::Ptr decrypt(uint32_t pkt_seq_no, const char *buf, int len) = 0; + virtual uint8_t getCipher() const = 0; + +protected: + virtual void loadFromKeyMaterial(KeyMaterial::Ptr packet); + virtual bool generateKEK(); + BufferLikeString::Ptr generateIv(uint32_t pkt_seq_no); + +private: + +public: + std::string _passparase; + + uint8_t _kk = SRT::KeyMaterial::KEY_BASED_ENCRYPTION_EVEN_SEK; + + BufferLikeString _kek; + const uint32_t _iter = 2048; + + size_t _slen = 16; + BufferLikeString _salt; + + size_t _klen = 16; + BufferLikeString _sek; +}; + +class AesCtrCryptoContext : public CryptoContext { +public: + using Ptr = std::shared_ptr; + AesCtrCryptoContext(const std::string& passparase, uint8_t kk, KeyMaterial::Ptr packet = nullptr); + virtual ~AesCtrCryptoContext() = default; + + uint8_t getCipher() const override { + return KeyMaterial::CIPHER_AES_CTR; + } + + BufferLikeString::Ptr encrypt(uint32_t pkt_seq_no, const char *buf, int len) override; + BufferLikeString::Ptr decrypt(uint32_t pkt_seq_no, const char *buf, int len) override; + +}; + + +class Crypto : public std::enable_shared_from_this{ +public: + using Ptr = std::shared_ptr; + Crypto(const std::string& passparase); + virtual ~Crypto() = default; + + HSExtKeyMaterial::Ptr generateKeyMaterialExt(uint16_t extension_type); + KeyMaterialPacket::Ptr takeAwayAnnouncePacket(); + + bool loadFromKeyMaterial(KeyMaterial::Ptr packet); + + // for encryption + std::string _passparase; + + //The recommended KM Refresh Period is after 2^25 packets encrypted with the same SEK are sent. + const uint32_t _refresh_period = 1 <<25; + const uint32_t _re_announcement_period = (1 <<25) - 4000; + + uint32_t _pkt_count = 0; + KeyMaterialPacket::Ptr _re_announce_pkt; + + CryptoContext::Ptr _ctx_pair[2]; /* Even(0)/Odd(1) crypto contexts */ + uint32_t _ctx_idx = 0; + + BufferLikeString::Ptr encrypt(DataPacket::Ptr pkt, const char *buf, int len); + BufferLikeString::Ptr decrypt(DataPacket::Ptr pkt, const char *buf, int len); + +private: + + CryptoContext::Ptr createCtx(int cipher, const std::string& passparase, uint8_t kk, KeyMaterial::Ptr packet = nullptr); + KeyMaterialPacket::Ptr generateAnnouncePacket(CryptoContext::Ptr ctx); +}; + +} // namespace SRT + +#endif // ZLMEDIAKIT_SRT_CRYPTO_H diff --git a/srt/HSExt.cpp b/srt/HSExt.cpp index d12b2b3c..45be1a12 100644 --- a/srt/HSExt.cpp +++ b/srt/HSExt.cpp @@ -131,4 +131,162 @@ std::string HSExtStreamID::dump() { return std::move(printer); } -} // namespace SRT \ No newline at end of file +size_t KeyMaterial::getContentSize() { + size_t variable_width = _slen + _warpped_key.size(); + size_t content_size = variable_width + 16; + return content_size; +} + +bool KeyMaterial::loadFromData(uint8_t *buf, size_t len) { + if (buf == NULL || len < 16) { + return false; + } + uint8_t *ptr = (uint8_t *)buf; + + _km_version = (*ptr & 0x70) >> 4; + _pt = *ptr & 0x0f; + ptr += 1; + + _sign = loadUint16(ptr); + ptr += 2; + + _kk = *ptr & 0x03; + auto sek_num = 1; + if (_kk == KEY_BASED_ENCRYPTION_BOTH_SEK) { + sek_num = 2; + } + ptr += 1; + + _keki = loadUint32(ptr); + ptr += 4; + + _cipher = *ptr; + ptr += 1; + + _auth = *ptr; + ptr += 1; + + _se = *ptr; + ptr += 1; + + //Resv2 + ptr += 1; + //Resv3 + ptr += 2; + + _slen = *ptr *4; + ptr += 1; + + _klen = *ptr *4; + ptr += 1; + + size_t wrapped_key_len = 8 + sek_num * _klen; + size_t variable_width = _slen + wrapped_key_len; + if (len < variable_width + 16) { + return false; + } + + _salt.assign((const char*)ptr, (size_t)_slen); + ptr += _slen; + + _warpped_key.assign((const char*)ptr, (size_t)wrapped_key_len); + + return true; +} + +bool KeyMaterial::storeToData(uint8_t *buf, size_t len) { + auto content_size = getContentSize(); + if (len < content_size) { + return false; + } + + uint8_t *ptr = (uint8_t *)buf; + memset(ptr, 0, len); + + *ptr = ((_km_version << 4)& 0x70) | (_pt & 0x0f); + ptr += 1; + + storeUint16(ptr, _sign); + ptr += 2; + + *ptr = _kk & 0x03; + ptr += 1; + + storeUint32(ptr, _keki); + ptr += 4; + + *ptr = _cipher; + ptr += 1; + + *ptr = _auth; + ptr += 1; + + *ptr = _se; + ptr += 1; + + *ptr = 0; //Resv2 + ptr += 1; + + storeUint16(ptr, 0);//Resv3 + ptr += 2; + + *ptr = (uint8_t)(_slen/4); + ptr += 1; + + *ptr = (uint8_t)(_klen/4); + ptr += 1; + + const char *src = _salt.data(); + for (size_t i = 0; i < _salt.size(); ptr++, src++, i++) { + *ptr = *src; + } + + src = _warpped_key.data(); + for (size_t i = 0; i < _warpped_key.size(); ptr++, src++, i++) { + *ptr = *src; + } + return true; +} + +std::string KeyMaterial::dump() { + _StrPrinter printer; + printer << "kmVersion: " << _km_version + << " pt : " << _pt + << " sign : " << std::hex << _sign + << " kk : " << _kk + << " keki : " << _keki + << " cipher : " << _cipher + << " auth : " << _auth + << " se : " << _se + << " sLen : " << _slen + << " salt : " << std::hex << _salt.data() + << " kLen : " << _klen; + return std::move(printer); +} + +bool HSExtKeyMaterial::loadFromData(uint8_t *buf, size_t len) { + if (buf == NULL || len < 4) { + return false; + } + HSExt::_data = BufferRaw::create(); + HSExt::_data->assign((char *)buf, len); + HSExt::loadHeader(); + assert(extension_type == SRT_CMD_KMREQ || extension_type == SRT_CMD_KMRSP); + return KeyMaterial::loadFromData(buf +4, len -4); +} + +bool HSExtKeyMaterial::storeToData() { + size_t content_size = ((KeyMaterial::getContentSize() + 4) + 3) / 4 * 4; + HSExt::_data = BufferRaw::create(); + HSExt::_data->setCapacity(content_size); + HSExt::_data->setSize(content_size); + extension_length = (content_size - 4) / 4; + HSExt::storeHeader(); + return KeyMaterial::storeToData((uint8_t*)_data->data() + 4, content_size - 4); +} + +std::string HSExtKeyMaterial::dump() { + return KeyMaterial::dump(); +} + +} // namespace SRT diff --git a/srt/HSExt.hpp b/srt/HSExt.hpp index 55eba100..b28f29cd 100644 --- a/srt/HSExt.hpp +++ b/srt/HSExt.hpp @@ -125,5 +125,118 @@ public: std::string dump() override; std::string streamid; }; + +/* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +|S| V | PT | Sign | Resv1 | KK| ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| KEKI | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Cipher | Auth | SE | Resv2 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Resv3 | SLen/4 | KLen/4 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Salt | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Wrapped Key + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +Figure 11: Key Material Message structure +https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-key-material +*/ +class KeyMaterial { +public: + using Ptr = std::shared_ptr; + KeyMaterial() = default; + virtual ~KeyMaterial() = default; + bool loadFromData(uint8_t *buf, size_t len); + bool storeToData(uint8_t *buf, size_t len); + std::string dump(); + +protected: + size_t getContentSize(); + +public: + + enum { + PACKET_TYPE_RESERVED = 0b0000, + PACKET_TYPE_MSMSG = 0b0001, // 1-Media Strem Message + PACKET_TYPE_KMMSG = 0b0010, // 2-Keying Material Message + PACKET_TYPE_MPEG_TS = 0b0111, // 7-MPEG-TS packet + }; + + enum { + KEY_BASED_ENCRYPTION_NO_SEK = 0b00, + KEY_BASED_ENCRYPTION_EVEN_SEK = 0b01, + KEY_BASED_ENCRYPTION_ODD_SEK = 0b10, + KEY_BASED_ENCRYPTION_BOTH_SEK = 0b11, + }; + + enum { + CIPHER_NONE = 0x00, + CIPHER_AES_ECB = 0x01, //reserved, not support + CIPHER_AES_CTR = 0x02, + CIPHER_AES_CBC = 0x03, //reserved, not support + CIPHER_AES_GCM = 0x04 + }; + + enum { + AUTHENTICATION_NONE = 0x00, + AUTH_AES_GCM = 0x01, + }; + + enum { + STREAM_ENCAPSUALTION_UNSPECIFIED = 0x00, + STREAM_ENCAPSUALTION_MPEG_TS_UDP = 0x01, + STREAM_ENCAPSUALTION_MPEG_TS_SRT = 0x02, + }; + + uint8_t _km_version = 0b001; + uint8_t _pt = PACKET_TYPE_KMMSG; + uint16_t _sign = 0x2029; + uint8_t _kk = KEY_BASED_ENCRYPTION_EVEN_SEK; + uint32_t _keki = 0; + uint8_t _cipher = CIPHER_AES_CTR; + uint8_t _auth = AUTHENTICATION_NONE; + uint8_t _se = STREAM_ENCAPSUALTION_MPEG_TS_SRT; + uint16_t _slen = 16; + uint16_t _klen = 16; + BufferLikeString _salt; + BufferLikeString _warpped_key; +}; + + +class HSExtKeyMaterial : public HSExt, public KeyMaterial { +public: + using Ptr = std::shared_ptr; + HSExtKeyMaterial() = default; + virtual ~HSExtKeyMaterial() = default; + bool loadFromData(uint8_t *buf, size_t len) override; + bool storeToData() override; + std::string dump() override; +}; + +/* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| KM State | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +Figure 7: KM Response Error +https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-key-material-extension-mess +*/ +class HSExtKMResponseError : public HSExt { +public: + using Ptr = std::shared_ptr; + HSExtKMResponseError() = default; + ~HSExtKMResponseError() = default; + bool loadFromData(uint8_t *buf, size_t len) override; + bool storeToData() override; + std::string dump() override; +}; + } // namespace SRT -#endif // ZLMEDIAKIT_SRT_HS_EXT_H \ No newline at end of file +#endif // ZLMEDIAKIT_SRT_HS_EXT_H diff --git a/srt/Packet.cpp b/srt/Packet.cpp index a675f58a..edfbe36d 100644 --- a/srt/Packet.cpp +++ b/srt/Packet.cpp @@ -55,6 +55,13 @@ bool DataPacket::loadFromData(uint8_t *buf, size_t len) { return true; } +bool DataPacket::reloadPayload(uint8_t *buf, size_t len) { + _data->setCapacity(len + HEADER_SIZE); + _data->setSize(len + HEADER_SIZE); + memcpy(_data->data() + HEADER_SIZE, buf, len); + return true; +} + bool DataPacket::storeToHeader() { if (!_data || _data->size() < HEADER_SIZE) { WarnL << "data size less " << HEADER_SIZE; @@ -162,6 +169,12 @@ uint16_t ControlPacket::getControlType(uint8_t *buf, size_t len) { return control_type; } +uint16_t ControlPacket::getSubType(uint8_t *buf, size_t len) { + uint8_t *ptr = buf; + uint16_t subtype = loadUint16(ptr + 2); + return subtype; +} + bool ControlPacket::loadHeader() { uint8_t *ptr = (uint8_t *)_data->data(); f = ptr[0] >> 7; @@ -225,6 +238,20 @@ size_t ControlPacket::size() const { uint32_t ControlPacket::getSocketID(uint8_t *buf, size_t len) { return loadUint32(buf + 12); } + +#define XX(name, value, str) {str, name}, +std::map reject_map = {REJ_MAP(XX)}; +#undef XX + +std::string getRejectReason(SRT_REJECT_REASON code) { + switch (code) { +#define XX(name, value, str) case name : return str; + REJ_MAP(XX) +#undef XX + default : return "invalid"; + } +} + std::string HandshakePacket::dump(){ _StrPrinter printer; printer <<"flag:"<< (int)f<<"\r\n"; @@ -324,6 +351,9 @@ bool HandshakePacket::loadExtMessage(uint8_t *buf, size_t len) { case HSExt::SRT_CMD_HSREQ: case HSExt::SRT_CMD_HSRSP: ext = std::make_shared(); break; case HSExt::SRT_CMD_SID: ext = std::make_shared(); break; + case HSExt::SRT_CMD_KMREQ: + case HSExt::SRT_CMD_KMRSP: + ext = std::make_shared(); break; default: WarnL << "not support ext " << type; break; } if (ext) { @@ -451,6 +481,23 @@ void HandshakePacket::assignPeerIP(struct sockaddr_storage *addr) { } } +void HandshakePacket::assignPeerIPBE(struct sockaddr_storage *addr) { + memset(peer_ip_addr, 0, sizeof(peer_ip_addr) * sizeof(peer_ip_addr[0])); + if (addr->ss_family == AF_INET) { + struct sockaddr_in *ipv4 = (struct sockaddr_in *)addr; + storeUint32(peer_ip_addr, ipv4->sin_addr.s_addr); + } else if (addr->ss_family == AF_INET6) { + if (IN6_IS_ADDR_V4MAPPED(&((struct sockaddr_in6 *)addr)->sin6_addr)) { + struct in_addr addr4; + memcpy(&addr4, 12 + (char *)&(((struct sockaddr_in6 *)addr)->sin6_addr), 4); + storeUint32(peer_ip_addr, addr4.s_addr); + } else { + const sockaddr_in6 *ipv6 = (struct sockaddr_in6 *)addr; + memcpy(peer_ip_addr, ipv6->sin6_addr.s6_addr, sizeof(peer_ip_addr) * sizeof(peer_ip_addr[0])); + } + } + } + uint32_t HandshakePacket::generateSynCookie( struct sockaddr_storage *addr, TimePoint ts, uint32_t current_cookie, int correction) { static std::atomic distractor { 0 }; @@ -619,4 +666,4 @@ bool MsgDropReqPacket::storeToData() { ptr += 4; return true; } -} // namespace SRT \ No newline at end of file +} // namespace SRT diff --git a/srt/Packet.hpp b/srt/Packet.hpp index 601bf94f..a765983b 100644 --- a/srt/Packet.hpp +++ b/srt/Packet.hpp @@ -57,6 +57,7 @@ public: static bool isDataPacket(uint8_t *buf, size_t len); static uint32_t getSocketID(uint8_t *buf, size_t len); bool loadFromData(uint8_t *buf, size_t len); + bool reloadPayload(uint8_t *buf, size_t len); bool storeToData(uint8_t *buf, size_t len); bool storeToHeader(); @@ -105,6 +106,7 @@ public: static const size_t HEADER_SIZE = 16; static bool isControlPacket(uint8_t *buf, size_t len); static uint16_t getControlType(uint8_t *buf, size_t len); + static uint16_t getSubType(uint8_t *buf, size_t len); static uint32_t getSocketID(uint8_t *buf, size_t len); ControlPacket() = default; @@ -180,6 +182,37 @@ protected: Figure 5: Handshake packet structure https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-handshake */ + +// REJ code,from libsrt +#define REJ_MAP(XX) \ +XX(SRT_REJ_UNKNOWN, 1000, "Unknown or erroneous") \ +XX(SRT_REJ_SYSTEM, 1001, "Error in system calls") \ +XX(SRT_REJ_PEER, 1002, "Peer rejected connection") \ +XX(SRT_REJ_RESOURCE, 1003, "Resource allocation failure") \ +XX(SRT_REJ_ROGUE, 1004, "Rogue peer or incorrect parameters") \ +XX(SRT_REJ_BACKLOG, 1005, "Listener's backlog exceeded") \ +XX(SRT_REJ_IPE, 1006, "Internal Program Error") \ +XX(SRT_REJ_CLOSE, 1007, "Socket is being closed") \ +XX(SRT_REJ_VERSION, 1008, "Peer version too old") \ +XX(SRT_REJ_RDVCOOKIE, 1009, "Rendezvous-mode cookie collision") \ +XX(SRT_REJ_BADSECRET, 1010, "Incorrect passphrase") \ +XX(SRT_REJ_UNSECURE, 1011, "Password required or unexpected") \ +XX(SRT_REJ_MESSAGEAPI, 1012, "MessageAPI/StreamAPI collision") \ +XX(SRT_REJ_CONGESTION, 1013, "Congestion controller type collision") \ +XX(SRT_REJ_FILTER, 1014, "Packet Filter settings error") \ +XX(SRT_REJ_GROUP, 1015, "Group settings collision") \ +XX(SRT_REJ_TIMEOUT, 1016, "Connection timeout") \ +XX(SRT_REJ_CRYPTO, 1017, "Crypto mode") + +typedef enum { +#define XX(name, value, str) name = value, + REJ_MAP(XX) +#undef XX + SRT_REJ_E_SIZE +} SRT_REJECT_REASON; + +std::string getRejectReason(SRT_REJECT_REASON code); + class HandshakePacket : public ControlPacket { public: using Ptr = std::shared_ptr; @@ -205,6 +238,10 @@ public: generateSynCookie(struct sockaddr_storage *addr, TimePoint ts, uint32_t current_cookie = 0, int correction = 0); std::string dump(); void assignPeerIP(struct sockaddr_storage *addr); + void assignPeerIPBE(struct sockaddr_storage *addr); + bool isReject() { + return (handshake_type >= SRT_REJ_UNKNOWN && handshake_type < SRT_REJ_E_SIZE); + } ///////ControlPacket override/////// bool loadFromData(uint8_t *buf, size_t len) override; bool storeToData() override; @@ -367,6 +404,56 @@ public: } }; +/* + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+- SRT Header +-+-+-+-+-+-+-+-+-+-+-+-+-+ +|1| Control Type = 0x7FFF | Subtype = 3/4 | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Type-specific Information | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Timestamp | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Destination Socket ID | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +the Control Type field of the SRT packet header is set to User-Defined Type (see Table 1), +the Subtype field of the header is set to SRT_CMD_KMREQ for key-refresh request +and SRT_CMD_KMRSP for key-refresh response (Table 5). The KM Refresh mechanism is described in Section 6.1.6. +https://haivision.github.io/srt-rfc/draft-sharabayko-srt.html#name-key-material +*/ + +class KeyMaterialPacket : public ControlPacket, public KeyMaterial { +public: + using Ptr = std::shared_ptr; + KeyMaterialPacket() = default; + ~KeyMaterialPacket() = default; + + ///////ControlPacket override/////// + bool loadFromData(uint8_t *buf, size_t len) override { + if (len < HEADER_SIZE) { + WarnL << "data size" << len << " less " << HEADER_SIZE; + return false; + } + _data = BufferRaw::create(); + _data->assign((char *)buf, len); + loadHeader(); + assert(sub_type == HSExt::SRT_CMD_KMREQ || sub_type == HSExt::SRT_CMD_KMRSP); + return KeyMaterial::loadFromData(buf + HEADER_SIZE, len - HEADER_SIZE); + } + + bool storeToData() override { + size_t content_size = ((KeyMaterial::getContentSize() + HEADER_SIZE) + 3) / 4 * 4; + control_type = ControlPacket::USERDEFINEDTYPE; + /* sub_type = HSExt::SRT_CMD_KMREQ; */ + /* sub_type = HSExt::SRT_CMD_KMRSP; */ + _data = BufferRaw::create(); + _data->setCapacity(content_size); + _data->setSize(content_size); + storeToHeader(); + return KeyMaterial::storeToData((uint8_t*)_data->data() + HEADER_SIZE, content_size - HEADER_SIZE); + } +}; + } // namespace SRT -#endif // ZLMEDIAKIT_SRT_PACKET_H \ No newline at end of file +#endif // ZLMEDIAKIT_SRT_PACKET_H diff --git a/srt/SrtTransport.cpp b/srt/SrtTransport.cpp index e3db949f..a968dbcd 100644 --- a/srt/SrtTransport.cpp +++ b/srt/SrtTransport.cpp @@ -18,12 +18,14 @@ const std::string kTimeOutSec = SRT_FIELD "timeoutSec"; const std::string kPort = SRT_FIELD "port"; const std::string kLatencyMul = SRT_FIELD "latencyMul"; const std::string kPktBufSize = SRT_FIELD "pktBufSize"; +const std::string kPassPhrase = SRT_FIELD "passPhrase"; static onceToken token([]() { mINI::Instance()[kTimeOutSec] = 5; mINI::Instance()[kPort] = 9000; mINI::Instance()[kLatencyMul] = 4; mINI::Instance()[kPktBufSize] = 8192; + mINI::Instance()[kPassPhrase] = ""; }); static std::atomic s_srt_socket_id_generate { 125 }; @@ -228,6 +230,8 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad // first HSExtMessage::Ptr req; HSExtStreamID::Ptr sid; + HSExtKeyMaterial::Ptr keyMaterial; + uint32_t srt_flag = 0xbf; uint16_t delay = DurationCountMicroseconds(_now - _induction_ts) * getLatencyMul() / 1000; if (delay <= 120) { @@ -241,6 +245,9 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad if (!sid) { sid = std::dynamic_pointer_cast(ext); } + if (!keyMaterial) { + keyMaterial = std::dynamic_pointer_cast(ext); + } } if (sid) { _stream_id = sid->streamid; @@ -252,6 +259,22 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad srt_flag = req->srt_flag; delay = delay <= req->recv_tsbpd_delay ? req->recv_tsbpd_delay : delay; } + + if (!keyMaterial && getPassphrase().empty()) { + //nop + } else if (keyMaterial && !getPassphrase().empty()) { + _crypto = std::make_shared(getPassphrase()); + if (!_crypto->loadFromKeyMaterial(keyMaterial)) { + sendRejectPacket(SRT_REJ_BADSECRET, addr); + onShutdown(SockException(Err_other, StrPrinter << "handshake fail, reject resaon: " << SRT::getRejectReason(SRT_REJ_BADSECRET))); + return; + } + } else { + sendRejectPacket(SRT_REJ_UNSECURE, addr); + onShutdown(SockException(Err_other, StrPrinter << "handshake fail, reject resaon: " << SRT::getRejectReason(SRT_REJ_UNSECURE))); + return; + } + TraceL << getIdentifier() << " CONCLUSION Phase from"<(); res->dst_socket_id = _peer_socket_id; @@ -262,6 +285,12 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad res->version = 5; res->encryption_field = HandshakePacket::NO_ENCRYPTION; res->extension_field = HandshakePacket::HS_EXT_FILED_HSREQ; + if (_crypto) { + //The default value is 0 (no encryption advertised). + //If neither peer advertises encryption, AES-128 is selected by default + /* req->encryption_field = SRT::HandshakePacket::AES_128; */ + res->extension_field |= HandshakePacket::HS_EXT_FILED_KMREQ; + } res->handshake_type = HandshakePacket::HS_TYPE_CONCLUSION; res->srt_socket_id = _socket_id; res->syn_cookie = 0; @@ -272,6 +301,10 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad ext->srt_flag = srt_flag; ext->recv_tsbpd_delay = ext->send_tsbpd_delay = delay; res->ext_list.push_back(std::move(ext)); + if (keyMaterial) { + keyMaterial->extension_type = HSExt::SRT_CMD_KMRSP; + res->ext_list.push_back(std::move(keyMaterial)); + } res->storeToData(); _handleshake_res = res; unregisterSelfHandshake(); @@ -366,6 +399,42 @@ void SrtTransport::sendMsgDropReq(uint32_t first, uint32_t last) { sendControlPacket(pkt, true); } +void SrtTransport::tryAnnounceKeyMaterial() { + //TraceL; + + if (!_crypto) { + return; + } + + auto pkt = _crypto->takeAwayAnnouncePacket(); + if (!pkt) { + return; + } + + auto now = SteadyClock::now(); + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = SRT::DurationCountMicroseconds(now - _start_timestamp); + pkt->storeToData(); + _announce_req = pkt; + sendControlPacket(pkt, true); + + std::weak_ptr weak_self = std::static_pointer_cast(shared_from_this()); + _announce_timer = std::make_shared(0.2, [weak_self]()->bool{ + auto strong_self = weak_self.lock(); + if (!strong_self) { + return false; + } + if (!strong_self->_announce_req) { + return false; + } + + strong_self->sendControlPacket(strong_self->_announce_req, true); + return true; + }, getPoller()); + + return; +} + void SrtTransport::handleNAK(uint8_t *buf, int len, struct sockaddr_storage *addr) { // TraceL; NAKPacket pkt; @@ -433,6 +502,8 @@ void SrtTransport::handleDropReq(uint8_t *buf, int len, struct sockaddr_storage */ } void SrtTransport::checkAndSendAckNak(){ + //SRT Periodic NAK reports are sent with a period of (RTT + 4 * RTTVar) / 2 (so called NAKInterval), + //with a 20 milliseconds floor auto nak_interval = (_rtt + _rtt_variance * 4) / 2; if (nak_interval <= 20 * 1000) { nak_interval = 20 * 1000; @@ -468,7 +539,52 @@ void SrtTransport::checkAndSendAckNak(){ _light_ack_pkt_count++; } void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr) { - TraceL; + /* TraceL; */ + + using srt_userd_defined_handler = void (SrtTransport::*)(uint8_t * buf, int len, struct sockaddr_storage *addr); + static std::unordered_map s_userd_defined_functions; + static onceToken token([]() { + s_userd_defined_functions.emplace(SRT::HSExt::SRT_CMD_KMREQ, &SrtTransport::handleKeyMaterialReqPacket); + s_userd_defined_functions.emplace(SRT::HSExt::SRT_CMD_KMRSP, &SrtTransport::handleKeyMaterialRspPacket); + }); + + uint16_t subtype = ControlPacket::getSubType(buf, len); + auto it = s_userd_defined_functions.find(subtype); + if (it == s_userd_defined_functions.end()) { + WarnL << " not support subtype in user defined msg ignore: " << subtype; + return; + } else { + (this->*(it->second))(buf, len, addr); + } + + return; +} + +void SrtTransport::handleKeyMaterialReqPacket(uint8_t *buf, int len, struct sockaddr_storage *addr) { + /* TraceL; */ + + if (!_crypto) { + WarnL << " not enable crypto, ignore"; + return; + } + + KeyMaterialPacket::Ptr pkt = std::make_shared(); + pkt->loadFromData(buf, len); + _crypto->loadFromKeyMaterial(pkt); + + //rsp + pkt->sub_type = SRT::HSExt::SRT_CMD_KMRSP; + pkt->dst_socket_id = _peer_socket_id; + pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp); + pkt->storeToData(); + sendControlPacket(pkt, true); + return; +} + +void SrtTransport::handleKeyMaterialRspPacket(uint8_t *buf, int len, struct sockaddr_storage *addr) { + /* TraceL; */ + _announce_req = nullptr; + return; } void SrtTransport::handleACKACK(uint8_t *buf, int len, struct sockaddr_storage *addr) { @@ -603,6 +719,25 @@ void SrtTransport::sendNAKPacket(std::list &lost_list) { // TraceL<<"send NAK "<dump(); } +void SrtTransport::sendRejectPacket(SRT_REJECT_REASON reason, struct sockaddr_storage *addr) { + HandshakePacket::Ptr res = std::make_shared(); + res->dst_socket_id = _peer_socket_id; + res->timestamp = DurationCountMicroseconds(_now - _start_timestamp); + res->mtu = _mtu; + res->max_flow_window_size = _max_window_size; + res->initial_packet_sequence_number = _init_seq_number; + res->version = 5; + res->encryption_field = HandshakePacket::NO_ENCRYPTION; + res->extension_field = HandshakePacket::HS_EXT_FILED_HSREQ; + res->handshake_type = reason; + res->srt_socket_id = _socket_id; + res->syn_cookie = 0; + res->assignPeerIP(addr); + res->storeToData(); + sendControlPacket(res, true); + return; +} + void SrtTransport::sendShutDown() { ShutDownPacket::Ptr pkt = std::make_shared(); pkt->dst_socket_id = _peer_socket_id; @@ -615,6 +750,16 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora DataPacket::Ptr pkt = std::make_shared(); pkt->loadFromData(buf, len); + if (_crypto) { + auto payload = _crypto->decrypt(pkt, pkt->payloadData(), pkt->payloadSize()); + if (!payload) { + WarnL << "decrypt pkt->packet_seq_number: " << pkt->packet_seq_number << ", timestamp: " << "pkt->timestamp " << " fail"; + return; + } + + pkt->reloadPayload((uint8_t*)payload->data(), payload->size()); + } + _estimated_link_capacity_context->inputPacket(_now,pkt); std::list list; @@ -684,9 +829,26 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora } void SrtTransport::sendDataPacket(DataPacket::Ptr pkt, char *buf, int len, bool flush) { - pkt->storeToData((uint8_t *)buf, len); + auto data = buf; + auto size = len; + BufferLikeString::Ptr payload; + if (_crypto) { + payload = _crypto->encrypt(pkt, const_cast(buf), len); + if (!payload) { + WarnL << "encrypt pkt->packet_seq_number: " << pkt->packet_seq_number << ", timestamp: " << "pkt->timestamp " << " fail"; + return; + } + + data = payload->data(); + size = payload->size(); + + tryAnnounceKeyMaterial(); + } + + pkt->storeToData((uint8_t *)data, size); sendPacket(pkt, flush); _send_buf->inputPacket(pkt); + return; } void SrtTransport::sendControlPacket(ControlPacket::Ptr pkt, bool flush) { @@ -836,4 +998,4 @@ SrtTransport::Ptr SrtTransportManager::getHandshakeItem(const uint32_t key) { return it->second.lock(); } -} // namespace SRT \ No newline at end of file +} // namespace SRT diff --git a/srt/SrtTransport.hpp b/srt/SrtTransport.hpp index 56b75b4c..36edf093 100644 --- a/srt/SrtTransport.hpp +++ b/srt/SrtTransport.hpp @@ -13,6 +13,7 @@ #include "Common.hpp" #include "NackContext.hpp" #include "Packet.hpp" +#include "Crypto.hpp" #include "PacketQueue.hpp" #include "PacketSendQueue.hpp" #include "Statistic.hpp" @@ -24,6 +25,7 @@ extern const std::string kPort; extern const std::string kTimeOutSec; extern const std::string kLatencyMul; extern const std::string kPktBufSize; +extern const std::string kPassPhrase; class SrtTransport : public std::enable_shared_from_this { public: @@ -60,6 +62,7 @@ protected: virtual int getLatencyMul() { return 4; }; virtual int getPktBufSize() { return 8192; }; virtual float getTimeOutSec(){return 5.0;}; + virtual std::string getPassphrase() {return "";}; private: void registerSelf(); @@ -79,15 +82,19 @@ private: void handleShutDown(uint8_t *buf, int len, struct sockaddr_storage *addr); void handleDropReq(uint8_t *buf, int len, struct sockaddr_storage *addr); void handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_storage *addr); + void handleKeyMaterialReqPacket(uint8_t *buf, int len, struct sockaddr_storage *addr); + void handleKeyMaterialRspPacket(uint8_t *buf, int len, struct sockaddr_storage *addr); void handlePeerError(uint8_t *buf, int len, struct sockaddr_storage *addr); void handleDataPacket(uint8_t *buf, int len, struct sockaddr_storage *addr); void sendNAKPacket(std::list &lost_list); void sendACKPacket(); + void sendRejectPacket(SRT_REJECT_REASON reason, struct sockaddr_storage *addr); void sendLightACKPacket(); void sendKeepLivePacket(); void sendShutDown(); void sendMsgDropReq(uint32_t first, uint32_t last); + void tryAnnounceKeyMaterial(); size_t getPayloadSize() const; @@ -159,6 +166,11 @@ private: Ticker _alive_ticker; bool _is_handleshake_finished = false; + + // for encryption + Crypto::Ptr _crypto; + Timer::Ptr _announce_timer; + KeyMaterialPacket::Ptr _announce_req; }; class SrtTransportManager { @@ -185,4 +197,4 @@ private: } // namespace SRT -#endif // ZLMEDIAKIT_SRT_TRANSPORT_H \ No newline at end of file +#endif // ZLMEDIAKIT_SRT_TRANSPORT_H diff --git a/srt/SrtTransportImp.cpp b/srt/SrtTransportImp.cpp index 60c0b59b..d0323483 100644 --- a/srt/SrtTransportImp.cpp +++ b/srt/SrtTransportImp.cpp @@ -370,6 +370,11 @@ float SrtTransportImp::getTimeOutSec() { return timeOutSec; } +std::string SrtTransportImp::getPassphrase() { + GET_CONFIG(string, passphrase, kPassPhrase); + return passphrase; +} + int SrtTransportImp::getPktBufSize() { // kPktBufSize GET_CONFIG(int, pktBufSize, kPktBufSize); @@ -380,4 +385,4 @@ int SrtTransportImp::getPktBufSize() { return pktBufSize; } -} // namespace SRT \ No newline at end of file +} // namespace SRT diff --git a/srt/SrtTransportImp.hpp b/srt/SrtTransportImp.hpp index 9c1ac47e..ab0b2b13 100644 --- a/srt/SrtTransportImp.hpp +++ b/srt/SrtTransportImp.hpp @@ -38,6 +38,7 @@ protected: int getLatencyMul() override; int getPktBufSize() override; float getTimeOutSec() override; + std::string getPassphrase() override; void onSRTData(DataPacket::Ptr pkt) override; void onShutdown(const SockException &ex) override; void onHandShakeFinished(std::string &streamid, struct sockaddr_storage *addr) override;