diff --git a/src/main/java/com/genersoft/iot/vmp/common/enums/MediaApp.java b/src/main/java/com/genersoft/iot/vmp/common/enums/MediaApp.java index 6c853ad07..8cb6c4306 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/enums/MediaApp.java +++ b/src/main/java/com/genersoft/iot/vmp/common/enums/MediaApp.java @@ -2,7 +2,11 @@ package com.genersoft.iot.vmp.common.enums; public class MediaApp { public final static String GB28181 = "rtp"; + public final static String GB28181_TALK = "talk"; + public final static String GB28181_BROADCAST = "broadcast"; public final static String JT1078 = "1078"; - + public static boolean isKeywords(String app) { + return GB28181.equals(app) || GB28181_TALK.equals(app) || GB28181_BROADCAST.equals(app) || JT1078.equals(app); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 8b318d43a..dccbbffad 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -28,6 +28,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IReceiveRtpServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; @@ -108,6 +109,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @Autowired private IMediaServerService mediaServerService; + @Autowired + private IReceiveRtpServerService receiveRtpServerService; + @Autowired private AudioBroadcastManager audioBroadcastManager; @@ -248,7 +252,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) { for (SsrcTransaction ssrcTransaction : ssrcTransactions) { mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); - mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream()); + receiveRtpServerService.closeRTPServerByMediaServerId(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream()); sessionManager.removeByCallId(ssrcTransaction.getCallId()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index 8a5f7ae2d..8603c3dfe 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -29,6 +29,7 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IReceiveRtpServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.bean.*; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; @@ -111,6 +112,9 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner @Autowired private ISendRtpServerService sendRtpServerService; + @Autowired + private IReceiveRtpServerService receiveRtpServerService; + @Autowired private PlatformStatusTaskRunner statusTaskRunner; @@ -619,7 +623,37 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } else { tcpMode = 0; } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, MediaApp.GB28181, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode); + + + SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServer(mediaServerItem, streamId, null, tcpMode, + false, ssrcCheck, true, false, ((code, msg, data) -> { + if (code == InviteErrorCode.SUCCESS.getCode() && data != null && data.getHookData() != null) { + log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId()); + HookData hookData = data.getHookData(); + // hook响应 + onPublishHandlerForBroadcast(hookData.getMediaServer(), hookData.getMediaInfo(), platform, channel); + // 收到流 + if (hookEvent != null) { + hookEvent.response(hookData); + } + }else { + InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, channel.getGbId(), null); + if (inviteInfoForBroadcast == null) { + log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId()); + // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 + try { + commanderForPlatform.streamByeCmd(platform, channel, data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream(), null, null); + } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { + log.error("[点播超时], 发送BYE失败 {}", e.getMessage()); + } finally { + timeoutCallback.run(1, "收流超时"); + mediaServerService.releaseSsrc(mediaServerItem.getId(), data.getSsrcInfo().getSsrc()); + receiveRtpServerService.closeRTPServer(mediaServerItem, data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream()); + sessionManager.removeByStream(data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream()); + } + } + } + })); if (ssrcInfo == null || ssrcInfo.getPort() < 0) { log.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel: {}", platform.getServerGBId(), channel.getGbDeviceId()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult<>(); @@ -637,37 +671,8 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST, InviteSessionStatus.ready, userSetting.getRecordSip()); inviteStreamService.updateInviteInfo(inviteInfo); - String timeOutTaskKey = UUID.randomUUID().toString(); - dynamicTask.startDelay(timeOutTaskKey, () -> { - // 执行超时任务时查询是否已经成功,成功了则不执行超时任务,防止超时任务取消失败的情况 - InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, channel.getGbId(), null); - if (inviteInfoForBroadcast == null) { - log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getPort(), ssrcInfo.getSsrc()); - // 点播超时回复BYE 同时释放ssrc以及此次点播的资源 - try { - commanderForPlatform.streamByeCmd(platform, channel, ssrcInfo.getApp(), ssrcInfo.getStream(), null, null); - } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { - log.error("[点播超时], 发送BYE失败 {}", e.getMessage()); - } finally { - timeoutCallback.run(1, "收流超时"); - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream()); - sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); - } - } - }, userSetting.getPlayTimeout()); - commanderForPlatform.broadcastInviteCmd(platform, channel,sourceId, mediaServerItem, ssrcInfo, (hookData)->{ - log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId()); - dynamicTask.stop(timeOutTaskKey); - // hook响应 - onPublishHandlerForBroadcast(hookData.getMediaServer(), hookData.getMediaInfo(), platform, channel); - // 收到流 - if (hookEvent != null) { - hookEvent.response(hookData); - } - }, event -> { - - inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channel, timeOutTaskKey, + commanderForPlatform.broadcastInviteCmd(platform, channel,sourceId, mediaServerItem, ssrcInfo, event -> { + inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channel, null, inviteInfo, InviteSessionType.BROADCAST); }, eventResult -> { // 收到错误回复 @@ -690,7 +695,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServer mediaServerItem, - Platform platform, CommonGBChannel channel, String timeOutTaskKey, ErrorCallback callback, + Platform platform, CommonGBChannel channel, ErrorCallback callback, InviteInfo inviteInfo, InviteSessionType inviteSessionType){ inviteInfo.setStatus(InviteSessionStatus.ok); ResponseEvent responseEvent = (ResponseEvent) eventResult.event; @@ -706,7 +711,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner // 多端口 if (tcpMode == 2) { tcpActiveHandler(platform, channel, contentString, mediaServerItem, tcpMode, ssrcCheck, - timeOutTaskKey, ssrcInfo, callback); + ssrcInfo, callback); } }else { // 单端口 @@ -732,20 +737,18 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner commanderForPlatform.streamByeCmd(platform, channel, ssrcInfo.getApp(), ssrcInfo.getStream(), null, null); } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage()); + } finally { + // 释放ssrc + mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); + receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream()); + sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); + + callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), + "下级自定义了ssrc,重新设置收流信息失败", null); + inviteStreamService.call(inviteSessionType, channel.getGbId(), null, + InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), + "下级自定义了ssrc,重新设置收流信息失败", null); } - - dynamicTask.stop(timeOutTaskKey); - // 释放ssrc - mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - - sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); - - callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), - "下级自定义了ssrc,重新设置收流信息失败", null); - inviteStreamService.call(inviteSessionType, channel.getGbId(), null, - InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(), - "下级自定义了ssrc,重新设置收流信息失败", null); - }else { ssrcInfo.setSsrc(ssrcInResponse); inviteInfo.setSsrcInfo(ssrcInfo); @@ -753,7 +756,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner if (tcpMode == 2) { if (mediaServerItem.isRtpEnable()) { tcpActiveHandler(platform, channel, contentString, mediaServerItem, tcpMode, ssrcCheck, - timeOutTaskKey, ssrcInfo, callback); + ssrcInfo, callback); }else { log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); } @@ -767,7 +770,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner if (tcpMode == 2) { if (mediaServerItem.isRtpEnable()) { tcpActiveHandler(platform, channel, contentString, mediaServerItem, tcpMode, ssrcCheck, - timeOutTaskKey, ssrcInfo, callback); + ssrcInfo, callback); }else { log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流"); } @@ -800,7 +803,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner private void tcpActiveHandler(Platform platform, CommonGBChannel channel, String contentString, MediaServer mediaServerItem, int tcpMode, boolean ssrcCheck, - String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback callback){ + SSRCInfo ssrcInfo, ErrorCallback callback){ if (tcpMode != 2) { return; } @@ -831,11 +834,9 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner log.info("[TCP主动连接对方] 结果: {}", result); } catch (SdpException e) { log.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channel.getGbDeviceId(), e); - dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream()); + receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream()); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(), @@ -856,7 +857,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { log.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getGbDeviceId() ); } finally { - mediaServerService.closeRTPServer(mediaServerItem, app, stream); + receiveRtpServerService.closeRTPServer(mediaServerItem, app, stream); InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, channel.getGbId(), stream); if (inviteInfo != null) { // 释放ssrc diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index 416e747be..9e2b0be78 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -133,7 +133,7 @@ public class PlayServiceImpl implements IPlayService { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaArrivalEvent event) { - if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) { + if (MediaApp.GB28181_BROADCAST.equals(event.getApp()) || MediaApp.GB28181_TALK.equals(event.getApp())) { if (event.getStream().indexOf("_") > 0) { String[] streamArray = event.getStream().split("_"); if (streamArray.length == 2) { @@ -149,7 +149,7 @@ public class PlayServiceImpl implements IPlayService { log.info("[语音对讲/喊话] 未找到通道:{}", channelId); return; } - if ("broadcast".equals(event.getApp())) { + if (MediaApp.GB28181_BROADCAST.equals(event.getApp())) { if (audioBroadcastManager.exit(channel.getId())) { stopAudioBroadcast(device, channel); } @@ -160,7 +160,7 @@ public class PlayServiceImpl implements IPlayService { } catch (InvalidArgumentException | ParseException | SipException e) { log.error("[命令发送失败] 语音对讲: {}", e.getMessage()); } - }else if ("talk".equals(event.getApp())) { + }else if (MediaApp.GB28181_TALK.equals(event.getApp())) { // 开启语音对讲通道 talkCmd(device, channel, event.getMediaServer(), event.getStream(), (msg) -> log.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId)); } @@ -205,7 +205,7 @@ public class PlayServiceImpl implements IPlayService { } } - if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) { + if (MediaApp.GB28181_BROADCAST.equals(event.getApp()) || MediaApp.GB28181_TALK.equals(event.getApp())) { if (event.getStream().indexOf("_") > 0) { String[] streamArray = event.getStream().split("_"); if (streamArray.length == 2) { @@ -221,9 +221,9 @@ public class PlayServiceImpl implements IPlayService { log.info("[语音对讲/喊话] 未找到通道:{}", channelId); return; } - if ("broadcast".equals(event.getApp())) { + if (MediaApp.GB28181_BROADCAST.equals(event.getApp())) { stopAudioBroadcast(device, channel); - }else if ("talk".equals(event.getApp())) { + }else if (MediaApp.GB28181_TALK.equals(event.getApp())) { stopTalk(device, channel, false); } } @@ -483,8 +483,7 @@ public class PlayServiceImpl implements IPlayService { private void talk(MediaServer mediaServerItem, Device device, DeviceChannel channel, String stream, - HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent, - Runnable timeoutCallback, AudioBroadcastEvent audioEvent) { + SipSubscribe.Event errorEvent, Runnable timeoutCallback, AudioBroadcastEvent audioEvent) { String playSsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId()); @@ -494,7 +493,7 @@ public class PlayServiceImpl implements IPlayService { } SendRtpInfo sendRtpInfo; try { - sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), "talk", stream, + sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), MediaApp.GB28181_TALK, stream, channel.getId(), true, false); sendRtpInfo.setPlayType(InviteStreamType.TALK); }catch (PlayException e) { @@ -583,7 +582,7 @@ public class PlayServiceImpl implements IPlayService { }, (event) -> { dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream()); + receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream()); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream()); @@ -593,7 +592,7 @@ public class PlayServiceImpl implements IPlayService { log.error("[命令发送失败] 对讲消息: {}", e.getMessage()); dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream()); + receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream()); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); @@ -1228,7 +1227,7 @@ public class PlayServiceImpl implements IPlayService { if (broadcastMode == null) { broadcastMode = true; } - String app = broadcastMode?"broadcast":"talk"; + String app = broadcastMode ? MediaApp.GB28181_BROADCAST : MediaApp.GB28181_TALK; String stream = device.getDeviceId() + "_" + deviceChannel.getDeviceId(); AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult(); audioBroadcastResult.setApp(app); @@ -1530,7 +1529,7 @@ public class PlayServiceImpl implements IPlayService { SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId()); if (sendRtpInfo != null) { MediaServer mediaServer = mediaServerService.getOne(sendRtpInfo.getMediaServerId()); - Boolean streamReady = mediaServerService.isStreamReady(mediaServer, MediaApp.GB28181, sendRtpInfo.getReceiveStream()); + Boolean streamReady = mediaServerService.isStreamReady(mediaServer, MediaApp.GB28181_TALK, sendRtpInfo.getReceiveStream()); if (streamReady) { log.warn("[语音对讲] 进行中: {}", channel.getDeviceId()); event.call("语音对讲进行中"); @@ -1540,9 +1539,7 @@ public class PlayServiceImpl implements IPlayService { } } - talk(mediaServerItem, device, channel, stream, (hookData) -> { - log.info("[语音对讲] 收到设备发来的流"); - }, eventResult -> { + talk(mediaServerItem, device, channel, stream, eventResult -> { log.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channel.getDeviceId(), eventResult.statusCode, eventResult.msg); event.call("失败,错误码 " + eventResult.statusCode + ", " + eventResult.msg); }, () -> { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java index 70efe78be..25edd7b3e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/ISIPCommanderForPlatform.java @@ -147,8 +147,7 @@ public interface ISIPCommanderForPlatform { void streamByeCmd(Platform platform, CommonGBChannel channel, String app, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException; void broadcastInviteCmd(Platform platform, CommonGBChannel channel, String sourceId, MediaServer mediaServerItem, - SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent, - SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException; + SSRCInfo ssrcInfo, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException; void broadcastResultCmd(Platform platform, CommonGBChannel deviceChannel, String sn, boolean result, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index e87f2f0ba..51f296a6a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -550,7 +550,7 @@ public class SIPCommander implements ISIPCommander { // 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值 ResponseEvent responseEvent = (ResponseEvent) e.event; SIPResponse response = (SIPResponse) responseEvent.getResponse(); - SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), "talk",sendRtpItem.getApp(), stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK); + SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), MediaApp.GB28181_TALK,sendRtpItem.getApp(), stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK); sessionManager.put(ssrcTransaction); okEvent.response(e); }, timeout); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java index 78eb04d0d..2328b0abc 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java @@ -19,6 +19,7 @@ import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IReceiveRtpServerService; import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -61,6 +62,9 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { @Autowired private IMediaServerService mediaServerService; + @Autowired + private IReceiveRtpServerService receiveRtpServerService; + @Autowired private SipLayer sipLayer; @@ -642,7 +646,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem != null) { mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); + receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); } SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem, channel); if (byeRequest == null) { @@ -664,8 +668,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { throw new SsrcTransactionNotFoundException(platform.getServerGBId(), channel.getGbDeviceId(), callId, stream); } - mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); - mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream()); sessionManager.removeByStream(ssrcTransaction.getApp(), ssrcTransaction.getStream()); Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channel.getGbDeviceId(), ssrcTransaction.getSipTransactionInfo()); @@ -697,7 +699,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { @Override public void broadcastInviteCmd(Platform platform, CommonGBChannel channel,String sourceId, MediaServer mediaServerItem, - SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent, + SSRCInfo ssrcInfo, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException { String stream = ssrcInfo.getStream(); @@ -706,13 +708,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { } log.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); - Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.GB28181, stream, mediaServerItem.getId()); - subscribe.addSubscribe(hook, (hookData) -> { - if (event != null) { - event.response(hookData); - subscribe.removeSubscribe(hook); - } - }); String sdpIp = mediaServerItem.getSdpIp(); StringBuffer content = new StringBuffer(200); @@ -753,7 +748,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> { sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - subscribe.removeSubscribe(hook); errorEvent.response(e); }), e -> { ResponseEvent responseEvent = (ResponseEvent) e.event; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index ea0284596..0868ef826 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IReceiveRtpServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -85,6 +86,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In @Autowired private IRedisRpcService redisRpcService; + @Autowired + private IReceiveRtpServerService receiveRtpServerService; + @Override public void afterPropertiesSet() throws Exception { @@ -228,7 +232,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In deviceChannelService.stopPlay(channel.getId()); inviteStreamService.removeInviteInfo(inviteInfo); if (inviteInfo.getStreamInfo() != null) { - mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream()); + receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream()); } } break; diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java index 6b22de5fb..2e029197b 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java @@ -247,7 +247,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { streamInfoResult.setMediaInfo(mediaInfo); - if (!"broadcast".equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) { + if (!MediaApp.GB28181_BROADCAST.equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) { String newStream = stream + "_" + mediaServer.getTranscodeSuffix(); mediaServer.setTranscodeSuffix(null); StreamInfo transcodeStreamInfo = getStreamInfoByAppAndStream(mediaServer, app, newStream, null, addr, callId, isPlay); @@ -444,7 +444,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { @Override public List listRtpServer(MediaServer mediaServer) { - ABLResult ablResult = ablresTfulUtils.getMediaList(mediaServer, MediaApp.GB28181, null); + ABLResult ablResult = ablresTfulUtils.getMediaList(mediaServer, null, null); if (ablResult.getCode() != 0) { return null; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index aba24a9ca..b33f68daa 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -34,21 +34,12 @@ public interface IMediaServerService { void updateVmServer(List mediaServerItemList); - SSRCInfo openRTPServer(MediaServer mediaServerItem, String app, String streamId, String presetSsrc, boolean ssrcCheck, - boolean isPlayback, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode); - void closeRTPServer(MediaServer mediaServerItem, String app, String streamId); void closeRTPServer(MediaServer mediaServerItem, String app, String streamId, CommonCallback callback); -// SSRCInfo openJTTServer(MediaServer mediaServerItem, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode); -// -// void closeJTTServer(MediaServer mediaServerItem, String streamId, CommonCallback callback); - Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String app, String streamId, String ssrc); - void closeRTPServer(String mediaServerId, String app, String streamId); - void clearRTPServer(MediaServer mediaServerItem); void update(MediaServer mediaSerItem); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 34ef13719..5614b51a7 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -162,47 +162,6 @@ public class MediaServerServiceImpl implements IMediaServerService { } } - - @Override - public SSRCInfo openRTPServer(MediaServer mediaServer, String app, String streamId, String presetSsrc, boolean ssrcCheck, - boolean isPlayback, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) { - if (mediaServer == null || mediaServer.getId() == null) { - log.info("[openRTPServer] 失败, mediaServer == null || mediaServer.getId() == null"); - return null; - } - // 获取mediaServer可用的ssrc - String ssrc; - if (presetSsrc != null) { - ssrc = presetSsrc; - }else { - if (isPlayback) { - ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId()); - }else { - ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId()); - } - } - - if (streamId == null) { - streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase(); - } - if (ssrcCheck && tcpMode > 0) { - // 目前zlm不支持 tcp模式更新ssrc,暂时关闭ssrc校验 - log.warn("[openRTPServer] 平台对接时下级可能自定义ssrc,但是tcp模式zlm收流目前无法更新ssrc,可能收流超时,此时请使用udp收流或者关闭ssrc校验"); - } - int rtpServerPort; - if (mediaServer.isRtpEnable()) { - IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); - if (mediaNodeServerService == null) { - log.info("[openRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); - return null; - } - rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, app, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, disableAudio, reUsePort, tcpMode); - } else { - rtpServerPort = mediaServer.getRtpProxyPort(); - } - return new SSRCInfo(rtpServerPort, ssrc, app, streamId); - } - @Override public int createRTPServer(MediaServer mediaServer, String app, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) { int rtpServerPort; @@ -219,27 +178,6 @@ public class MediaServerServiceImpl implements IMediaServerService { return rtpServerPort; } -// @Override -// public SSRCInfo openJTTServer(MediaServer mediaServer, @NotNull String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) { -// if (mediaServer == null || mediaServer.getId() == null) { -// log.info("[openJTTServer] 失败, mediaServer == null || mediaServer.getId() == null"); -// return null; -// } -// -// int rtpServerPort; -// if (mediaServer.isRtpEnable()) { -// IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); -// if (mediaNodeServerService == null) { -// log.info("[openJTTServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); -// return null; -// } -// rtpServerPort = mediaNodeServerService.createJTTServer(mediaServer, streamId, port, disableVideo, disableAudio, tcpMode); -// } else { -// rtpServerPort = mediaServer.getJttProxyPort(); -// } -// return new SSRCInfo(rtpServerPort, null, "1078", streamId, null); -// } - @Override public List listRtpServer(MediaServer mediaServer) { IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); @@ -277,37 +215,6 @@ public class MediaServerServiceImpl implements IMediaServerService { mediaNodeServerService.closeRtpServer(mediaServer, app, streamId, callback); } - @Override - public void closeRTPServer(String mediaServerId, String app, String streamId) { - MediaServer mediaServer = this.getOne(mediaServerId); - if (mediaServer == null) { - return; - } - if (mediaServer.isRtpEnable()) { - closeRTPServer(mediaServer, app, streamId); - } - IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); - if (mediaNodeServerService == null) { - log.info("[closeRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); - return; - } - mediaNodeServerService.closeStreams(mediaServer, app, streamId); - } - -// @Override -// public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback callback) { -// if (mediaServer == null) { -// callback.run(false); -// return; -// } -// IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); -// if (mediaNodeServerService == null) { -// log.info("[closeJTTServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); -// return; -// } -// mediaNodeServerService.closeJTTServer(mediaServer, streamId, callback); -// } - @Override public Boolean updateRtpServerSSRC(MediaServer mediaServer, String app, String streamId, String ssrc) { if (mediaServer == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index e1142c535..7f74e4795 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -5,6 +5,7 @@ import com.alibaba.fastjson2.JSONArray; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; @@ -679,7 +680,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { streamInfoResult.setMediaInfo(mediaInfo); - if (!"broadcast".equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) { + if (!MediaApp.GB28181_BROADCAST.equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) { String newStream = stream + "_" + mediaServer.getTranscodeSuffix(); mediaServer.setTranscodeSuffix(null); StreamInfo transcodeStreamInfo = getStreamInfoByAppAndStream(mediaServer, app, newStream, null, addr, callId, isPlay); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java index 335d2e5df..b5f3c8ac7 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java @@ -17,4 +17,6 @@ public interface IReceiveRtpServerService { int openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback); void closeRTPServer(MediaServer mediaServer, String app, String stream); + + void closeRTPServerByMediaServerId(String mediaServerId, String app, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 7cb75dcdc..9a2e39aba 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -97,7 +97,7 @@ public class MediaServiceImpl implements IMediaService { public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) { // 推流鉴权的处理 if (!MediaApp.GB28181.equals(app) && !MediaApp.JT1078.equals(app) ) { - if ("talk".equals(app) && stream.endsWith("_talk")) { + if (MediaApp.GB28181_TALK.equals(app) && stream.endsWith("_talk")) { ResultForOnPublish result = new ResultForOnPublish(); result.setEnable_mp4(false); result.setEnable_audio(true); @@ -215,10 +215,10 @@ public class MediaServiceImpl implements IMediaService { result.setEnable_audio(true); } } - } else if (app.equals("broadcast")) { + } else if (app.equals(MediaApp.GB28181_BROADCAST)) { result.setEnable_audio(true); result.setEnable_mp4(userSetting.getRecordSip()); - } else if (app.equals("talk")) { + } else if (app.equals(MediaApp.GB28181_TALK)) { result.setEnable_audio(true); result.setEnable_mp4(userSetting.getRecordSip()); }else { @@ -274,7 +274,7 @@ public class MediaServiceImpl implements IMediaService { }else { return false; } - }else if ("talk".equals(app) || "broadcast".equals(app)) { + }else if (MediaApp.GB28181_TALK.equals(app) || MediaApp.GB28181_BROADCAST.equals(app)) { return false; } else if ("mp4_record".equals(app)) { return true; diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java index f7fbb83bd..991e406ec 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -114,7 +114,9 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { if (presetSSRC == null) { ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc); } - callback.run(code, msg, null); + OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult(); + openRTPServerResult.setSsrcInfo(ssrcInfo); + callback.run(code, msg, openRTPServerResult); } })); ssrcInfo.setPort(rtpServerPort); @@ -177,4 +179,13 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { } mediaServerService.closeRTPServer(mediaServer, app, stream); } + + @Override + public void closeRTPServerByMediaServerId(String mediaServerId, String app, String stream) { + MediaServer mediaServer = mediaServerService.getOne(mediaServerId); + if (mediaServer == null) { + return; + } + closeRTPServer(mediaServer, app, stream); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index 76f163236..1ba4f56fa 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -104,7 +104,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaNotFoundEvent event) { - if (MediaApp.GB28181.equals(event.getApp())) { + if (MediaApp.isKeywords(event.getApp())) { return; } // 拉流代理 diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java index 60c5555f8..77a6d5ebf 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.streamPush.service.impl; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; @@ -101,7 +102,7 @@ public class StreamPushServiceImpl implements IStreamPushService { updatePushStatus(streamPushInDb); } // 冗余数据,自己系统中自用 - if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) { + if (!MediaApp.GB28181_BROADCAST.equals(event.getApp()) && !MediaApp.GB28181_TALK.equals(event.getApp())) { redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event.getMediaInfo()); } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java index d05111d69..e406a49eb 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java @@ -12,7 +12,10 @@ import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IReceiveRtpServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; +import com.genersoft.iot.vmp.service.bean.RTPServerParam; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -49,6 +52,9 @@ public class PsController { @Autowired private ISendRtpServerService sendRtpServerService; + @Autowired + private IReceiveRtpServerService receiveRtpServerService; + @Autowired private UserSetting userSetting; @@ -93,37 +99,44 @@ public class PsController { } } String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, MediaApp.GB28181, stream, ssrcInt + "", false, false, null, false, false, false, tcpMode); - if (ssrcInfo.getPort() == 0) { + RTPServerParam rtpServerParam = new RTPServerParam(); + rtpServerParam.setMediaServer(mediaServer); + rtpServerParam.setApp(MediaApp.GB28181); + rtpServerParam.setStreamId(stream); + rtpServerParam.setSsrc(ssrcInt); + rtpServerParam.setTcpMode(tcpMode); + + int rtpServerPort = receiveRtpServerService.openRTPServer(rtpServerParam, ((code, msg, data) -> { + if (callBack == null) { + return; + } + if (code == InviteErrorCode.SUCCESS.getCode()) { + log.info("[第三方PS服务对接->开启收流和获取发流信息] 成功回调,callId->{}, data->{}", callId, data); + // 将信息写入redis中,以备后用 + redisTemplate.delete(receiveKey); + OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); + OkHttpClient client = httpClientBuilder.build(); + String url = callBack + "?callId=" + callId; + Request request = new Request.Builder().get().url(url).build(); + try { + client.newCall(request).execute(); + } catch (IOException e) { + log.error("[第三方PS服务对接->开启收流和获取发流信息] 成功回调 callId->{}, 发送回调失败", callId, e); + } + } else { + log.info("[第三方PS服务对接->开启收流和获取发流信息] 失败回调,callId->{}, code->{}, msg->{}", callId, code, msg); + // 将信息写入redis中,以备后用 + redisTemplate.delete(receiveKey); + } + })); + + if (rtpServerPort == 0) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败"); } - // 注册回调如果rtp收流超时则通过回调发送通知 - if (callBack != null) { - Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, MediaApp.GB28181, stream, mediaServer.getId()); - // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 - hookSubscribe.addSubscribe(hook, - (hookData)->{ - if (stream.equals(hookData.getStream())) { - log.info("[第三方PS服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId); - // 将信息写入redis中,以备后用 - redisTemplate.delete(receiveKey); - OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); - OkHttpClient client = httpClientBuilder.build(); - String url = callBack + "?callId=" + callId; - Request request = new Request.Builder().get().url(url).build(); - try { - client.newCall(request).execute(); - } catch (IOException e) { - log.error("[第三方PS服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e); - } - hookSubscribe.removeSubscribe(hook); - } - }); - } OtherPsSendInfo otherPsSendInfo = new OtherPsSendInfo(); otherPsSendInfo.setReceiveIp(mediaServer.getSdpIp()); - otherPsSendInfo.setReceivePort(ssrcInfo.getPort()); + otherPsSendInfo.setReceivePort(rtpServerPort); otherPsSendInfo.setCallId(callId); otherPsSendInfo.setStream(stream); @@ -150,7 +163,7 @@ public class PsController { public void closeRtpServer(String stream) { log.info("[第三方PS服务对接->关闭收流] stream->{}", stream); MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); - mediaServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream); + receiveRtpServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream); String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_" + stream; List scan = RedisUtil.scan(redisTemplate, receiveKey); if (!scan.isEmpty()) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index 6f3793564..58b81ccab 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -12,7 +12,10 @@ import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IReceiveRtpServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService; +import com.genersoft.iot.vmp.service.bean.InviteErrorCode; +import com.genersoft.iot.vmp.service.bean.RTPServerParam; import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -44,6 +47,9 @@ public class RtpController { @Autowired private ISendRtpServerService sendRtpServerService; + @Autowired + private IReceiveRtpServerService receiveRtpServerService; + @Autowired private HookSubscribe hookSubscribe; @@ -93,37 +99,52 @@ public class RtpController { } } String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; - SSRCInfo ssrcInfoForVideo = mediaServerService.openRTPServer(mediaServer, MediaApp.GB28181, stream, ssrcInt + "",false,false, null, false, false, false, tcpMode); - SSRCInfo ssrcInfoForAudio = mediaServerService.openRTPServer(mediaServer, MediaApp.GB28181,stream + "_a", ssrcInt + "", false, false, null, false,false,false, tcpMode); - if (ssrcInfoForVideo.getPort() == 0 || ssrcInfoForAudio.getPort() == 0) { + + RTPServerParam rtpServerParam = new RTPServerParam(); + rtpServerParam.setMediaServer(mediaServer); + rtpServerParam.setApp(MediaApp.GB28181); + rtpServerParam.setStreamId(stream); + rtpServerParam.setSsrc(ssrcInt); + rtpServerParam.setTcpMode(tcpMode); + + + int rtpServerPortForVideo = receiveRtpServerService.openRTPServer(rtpServerParam, ((code, msg, data) -> { + if (callBack == null) { + return; + } + if (code == InviteErrorCode.SUCCESS.getCode()) { + log.info("[开启收流和获取发流信息] 视频流收流成功,callId->{},stream->{}", callId, stream); + OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); + OkHttpClient client = httpClientBuilder.build(); + String url = callBack + "?callId=" + callId; + Request request = new Request.Builder().get().url(url).build(); + try { + client.newCall(request).execute(); + } catch (IOException e) { + log.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e); + } + }else { + log.info("[开启收流和获取发流信息] 视频流收流失败,callId->{},stream->{}", callId, stream); + } + })); + + rtpServerParam.setStreamId(stream + "_a"); + + int rtpServerPortForAudio = receiveRtpServerService.openRTPServer(rtpServerParam, ((code, msg, data) -> { + if (code == InviteErrorCode.SUCCESS.getCode()) { + log.info("[开启收流和获取发流信息] 音频流收流成功,callId->{},stream->{}", callId, stream); + }else { + log.info("[开启收流和获取发流信息] 音频流收流失败,callId->{},stream->{}", callId, stream); + } + })); + if (rtpServerPortForVideo == 0 || rtpServerPortForAudio == 0) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败"); } - // 注册回调如果rtp收流超时则通过回调发送通知 - if (callBack != null) { - Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, MediaApp.GB28181, stream, mediaServer.getId()); - // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 - hookSubscribe.addSubscribe(hook, - (hookData)->{ - if (stream.equals(hookData.getStream())) { - log.info("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId); - OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder(); - OkHttpClient client = httpClientBuilder.build(); - String url = callBack + "?callId=" + callId; - Request request = new Request.Builder().get().url(url).build(); - try { - client.newCall(request).execute(); - } catch (IOException e) { - log.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e); - } - hookSubscribe.removeSubscribe(hook); - } - }); - } String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId; OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo(); otherRtpSendInfo.setReceiveIp(mediaServer.getSdpIp()); - otherRtpSendInfo.setReceivePortForVideo(ssrcInfoForVideo.getPort()); - otherRtpSendInfo.setReceivePortForAudio(ssrcInfoForAudio.getPort()); + otherRtpSendInfo.setReceivePortForVideo(rtpServerPortForVideo); + otherRtpSendInfo.setReceivePortForAudio(rtpServerPortForAudio); otherRtpSendInfo.setCallId(callId); otherRtpSendInfo.setStream(stream); @@ -153,8 +174,8 @@ public class RtpController { public void closeRtpServer(String stream) { log.info("[第三方服务对接->关闭收流] stream->{}", stream); MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); - mediaServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream); - mediaServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream+ "_a"); + receiveRtpServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream); + receiveRtpServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream+ "_a"); String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream; List scan = RedisUtil.scan(redisTemplate, receiveKey); if (scan.size() > 0) {