优化RTP服务器相关代码,统一使用MediaApp枚举,简化参数传递,调整配置以提高兼容性

This commit is contained in:
648540858 2026-02-20 22:33:26 +08:00
parent b1b4d065a5
commit 803c47a430
30 changed files with 391 additions and 300 deletions

View File

@ -0,0 +1,8 @@
package com.genersoft.iot.vmp.common.enums;
public class MediaApp {
public final static String GB28181 = "rtp";
public final static String JT1078 = "1078";
}

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import lombok.Data;
@ -199,7 +200,7 @@ public class SendRtpInfo {
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(isTcp);
sendRtpItem.setRtcp(rtcp);
sendRtpItem.setApp("rtp");
sendRtpItem.setApp(MediaApp.GB28181);
sendRtpItem.setLocalPort(localPort);
sendRtpItem.setServerId(serverId);
sendRtpItem.setMediaServerId(mediaServer.getId());

View File

@ -248,7 +248,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream());
sessionManager.removeByCallId(ssrcTransaction.getCallId());
}
}

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
@ -49,7 +50,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
if ("rtsp".equals(event.getSchema()) && MediaApp.GB28181.equals(event.getApp())) {
InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream());
if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
removeInviteInfo(inviteInfo);

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.service.impl;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
@ -618,7 +619,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
} else {
tcpMode = 0;
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, MediaApp.GB28181, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode);
if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
log.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel {}", platform.getServerGBId(), channel.getGbDeviceId());
SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
@ -650,7 +651,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
} finally {
timeoutCallback.run(1, "收流超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
}
}
@ -724,7 +725,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
log.warn("[Invite 200OK] 更新ssrc失败停止喊话 {}/{}", platform.getServerGBId(), channel.getGbDeviceId());
@ -826,12 +827,12 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
}
log.info("[TCP主动连接对方] serverGbId: {}, channelId: {}, 连接对方的地址:{}:{}, SSRC: {}, SSRC校验{}",
platform.getServerGBId(), channel.getGbDeviceId(), sdp.getConnection().getAddress(), port, ssrcInfo.getSsrc(), ssrcCheck);
Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getApp(), ssrcInfo.getStream());
log.info("[TCP主动连接对方] 结果: {}", result);
} catch (SdpException e) {
log.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channel.getGbDeviceId(), e);
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
@ -855,7 +856,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getGbDeviceId() );
} finally {
mediaServerService.closeRTPServer(mediaServerItem, stream);
mediaServerService.closeRTPServer(mediaServerItem, app, stream);
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, channel.getGbId(), stream);
if (inviteInfo != null) {
// 释放ssrc

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.service.impl;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
@ -231,7 +232,7 @@ public class PlayServiceImpl implements IPlayService {
}
}
}
}else if ("rtp".equals(event.getApp())) {
}else if (MediaApp.GB28181.equals(event.getApp())) {
// 释放ssrc
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, event.getStream());
if (inviteInfo != null && inviteInfo.getStatus() == InviteSessionStatus.ok
@ -249,7 +250,7 @@ public class PlayServiceImpl implements IPlayService {
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaNotFoundEvent event) {
if (!"rtp".equals(event.getApp())) {
if (!MediaApp.GB28181.equals(event.getApp())) {
return;
}
String[] s = event.getStream().split("_");
@ -324,9 +325,9 @@ public class PlayServiceImpl implements IPlayService {
return play(mediaServerItem, device, channel, ssrc, userSetting.getRecordSip(), callback);
}
private SSRCInfo play(MediaServer mediaServerItem, Device device, DeviceChannel channel, String ssrc, Boolean record,
private SSRCInfo play(MediaServer mediaServer, Device device, DeviceChannel channel, String ssrc, Boolean record,
ErrorCallback<StreamInfo> callback) {
if (mediaServerItem == null ) {
if (mediaServer == null ) {
if (callback != null) {
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
@ -340,7 +341,7 @@ public class PlayServiceImpl implements IPlayService {
if (inviteInfoInCatch.getStreamInfo() == null) {
// 释放生成的ssrc使用上一次申请的
ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc);
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
inviteStreamService.once(InviteSessionType.PLAY, channel.getId(), null, callback);
log.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId({}): {}", device.getDeviceId(), channel.getDeviceId(), channel.getId());
@ -357,7 +358,7 @@ public class PlayServiceImpl implements IPlayService {
return inviteInfoInCatch.getSsrcInfo();
}
MediaServer mediaInfo = streamInfo.getMediaServer();
Boolean ready = mediaServerService.isStreamReady(mediaInfo, "rtp", streamId);
Boolean ready = mediaServerService.isStreamReady(mediaInfo, MediaApp.GB28181, streamId);
if (ready != null && ready) {
if(callback != null) {
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
@ -377,20 +378,30 @@ public class PlayServiceImpl implements IPlayService {
}
}
// 获取mediaServer可用的ssrc
final String finalSsrc;
if (ssrc != null) {
finalSsrc = ssrc;
}else {
finalSsrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
}
String streamId = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId());
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServerItem(mediaServerItem);
rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaApp.GB28181);
rtpServerParam.setStreamId(streamId);
rtpServerParam.setPresetSsrc(ssrc);
rtpServerParam.setSsrcCheck(device.isSsrcCheck());
rtpServerParam.setPlayback(false);
if (device.isSsrcCheck()) {
rtpServerParam.setSsrc(ssrc);
}
rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(tcpMode);
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> {
int port = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应
@ -422,14 +433,14 @@ public class PlayServiceImpl implements IPlayService {
}
inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, code, msg, null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", streamId);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, streamId);
if (ssrcTransaction != null) {
try {
cmder.streamByeCmd(device, channel.getDeviceId(),"rtp", streamId, null, null);
cmder.streamByeCmd(device, channel.getDeviceId(),MediaApp.GB28181, streamId, null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
sessionManager.removeByStream("rtp", streamId);
sessionManager.removeByStream(MediaApp.GB28181, streamId);
}
}
}
@ -448,8 +459,8 @@ public class PlayServiceImpl implements IPlayService {
ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getId(),
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServer.getId(),
mediaServer.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteSessionStatus.ready, userSetting.getRecordSip());
if (record != null) {
inviteInfo.setRecord(record);
@ -460,12 +471,12 @@ public class PlayServiceImpl implements IPlayService {
inviteStreamService.updateInviteInfo(inviteInfo);
try {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (eventResult) -> {
cmder.playStreamCmd(mediaServer, ssrcInfo, device, channel, (eventResult) -> {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel, callback, inviteInfo, InviteSessionType.PLAY);
InviteOKHandler(eventResult, ssrcInfo, mediaServer, device, channel, callback, inviteInfo, InviteSessionType.PLAY);
}, (event) -> {
log.info("[点播失败]{}:{} deviceId: {}, channelId:{}",event.statusCode, event.msg, device.getDeviceId(), channel.getDeviceId());
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
if (callback != null) {
@ -478,7 +489,7 @@ public class PlayServiceImpl implements IPlayService {
}, userSetting.getPlayTimeout().longValue());
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 点播消息: {}", e.getMessage());
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo);
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
if (callback != null) {
callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
@ -595,7 +606,7 @@ public class PlayServiceImpl implements IPlayService {
}, (event) -> {
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
@ -605,7 +616,7 @@ public class PlayServiceImpl implements IPlayService {
log.error("[命令发送失败] 对讲消息: {}", e.getMessage());
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getStream());
mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
@ -647,7 +658,7 @@ public class PlayServiceImpl implements IPlayService {
}
}
log.info("[TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channel.getDeviceId(), sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getApp(), ssrcInfo.getStream());
log.info("[TCP主动连接对方] 结果: {}" , result);
if (!result) {
// 主动连接失败结束流程 清理数据
@ -686,7 +697,7 @@ public class PlayServiceImpl implements IPlayService {
String fileName = deviceId + "_" + channelId + ".jpg";
// 请求截图
log.info("[请求截图]: " + fileName);
mediaServerService.getSnap(mediaServerItemInuse, "rtp", stream, 15, 1, path, fileName);
mediaServerService.getSnap(mediaServerItemInuse, MediaApp.GB28181, stream, 15, 1, path, fileName);
}
public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, Device device, DeviceChannel channel) {
@ -779,7 +790,8 @@ public class PlayServiceImpl implements IPlayService {
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServerItem(mediaServerItem);
rtpServerParam.setMediaServer(mediaServerItem);
rtpServerParam.setApp(MediaApp.GB28181);
rtpServerParam.setStreamId(stream);
rtpServerParam.setSsrcCheck(device.isSsrcCheck());
rtpServerParam.setPlayback(true);
@ -805,14 +817,14 @@ public class PlayServiceImpl implements IPlayService {
}
inviteStreamService.call(InviteSessionType.PLAYBACK, channel.getId(), null, code, msg, null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, channel.getId());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", stream);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, stream);
if (ssrcTransaction != null) {
try {
cmder.streamByeCmd(device, channel.getDeviceId(),"rtp", stream, null, null);
cmder.streamByeCmd(device, channel.getDeviceId(),MediaApp.GB28181, stream, null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[录像回放] 发送BYE失败 {}", e.getMessage());
} finally {
sessionManager.removeByStream("rtp", stream);
sessionManager.removeByStream(MediaApp.GB28181, stream);
}
}
}
@ -902,7 +914,7 @@ public class PlayServiceImpl implements IPlayService {
log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
log.warn("[Invite 200OK] 更新ssrc失败停止点播 {}/{}", device.getDeviceId(), channel.getDeviceId());
@ -940,14 +952,14 @@ public class PlayServiceImpl implements IPlayService {
if (ssrcInResponse != null) {
// 单端口
// 重新订阅流上线
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", inviteInfo.getStream());
sessionManager.removeByStream("rtp", inviteInfo.getStream());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, inviteInfo.getStream());
sessionManager.removeByStream(MediaApp.GB28181, inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
ssrcTransaction.setDeviceId(device.getDeviceId());
ssrcTransaction.setChannelId(ssrcTransaction.getChannelId());
ssrcTransaction.setCallId(ssrcTransaction.getCallId());
ssrcTransaction.setSsrc(ssrcInResponse);
ssrcTransaction.setApp("rtp");
ssrcTransaction.setApp(MediaApp.GB28181);
ssrcTransaction.setStream(inviteInfo.getStream());
ssrcTransaction.setMediaServerId(mediaServerItem.getId());
ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo((SIPResponse) responseEvent.getResponse()));
@ -990,7 +1002,7 @@ public class PlayServiceImpl implements IPlayService {
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
// 录像下载不使用固定流地址固定流地址会导致如果开始时间与结束时间一致时文件错误的叠加在一起
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServerItem(mediaServer);
rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setSsrcCheck(device.isSsrcCheck());
rtpServerParam.setPlayback(true);
rtpServerParam.setPort(0);
@ -1081,7 +1093,7 @@ public class PlayServiceImpl implements IPlayService {
inviteStreamService.updateInviteInfo(inviteInfoForNew, 60*15L);
}
};
Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServer.getId());
Hook hook = Hook.getInstance(HookType.on_record_mp4, MediaApp.GB28181, ssrcInfo.getStream(), mediaServer.getId());
// 设置过期时间下载失败时自动处理订阅数据
hook.setExpireTime(System.currentTimeMillis() + 24 * 60 * 60 * 1000);
subscribe.addSubscribe(hook, hookEventForRecord);
@ -1101,7 +1113,7 @@ public class PlayServiceImpl implements IPlayService {
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, channel.getId(), stream);
if (inviteInfo == null) {
String app = "rtp";
String app = MediaApp.GB28181;
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
if (streamAuthorityInfo != null) {
List<CloudRecordItem> allList = cloudRecordService.getAllList(null, app, stream, null, null, null, streamAuthorityInfo.getCallId(), null);
@ -1143,7 +1155,7 @@ public class PlayServiceImpl implements IPlayService {
log.warn("[获取下载进度] 查询录像信息时发现节点不存在");
return null;
}
String app = "rtp";
String app = MediaApp.GB28181;
Long duration = mediaServerService.updateDownloadProcess(mediaServerItem, app, stream);
if (duration == null || duration == 0) {
inviteInfo.getStreamInfo().setProgress(0);
@ -1186,7 +1198,7 @@ public class PlayServiceImpl implements IPlayService {
public StreamInfo onPublishHandler(MediaServer mediaServerItem, MediaInfo mediaInfo, Device device, DeviceChannel channel) {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", mediaInfo.getStream(), mediaInfo, null);
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, MediaApp.GB28181, mediaInfo.getStream(), mediaInfo, null);
streamInfo.setDeviceId(device.getDeviceId());
streamInfo.setChannelId(channel.getId());
return streamInfo;
@ -1557,7 +1569,7 @@ public class PlayServiceImpl implements IPlayService {
SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId());
if (sendRtpInfo != null) {
MediaServer mediaServer = mediaServerService.getOne(sendRtpInfo.getMediaServerId());
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpInfo.getReceiveStream());
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, MediaApp.GB28181, sendRtpInfo.getReceiveStream());
if (streamReady) {
log.warn("[语音对讲] 进行中: {}", channel.getDeviceId());
event.call("语音对讲进行中");
@ -1634,7 +1646,7 @@ public class PlayServiceImpl implements IPlayService {
String path = "snap";
// 请求截图
log.info("[请求截图]: " + fileName);
mediaServerService.getSnap(mediaServer, "rtp", inviteInfo.getStreamInfo().getStream(), 15, 1, path, fileName);
mediaServerService.getSnap(mediaServer, MediaApp.GB28181, inviteInfo.getStreamInfo().getStream(), 15, 1, path, fileName);
File snapFile = new File(path + File.separator + fileName);
if (snapFile.exists()) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile());
@ -1676,7 +1688,7 @@ public class PlayServiceImpl implements IPlayService {
if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
try {
log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId());
cmder.streamByeCmd(device, channel.getDeviceId(), "rtp", inviteInfo.getStream(), null, null);
cmder.streamByeCmd(device, channel.getDeviceId(), MediaApp.GB28181, inviteInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.error("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
@ -1709,7 +1721,7 @@ public class PlayServiceImpl implements IPlayService {
if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
try {
log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId());
cmder.streamByeCmd(device, channel.getDeviceId(), "rtp", inviteInfo.getStream(), null, null);
cmder.streamByeCmd(device, channel.getDeviceId(), MediaApp.GB28181, inviteInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.warn("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage());
}

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@ -505,7 +506,7 @@ public class SIPCommander implements ISIPCommander {
}
log.info("[语音喊话] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort());
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId());
Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.GB28181, stream, mediaServerItem.getId());
subscribe.addSubscribe(hook, (hookData) -> {
if (event != null) {
event.response(hookData);
@ -515,7 +516,7 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
callIdHeader.setCallId(callId);
Hook publishHook = Hook.getInstance(HookType.on_publish, "rtp", stream, mediaServerItem.getId());
Hook publishHook = Hook.getInstance(HookType.on_publish, MediaApp.GB28181, stream, mediaServerItem.getId());
subscribe.addSubscribe(publishHook, (hookData) -> {
if (eventForPush != null) {
eventForPush.response(hookData);
@ -1388,7 +1389,7 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playbackControlCmd(Device device, DeviceChannel channel, String stream, String content, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws SipException, InvalidArgumentException, ParseException {
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", stream);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, stream);
if (ssrcTransaction == null) {
log.info("[回放控制]未找到视频流信息,设备:{}, 流ID: {}", device.getDeviceId(), stream);
return;

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@ -641,7 +642,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
}
SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem, channel);
if (byeRequest == null) {
@ -664,7 +665,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
}
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream());
sessionManager.removeByStream(ssrcTransaction.getApp(), ssrcTransaction.getStream());
Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channel.getGbDeviceId(), ssrcTransaction.getSipTransactionInfo());
@ -705,7 +706,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
}
log.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId());
Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.GB28181, stream, mediaServerItem.getId());
subscribe.addSubscribe(hook, (hookData) -> {
if (event != null) {
event.response(hookData);

View File

@ -228,7 +228,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
deviceChannelService.stopPlay(channel.getId());
inviteStreamService.removeInviteInfo(inviteInfo);
if (inviteInfo.getStreamInfo() != null) {
mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStreamInfo().getStream());
mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream());
}
}
break;

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.*;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
@ -98,7 +99,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
playService.stop(inviteInfo);
}
// 去除监听流注销自动停止下载的监听
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId());
Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.GB28181, ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId());
subscribe.removeSubscribe(hook);
if (ssrcTransaction.getPlatformId() != null) {
// 如果级联播放需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题需要将点播CallId进行上下级绑定

View File

@ -3,14 +3,13 @@ package com.genersoft.iot.vmp.jt1078.service.impl;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.jt1078.bean.*;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent;
import com.genersoft.iot.vmp.jt1078.proc.request.J1205;
import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService;
@ -26,11 +25,12 @@ import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.RTPServerParam;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.MediaServerUtils;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -74,6 +74,9 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private DynamicTask dynamicTask;
@ -197,7 +200,6 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
private void play(JTDevice device, JTChannel channel, int type, CommonCallback<WVPResult<StreamInfo>> callback) {
String phoneNumber = device.getPhoneNumber();
int channelId = channel.getChannelId();
String app = "1078";
String stream = phoneNumber + "_" + channelId;
// 检查流是否已经存在存在则返回
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
@ -208,7 +210,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
MediaServer mediaServer = streamInfo.getMediaServer();
if (mediaServer != null) {
// 查询流是否存在不存在则删除缓存数据
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, app, streamInfo.getStream());
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, MediaApp.JT1078, streamInfo.getStream());
if (mediaInfo != null) {
log.info("[JT-点播] 点播已经存在,直接返回, phoneNumber {} channelId {}", phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
@ -233,48 +235,56 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
}
return;
}
// 设置hook监听
Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId());
subscribe.addSubscribe(hook, (hookData) -> {
dynamicTask.stop(playKey);
log.info("[JT-点播] 点播成功, 手机号: {} 通道: {}", phoneNumber, channelId);
// TODO 发送9105 实时音视频传输状态通知 通知丢包率
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
if (errorCallback == null) {
continue;
}
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info));
}
subscribe.removeSubscribe(hook);
redisTemplate.opsForValue().set(playKey, info);
// 截图
String path = "snap";
String fileName = phoneNumber + "_" + channelId + ".jpg";
// 请求截图
log.info("[请求截图]: " + fileName);
mediaServerService.getSnap(mediaServer, app, stream, 15, 1, path, fileName);
});
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openJTTServer(mediaServer, stream, null, false, !channel.isHasAudio(), 1);
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaApp.JT1078);
rtpServerParam.setStreamId(stream);
rtpServerParam.setSsrcCheck(false);
rtpServerParam.setPlayback(false);
rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(1); // 1 表示tcp被动
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应
log.info("[JT-点播] 点播成功, 手机号: {} 通道: {}", phoneNumber, channelId);
// TODO 发送9105 实时音视频传输状态通知 通知丢包率
StreamInfo info = onPublishHandler(mediaServer, result.getHookData(), phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
if (errorCallback == null) {
continue;
}
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info));
}
redisTemplate.opsForValue().set(playKey, info);
// 截图
String path = "snap";
String fileName = phoneNumber + "_" + channelId + ".jpg";
// 请求截图
log.info("[请求截图]: " + fileName);
mediaServerService.getSnap(mediaServer, MediaApp.JT1078, stream, 15, 1, path, fileName);
}else {
if (callback != null) {
callback.run(WVPResult.fail(code, msg));
}
log.info("[JT-点播] 超时, phoneNumber {} channelId {}", phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null));
}
stopPlay(phoneNumber, channelId);
}
});
if (ssrcInfo == null) {
stopPlay(phoneNumber, channelId);
return;
}
// 设置超时监听
dynamicTask.startDelay(playKey, () -> {
log.info("[JT-点播] 超时, phoneNumber {} channelId {}", phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null));
}
mediaServerService.closeJTTServer(mediaServer, stream, null);
subscribe.removeSubscribe(hook);
stopPlay(phoneNumber, channelId);
}, userSetting.getPlayTimeout());
log.info("[JT-点播] phoneNumber {} channelId {}IP: {}, 端口: {}", phoneNumber, channelId, mediaServer.getSdpIp(), ssrcInfo.getPort());
J9101 j9101 = new J9101();
j9101.setChannel(channelId);
@ -287,7 +297,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
}
public StreamInfo onPublishHandler(MediaServer mediaServerItem, HookData hookData, String phoneNumber, Integer channelId) {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "1078", hookData.getStream(), hookData.getMediaInfo(), null);
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, MediaApp.JT1078, hookData.getStream(), hookData.getMediaInfo(), null);
streamInfo.setDeviceId(phoneNumber);
streamInfo.setChannelId(channelId);
return streamInfo;
@ -317,7 +327,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
// 删除缓存数据
if (streamInfo != null) {
// 关闭rtpServer
mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream(), null);
receiveRtpServerService.closeRTPServer(streamInfo.getMediaServer(), new SSRCInfo(streamInfo.getApp(), streamInfo.getStream()));
redisTemplate.delete(playKey);
}
@ -425,12 +435,12 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playbackKey);
if (streamInfo != null) {
mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream(), null);
mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream());
// 清理数据
redisTemplate.delete(playbackKey);
}
String app = "1078";
String app = MediaApp.JT1078;
String stream = String.format("%s_%s_%s_%s", phoneNumber, channelId,
DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(startTime), DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(endTime));
MediaServer mediaServer;
@ -615,7 +625,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
sendRtpInfo.setReceiveStream(stream + "_talk");
// 设置hook监听
Hook hook = Hook.getInstance(HookType.on_media_arrival, "1078", sendRtpInfo.getReceiveStream(), mediaServer.getId());
Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.JT1078, sendRtpInfo.getReceiveStream(), mediaServer.getId());
subscribe.addSubscribe(hook, (hookData) -> {
log.info("[JT-对讲] 对讲连接建立, phoneNumber {} channelId {}", phoneNumber, channelId);
subscribe.removeSubscribe(hook);

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.media.abl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
@ -194,14 +195,14 @@ public class ABLHttpHookListener {
logger.info("[ABL HOOK] 码流不到达通知:{}->{}/{}", param.getMediaServerId(), param.getApp(), param.getStream());
try {
if ("rtp".equals(param.getApp())) {
if (MediaApp.GB28181.equals(param.getApp())) {
return HookResult.SUCCESS();
}
MediaRtpServerTimeoutEvent event = new MediaRtpServerTimeoutEvent(this);
MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
if (mediaServerItem != null) {
event.setMediaServer(mediaServerItem);
event.setApp("rtp");
event.setApp(MediaApp.GB28181);
applicationEventPublisher.publishEvent(event);
}
}catch (Exception e) {

View File

@ -5,6 +5,7 @@ import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
@ -68,40 +69,40 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public int createRTPServer(MediaServer mediaServer, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
public int createRTPServer(MediaServer mediaServer, String app, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
Boolean recordSip = userSetting.getRecordSip();
return ablresTfulUtils.openRtpServer(mediaServer, "rtp", stream, 96, port, tcpMode, disableAudio?1:0, recordSip, false);
return ablresTfulUtils.openRtpServer(mediaServer, app, stream, 96, port, tcpMode, disableAudio?1:0, recordSip, false);
}
@Override
public void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
public void closeRtpServer(MediaServer mediaServer, String app, String stream, CommonCallback<Boolean> callback) {
if (mediaServer == null) {
return;
}
ABLResult result = ablresTfulUtils.closeStreams(mediaServer, "rtp", streamId);
ABLResult result = ablresTfulUtils.closeStreams(mediaServer, app, stream);
logger.info("关闭RTP Server " + result);
if (result.getCode() != 0) {
logger.error("[closeRtpServer] 失败: {}", result.getMemo());
}
}
@Override
public int createJTTServer(MediaServer mediaServer, String stream, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) {
Boolean recordSip = userSetting.getRecordSip();
return ablresTfulUtils.openRtpServer(mediaServer, "1078", stream, 96, port, tcpMode, disableAudio?1:0, recordSip, true);
}
@Override
public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
if (mediaServer == null) {
return;
}
ABLResult result = ablresTfulUtils.closeStreams(mediaServer, "1078", streamId);
logger.info("关闭JT-RTP Server " + result);
if (result.getCode() != 0) {
logger.error("[JT-closeRtpServer] 失败: {}", result.getMemo());
}
}
// @Override
// public int createJTTServer(MediaServer mediaServer, String stream, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) {
// Boolean recordSip = userSetting.getRecordSip();
// return ablresTfulUtils.openRtpServer(mediaServer, MediaApp.JT1078, stream, 96, port, tcpMode, disableAudio?1:0, recordSip, true);
// }
//
// @Override
// public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
// if (mediaServer == null) {
// return;
// }
// ABLResult result = ablresTfulUtils.closeStreams(mediaServer, MediaApp.JT1078, streamId);
// logger.info("关闭JT-RTP Server " + result);
// if (result.getCode() != 0) {
// logger.error("[JT-closeRtpServer] 失败: {}", result.getMemo());
// }
// }
@Override
public void closeStreams(MediaServer mediaServer, String app, String streamId) {
@ -112,7 +113,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc) {
public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String app, String streamId, String ssrc) {
return null;
}
@ -256,7 +257,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String stream) {
public Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String app, String stream) {
logger.warn("[abl-connectRtpServer] 未实现");
return null;
}
@ -443,7 +444,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
@Override
public List<String> listRtpServer(MediaServer mediaServer) {
ABLResult ablResult = ablresTfulUtils.getMediaList(mediaServer, "rtp", null);
ABLResult ablResult = ablresTfulUtils.getMediaList(mediaServer, MediaApp.GB28181, null);
if (ablResult.getCode() != 0) {
return null;
}

View File

@ -15,19 +15,19 @@ import java.util.List;
import java.util.Map;
public interface IMediaNodeServerService {
int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode);
int createRTPServer(MediaServer mediaServer, String app, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode);
void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback);
void closeRtpServer(MediaServer mediaServer, String app, String stream, CommonCallback<Boolean> callback);
int createJTTServer(MediaServer mediaServer, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode);
void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback);
// int createJTTServer(MediaServer mediaServer, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode);
//
// void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback);
void closeStreams(MediaServer mediaServer, String app, String stream);
Boolean updateRtpServerSSRC(MediaServer mediaServer, String stream, String ssrc);
Boolean updateRtpServerSSRC(MediaServer mediaServer, String app, String stream, String ssrc);
boolean checkNodeId(MediaServer mediaServer);
@ -43,7 +43,7 @@ public interface IMediaNodeServerService {
List<StreamInfo> getMediaList(MediaServer mediaServer, String app, String stream, String callId);
Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream);
Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String app, String stream);
void getSnap(MediaServer mediaServer, String app, String stream, int timeoutSec, int expireSec, String path, String fileName);

View File

@ -34,20 +34,20 @@ public interface IMediaServerService {
void updateVmServer(List<MediaServer> mediaServerItemList);
SSRCInfo openRTPServer(MediaServer mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck,
SSRCInfo openRTPServer(MediaServer mediaServerItem, String app, String streamId, String presetSsrc, boolean ssrcCheck,
boolean isPlayback, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode);
void closeRTPServer(MediaServer mediaServerItem, String streamId);
void closeRTPServer(MediaServer mediaServerItem, String app, String streamId);
void closeRTPServer(MediaServer mediaServerItem, String streamId, CommonCallback<Boolean> callback);
void closeRTPServer(MediaServer mediaServerItem, String app, String streamId, CommonCallback<Boolean> callback);
SSRCInfo openJTTServer(MediaServer mediaServerItem, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode);
// SSRCInfo openJTTServer(MediaServer mediaServerItem, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode);
//
// void closeJTTServer(MediaServer mediaServerItem, String streamId, CommonCallback<Boolean> callback);
void closeJTTServer(MediaServer mediaServerItem, String streamId, CommonCallback<Boolean> callback);
Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String app, String streamId, String ssrc);
Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc);
void closeRTPServer(String mediaServerId, String streamId);
void closeRTPServer(String mediaServerId, String app, String streamId);
void clearRTPServer(MediaServer mediaServerItem);
@ -89,7 +89,7 @@ public interface IMediaServerService {
List<StreamInfo> getMediaList(MediaServer mediaInfo, String app, String stream, String callId);
Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String stream);
Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String app, String stream);
void getSnap(MediaServer mediaServer, String app, String stream, int timeoutSec, int expireSec, String path, String fileName);
@ -162,7 +162,7 @@ public interface IMediaServerService {
StreamInfo getMediaByAppAndStream(String app, String stream);
int createRTPServer(MediaServer mediaServerItem, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode);
int createRTPServer(MediaServer mediaServerItem, String app, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode);
List<String> listRtpServer(MediaServer mediaServer);

View File

@ -164,7 +164,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
@Override
public SSRCInfo openRTPServer(MediaServer mediaServer, String streamId, String presetSsrc, boolean ssrcCheck,
public SSRCInfo openRTPServer(MediaServer mediaServer, String app, String streamId, String presetSsrc, boolean ssrcCheck,
boolean isPlayback, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
if (mediaServer == null || mediaServer.getId() == null) {
log.info("[openRTPServer] 失败, mediaServer == null || mediaServer.getId() == null");
@ -196,15 +196,15 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[openRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return null;
}
rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, disableAudio, reUsePort, tcpMode);
rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, app, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, disableAudio, reUsePort, tcpMode);
} else {
rtpServerPort = mediaServer.getRtpProxyPort();
}
return new SSRCInfo(rtpServerPort, ssrc, "rtp", streamId, null);
return new SSRCInfo(rtpServerPort, ssrc, app, streamId);
}
@Override
public int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) {
public int createRTPServer(MediaServer mediaServer, String app, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) {
int rtpServerPort;
if (mediaServer.isRtpEnable()) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
@ -212,33 +212,33 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[openRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return 0;
}
rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode);
rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, app, streamId, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode);
} else {
rtpServerPort = mediaServer.getRtpProxyPort();
}
return rtpServerPort;
}
@Override
public SSRCInfo openJTTServer(MediaServer mediaServer, @NotNull String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) {
if (mediaServer == null || mediaServer.getId() == null) {
log.info("[openJTTServer] 失败, mediaServer == null || mediaServer.getId() == null");
return null;
}
int rtpServerPort;
if (mediaServer.isRtpEnable()) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[openJTTServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return null;
}
rtpServerPort = mediaNodeServerService.createJTTServer(mediaServer, streamId, port, disableVideo, disableAudio, tcpMode);
} else {
rtpServerPort = mediaServer.getJttProxyPort();
}
return new SSRCInfo(rtpServerPort, null, "1078", streamId, null);
}
// @Override
// public SSRCInfo openJTTServer(MediaServer mediaServer, @NotNull String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) {
// if (mediaServer == null || mediaServer.getId() == null) {
// log.info("[openJTTServer] 失败, mediaServer == null || mediaServer.getId() == null");
// return null;
// }
//
// int rtpServerPort;
// if (mediaServer.isRtpEnable()) {
// IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
// if (mediaNodeServerService == null) {
// log.info("[openJTTServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
// return null;
// }
// rtpServerPort = mediaNodeServerService.createJTTServer(mediaServer, streamId, port, disableVideo, disableAudio, tcpMode);
// } else {
// rtpServerPort = mediaServer.getJttProxyPort();
// }
// return new SSRCInfo(rtpServerPort, null, "1078", streamId, null);
// }
@Override
public List<String> listRtpServer(MediaServer mediaServer) {
@ -251,7 +251,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public void closeRTPServer(MediaServer mediaServer, String streamId) {
public void closeRTPServer(MediaServer mediaServer, String app, String streamId) {
if (mediaServer == null) {
return;
}
@ -260,11 +260,11 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[closeRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return;
}
mediaNodeServerService.closeRtpServer(mediaServer, streamId, null);
mediaNodeServerService.closeRtpServer(mediaServer, app, streamId, null);
}
@Override
public void closeRTPServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
public void closeRTPServer(MediaServer mediaServer, String app, String streamId, CommonCallback<Boolean> callback) {
if (mediaServer == null) {
callback.run(false);
return;
@ -274,42 +274,42 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[closeRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return;
}
mediaNodeServerService.closeRtpServer(mediaServer, streamId, callback);
mediaNodeServerService.closeRtpServer(mediaServer, app, streamId, callback);
}
@Override
public void closeRTPServer(String mediaServerId, String streamId) {
public void closeRTPServer(String mediaServerId, String app, String streamId) {
MediaServer mediaServer = this.getOne(mediaServerId);
if (mediaServer == null) {
return;
}
if (mediaServer.isRtpEnable()) {
closeRTPServer(mediaServer, streamId);
closeRTPServer(mediaServer, app, streamId);
}
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[closeRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return;
}
mediaNodeServerService.closeStreams(mediaServer, "rtp", streamId);
mediaNodeServerService.closeStreams(mediaServer, app, streamId);
}
@Override
public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
if (mediaServer == null) {
callback.run(false);
return;
}
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[closeJTTServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return;
}
mediaNodeServerService.closeJTTServer(mediaServer, streamId, callback);
}
// @Override
// public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
// if (mediaServer == null) {
// callback.run(false);
// return;
// }
// IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
// if (mediaNodeServerService == null) {
// log.info("[closeJTTServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
// return;
// }
// mediaNodeServerService.closeJTTServer(mediaServer, streamId, callback);
// }
@Override
public Boolean updateRtpServerSSRC(MediaServer mediaServer, String streamId, String ssrc) {
public Boolean updateRtpServerSSRC(MediaServer mediaServer, String app, String streamId, String ssrc) {
if (mediaServer == null) {
return false;
}
@ -318,7 +318,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[updateRtpServerSSRC] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return false;
}
return mediaNodeServerService.updateRtpServerSSRC(mediaServer, streamId, ssrc);
return mediaNodeServerService.updateRtpServerSSRC(mediaServer, app, streamId, ssrc);
}
@Override
@ -716,13 +716,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream) {
public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String app, String stream) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[connectRtpServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return false;
}
return mediaNodeServerService.connectRtpServer(mediaServer, address, port, stream);
return mediaNodeServerService.connectRtpServer(mediaServer, address, port, app, stream);
}
@Override

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@ -262,7 +263,7 @@ public class ZLMHttpHookListener {
log.info("[ZLM HOOK] rtp发送关闭{}->{}/{}", param.getMediaServerId(), param.getApp(), param.getStream());
// 查找对应的上级推流发送停止
if (!"rtp".equals(param.getApp())) {
if (!MediaApp.GB28181.equals(param.getApp())) {
return HookResult.SUCCESS();
}
try {
@ -293,7 +294,7 @@ public class ZLMHttpHookListener {
MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
if (mediaServerItem != null) {
event.setMediaServer(mediaServerItem);
event.setApp("rtp");
event.setApp(MediaApp.GB28181);
applicationEventPublisher.publishEvent(event);
}
}catch (Exception e) {

View File

@ -47,24 +47,24 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
private HookSubscribe subscribe;
@Override
public int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
return zlmServerFactory.createRTPServer(mediaServer, "rtp", streamId, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode);
public int createRTPServer(MediaServer mediaServer, String app, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
return zlmServerFactory.createRTPServer(mediaServer, app, stream, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode);
}
@Override
public void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
zlmServerFactory.closeRtpServer(mediaServer, streamId, callback);
public void closeRtpServer(MediaServer mediaServer, String app, String stream, CommonCallback<Boolean> callback) {
zlmServerFactory.closeRtpServer(mediaServer, app, stream, callback);
}
@Override
public int createJTTServer(MediaServer mediaServer, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) {
return zlmServerFactory.createRTPServer(mediaServer, "1078", streamId, 0, port, disableVideo, disableAudio, false, tcpMode);
}
// @Override
// public int createJTTServer(MediaServer mediaServer, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) {
// return zlmServerFactory.createRTPServer(mediaServer, "1078", streamId, 0, port, disableVideo, disableAudio, false, tcpMode);
// }
@Override
public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
zlmServerFactory.closeRtpServer(mediaServer, streamId, callback);
}
// @Override
// public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
// zlmServerFactory.closeRtpServer(mediaServer, streamId, callback);
// }
@Override
public void closeStreams(MediaServer mediaServer, String app, String stream) {
@ -72,8 +72,8 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public Boolean updateRtpServerSSRC(MediaServer mediaServer, String streamId, String ssrc) {
return zlmServerFactory.updateRtpServerSSRC(mediaServer, streamId, ssrc);
public Boolean updateRtpServerSSRC(MediaServer mediaServer, String app, String streamId, String ssrc) {
return zlmServerFactory.updateRtpServerSSRC(mediaServer, app, streamId, ssrc);
}
@Override
@ -208,8 +208,8 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream) {
ZLMResult<?> zlmResult = zlmresTfulUtils.connectRtpServer(mediaServer, address, port, stream);
public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String app, String stream) {
ZLMResult<?> zlmResult = zlmresTfulUtils.connectRtpServer(mediaServer, address, port, app, stream);
log.info("[TCP主动连接对方] 结果: {}", zlmResult);
return zlmResult.getCode() == 0;
}

View File

@ -694,10 +694,11 @@ public class ZLMRESTfulUtils {
}
}
public ZLMResult<?> connectRtpServer(MediaServer mediaServer, String dst_url, int dst_port, String stream_id) {
public ZLMResult<?> connectRtpServer(MediaServer mediaServer, String dst_url, int dst_port, String app, String stream_id) {
Map<String, Object> param = new HashMap<>(1);
param.put("dst_url", dst_url);
param.put("dst_port", dst_port);
param.put("app", app);
param.put("stream_id", stream_id);
String response = sendPost(mediaServer, "connectRtpServer", param, null);
if (response == null) {
@ -712,9 +713,10 @@ public class ZLMRESTfulUtils {
}
}
public ZLMResult<?> updateRtpServerSSRC(MediaServer mediaServer, String streamId, String ssrc) {
public ZLMResult<?> updateRtpServerSSRC(MediaServer mediaServer, String app, String streamId, String ssrc) {
Map<String, Object> param = new HashMap<>(1);
param.put("ssrc", ssrc);
param.put("app", app);
param.put("stream_id", streamId);
String response = sendPost(mediaServer, "updateRtpServerSSRC", param, null);

View File

@ -98,10 +98,11 @@ public class ZLMServerFactory {
return result;
}
public boolean closeRtpServer(MediaServer serverItem, String streamId) {
public boolean closeRtpServer(MediaServer serverItem, String app, String streamId) {
boolean result = false;
if (serverItem !=null){
Map<String, Object> param = new HashMap<>();
param.put("app", app);
param.put("stream_id", streamId);
ZLMResult<?> zlmResult = zlmresTfulUtils.closeRtpServer(serverItem, param);
if (zlmResult != null ) {
@ -118,7 +119,7 @@ public class ZLMServerFactory {
return result;
}
public void closeRtpServer(MediaServer serverItem, String streamId, CommonCallback<Boolean> callback) {
public void closeRtpServer(MediaServer serverItem, String app, String streamId, CommonCallback<Boolean> callback) {
if (serverItem == null) {
if (callback != null) {
callback.run(false);
@ -126,6 +127,7 @@ public class ZLMServerFactory {
return;
}
Map<String, Object> param = new HashMap<>();
param.put("app", app);
param.put("stream_id", streamId);
zlmresTfulUtils.closeRtpServer(serverItem, param, zlmResult -> {
if (zlmResult.getCode() == 0) {
@ -223,9 +225,9 @@ public class ZLMServerFactory {
return zlmResult;
}
public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc) {
public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String app, String streamId, String ssrc) {
boolean result = false;
ZLMResult<?> zlmResult = zlmresTfulUtils.updateRtpServerSSRC(mediaServerItem, streamId, ssrc);
ZLMResult<?> zlmResult = zlmresTfulUtils.updateRtpServerSSRC(mediaServerItem, app, streamId, ssrc);
if (zlmResult.getCode() == 0) {
result= true;
log.info("[更新RTPServer] 成功");

View File

@ -1,13 +1,16 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.OpenRTPServerResult;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.RTPServerParam;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
public interface IReceiveRtpServerService {
SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<OpenRTPServerResult> callback);
void closeRTPServer(MediaServer mediaServer, SSRCInfo ssrcInfo);
int openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<HookData> callback);
void closeRTPServer(MediaServer mediaServer, String app, String stream);
}

View File

@ -2,15 +2,25 @@ package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Data
@Getter
@Setter
@NoArgsConstructor
public class RTPServerParam {
private MediaServer mediaServerItem;
/**
* 使用的流媒体
*/
private MediaServer mediaServer;
private String app;
private String streamId;
private String presetSsrc;
private boolean ssrcCheck;
private boolean playback;
/**
* 开启rtpServer时使用的ssrc开启rtpServer时会根据这个ssrc进行校验如果不填则不校验
*/
private Long ssrc;
private Integer port;
private boolean onlyAuto;
private boolean disableAudio;
@ -21,5 +31,16 @@ public class RTPServerParam {
*/
private Integer tcpMode;
public RTPServerParam(MediaServer mediaServer, String app, String streamId, Long ssrc, Integer port,
boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) {
this.mediaServer = mediaServer;
this.app = app;
this.streamId = streamId;
this.ssrc = ssrc;
this.port = port;
this.onlyAuto = onlyAuto;
this.disableAudio = disableAudio;
this.reUsePort = reUsePort;
this.tcpMode = tcpMode;
}
}

View File

@ -9,13 +9,12 @@ public class SSRCInfo {
private String ssrc;
private String app;
private String Stream;
private String timeOutTaskKey;
public SSRCInfo(int port, String ssrc, String app, String stream, String timeOutTaskKey) {
public SSRCInfo(int port, String ssrc, String app, String stream) {
this.port = port;
this.ssrc = ssrc;
this.app = app;
this.Stream = stream;
this.timeOutTaskKey = timeOutTaskKey;
}
}

View File

@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
@ -82,7 +83,7 @@ public class MediaServiceImpl implements IMediaService {
if (app == null || stream == null) {
return false;
}
if ("rtp".equals(app)) {
if (MediaApp.GB28181.equals(app)) {
return true;
}
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
@ -95,7 +96,7 @@ public class MediaServiceImpl implements IMediaService {
@Override
public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) {
// 推流鉴权的处理
if (!"rtp".equals(app) && !"1078".equals(app) ) {
if (!MediaApp.GB28181.equals(app) && !MediaApp.JT1078.equals(app) ) {
if ("talk".equals(app) && stream.endsWith("_talk")) {
ResultForOnPublish result = new ResultForOnPublish();
result.setEnable_mp4(false);
@ -156,7 +157,7 @@ public class MediaServiceImpl implements IMediaService {
result.setEnable_audio(true);
// 国标流
if ("rtp".equals(app)) {
if (MediaApp.GB28181.equals(app)) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
@ -223,7 +224,7 @@ public class MediaServiceImpl implements IMediaService {
}else {
result.setEnable_mp4(userSetting.getRecordPushLive());
}
if (app.equalsIgnoreCase("rtp")) {
if (app.equalsIgnoreCase(MediaApp.GB28181)) {
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream;
OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey);
@ -243,7 +244,7 @@ public class MediaServiceImpl implements IMediaService {
return false;
}
// 国标类型的流
if ("rtp".equals(app)) {
if (MediaApp.GB28181.equals(app)) {
result = userSetting.getStreamOnDemand();
// 国标流 点播/录像回放/录像下载
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
@ -260,7 +261,7 @@ public class MediaServiceImpl implements IMediaService {
}
return result;
}
}else if ("1078".equals(app)) {
}else if (MediaApp.JT1078.equals(app)) {
// 判断是否是1078播放类型
JTMediaStreamType jtMediaStreamType = ijt1078Service.checkStreamFromJt(stream);
if (jtMediaStreamType != null) {

View File

@ -1,12 +1,16 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.OpenRTPServerResult;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
@ -23,12 +27,15 @@ import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.Objects;
import java.util.UUID;
@Slf4j
@Service
public class RtpServerServiceImpl implements IReceiveRtpServerService {
private final static String TIMEOUT_TASK_KEY_PREFIX = "RTP_SERVER_TIMEOUT_TASK";
@Autowired
private IMediaServerService mediaServerService;
@ -65,98 +72,111 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
}
@Override
public SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<OpenRTPServerResult> callback) {
public SSRCInfo openGbRTPServer(MediaServer mediaServer, String streamId, String presetSSRC, int tcpMode,
boolean playback, boolean ssrcCheck, boolean onlyAuto, boolean disableAuto, boolean reUsePort,
ErrorCallback<OpenRTPServerResult> callback ) {
if (callback == null) {
log.warn("[开启RTP收流] 失败回调为NULL");
log.warn("[开启国标RTP收流] 失败回调为NULL");
return null;
}
if (rtpServerParam.getMediaServerItem() == null) {
log.warn("[开启RTP收流] 失败媒体节点为NULL");
if (mediaServer == null) {
log.warn("[开启国标RTP收流] 失败媒体节点为NULL");
return null;
}
// 获取mediaServer可用的ssrc
final String ssrc;
if (rtpServerParam.getPresetSsrc() != null) {
ssrc = rtpServerParam.getPresetSsrc();
if (presetSSRC != null) {
ssrc = presetSSRC;
}else {
if (rtpServerParam.isPlayback()) {
ssrc = ssrcFactory.getPlayBackSsrc(rtpServerParam.getMediaServerItem().getId());
if (playback) {
ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId());
}else {
ssrc = ssrcFactory.getPlaySsrc(rtpServerParam.getMediaServerItem().getId());
ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
}
}
final String streamId;
if (rtpServerParam.getStreamId() == null) {
if (streamId == null) {
streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
}else {
streamId = rtpServerParam.getStreamId();
}
if (rtpServerParam.isSsrcCheck() && rtpServerParam.getTcpMode() > 0) {
if (ssrcCheck && tcpMode > 0) {
// 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验
log.warn("[openRTPServer] 平台对接时下级可能自定义ssrc但是tcp模式zlm收流目前无法更新ssrc可能收流超时此时请使用udp收流或者关闭ssrc校验");
}
int rtpServerPort;
if (rtpServerParam.getMediaServerItem().isRtpEnable()) {
rtpServerPort = mediaServerService.createRTPServer(rtpServerParam.getMediaServerItem(), streamId,
rtpServerParam.isSsrcCheck() ? Long.parseLong(ssrc) : 0, rtpServerParam.getPort(), rtpServerParam.isOnlyAuto(),
rtpServerParam.isDisableAudio(), rtpServerParam.isReUsePort(), rtpServerParam.getTcpMode());
} else {
rtpServerPort = rtpServerParam.getMediaServerItem().getRtpProxyPort();
}
if (rtpServerPort == 0) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "开启RTPServer失败", null);
// 释放ssrc
if (rtpServerParam.getPresetSsrc() == null) {
ssrcFactory.releaseSsrc(rtpServerParam.getMediaServerItem().getId(), ssrc);
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaApp.GB28181, streamId);
RTPServerParam rtpServerParam = new RTPServerParam(mediaServer, MediaApp.GB28181, streamId, ssrcCheck ? Long.parseLong(ssrc): 0L, null, onlyAuto, disableAuto, reUsePort, tcpMode);
int rtpServerPort = openRTPServer(rtpServerParam, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
openRTPServerResult.setHookData(data);
openRTPServerResult.setSsrcInfo(ssrcInfo);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult);
} else {
// 释放ssrc
if (presetSSRC == null) {
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc);
}
callback.run(code, msg, null);
}
return null;
}));
ssrcInfo.setPort(rtpServerPort);
return new SSRCInfo(rtpServerPort, ssrc, MediaApp.GB28181, streamId);
}
@Override
public int openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<HookData> callback) {
if (callback == null) {
log.warn("[开启RTP收流] 失败回调为NULL");
return -1;
}
if (rtpServerParam.getMediaServer() == null) {
log.warn("[开启RTP收流] 失败媒体节点为NULL");
return -1;
}
// 设置流超时的定时任务
String timeOutTaskKey = UUID.randomUUID().toString();
String timeOutTaskKey = String.format("%s_%s_%s_%s", TIMEOUT_TASK_KEY_PREFIX, rtpServerParam.getMediaServer().getId(), rtpServerParam.getApp(), rtpServerParam.getStreamId());
SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, "rtp", streamId, timeOutTaskKey);
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
openRTPServerResult.setSsrcInfo(ssrcInfo);
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, ssrcInfo.getApp(), streamId, rtpServerParam.getMediaServerItem().getId());
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, rtpServerParam.getApp(), rtpServerParam.getStreamId(), rtpServerParam.getMediaServer().getId());
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 收流超时
// 释放ssrc
if (rtpServerParam.getPresetSsrc() == null) {
ssrcFactory.releaseSsrc(rtpServerParam.getMediaServerItem().getId(), ssrc);
}
// 关闭收流端口
mediaServerService.closeRTPServer(rtpServerParam.getMediaServerItem(), streamId);
mediaServerService.closeRTPServer(rtpServerParam.getMediaServer(), rtpServerParam.getApp(), rtpServerParam.getStreamId());
subscribe.removeSubscribe(rtpHook);
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), openRTPServerResult);
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
}, userSetting.getPlayTimeout());
// 开启流到来的监听
subscribe.addSubscribe(rtpHook, (hookData) -> {
dynamicTask.stop(timeOutTaskKey);
// hook响应
openRTPServerResult.setHookData(hookData);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), hookData);
subscribe.removeSubscribe(rtpHook);
});
return ssrcInfo;
int rtpServerPort;
if (rtpServerParam.getMediaServer().isRtpEnable()) {
rtpServerPort = mediaServerService.createRTPServer(rtpServerParam.getMediaServer(), rtpServerParam.getApp(), rtpServerParam.getStreamId(),
Objects.requireNonNullElse(rtpServerParam.getSsrc(), 0L), rtpServerParam.getPort(), rtpServerParam.isOnlyAuto(),
rtpServerParam.isDisableAudio(), rtpServerParam.isReUsePort(), rtpServerParam.getTcpMode());
} else {
rtpServerPort = rtpServerParam.getMediaServer().getRtpProxyPort();
}
if (rtpServerPort == 0) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "开启RTPServer失败", null);
return -1;
}
return rtpServerPort;
}
@Override
public void closeRTPServer(MediaServer mediaServer, SSRCInfo ssrcInfo) {
public void closeRTPServer(MediaServer mediaServer, String app, String stream) {
if (mediaServer == null) {
return;
}
if (ssrcInfo.getTimeOutTaskKey() != null) {
dynamicTask.stop(ssrcInfo.getTimeOutTaskKey());
String timeOutTaskKey = String.format("%s_%s_%s_%s", TIMEOUT_TASK_KEY_PREFIX, mediaServer.getId(), app, stream);
if (dynamicTask.contains(timeOutTaskKey)) {
dynamicTask.stop(timeOutTaskKey);
}
if (ssrcInfo.getSsrc() != null) {
// 释放ssrc
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrcInfo.getSsrc());
}
mediaServerService.closeRTPServer(mediaServer, ssrcInfo.getStream());
mediaServerService.closeRTPServer(mediaServer, app, stream);
}
}

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.streamProxy.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
@ -103,7 +104,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Async("taskExecutor")
@EventListener
public void onApplicationEvent(MediaNotFoundEvent event) {
if ("rtp".equals(event.getApp())) {
if (MediaApp.GB28181.equals(event.getApp())) {
return;
}
// 拉流代理

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.vmanager.ps;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
@ -92,14 +93,14 @@ public class PsController {
}
}
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_" + stream;
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "", false, false, null, false, false, false, tcpMode);
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, MediaApp.GB28181, stream, ssrcInt + "", false, false, null, false, false, false, tcpMode);
if (ssrcInfo.getPort() == 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
}
// 注册回调如果rtp收流超时则通过回调发送通知
if (callBack != null) {
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId());
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, MediaApp.GB28181, stream, mediaServer.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hook,
(hookData)->{
@ -149,7 +150,7 @@ public class PsController {
public void closeRtpServer(String stream) {
log.info("[第三方PS服务对接->关闭收流] stream->{}", stream);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
mediaServerService.closeRTPServer(mediaServerItem, stream);
mediaServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream);
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_" + stream;
List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
if (!scan.isEmpty()) {

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.vmanager.rtp;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
@ -92,14 +93,14 @@ public class RtpController {
}
}
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream;
SSRCInfo ssrcInfoForVideo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "",false,false, null, false, false, false, tcpMode);
SSRCInfo ssrcInfoForAudio = mediaServerService.openRTPServer(mediaServer, stream + "_a", ssrcInt + "", false, false, null, false,false,false, tcpMode);
SSRCInfo ssrcInfoForVideo = mediaServerService.openRTPServer(mediaServer, MediaApp.GB28181, stream, ssrcInt + "",false,false, null, false, false, false, tcpMode);
SSRCInfo ssrcInfoForAudio = mediaServerService.openRTPServer(mediaServer, MediaApp.GB28181,stream + "_a", ssrcInt + "", false, false, null, false,false,false, tcpMode);
if (ssrcInfoForVideo.getPort() == 0 || ssrcInfoForAudio.getPort() == 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
}
// 注册回调如果rtp收流超时则通过回调发送通知
if (callBack != null) {
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId());
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, MediaApp.GB28181, stream, mediaServer.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hook,
(hookData)->{
@ -152,8 +153,8 @@ public class RtpController {
public void closeRtpServer(String stream) {
log.info("[第三方服务对接->关闭收流] stream->{}", stream);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
mediaServerService.closeRTPServer(mediaServerItem, stream);
mediaServerService.closeRTPServer(mediaServerItem, stream+ "_a");
mediaServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream);
mediaServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream+ "_a");
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream;
List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
if (scan.size() > 0) {

View File

@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
@ -246,7 +247,7 @@ public class ApiStreamController {
}
try {
cmder.streamByeCmd(device, code, "rtp", inviteInfo.getStream(), null, null);
cmder.streamByeCmd(device, code, MediaApp.GB28181, inviteInfo.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
JSONObject result = new JSONObject();
result.put("error","发送BYE失败" + e.getMessage());

View File

@ -2,4 +2,4 @@ spring:
application:
name: wvp
profiles:
active: 274-dev
active: dev