重构SSRC管理逻辑

This commit is contained in:
panlinlin 2026-05-21 00:05:48 +08:00
parent e545b00a8f
commit ead059fd01
21 changed files with 321 additions and 341 deletions

View File

@ -10,8 +10,6 @@ public class InviteMessageInfo {
private String sourceChannelId;
private String sessionName;
private String ssrc;
private String allocatedSsrc;
private String allocatedSsrcMediaServerId;
private boolean tcp;
private boolean tcpActive;
private String callId;

View File

@ -24,11 +24,6 @@ public class SendRtpInfo {
*/
private String ssrc;
/**
* 从SSRC池中分配的SSRC
*/
private String allocatedSsrc;
/**
* 目标平台或设备的编号
*/
@ -253,7 +248,5 @@ public class SendRtpInfo {
}
}
public String getSsrcToRelease() {
return allocatedSsrc;
}
}

View File

@ -47,11 +47,6 @@ public class SsrcTransaction {
*/
private String ssrc;
/**
* 从SSRC池中分配的SSRC
*/
private String allocatedSsrc;
/**
* 事务信息
*/
@ -94,7 +89,5 @@ public class SsrcTransaction {
public SsrcTransaction() {
}
public String getSsrcToRelease() {
return allocatedSsrc;
}
}

View File

@ -266,7 +266,6 @@ public class DeviceServiceImpl implements IDeviceService {
List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId());
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrcToRelease());
receiveRtpServerService.closeRTPServerByMediaServerId(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream());
sessionManager.removeByCallId(ssrcTransaction.getCallId());
}

View File

@ -128,7 +128,6 @@ public class PlatformServiceImpl implements IPlatformService {
}
sendRtpServerService.delete(sendRtpItem);
if (mediaServerItem != null) {
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease());
boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (stopResult) {
Platform platform = queryPlatformByServerGBId(sendRtpItem.getTargetId());
@ -335,7 +334,6 @@ public class PlatformServiceImpl implements IPlatformService {
if (sendRtpItem != null && sendRtpItem.getApp().equals(event.getApp()) && sendRtpItem.isSendToPlatform()) {
Platform platform = platformMapper.getParentPlatByServerGBId(sendRtpItem.getTargetId());
CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease());
try {
commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel);
} catch (SipException | InvalidArgumentException | ParseException e) {
@ -522,7 +520,6 @@ public class PlatformServiceImpl implements IPlatformService {
List<SendRtpInfo> sendRtpItems = sendRtpServerService.queryForPlatform(platformId);
if (sendRtpItems != null && !sendRtpItems.isEmpty()) {
for (SendRtpInfo sendRtpItem : sendRtpItems) {
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease());
sendRtpServerService.delete(sendRtpItem);
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), null);
@ -626,7 +623,6 @@ public class PlatformServiceImpl implements IPlatformService {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run(1, "收流超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), data.getSsrcInfo().getSsrcToRelease());
receiveRtpServerService.closeRTPServer(mediaServerItem, data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream());
sessionManager.removeByStream(data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream());
}
@ -702,7 +698,6 @@ public class PlatformServiceImpl implements IPlatformService {
// ssrc检验
// 更新ssrc
log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
releaseAllocatedSsrc(mediaServerItem, ssrcInfo);
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
@ -711,8 +706,6 @@ public class PlatformServiceImpl implements IPlatformService {
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
} finally {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease());
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
@ -724,7 +717,6 @@ public class PlatformServiceImpl implements IPlatformService {
}
}else {
ssrcInfo.setSsrc(ssrcInResponse);
updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream());
if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-ACTIVE")) {
@ -738,9 +730,7 @@ public class PlatformServiceImpl implements IPlatformService {
inviteStreamService.updateInviteInfo(inviteInfo);
}
}else {
releaseAllocatedSsrc(mediaServerItem, ssrcInfo);
ssrcInfo.setSsrc(ssrcInResponse);
updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream());
if (userSetting.getBroadcastForPlatform().equalsIgnoreCase("TCP-ACTIVE")) {
@ -761,7 +751,6 @@ public class PlatformServiceImpl implements IPlatformService {
if (ssrcTransaction == null) {
return;
}
releaseAllocatedSsrc(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getApp(), inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
@ -770,7 +759,6 @@ public class PlatformServiceImpl implements IPlatformService {
ssrcTransaction.setApp(ssrcInfo.getApp());
ssrcTransaction.setStream(inviteInfo.getStream());
ssrcTransaction.setSsrc(ssrcInResponse);
ssrcTransaction.setAllocatedSsrc(null);
ssrcTransaction.setMediaServerId(mediaServerItem.getId());
ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo((SIPResponse) responseEvent.getResponse()));
ssrcTransaction.setType(inviteSessionType);
@ -781,25 +769,6 @@ public class PlatformServiceImpl implements IPlatformService {
}
}
private void releaseAllocatedSsrc(MediaServer mediaServerItem, SSRCInfo ssrcInfo) {
if (ssrcInfo == null || ssrcInfo.getAllocatedSsrc() == null) {
return;
}
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getAllocatedSsrc());
ssrcInfo.setAllocatedSsrc(null);
}
private void updateSsrcTransaction(String app, String stream, String ssrc, String allocatedSsrc) {
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream);
if (ssrcTransaction == null) {
return;
}
ssrcTransaction.setSsrc(ssrc);
ssrcTransaction.setAllocatedSsrc(allocatedSsrc);
sessionManager.put(ssrcTransaction);
}
private void tcpActiveHandler(Platform platform, CommonGBChannel channel, String contentString,
MediaServer mediaServerItem, boolean ssrcCheck,
SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
@ -830,8 +799,6 @@ public class PlatformServiceImpl implements IPlatformService {
} catch (SdpException e) {
log.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channel.getGbDeviceId(), e);
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
@ -855,8 +822,6 @@ public class PlatformServiceImpl implements IPlatformService {
receiveRtpServerService.closeRTPServer(mediaServerItem, app, stream);
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, channel.getGbId(), stream);
if (inviteInfo != null) {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), inviteInfo.getSsrcInfo().getSsrcToRelease());
inviteStreamService.removeInviteInfo(inviteInfo);
}
sessionManager.removeByStream(app, stream);

View File

@ -13,6 +13,7 @@ import com.genersoft.iot.vmp.gb28181.event.SipSubscribe;
import com.genersoft.iot.vmp.gb28181.service.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@ -109,6 +110,9 @@ public class PlayServiceImpl implements IPlayService {
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private SendSsrcFactory sendSsrcFactory;
@Autowired
private IPlatformService platformService;
@ -335,9 +339,6 @@ public class PlayServiceImpl implements IPlayService {
InviteInfo inviteInfoInCatch = inviteStreamService.getInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
if (inviteInfoInCatch != null ) {
if (inviteInfoInCatch.getStreamInfo() == null) {
// 释放生成的ssrc使用上一次申请的
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc);
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
inviteStreamService.once(InviteSessionType.PLAY, channel.getId(), null, callback);
log.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId({}): {}", device.getDeviceId(), channel.getDeviceId(), channel.getId());
@ -480,25 +481,23 @@ public class PlayServiceImpl implements IPlayService {
private void talk(MediaServer mediaServerItem, Device device, DeviceChannel channel, String stream,
SipSubscribe.Event errorEvent, Runnable timeoutCallback, AudioBroadcastEvent audioEvent) {
String playSsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
String ySsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
if (playSsrc == null) {
if (ySsrc == null) {
audioEvent.call("ssrc已经用尽");
return;
}
String sendSsrc = sendSsrcFactory.getSendSsrc("0");
SendRtpInfo sendRtpInfo;
try {
sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), MediaStreamUtil.GB28181_TALK, stream,
sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, sendSsrc, device.getDeviceId(), MediaStreamUtil.GB28181_TALK, stream,
channel.getId(), true, false);
if (sendRtpInfo == null) {
ssrcFactory.releaseSsrc(mediaServerItem.getId(), playSsrc);
audioEvent.call("获取发流端口失败");
return;
}
sendRtpInfo.setAllocatedSsrc(playSsrc);
sendRtpInfo.setPlayType(InviteStreamType.TALK);
}catch (PlayException e) {
ssrcFactory.releaseSsrc(mediaServerItem.getId(), playSsrc);
log.info("[语音对讲]开始 获取发流端口失败 deviceId: {}, channelId: {},", device.getDeviceId(), channel.getDeviceId());
return;
}
@ -525,7 +524,6 @@ public class PlayServiceImpl implements IPlayService {
log.error("[语音对讲]超时, 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run();
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
}
}, userSetting.getPlayTimeout());
@ -534,7 +532,6 @@ public class PlayServiceImpl implements IPlayService {
Integer localPort = mediaServerService.startSendRtpPassive(mediaServerItem, sendRtpInfo, userSetting.getPlayTimeout() * 1000);
if (localPort == null || localPort <= 0) {
timeoutCallback.run();
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
return;
}
@ -543,7 +540,6 @@ public class PlayServiceImpl implements IPlayService {
receiveRtpServerService.addAuthenticateInfoForGb28181Talk(mediaServerItem, sendRtpInfo.getStream());
}catch (ControllerException e) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease());
log.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channel.getDeviceId());
audioEvent.call("失败, " + e.getMessage());
// 查看是否已经建立了通道存在则发送bye
@ -553,7 +549,7 @@ public class PlayServiceImpl implements IPlayService {
// 查看设备是否已经在推流
try {
cmder.talkStreamCmd(mediaServerItem, sendRtpInfo, device, channel, callId, (hookData) -> {
cmder.talkStreamCmd(mediaServerItem, sendRtpInfo, ySsrc, device, channel, callId, (hookData) -> {
log.info("[语音对讲] 流已生成, 开始推流: " + hookData);
dynamicTask.stop(timeOutTaskKey);
// TODO 暂不做处理
@ -588,8 +584,6 @@ public class PlayServiceImpl implements IPlayService {
}, (event) -> {
dynamicTask.stop(timeOutTaskKey);
receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
errorEvent.response(event);
}, userSetting.getPlayTimeout().longValue());
@ -598,9 +592,6 @@ public class PlayServiceImpl implements IPlayService {
log.error("[命令发送失败] 对讲消息: {}", e.getMessage());
dynamicTask.stop(timeOutTaskKey);
receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult();
eventResult.type = SipSubscribe.EventResultType.cmdSendFailEvent;
@ -875,7 +866,6 @@ public class PlayServiceImpl implements IPlayService {
// ssrc检验
// 更新ssrc
log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
releaseAllocatedSsrc(mediaServerItem, ssrcInfo);
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
@ -885,8 +875,6 @@ public class PlayServiceImpl implements IPlayService {
log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
}
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
@ -897,7 +885,6 @@ public class PlayServiceImpl implements IPlayService {
}else {
ssrcInfo.setSsrc(ssrcInResponse);
updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream());
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
@ -910,9 +897,7 @@ public class PlayServiceImpl implements IPlayService {
inviteStreamService.updateInviteInfo(inviteInfo);
}
} else {
releaseAllocatedSsrc(mediaServerItem, ssrcInfo);
ssrcInfo.setSsrc(ssrcInResponse);
updateSsrcTransaction(ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse, null);
inviteInfo.setSsrcInfo(ssrcInfo);
inviteInfo.setStream(ssrcInfo.getStream());
if (device.getStreamMode().equalsIgnoreCase("TCP-ACTIVE")) {
@ -932,14 +917,12 @@ public class PlayServiceImpl implements IPlayService {
if (ssrcTransaction == null) {
return;
}
releaseAllocatedSsrc(mediaServerItem, ssrcInfo);
sessionManager.removeByStream(MediaStreamUtil.RTP_APP, inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
ssrcTransaction.setDeviceId(device.getDeviceId());
ssrcTransaction.setChannelId(ssrcTransaction.getChannelId());
ssrcTransaction.setCallId(ssrcTransaction.getCallId());
ssrcTransaction.setSsrc(ssrcInResponse);
ssrcTransaction.setAllocatedSsrc(null);
ssrcTransaction.setApp(MediaStreamUtil.RTP_APP);
ssrcTransaction.setStream(inviteInfo.getStream());
ssrcTransaction.setMediaServerId(mediaServerItem.getId());
@ -952,24 +935,6 @@ public class PlayServiceImpl implements IPlayService {
}
}
private void releaseAllocatedSsrc(MediaServer mediaServerItem, SSRCInfo ssrcInfo) {
if (ssrcInfo == null || ssrcInfo.getAllocatedSsrc() == null) {
return;
}
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getAllocatedSsrc());
ssrcInfo.setAllocatedSsrc(null);
}
private void updateSsrcTransaction(String app, String stream, String ssrc, String allocatedSsrc) {
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream);
if (ssrcTransaction == null) {
return;
}
ssrcTransaction.setSsrc(ssrc);
ssrcTransaction.setAllocatedSsrc(allocatedSsrc);
sessionManager.put(ssrcTransaction);
}
@Override
public void download(Device device, DeviceChannel channel, String startTime, String endTime, int downloadSpeed, ErrorCallback<StreamInfo> callback) {
@ -1603,8 +1568,6 @@ public class PlayServiceImpl implements IPlayService {
mediaServerService.stopSendRtp(mediaServer, sendRtpInfo.getApp(), sendRtpInfo.getStream(), sendRtpInfo.getSsrc());
}
ssrcFactory.releaseSsrc(mediaServerId, sendRtpInfo.getSsrcToRelease());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
if (ssrcTransaction != null) {
try {

View File

@ -1,143 +1,108 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
import com.genersoft.iot.vmp.media.zlm.dto.ZLMResult;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
/**
* ssrc使用
*/
@Slf4j
@Component
public class SSRCFactory {
/**
* 播流最大并发个数
*/
private static final Integer MAX_STREAM_COUNT = 10000;
/**
* 播流最大并发个数
*/
private static final String SSRC_INFO_KEY = "VMP_SSRC_INFO_";
private final ConcurrentHashMap<String, BitSet> usedMap = new ConcurrentHashMap<>();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "ssrc-rebuild");
t.setDaemon(true);
return t;
});
@Autowired
private StringRedisTemplate redisTemplate;
private ZLMRESTfulUtils zlmresTfulUtils;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private SipConfig sipConfig;
@Autowired
private UserSetting userSetting;
private String domainPart;
@PostConstruct
public void init() {
String sipDomain = sipConfig.getDomain();
domainPart = sipDomain.length() >= 8 ? sipDomain.substring(3, 8) : sipDomain;
scheduler.scheduleAtFixedRate(this::rebuild, 10, 30, TimeUnit.SECONDS);
}
public void initMediaServerSSRC(String mediaServerId, Set<String> usedSet) {
String ssrcPrefix = getSsrcPrefix();
String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId;
List<String> ssrcList = new ArrayList<>();
for (int i = 1; i < MAX_STREAM_COUNT; i++) {
String ssrc = String.format("%s%04d", ssrcPrefix, i);
public String getPlaySsrc(String mediaServerId) {
String suffix = allocate(mediaServerId);
return suffix != null ? "0" + suffix : null;
}
if (null == usedSet || !usedSet.contains(ssrc)) {
ssrcList.add(ssrc);
public String getPlayBackSsrc(String mediaServerId) {
String suffix = allocate(mediaServerId);
return suffix != null ? "1" + suffix : null;
}
private String allocate(String mediaServerId) {
BitSet bits = usedMap.computeIfAbsent(mediaServerId, k -> new BitSet(10000));
int start = ThreadLocalRandom.current().nextInt(10000);
int index = start;
do {
if (!bits.get(index)) {
bits.set(index);
return domainPart + String.format("%04d", index);
}
index = (index + 1) % 10000;
} while (index != start);
log.warn("[SSRC] 媒体节点 {} 的SSRC已用尽", mediaServerId);
return null;
}
void rebuild() {
List<MediaServer> servers = mediaServerService.getAll();
for (MediaServer server : servers) {
BitSet bits = new BitSet(10000);
int count = 0;
try {
ZLMResult<?> result = zlmresTfulUtils.getMediaList(server, null, null, "rtsp", null);
if (result != null && result.getCode() == 0 && result.getData() != null) {
List<JSONObject> list = (List<JSONObject>) result.getData();
for (JSONObject obj : list) {
if (obj.getIntValue("originType") != 3) continue;
String originUrl = obj.getString("originUrl");
if (originUrl == null) continue;
int idx = originUrl.lastIndexOf("/rtp/");
if (idx == -1) continue;
try {
int suffix = (int) (Long.parseLong(originUrl.substring(idx + 5), 16) % 10000);
bits.set(suffix);
count++;
} catch (NumberFormatException ignored) {
}
}
}
} catch (Exception e) {
log.warn("[SSRC重建] 查询媒体节点 {} 失败: {}", server.getId(), e.getMessage());
}
usedMap.put(server.getId(), bits);
if (log.isDebugEnabled()) {
log.debug("[SSRC重建] 节点 {} 已占用 {} 个SSRC", server.getId(), count);
}
}
if (redisTemplate.opsForSet().size(redisKey) != null) {
redisTemplate.delete(redisKey);
}
redisTemplate.opsForSet().add(redisKey, ssrcList.toArray(new String[0]));
}
/**
* 获取视频预览的SSRC值,第一位固定为0
*
* @return ssrc
*/
public String getPlaySsrc(String mediaServerId) {
return "0" + getSN(mediaServerId);
}
/**
* 获取录像回放的SSRC值,第一位固定为1
*/
public String getPlayBackSsrc(String mediaServerId) {
return "1" + getSN(mediaServerId);
}
/**
* 释放ssrc主要用完的ssrc一定要释放否则会耗尽
*
* @param ssrc 需要重置的ssrc
*/
public void releaseSsrc(String mediaServerId, String ssrc) {
if (ssrc == null) {
return;
}
if (!isFactorySsrc(ssrc)) {
log.warn("[释放 SSRC] 忽略非SSRC池分配的值: {}", ssrc);
return;
}
String sn = ssrc.substring(1);
log.debug("[释放 SSRC] SSRC:{} -> SN: {}", ssrc, sn);
String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId;
redisTemplate.opsForSet().add(redisKey, sn);
}
/**
* 获取后四位数SN,随机数
*/
private String getSN(String mediaServerId) {
String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId;
Long size = redisTemplate.opsForSet().size(redisKey);
if (size == null || size == 0) {
log.info("[获取 SSRC 失败] redisKey {}", redisKey);
throw new RuntimeException("ssrc已经用完");
} else {
// 在集合中移除并返回一个随机成员
return redisTemplate.opsForSet().pop(redisKey);
}
}
/**
* 重置一个流媒体服务的所有ssrc
*
* @param mediaServerId 流媒体服务ID
*/
public void reset(String mediaServerId) {
this.initMediaServerSSRC(mediaServerId, null);
}
/**
* 是否已经存在了某个MediaServer的SSRC信息
*
* @param mediaServerId 流媒体服务ID
*/
public boolean hasMediaServerSSRC(String mediaServerId) {
String redisKey = SSRC_INFO_KEY + userSetting.getServerId() + "_" + mediaServerId;
return Boolean.TRUE.equals(redisTemplate.hasKey(redisKey));
}
private String getSsrcPrefix() {
String sipDomain = sipConfig.getDomain();
return sipDomain.length() >= 8 ? sipDomain.substring(3, 8) : sipDomain;
}
private boolean isFactorySsrc(String ssrc) {
if (ssrc.length() < 2) {
return false;
}
String sn = ssrc.substring(1);
String ssrcPrefix = getSsrcPrefix();
return sn.length() == ssrcPrefix.length() + 4 && sn.startsWith(ssrcPrefix);
}
}

View File

@ -0,0 +1,29 @@
package com.genersoft.iot.vmp.gb28181.session;
import com.genersoft.iot.vmp.conf.SipConfig;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.ThreadLocalRandom;
@Slf4j
@Component
public class SendSsrcFactory {
@Autowired
private SipConfig sipConfig;
private String domainPart;
@PostConstruct
public void init() {
String sipDomain = sipConfig.getDomain();
domainPart = sipDomain.length() >= 8 ? sipDomain.substring(3, 8) : sipDomain;
}
public String getSendSsrc(String prefix) {
return prefix + domainPart + String.format("%04d", ThreadLocalRandom.current().nextInt(10000));
}
}

View File

@ -94,7 +94,7 @@ public interface ISIPCommander {
*/
void streamByeCmd(Device device, String channelId, String app, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException;
void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, String ySsrc, Device device, DeviceChannel channelId, String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException;
void streamByeCmd(Device device, String channelId, SipTransactionInfo sipTransactionInfo, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;

View File

@ -281,7 +281,6 @@ public class SIPCommander implements ISIPCommander {
Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrcToRelease());
errorEvent.response(e);
}), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event;
@ -290,7 +289,6 @@ public class SIPCommander implements ISIPCommander {
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(),
callId,ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInfo.getSsrc(), mediaServerItem.getId(), response,
InviteSessionType.PLAY);
ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc());
sessionManager.put(ssrcTransaction);
okEvent.response(e);
}, timeout);
@ -388,7 +386,6 @@ public class SIPCommander implements ISIPCommander {
channel.getId(), sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),
device.getTransport()).getCallId(), ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInfo.getSsrc(),
mediaServerItem.getId(), response, InviteSessionType.PLAYBACK);
ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc());
sessionManager.put(ssrcTransaction);
okEvent.response(event);
}, timeout);
@ -482,14 +479,13 @@ public class SIPCommander implements ISIPCommander {
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(),
response.getCallIdHeader().getCallId(), ssrcInfo.getApp(), ssrcInfo.getStream(), ssrc,
mediaServerItem.getId(), response, InviteSessionType.DOWNLOAD);
ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc());
sessionManager.put(ssrcTransaction);
okEvent.response(event);
}, timeout);
}
@Override
public void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, Device device, DeviceChannel channel,
public void talkStreamCmd(MediaServer mediaServerItem, SendRtpInfo sendRtpItem, String ySsrc, Device device, DeviceChannel channel,
String callId, HookSubscribe.Event event, HookSubscribe.Event eventForPush, SipSubscribe.Event okEvent,
SipSubscribe.Event errorEvent, Long timeout) throws InvalidArgumentException, SipException, ParseException {
@ -535,22 +531,20 @@ public class SIPCommander implements ISIPCommander {
content.append("a=sendrecv\r\n");
content.append("a=rtpmap:8 PCMA/8000\r\n");
content.append("y=" + sendRtpItem.getSsrc() + "\r\n");//ssrc
content.append("y=" + ySsrc + "\r\n");//ssrc
// f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
content.append("f=v/////a/1/8/1" + "\r\n");
Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(),
SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, sendRtpItem.getSsrc(), callIdHeader);
SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ySsrc, callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
sessionManager.removeByStream(sendRtpItem.getApp(), sendRtpItem.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrcToRelease());
errorEvent.response(e);
}), e -> {
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), MediaStreamUtil.GB28181_TALK,sendRtpItem.getApp(), stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK);
ssrcTransaction.setAllocatedSsrc(sendRtpItem.getAllocatedSsrc());
sessionManager.put(ssrcTransaction);
okEvent.response(e);
}, timeout);

View File

@ -642,7 +642,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
String mediaServerId = sendRtpItem.getMediaServerId();
MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrcToRelease());
receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
}
SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem, channel);
@ -744,14 +743,12 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
callIdHeader);
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> {
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServer.getId(), ssrcInfo.getSsrcToRelease());
errorEvent.response(e);
}), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForPlatform(platform.getServerGBId(), channel.getGbId(),
callIdHeader.getCallId(), ssrcInfo.getApp(), stream, ssrcInfo.getSsrc(), mediaServer.getId(), response, InviteSessionType.BROADCAST);
ssrcTransaction.setAllocatedSsrc(ssrcInfo.getAllocatedSsrc());
sessionManager.put(ssrcTransaction);
okEvent.response(e);
});

View File

@ -131,9 +131,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
sendRtpServerService.deleteByCallId(callIdHeader.getCallId());
if (mediaServer != null) {
mediaServerService.stopSendRtp(mediaServer, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaServer.getId(), sendRtpItem.getSsrcToRelease());
}
}
}
}else {
@ -143,9 +140,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
MediaServer mediaInfo = mediaServerService.getOne(sendRtpItem.getMediaServerId());
sendRtpServerService.delete(sendRtpItem);
mediaServerService.stopSendRtp(mediaInfo, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
if (userSetting.getUseCustomSsrcForParentInvite()) {
mediaServerService.releaseSsrc(mediaInfo.getId(), sendRtpItem.getSsrcToRelease());
}
}
if (sendRtpItem.getServerId().equals(userSetting.getServerId())) {
MediaServer mediaServer = mediaServerService.getOne(sendRtpItem.getMediaServerId());
@ -251,11 +245,6 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
}
break;
}
// 释放ssrc
MediaServer mediaServerItem = mediaServerService.getOne(ssrcTransaction.getMediaServerId());
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcTransaction.getSsrcToRelease());
}
sessionManager.removeByCallId(ssrcTransaction.getCallId());
}
}

View File

@ -9,7 +9,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.*;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.transmit.SIPProcessorObserver;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
@ -39,7 +39,6 @@ import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.header.CallIdHeader;
import javax.sip.message.Response;
import java.security.SecureRandom;
import java.text.ParseException;
import java.util.List;
import java.util.Vector;
@ -103,7 +102,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
private UserSetting userSetting;
@Autowired
private SSRCFactory ssrcFactory;
private SendSsrcFactory sendSsrcFactory;
@Override
@ -175,22 +174,14 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 点播成功 TODO 可以在此处检测cancel命令是否存在存在则不发送
if (userSetting.getUseCustomSsrcForParentInvite()) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
MediaServer mediaServer = mediaServerService.getOne(streamInfo.getMediaServer().getId());
if (mediaServer != null) {
String ssrc = "Play".equalsIgnoreCase(finalInviteInfo.getSessionName())
? ssrcFactory.getPlaySsrc(streamInfo.getMediaServer().getId())
: ssrcFactory.getPlayBackSsrc(streamInfo.getMediaServer().getId());
finalInviteInfo.setSsrc(ssrc);
finalInviteInfo.setAllocatedSsrc(ssrc);
finalInviteInfo.setAllocatedSsrcMediaServerId(streamInfo.getMediaServer().getId());
}
finalInviteInfo.setSsrc(sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(finalInviteInfo.getSessionName()) ? "0" : "1"));
}
// 构建sendRTP内容
SendRtpInfo sendRtpItem = sendRtpServerService.createSendRtpInfo(streamInfo.getMediaServer(),
finalInviteInfo.getIp(), finalInviteInfo.getPort(), finalInviteInfo.getSsrc(), platform.getServerGBId(),
streamInfo.getApp(), streamInfo.getStream(),
channel.getGbId(), finalInviteInfo.isTcp(), platform.isRtcp());
sendRtpItem.setAllocatedSsrc(finalInviteInfo.getAllocatedSsrc());
if (finalInviteInfo.isTcp() && finalInviteInfo.isTcpActive()) {
sendRtpItem.setTcpActive(true);
}
@ -208,7 +199,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
// 超时未收到Ack应该回复bye,当前等待时间为10秒
dynamicTask.startDelay(finalInviteInfo.getCallId(), () -> {
log.info("[Ack ] 等待超时, {}/{}", finalInviteInfo.getCallId(), channel.getGbDeviceId());
mediaServerService.releaseSsrc(streamInfo.getMediaServer().getId(), sendRtpItem.getSsrcToRelease());
// 回复bye
sendBye(platform, finalInviteInfo.getCallId());
}, 60 * 1000);
@ -249,7 +239,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
log.error("[命令发送失败] invite BAD_REQUEST: {}", sendException.getMessage());
}
} catch (PlayException e) {
releaseAllocatedSsrc(inviteInfo);
try {
responseAck(request, e.getCode(), e.getMsg());
} catch (SipException | InvalidArgumentException | ParseException sendException) {
@ -257,7 +246,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
} catch (Exception e) {
log.error("[Invite处理异常] ", e);
releaseAllocatedSsrc(inviteInfo);
try {
responseAck(request, Response.SERVER_INTERNAL_ERROR, "");
} catch (SipException | InvalidArgumentException | ParseException sendException) {
@ -266,15 +254,6 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
}
}
private void releaseAllocatedSsrc(InviteMessageInfo inviteInfo) {
if (inviteInfo == null || inviteInfo.getAllocatedSsrc() == null || inviteInfo.getAllocatedSsrcMediaServerId() == null) {
return;
}
mediaServerService.releaseSsrc(inviteInfo.getAllocatedSsrcMediaServerId(), inviteInfo.getAllocatedSsrc());
inviteInfo.setAllocatedSsrc(null);
inviteInfo.setAllocatedSsrcMediaServerId(null);
}
private InviteMessageInfo decode(RequestEvent evt) throws SdpException {
InviteMessageInfo inviteInfo = new InviteMessageInfo();
@ -499,7 +478,7 @@ public class InviteRequestProcessor extends SIPRequestProcessorParent implements
SessionDescription sdp = gb28181Sdp.getBaseSdb();
if (ObjectUtils.isEmpty(gb28181Sdp.getSsrc()) ) {
String ssrc = Integer.toUnsignedString(new SecureRandom().nextInt());
String ssrc = sendSsrcFactory.getSendSsrc("0");
log.warn("来自设备的Invite请求未携带SSRC生成随机ssrc: {}requesterId {}/{}", ssrc, inviteInfo.getRequesterId(), inviteInfo.getSourceChannelId());
gb28181Sdp.setSsrc(ssrc);
}

View File

@ -49,8 +49,6 @@ public interface IMediaServerService {
void removeCount(String mediaServerId);
void releaseSsrc(String mediaServerItemId, String ssrc);
void clearMediaServerForOnline();
void add(MediaServer mediaSerItem);

View File

@ -9,7 +9,6 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.TalkRtpInfo;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.bean.RecordInfo;
@ -57,9 +56,6 @@ import java.util.*;
@Service
public class MediaServerServiceImpl implements IMediaServerService {
@Autowired
private SSRCFactory ssrcFactory;
@Autowired
private UserSetting userSetting;
@ -150,10 +146,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
if (ObjectUtils.isEmpty(mediaServer.getId())) {
continue;
}
// 更新
if (!ssrcFactory.hasMediaServerSSRC(mediaServer.getId())) {
ssrcFactory.initMediaServerSSRC(mediaServer.getId(), null);
}
// 查询redis是否存在此mediaServer
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId();
Boolean hasKey = redisTemplate.hasKey(key);
@ -229,21 +221,11 @@ public class MediaServerServiceImpl implements IMediaServerService {
return mediaNodeServerService.updateRtpServerSSRC(mediaServer, app, streamId, ssrc);
}
@Override
public void releaseSsrc(String mediaServerId, String ssrc) {
MediaServer mediaServer = getOne(mediaServerId);
if (mediaServer == null || ssrc == null) {
return;
}
ssrcFactory.releaseSsrc(mediaServerId, ssrc);
}
/**
* 媒体服务节点 重启后重置他的推流信息 TODO 给正在使用的设备发送停止命令
*/
@Override
public void clearRTPServer(MediaServer mediaServer) {
ssrcFactory.reset(mediaServer.getId());
}
@Override
@ -254,12 +236,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
mediaServerMapper.add(mediaServer);
}
MediaServer mediaServerInRedis = getOne(mediaServer.getId());
if (mediaServerInRedis == null || !ssrcFactory.hasMediaServerSSRC(mediaServer.getId())) {
ssrcFactory.initMediaServerSSRC(mediaServer.getId(),null);
}
String key = VideoManagerConstants.MEDIA_SERVER_PREFIX + userSetting.getServerId();
redisTemplate.opsForHash().put(key, mediaServer.getId(), mediaServer);
if (mediaServer.isStatus()) {

View File

@ -7,7 +7,6 @@ public class SSRCInfo {
private int port;
private String ssrc;
private String allocatedSsrc;
private String app;
private String stream;
@ -18,8 +17,4 @@ public class SSRCInfo {
this.stream = stream;
}
public String getSsrcToRelease() {
return allocatedSsrc;
}
}

View File

@ -106,9 +106,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
}
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId);
if (presetSSRC == null) {
ssrcInfo.setAllocatedSsrc(ssrc);
}
RTPServerParam rtpServerParam = new RTPServerParam(mediaServer, MediaStreamUtil.RTP_APP, streamId, ssrcCheck ? Long.parseLong(ssrc): 0L, null, onlyAuto, disableAuto, false, tcpMode);
int rtpServerPort = openCommonRTPServer(rtpServerParam, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
@ -117,11 +114,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
openRTPServerResult.setSsrcInfo(ssrcInfo);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult);
} else {
// 释放ssrc
if (presetSSRC == null) {
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc);
ssrcInfo.setAllocatedSsrc(null);
}
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
openRTPServerResult.setSsrcInfo(ssrcInfo);
callback.run(code, msg, openRTPServerResult);
@ -170,9 +162,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L;
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamReplace != null ? streamReplace : streamId);
if (presetSSRC == null) {
ssrcInfo.setAllocatedSsrc(ssrc);
}
openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback);
addAuthenticateInfo(streamId, streamReplace, channel.isHasAudio(), record, null);
return ssrcInfo;
@ -212,7 +201,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L;
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamReplace != null ? streamReplace : streamId);
ssrcInfo.setAllocatedSsrc(ssrc);
openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback);
addAuthenticateInfo(streamId, streamReplace, channel.isHasAudio(), false,null);
return ssrcInfo;
@ -255,7 +243,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L;
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId);
ssrcInfo.setAllocatedSsrc(ssrc);
openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback);
long difference = DateUtil.getDifference(startTime, endTime) / 1000;
@ -294,7 +281,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
String ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId);
ssrcInfo.setAllocatedSsrc(ssrc);
openRtpServer(mediaServer, ssrcInfo, 0L, false, true, tcpMode, callback);
return ssrcInfo;
}
@ -310,11 +296,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
openRTPServerResult.setSsrcInfo(ssrcInfo);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult);
} else {
// 释放ssrc
if (ssrcInfo.getAllocatedSsrc() != null) {
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrcInfo.getAllocatedSsrc());
ssrcInfo.setAllocatedSsrc(null);
}
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
openRTPServerResult.setSsrcInfo(ssrcInfo);
callback.run(code, msg, openRTPServerResult);

View File

@ -5,7 +5,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
@ -25,7 +25,7 @@ import org.springframework.stereotype.Component;
public class RedisRpcSendRtpController extends RpcController {
@Autowired
private SSRCFactory ssrcFactory;
private SendSsrcFactory sendSsrcFactory;
@Autowired
private IMediaServerService mediaServerService;
@ -71,10 +71,8 @@ public class RedisRpcSendRtpController extends RpcController {
sendRtpItem.setServerId(userSetting.getServerId());
sendRtpItem.setLocalIp(mediaServerItem.getSdpIp());
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServerItem.getId()) : ssrcFactory.getPlayBackSsrc(mediaServerItem.getId());
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setAllocatedSsrc(ssrc);
sendRtpItem.setSsrc(sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? "0" : "1"));
}
sendRtpServerService.update(sendRtpItem);
RedisRpcResponse response = request.getResponse();
@ -173,9 +171,7 @@ public class RedisRpcSendRtpController extends RpcController {
return;
}
sendRtpServerService.delete(sendRtpItem);
if (sendRtpItem.getMediaServerId() != null) {
mediaServerService.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrcToRelease());
}
}
}

View File

@ -8,7 +8,7 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcRequest;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
@ -31,7 +31,7 @@ import org.springframework.stereotype.Component;
public class RedisRpcStreamPushController extends RpcController {
@Autowired
private SSRCFactory ssrcFactory;
private SendSsrcFactory sendSsrcFactory;
@Autowired
private IMediaServerService mediaServerService;
@ -73,10 +73,8 @@ public class RedisRpcStreamPushController extends RpcController {
log.info("[redis-rpc] 监听流上线时发现流已存在直接返回: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort() );
// 读取redis中的上级点播信息生成sendRtpItm发送出去
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(mediaServer.getId()) : ssrcFactory.getPlayBackSsrc(mediaServer.getId());
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setAllocatedSsrc(ssrc);
sendRtpItem.setSsrc(sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? "0" : "1"));
}
sendRtpItem.setMediaServerId(mediaServer.getId());
sendRtpItem.setLocalIp(mediaServer.getSdpIp());
@ -93,10 +91,8 @@ public class RedisRpcStreamPushController extends RpcController {
log.info("[redis-rpc] 监听流上线,流已上线: {}/{}, 目标地址: {}{}", sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getIp(), sendRtpItem.getPort());
// 读取redis中的上级点播信息生成sendRtpItm发送出去
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(hookData.getMediaServer().getId()) : ssrcFactory.getPlayBackSsrc(hookData.getMediaServer().getId());
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setAllocatedSsrc(ssrc);
sendRtpItem.setSsrc(sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? "0" : "1"));
}
sendRtpItem.setMediaServerId(hookData.getMediaServer().getId());
sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp());

