diff --git a/src/main/java/com/genersoft/iot/vmp/common/enums/MediaApp.java b/src/main/java/com/genersoft/iot/vmp/common/enums/MediaApp.java new file mode 100644 index 000000000..6c853ad07 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/common/enums/MediaApp.java @@ -0,0 +1,8 @@ +package com.genersoft.iot.vmp.common.enums; + +public class MediaApp { + public final static String GB28181 = "rtp"; + public final static String JT1078 = "1078"; + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java index 0cfc21cbb..756f20d89 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SendRtpInfo.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.bean; import com.genersoft.iot.vmp.common.enums.ChannelDataType; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg; import lombok.Data; @@ -199,7 +200,7 @@ public class SendRtpInfo { sendRtpItem.setChannelId(channelId); sendRtpItem.setTcp(isTcp); sendRtpItem.setRtcp(rtcp); - sendRtpItem.setApp("rtp"); + sendRtpItem.setApp(MediaApp.GB28181); sendRtpItem.setLocalPort(localPort); sendRtpItem.setServerId(serverId); sendRtpItem.setMediaServerId(mediaServer.getId()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index eb0f64318..8b318d43a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -248,7 +248,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) { for (SsrcTransaction ssrcTransaction : ssrcTransactions) { mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); - mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream()); + mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream()); sessionManager.removeByCallId(ssrcTransaction.getCallId()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java index c3c768072..5a15fb218 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.common.*; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; @@ -49,7 +50,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaDepartureEvent event) { - if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { + if ("rtsp".equals(event.getSchema()) && MediaApp.GB28181.equals(event.getApp())) { InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream()); if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) { removeInviteInfo(inviteInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index a29866707..8a5f7ae2d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.genersoft.iot.vmp.common.*; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; @@ -618,7 +619,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } else { tcpMode = 0; } - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, MediaApp.GB28181, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode); if (ssrcInfo == null || ssrcInfo.getPort() < 0) { log.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel: {}", platform.getServerGBId(), channel.getGbDeviceId()); SipSubscribe.EventResult eventResult = new SipSubscribe.EventResult<>(); @@ -650,7 +651,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } finally { timeoutCallback.run(1, "收流超时"); mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); } } @@ -724,7 +725,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); + Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse); if (!result) { try { log.warn("[Invite 200OK] 更新ssrc失败,停止喊话 {}/{}", platform.getServerGBId(), channel.getGbDeviceId()); @@ -826,12 +827,12 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } log.info("[TCP主动连接对方] serverGbId: {}, channelId: {}, 连接对方的地址:{}:{}, SSRC: {}, SSRC校验:{}", platform.getServerGBId(), channel.getGbDeviceId(), sdp.getConnection().getAddress(), port, ssrcInfo.getSsrc(), ssrcCheck); - Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream()); + Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getApp(), ssrcInfo.getStream()); log.info("[TCP主动连接对方] 结果: {}", result); } catch (SdpException e) { log.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channel.getGbDeviceId(), e); dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream()); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); @@ -855,7 +856,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { log.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getGbDeviceId() ); } finally { - mediaServerService.closeRTPServer(mediaServerItem, stream); + mediaServerService.closeRTPServer(mediaServerItem, app, stream); InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, channel.getGbId(), stream); if (inviteInfo != null) { // 释放ssrc diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index f564af05a..7291d06b7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl; import com.genersoft.iot.vmp.common.*; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; @@ -231,7 +232,7 @@ public class PlayServiceImpl implements IPlayService { } } } - }else if ("rtp".equals(event.getApp())) { + }else if (MediaApp.GB28181.equals(event.getApp())) { // 释放ssrc InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, event.getStream()); if (inviteInfo != null && inviteInfo.getStatus() == InviteSessionStatus.ok @@ -249,7 +250,7 @@ public class PlayServiceImpl implements IPlayService { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaNotFoundEvent event) { - if (!"rtp".equals(event.getApp())) { + if (!MediaApp.GB28181.equals(event.getApp())) { return; } String[] s = event.getStream().split("_"); @@ -324,9 +325,9 @@ public class PlayServiceImpl implements IPlayService { return play(mediaServerItem, device, channel, ssrc, userSetting.getRecordSip(), callback); } - private SSRCInfo play(MediaServer mediaServerItem, Device device, DeviceChannel channel, String ssrc, Boolean record, + private SSRCInfo play(MediaServer mediaServer, Device device, DeviceChannel channel, String ssrc, Boolean record, ErrorCallback callback) { - if (mediaServerItem == null ) { + if (mediaServer == null ) { if (callback != null) { callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(), InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(), @@ -340,7 +341,7 @@ public class PlayServiceImpl implements IPlayService { if (inviteInfoInCatch.getStreamInfo() == null) { // 释放生成的ssrc,使用上一次申请的 - ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc); + ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc); // 点播发起了但是尚未成功, 仅注册回调等待结果即可 inviteStreamService.once(InviteSessionType.PLAY, channel.getId(), null, callback); log.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId({}): {}", device.getDeviceId(), channel.getDeviceId(), channel.getId()); @@ -357,7 +358,7 @@ public class PlayServiceImpl implements IPlayService { return inviteInfoInCatch.getSsrcInfo(); } MediaServer mediaInfo = streamInfo.getMediaServer(); - Boolean ready = mediaServerService.isStreamReady(mediaInfo, "rtp", streamId); + Boolean ready = mediaServerService.isStreamReady(mediaInfo, MediaApp.GB28181, streamId); if (ready != null && ready) { if(callback != null) { callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo); @@ -377,20 +378,30 @@ public class PlayServiceImpl implements IPlayService { } } + // 获取mediaServer可用的ssrc + final String finalSsrc; + if (ssrc != null) { + finalSsrc = ssrc; + }else { + finalSsrc = ssrcFactory.getPlaySsrc(mediaServer.getId()); + } + String streamId = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId()); int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); RTPServerParam rtpServerParam = new RTPServerParam(); - rtpServerParam.setMediaServerItem(mediaServerItem); + rtpServerParam.setMediaServer(mediaServer); + rtpServerParam.setApp(MediaApp.GB28181); rtpServerParam.setStreamId(streamId); - rtpServerParam.setPresetSsrc(ssrc); - rtpServerParam.setSsrcCheck(device.isSsrcCheck()); - rtpServerParam.setPlayback(false); + if (device.isSsrcCheck()) { + rtpServerParam.setSsrc(ssrc); + } rtpServerParam.setPort(0); rtpServerParam.setTcpMode(tcpMode); rtpServerParam.setOnlyAuto(false); rtpServerParam.setDisableAudio(!channel.isHasAudio()); - SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> { + + int port = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> { if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) { // hook响应 @@ -422,14 +433,14 @@ public class PlayServiceImpl implements IPlayService { } inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, code, msg, null); inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId()); - SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", streamId); + SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, streamId); if (ssrcTransaction != null) { try { - cmder.streamByeCmd(device, channel.getDeviceId(),"rtp", streamId, null, null); + cmder.streamByeCmd(device, channel.getDeviceId(),MediaApp.GB28181, streamId, null, null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { log.error("[点播超时], 发送BYE失败 {}", e.getMessage()); } finally { - sessionManager.removeByStream("rtp", streamId); + sessionManager.removeByStream(MediaApp.GB28181, streamId); } } } @@ -448,8 +459,8 @@ public class PlayServiceImpl implements IPlayService { ssrcInfo.getSsrc(), device.isSsrcCheck()); // 初始化redis中的invite消息状态 - InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getId(), - mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY, + InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServer.getId(), + mediaServer.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY, InviteSessionStatus.ready, userSetting.getRecordSip()); if (record != null) { inviteInfo.setRecord(record); @@ -460,12 +471,12 @@ public class PlayServiceImpl implements IPlayService { inviteStreamService.updateInviteInfo(inviteInfo); try { - cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (eventResult) -> { + cmder.playStreamCmd(mediaServer, ssrcInfo, device, channel, (eventResult) -> { // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 - InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel, callback, inviteInfo, InviteSessionType.PLAY); + InviteOKHandler(eventResult, ssrcInfo, mediaServer, device, channel, callback, inviteInfo, InviteSessionType.PLAY); }, (event) -> { log.info("[点播失败]{}:{} deviceId: {}, channelId:{}",event.statusCode, event.msg, device.getDeviceId(), channel.getDeviceId()); - receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); + receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); if (callback != null) { @@ -478,7 +489,7 @@ public class PlayServiceImpl implements IPlayService { }, userSetting.getPlayTimeout().longValue()); } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 点播消息: {}", e.getMessage()); - receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); + receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); if (callback != null) { callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), @@ -595,7 +606,7 @@ public class PlayServiceImpl implements IPlayService { }, (event) -> { dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream()); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream()); @@ -605,7 +616,7 @@ public class PlayServiceImpl implements IPlayService { log.error("[命令发送失败] 对讲消息: {}", e.getMessage()); dynamicTask.stop(timeOutTaskKey); - mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream()); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc()); @@ -647,7 +658,7 @@ public class PlayServiceImpl implements IPlayService { } } log.info("[TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验:{}", device.getDeviceId(), channel.getDeviceId(), sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck()); - Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream()); + Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getApp(), ssrcInfo.getStream()); log.info("[TCP主动连接对方] 结果: {}" , result); if (!result) { // 主动连接失败,结束流程, 清理数据 @@ -686,7 +697,7 @@ public class PlayServiceImpl implements IPlayService { String fileName = deviceId + "_" + channelId + ".jpg"; // 请求截图 log.info("[请求截图]: " + fileName); - mediaServerService.getSnap(mediaServerItemInuse, "rtp", stream, 15, 1, path, fileName); + mediaServerService.getSnap(mediaServerItemInuse, MediaApp.GB28181, stream, 15, 1, path, fileName); } public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, Device device, DeviceChannel channel) { @@ -779,7 +790,8 @@ public class PlayServiceImpl implements IPlayService { int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); RTPServerParam rtpServerParam = new RTPServerParam(); - rtpServerParam.setMediaServerItem(mediaServerItem); + rtpServerParam.setMediaServer(mediaServerItem); + rtpServerParam.setApp(MediaApp.GB28181); rtpServerParam.setStreamId(stream); rtpServerParam.setSsrcCheck(device.isSsrcCheck()); rtpServerParam.setPlayback(true); @@ -805,14 +817,14 @@ public class PlayServiceImpl implements IPlayService { } inviteStreamService.call(InviteSessionType.PLAYBACK, channel.getId(), null, code, msg, null); inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, channel.getId()); - SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", stream); + SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, stream); if (ssrcTransaction != null) { try { - cmder.streamByeCmd(device, channel.getDeviceId(),"rtp", stream, null, null); + cmder.streamByeCmd(device, channel.getDeviceId(),MediaApp.GB28181, stream, null, null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { log.error("[录像回放] 发送BYE失败 {}", e.getMessage()); } finally { - sessionManager.removeByStream("rtp", stream); + sessionManager.removeByStream(MediaApp.GB28181, stream); } } } @@ -902,7 +914,7 @@ public class PlayServiceImpl implements IPlayService { log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse); // 释放ssrc mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc()); - Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse); + Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse); if (!result) { try { log.warn("[Invite 200OK] 更新ssrc失败,停止点播 {}/{}", device.getDeviceId(), channel.getDeviceId()); @@ -940,14 +952,14 @@ public class PlayServiceImpl implements IPlayService { if (ssrcInResponse != null) { // 单端口 // 重新订阅流上线 - SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", inviteInfo.getStream()); - sessionManager.removeByStream("rtp", inviteInfo.getStream()); + SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, inviteInfo.getStream()); + sessionManager.removeByStream(MediaApp.GB28181, inviteInfo.getStream()); inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse); ssrcTransaction.setDeviceId(device.getDeviceId()); ssrcTransaction.setChannelId(ssrcTransaction.getChannelId()); ssrcTransaction.setCallId(ssrcTransaction.getCallId()); ssrcTransaction.setSsrc(ssrcInResponse); - ssrcTransaction.setApp("rtp"); + ssrcTransaction.setApp(MediaApp.GB28181); ssrcTransaction.setStream(inviteInfo.getStream()); ssrcTransaction.setMediaServerId(mediaServerItem.getId()); ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo((SIPResponse) responseEvent.getResponse())); @@ -990,7 +1002,7 @@ public class PlayServiceImpl implements IPlayService { int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); // 录像下载不使用固定流地址,固定流地址会导致如果开始时间与结束时间一致时文件错误的叠加在一起 RTPServerParam rtpServerParam = new RTPServerParam(); - rtpServerParam.setMediaServerItem(mediaServer); + rtpServerParam.setMediaServer(mediaServer); rtpServerParam.setSsrcCheck(device.isSsrcCheck()); rtpServerParam.setPlayback(true); rtpServerParam.setPort(0); @@ -1081,7 +1093,7 @@ public class PlayServiceImpl implements IPlayService { inviteStreamService.updateInviteInfo(inviteInfoForNew, 60*15L); } }; - Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServer.getId()); + Hook hook = Hook.getInstance(HookType.on_record_mp4, MediaApp.GB28181, ssrcInfo.getStream(), mediaServer.getId()); // 设置过期时间,下载失败时自动处理订阅数据 hook.setExpireTime(System.currentTimeMillis() + 24 * 60 * 60 * 1000); subscribe.addSubscribe(hook, hookEventForRecord); @@ -1101,7 +1113,7 @@ public class PlayServiceImpl implements IPlayService { InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, channel.getId(), stream); if (inviteInfo == null) { - String app = "rtp"; + String app = MediaApp.GB28181; StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream); if (streamAuthorityInfo != null) { List allList = cloudRecordService.getAllList(null, app, stream, null, null, null, streamAuthorityInfo.getCallId(), null); @@ -1143,7 +1155,7 @@ public class PlayServiceImpl implements IPlayService { log.warn("[获取下载进度] 查询录像信息时发现节点不存在"); return null; } - String app = "rtp"; + String app = MediaApp.GB28181; Long duration = mediaServerService.updateDownloadProcess(mediaServerItem, app, stream); if (duration == null || duration == 0) { inviteInfo.getStreamInfo().setProgress(0); @@ -1186,7 +1198,7 @@ public class PlayServiceImpl implements IPlayService { public StreamInfo onPublishHandler(MediaServer mediaServerItem, MediaInfo mediaInfo, Device device, DeviceChannel channel) { - StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", mediaInfo.getStream(), mediaInfo, null); + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, MediaApp.GB28181, mediaInfo.getStream(), mediaInfo, null); streamInfo.setDeviceId(device.getDeviceId()); streamInfo.setChannelId(channel.getId()); return streamInfo; @@ -1557,7 +1569,7 @@ public class PlayServiceImpl implements IPlayService { SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId()); if (sendRtpInfo != null) { MediaServer mediaServer = mediaServerService.getOne(sendRtpInfo.getMediaServerId()); - Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpInfo.getReceiveStream()); + Boolean streamReady = mediaServerService.isStreamReady(mediaServer, MediaApp.GB28181, sendRtpInfo.getReceiveStream()); if (streamReady) { log.warn("[语音对讲] 进行中: {}", channel.getDeviceId()); event.call("语音对讲进行中"); @@ -1634,7 +1646,7 @@ public class PlayServiceImpl implements IPlayService { String path = "snap"; // 请求截图 log.info("[请求截图]: " + fileName); - mediaServerService.getSnap(mediaServer, "rtp", inviteInfo.getStreamInfo().getStream(), 15, 1, path, fileName); + mediaServerService.getSnap(mediaServer, MediaApp.GB28181, inviteInfo.getStreamInfo().getStream(), 15, 1, path, fileName); File snapFile = new File(path + File.separator + fileName); if (snapFile.exists()) { errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile()); @@ -1676,7 +1688,7 @@ public class PlayServiceImpl implements IPlayService { if (InviteSessionStatus.ok == inviteInfo.getStatus()) { try { log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId()); - cmder.streamByeCmd(device, channel.getDeviceId(), "rtp", inviteInfo.getStream(), null, null); + cmder.streamByeCmd(device, channel.getDeviceId(), MediaApp.GB28181, inviteInfo.getStream(), null, null); } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { log.error("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage()); throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage()); @@ -1709,7 +1721,7 @@ public class PlayServiceImpl implements IPlayService { if (InviteSessionStatus.ok == inviteInfo.getStatus()) { try { log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId()); - cmder.streamByeCmd(device, channel.getDeviceId(), "rtp", inviteInfo.getStream(), null, null); + cmder.streamByeCmd(device, channel.getDeviceId(), MediaApp.GB28181, inviteInfo.getStream(), null, null); } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { log.warn("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage()); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java index 1151db77d..e87f2f0ba 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommander.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; @@ -505,7 +506,7 @@ public class SIPCommander implements ISIPCommander { } log.info("[语音喊话] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort()); - Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId()); + Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.GB28181, stream, mediaServerItem.getId()); subscribe.addSubscribe(hook, (hookData) -> { if (event != null) { event.response(hookData); @@ -515,7 +516,7 @@ public class SIPCommander implements ISIPCommander { CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport()); callIdHeader.setCallId(callId); - Hook publishHook = Hook.getInstance(HookType.on_publish, "rtp", stream, mediaServerItem.getId()); + Hook publishHook = Hook.getInstance(HookType.on_publish, MediaApp.GB28181, stream, mediaServerItem.getId()); subscribe.addSubscribe(publishHook, (hookData) -> { if (eventForPush != null) { eventForPush.response(hookData); @@ -1388,7 +1389,7 @@ public class SIPCommander implements ISIPCommander { @Override public void playbackControlCmd(Device device, DeviceChannel channel, String stream, String content, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws SipException, InvalidArgumentException, ParseException { - SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", stream); + SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, stream); if (ssrcTransaction == null) { log.info("[回放控制]未找到视频流信息,设备:{}, 流ID: {}", device.getDeviceId(), stream); return; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java index c51646a3c..78eb04d0d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/cmd/impl/SIPCommanderForPlatform.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.common.InviteSessionType; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; @@ -641,7 +642,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId); if (mediaServerItem != null) { mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc()); - mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream()); + mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream()); } SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem, channel); if (byeRequest == null) { @@ -664,7 +665,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { } mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc()); - mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream()); + mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream()); sessionManager.removeByStream(ssrcTransaction.getApp(), ssrcTransaction.getStream()); Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channel.getGbDeviceId(), ssrcTransaction.getSipTransactionInfo()); @@ -705,7 +706,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform { } log.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort()); - Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId()); + Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.GB28181, stream, mediaServerItem.getId()); subscribe.addSubscribe(hook, (hookData) -> { if (event != null) { event.response(hookData); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java index bacf50456..ea0284596 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/ByeRequestProcessor.java @@ -228,7 +228,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In deviceChannelService.stopPlay(channel.getId()); inviteStreamService.removeInviteInfo(inviteInfo); if (inviteInfo.getStreamInfo() != null) { - mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStreamInfo().getStream()); + mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream()); } } break; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java index 41fa636a0..33f20d4fd 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MediaStatusNotifyMessageHandler.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.service.*; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; @@ -98,7 +99,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i playService.stop(inviteInfo); } // 去除监听流注销自动停止下载的监听 - Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId()); + Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.GB28181, ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId()); subscribe.removeSubscribe(hook); if (ssrcTransaction.getPlatformId() != null) { // 如果级联播放,需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题,需要将点播CallId进行上下级绑定 diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java index f9096780b..9d14e57be 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java @@ -3,14 +3,13 @@ package com.genersoft.iot.vmp.jt1078.service.impl; import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; -import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.jt1078.bean.*; import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; -import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent; import com.genersoft.iot.vmp.jt1078.proc.request.J1205; import com.genersoft.iot.vmp.jt1078.proc.response.*; import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService; @@ -26,11 +25,12 @@ import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent; import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent; import com.genersoft.iot.vmp.media.service.IMediaServerService; +import com.genersoft.iot.vmp.service.IReceiveRtpServerService; import com.genersoft.iot.vmp.service.ISendRtpServerService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.InviteErrorCode; +import com.genersoft.iot.vmp.service.bean.RTPServerParam; import com.genersoft.iot.vmp.service.bean.SSRCInfo; -import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.MediaServerUtils; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; @@ -74,6 +74,9 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { @Autowired private IMediaServerService mediaServerService; + @Autowired + private IReceiveRtpServerService receiveRtpServerService; + @Autowired private DynamicTask dynamicTask; @@ -197,7 +200,6 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { private void play(JTDevice device, JTChannel channel, int type, CommonCallback> callback) { String phoneNumber = device.getPhoneNumber(); int channelId = channel.getChannelId(); - String app = "1078"; String stream = phoneNumber + "_" + channelId; // 检查流是否已经存在,存在则返回 String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId; @@ -208,7 +210,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { MediaServer mediaServer = streamInfo.getMediaServer(); if (mediaServer != null) { // 查询流是否存在,不存在则删除缓存数据 - MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, app, streamInfo.getStream()); + MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, MediaApp.JT1078, streamInfo.getStream()); if (mediaInfo != null) { log.info("[JT-点播] 点播已经存在,直接返回, phoneNumber: {}, channelId: {}", phoneNumber, channelId); for (CommonCallback> errorCallback : errorCallbacks) { @@ -233,48 +235,56 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { } return; } - // 设置hook监听 - Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId()); - subscribe.addSubscribe(hook, (hookData) -> { - dynamicTask.stop(playKey); - log.info("[JT-点播] 点播成功, 手机号: {}, 通道: {}", phoneNumber, channelId); - // TODO 发送9105 实时音视频传输状态通知, 通知丢包率 - StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId); - - for (CommonCallback> errorCallback : errorCallbacks) { - if (errorCallback == null) { - continue; - } - errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info)); - } - subscribe.removeSubscribe(hook); - redisTemplate.opsForValue().set(playKey, info); - // 截图 - String path = "snap"; - String fileName = phoneNumber + "_" + channelId + ".jpg"; - // 请求截图 - log.info("[请求截图]: " + fileName); - mediaServerService.getSnap(mediaServer, app, stream, 15, 1, path, fileName); - }); // 开启收流端口 - SSRCInfo ssrcInfo = mediaServerService.openJTTServer(mediaServer, stream, null, false, !channel.isHasAudio(), 1); + RTPServerParam rtpServerParam = new RTPServerParam(); + rtpServerParam.setMediaServer(mediaServer); + rtpServerParam.setApp(MediaApp.JT1078); + rtpServerParam.setStreamId(stream); + rtpServerParam.setSsrcCheck(false); + rtpServerParam.setPlayback(false); + rtpServerParam.setPort(0); + rtpServerParam.setTcpMode(1); // 1 表示tcp被动 + rtpServerParam.setOnlyAuto(false); + rtpServerParam.setDisableAudio(!channel.isHasAudio()); + + SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> { + + if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) { + // hook响应 + log.info("[JT-点播] 点播成功, 手机号: {}, 通道: {}", phoneNumber, channelId); + // TODO 发送9105 实时音视频传输状态通知, 通知丢包率 + StreamInfo info = onPublishHandler(mediaServer, result.getHookData(), phoneNumber, channelId); + + for (CommonCallback> errorCallback : errorCallbacks) { + if (errorCallback == null) { + continue; + } + errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info)); + } + redisTemplate.opsForValue().set(playKey, info); + // 截图 + String path = "snap"; + String fileName = phoneNumber + "_" + channelId + ".jpg"; + // 请求截图 + log.info("[请求截图]: " + fileName); + mediaServerService.getSnap(mediaServer, MediaApp.JT1078, stream, 15, 1, path, fileName); + }else { + if (callback != null) { + callback.run(WVPResult.fail(code, msg)); + } + log.info("[JT-点播] 超时, phoneNumber: {}, channelId: {}", phoneNumber, channelId); + for (CommonCallback> errorCallback : errorCallbacks) { + errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), + InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null)); + } + stopPlay(phoneNumber, channelId); + } + }); if (ssrcInfo == null) { stopPlay(phoneNumber, channelId); return; } - // 设置超时监听 - dynamicTask.startDelay(playKey, () -> { - log.info("[JT-点播] 超时, phoneNumber: {}, channelId: {}", phoneNumber, channelId); - for (CommonCallback> errorCallback : errorCallbacks) { - errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), - InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null)); - } - mediaServerService.closeJTTServer(mediaServer, stream, null); - subscribe.removeSubscribe(hook); - stopPlay(phoneNumber, channelId); - }, userSetting.getPlayTimeout()); - log.info("[JT-点播] phoneNumber: {}, channelId: {},IP: {}, 端口: {}", phoneNumber, channelId, mediaServer.getSdpIp(), ssrcInfo.getPort()); J9101 j9101 = new J9101(); j9101.setChannel(channelId); @@ -287,7 +297,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { } public StreamInfo onPublishHandler(MediaServer mediaServerItem, HookData hookData, String phoneNumber, Integer channelId) { - StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "1078", hookData.getStream(), hookData.getMediaInfo(), null); + StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, MediaApp.JT1078, hookData.getStream(), hookData.getMediaInfo(), null); streamInfo.setDeviceId(phoneNumber); streamInfo.setChannelId(channelId); return streamInfo; @@ -317,7 +327,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { // 删除缓存数据 if (streamInfo != null) { // 关闭rtpServer - mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream(), null); + receiveRtpServerService.closeRTPServer(streamInfo.getMediaServer(), new SSRCInfo(streamInfo.getApp(), streamInfo.getStream())); redisTemplate.delete(playKey); } @@ -425,12 +435,12 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playbackKey); if (streamInfo != null) { - mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream(), null); + mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream()); // 清理数据 redisTemplate.delete(playbackKey); } - String app = "1078"; + String app = MediaApp.JT1078; String stream = String.format("%s_%s_%s_%s", phoneNumber, channelId, DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(startTime), DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(endTime)); MediaServer mediaServer; @@ -615,7 +625,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { sendRtpInfo.setReceiveStream(stream + "_talk"); // 设置hook监听 - Hook hook = Hook.getInstance(HookType.on_media_arrival, "1078", sendRtpInfo.getReceiveStream(), mediaServer.getId()); + Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.JT1078, sendRtpInfo.getReceiveStream(), mediaServer.getId()); subscribe.addSubscribe(hook, (hookData) -> { log.info("[JT-对讲] 对讲连接建立, phoneNumber: {}, channelId: {}", phoneNumber, channelId); subscribe.removeSubscribe(hook); diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java index 17a6c5680..97d59b474 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.media.abl; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; @@ -194,14 +195,14 @@ public class ABLHttpHookListener { logger.info("[ABL HOOK] 码流不到达通知:{}->{}/{}", param.getMediaServerId(), param.getApp(), param.getStream()); try { - if ("rtp".equals(param.getApp())) { + if (MediaApp.GB28181.equals(param.getApp())) { return HookResult.SUCCESS(); } MediaRtpServerTimeoutEvent event = new MediaRtpServerTimeoutEvent(this); MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); if (mediaServerItem != null) { event.setMediaServer(mediaServerItem); - event.setApp("rtp"); + event.setApp(MediaApp.GB28181); applicationEventPublisher.publishEvent(event); } }catch (Exception e) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java index ab24c23eb..6b22de5fb 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaNodeServerService.java @@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.common.CommonCallback; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.SipConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; @@ -68,40 +69,40 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { } @Override - public int createRTPServer(MediaServer mediaServer, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) { + public int createRTPServer(MediaServer mediaServer, String app, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) { Boolean recordSip = userSetting.getRecordSip(); - return ablresTfulUtils.openRtpServer(mediaServer, "rtp", stream, 96, port, tcpMode, disableAudio?1:0, recordSip, false); + return ablresTfulUtils.openRtpServer(mediaServer, app, stream, 96, port, tcpMode, disableAudio?1:0, recordSip, false); } @Override - public void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback callback) { + public void closeRtpServer(MediaServer mediaServer, String app, String stream, CommonCallback callback) { if (mediaServer == null) { return; } - ABLResult result = ablresTfulUtils.closeStreams(mediaServer, "rtp", streamId); + ABLResult result = ablresTfulUtils.closeStreams(mediaServer, app, stream); logger.info("关闭RTP Server " + result); if (result.getCode() != 0) { logger.error("[closeRtpServer] 失败: {}", result.getMemo()); } } - @Override - public int createJTTServer(MediaServer mediaServer, String stream, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) { - Boolean recordSip = userSetting.getRecordSip(); - return ablresTfulUtils.openRtpServer(mediaServer, "1078", stream, 96, port, tcpMode, disableAudio?1:0, recordSip, true); - } - - @Override - public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback callback) { - if (mediaServer == null) { - return; - } - ABLResult result = ablresTfulUtils.closeStreams(mediaServer, "1078", streamId); - logger.info("关闭JT-RTP Server " + result); - if (result.getCode() != 0) { - logger.error("[JT-closeRtpServer] 失败: {}", result.getMemo()); - } - } +// @Override +// public int createJTTServer(MediaServer mediaServer, String stream, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) { +// Boolean recordSip = userSetting.getRecordSip(); +// return ablresTfulUtils.openRtpServer(mediaServer, MediaApp.JT1078, stream, 96, port, tcpMode, disableAudio?1:0, recordSip, true); +// } +// +// @Override +// public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback callback) { +// if (mediaServer == null) { +// return; +// } +// ABLResult result = ablresTfulUtils.closeStreams(mediaServer, MediaApp.JT1078, streamId); +// logger.info("关闭JT-RTP Server " + result); +// if (result.getCode() != 0) { +// logger.error("[JT-closeRtpServer] 失败: {}", result.getMemo()); +// } +// } @Override public void closeStreams(MediaServer mediaServer, String app, String streamId) { @@ -112,7 +113,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { } @Override - public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc) { + public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String app, String streamId, String ssrc) { return null; } @@ -256,7 +257,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { } @Override - public Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String stream) { + public Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String app, String stream) { logger.warn("[abl-connectRtpServer] 未实现"); return null; } @@ -443,7 +444,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService { @Override public List listRtpServer(MediaServer mediaServer) { - ABLResult ablResult = ablresTfulUtils.getMediaList(mediaServer, "rtp", null); + ABLResult ablResult = ablresTfulUtils.getMediaList(mediaServer, MediaApp.GB28181, null); if (ablResult.getCode() != 0) { return null; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java index b0a768c84..8e36c7b0f 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaNodeServerService.java @@ -15,19 +15,19 @@ import java.util.List; import java.util.Map; public interface IMediaNodeServerService { - int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode); + int createRTPServer(MediaServer mediaServer, String app, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode); - void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback callback); + void closeRtpServer(MediaServer mediaServer, String app, String stream, CommonCallback callback); - int createJTTServer(MediaServer mediaServer, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode); - - void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback callback); +// int createJTTServer(MediaServer mediaServer, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode); +// +// void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback callback); void closeStreams(MediaServer mediaServer, String app, String stream); - Boolean updateRtpServerSSRC(MediaServer mediaServer, String stream, String ssrc); + Boolean updateRtpServerSSRC(MediaServer mediaServer, String app, String stream, String ssrc); boolean checkNodeId(MediaServer mediaServer); @@ -43,7 +43,7 @@ public interface IMediaNodeServerService { List getMediaList(MediaServer mediaServer, String app, String stream, String callId); - Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream); + Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String app, String stream); void getSnap(MediaServer mediaServer, String app, String stream, int timeoutSec, int expireSec, String path, String fileName); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java index 7b6fe6114..aba24a9ca 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/IMediaServerService.java @@ -34,20 +34,20 @@ public interface IMediaServerService { void updateVmServer(List mediaServerItemList); - SSRCInfo openRTPServer(MediaServer mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck, + SSRCInfo openRTPServer(MediaServer mediaServerItem, String app, String streamId, String presetSsrc, boolean ssrcCheck, boolean isPlayback, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode); - void closeRTPServer(MediaServer mediaServerItem, String streamId); + void closeRTPServer(MediaServer mediaServerItem, String app, String streamId); - void closeRTPServer(MediaServer mediaServerItem, String streamId, CommonCallback callback); + void closeRTPServer(MediaServer mediaServerItem, String app, String streamId, CommonCallback callback); - SSRCInfo openJTTServer(MediaServer mediaServerItem, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode); +// SSRCInfo openJTTServer(MediaServer mediaServerItem, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode); +// +// void closeJTTServer(MediaServer mediaServerItem, String streamId, CommonCallback callback); - void closeJTTServer(MediaServer mediaServerItem, String streamId, CommonCallback callback); + Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String app, String streamId, String ssrc); - Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc); - - void closeRTPServer(String mediaServerId, String streamId); + void closeRTPServer(String mediaServerId, String app, String streamId); void clearRTPServer(MediaServer mediaServerItem); @@ -89,7 +89,7 @@ public interface IMediaServerService { List getMediaList(MediaServer mediaInfo, String app, String stream, String callId); - Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String stream); + Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String app, String stream); void getSnap(MediaServer mediaServer, String app, String stream, int timeoutSec, int expireSec, String path, String fileName); @@ -162,7 +162,7 @@ public interface IMediaServerService { StreamInfo getMediaByAppAndStream(String app, String stream); - int createRTPServer(MediaServer mediaServerItem, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode); + int createRTPServer(MediaServer mediaServerItem, String app, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode); List listRtpServer(MediaServer mediaServer); diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index deb1044ce..34ef13719 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -164,7 +164,7 @@ public class MediaServerServiceImpl implements IMediaServerService { @Override - public SSRCInfo openRTPServer(MediaServer mediaServer, String streamId, String presetSsrc, boolean ssrcCheck, + public SSRCInfo openRTPServer(MediaServer mediaServer, String app, String streamId, String presetSsrc, boolean ssrcCheck, boolean isPlayback, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) { if (mediaServer == null || mediaServer.getId() == null) { log.info("[openRTPServer] 失败, mediaServer == null || mediaServer.getId() == null"); @@ -196,15 +196,15 @@ public class MediaServerServiceImpl implements IMediaServerService { log.info("[openRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); return null; } - rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, disableAudio, reUsePort, tcpMode); + rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, app, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, disableAudio, reUsePort, tcpMode); } else { rtpServerPort = mediaServer.getRtpProxyPort(); } - return new SSRCInfo(rtpServerPort, ssrc, "rtp", streamId, null); + return new SSRCInfo(rtpServerPort, ssrc, app, streamId); } @Override - public int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) { + public int createRTPServer(MediaServer mediaServer, String app, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) { int rtpServerPort; if (mediaServer.isRtpEnable()) { IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); @@ -212,33 +212,33 @@ public class MediaServerServiceImpl implements IMediaServerService { log.info("[openRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); return 0; } - rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode); + rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, app, streamId, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode); } else { rtpServerPort = mediaServer.getRtpProxyPort(); } return rtpServerPort; } - @Override - public SSRCInfo openJTTServer(MediaServer mediaServer, @NotNull String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) { - if (mediaServer == null || mediaServer.getId() == null) { - log.info("[openJTTServer] 失败, mediaServer == null || mediaServer.getId() == null"); - return null; - } - - int rtpServerPort; - if (mediaServer.isRtpEnable()) { - IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); - if (mediaNodeServerService == null) { - log.info("[openJTTServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); - return null; - } - rtpServerPort = mediaNodeServerService.createJTTServer(mediaServer, streamId, port, disableVideo, disableAudio, tcpMode); - } else { - rtpServerPort = mediaServer.getJttProxyPort(); - } - return new SSRCInfo(rtpServerPort, null, "1078", streamId, null); - } +// @Override +// public SSRCInfo openJTTServer(MediaServer mediaServer, @NotNull String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) { +// if (mediaServer == null || mediaServer.getId() == null) { +// log.info("[openJTTServer] 失败, mediaServer == null || mediaServer.getId() == null"); +// return null; +// } +// +// int rtpServerPort; +// if (mediaServer.isRtpEnable()) { +// IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); +// if (mediaNodeServerService == null) { +// log.info("[openJTTServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); +// return null; +// } +// rtpServerPort = mediaNodeServerService.createJTTServer(mediaServer, streamId, port, disableVideo, disableAudio, tcpMode); +// } else { +// rtpServerPort = mediaServer.getJttProxyPort(); +// } +// return new SSRCInfo(rtpServerPort, null, "1078", streamId, null); +// } @Override public List listRtpServer(MediaServer mediaServer) { @@ -251,7 +251,7 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public void closeRTPServer(MediaServer mediaServer, String streamId) { + public void closeRTPServer(MediaServer mediaServer, String app, String streamId) { if (mediaServer == null) { return; } @@ -260,11 +260,11 @@ public class MediaServerServiceImpl implements IMediaServerService { log.info("[closeRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); return; } - mediaNodeServerService.closeRtpServer(mediaServer, streamId, null); + mediaNodeServerService.closeRtpServer(mediaServer, app, streamId, null); } @Override - public void closeRTPServer(MediaServer mediaServer, String streamId, CommonCallback callback) { + public void closeRTPServer(MediaServer mediaServer, String app, String streamId, CommonCallback callback) { if (mediaServer == null) { callback.run(false); return; @@ -274,42 +274,42 @@ public class MediaServerServiceImpl implements IMediaServerService { log.info("[closeRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); return; } - mediaNodeServerService.closeRtpServer(mediaServer, streamId, callback); + mediaNodeServerService.closeRtpServer(mediaServer, app, streamId, callback); } @Override - public void closeRTPServer(String mediaServerId, String streamId) { + public void closeRTPServer(String mediaServerId, String app, String streamId) { MediaServer mediaServer = this.getOne(mediaServerId); if (mediaServer == null) { return; } if (mediaServer.isRtpEnable()) { - closeRTPServer(mediaServer, streamId); + closeRTPServer(mediaServer, app, streamId); } IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); if (mediaNodeServerService == null) { log.info("[closeRTPServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); return; } - mediaNodeServerService.closeStreams(mediaServer, "rtp", streamId); + mediaNodeServerService.closeStreams(mediaServer, app, streamId); } - @Override - public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback callback) { - if (mediaServer == null) { - callback.run(false); - return; - } - IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); - if (mediaNodeServerService == null) { - log.info("[closeJTTServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); - return; - } - mediaNodeServerService.closeJTTServer(mediaServer, streamId, callback); - } +// @Override +// public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback callback) { +// if (mediaServer == null) { +// callback.run(false); +// return; +// } +// IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); +// if (mediaNodeServerService == null) { +// log.info("[closeJTTServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); +// return; +// } +// mediaNodeServerService.closeJTTServer(mediaServer, streamId, callback); +// } @Override - public Boolean updateRtpServerSSRC(MediaServer mediaServer, String streamId, String ssrc) { + public Boolean updateRtpServerSSRC(MediaServer mediaServer, String app, String streamId, String ssrc) { if (mediaServer == null) { return false; } @@ -318,7 +318,7 @@ public class MediaServerServiceImpl implements IMediaServerService { log.info("[updateRtpServerSSRC] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); return false; } - return mediaNodeServerService.updateRtpServerSSRC(mediaServer, streamId, ssrc); + return mediaNodeServerService.updateRtpServerSSRC(mediaServer, app, streamId, ssrc); } @Override @@ -716,13 +716,13 @@ public class MediaServerServiceImpl implements IMediaServerService { } @Override - public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream) { + public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String app, String stream) { IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); if (mediaNodeServerService == null) { log.info("[connectRtpServer] 失败, mediaServer的类型: {},未找到对应的实现类", mediaServer.getType()); return false; } - return mediaNodeServerService.connectRtpServer(mediaServer, address, port, stream); + return mediaNodeServerService.connectRtpServer(mediaServer, address, port, app, stream); } @Override diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java index c6c7d5426..29cdac26d 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMHttpHookListener.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.media.zlm; import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.MediaConfig; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.media.bean.MediaServer; @@ -262,7 +263,7 @@ public class ZLMHttpHookListener { log.info("[ZLM HOOK] rtp发送关闭:{}->{}/{}", param.getMediaServerId(), param.getApp(), param.getStream()); // 查找对应的上级推流,发送停止 - if (!"rtp".equals(param.getApp())) { + if (!MediaApp.GB28181.equals(param.getApp())) { return HookResult.SUCCESS(); } try { @@ -293,7 +294,7 @@ public class ZLMHttpHookListener { MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId()); if (mediaServerItem != null) { event.setMediaServer(mediaServerItem); - event.setApp("rtp"); + event.setApp(MediaApp.GB28181); applicationEventPublisher.publishEvent(event); } }catch (Exception e) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java index 333c494df..e1142c535 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaNodeServerService.java @@ -47,24 +47,24 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { private HookSubscribe subscribe; @Override - public int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) { - return zlmServerFactory.createRTPServer(mediaServer, "rtp", streamId, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode); + public int createRTPServer(MediaServer mediaServer, String app, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) { + return zlmServerFactory.createRTPServer(mediaServer, app, stream, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode); } @Override - public void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback callback) { - zlmServerFactory.closeRtpServer(mediaServer, streamId, callback); + public void closeRtpServer(MediaServer mediaServer, String app, String stream, CommonCallback callback) { + zlmServerFactory.closeRtpServer(mediaServer, app, stream, callback); } - @Override - public int createJTTServer(MediaServer mediaServer, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) { - return zlmServerFactory.createRTPServer(mediaServer, "1078", streamId, 0, port, disableVideo, disableAudio, false, tcpMode); - } +// @Override +// public int createJTTServer(MediaServer mediaServer, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) { +// return zlmServerFactory.createRTPServer(mediaServer, "1078", streamId, 0, port, disableVideo, disableAudio, false, tcpMode); +// } - @Override - public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback callback) { - zlmServerFactory.closeRtpServer(mediaServer, streamId, callback); - } +// @Override +// public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback callback) { +// zlmServerFactory.closeRtpServer(mediaServer, streamId, callback); +// } @Override public void closeStreams(MediaServer mediaServer, String app, String stream) { @@ -72,8 +72,8 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { } @Override - public Boolean updateRtpServerSSRC(MediaServer mediaServer, String streamId, String ssrc) { - return zlmServerFactory.updateRtpServerSSRC(mediaServer, streamId, ssrc); + public Boolean updateRtpServerSSRC(MediaServer mediaServer, String app, String streamId, String ssrc) { + return zlmServerFactory.updateRtpServerSSRC(mediaServer, app, streamId, ssrc); } @Override @@ -208,8 +208,8 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService { } @Override - public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream) { - ZLMResult zlmResult = zlmresTfulUtils.connectRtpServer(mediaServer, address, port, stream); + public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String app, String stream) { + ZLMResult zlmResult = zlmresTfulUtils.connectRtpServer(mediaServer, address, port, app, stream); log.info("[TCP主动连接对方] 结果: {}", zlmResult); return zlmResult.getCode() == 0; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java index 66bad9a97..03a56b63b 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMRESTfulUtils.java @@ -694,10 +694,11 @@ public class ZLMRESTfulUtils { } } - public ZLMResult connectRtpServer(MediaServer mediaServer, String dst_url, int dst_port, String stream_id) { + public ZLMResult connectRtpServer(MediaServer mediaServer, String dst_url, int dst_port, String app, String stream_id) { Map param = new HashMap<>(1); param.put("dst_url", dst_url); param.put("dst_port", dst_port); + param.put("app", app); param.put("stream_id", stream_id); String response = sendPost(mediaServer, "connectRtpServer", param, null); if (response == null) { @@ -712,9 +713,10 @@ public class ZLMRESTfulUtils { } } - public ZLMResult updateRtpServerSSRC(MediaServer mediaServer, String streamId, String ssrc) { + public ZLMResult updateRtpServerSSRC(MediaServer mediaServer, String app, String streamId, String ssrc) { Map param = new HashMap<>(1); param.put("ssrc", ssrc); + param.put("app", app); param.put("stream_id", streamId); String response = sendPost(mediaServer, "updateRtpServerSSRC", param, null); diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java index 4349445d3..1e52ce353 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMServerFactory.java @@ -98,10 +98,11 @@ public class ZLMServerFactory { return result; } - public boolean closeRtpServer(MediaServer serverItem, String streamId) { + public boolean closeRtpServer(MediaServer serverItem, String app, String streamId) { boolean result = false; if (serverItem !=null){ Map param = new HashMap<>(); + param.put("app", app); param.put("stream_id", streamId); ZLMResult zlmResult = zlmresTfulUtils.closeRtpServer(serverItem, param); if (zlmResult != null ) { @@ -118,7 +119,7 @@ public class ZLMServerFactory { return result; } - public void closeRtpServer(MediaServer serverItem, String streamId, CommonCallback callback) { + public void closeRtpServer(MediaServer serverItem, String app, String streamId, CommonCallback callback) { if (serverItem == null) { if (callback != null) { callback.run(false); @@ -126,6 +127,7 @@ public class ZLMServerFactory { return; } Map param = new HashMap<>(); + param.put("app", app); param.put("stream_id", streamId); zlmresTfulUtils.closeRtpServer(serverItem, param, zlmResult -> { if (zlmResult.getCode() == 0) { @@ -223,9 +225,9 @@ public class ZLMServerFactory { return zlmResult; } - public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc) { + public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String app, String streamId, String ssrc) { boolean result = false; - ZLMResult zlmResult = zlmresTfulUtils.updateRtpServerSSRC(mediaServerItem, streamId, ssrc); + ZLMResult zlmResult = zlmresTfulUtils.updateRtpServerSSRC(mediaServerItem, app, streamId, ssrc); if (zlmResult.getCode() == 0) { result= true; log.info("[更新RTPServer] 成功"); diff --git a/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java b/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java index caf2b9044..84cfbf9f9 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IReceiveRtpServerService.java @@ -1,13 +1,16 @@ package com.genersoft.iot.vmp.service; +import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.gb28181.bean.OpenRTPServerResult; import com.genersoft.iot.vmp.media.bean.MediaServer; +import com.genersoft.iot.vmp.media.event.hook.HookData; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.RTPServerParam; import com.genersoft.iot.vmp.service.bean.SSRCInfo; public interface IReceiveRtpServerService { - SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback); - void closeRTPServer(MediaServer mediaServer, SSRCInfo ssrcInfo); + int openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback); + + void closeRTPServer(MediaServer mediaServer, String app, String stream); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java b/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java index 2baf335a8..6c2dd15d2 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/RTPServerParam.java @@ -2,15 +2,25 @@ package com.genersoft.iot.vmp.service.bean; import com.genersoft.iot.vmp.media.bean.MediaServer; import lombok.Data; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; -@Data +@Getter +@Setter +@NoArgsConstructor public class RTPServerParam { - private MediaServer mediaServerItem; + /** + * 使用的流媒体 + */ + private MediaServer mediaServer; + private String app; private String streamId; - private String presetSsrc; - private boolean ssrcCheck; - private boolean playback; + /** + * 开启rtpServer时使用的ssrc,开启rtpServer时会根据这个ssrc进行校验,如果不填则不校验 + */ + private Long ssrc; private Integer port; private boolean onlyAuto; private boolean disableAudio; @@ -21,5 +31,16 @@ public class RTPServerParam { */ private Integer tcpMode; - + public RTPServerParam(MediaServer mediaServer, String app, String streamId, Long ssrc, Integer port, + boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) { + this.mediaServer = mediaServer; + this.app = app; + this.streamId = streamId; + this.ssrc = ssrc; + this.port = port; + this.onlyAuto = onlyAuto; + this.disableAudio = disableAudio; + this.reUsePort = reUsePort; + this.tcpMode = tcpMode; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java index c35ceb550..e74d17e0e 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/SSRCInfo.java @@ -9,13 +9,12 @@ public class SSRCInfo { private String ssrc; private String app; private String Stream; - private String timeOutTaskKey; - public SSRCInfo(int port, String ssrc, String app, String stream, String timeOutTaskKey) { + public SSRCInfo(int port, String ssrc, String app, String stream) { this.port = port; this.ssrc = ssrc; this.app = app; this.Stream = stream; - this.timeOutTaskKey = timeOutTaskKey; } + } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java index 833742590..7cb75dcdc 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MediaServiceImpl.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionStatus; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; @@ -82,7 +83,7 @@ public class MediaServiceImpl implements IMediaService { if (app == null || stream == null) { return false; } - if ("rtp".equals(app)) { + if (MediaApp.GB28181.equals(app)) { return true; } StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream); @@ -95,7 +96,7 @@ public class MediaServiceImpl implements IMediaService { @Override public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) { // 推流鉴权的处理 - if (!"rtp".equals(app) && !"1078".equals(app) ) { + if (!MediaApp.GB28181.equals(app) && !MediaApp.JT1078.equals(app) ) { if ("talk".equals(app) && stream.endsWith("_talk")) { ResultForOnPublish result = new ResultForOnPublish(); result.setEnable_mp4(false); @@ -156,7 +157,7 @@ public class MediaServiceImpl implements IMediaService { result.setEnable_audio(true); // 国标流 - if ("rtp".equals(app)) { + if (MediaApp.GB28181.equals(app)) { InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream); @@ -223,7 +224,7 @@ public class MediaServiceImpl implements IMediaService { }else { result.setEnable_mp4(userSetting.getRecordPushLive()); } - if (app.equalsIgnoreCase("rtp")) { + if (app.equalsIgnoreCase(MediaApp.GB28181)) { String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream; OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey); @@ -243,7 +244,7 @@ public class MediaServiceImpl implements IMediaService { return false; } // 国标类型的流 - if ("rtp".equals(app)) { + if (MediaApp.GB28181.equals(app)) { result = userSetting.getStreamOnDemand(); // 国标流, 点播/录像回放/录像下载 InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream); @@ -260,7 +261,7 @@ public class MediaServiceImpl implements IMediaService { } return result; } - }else if ("1078".equals(app)) { + }else if (MediaApp.JT1078.equals(app)) { // 判断是否是1078播放类型 JTMediaStreamType jtMediaStreamType = ijt1078Service.checkStreamFromJt(stream); if (jtMediaStreamType != null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java index 86099262d..9a2aa2346 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -1,12 +1,16 @@ package com.genersoft.iot.vmp.service.impl; +import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.OpenRTPServerResult; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; +import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.event.hook.Hook; +import com.genersoft.iot.vmp.media.event.hook.HookData; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookType; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; @@ -23,12 +27,15 @@ import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; +import java.util.Objects; import java.util.UUID; @Slf4j @Service public class RtpServerServiceImpl implements IReceiveRtpServerService { + private final static String TIMEOUT_TASK_KEY_PREFIX = "RTP_SERVER_TIMEOUT_TASK"; + @Autowired private IMediaServerService mediaServerService; @@ -65,98 +72,111 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { } - @Override - public SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback) { + public SSRCInfo openGbRTPServer(MediaServer mediaServer, String streamId, String presetSSRC, int tcpMode, + boolean playback, boolean ssrcCheck, boolean onlyAuto, boolean disableAuto, boolean reUsePort, + ErrorCallback callback ) { if (callback == null) { - log.warn("[开启RTP收流] 失败,回调为NULL"); + log.warn("[开启国标RTP收流] 失败,回调为NULL"); return null; } - if (rtpServerParam.getMediaServerItem() == null) { - log.warn("[开启RTP收流] 失败,媒体节点为NULL"); + if (mediaServer == null) { + log.warn("[开启国标RTP收流] 失败,媒体节点为NULL"); return null; } // 获取mediaServer可用的ssrc final String ssrc; - if (rtpServerParam.getPresetSsrc() != null) { - ssrc = rtpServerParam.getPresetSsrc(); + if (presetSSRC != null) { + ssrc = presetSSRC; }else { - if (rtpServerParam.isPlayback()) { - ssrc = ssrcFactory.getPlayBackSsrc(rtpServerParam.getMediaServerItem().getId()); + if (playback) { + ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId()); }else { - ssrc = ssrcFactory.getPlaySsrc(rtpServerParam.getMediaServerItem().getId()); + ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId()); } } - final String streamId; - if (rtpServerParam.getStreamId() == null) { + if (streamId == null) { streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase(); - }else { - streamId = rtpServerParam.getStreamId(); } - if (rtpServerParam.isSsrcCheck() && rtpServerParam.getTcpMode() > 0) { + if (ssrcCheck && tcpMode > 0) { // 目前zlm不支持 tcp模式更新ssrc,暂时关闭ssrc校验 log.warn("[openRTPServer] 平台对接时下级可能自定义ssrc,但是tcp模式zlm收流目前无法更新ssrc,可能收流超时,此时请使用udp收流或者关闭ssrc校验"); } - int rtpServerPort; - if (rtpServerParam.getMediaServerItem().isRtpEnable()) { - rtpServerPort = mediaServerService.createRTPServer(rtpServerParam.getMediaServerItem(), streamId, - rtpServerParam.isSsrcCheck() ? Long.parseLong(ssrc) : 0, rtpServerParam.getPort(), rtpServerParam.isOnlyAuto(), - rtpServerParam.isDisableAudio(), rtpServerParam.isReUsePort(), rtpServerParam.getTcpMode()); - } else { - rtpServerPort = rtpServerParam.getMediaServerItem().getRtpProxyPort(); - } - if (rtpServerPort == 0) { - callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "开启RTPServer失败", null); - // 释放ssrc - if (rtpServerParam.getPresetSsrc() == null) { - ssrcFactory.releaseSsrc(rtpServerParam.getMediaServerItem().getId(), ssrc); + + SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaApp.GB28181, streamId); + RTPServerParam rtpServerParam = new RTPServerParam(mediaServer, MediaApp.GB28181, streamId, ssrcCheck ? Long.parseLong(ssrc): 0L, null, onlyAuto, disableAuto, reUsePort, tcpMode); + int rtpServerPort = openRTPServer(rtpServerParam, ((code, msg, data) -> { + if (code == InviteErrorCode.SUCCESS.getCode()) { + OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult(); + openRTPServerResult.setHookData(data); + openRTPServerResult.setSsrcInfo(ssrcInfo); + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult); + } else { + // 释放ssrc + if (presetSSRC == null) { + ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc); + } + callback.run(code, msg, null); } - return null; + })); + ssrcInfo.setPort(rtpServerPort); + return new SSRCInfo(rtpServerPort, ssrc, MediaApp.GB28181, streamId); + } + + @Override + public int openRTPServer(RTPServerParam rtpServerParam, ErrorCallback callback) { + if (callback == null) { + log.warn("[开启RTP收流] 失败,回调为NULL"); + return -1; + } + if (rtpServerParam.getMediaServer() == null) { + log.warn("[开启RTP收流] 失败,媒体节点为NULL"); + return -1; } // 设置流超时的定时任务 - String timeOutTaskKey = UUID.randomUUID().toString(); + String timeOutTaskKey = String.format("%s_%s_%s_%s", TIMEOUT_TASK_KEY_PREFIX, rtpServerParam.getMediaServer().getId(), rtpServerParam.getApp(), rtpServerParam.getStreamId()); - SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, "rtp", streamId, timeOutTaskKey); - OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult(); - openRTPServerResult.setSsrcInfo(ssrcInfo); - - Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, ssrcInfo.getApp(), streamId, rtpServerParam.getMediaServerItem().getId()); + Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, rtpServerParam.getApp(), rtpServerParam.getStreamId(), rtpServerParam.getMediaServer().getId()); dynamicTask.startDelay(timeOutTaskKey, () -> { // 收流超时 - // 释放ssrc - if (rtpServerParam.getPresetSsrc() == null) { - ssrcFactory.releaseSsrc(rtpServerParam.getMediaServerItem().getId(), ssrc); - } // 关闭收流端口 - mediaServerService.closeRTPServer(rtpServerParam.getMediaServerItem(), streamId); + mediaServerService.closeRTPServer(rtpServerParam.getMediaServer(), rtpServerParam.getApp(), rtpServerParam.getStreamId()); subscribe.removeSubscribe(rtpHook); - callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), openRTPServerResult); + callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null); }, userSetting.getPlayTimeout()); // 开启流到来的监听 subscribe.addSubscribe(rtpHook, (hookData) -> { dynamicTask.stop(timeOutTaskKey); // hook响应 - openRTPServerResult.setHookData(hookData); - callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult); + callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), hookData); subscribe.removeSubscribe(rtpHook); }); - return ssrcInfo; + int rtpServerPort; + if (rtpServerParam.getMediaServer().isRtpEnable()) { + rtpServerPort = mediaServerService.createRTPServer(rtpServerParam.getMediaServer(), rtpServerParam.getApp(), rtpServerParam.getStreamId(), + Objects.requireNonNullElse(rtpServerParam.getSsrc(), 0L), rtpServerParam.getPort(), rtpServerParam.isOnlyAuto(), + rtpServerParam.isDisableAudio(), rtpServerParam.isReUsePort(), rtpServerParam.getTcpMode()); + } else { + rtpServerPort = rtpServerParam.getMediaServer().getRtpProxyPort(); + } + if (rtpServerPort == 0) { + callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "开启RTPServer失败", null); + return -1; + } + return rtpServerPort; } @Override - public void closeRTPServer(MediaServer mediaServer, SSRCInfo ssrcInfo) { + public void closeRTPServer(MediaServer mediaServer, String app, String stream) { if (mediaServer == null) { return; } - if (ssrcInfo.getTimeOutTaskKey() != null) { - dynamicTask.stop(ssrcInfo.getTimeOutTaskKey()); + String timeOutTaskKey = String.format("%s_%s_%s_%s", TIMEOUT_TASK_KEY_PREFIX, mediaServer.getId(), app, stream); + if (dynamicTask.contains(timeOutTaskKey)) { + dynamicTask.stop(timeOutTaskKey); } - if (ssrcInfo.getSsrc() != null) { - // 释放ssrc - ssrcFactory.releaseSsrc(mediaServer.getId(), ssrcInfo.getSsrc()); - } - mediaServerService.closeRTPServer(mediaServer, ssrcInfo.getStream()); + mediaServerService.closeRTPServer(mediaServer, app, stream); } } diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index 854a43a5c..76f163236 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.streamProxy.service.impl; import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.StreamInfo; import com.genersoft.iot.vmp.common.enums.ChannelDataType; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; @@ -103,7 +104,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { @Async("taskExecutor") @EventListener public void onApplicationEvent(MediaNotFoundEvent event) { - if ("rtp".equals(event.getApp())) { + if (MediaApp.GB28181.equals(event.getApp())) { return; } // 拉流代理 diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java index efffbf909..d05111d69 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.vmanager.ps; import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; @@ -92,14 +93,14 @@ public class PsController { } } String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; - SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "", false, false, null, false, false, false, tcpMode); + SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, MediaApp.GB28181, stream, ssrcInt + "", false, false, null, false, false, false, tcpMode); if (ssrcInfo.getPort() == 0) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败"); } // 注册回调如果rtp收流超时则通过回调发送通知 if (callBack != null) { - Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId()); + Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, MediaApp.GB28181, stream, mediaServer.getId()); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 hookSubscribe.addSubscribe(hook, (hookData)->{ @@ -149,7 +150,7 @@ public class PsController { public void closeRtpServer(String stream) { log.info("[第三方PS服务对接->关闭收流] stream->{}", stream); MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); - mediaServerService.closeRTPServer(mediaServerItem, stream); + mediaServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream); String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_" + stream; List scan = RedisUtil.scan(redisTemplate, receiveKey); if (!scan.isEmpty()) { diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index b78e9a67c..6f3793564 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.vmanager.rtp; import com.genersoft.iot.vmp.common.VideoManagerConstants; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.ControllerException; @@ -92,14 +93,14 @@ public class RtpController { } } String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream; - SSRCInfo ssrcInfoForVideo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "",false,false, null, false, false, false, tcpMode); - SSRCInfo ssrcInfoForAudio = mediaServerService.openRTPServer(mediaServer, stream + "_a", ssrcInt + "", false, false, null, false,false,false, tcpMode); + SSRCInfo ssrcInfoForVideo = mediaServerService.openRTPServer(mediaServer, MediaApp.GB28181, stream, ssrcInt + "",false,false, null, false, false, false, tcpMode); + SSRCInfo ssrcInfoForAudio = mediaServerService.openRTPServer(mediaServer, MediaApp.GB28181,stream + "_a", ssrcInt + "", false, false, null, false,false,false, tcpMode); if (ssrcInfoForVideo.getPort() == 0 || ssrcInfoForAudio.getPort() == 0) { throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败"); } // 注册回调如果rtp收流超时则通过回调发送通知 if (callBack != null) { - Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId()); + Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, MediaApp.GB28181, stream, mediaServer.getId()); // 订阅 zlm启动事件, 新的zlm也会从这里进入系统 hookSubscribe.addSubscribe(hook, (hookData)->{ @@ -152,8 +153,8 @@ public class RtpController { public void closeRtpServer(String stream) { log.info("[第三方服务对接->关闭收流] stream->{}", stream); MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer(); - mediaServerService.closeRTPServer(mediaServerItem, stream); - mediaServerService.closeRTPServer(mediaServerItem, stream+ "_a"); + mediaServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream); + mediaServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream+ "_a"); String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream; List scan = RedisUtil.scan(redisTemplate, receiveKey); if (scan.size() > 0) { diff --git a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java index a86731506..237dc79cc 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java +++ b/src/main/java/com/genersoft/iot/vmp/web/gb28181/ApiStreamController.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSONObject; import com.genersoft.iot.vmp.common.InviteInfo; import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.StreamInfo; +import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException; import com.genersoft.iot.vmp.gb28181.bean.Device; @@ -246,7 +247,7 @@ public class ApiStreamController { } try { - cmder.streamByeCmd(device, code, "rtp", inviteInfo.getStream(), null, null); + cmder.streamByeCmd(device, code, MediaApp.GB28181, inviteInfo.getStream(), null, null); } catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) { JSONObject result = new JSONObject(); result.put("error","发送BYE失败:" + e.getMessage()); diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 67213e8a3..19e545b2a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -2,4 +2,4 @@ spring: application: name: wvp profiles: - active: 274-dev + active: dev