统一使用MediaApp枚举,调整关闭逻辑

This commit is contained in:
lin 2026-02-21 23:14:55 +08:00
parent d65255deb4
commit a9dc6057b2
19 changed files with 205 additions and 255 deletions

View File

@ -2,7 +2,11 @@ package com.genersoft.iot.vmp.common.enums;
public class MediaApp {
public final static String GB28181 = "rtp";
public final static String GB28181_TALK = "talk";
public final static String GB28181_BROADCAST = "broadcast";
public final static String JT1078 = "1078";
public static boolean isKeywords(String app) {
return GB28181.equals(app) || GB28181_TALK.equals(app) || GB28181_BROADCAST.equals(app) || JT1078.equals(app);
}
}

View File

@ -28,6 +28,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
@ -108,6 +109,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@ -248,7 +252,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream());
receiveRtpServerService.closeRTPServerByMediaServerId(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream());
sessionManager.removeByCallId(ssrcTransaction.getCallId());
}
}

View File

@ -29,6 +29,7 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
@ -111,6 +112,9 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
@Autowired
private ISendRtpServerService sendRtpServerService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private PlatformStatusTaskRunner statusTaskRunner;
@ -619,7 +623,37 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
} else {
tcpMode = 0;
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, MediaApp.GB28181, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode);
SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServer(mediaServerItem, streamId, null, tcpMode,
false, ssrcCheck, true, false, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && data != null && data.getHookData() != null) {
log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId());
HookData hookData = data.getHookData();
// hook响应
onPublishHandlerForBroadcast(hookData.getMediaServer(), hookData.getMediaInfo(), platform, channel);
// 收到流
if (hookEvent != null) {
hookEvent.response(hookData);
}
}else {
InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, channel.getGbId(), null);
if (inviteInfoForBroadcast == null) {
log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
commanderForPlatform.streamByeCmd(platform, channel, data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run(1, "收流超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), data.getSsrcInfo().getSsrc());
receiveRtpServerService.closeRTPServer(mediaServerItem, data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream());
sessionManager.removeByStream(data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream());
}
}
}
}));
if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
log.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel {}", platform.getServerGBId(), channel.getGbDeviceId());
SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
@ -637,37 +671,8 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST,
InviteSessionStatus.ready, userSetting.getRecordSip());
inviteStreamService.updateInviteInfo(inviteInfo);
String timeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 执行超时任务时查询是否已经成功成功了则不执行超时任务防止超时任务取消失败的情况
InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, channel.getGbId(), null);
if (inviteInfoForBroadcast == null) {
log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getPort(), ssrcInfo.getSsrc());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
commanderForPlatform.streamByeCmd(platform, channel, ssrcInfo.getApp(), ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run(1, "收流超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
}
}
}, userSetting.getPlayTimeout());
commanderForPlatform.broadcastInviteCmd(platform, channel,sourceId, mediaServerItem, ssrcInfo, (hookData)->{
log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId());
dynamicTask.stop(timeOutTaskKey);
// hook响应
onPublishHandlerForBroadcast(hookData.getMediaServer(), hookData.getMediaInfo(), platform, channel);
// 收到流
if (hookEvent != null) {
hookEvent.response(hookData);
}
}, event -> {
inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channel, timeOutTaskKey,
commanderForPlatform.broadcastInviteCmd(platform, channel,sourceId, mediaServerItem, ssrcInfo, event -> {
inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channel,
null, inviteInfo, InviteSessionType.BROADCAST);
}, eventResult -> {
// 收到错误回复
@ -690,7 +695,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
}
private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServer mediaServerItem,
Platform platform, CommonGBChannel channel, String timeOutTaskKey, ErrorCallback<Object> callback,
Platform platform, CommonGBChannel channel, ErrorCallback<Object> callback,
InviteInfo inviteInfo, InviteSessionType inviteSessionType){
inviteInfo.setStatus(InviteSessionStatus.ok);
ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
@ -706,7 +711,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
// 多端口
if (tcpMode == 2) {
tcpActiveHandler(platform, channel, contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
ssrcInfo, callback);
}
}else {
// 单端口
@ -732,20 +737,18 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
commanderForPlatform.streamByeCmd(platform, channel, ssrcInfo.getApp(), ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
} finally {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
inviteStreamService.call(inviteSessionType, channel.getGbId(), null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
}
dynamicTask.stop(timeOutTaskKey);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
inviteStreamService.call(inviteSessionType, channel.getGbId(), null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
}else {
ssrcInfo.setSsrc(ssrcInResponse);
inviteInfo.setSsrcInfo(ssrcInfo);
@ -753,7 +756,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
if (tcpMode == 2) {
if (mediaServerItem.isRtpEnable()) {
tcpActiveHandler(platform, channel, contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
ssrcInfo, callback);
}else {
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
}
@ -767,7 +770,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
if (tcpMode == 2) {
if (mediaServerItem.isRtpEnable()) {
tcpActiveHandler(platform, channel, contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
ssrcInfo, callback);
}else {
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
}
@ -800,7 +803,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
private void tcpActiveHandler(Platform platform, CommonGBChannel channel, String contentString,
MediaServer mediaServerItem, int tcpMode, boolean ssrcCheck,
String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
if (tcpMode != 2) {
return;
}
@ -831,11 +834,9 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
log.info("[TCP主动连接对方] 结果: {}", result);
} catch (SdpException e) {
log.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channel.getGbDeviceId(), e);
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
@ -856,7 +857,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getGbDeviceId() );
} finally {
mediaServerService.closeRTPServer(mediaServerItem, app, stream);
receiveRtpServerService.closeRTPServer(mediaServerItem, app, stream);
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, channel.getGbId(), stream);
if (inviteInfo != null) {
// 释放ssrc

View File

@ -133,7 +133,7 @@ public class PlayServiceImpl implements IPlayService {
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
if (MediaApp.GB28181_BROADCAST.equals(event.getApp()) || MediaApp.GB28181_TALK.equals(event.getApp())) {
if (event.getStream().indexOf("_") > 0) {
String[] streamArray = event.getStream().split("_");
if (streamArray.length == 2) {
@ -149,7 +149,7 @@ public class PlayServiceImpl implements IPlayService {
log.info("[语音对讲/喊话] 未找到通道:{}", channelId);
return;
}
if ("broadcast".equals(event.getApp())) {
if (MediaApp.GB28181_BROADCAST.equals(event.getApp())) {
if (audioBroadcastManager.exit(channel.getId())) {
stopAudioBroadcast(device, channel);
}
@ -160,7 +160,7 @@ public class PlayServiceImpl implements IPlayService {
} catch (InvalidArgumentException | ParseException | SipException e) {
log.error("[命令发送失败] 语音对讲: {}", e.getMessage());
}
}else if ("talk".equals(event.getApp())) {
}else if (MediaApp.GB28181_TALK.equals(event.getApp())) {
// 开启语音对讲通道
talkCmd(device, channel, event.getMediaServer(), event.getStream(), (msg) -> log.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId));
}
@ -205,7 +205,7 @@ public class PlayServiceImpl implements IPlayService {
}
}
if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
if (MediaApp.GB28181_BROADCAST.equals(event.getApp()) || MediaApp.GB28181_TALK.equals(event.getApp())) {
if (event.getStream().indexOf("_") > 0) {
String[] streamArray = event.getStream().split("_");
if (streamArray.length == 2) {
@ -221,9 +221,9 @@ public class PlayServiceImpl implements IPlayService {
log.info("[语音对讲/喊话] 未找到通道:{}", channelId);
return;
}
if ("broadcast".equals(event.getApp())) {
if (MediaApp.GB28181_BROADCAST.equals(event.getApp())) {
stopAudioBroadcast(device, channel);
}else if ("talk".equals(event.getApp())) {
}else if (MediaApp.GB28181_TALK.equals(event.getApp())) {
stopTalk(device, channel, false);
}
}
@ -483,8 +483,7 @@ public class PlayServiceImpl implements IPlayService {
private void talk(MediaServer mediaServerItem, Device device, DeviceChannel channel, String stream,
HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
Runnable timeoutCallback, AudioBroadcastEvent audioEvent) {
SipSubscribe.Event errorEvent, Runnable timeoutCallback, AudioBroadcastEvent audioEvent) {
String playSsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
@ -494,7 +493,7 @@ public class PlayServiceImpl implements IPlayService {
}
SendRtpInfo sendRtpInfo;
try {
sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), "talk", stream,
sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), MediaApp.GB28181_TALK, stream,
channel.getId(), true, false);
sendRtpInfo.setPlayType(InviteStreamType.TALK);
}catch (PlayException e) {
@ -583,7 +582,7 @@ public class PlayServiceImpl implements IPlayService {
}, (event) -> {
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
@ -593,7 +592,7 @@ public class PlayServiceImpl implements IPlayService {
log.error("[命令发送失败] 对讲消息: {}", e.getMessage());
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
@ -1228,7 +1227,7 @@ public class PlayServiceImpl implements IPlayService {
if (broadcastMode == null) {
broadcastMode = true;
}
String app = broadcastMode?"broadcast":"talk";
String app = broadcastMode ? MediaApp.GB28181_BROADCAST : MediaApp.GB28181_TALK;
String stream = device.getDeviceId() + "_" + deviceChannel.getDeviceId();
AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult();
audioBroadcastResult.setApp(app);
@ -1530,7 +1529,7 @@ public class PlayServiceImpl implements IPlayService {
SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId());
if (sendRtpInfo != null) {
MediaServer mediaServer = mediaServerService.getOne(sendRtpInfo.getMediaServerId());
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, MediaApp.GB28181, sendRtpInfo.getReceiveStream());
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, MediaApp.GB28181_TALK, sendRtpInfo.getReceiveStream());
if (streamReady) {
log.warn("[语音对讲] 进行中: {}", channel.getDeviceId());
event.call("语音对讲进行中");
@ -1540,9 +1539,7 @@ public class PlayServiceImpl implements IPlayService {
}
}
talk(mediaServerItem, device, channel, stream, (hookData) -> {
log.info("[语音对讲] 收到设备发来的流");
}, eventResult -> {
talk(mediaServerItem, device, channel, stream, eventResult -> {
log.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channel.getDeviceId(), eventResult.statusCode, eventResult.msg);
event.call("失败,错误码 " + eventResult.statusCode + ", " + eventResult.msg);
}, () -> {

View File

@ -147,8 +147,7 @@ public interface ISIPCommanderForPlatform {
void streamByeCmd(Platform platform, CommonGBChannel channel, String app, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void broadcastInviteCmd(Platform platform, CommonGBChannel channel, String sourceId, MediaServer mediaServerItem,
SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent,
SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException;
SSRCInfo ssrcInfo, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException;
void broadcastResultCmd(Platform platform, CommonGBChannel deviceChannel, String sn, boolean result, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException;
}

View File

@ -550,7 +550,7 @@ public class SIPCommander implements ISIPCommander {
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), "talk",sendRtpItem.getApp(), stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), MediaApp.GB28181_TALK,sendRtpItem.getApp(), stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK);
sessionManager.put(ssrcTransaction);
okEvent.response(e);
}, timeout);

View File

@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -61,6 +62,9 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private SipLayer sipLayer;
@ -642,7 +646,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
}
SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem, channel);
if (byeRequest == null) {
@ -664,8 +668,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
throw new SsrcTransactionNotFoundException(platform.getServerGBId(), channel.getGbDeviceId(), callId, stream);
}
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream());
sessionManager.removeByStream(ssrcTransaction.getApp(), ssrcTransaction.getStream());
Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channel.getGbDeviceId(), ssrcTransaction.getSipTransactionInfo());
@ -697,7 +699,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
@Override
public void broadcastInviteCmd(Platform platform, CommonGBChannel channel,String sourceId, MediaServer mediaServerItem,
SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent,
SSRCInfo ssrcInfo, SipSubscribe.Event okEvent,
SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException {
String stream = ssrcInfo.getStream();
@ -706,13 +708,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
}
log.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.GB28181, stream, mediaServerItem.getId());
subscribe.addSubscribe(hook, (hookData) -> {
if (event != null) {
event.response(hookData);
subscribe.removeSubscribe(hook);
}
});
String sdpIp = mediaServerItem.getSdpIp();
StringBuffer content = new StringBuffer(200);
@ -753,7 +748,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> {
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
subscribe.removeSubscribe(hook);
errorEvent.response(e);
}), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event;

View File

@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -85,6 +86,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private IRedisRpcService redisRpcService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Override
public void afterPropertiesSet() throws Exception {
@ -228,7 +232,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
deviceChannelService.stopPlay(channel.getId());
inviteStreamService.removeInviteInfo(inviteInfo);
if (inviteInfo.getStreamInfo() != null) {
mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream());
receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream());
}
}
break;

View File

@ -247,7 +247,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
streamInfoResult.setMediaInfo(mediaInfo);
if (!"broadcast".equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) {
if (!MediaApp.GB28181_BROADCAST.equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) {
String newStream = stream + "_" + mediaServer.getTranscodeSuffix();
mediaServer.setTranscodeSuffix(null);
StreamInfo transcodeStreamInfo = getStreamInfoByAppAndStream(mediaServer, app, newStream, null, addr, callId, isPlay);
@ -444,7 +444,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
@Override
public List<String> listRtpServer(MediaServer mediaServer) {
ABLResult ablResult = ablresTfulUtils.getMediaList(mediaServer, MediaApp.GB28181, null);
ABLResult ablResult = ablresTfulUtils.getMediaList(mediaServer, null, null);
if (ablResult.getCode() != 0) {
return null;
}

View File

@ -34,21 +34,12 @@ public interface IMediaServerService {
void updateVmServer(List<MediaServer> mediaServerItemList);
SSRCInfo openRTPServer(MediaServer mediaServerItem, String app, String streamId, String presetSsrc, boolean ssrcCheck,
boolean isPlayback, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode);
void closeRTPServer(MediaServer mediaServerItem, String app, String streamId);
void closeRTPServer(MediaServer mediaServerItem, String app, String streamId, CommonCallback<Boolean> callback);
// SSRCInfo openJTTServer(MediaServer mediaServerItem, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode);
//
// void closeJTTServer(MediaServer mediaServerItem, String streamId, CommonCallback<Boolean> callback);
Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String app, String streamId, String ssrc);
void closeRTPServer(String mediaServerId, String app, String streamId);
void clearRTPServer(MediaServer mediaServerItem);
void update(MediaServer mediaSerItem);

View File

@ -162,47 +162,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
}
@Override
public SSRCInfo openRTPServer(MediaServer mediaServer, String app, String streamId, String presetSsrc, boolean ssrcCheck,
boolean isPlayback, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
if (mediaServer == null || mediaServer.getId() == null) {
log.info("[openRTPServer] 失败, mediaServer == null || mediaServer.getId() == null");
return null;
}
// 获取mediaServer可用的ssrc
String ssrc;
if (presetSsrc != null) {
ssrc = presetSsrc;
}else {
if (isPlayback) {
ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId());
}else {
ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
}
}
if (streamId == null) {
streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
}
if (ssrcCheck && tcpMode > 0) {
// 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验
log.warn("[openRTPServer] 平台对接时下级可能自定义ssrc但是tcp模式zlm收流目前无法更新ssrc可能收流超时此时请使用udp收流或者关闭ssrc校验");
}
int rtpServerPort;
if (mediaServer.isRtpEnable()) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[openRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return null;
}
rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, app, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, disableAudio, reUsePort, tcpMode);
} else {
rtpServerPort = mediaServer.getRtpProxyPort();
}
return new SSRCInfo(rtpServerPort, ssrc, app, streamId);
}
@Override
public int createRTPServer(MediaServer mediaServer, String app, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) {
int rtpServerPort;
@ -219,27 +178,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
return rtpServerPort;
}
// @Override
// public SSRCInfo openJTTServer(MediaServer mediaServer, @NotNull String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) {
// if (mediaServer == null || mediaServer.getId() == null) {
// log.info("[openJTTServer] 失败, mediaServer == null || mediaServer.getId() == null");
// return null;
// }
//
// int rtpServerPort;
// if (mediaServer.isRtpEnable()) {
// IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
// if (mediaNodeServerService == null) {
// log.info("[openJTTServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
// return null;
// }
// rtpServerPort = mediaNodeServerService.createJTTServer(mediaServer, streamId, port, disableVideo, disableAudio, tcpMode);
// } else {
// rtpServerPort = mediaServer.getJttProxyPort();
// }
// return new SSRCInfo(rtpServerPort, null, "1078", streamId, null);
// }
@Override
public List<String> listRtpServer(MediaServer mediaServer) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
@ -277,37 +215,6 @@ public class MediaServerServiceImpl implements IMediaServerService {
mediaNodeServerService.closeRtpServer(mediaServer, app, streamId, callback);
}
@Override
public void closeRTPServer(String mediaServerId, String app, String streamId) {
MediaServer mediaServer = this.getOne(mediaServerId);
if (mediaServer == null) {
return;
}
if (mediaServer.isRtpEnable()) {
closeRTPServer(mediaServer, app, streamId);
}
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[closeRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return;
}
mediaNodeServerService.closeStreams(mediaServer, app, streamId);
}
// @Override
// public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
// if (mediaServer == null) {
// callback.run(false);
// return;
// }
// IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
// if (mediaNodeServerService == null) {
// log.info("[closeJTTServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
// return;
// }
// mediaNodeServerService.closeJTTServer(mediaServer, streamId, callback);
// }
@Override
public Boolean updateRtpServerSSRC(MediaServer mediaServer, String app, String streamId, String ssrc) {
if (mediaServer == null) {

View File

@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
@ -679,7 +680,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
streamInfoResult.setMediaInfo(mediaInfo);
if (!"broadcast".equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) {
if (!MediaApp.GB28181_BROADCAST.equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) {
String newStream = stream + "_" + mediaServer.getTranscodeSuffix();
mediaServer.setTranscodeSuffix(null);
StreamInfo transcodeStreamInfo = getStreamInfoByAppAndStream(mediaServer, app, newStream, null, addr, callId, isPlay);

View File

@ -17,4 +17,6 @@ public interface IReceiveRtpServerService {
int openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<HookData> callback);
void closeRTPServer(MediaServer mediaServer, String app, String stream);
void closeRTPServerByMediaServerId(String mediaServerId, String app, String stream);
}

View File

@ -97,7 +97,7 @@ public class MediaServiceImpl implements IMediaService {
public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) {
// 推流鉴权的处理
if (!MediaApp.GB28181.equals(app) && !MediaApp.JT1078.equals(app) ) {
if ("talk".equals(app) && stream.endsWith("_talk")) {
if (MediaApp.GB28181_TALK.equals(app) && stream.endsWith("_talk")) {
ResultForOnPublish result = new ResultForOnPublish();
result.setEnable_mp4(false);
result.setEnable_audio(true);
@ -215,10 +215,10 @@ public class MediaServiceImpl implements IMediaService {
result.setEnable_audio(true);
}
}
} else if (app.equals("broadcast")) {
} else if (app.equals(MediaApp.GB28181_BROADCAST)) {
result.setEnable_audio(true);
result.setEnable_mp4(userSetting.getRecordSip());
} else if (app.equals("talk")) {
} else if (app.equals(MediaApp.GB28181_TALK)) {
result.setEnable_audio(true);
result.setEnable_mp4(userSetting.getRecordSip());
}else {
@ -274,7 +274,7 @@ public class MediaServiceImpl implements IMediaService {
}else {
return false;
}
}else if ("talk".equals(app) || "broadcast".equals(app)) {
}else if (MediaApp.GB28181_TALK.equals(app) || MediaApp.GB28181_BROADCAST.equals(app)) {
return false;
} else if ("mp4_record".equals(app)) {
return true;

View File

@ -114,7 +114,9 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
if (presetSSRC == null) {
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc);
}
callback.run(code, msg, null);
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
openRTPServerResult.setSsrcInfo(ssrcInfo);
callback.run(code, msg, openRTPServerResult);
}
}));
ssrcInfo.setPort(rtpServerPort);
@ -177,4 +179,13 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
}
mediaServerService.closeRTPServer(mediaServer, app, stream);
}
@Override
public void closeRTPServerByMediaServerId(String mediaServerId, String app, String stream) {
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer == null) {
return;
}
closeRTPServer(mediaServer, app, stream);
}
}

View File

@ -104,7 +104,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaNotFoundEvent event) {
if (MediaApp.GB28181.equals(event.getApp())) {
if (MediaApp.isKeywords(event.getApp())) {
return;
}
// 拉流代理

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.streamPush.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
@ -101,7 +102,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
updatePushStatus(streamPushInDb);
}
// 冗余数据自己系统中自用
if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) {
if (!MediaApp.GB28181_BROADCAST.equals(event.getApp()) && !MediaApp.GB28181_TALK.equals(event.getApp())) {
redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event.getMediaInfo());
}

View File

@ -12,7 +12,10 @@ import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.RTPServerParam;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -49,6 +52,9 @@ public class PsController {
@Autowired
private ISendRtpServerService sendRtpServerService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private UserSetting userSetting;
@ -93,37 +99,44 @@ public class PsController {
}
}
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_" + stream;
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, MediaApp.GB28181, stream, ssrcInt + "", false, false, null, false, false, false, tcpMode);
if (ssrcInfo.getPort() == 0) {
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaApp.GB28181);
rtpServerParam.setStreamId(stream);
rtpServerParam.setSsrc(ssrcInt);
rtpServerParam.setTcpMode(tcpMode);
int rtpServerPort = receiveRtpServerService.openRTPServer(rtpServerParam, ((code, msg, data) -> {
if (callBack == null) {
return;
}
if (code == InviteErrorCode.SUCCESS.getCode()) {
log.info("[第三方PS服务对接->开启收流和获取发流信息] 成功回调callId->{}, data->{}", callId, data);
// 将信息写入redis中以备后用
redisTemplate.delete(receiveKey);
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
OkHttpClient client = httpClientBuilder.build();
String url = callBack + "?callId=" + callId;
Request request = new Request.Builder().get().url(url).build();
try {
client.newCall(request).execute();
} catch (IOException e) {
log.error("[第三方PS服务对接->开启收流和获取发流信息] 成功回调 callId->{}, 发送回调失败", callId, e);
}
} else {
log.info("[第三方PS服务对接->开启收流和获取发流信息] 失败回调callId->{}, code->{}, msg->{}", callId, code, msg);
// 将信息写入redis中以备后用
redisTemplate.delete(receiveKey);
}
}));
if (rtpServerPort == 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
}
// 注册回调如果rtp收流超时则通过回调发送通知
if (callBack != null) {
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, MediaApp.GB28181, stream, mediaServer.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hook,
(hookData)->{
if (stream.equals(hookData.getStream())) {
log.info("[第三方PS服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
// 将信息写入redis中以备后用
redisTemplate.delete(receiveKey);
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
OkHttpClient client = httpClientBuilder.build();
String url = callBack + "?callId=" + callId;
Request request = new Request.Builder().get().url(url).build();
try {
client.newCall(request).execute();
} catch (IOException e) {
log.error("[第三方PS服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
}
hookSubscribe.removeSubscribe(hook);
}
});
}
OtherPsSendInfo otherPsSendInfo = new OtherPsSendInfo();
otherPsSendInfo.setReceiveIp(mediaServer.getSdpIp());
otherPsSendInfo.setReceivePort(ssrcInfo.getPort());
otherPsSendInfo.setReceivePort(rtpServerPort);
otherPsSendInfo.setCallId(callId);
otherPsSendInfo.setStream(stream);
@ -150,7 +163,7 @@ public class PsController {
public void closeRtpServer(String stream) {
log.info("[第三方PS服务对接->关闭收流] stream->{}", stream);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
mediaServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream);
receiveRtpServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream);
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_" + stream;
List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
if (!scan.isEmpty()) {

View File

@ -12,7 +12,10 @@ import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.RTPServerParam;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -44,6 +47,9 @@ public class RtpController {
@Autowired
private ISendRtpServerService sendRtpServerService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private HookSubscribe hookSubscribe;
@ -93,37 +99,52 @@ public class RtpController {
}
}
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream;
SSRCInfo ssrcInfoForVideo = mediaServerService.openRTPServer(mediaServer, MediaApp.GB28181, stream, ssrcInt + "",false,false, null, false, false, false, tcpMode);
SSRCInfo ssrcInfoForAudio = mediaServerService.openRTPServer(mediaServer, MediaApp.GB28181,stream + "_a", ssrcInt + "", false, false, null, false,false,false, tcpMode);
if (ssrcInfoForVideo.getPort() == 0 || ssrcInfoForAudio.getPort() == 0) {
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaApp.GB28181);
rtpServerParam.setStreamId(stream);
rtpServerParam.setSsrc(ssrcInt);
rtpServerParam.setTcpMode(tcpMode);
int rtpServerPortForVideo = receiveRtpServerService.openRTPServer(rtpServerParam, ((code, msg, data) -> {
if (callBack == null) {
return;
}
if (code == InviteErrorCode.SUCCESS.getCode()) {
log.info("[开启收流和获取发流信息] 视频流收流成功callId->{}stream->{}", callId, stream);
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
OkHttpClient client = httpClientBuilder.build();
String url = callBack + "?callId=" + callId;
Request request = new Request.Builder().get().url(url).build();
try {
client.newCall(request).execute();
} catch (IOException e) {
log.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
}
}else {
log.info("[开启收流和获取发流信息] 视频流收流失败callId->{}stream->{}", callId, stream);
}
}));
rtpServerParam.setStreamId(stream + "_a");
int rtpServerPortForAudio = receiveRtpServerService.openRTPServer(rtpServerParam, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
log.info("[开启收流和获取发流信息] 音频流收流成功callId->{}stream->{}", callId, stream);
}else {
log.info("[开启收流和获取发流信息] 音频流收流失败callId->{}stream->{}", callId, stream);
}
}));
if (rtpServerPortForVideo == 0 || rtpServerPortForAudio == 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
}
// 注册回调如果rtp收流超时则通过回调发送通知
if (callBack != null) {
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, MediaApp.GB28181, stream, mediaServer.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hook,
(hookData)->{
if (stream.equals(hookData.getStream())) {
log.info("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
OkHttpClient client = httpClientBuilder.build();
String url = callBack + "?callId=" + callId;
Request request = new Request.Builder().get().url(url).build();
try {
client.newCall(request).execute();
} catch (IOException e) {
log.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
}
hookSubscribe.removeSubscribe(hook);
}
});
}
String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId;
OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo();
otherRtpSendInfo.setReceiveIp(mediaServer.getSdpIp());
otherRtpSendInfo.setReceivePortForVideo(ssrcInfoForVideo.getPort());
otherRtpSendInfo.setReceivePortForAudio(ssrcInfoForAudio.getPort());
otherRtpSendInfo.setReceivePortForVideo(rtpServerPortForVideo);
otherRtpSendInfo.setReceivePortForAudio(rtpServerPortForAudio);
otherRtpSendInfo.setCallId(callId);
otherRtpSendInfo.setStream(stream);
@ -153,8 +174,8 @@ public class RtpController {
public void closeRtpServer(String stream) {
log.info("[第三方服务对接->关闭收流] stream->{}", stream);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
mediaServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream);
mediaServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream+ "_a");
receiveRtpServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream);
receiveRtpServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream+ "_a");
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream;
List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
if (scan.size() > 0) {