View File

@ -12,7 +12,7 @@ import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcResponse;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelListForRpcParam;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.SendSsrcFactory;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
@ -44,7 +44,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
private HookSubscribe hookSubscribe;
@Autowired
private SSRCFactory ssrcFactory;
private SendSsrcFactory sendSsrcFactory;
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@ -107,10 +107,8 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
// 读取redis中的上级点播信息生成sendRtpItm发送出去
if (sendRtpItem.getSsrc() == null) {
// 上级平台点播时不使用上级平台指定的ssrc使用自定义的ssrc参考国标文档-点播外域设备媒体流SSRC处理方式
String ssrc = "Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? ssrcFactory.getPlaySsrc(hookData.getMediaServer().getId()) : ssrcFactory.getPlayBackSsrc(hookData.getMediaServer().getId());
sendRtpItem.setSsrc(ssrc);
sendRtpItem.setAllocatedSsrc(ssrc);
sendRtpItem.setSsrc(sendSsrcFactory.getSendSsrc(
"Play".equalsIgnoreCase(sendRtpItem.getSessionName()) ? "0" : "1"));
}
sendRtpItem.setMediaServerId(hookData.getMediaServer().getId());
sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp());

View File

@ -0,0 +1,176 @@
package com.genersoft.iot.vmp.gb28181.session;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.springframework.test.util.ReflectionTestUtils;
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.*;
class SSRCFactoryTest {
private SSRCFactory ssrcFactory;
private static final String DOMAIN_PART = "20000";
private static final String SERVER_ID = "test-server";
@BeforeEach
void setUp() throws Exception {
ssrcFactory = new SSRCFactory();
ReflectionTestUtils.setField(ssrcFactory, "domainPart", DOMAIN_PART);
Field schedulerField = SSRCFactory.class.getDeclaredField("scheduler");
schedulerField.setAccessible(true);
java.util.concurrent.ScheduledExecutorService scheduler =
(java.util.concurrent.ScheduledExecutorService) schedulerField.get(ssrcFactory);
scheduler.shutdownNow();
}
@Test
void getPlaySsrc_shouldReturnCorrectFormat() {
String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID);
assertNotNull(ssrc);
assertEquals(10, ssrc.length(), "SSRC should be 10 characters: prefix(1) + domain(5) + seq(4)");
assertTrue(ssrc.startsWith("0"), "Play SSRC should start with '0'");
assertTrue(ssrc.substring(1).startsWith(DOMAIN_PART), "SSRC should contain domain part");
assertTrue(ssrc.matches("0" + DOMAIN_PART + "\\d{4}"), "SSRC format: 0" + DOMAIN_PART + "NNNN");
}
@Test
void getPlayBackSsrc_shouldReturnCorrectFormat() {
String ssrc = ssrcFactory.getPlayBackSsrc(SERVER_ID);
assertNotNull(ssrc);
assertEquals(10, ssrc.length(), "SSRC should be 10 characters: prefix(1) + domain(5) + seq(4)");
assertTrue(ssrc.startsWith("1"), "PlayBack SSRC should start with '1'");
assertTrue(ssrc.substring(1).startsWith(DOMAIN_PART), "SSRC should contain domain part");
assertTrue(ssrc.matches("1" + DOMAIN_PART + "\\d{4}"), "SSRC format: 1" + DOMAIN_PART + "NNNN");
}
@Test
void allocations_withinSameServer_shouldBeUnique() {
Set<String> allocated = new HashSet<>();
for (int i = 0; i < 1000; i++) {
String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID);
assertNotNull(ssrc, "Should allocate SSRC #" + i);
assertTrue(allocated.add(ssrc), "SSRC should be unique: " + ssrc);
}
assertEquals(1000, allocated.size());
}
@Test
void allocations_forDifferentServers_shouldBeIndependent() {
String serverA = "server-a";
String serverB = "server-b";
for (int i = 0; i < 10000; i++) {
assertNotNull(ssrcFactory.getPlaySsrc(serverA), "Server A should allocate SSRC #" + i);
}
assertNull(ssrcFactory.getPlaySsrc(serverA), "Server A should be exhausted");
for (int i = 0; i < 1000; i++) {
assertNotNull(ssrcFactory.getPlaySsrc(serverB), "Server B should allocate SSRC #" + i);
}
}
@Test
void exhaustion_shouldReturnNull() {
for (int i = 0; i < 10000; i++) {
assertNotNull(ssrcFactory.getPlaySsrc(SERVER_ID), "iteration " + i);
}
assertNull(ssrcFactory.getPlaySsrc(SERVER_ID), "Should return null when exhausted");
assertNull(ssrcFactory.getPlayBackSsrc(SERVER_ID), "Should return null for PlayBack too");
}
@Test
@Disabled("Needs mocked mediaServerService for ZLM query")
void rebuild_shouldResetUsage() {
for (int i = 0; i < 500; i++) {
ssrcFactory.getPlaySsrc(SERVER_ID);
}
ssrcFactory.rebuild();
for (int i = 0; i < 500; i++) {
String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID);
assertNotNull(ssrc, "After rebuild should allocate SSRC #" + i);
}
}
@Test
void allocateAll_shouldUseAll10000Slots() {
Set<String> allocated = new HashSet<>();
for (int i = 0; i < 10000; i++) {
String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID);
assertNotNull(ssrc, "Should allocate at iteration " + i);
allocated.add(ssrc);
}
assertEquals(10000, allocated.size(), "All 10000 slots should be unique");
}
@Test
void twoPrefixes_shareSamePool() throws Exception {
for (int i = 0; i < 5000; i++) {
assertNotNull(ssrcFactory.getPlaySsrc(SERVER_ID), "play #" + i);
assertNotNull(ssrcFactory.getPlayBackSsrc(SERVER_ID), "playback #" + i);
}
Field usedMapField = SSRCFactory.class.getDeclaredField("usedMap");
usedMapField.setAccessible(true);
java.util.concurrent.ConcurrentHashMap<String, java.util.BitSet> usedMap =
(java.util.concurrent.ConcurrentHashMap<String, java.util.BitSet>) usedMapField.get(ssrcFactory);
java.util.BitSet bits = usedMap.get(SERVER_ID);
assertNotNull(bits);
assertEquals(10000, bits.cardinality(), "All 10000 bits should be set");
}
@Test
void multipleServers_shouldNotAffectEachOther() {
String server1 = "server-1";
String server2 = "server-2";
String server3 = "server-3";
for (int i = 0; i < 10000; i++) {
ssrcFactory.getPlaySsrc(server1);
}
assertNull(ssrcFactory.getPlaySsrc(server1));
assertNotNull(ssrcFactory.getPlaySsrc(server2));
assertNotNull(ssrcFactory.getPlaySsrc(server3));
for (int i = 0; i < 100; i++) {
ssrcFactory.getPlaySsrc(server2);
ssrcFactory.getPlaySsrc(server3);
}
assertNull(ssrcFactory.getPlaySsrc(server1));
}
@Test
void linearProbe_skipsUsedSlots() throws Exception {
Field usedMapField = SSRCFactory.class.getDeclaredField("usedMap");
usedMapField.setAccessible(true);
java.util.concurrent.ConcurrentHashMap<String, java.util.BitSet> usedMap =
(java.util.concurrent.ConcurrentHashMap<String, java.util.BitSet>) usedMapField.get(ssrcFactory);
java.util.BitSet bits = new java.util.BitSet(10000);
for (int i = 0; i < 100; i++) {
bits.set(i);
}
usedMap.put(SERVER_ID, bits);
String ssrc = ssrcFactory.getPlaySsrc(SERVER_ID);
assertNotNull(ssrc, "Should find a free slot via linear probe");
int suffix = Integer.parseInt(ssrc.substring(6));
assertTrue(suffix >= 100, "Should skip used slots 0-99, got suffix " + suffix);
}
@Test
void ssrc_shouldBeDifferentEachCall() {
Set<String> results = new HashSet<>();
for (int i = 0; i < 100; i++) {
results.add(ssrcFactory.getPlaySsrc(SERVER_ID));
}
assertEquals(100, results.size(), "All 100 calls should return different SSRCs");
}
}