From ead059fd0106a8116a4fce92629993650b2ab4cc Mon Sep 17 00:00:00 2001 From: panlinlin <648540858@qq.com> Date: Thu, 21 May 2026 00:05:48 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84SSRC=E7=AE=A1=E7=90=86?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/gb28181/bean/InviteMessageInfo.java | 2 - .../iot/vmp/gb28181/bean/SendRtpInfo.java | 9 +- .../iot/vmp/gb28181/bean/SsrcTransaction.java | 9 +- .../service/impl/DeviceServiceImpl.java | 1 - .../service/impl/PlatformServiceImpl.java | 35 ---- .../gb28181/service/impl/PlayServiceImpl.java | 55 +---- .../iot/vmp/gb28181/session/SSRCFactory.java | 197 +++++++----------- .../vmp/gb28181/session/SendSsrcFactory.java | 29 +++ .../gb28181/transmit/cmd/ISIPCommander.java | 2 +- .../transmit/cmd/impl/SIPCommander.java | 12 +- .../cmd/impl/SIPCommanderForPlatform.java | 3 - .../request/impl/ByeRequestProcessor.java | 11 - .../request/impl/InviteRequestProcessor.java | 31 +-- .../media/service/IMediaServerService.java | 2 - .../service/impl/MediaServerServiceImpl.java | 24 --- .../iot/vmp/service/bean/SSRCInfo.java | 5 - .../service/impl/RtpServerServiceImpl.java | 19 -- .../control/RedisRpcSendRtpController.java | 14 +- .../control/RedisRpcStreamPushController.java | 16 +- .../redisMsg/service/RedisRpcServiceImpl.java | 10 +- .../vmp/gb28181/session/SSRCFactoryTest.java | 176 ++++++++++++++++ 21 files changed, 321 insertions(+), 341 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/session/SendSsrcFactory.java create mode 100644 src/test/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactoryTest.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteMessageInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteMessageInfo.java index b745f494c..beadb6901 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteMessageInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/InviteMessageInfo.java @@ -10,8 +10,6 @@ public class InviteMessageInfo { private String sourceChannelId; private String sessionName; private String ssrc; - private String allocatedSsrc; - private String allocatedSsrcMediaServerId; private boolean tcp; private boolean tcpActive; private String callId; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java index f01299564..5f89c94c3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java @@ -24,11 +24,6 @@ public class SendRtpInfo { */ private String ssrc; - /** - * 从SSRC池中分配的SSRC - */ - private String allocatedSsrc; - /** * 目标平台或设备的编号 */ @@ -253,7 +248,5 @@ public class SendRtpInfo { } } - public String getSsrcToRelease() { - return allocatedSsrc; - } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java index 10ba63ca1..4c64f61cf 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SsrcTransaction.java @@ -47,11 +47,6 @@ public class SsrcTransaction { */ private String ssrc; - /** - * 从SSRC池中分配的SSRC - */ - private String allocatedSsrc; - /** * 事务信息 */ @@ -94,7 +89,5 @@ public class SsrcTransaction { public SsrcTransaction() { } - public String getSsrcToRelease() { - return allocatedSsrc; - } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 8feb2389e..21b43849e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -266,7 +266,6 @@ public class DeviceServiceImpl implements IDeviceService { List ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId()); if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) { for (SsrcTransaction ssrcTransaction : ssrcTransactions) { - mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrcToRelease()); receiveRtpServerService.closeRTPServerByMediaServerId(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream()); sessionManager.removeByCallId(ssrcTransaction.getCallId()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index f6cc67863..094294425 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -128,7 +128,6 @@ public class PlatformServiceImpl implements IPlatformService { } sendRtpServerService.delete(sendRtpItem); if (mediaServerItem != null) { - ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease()); boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); if (stopResult) { Platform platform = queryPlatformByServerGBId(sendRtpItem.getTargetId()); @@ -335,7 +334,6 @@ public class PlatformServiceImpl implements IPlatformService { if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp()) && sendRtpItem.isSendToPlatform()) { Platform platform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getTargetId()); CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId()); - ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease()); try { commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel); } catch (SipException | InvalidArgumentException | ParseException e) { @@ -522,7 +520,6 @@ public class PlatformServiceImpl implements IPlatformService { List sendRtpItems = sendRtpServerService.queryForPlatform(platformId); if (sendRtpItems != null && !sendRtpItems.isEmpty()) { for (SendRtpInfo sendRtpItem : sendRtpItems) { - ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease()); sendRtpServerService.delete(sendRtpItem); MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null); @@ -626,7 +623,6 @@ public class PlatformServiceImpl implements IPlatformService { log.error("[点播超时], 发送BYE失败 {}", e.getMessage()); } finally { timeoutCallback.run(1, "收流超时"); - mediaServerService.releaseSsrc(mediaServerItem.getId(), data.getSsrcInfo().getSsrcToRelease()); receiveRtpServerService.closeRTPServer(mediaServerItem, data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream()); sessionManager.removeByStream(data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream()); } @@ -702,7 +698,6 @@ public class PlatformServiceImpl implements IPlatformService { // ssrc检验 // 更新ssrc log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - releaseAllocatedSsrc(mediaServerItem, ssrcInfo); Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse); if (!result) { try { @@ -711,8 +706,6 @@ public class PlatformServiceImpl implements IPlatformService { } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage()); } finally { - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease()); receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); @@ -724,7 +717,6 @@ public class PlatformServiceImpl implements IPlatformService { } }else { ssrcInfo.setSsrc(ssrcInResponse); - updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null); inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setStream(ssrcInfo.getStream()); if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-ACTIVE")) { @@ -738,9 +730,7 @@ public class PlatformServiceImpl implements IPlatformService { inviteStreamService.updateInviteInfo(inviteInfo); } }else { - releaseAllocatedSsrc(mediaServerItem, ssrcInfo); ssrcInfo.setSsrc(ssrcInResponse); - updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null); inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setStream(ssrcInfo.getStream()); if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-ACTIVE")) { @@ -761,7 +751,6 @@ public class PlatformServiceImpl implements IPlatformService { if (ssrcTransaction == null) { return; } - releaseAllocatedSsrc(mediaServerItem, ssrcInfo); sessionManager.removeByStream(ssrcInfo.getApp(), inviteInfo.getStream()); inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse); @@ -770,7 +759,6 @@ public class PlatformServiceImpl implements IPlatformService { ssrcTransaction.setApp(ssrcInfo.getApp()); ssrcTransaction.setStream(inviteInfo.getStream()); ssrcTransaction.setSsrc(ssrcInResponse); - ssrcTransaction.setAllocatedSsrc(null); ssrcTransaction.setMediaServerId(mediaServerItem.getId()); ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo((SIPResponse) responseEvent.getResponse())); ssrcTransaction.setType(inviteSessionType); @@ -781,25 +769,6 @@ public class PlatformServiceImpl implements IPlatformService { } } - private void releaseAllocatedSsrc(MediaServer mediaServerItem, SSRCInfo ssrcInfo) { - if (ssrcInfo == null || ssrcInfo.getAllocatedSsrc() == null) { - return; - } - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getAllocatedSsrc()); - ssrcInfo.setAllocatedSsrc(null); - } - - private void updateSsrcTransaction(String app, String stream, String ssrc, String allocatedSsrc) { - SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream); - if (ssrcTransaction == null) { - return; - } - ssrcTransaction.setSsrc(ssrc); - ssrcTransaction.setAllocatedSsrc(allocatedSsrc); - sessionManager.put(ssrcTransaction); - } - - private void tcpActiveHandler(Platform platform, CommonGBChannel channel, String contentString, MediaServer mediaServerItem, boolean ssrcCheck, SSRCInfo ssrcInfo, ErrorCallback callback){ @@ -830,8 +799,6 @@ public class PlatformServiceImpl implements IPlatformService { } catch (SdpException e) { log.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channel.getGbDeviceId(), e); receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), @@ -855,8 +822,6 @@ public class PlatformServiceImpl implements IPlatformService { receiveRtpServerService.closeRTPServer(mediaServerItem, app, stream); InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, channel.getGbId(), stream); if (inviteInfo != null) { - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrcToRelease()); inviteStreamService.removeInviteInfo(inviteInfo); } sessionManager.removeByStream(app, stream); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index d9f72cfe9..96baea363 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe; import com.genersoft.iot.vmp.gb28181.service.*; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; @@ -109,6 +110,9 @@ public class PlayServiceImpl implements IPlayService { @Autowired private SSRCFactory ssrcFactory; + @Autowired + private SendSsrcFactory sendSsrcFactory; + @Autowired private IPlatformService platformService; @@ -335,9 +339,6 @@ public class PlayServiceImpl implements IPlayService { InviteInfo inviteInfoInCatch = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); if (inviteInfoInCatch != null ) { if (inviteInfoInCatch.getStreamInfo() == null) { - // 释放生成的ssrc,使用上一次申请的 - - ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc); // 点播发起了但是尚未成功, 仅注册回调等待结果即可 inviteStreamService.once(InviteSessionType.PLAY, channel.getId(), null, callback); log.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId({}): {}", device.getDeviceId(), channel.getDeviceId(), channel.getId()); @@ -480,25 +481,23 @@ public class PlayServiceImpl implements IPlayService { private void talk(MediaServer mediaServerItem, Device device, DeviceChannel channel, String stream, SipSubscribe.Event errorEvent, Runnable timeoutCallback, AudioBroadcastEvent audioEvent) { - String playSsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId()); + String ySsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId()); - if (playSsrc == null) { + if (ySsrc == null) { audioEvent.call("ssrc已经用尽"); return; } + String sendSsrc = sendSsrcFactory.getSendSsrc("0"); SendRtpInfo sendRtpInfo; try { - sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), MediaStreamUtil.GB28181_TALK, stream, + sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, sendSsrc, device.getDeviceId(), MediaStreamUtil.GB28181_TALK, stream, channel.getId(), true, false); if (sendRtpInfo == null) { - ssrcFactory.releaseSsrc(mediaServerItem.getId(), playSsrc); audioEvent.call("获取发流端口失败"); return; } - sendRtpInfo.setAllocatedSsrc(playSsrc); sendRtpInfo.setPlayType(InviteStreamType.TALK); }catch (PlayException e) { - ssrcFactory.releaseSsrc(mediaServerItem.getId(), playSsrc); log.info("[语音对讲]开始 获取发流端口失败 deviceId: {}, channelId: {},", device.getDeviceId(), channel.getDeviceId()); return; } @@ -525,7 +524,6 @@ public class PlayServiceImpl implements IPlayService { log.error("[语音对讲]超时, 发送BYE失败 {}", e.getMessage()); } finally { timeoutCallback.run(); - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease()); sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream()); } }, userSetting.getPlayTimeout()); @@ -534,7 +532,6 @@ public class PlayServiceImpl implements IPlayService { Integer localPort = mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpInfo, userSetting.getPlayTimeout() * 1000); if (localPort == null || localPort <= 0) { timeoutCallback.run(); - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease()); sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream()); return; } @@ -543,7 +540,6 @@ public class PlayServiceImpl implements IPlayService { receiveRtpServerService.addAuthenticateInfoForGb28181Talk(mediaServerItem, sendRtpInfo.getStream()); }catch (ControllerException e) { - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease()); log.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channel.getDeviceId()); audioEvent.call("失败, " + e.getMessage()); // 查看是否已经建立了通道,存在则发送bye @@ -553,7 +549,7 @@ public class PlayServiceImpl implements IPlayService { // 查看设备是否已经在推流 try { - cmder.talkStreamCmd(mediaServerItem, sendRtpInfo, device, channel, callId, (hookData) -> { + cmder.talkStreamCmd(mediaServerItem, sendRtpInfo, ySsrc, device, channel, callId, (hookData) -> { log.info("[语音对讲] 流已生成, 开始推流: " + hookData); dynamicTask.stop(timeOutTaskKey); // TODO 暂不做处理 @@ -588,8 +584,6 @@ public class PlayServiceImpl implements IPlayService { }, (event) -> { dynamicTask.stop(timeOutTaskKey); receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease()); sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream()); errorEvent.response(event); }, userSetting.getPlayTimeout().longValue()); @@ -598,9 +592,6 @@ public class PlayServiceImpl implements IPlayService { log.error("[命令发送失败] 对讲消息: {}", e.getMessage()); dynamicTask.stop(timeOutTaskKey); receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream()); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease()); - sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult(); eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent; @@ -875,7 +866,6 @@ public class PlayServiceImpl implements IPlayService { // ssrc检验 // 更新ssrc log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); - releaseAllocatedSsrc(mediaServerItem, ssrcInfo); Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse); if (!result) { try { @@ -885,8 +875,6 @@ public class PlayServiceImpl implements IPlayService { log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage()); } - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease()); - sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), @@ -897,7 +885,6 @@ public class PlayServiceImpl implements IPlayService { }else { ssrcInfo.setSsrc(ssrcInResponse); - updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null); inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setStream(ssrcInfo.getStream()); if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { @@ -910,9 +897,7 @@ public class PlayServiceImpl implements IPlayService { inviteStreamService.updateInviteInfo(inviteInfo); } } else { - releaseAllocatedSsrc(mediaServerItem, ssrcInfo); ssrcInfo.setSsrc(ssrcInResponse); - updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null); inviteInfo.setSsrcInfo(ssrcInfo); inviteInfo.setStream(ssrcInfo.getStream()); if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) { @@ -932,14 +917,12 @@ public class PlayServiceImpl implements IPlayService { if (ssrcTransaction == null) { return; } - releaseAllocatedSsrc(mediaServerItem, ssrcInfo); sessionManager.removeByStream(MediaStreamUtil.RTP_APP, inviteInfo.getStream()); inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse); ssrcTransaction.setDeviceId(device.getDeviceId()); ssrcTransaction.setChannelId(ssrcTransaction.getChannelId()); ssrcTransaction.setCallId(ssrcTransaction.getCallId()); ssrcTransaction.setSsrc(ssrcInResponse); - ssrcTransaction.setAllocatedSsrc(null); ssrcTransaction.setApp(MediaStreamUtil.RTP_APP); ssrcTransaction.setStream(inviteInfo.getStream()); ssrcTransaction.setMediaServerId(mediaServerItem.getId()); @@ -952,24 +935,6 @@ public class PlayServiceImpl implements IPlayService { } } - private void releaseAllocatedSsrc(MediaServer mediaServerItem, SSRCInfo ssrcInfo) { - if (ssrcInfo == null || ssrcInfo.getAllocatedSsrc() == null) { - return; - } - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getAllocatedSsrc()); - ssrcInfo.setAllocatedSsrc(null); - } - - private void updateSsrcTransaction(String app, String stream, String ssrc, String allocatedSsrc) { - SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream); - if (ssrcTransaction == null) { - return; - } - ssrcTransaction.setSsrc(ssrc); - ssrcTransaction.setAllocatedSsrc(allocatedSsrc); - sessionManager.put(ssrcTransaction); - } - @Override public void download(Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback callback) { @@ -1603,8 +1568,6 @@ public class PlayServiceImpl implements IPlayService { mediaServerService.stopSendRtp(mediaServer, sendRtpInfo.getApp(), sendRtpInfo.getStream(), sendRtpInfo.getSsrc()); } - ssrcFactory.releaseSsrc(mediaServerId, sendRtpInfo.getSsrcToRelease()); - SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream()); if (ssrcTransaction != null) { try { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java index 1c9e5ba4d..7408cb469 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactory.java @@ -1,143 +1,108 @@ package com.genersoft.iot.vmp.gb28181.session; +import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.conf.SipConfig; -import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils; +import com.genersoft.iot.vmp.media.zlm.dto.ZLMResult; +import jakarta.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; -import java.util.ArrayList; +import java.util.BitSet; import java.util.List; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; -/** - * ssrc使用 - */ @Slf4j @Component public class SSRCFactory { - /** - * 播流最大并发个数 - */ - private static final Integer MAX_STREAM_COUNT = 10000; - - /** - * 播流最大并发个数 - */ - private static final String SSRC_INFO_KEY = "VMP_SSRC_INFO_"; + private final ConcurrentHashMap usedMap = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> { + Thread t = new Thread(r, "ssrc-rebuild"); + t.setDaemon(true); + return t; + }); @Autowired - private StringRedisTemplate redisTemplate; + private ZLMRESTfulUtils zlmresTfulUtils; + + @Autowired + private IMediaServerService mediaServerService; @Autowired private SipConfig sipConfig; - @Autowired - private UserSetting userSetting; + private String domainPart; + @PostConstruct + public void init() { + String sipDomain = sipConfig.getDomain(); + domainPart = sipDomain.length() >= 8 ? sipDomain.substring(3, 8) : sipDomain; + scheduler.scheduleAtFixedRate(this::rebuild, 10, 30, TimeUnit.SECONDS); + } - public void initMediaServerSSRC(String mediaServerId, Set usedSet) { - String ssrcPrefix = getSsrcPrefix(); - String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; - List ssrcList = new ArrayList<>(); - for (int i = 1; i < MAX_STREAM_COUNT; i++) { - String ssrc = String.format("%s%04d", ssrcPrefix, i); + public String getPlaySsrc(String mediaServerId) { + String suffix = allocate(mediaServerId); + return suffix != null ? "0" + suffix : null; + } - if (null == usedSet || !usedSet.contains(ssrc)) { - ssrcList.add(ssrc); + public String getPlayBackSsrc(String mediaServerId) { + String suffix = allocate(mediaServerId); + return suffix != null ? "1" + suffix : null; + } + private String allocate(String mediaServerId) { + BitSet bits = usedMap.computeIfAbsent(mediaServerId, k -> new BitSet(10000)); + int start = ThreadLocalRandom.current().nextInt(10000); + int index = start; + do { + if (!bits.get(index)) { + bits.set(index); + return domainPart + String.format("%04d", index); + } + index = (index + 1) % 10000; + } while (index != start); + log.warn("[SSRC] 媒体节点 {} 的SSRC已用尽", mediaServerId); + return null; + } + + void rebuild() { + List servers = mediaServerService.getAll(); + for (MediaServer server : servers) { + BitSet bits = new BitSet(10000); + int count = 0; + try { + ZLMResult result = zlmresTfulUtils.getMediaList(server, null, null, "rtsp", null); + if (result != null && result.getCode() == 0 && result.getData() != null) { + List list = (List) result.getData(); + for (JSONObject obj : list) { + if (obj.getIntValue("originType") != 3) continue; + String originUrl = obj.getString("originUrl"); + if (originUrl == null) continue; + int idx = originUrl.lastIndexOf("/rtp/"); + if (idx == -1) continue; + try { + int suffix = (int) (Long.parseLong(originUrl.substring(idx + 5), 16) % 10000); + bits.set(suffix); + count++; + } catch (NumberFormatException ignored) { + } + } + } + } catch (Exception e) { + log.warn("[SSRC重建] 查询媒体节点 {} 失败: {}", server.getId(), e.getMessage()); + } + usedMap.put(server.getId(), bits); + if (log.isDebugEnabled()) { + log.debug("[SSRC重建] 节点 {} 已占用 {} 个SSRC", server.getId(), count); } } - if (redisTemplate.opsForSet().size(redisKey) != null) { - redisTemplate.delete(redisKey); - } - redisTemplate.opsForSet().add(redisKey, ssrcList.toArray(new String[0])); } - - - /** - * 获取视频预览的SSRC值,第一位固定为0 - * - * @return ssrc - */ - public String getPlaySsrc(String mediaServerId) { - return "0" + getSN(mediaServerId); - } - - /** - * 获取录像回放的SSRC值,第一位固定为1 - */ - public String getPlayBackSsrc(String mediaServerId) { - return "1" + getSN(mediaServerId); - } - - /** - * 释放ssrc,主要用完的ssrc一定要释放,否则会耗尽 - * - * @param ssrc 需要重置的ssrc - */ - public void releaseSsrc(String mediaServerId, String ssrc) { - if (ssrc == null) { - return; - } - if (!isFactorySsrc(ssrc)) { - log.warn("[释放 SSRC] 忽略非SSRC池分配的值: {}", ssrc); - return; - } - String sn = ssrc.substring(1); - log.debug("[释放 SSRC] SSRC:{} -> SN: {}", ssrc, sn); - String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; - redisTemplate.opsForSet().add(redisKey, sn); - } - - /** - * 获取后四位数SN,随机数 - */ - private String getSN(String mediaServerId) { - String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; - Long size = redisTemplate.opsForSet().size(redisKey); - if (size == null || size == 0) { - log.info("[获取 SSRC 失败] redisKey: {}", redisKey); - throw new RuntimeException("ssrc已经用完"); - } else { - // 在集合中移除并返回一个随机成员。 - return redisTemplate.opsForSet().pop(redisKey); - } - } - - /** - * 重置一个流媒体服务的所有ssrc - * - * @param mediaServerId 流媒体服务ID - */ - public void reset(String mediaServerId) { - this.initMediaServerSSRC(mediaServerId, null); - } - - /** - * 是否已经存在了某个MediaServer的SSRC信息 - * - * @param mediaServerId 流媒体服务ID - */ - public boolean hasMediaServerSSRC(String mediaServerId) { - String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId; - return Boolean.TRUE.equals(redisTemplate.hasKey(redisKey)); - } - - private String getSsrcPrefix() { - String sipDomain = sipConfig.getDomain(); - return sipDomain.length() >= 8 ? sipDomain.substring(3, 8) : sipDomain; - } - - private boolean isFactorySsrc(String ssrc) { - if (ssrc.length() < 2) { - return false; - } - String sn = ssrc.substring(1); - String ssrcPrefix = getSsrcPrefix(); - return sn.length() == ssrcPrefix.length() + 4 && sn.startsWith(ssrcPrefix); - } - } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SendSsrcFactory.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SendSsrcFactory.java new file mode 100644 index 000000000..637c36303 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SendSsrcFactory.java @@ -0,0 +1,29 @@ +package com.genersoft.iot.vmp.gb28181.session; + +import com.genersoft.iot.vmp.conf.SipConfig; +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.concurrent.ThreadLocalRandom; + +@Slf4j +@Component +public class SendSsrcFactory { + + @Autowired + private SipConfig sipConfig; + + private String domainPart; + + @PostConstruct + public void init() { + String sipDomain = sipConfig.getDomain(); + domainPart = sipDomain.length() >= 8 ? sipDomain.substring(3, 8) : sipDomain; + } + + public String getSendSsrc(String prefix) { + return prefix + domainPart + String.format("%04d", ThreadLocalRandom.current().nextInt(10000)); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java index deb86fc99..6bac754b3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommander.java @@ -94,7 +94,7 @@ public interface ISIPCommander { */ void streamByeCmd(Device device, String channelId, String app, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; - void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException; + void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, String ySsrc, Device device, DeviceChannel channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException; void streamByeCmd(Device device, String channelId, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 8f30fbc1a..53a5e75cf 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -281,7 +281,6 @@ public class SIPCommander implements ISIPCommander { Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport())); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> { sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease()); errorEvent.response(e); }), e -> { ResponseEvent responseEvent = (ResponseEvent) e.event; @@ -290,7 +289,6 @@ public class SIPCommander implements ISIPCommander { SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), callId,ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAY); - ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc()); sessionManager.put(ssrcTransaction); okEvent.response(e); }, timeout); @@ -388,7 +386,6 @@ public class SIPCommander implements ISIPCommander { channel.getId(), sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()).getCallId(), ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.PLAYBACK); - ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc()); sessionManager.put(ssrcTransaction); okEvent.response(event); }, timeout); @@ -482,14 +479,13 @@ public class SIPCommander implements ISIPCommander { SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), response.getCallIdHeader().getCallId(), ssrcInfo.getApp(), ssrcInfo.getStream(), ssrc, mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD); - ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc()); sessionManager.put(ssrcTransaction); okEvent.response(event); }, timeout); } @Override - public void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channel, + public void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, String ySsrc, Device device, DeviceChannel channel, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException { @@ -535,22 +531,20 @@ public class SIPCommander implements ISIPCommander { content.append("a=sendrecv\r\n"); content.append("a=rtpmap:8 PCMA/8000\r\n"); - content.append("y=" + sendRtpItem.getSsrc() + "\r\n");//ssrc + content.append("y=" + ySsrc + "\r\n");//ssrc // f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率 content.append("f=v/////a/1/8/1" + "\r\n"); Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), - SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sendRtpItem.getSsrc(), callIdHeader); + SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ySsrc, callIdHeader); sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> { sessionManager.removeByStream(sendRtpItem.getApp(), sendRtpItem.getStream()); - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrcToRelease()); errorEvent.response(e); }), e -> { // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 ResponseEvent responseEvent = (ResponseEvent) e.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), MediaStreamUtil.GB28181_TALK,sendRtpItem.getApp(), stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK); - ssrcTransaction.setAllocatedSsrc(sendRtpItem.getAllocatedSsrc()); sessionManager.put(ssrcTransaction); okEvent.response(e); }, timeout); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java index 46198e5fb..c1c515d8f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java @@ -642,7 +642,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { String mediaServerId = sendRtpItem.getMediaServerId(); MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem != null) { - mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrcToRelease()); receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); } SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem, channel); @@ -744,14 +743,12 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { callIdHeader); sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> { sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); - mediaServerService.releaseSsrc(mediaServer.getId(), ssrcInfo.getSsrcToRelease()); errorEvent.response(e); }), e -> { ResponseEvent responseEvent = (ResponseEvent) e.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); SsrcTransaction ssrcTransaction = SsrcTransaction.buildForPlatform(platform.getServerGBId(), channel.getGbId(), callIdHeader.getCallId(), ssrcInfo.getApp(), stream, ssrcInfo.getSsrc(), mediaServer.getId(), response, InviteSessionType.BROADCAST); - ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc()); sessionManager.put(ssrcTransaction); okEvent.response(e); }); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index 9499ae8ce..654f2d8ff 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -131,9 +131,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In sendRtpServerService.deleteByCallId(callIdHeader.getCallId()); if (mediaServer != null) { mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); - if (userSetting.getUseCustomSsrcForParentInvite()) { - mediaServerService.releaseSsrc(mediaServer.getId(), sendRtpItem.getSsrcToRelease()); - } } } }else { @@ -143,9 +140,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId()); sendRtpServerService.delete(sendRtpItem); mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc()); - if (userSetting.getUseCustomSsrcForParentInvite()) { - mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrcToRelease()); - } } if (sendRtpItem.getServerId().equals(userSetting.getServerId())) { MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId()); @@ -251,11 +245,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In } break; } - // 释放ssrc - MediaServer mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId()); - if (mediaServerItem != null) { - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrcToRelease()); - } sessionManager.removeByCallId(ssrcTransaction.getCallId()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java index f0277784b..b95bef8d9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/InviteRequestProcessor.java @@ -9,7 +9,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.service.*; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; @@ -39,7 +39,6 @@ import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.header.CallIdHeader; import javax.sip.message.Response; -import java.security.SecureRandom; import java.text.ParseException; import java.util.List; import java.util.Vector; @@ -103,7 +102,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements private UserSetting userSetting; @Autowired - private SSRCFactory ssrcFactory; + private SendSsrcFactory sendSsrcFactory; @Override @@ -175,22 +174,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 点播成功, TODO 可以在此处检测cancel命令是否存在,存在则不发送 if (userSetting.getUseCustomSsrcForParentInvite()) { // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - MediaServer mediaServer = mediaServerService.getOne(streamInfo.getMediaServer().getId()); - if (mediaServer != null) { - String ssrc = "Play".equalsIgnoreCase(finalInviteInfo.getSessionName()) - ? ssrcFactory.getPlaySsrc(streamInfo.getMediaServer().getId()) - : ssrcFactory.getPlayBackSsrc(streamInfo.getMediaServer().getId()); - finalInviteInfo.setSsrc(ssrc); - finalInviteInfo.setAllocatedSsrc(ssrc); - finalInviteInfo.setAllocatedSsrcMediaServerId(streamInfo.getMediaServer().getId()); - } + finalInviteInfo.setSsrc(sendSsrcFactory.getSendSsrc( + "Play".equalsIgnoreCase(finalInviteInfo.getSessionName()) ? "0" : "1")); } // 构建sendRTP内容 SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(streamInfo.getMediaServer(), finalInviteInfo.getIp(), finalInviteInfo.getPort(), finalInviteInfo.getSsrc(), platform.getServerGBId(), streamInfo.getApp(), streamInfo.getStream(), channel.getGbId(), finalInviteInfo.isTcp(), platform.isRtcp()); - sendRtpItem.setAllocatedSsrc(finalInviteInfo.getAllocatedSsrc()); if (finalInviteInfo.isTcp() && finalInviteInfo.isTcpActive()) { sendRtpItem.setTcpActive(true); } @@ -208,7 +199,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements // 超时未收到Ack应该回复bye,当前等待时间为10秒 dynamicTask.startDelay(finalInviteInfo.getCallId(), () -> { log.info("[Ack ] 等待超时, {}/{}", finalInviteInfo.getCallId(), channel.getGbDeviceId()); - mediaServerService.releaseSsrc(streamInfo.getMediaServer().getId(), sendRtpItem.getSsrcToRelease()); // 回复bye sendBye(platform, finalInviteInfo.getCallId()); }, 60 * 1000); @@ -249,7 +239,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements log.error("[命令发送失败] invite BAD_REQUEST: {}", sendException.getMessage()); } } catch (PlayException e) { - releaseAllocatedSsrc(inviteInfo); try { responseAck(request, e.getCode(), e.getMsg()); } catch (SipException | InvalidArgumentException | ParseException sendException) { @@ -257,7 +246,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } catch (Exception e) { log.error("[Invite处理异常] ", e); - releaseAllocatedSsrc(inviteInfo); try { responseAck(request, Response.SERVER_INTERNAL_ERROR, ""); } catch (SipException | InvalidArgumentException | ParseException sendException) { @@ -266,15 +254,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements } } - private void releaseAllocatedSsrc(InviteMessageInfo inviteInfo) { - if (inviteInfo == null || inviteInfo.getAllocatedSsrc() == null || inviteInfo.getAllocatedSsrcMediaServerId() == null) { - return; - } - mediaServerService.releaseSsrc(inviteInfo.getAllocatedSsrcMediaServerId(), inviteInfo.getAllocatedSsrc()); - inviteInfo.setAllocatedSsrc(null); - inviteInfo.setAllocatedSsrcMediaServerId(null); - } - private InviteMessageInfo decode(RequestEvent evt) throws SdpException { InviteMessageInfo inviteInfo = new InviteMessageInfo(); @@ -499,7 +478,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements SessionDescription sdp = gb28181Sdp.getBaseSdb(); if (ObjectUtils.isEmpty(gb28181Sdp.getSsrc()) ) { - String ssrc = Integer.toUnsignedString(new SecureRandom().nextInt()); + String ssrc = sendSsrcFactory.getSendSsrc("0"); log.warn("来自设备的Invite请求,未携带SSRC,生成随机ssrc: {},requesterId: {}/{}", ssrc, inviteInfo.getRequesterId(), inviteInfo.getSourceChannelId()); gb28181Sdp.setSsrc(ssrc); } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index a65a91961..cf09025df 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -49,8 +49,6 @@ public interface IMediaServerService { void removeCount(String mediaServerId); - void releaseSsrc(String mediaServerItemId, String ssrc); - void clearMediaServerForOnline(); void add(MediaServer mediaSerItem); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 0cc045ab5..1547e0319 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; import com.genersoft.iot.vmp.gb28181.bean.TalkRtpInfo; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.RecordInfo; @@ -57,9 +56,6 @@ import java.util.*; @Service public class MediaServerServiceImpl implements IMediaServerService { - @Autowired - private SSRCFactory ssrcFactory; - @Autowired private UserSetting userSetting; @@ -150,10 +146,6 @@ public class MediaServerServiceImpl implements IMediaServerService { if (ObjectUtils.isEmpty(mediaServer.getId())) { continue; } - // 更新 - if (!ssrcFactory.hasMediaServerSSRC(mediaServer.getId())) { - ssrcFactory.initMediaServerSSRC(mediaServer.getId(), null); - } // 查询redis是否存在此mediaServer String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId(); Boolean hasKey = redisTemplate.hasKey(key); @@ -229,21 +221,11 @@ public class MediaServerServiceImpl implements IMediaServerService { return mediaNodeServerService.updateRtpServerSSRC(mediaServer, app, streamId, ssrc); } - @Override - public void releaseSsrc(String mediaServerId, String ssrc) { - MediaServer mediaServer = getOne(mediaServerId); - if (mediaServer == null || ssrc == null) { - return; - } - ssrcFactory.releaseSsrc(mediaServerId, ssrc); - } - /** * 媒体服务节点 重启后重置他的推流信息, TODO 给正在使用的设备发送停止命令 */ @Override public void clearRTPServer(MediaServer mediaServer) { - ssrcFactory.reset(mediaServer.getId()); } @Override @@ -254,12 +236,6 @@ public class MediaServerServiceImpl implements IMediaServerService { mediaServerMapper.add(mediaServer); } - MediaServer mediaServerInRedis = getOne(mediaServer.getId()); - - if (mediaServerInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaServer.getId())) { - ssrcFactory.initMediaServerSSRC(mediaServer.getId(),null); - } - String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId(); redisTemplate.opsForHash().put(key, mediaServer.getId(), mediaServer); if (mediaServer.isStatus()) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java index 5ee85ca78..a700d7161 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java @@ -7,7 +7,6 @@ public class SSRCInfo { private int port; private String ssrc; - private String allocatedSsrc; private String app; private String stream; @@ -18,8 +17,4 @@ public class SSRCInfo { this.stream = stream; } - public String getSsrcToRelease() { - return allocatedSsrc; - } - } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java index 3f3b8d840..9e5732087 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -106,9 +106,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { } SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId); - if (presetSSRC == null) { - ssrcInfo.setAllocatedSsrc(ssrc); - } RTPServerParam rtpServerParam = new RTPServerParam(mediaServer, MediaStreamUtil.RTP_APP, streamId, ssrcCheck ? Long.parseLong(ssrc): 0L, null, onlyAuto, disableAuto, false, tcpMode); int rtpServerPort = openCommonRTPServer(rtpServerParam, ((code, msg, data) -> { if (code == InviteErrorCode.SUCCESS.getCode()) { @@ -117,11 +114,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { openRTPServerResult.setSsrcInfo(ssrcInfo); callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult); } else { - // 释放ssrc - if (presetSSRC == null) { - ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc); - ssrcInfo.setAllocatedSsrc(null); - } OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult(); openRTPServerResult.setSsrcInfo(ssrcInfo); callback.run(code, msg, openRTPServerResult); @@ -170,9 +162,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L; SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamReplace != null ? streamReplace : streamId); - if (presetSSRC == null) { - ssrcInfo.setAllocatedSsrc(ssrc); - } openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback); addAuthenticateInfo(streamId, streamReplace, channel.isHasAudio(), record, null); return ssrcInfo; @@ -212,7 +201,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L; SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamReplace != null ? streamReplace : streamId); - ssrcInfo.setAllocatedSsrc(ssrc); openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback); addAuthenticateInfo(streamId, streamReplace, channel.isHasAudio(), false,null); return ssrcInfo; @@ -255,7 +243,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L; SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId); - ssrcInfo.setAllocatedSsrc(ssrc); openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback); long difference = DateUtil.getDifference(startTime, endTime) / 1000; @@ -294,7 +281,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { String ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId()); SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId); - ssrcInfo.setAllocatedSsrc(ssrc); openRtpServer(mediaServer, ssrcInfo, 0L, false, true, tcpMode, callback); return ssrcInfo; } @@ -310,11 +296,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { openRTPServerResult.setSsrcInfo(ssrcInfo); callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult); } else { - // 释放ssrc - if (ssrcInfo.getAllocatedSsrc() != null) { - ssrcFactory.releaseSsrc(mediaServer.getId(), ssrcInfo.getAllocatedSsrc()); - ssrcInfo.setAllocatedSsrc(null); - } OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult(); openRTPServerResult.setSsrcInfo(ssrcInfo); callback.run(code, msg, openRTPServerResult); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java index ee7d04bf1..323caa3b0 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcSendRtpController.java @@ -5,7 +5,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; @@ -25,7 +25,7 @@ import org.springframework.stereotype.Component; public class RedisRpcSendRtpController extends RpcController { @Autowired - private SSRCFactory ssrcFactory; + private SendSsrcFactory sendSsrcFactory; @Autowired private IMediaServerService mediaServerService; @@ -71,10 +71,8 @@ public class RedisRpcSendRtpController extends RpcController { sendRtpItem.setServerId(userSetting.getServerId()); sendRtpItem.setLocalIp(mediaServerItem.getSdpIp()); if (sendRtpItem.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId()); - sendRtpItem.setSsrc(ssrc); - sendRtpItem.setAllocatedSsrc(ssrc); + sendRtpItem.setSsrc(sendSsrcFactory.getSendSsrc( + "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? "0" : "1")); } sendRtpServerService.update(sendRtpItem); RedisRpcResponse response = request.getResponse(); @@ -173,9 +171,7 @@ public class RedisRpcSendRtpController extends RpcController { return; } sendRtpServerService.delete(sendRtpItem); - if (sendRtpItem.getMediaServerId() != null) { - mediaServerService.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease()); - } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java index 5c7927568..9eed25c87 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java @@ -8,7 +8,7 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest; import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; @@ -31,7 +31,7 @@ import org.springframework.stereotype.Component; public class RedisRpcStreamPushController extends RpcController { @Autowired - private SSRCFactory ssrcFactory; + private SendSsrcFactory sendSsrcFactory; @Autowired private IMediaServerService mediaServerService; @@ -73,10 +73,8 @@ public class RedisRpcStreamPushController extends RpcController { log.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() ); // 读取redis中的上级点播信息,生成sendRtpItm发送出去 if (sendRtpItem.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServer.getId()) : ssrcFactory.getPlayBackSsrc(mediaServer.getId()); - sendRtpItem.setSsrc(ssrc); - sendRtpItem.setAllocatedSsrc(ssrc); + sendRtpItem.setSsrc(sendSsrcFactory.getSendSsrc( + "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? "0" : "1")); } sendRtpItem.setMediaServerId(mediaServer.getId()); sendRtpItem.setLocalIp(mediaServer.getSdpIp()); @@ -93,10 +91,8 @@ public class RedisRpcStreamPushController extends RpcController { log.info("[redis-rpc] 监听流上线,流已上线: {}/{}, 目标地址: {}:{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort()); // 读取redis中的上级点播信息,生成sendRtpItm发送出去 if (sendRtpItem.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(hookData.getMediaServer().getId()) : ssrcFactory.getPlayBackSsrc(hookData.getMediaServer().getId()); - sendRtpItem.setSsrc(ssrc); - sendRtpItem.setAllocatedSsrc(ssrc); + sendRtpItem.setSsrc(sendSsrcFactory.getSendSsrc( + "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? "0" : "1")); } sendRtpItem.setMediaServerId(hookData.getMediaServer().getId()); sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index 7c8242bca..641cf6aa8 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -12,7 +12,7 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelListForRpcParam; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; -import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; +import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory; import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; @@ -44,7 +44,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { private HookSubscribe hookSubscribe; @Autowired - private SSRCFactory ssrcFactory; + private SendSsrcFactory sendSsrcFactory; @Autowired private RedisTemplate redisTemplate; @@ -107,10 +107,8 @@ public class RedisRpcServiceImpl implements IRedisRpcService { // 读取redis中的上级点播信息,生成sendRtpItm发送出去 if (sendRtpItem.getSsrc() == null) { - // 上级平台点播时不使用上级平台指定的ssrc,使用自定义的ssrc,参考国标文档-点播外域设备媒体流SSRC处理方式 - String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(hookData.getMediaServer().getId()) : ssrcFactory.getPlayBackSsrc(hookData.getMediaServer().getId()); - sendRtpItem.setSsrc(ssrc); - sendRtpItem.setAllocatedSsrc(ssrc); + sendRtpItem.setSsrc(sendSsrcFactory.getSendSsrc( + "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? "0" : "1")); } sendRtpItem.setMediaServerId(hookData.getMediaServer().getId()); sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp()); diff --git a/src/test/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactoryTest.java b/src/test/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactoryTest.java new file mode 100644 index 000000000..eaf5fb007 --- /dev/null +++ b/src/test/java/com/genersoft/iot/vmp/gb28181/session/SSRCFactoryTest.java @@ -0,0 +1,176 @@ +package com.genersoft.iot.vmp.gb28181.session; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.springframework.test.util.ReflectionTestUtils; + +import java.lang.reflect.Field; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.*; + +class SSRCFactoryTest { + + private SSRCFactory ssrcFactory; + + private static final String DOMAIN_PART = "20000"; + private static final String SERVER_ID = "test-server"; + + @BeforeEach + void setUp() throws Exception { + ssrcFactory = new SSRCFactory(); + ReflectionTestUtils.setField(ssrcFactory, "domainPart", DOMAIN_PART); + + Field schedulerField = SSRCFactory.class.getDeclaredField("scheduler"); + schedulerField.setAccessible(true); + java.util.concurrent.ScheduledExecutorService scheduler = + (java.util.concurrent.ScheduledExecutorService) schedulerField.get(ssrcFactory); + scheduler.shutdownNow(); + } + + @Test + void getPlaySsrc_shouldReturnCorrectFormat() { + String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID); + assertNotNull(ssrc); + assertEquals(10, ssrc.length(), "SSRC should be 10 characters: prefix(1) + domain(5) + seq(4)"); + assertTrue(ssrc.startsWith("0"), "Play SSRC should start with '0'"); + assertTrue(ssrc.substring(1).startsWith(DOMAIN_PART), "SSRC should contain domain part"); + assertTrue(ssrc.matches("0" + DOMAIN_PART + "\\d{4}"), "SSRC format: 0" + DOMAIN_PART + "NNNN"); + } + + @Test + void getPlayBackSsrc_shouldReturnCorrectFormat() { + String ssrc = ssrcFactory.getPlayBackSsrc(SERVER_ID); + assertNotNull(ssrc); + assertEquals(10, ssrc.length(), "SSRC should be 10 characters: prefix(1) + domain(5) + seq(4)"); + assertTrue(ssrc.startsWith("1"), "PlayBack SSRC should start with '1'"); + assertTrue(ssrc.substring(1).startsWith(DOMAIN_PART), "SSRC should contain domain part"); + assertTrue(ssrc.matches("1" + DOMAIN_PART + "\\d{4}"), "SSRC format: 1" + DOMAIN_PART + "NNNN"); + } + + @Test + void allocations_withinSameServer_shouldBeUnique() { + Set allocated = new HashSet<>(); + for (int i = 0; i < 1000; i++) { + String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID); + assertNotNull(ssrc, "Should allocate SSRC #" + i); + assertTrue(allocated.add(ssrc), "SSRC should be unique: " + ssrc); + } + assertEquals(1000, allocated.size()); + } + + @Test + void allocations_forDifferentServers_shouldBeIndependent() { + String serverA = "server-a"; + String serverB = "server-b"; + + for (int i = 0; i < 10000; i++) { + assertNotNull(ssrcFactory.getPlaySsrc(serverA), "Server A should allocate SSRC #" + i); + } + assertNull(ssrcFactory.getPlaySsrc(serverA), "Server A should be exhausted"); + + for (int i = 0; i < 1000; i++) { + assertNotNull(ssrcFactory.getPlaySsrc(serverB), "Server B should allocate SSRC #" + i); + } + } + + @Test + void exhaustion_shouldReturnNull() { + for (int i = 0; i < 10000; i++) { + assertNotNull(ssrcFactory.getPlaySsrc(SERVER_ID), "iteration " + i); + } + assertNull(ssrcFactory.getPlaySsrc(SERVER_ID), "Should return null when exhausted"); + assertNull(ssrcFactory.getPlayBackSsrc(SERVER_ID), "Should return null for PlayBack too"); + } + + @Test + @Disabled("Needs mocked mediaServerService for ZLM query") + void rebuild_shouldResetUsage() { + for (int i = 0; i < 500; i++) { + ssrcFactory.getPlaySsrc(SERVER_ID); + } + ssrcFactory.rebuild(); + + for (int i = 0; i < 500; i++) { + String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID); + assertNotNull(ssrc, "After rebuild should allocate SSRC #" + i); + } + } + + @Test + void allocateAll_shouldUseAll10000Slots() { + Set allocated = new HashSet<>(); + for (int i = 0; i < 10000; i++) { + String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID); + assertNotNull(ssrc, "Should allocate at iteration " + i); + allocated.add(ssrc); + } + assertEquals(10000, allocated.size(), "All 10000 slots should be unique"); + } + + @Test + void twoPrefixes_shareSamePool() throws Exception { + for (int i = 0; i < 5000; i++) { + assertNotNull(ssrcFactory.getPlaySsrc(SERVER_ID), "play #" + i); + assertNotNull(ssrcFactory.getPlayBackSsrc(SERVER_ID), "playback #" + i); + } + + Field usedMapField = SSRCFactory.class.getDeclaredField("usedMap"); + usedMapField.setAccessible(true); + java.util.concurrent.ConcurrentHashMap usedMap = + (java.util.concurrent.ConcurrentHashMap) usedMapField.get(ssrcFactory); + java.util.BitSet bits = usedMap.get(SERVER_ID); + assertNotNull(bits); + assertEquals(10000, bits.cardinality(), "All 10000 bits should be set"); + } + + @Test + void multipleServers_shouldNotAffectEachOther() { + String server1 = "server-1"; + String server2 = "server-2"; + String server3 = "server-3"; + + for (int i = 0; i < 10000; i++) { + ssrcFactory.getPlaySsrc(server1); + } + assertNull(ssrcFactory.getPlaySsrc(server1)); + + assertNotNull(ssrcFactory.getPlaySsrc(server2)); + assertNotNull(ssrcFactory.getPlaySsrc(server3)); + + for (int i = 0; i < 100; i++) { + ssrcFactory.getPlaySsrc(server2); + ssrcFactory.getPlaySsrc(server3); + } + assertNull(ssrcFactory.getPlaySsrc(server1)); + } + + @Test + void linearProbe_skipsUsedSlots() throws Exception { + Field usedMapField = SSRCFactory.class.getDeclaredField("usedMap"); + usedMapField.setAccessible(true); + java.util.concurrent.ConcurrentHashMap usedMap = + (java.util.concurrent.ConcurrentHashMap) usedMapField.get(ssrcFactory); + java.util.BitSet bits = new java.util.BitSet(10000); + for (int i = 0; i < 100; i++) { + bits.set(i); + } + usedMap.put(SERVER_ID, bits); + + String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID); + assertNotNull(ssrc, "Should find a free slot via linear probe"); + int suffix = Integer.parseInt(ssrc.substring(6)); + assertTrue(suffix >= 100, "Should skip used slots 0-99, got suffix " + suffix); + } + + @Test + void ssrc_shouldBeDifferentEachCall() { + Set results = new HashSet<>(); + for (int i = 0; i < 100; i++) { + results.add(ssrcFactory.getPlaySsrc(SERVER_ID)); + } + assertEquals(100, results.size(), "All 100 calls should return different SSRCs"); + } +}