From 6273c89b9db99025f5f726d6c698c3c4098c9513 Mon Sep 17 00:00:00 2001 From: Jerry Yan <792602257@qq.com> Date: Tue, 7 Apr 2026 17:16:26 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8DSSRC=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E6=AD=A3=E7=A1=AE=E5=9B=9E=E6=94=B6=E7=9A=84=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/gb28181/bean/InviteMessageInfo.java | 2 + .../iot/vmp/gb28181/bean/SendRtpInfo.java | 9 ++ .../iot/vmp/gb28181/bean/SsrcTransaction.java | 9 ++ .../service/impl/DeviceServiceImpl.java | 2 +- .../service/impl/PlatformServiceImpl.java | 43 ++++++-- .../gb28181/service/impl/PlayServiceImpl.java | 63 +++++++++-- .../iot/vmp/gb28181/session/SSRCFactory.java | 22 +++- .../transmit/cmd/impl/SIPCommander.java | 104 +++++++++--------- .../cmd/impl/SIPCommanderForPlatform.java | 5 +- .../request/impl/ByeRequestProcessor.java | 6 +- .../request/impl/InviteRequestProcessor.java | 16 ++- .../iot/vmp/service/bean/SSRCInfo.java | 5 + .../service/impl/RtpServerServiceImpl.java | 4 + .../control/RedisRpcSendRtpController.java | 18 +++ .../control/RedisRpcStreamPushController.java | 2 + .../redisMsg/service/RedisRpcServiceImpl.java | 1 + 16 files changed, 233 insertions(+), 78 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteMessageInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteMessageInfo.java index beadb6901..b745f494c 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteMessageInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteMessageInfo.java @@ -10,6 +10,8 @@ public class InviteMessageInfo { private String sourceChannelId; private String sessionName; private String ssrc; + private String allocatedSsrc; + private String allocatedSsrcMediaServerId; private boolean tcp; private boolean tcpActive; private String callId; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java index 756f20d89..67447fe64 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java @@ -24,6 +24,11 @@ public class SendRtpInfo { */ private String ssrc; + /** + * 从SSRC池中分配的SSRC + */ + private String allocatedSsrc; + /** * 目标平台或设备的编号 */ @@ -247,4 +252,8 @@ public class SendRtpInfo { this.setPlayType("Play".equalsIgnoreCase(sessionName) ? InviteStreamType.PLAY : InviteStreamType.PLAYBACK); } } + + public String getSsrcToRelease() { + return allocatedSsrc; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java index 2a051a771..10ba63ca1 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java @@ -47,6 +47,11 @@ public class SsrcTransaction { */ private String ssrc; + /** + * 从SSRC池中分配的SSRC + */ + private String allocatedSsrc; + /** * 事务信息 */ @@ -88,4 +93,8 @@ public class SsrcTransaction { public SsrcTransaction() { } + + public String getSsrcToRelease() { + return allocatedSsrc; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index dccbbffad..a56c4264e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -251,7 +251,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { List ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId()); if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) { for (SsrcTransaction ssrcTransaction : ssrcTransactions) { - mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); + mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrcToRelease()); receiveRtpServerService.closeRTPServerByMediaServerId(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream()); sessionManager.removeByCallId(ssrcTransaction.getCallId()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 8603c3dfe..e3b1211a2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -132,7 +132,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } sendRtpServerService.delete(sendRtpItem); if (mediaServerItem != null) { - ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); + ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease()); boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); if (stopResult) { Platform platform = queryPlatformByServerGBId(sendRtpItem.getTargetId()); @@ -339,7 +339,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp()) && sendRtpItem.isSendToPlatform()) { Platform platform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getTargetId()); CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); - ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); + ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease()); try { commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -526,7 +526,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner List sendRtpItems = sendRtpServerService.queryForPlatform(platformId); if (sendRtpItems != null && !sendRtpItems.isEmpty()) { for (SendRtpInfo sendRtpItem : sendRtpItems) { - ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc()); + ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease()); sendRtpServerService.delete(sendRtpItem); MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null); @@ -647,7 +647,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner log.error("[点播超时], 发送BYE失败 {}", e.getMessage()); } finally { timeoutCallback.run(1, "收流超时"); - mediaServerService.releaseSsrc(mediaServerItem.getId(), data.getSsrcInfo().getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), data.getSsrcInfo().getSsrcToRelease()); receiveRtpServerService.closeRTPServer(mediaServerItem, data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream()); sessionManager.removeByStream(data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream()); } @@ -728,8 +728,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner // ssrc检验 // 更新ssrc log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + releaseAllocatedSsrc(mediaServerItem, ssrcInfo); Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse); if (!result) { try { @@ -739,7 +738,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage()); } finally { // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease()); receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); @@ -751,6 +750,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } }else { ssrcInfo.setSsrc(ssrcInResponse); + updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null); inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setStream(ssrcInfo.getStream()); if (tcpMode == 2) { @@ -764,7 +764,9 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner inviteStreamService.updateInviteInfo(inviteInfo); } }else { + releaseAllocatedSsrc(mediaServerItem, ssrcInfo); ssrcInfo.setSsrc(ssrcInResponse); + updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null); inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setStream(ssrcInfo.getStream()); if (tcpMode == 2) { @@ -782,6 +784,10 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner // 单端口 // 重新订阅流上线 SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(ssrcInfo.getApp(), inviteInfo.getStream()); + if (ssrcTransaction == null) { + return; + } + releaseAllocatedSsrc(mediaServerItem, ssrcInfo); sessionManager.removeByStream(ssrcInfo.getApp(), inviteInfo.getStream()); inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse); @@ -790,6 +796,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner ssrcTransaction.setApp(ssrcInfo.getApp()); ssrcTransaction.setStream(inviteInfo.getStream()); ssrcTransaction.setSsrc(ssrcInResponse); + ssrcTransaction.setAllocatedSsrc(null); ssrcTransaction.setMediaServerId(mediaServerItem.getId()); ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo((SIPResponse) responseEvent.getResponse())); ssrcTransaction.setType(inviteSessionType); @@ -800,6 +807,24 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } } + private void releaseAllocatedSsrc(MediaServer mediaServerItem, SSRCInfo ssrcInfo) { + if (ssrcInfo == null || ssrcInfo.getAllocatedSsrc() == null) { + return; + } + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getAllocatedSsrc()); + ssrcInfo.setAllocatedSsrc(null); + } + + private void updateSsrcTransaction(String app, String stream, String ssrc, String allocatedSsrc) { + SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream); + if (ssrcTransaction == null) { + return; + } + ssrcTransaction.setSsrc(ssrc); + ssrcTransaction.setAllocatedSsrc(allocatedSsrc); + sessionManager.put(ssrcTransaction); + } + private void tcpActiveHandler(Platform platform, CommonGBChannel channel, String contentString, MediaServer mediaServerItem, int tcpMode, boolean ssrcCheck, @@ -836,7 +861,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner log.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channel.getGbDeviceId(), e); receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream()); // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), @@ -861,7 +886,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, channel.getGbId(), stream); if (inviteInfo != null) { // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrcToRelease()); inviteStreamService.removeInviteInfo(inviteInfo); } sessionManager.removeByStream(app, stream); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index 9e2b0be78..c3a7d531a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -495,8 +495,15 @@ public class PlayServiceImpl implements IPlayService { try { sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), MediaApp.GB28181_TALK, stream, channel.getId(), true, false); + if (sendRtpInfo == null) { + ssrcFactory.releaseSsrc(mediaServerItem.getId(), playSsrc); + audioEvent.call("获取发流端口失败"); + return; + } + sendRtpInfo.setAllocatedSsrc(playSsrc); sendRtpInfo.setPlayType(InviteStreamType.TALK); }catch (PlayException e) { + ssrcFactory.releaseSsrc(mediaServerItem.getId(), playSsrc); log.info("[语音对讲]开始 获取发流端口失败 deviceId: {}, channelId: {},", device.getDeviceId(), channel.getDeviceId()); return; } @@ -523,7 +530,7 @@ public class PlayServiceImpl implements IPlayService { log.error("[语音对讲]超时, 发送BYE失败 {}", e.getMessage()); } finally { timeoutCallback.run(); - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease()); sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream()); } }, userSetting.getPlayTimeout()); @@ -532,13 +539,13 @@ public class PlayServiceImpl implements IPlayService { Integer localPort = mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpInfo, userSetting.getPlayTimeout() * 1000); if (localPort == null || localPort <= 0) { timeoutCallback.run(); - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease()); sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream()); return; } sendRtpInfo.setPort(localPort); }catch (ControllerException e) { - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease()); log.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channel.getDeviceId()); audioEvent.call("失败, " + e.getMessage()); // 查看是否已经建立了通道,存在则发送bye @@ -584,7 +591,7 @@ public class PlayServiceImpl implements IPlayService { dynamicTask.stop(timeOutTaskKey); receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream()); // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease()); sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream()); errorEvent.response(event); }, userSetting.getPlayTimeout().longValue()); @@ -594,7 +601,7 @@ public class PlayServiceImpl implements IPlayService { dynamicTask.stop(timeOutTaskKey); receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream()); // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease()); sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); @@ -879,8 +886,7 @@ public class PlayServiceImpl implements IPlayService { // ssrc检验 // 更新ssrc log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + releaseAllocatedSsrc(mediaServerItem, ssrcInfo); Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse); if (!result) { try { @@ -890,8 +896,7 @@ public class PlayServiceImpl implements IPlayService { log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage()); } - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); @@ -903,6 +908,7 @@ public class PlayServiceImpl implements IPlayService { }else { ssrcInfo.setSsrc(ssrcInResponse); + updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null); inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setStream(ssrcInfo.getStream()); if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { @@ -914,18 +920,37 @@ public class PlayServiceImpl implements IPlayService { } inviteStreamService.updateInviteInfo(inviteInfo); } + } else { + releaseAllocatedSsrc(mediaServerItem, ssrcInfo); + ssrcInfo.setSsrc(ssrcInResponse); + updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null); + inviteInfo.setSsrcInfo(ssrcInfo); + inviteInfo.setStream(ssrcInfo.getStream()); + if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { + if (mediaServerItem.isRtpEnable()) { + tcpActiveHandler(device, channel, contentString, mediaServerItem, ssrcInfo, callback); + }else { + log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); + } + } + inviteStreamService.updateInviteInfo(inviteInfo); } }else { if (ssrcInResponse != null) { // 单端口 // 重新订阅流上线 SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, inviteInfo.getStream()); + if (ssrcTransaction == null) { + return; + } + releaseAllocatedSsrc(mediaServerItem, ssrcInfo); sessionManager.removeByStream(MediaApp.GB28181, inviteInfo.getStream()); inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse); ssrcTransaction.setDeviceId(device.getDeviceId()); ssrcTransaction.setChannelId(ssrcTransaction.getChannelId()); ssrcTransaction.setCallId(ssrcTransaction.getCallId()); ssrcTransaction.setSsrc(ssrcInResponse); + ssrcTransaction.setAllocatedSsrc(null); ssrcTransaction.setApp(MediaApp.GB28181); ssrcTransaction.setStream(inviteInfo.getStream()); ssrcTransaction.setMediaServerId(mediaServerItem.getId()); @@ -938,6 +963,24 @@ public class PlayServiceImpl implements IPlayService { } } + private void releaseAllocatedSsrc(MediaServer mediaServerItem, SSRCInfo ssrcInfo) { + if (ssrcInfo == null || ssrcInfo.getAllocatedSsrc() == null) { + return; + } + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getAllocatedSsrc()); + ssrcInfo.setAllocatedSsrc(null); + } + + private void updateSsrcTransaction(String app, String stream, String ssrc, String allocatedSsrc) { + SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream); + if (ssrcTransaction == null) { + return; + } + ssrcTransaction.setSsrc(ssrc); + ssrcTransaction.setAllocatedSsrc(allocatedSsrc); + sessionManager.put(ssrcTransaction); + } + @Override public void download(Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) { @@ -1577,7 +1620,7 @@ public class PlayServiceImpl implements IPlayService { mediaServerService.stopSendRtp(mediaServer, sendRtpInfo.getApp(), sendRtpInfo.getStream(), sendRtpInfo.getSsrc()); } - ssrcFactory.releaseSsrc(mediaServerId, sendRtpInfo.getSsrc()); + ssrcFactory.releaseSsrc(mediaServerId, sendRtpInfo.getSsrcToRelease()); SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream()); if (ssrcTransaction != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java index 67f9c29d7..1c9e5ba4d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java @@ -39,8 +39,7 @@ public class SSRCFactory { public void initMediaServerSSRC(String mediaServerId, Set usedSet) { - String sipDomain = sipConfig.getDomain(); - String ssrcPrefix = sipDomain.length() >= 8 ? sipDomain.substring(3, 8) : sipDomain; + String ssrcPrefix = getSsrcPrefix(); String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; List ssrcList = new ArrayList<>(); for (int i = 1; i < MAX_STREAM_COUNT; i++) { @@ -83,7 +82,12 @@ public class SSRCFactory { if (ssrc == null) { return; } + if (!isFactorySsrc(ssrc)) { + log.warn("[释放 SSRC] 忽略非SSRC池分配的值: {}", ssrc); + return; + } String sn = ssrc.substring(1); + log.debug("[释放 SSRC] SSRC:{} -> SN: {}", ssrc, sn); String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; redisTemplate.opsForSet().add(redisKey, sn); } @@ -122,4 +126,18 @@ public class SSRCFactory { return Boolean.TRUE.equals(redisTemplate.hasKey(redisKey)); } + private String getSsrcPrefix() { + String sipDomain = sipConfig.getDomain(); + return sipDomain.length() >= 8 ? sipDomain.substring(3, 8) : sipDomain; + } + + private boolean isFactorySsrc(String ssrc) { + if (ssrc.length() < 2) { + return false; + } + String sn = ssrc.substring(1); + String ssrcPrefix = getSsrcPrefix(); + return sn.length() == ssrcPrefix.length() + 4 && sn.startsWith(ssrcPrefix); + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 51f296a6a..085968c1b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -282,21 +282,22 @@ public class SIPCommander implements ISIPCommander { // f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率 // content.append("f=v/2/6/25/1/4000a/6/8/1" + "\r\n"); // 未发现支持此特性的设备 - Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); - sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> { - sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - errorEvent.response(e); - }), e -> { - ResponseEvent responseEvent = (ResponseEvent) e.event; - SIPResponse response = (SIPResponse) responseEvent.getResponse(); - String callId = response.getCallIdHeader().getCallId(); - SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), - callId,ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, - InviteSessionType.PLAY); - sessionManager.put(ssrcTransaction); - okEvent.response(e); - }, timeout); + Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); + sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> { + sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease()); + errorEvent.response(e); + }), e -> { + ResponseEvent responseEvent = (ResponseEvent) e.event; + SIPResponse response = (SIPResponse) responseEvent.getResponse(); + String callId = response.getCallIdHeader().getCallId(); + SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), + callId,ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, + InviteSessionType.PLAY); + ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc()); + sessionManager.put(ssrcTransaction); + okEvent.response(e); + }, timeout); } /** @@ -384,16 +385,17 @@ public class SIPCommander implements ISIPCommander { Request request = headerProvider.createPlaybackInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()), ssrcInfo.getSsrc()); - sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { - ResponseEvent responseEvent = (ResponseEvent) event.event; - SIPResponse response = (SIPResponse) responseEvent.getResponse(); - SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), - channel.getId(), sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), - device.getTransport()).getCallId(), ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), - mediaServerItem.getId(), response, InviteSessionType.PLAYBACK); - sessionManager.put(ssrcTransaction); - okEvent.response(event); - }, timeout); + sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { + ResponseEvent responseEvent = (ResponseEvent) event.event; + SIPResponse response = (SIPResponse) responseEvent.getResponse(); + SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), + channel.getId(), sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), + device.getTransport()).getCallId(), ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), + mediaServerItem.getId(), response, InviteSessionType.PLAYBACK); + ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc()); + sessionManager.put(ssrcTransaction); + okEvent.response(event); + }, timeout); } /** @@ -476,17 +478,18 @@ public class SIPCommander implements ISIPCommander { CallIdHeader newCallIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()); Request request = headerProvider.createPlaybackInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null,newCallIdHeader, ssrcInfo.getSsrc()); - sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { - ResponseEvent responseEvent = (ResponseEvent) event.event; - SIPResponse response = (SIPResponse) responseEvent.getResponse(); - String contentString =new String(response.getRawContent()); - String ssrc = SipUtils.getSsrcFromSdp(contentString); - SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), - response.getCallIdHeader().getCallId(), ssrcInfo.getApp(), ssrcInfo.getStream(), ssrc, - mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD); - sessionManager.put(ssrcTransaction); - okEvent.response(event); - }, timeout); + sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, errorEvent, event -> { + ResponseEvent responseEvent = (ResponseEvent) event.event; + SIPResponse response = (SIPResponse) responseEvent.getResponse(); + String contentString =new String(response.getRawContent()); + String ssrc = SipUtils.getSsrcFromSdp(contentString); + SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), + response.getCallIdHeader().getCallId(), ssrcInfo.getApp(), ssrcInfo.getStream(), ssrc, + mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD); + ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc()); + sessionManager.put(ssrcTransaction); + okEvent.response(event); + }, timeout); } @Override @@ -540,20 +543,21 @@ public class SIPCommander implements ISIPCommander { // f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率 content.append("f=v/////a/1/8/1" + "\r\n"); - Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), - SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sendRtpItem.getSsrc(), callIdHeader); - sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> { - sessionManager.removeByStream(sendRtpItem.getApp(), sendRtpItem.getStream()); - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); - errorEvent.response(e); - }), e -> { - // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 - ResponseEvent responseEvent = (ResponseEvent) e.event; - SIPResponse response = (SIPResponse) responseEvent.getResponse(); - SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), MediaApp.GB28181_TALK,sendRtpItem.getApp(), stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK); - sessionManager.put(ssrcTransaction); - okEvent.response(e); - }, timeout); + Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), + SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sendRtpItem.getSsrc(), callIdHeader); + sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> { + sessionManager.removeByStream(sendRtpItem.getApp(), sendRtpItem.getStream()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrcToRelease()); + errorEvent.response(e); + }), e -> { + // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 + ResponseEvent responseEvent = (ResponseEvent) e.event; + SIPResponse response = (SIPResponse) responseEvent.getResponse(); + SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), MediaApp.GB28181_TALK,sendRtpItem.getApp(), stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK); + ssrcTransaction.setAllocatedSsrc(sendRtpItem.getAllocatedSsrc()); + sessionManager.put(ssrcTransaction); + okEvent.response(e); + }, timeout); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java index 2328b0abc..8b40ee35b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java @@ -645,7 +645,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { String mediaServerId = sendRtpItem.getMediaServerId(); MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem != null) { - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrcToRelease()); receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); } SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem, channel); @@ -747,13 +747,14 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { callIdHeader); sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> { sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease()); errorEvent.response(e); }), e -> { ResponseEvent responseEvent = (ResponseEvent) e.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); SsrcTransaction ssrcTransaction = SsrcTransaction.buildForPlatform(platform.getServerGBId(), channel.getGbId(), callIdHeader.getCallId(), ssrcInfo.getApp(), stream, ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.BROADCAST); + ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc()); sessionManager.put(ssrcTransaction); okEvent.response(e); }); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 0868ef826..9499ae8ce 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -132,7 +132,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In if (mediaServer != null) { mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); if (userSetting.getUseCustomSsrcForParentInvite()) { - mediaServerService.releaseSsrc(mediaServer.getId(), sendRtpItem.getSsrc()); + mediaServerService.releaseSsrc(mediaServer.getId(), sendRtpItem.getSsrcToRelease()); } } } @@ -144,7 +144,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In sendRtpServerService.delete(sendRtpItem); mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); if (userSetting.getUseCustomSsrcForParentInvite()) { - mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrc()); + mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrcToRelease()); } } if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { @@ -254,7 +254,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In // 释放ssrc MediaServer mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); if (mediaServerItem != null) { - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrc()); + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrcToRelease()); } sessionManager.removeByCallId(ssrcTransaction.getCallId()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index 6daf12133..bc9263761 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -179,6 +179,8 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements ? ssrcFactory.getPlaySsrc(streamInfo.getMediaServer().getId()) : ssrcFactory.getPlayBackSsrc(streamInfo.getMediaServer().getId()); inviteInfo.setSsrc(ssrc); + inviteInfo.setAllocatedSsrc(ssrc); + inviteInfo.setAllocatedSsrcMediaServerId(streamInfo.getMediaServer().getId()); } } // 构建sendRTP内容 @@ -186,6 +188,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements inviteInfo.getIp(), inviteInfo.getPort(), inviteInfo.getSsrc(), platform.getServerGBId(), streamInfo.getApp(), streamInfo.getStream(), channel.getGbId(), inviteInfo.isTcp(), platform.isRtcp()); + sendRtpItem.setAllocatedSsrc(inviteInfo.getAllocatedSsrc()); if (inviteInfo.isTcp() && inviteInfo.isTcpActive()) { sendRtpItem.setTcpActive(true); } @@ -203,7 +206,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 超时未收到Ack应该回复bye,当前等待时间为10秒 dynamicTask.startDelay(inviteInfo.getCallId(), () -> { log.info("[Ack ] 等待超时, {}/{}", inviteInfo.getCallId(), channel.getGbDeviceId()); - mediaServerService.releaseSsrc(streamInfo.getMediaServer().getId(), sendRtpItem.getSsrc()); + mediaServerService.releaseSsrc(streamInfo.getMediaServer().getId(), sendRtpItem.getSsrcToRelease()); // 回复bye sendBye(platform, inviteInfo.getCallId()); }, 60 * 1000); @@ -244,6 +247,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements log.error("[命令发送失败] invite BAD_REQUEST: {}", sendException.getMessage()); } }catch (PlayException e) { + releaseAllocatedSsrc(inviteInfo); try { responseAck(request, e.getCode(), e.getMsg()); } catch (SipException | InvalidArgumentException | ParseException sendException) { @@ -251,6 +255,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } }catch (Exception e) { log.error("[Invite处理异常] ", e); + releaseAllocatedSsrc(inviteInfo); try { responseAck(request, Response.SERVER_INTERNAL_ERROR, ""); } catch (SipException | InvalidArgumentException | ParseException sendException) { @@ -259,6 +264,15 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } + private void releaseAllocatedSsrc(InviteMessageInfo inviteInfo) { + if (inviteInfo == null || inviteInfo.getAllocatedSsrc() == null || inviteInfo.getAllocatedSsrcMediaServerId() == null) { + return; + } + mediaServerService.releaseSsrc(inviteInfo.getAllocatedSsrcMediaServerId(), inviteInfo.getAllocatedSsrc()); + inviteInfo.setAllocatedSsrc(null); + inviteInfo.setAllocatedSsrcMediaServerId(null); + } + private InviteMessageInfo decode(RequestEvent evt) throws SdpException { InviteMessageInfo inviteInfo = new InviteMessageInfo(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java index e74d17e0e..b14bffffd 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java @@ -7,6 +7,7 @@ public class SSRCInfo { private int port; private String ssrc; + private String allocatedSsrc; private String app; private String Stream; @@ -17,4 +18,8 @@ public class SSRCInfo { this.Stream = stream; } + public String getSsrcToRelease() { + return allocatedSsrc; + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java index 991e406ec..8d25e4669 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -102,6 +102,9 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { } SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaApp.GB28181, streamId); + if (presetSSRC == null) { + ssrcInfo.setAllocatedSsrc(ssrc); + } RTPServerParam rtpServerParam = new RTPServerParam(mediaServer, MediaApp.GB28181, streamId, ssrcCheck ? Long.parseLong(ssrc): 0L, null, onlyAuto, disableAuto, false, tcpMode); int rtpServerPort = openRTPServer(rtpServerParam, ((code, msg, data) -> { if (code == InviteErrorCode.SUCCESS.getCode()) { @@ -113,6 +116,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { // 释放ssrc if (presetSSRC == null) { ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc); + ssrcInfo.setAllocatedSsrc(null); } OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult(); openRTPServerResult.setSsrcInfo(ssrcInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java index d809d61ea..ee7d04bf1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java @@ -56,6 +56,7 @@ public class RedisRpcSendRtpController extends RpcController { if (mediaServerItem == null) { RedisRpcResponse response = request.getResponse(); response.setStatusCode(ErrorCode.SUCCESS.getCode()); + return response; } // 自平台内容 int localPort = sendRtpServerService.getNextPort(mediaServerItem); @@ -63,6 +64,7 @@ public class RedisRpcSendRtpController extends RpcController { log.info("[redis-rpc] getSendRtpItem->服务器端口资源不足" ); RedisRpcResponse response = request.getResponse(); response.setStatusCode(ErrorCode.SUCCESS.getCode()); + return response; } // 写入redis, 超时时回复 sendRtpItem.setStatus(1); @@ -72,6 +74,7 @@ public class RedisRpcSendRtpController extends RpcController { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); sendRtpItem.setSsrc(ssrc); + sendRtpItem.setAllocatedSsrc(ssrc); } sendRtpServerService.update(sendRtpItem); RedisRpcResponse response = request.getResponse(); @@ -99,6 +102,7 @@ public class RedisRpcSendRtpController extends RpcController { MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServer == null) { log.info("[redis-rpc] startSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); + clearSendRtpItem(sendRtpItem); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); response.setBody(wvpResult); return response; @@ -106,6 +110,7 @@ public class RedisRpcSendRtpController extends RpcController { MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream()); if (mediaInfo == null) { log.info("[redis-rpc] startSendRtp->流不在线: {}/{}", sendRtpItem.getApp(), sendRtpItem.getStream() ); + clearSendRtpItem(sendRtpItem); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "流不在线"); response.setBody(wvpResult); return response; @@ -114,6 +119,7 @@ public class RedisRpcSendRtpController extends RpcController { mediaServerService.startSendRtp(mediaServer, sendRtpItem); }catch (ControllerException exception) { log.info("[redis-rpc] 发流失败: {}/{}, 目标地址: {}:{}, {}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort(), exception.getMsg()); + clearSendRtpItem(sendRtpItem); WVPResult wvpResult = WVPResult.fail(exception.getCode(), exception.getMsg()); response.setBody(wvpResult); return response; @@ -143,6 +149,7 @@ public class RedisRpcSendRtpController extends RpcController { MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); if (mediaServer == null) { log.info("[redis-rpc] stopSendRtp->未找到MediaServer: {}", sendRtpItem.getMediaServerId() ); + clearSendRtpItem(sendRtpItem); WVPResult wvpResult = WVPResult.fail(ErrorCode.ERROR100.getCode(), "未找到MediaServer"); response.setBody(wvpResult); return response; @@ -155,9 +162,20 @@ public class RedisRpcSendRtpController extends RpcController { response.setBody(WVPResult.fail(exception.getCode(), exception.getMsg())); return response; } + clearSendRtpItem(sendRtpItem); log.info("[redis-rpc] 停止推流成功: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); response.setBody(WVPResult.success()); return response; } + private void clearSendRtpItem(SendRtpInfo sendRtpItem) { + if (sendRtpItem == null) { + return; + } + sendRtpServerService.delete(sendRtpItem); + if (sendRtpItem.getMediaServerId() != null) { + mediaServerService.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease()); + } + } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java index babaa0f45..84f90d3b9 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java @@ -76,6 +76,7 @@ public class RedisRpcStreamPushController extends RpcController { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServer.getId()) : ssrcFactory.getPlayBackSsrc(mediaServer.getId()); sendRtpItem.setSsrc(ssrc); + sendRtpItem.setAllocatedSsrc(ssrc); } sendRtpItem.setMediaServerId(mediaServer.getId()); sendRtpItem.setLocalIp(mediaServer.getSdpIp()); @@ -95,6 +96,7 @@ public class RedisRpcStreamPushController extends RpcController { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(hookData.getMediaServer().getId()) : ssrcFactory.getPlayBackSsrc(hookData.getMediaServer().getId()); sendRtpItem.setSsrc(ssrc); + sendRtpItem.setAllocatedSsrc(ssrc); } sendRtpItem.setMediaServerId(hookData.getMediaServer().getId()); sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index de224dfe0..ef0eb43a7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -110,6 +110,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(hookData.getMediaServer().getId()) : ssrcFactory.getPlayBackSsrc(hookData.getMediaServer().getId()); sendRtpItem.setSsrc(ssrc); + sendRtpItem.setAllocatedSsrc(ssrc); } sendRtpItem.setMediaServerId(hookData.getMediaServer().getId()); sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp());