Merge branch 'master' into dev/压力测试

# Conflicts:
#	src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java
#	src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java
This commit is contained in:
lin 2026-03-30 11:23:47 +08:00
commit 0eebcceefb
45 changed files with 686 additions and 614 deletions

View File

@ -160,10 +160,6 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git
# 付费社群
<img src="doc/_media/shequ.png" width="50%" height="50%">
> 加入三天内不满意可以直接自行推出,星球会直接退款给大家。需要发票可以在星球app中直接咨询星球客服获取。
> 星球还提供了包括闭源的全功能试用包, 会随时更新。
> 付费社群即可以对作者提供支持,也可以为大家更加快速的解决问题。如果暂时无法加入,给项目点个星也是极大的鼓励。

View File

@ -33,7 +33,7 @@ WVP-PRO使用Spring boot开发maven管理依赖。对于熟悉spring开发的
ubuntu环境以ubuntu 18为例
``` bash
apt-get install -y openjdk-21-jre git maven nodejs npm
apt-get install -y openjdk-21-jdk git maven nodejs npm
```
window环境以windows10为例

View File

@ -1,4 +1,4 @@
#! /bin/sh
#! /bin/bash
WORD_DIR=$(cd $(dirname $0); pwd)
SERVICE_NAME="wvp"
@ -13,7 +13,7 @@ if [ "$(id -u)" -ne 0 ]; then
echo
fi
# 当前目录直接搜索(不含子目录)
# 当前目录直接搜索(不含子目录) bugfix 这里使用了需要bash才可以支持的内容
jar_files=(*.jar)
if [ ${#jar_files[@]} -eq 0 ]; then

View File

@ -442,7 +442,7 @@
<path>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
<version>1.18.38</version>
</path>
</annotationProcessorPaths>
</configuration>

View File

@ -0,0 +1,12 @@
package com.genersoft.iot.vmp.common.enums;
public class MediaApp {
public final static String GB28181 = "rtp";
public final static String GB28181_TALK = "talk";
public final static String GB28181_BROADCAST = "broadcast";
public final static String JT1078 = "1078";
public static boolean isKeywords(String app) {
return GB28181.equals(app) || GB28181_TALK.equals(app) || GB28181_BROADCAST.equals(app) || JT1078.equals(app);
}
}

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.RequestPushStreamMsg;
import lombok.Data;
@ -199,7 +200,7 @@ public class SendRtpInfo {
sendRtpItem.setChannelId(channelId);
sendRtpItem.setTcp(isTcp);
sendRtpItem.setRtcp(rtcp);
sendRtpItem.setApp("rtp");
sendRtpItem.setApp(MediaApp.GB28181);
sendRtpItem.setLocalPort(localPort);
sendRtpItem.setServerId(serverId);
sendRtpItem.setMediaServerId(mediaServer.getId());

View File

@ -0,0 +1,53 @@
package com.genersoft.iot.vmp.gb28181.bean;
import lombok.Data;
@Data
public class TalkRtpInfo {
/**
* 应用名, 待推送给设备的流应用名
*/
private String app;
/**
* 流id, 待推送给设备的流id
*/
private String stream;
/**
* rtp推流出去的ssrc
*/
private String ssrc;
/**
* 对方rtp推流上来的流id
*/
private String receiveStreamId;
/**
* 是否推送本地MP4录像该参数非必选参数
*/
private Integer fromMp4;
/**
* 类型 0(ES流)1(PS流)2(TS流)默认1(PS流)该参数非必选参数
*/
private Integer type;
/**
* rtp payload type默认96该参数非必选参数
*/
private Integer pt;
/**
* rtp es方式打包时是否只打包音频该参数非必选参数
*/
private Integer onlyAudio;
/**
* 转发rtp(tcp模式)如果发送不出去是否限制源端收流速度此参数在多倍速rtp转发时作用较大
*/
private Integer enableOriginReceiveLimit;
}

View File

@ -27,6 +27,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd.CatalogResponseMessageHandler;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
@ -108,6 +109,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private AudioBroadcastManager audioBroadcastManager;
@ -259,7 +263,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
receiveRtpServerService.closeRTPServerByMediaServerId(ssrcTransaction.getMediaServerId(), ssrcTransaction.getApp(), ssrcTransaction.getStream());
sessionManager.removeByCallId(ssrcTransaction.getCallId());
}
}

View File

@ -145,6 +145,7 @@ public class GbChannelPlayServiceImpl implements IGbChannelPlayService {
throw new PlayException(Response.BUSY_HERE, "channel not support");
}
sourceChannelPlayService.stopPlay(channel);
channelMapper.updateStream(channel.getGbId(), null);
}
@Override

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.service.impl;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
@ -49,7 +50,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
@Async
@EventListener
public void onApplicationEvent(MediaDepartureEvent event) {
if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
if ("rtsp".equals(event.getSchema()) && MediaApp.GB28181.equals(event.getApp())) {
InviteInfo inviteInfo = getInviteInfoByStream(null, event.getStream());
if (inviteInfo != null && (inviteInfo.getType() == InviteSessionType.PLAY || inviteInfo.getType() == InviteSessionType.PLAYBACK)) {
removeInviteInfo(inviteInfo);

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.service.impl;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
@ -28,6 +29,7 @@ import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.*;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
@ -110,6 +112,9 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
@Autowired
private ISendRtpServerService sendRtpServerService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private PlatformStatusTaskRunner statusTaskRunner;
@ -618,7 +623,37 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
} else {
tcpMode = 0;
}
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServerItem, streamId, null, ssrcCheck, false, null, true, false, false, tcpMode);
SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServer(mediaServerItem, streamId, null, tcpMode,
false, ssrcCheck, true, false, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && data != null && data.getHookData() != null) {
log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId());
HookData hookData = data.getHookData();
// hook响应
onPublishHandlerForBroadcast(hookData.getMediaServer(), hookData.getMediaInfo(), platform, channel);
// 收到流
if (hookEvent != null) {
hookEvent.response(hookData);
}
}else {
InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, channel.getGbId(), null);
if (inviteInfoForBroadcast == null) {
log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
commanderForPlatform.streamByeCmd(platform, channel, data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run(1, "收流超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), data.getSsrcInfo().getSsrc());
receiveRtpServerService.closeRTPServer(mediaServerItem, data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream());
sessionManager.removeByStream(data.getSsrcInfo().getApp(), data.getSsrcInfo().getStream());
}
}
}
}));
if (ssrcInfo == null || ssrcInfo.getPort() < 0) {
log.info("[国标级联] 发起语音喊话 开启端口监听失败, platform: {}, channel {}", platform.getServerGBId(), channel.getGbDeviceId());
SipSubscribe.EventResult<Object> eventResult = new SipSubscribe.EventResult<>();
@ -636,37 +671,8 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST,
InviteSessionStatus.ready, userSetting.getRecordSip());
inviteStreamService.updateInviteInfo(inviteInfo);
String timeOutTaskKey = UUID.randomUUID().toString();
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 执行超时任务时查询是否已经成功成功了则不执行超时任务防止超时任务取消失败的情况
InviteInfo inviteInfoForBroadcast = inviteStreamService.getInviteInfo(InviteSessionType.BROADCAST, channel.getGbId(), null);
if (inviteInfoForBroadcast == null) {
log.info("[国标级联] 发起语音喊话 收流超时 deviceId: {}, channelId: {},端口:{}, SSRC: {}", platform.getServerGBId(), channel.getGbDeviceId(), ssrcInfo.getPort(), ssrcInfo.getSsrc());
// 点播超时回复BYE 同时释放ssrc以及此次点播的资源
try {
commanderForPlatform.streamByeCmd(platform, channel, ssrcInfo.getApp(), ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
timeoutCallback.run(1, "收流超时");
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
}
}
}, userSetting.getPlayTimeout());
commanderForPlatform.broadcastInviteCmd(platform, channel,sourceId, mediaServerItem, ssrcInfo, (hookData)->{
log.info("[国标级联] 发起语音喊话 收到上级推流 deviceId: {}, channelId: {}", platform.getServerGBId(), channel.getGbDeviceId());
dynamicTask.stop(timeOutTaskKey);
// hook响应
onPublishHandlerForBroadcast(hookData.getMediaServer(), hookData.getMediaInfo(), platform, channel);
// 收到流
if (hookEvent != null) {
hookEvent.response(hookData);
}
}, event -> {
inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channel, timeOutTaskKey,
commanderForPlatform.broadcastInviteCmd(platform, channel,sourceId, mediaServerItem, ssrcInfo, event -> {
inviteOKHandler(event, ssrcInfo, tcpMode, ssrcCheck, mediaServerItem, platform, channel,
null, inviteInfo, InviteSessionType.BROADCAST);
}, eventResult -> {
// 收到错误回复
@ -689,7 +695,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
}
private void inviteOKHandler(SipSubscribe.EventResult eventResult, SSRCInfo ssrcInfo, int tcpMode, boolean ssrcCheck, MediaServer mediaServerItem,
Platform platform, CommonGBChannel channel, String timeOutTaskKey, ErrorCallback<Object> callback,
Platform platform, CommonGBChannel channel, ErrorCallback<Object> callback,
InviteInfo inviteInfo, InviteSessionType inviteSessionType){
inviteInfo.setStatus(InviteSessionStatus.ok);
ResponseEvent responseEvent = (ResponseEvent) eventResult.event;
@ -705,7 +711,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
// 多端口
if (tcpMode == 2) {
tcpActiveHandler(platform, channel, contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
ssrcInfo, callback);
}
}else {
// 单端口
@ -724,27 +730,25 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
log.warn("[Invite 200OK] 更新ssrc失败停止喊话 {}/{}", platform.getServerGBId(), channel.getGbDeviceId());
commanderForPlatform.streamByeCmd(platform, channel, ssrcInfo.getApp(), ssrcInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.error("[命令发送失败] 停止播放, 发送BYE: {}", e.getMessage());
} finally {
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
inviteStreamService.call(inviteSessionType, channel.getGbId(), null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
}
dynamicTask.stop(timeOutTaskKey);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
inviteStreamService.call(inviteSessionType, channel.getGbId(), null,
InviteErrorCode.ERROR_FOR_RESET_SSRC.getCode(),
"下级自定义了ssrc,重新设置收流信息失败", null);
}else {
ssrcInfo.setSsrc(ssrcInResponse);
inviteInfo.setSsrcInfo(ssrcInfo);
@ -752,7 +756,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
if (tcpMode == 2) {
if (mediaServerItem.isRtpEnable()) {
tcpActiveHandler(platform, channel, contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
ssrcInfo, callback);
}else {
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
}
@ -766,7 +770,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
if (tcpMode == 2) {
if (mediaServerItem.isRtpEnable()) {
tcpActiveHandler(platform, channel, contentString, mediaServerItem, tcpMode, ssrcCheck,
timeOutTaskKey, ssrcInfo, callback);
ssrcInfo, callback);
}else {
log.warn("[Invite 200OK] 单端口收流模式不支持tcp主动模式收流");
}
@ -799,7 +803,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
private void tcpActiveHandler(Platform platform, CommonGBChannel channel, String contentString,
MediaServer mediaServerItem, int tcpMode, boolean ssrcCheck,
String timeOutTaskKey, SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
SSRCInfo ssrcInfo, ErrorCallback<Object> callback){
if (tcpMode != 2) {
return;
}
@ -826,15 +830,13 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
}
log.info("[TCP主动连接对方] serverGbId: {}, channelId: {}, 连接对方的地址:{}:{}, SSRC: {}, SSRC校验{}",
platform.getServerGBId(), channel.getGbDeviceId(), sdp.getConnection().getAddress(), port, ssrcInfo.getSsrc(), ssrcCheck);
Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getApp(), ssrcInfo.getStream());
log.info("[TCP主动连接对方] 结果: {}", result);
} catch (SdpException e) {
log.error("[TCP主动连接对方] serverGbId: {}, channelId: {}, 解析200OK的SDP信息失败", platform.getServerGBId(), channel.getGbDeviceId(), e);
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, ssrcInfo.getStream());
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_SDP_PARSING_EXCEPTIONS.getCode(),
@ -855,7 +857,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.warn("[消息发送失败] 停止语音对讲, 平台:{},通道:{}", platform.getId(), channel.getGbDeviceId() );
} finally {
mediaServerService.closeRTPServer(mediaServerItem, stream);
receiveRtpServerService.closeRTPServer(mediaServerItem, app, stream);
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(null, channel.getGbId(), stream);
if (inviteInfo != null) {
// 释放ssrc

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.gb28181.service.impl;
import com.genersoft.iot.vmp.common.*;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
@ -132,7 +133,7 @@ public class PlayServiceImpl implements IPlayService {
@Async
@EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
if (MediaApp.GB28181_BROADCAST.equals(event.getApp()) || MediaApp.GB28181_TALK.equals(event.getApp())) {
if (event.getStream().indexOf("_") > 0) {
String[] streamArray = event.getStream().split("_");
if (streamArray.length == 2) {
@ -148,24 +149,20 @@ public class PlayServiceImpl implements IPlayService {
log.info("[语音对讲/喊话] 未找到通道:{}", channelId);
return;
}
if ("broadcast".equals(event.getApp())) {
if (MediaApp.GB28181_BROADCAST.equals(event.getApp())) {
if (audioBroadcastManager.exit(channel.getId())) {
stopAudioBroadcast(device, channel);
}
// 开启语音对讲通道
try {
audioBroadcastCmd(device, channel, event.getMediaServer(),
event.getApp(), event.getStream(), 60, false, (msg) -> {
log.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
});
event.getApp(), event.getStream(), 60, false, (msg) -> log.info("[语音喊话] 通道建立成功, device: {}, channel: {}", deviceId, channelId));
} catch (InvalidArgumentException | ParseException | SipException e) {
log.error("[命令发送失败] 语音对讲: {}", e.getMessage());
}
}else if ("talk".equals(event.getApp())) {
}else if (MediaApp.GB28181_TALK.equals(event.getApp())) {
// 开启语音对讲通道
talkCmd(device, channel, event.getMediaServer(), event.getStream(), (msg) -> {
log.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
});
talkCmd(device, channel, event.getMediaServer(), event.getStream(), (msg) -> log.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId));
}
}
}
@ -208,7 +205,7 @@ public class PlayServiceImpl implements IPlayService {
}
}
if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
if (MediaApp.GB28181_BROADCAST.equals(event.getApp()) || MediaApp.GB28181_TALK.equals(event.getApp())) {
if (event.getStream().indexOf("_") > 0) {
String[] streamArray = event.getStream().split("_");
if (streamArray.length == 2) {
@ -224,14 +221,14 @@ public class PlayServiceImpl implements IPlayService {
log.info("[语音对讲/喊话] 未找到通道:{}", channelId);
return;
}
if ("broadcast".equals(event.getApp())) {
if (MediaApp.GB28181_BROADCAST.equals(event.getApp())) {
stopAudioBroadcast(device, channel);
}else if ("talk".equals(event.getApp())) {
}else if (MediaApp.GB28181_TALK.equals(event.getApp())) {
stopTalk(device, channel, false);
}
}
}
}else if ("rtp".equals(event.getApp())) {
}else if (MediaApp.GB28181.equals(event.getApp())) {
// 释放ssrc
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, event.getStream());
if (inviteInfo != null && inviteInfo.getStatus() == InviteSessionStatus.ok
@ -249,7 +246,7 @@ public class PlayServiceImpl implements IPlayService {
@Async
@EventListener
public void onApplicationEvent(MediaNotFoundEvent event) {
if (!"rtp".equals(event.getApp())) {
if (!MediaApp.GB28181.equals(event.getApp())) {
return;
}
String[] s = event.getStream().split("_");
@ -324,9 +321,9 @@ public class PlayServiceImpl implements IPlayService {
return play(mediaServerItem, device, channel, ssrc, userSetting.getRecordSip(), callback);
}
private SSRCInfo play(MediaServer mediaServerItem, Device device, DeviceChannel channel, String ssrc, Boolean record,
private SSRCInfo play(MediaServer mediaServer, Device device, DeviceChannel channel, String ssrc, Boolean record,
ErrorCallback<StreamInfo> callback) {
if (mediaServerItem == null ) {
if (mediaServer == null ) {
if (callback != null) {
callback.run(InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_PARAMETER_ERROR.getMsg(),
@ -340,7 +337,7 @@ public class PlayServiceImpl implements IPlayService {
if (inviteInfoInCatch.getStreamInfo() == null) {
// 释放生成的ssrc使用上一次申请的
ssrcFactory.releaseSsrc(mediaServerItem.getId(), ssrc);
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc);
// 点播发起了但是尚未成功, 仅注册回调等待结果即可
inviteStreamService.once(InviteSessionType.PLAY, channel.getId(), null, callback);
log.info("[点播开始] 已经请求中,等待结果, deviceId: {}, channelId({}): {}", device.getDeviceId(), channel.getDeviceId(), channel.getId());
@ -357,7 +354,7 @@ public class PlayServiceImpl implements IPlayService {
return inviteInfoInCatch.getSsrcInfo();
}
MediaServer mediaInfo = streamInfo.getMediaServer();
Boolean ready = mediaServerService.isStreamReady(mediaInfo, "rtp", streamId);
Boolean ready = mediaServerService.isStreamReady(mediaInfo, MediaApp.GB28181, streamId);
if (ready != null && ready) {
if(callback != null) {
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
@ -379,21 +376,12 @@ public class PlayServiceImpl implements IPlayService {
String streamId = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId());
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServerItem(mediaServerItem);
rtpServerParam.setStreamId(streamId);
rtpServerParam.setPresetSsrc(ssrc);
rtpServerParam.setSsrcCheck(device.isSsrcCheck());
rtpServerParam.setPlayback(false);
rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(tcpMode);
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> {
SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServer(mediaServer, streamId, ssrc, tcpMode, false,
device.isSsrcCheck(), false, !channel.isHasAudio(), (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应
// hook 响应
StreamInfo streamInfo = onPublishHandlerForPlay(result.getHookData().getMediaServer(), result.getHookData().getMediaInfo(), device, channel);
if (streamInfo == null){
if (callback != null) {
@ -422,14 +410,14 @@ public class PlayServiceImpl implements IPlayService {
}
inviteStreamService.call(InviteSessionType.PLAY, channel.getId(), null, code, msg, null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAY, channel.getId());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", streamId);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, streamId);
if (ssrcTransaction != null) {
try {
cmder.streamByeCmd(device, channel.getDeviceId(),"rtp", streamId, null, null);
cmder.streamByeCmd(device, channel.getDeviceId(),MediaApp.GB28181, streamId, null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[点播超时] 发送BYE失败 {}", e.getMessage());
} finally {
sessionManager.removeByStream("rtp", streamId);
sessionManager.removeByStream(MediaApp.GB28181, streamId);
}
}
}
@ -448,8 +436,8 @@ public class PlayServiceImpl implements IPlayService {
ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getId(),
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServer.getId(),
mediaServer.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteSessionStatus.ready, userSetting.getRecordSip());
if (record != null) {
inviteInfo.setRecord(record);
@ -460,12 +448,12 @@ public class PlayServiceImpl implements IPlayService {
inviteStreamService.updateInviteInfo(inviteInfo);
try {
cmder.playStreamCmd(mediaServerItem, ssrcInfo, device, channel, (eventResult) -> {
cmder.playStreamCmd(mediaServer, ssrcInfo, device, channel, (eventResult) -> {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel, callback, inviteInfo, InviteSessionType.PLAY);
InviteOKHandler(eventResult, ssrcInfo, mediaServer, device, channel, callback, inviteInfo, InviteSessionType.PLAY);
}, (event) -> {
log.info("[点播失败]{}:{} deviceId: {}, channelId:{}",event.statusCode, event.msg, device.getDeviceId(), channel.getDeviceId());
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
if (callback != null) {
@ -478,7 +466,7 @@ public class PlayServiceImpl implements IPlayService {
}, userSetting.getPlayTimeout().longValue());
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 点播消息: {}", e.getMessage());
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
if (callback != null) {
callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
@ -495,8 +483,7 @@ public class PlayServiceImpl implements IPlayService {
private void talk(MediaServer mediaServerItem, Device device, DeviceChannel channel, String stream,
HookSubscribe.Event hookEvent, SipSubscribe.Event errorEvent,
Runnable timeoutCallback, AudioBroadcastEvent audioEvent) {
SipSubscribe.Event errorEvent, Runnable timeoutCallback, AudioBroadcastEvent audioEvent) {
String playSsrc = ssrcFactory.getPlaySsrc(mediaServerItem.getId());
@ -506,7 +493,7 @@ public class PlayServiceImpl implements IPlayService {
}
SendRtpInfo sendRtpInfo;
try {
sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), "talk", stream,
sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServerItem, null, null, playSsrc, device.getDeviceId(), MediaApp.GB28181_TALK, stream,
channel.getId(), true, false);
sendRtpInfo.setPlayType(InviteStreamType.TALK);
}catch (PlayException e) {
@ -595,7 +582,7 @@ public class PlayServiceImpl implements IPlayService {
}, (event) -> {
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getStream());
receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
sessionManager.removeByStream(sendRtpInfo.getApp(), sendRtpInfo.getStream());
@ -605,7 +592,7 @@ public class PlayServiceImpl implements IPlayService {
log.error("[命令发送失败] 对讲消息: {}", e.getMessage());
dynamicTask.stop(timeOutTaskKey);
mediaServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getStream());
receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpInfo.getApp(), sendRtpInfo.getStream());
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrc());
@ -647,11 +634,11 @@ public class PlayServiceImpl implements IPlayService {
}
}
log.info("[TCP主动连接对方] deviceId: {}, channelId: {}, 连接对方的地址:{}:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", device.getDeviceId(), channel.getDeviceId(), sdp.getConnection().getAddress(), port, device.getStreamMode(), ssrcInfo.getSsrc(), device.isSsrcCheck());
Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getStream());
Boolean result = mediaServerService.connectRtpServer(mediaServerItem, sdp.getConnection().getAddress(), port, ssrcInfo.getApp(), ssrcInfo.getStream());
log.info("[TCP主动连接对方] 结果: {}" , result);
if (!result) {
// 主动连接失败结束流程 清理数据
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getMsg(), null);
@ -661,7 +648,7 @@ public class PlayServiceImpl implements IPlayService {
}
} catch (SdpException e) {
log.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channel.getDeviceId(), e);
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
@ -686,7 +673,7 @@ public class PlayServiceImpl implements IPlayService {
String fileName = deviceId + "_" + channelId + ".jpg";
// 请求截图
log.info("[请求截图]: " + fileName);
mediaServerService.getSnap(mediaServerItemInuse, "rtp", stream, 15, 1, path, fileName);
mediaServerService.getSnap(mediaServerItemInuse, MediaApp.GB28181, stream, 15, 1, path, fileName);
}
public StreamInfo onPublishHandlerForPlay(MediaServer mediaServerItem, MediaInfo mediaInfo, Device device, DeviceChannel channel) {
@ -764,7 +751,7 @@ public class PlayServiceImpl implements IPlayService {
playBack(newMediaServerItem, device, channel, startTime, endTime, callback);
}
private void playBack(MediaServer mediaServerItem,
private void playBack(MediaServer mediaServer,
Device device, DeviceChannel channel, String startTime,
String endTime, ErrorCallback<StreamInfo> callback) {
@ -778,16 +765,8 @@ public class PlayServiceImpl implements IPlayService {
String stream = device.getDeviceId() + "_" + channel.getDeviceId() + "_" + startTimeStr + "_" + endTimeTimeStr;
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServerItem(mediaServerItem);
rtpServerParam.setStreamId(stream);
rtpServerParam.setSsrcCheck(device.isSsrcCheck());
rtpServerParam.setPlayback(true);
rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(tcpMode);
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> {
SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServer(mediaServer, stream, null, tcpMode, true,
device.isSsrcCheck(), false, !channel.isHasAudio(), (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应
StreamInfo streamInfo = onPublishHandlerForPlayback(result.getHookData().getMediaServer(), result.getHookData().getMediaInfo(), device, channel, startTime, endTime);
@ -805,14 +784,14 @@ public class PlayServiceImpl implements IPlayService {
}
inviteStreamService.call(InviteSessionType.PLAYBACK, channel.getId(), null, code, msg, null);
inviteStreamService.removeInviteInfoByDeviceAndChannel(InviteSessionType.PLAYBACK, channel.getId());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", stream);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, stream);
if (ssrcTransaction != null) {
try {
cmder.streamByeCmd(device, channel.getDeviceId(),"rtp", stream, null, null);
cmder.streamByeCmd(device, channel.getDeviceId(),MediaApp.GB28181, stream, null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
log.error("[录像回放] 发送BYE失败 {}", e.getMessage());
} finally {
sessionManager.removeByStream("rtp", stream);
sessionManager.removeByStream(MediaApp.GB28181, stream);
}
}
}
@ -833,16 +812,16 @@ public class PlayServiceImpl implements IPlayService {
device.getDeviceId(), channel.getGbDeviceId(), startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getId(),
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServer.getId(),
mediaServer.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
InviteSessionStatus.ready, userSetting.getRecordSip());
inviteStreamService.updateInviteInfo(inviteInfo);
try {
cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channel, startTime, endTime,
cmder.playbackStreamCmd(mediaServer, ssrcInfo, device, channel, startTime, endTime,
eventResult -> {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel,
InviteOKHandler(eventResult, ssrcInfo, mediaServer, device, channel,
callback, inviteInfo, InviteSessionType.PLAYBACK);
}, eventResult -> {
log.info("[录像回放] 失败,{} {}", eventResult.statusCode, eventResult.msg);
@ -850,7 +829,7 @@ public class PlayServiceImpl implements IPlayService {
callback.run(eventResult.statusCode, eventResult.msg, null);
}
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
}, userSetting.getPlayTimeout().longValue());
@ -859,7 +838,7 @@ public class PlayServiceImpl implements IPlayService {
if (callback != null) {
callback.run(InviteErrorCode.FAIL.getCode(), e.getMessage(), null);
}
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo);
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
}
@ -902,7 +881,7 @@ public class PlayServiceImpl implements IPlayService {
log.info("[Invite 200OK] SSRC修正 {}->{}", ssrcInfo.getSsrc(), ssrcInResponse);
// 释放ssrc
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getStream(), ssrcInResponse);
Boolean result = mediaServerService.updateRtpServerSSRC(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream(), ssrcInResponse);
if (!result) {
try {
log.warn("[Invite 200OK] 更新ssrc失败停止点播 {}/{}", device.getDeviceId(), channel.getDeviceId());
@ -940,14 +919,14 @@ public class PlayServiceImpl implements IPlayService {
if (ssrcInResponse != null) {
// 单端口
// 重新订阅流上线
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", inviteInfo.getStream());
sessionManager.removeByStream("rtp", inviteInfo.getStream());
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, inviteInfo.getStream());
sessionManager.removeByStream(MediaApp.GB28181, inviteInfo.getStream());
inviteStreamService.updateInviteInfoForSSRC(inviteInfo, ssrcInResponse);
ssrcTransaction.setDeviceId(device.getDeviceId());
ssrcTransaction.setChannelId(ssrcTransaction.getChannelId());
ssrcTransaction.setCallId(ssrcTransaction.getCallId());
ssrcTransaction.setSsrc(ssrcInResponse);
ssrcTransaction.setApp("rtp");
ssrcTransaction.setApp(MediaApp.GB28181);
ssrcTransaction.setStream(inviteInfo.getStream());
ssrcTransaction.setMediaServerId(mediaServerItem.getId());
ssrcTransaction.setSipTransactionInfo(new SipTransactionInfo((SIPResponse) responseEvent.getResponse()));
@ -988,16 +967,9 @@ public class PlayServiceImpl implements IPlayService {
}
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
// 录像下载不使用固定流地址固定流地址会导致如果开始时间与结束时间一致时文件错误的叠加在一起
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServerItem(mediaServer);
rtpServerParam.setSsrcCheck(device.isSsrcCheck());
rtpServerParam.setPlayback(true);
rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(tcpMode);
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> {
SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServer(mediaServer, null, null, tcpMode, false,
device.isSsrcCheck(), false, !channel.isHasAudio(), (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应
StreamInfo streamInfo = onPublishHandlerForDownload(mediaServer, result.getHookData().getMediaInfo(), device, channel, startTime, endTime);
@ -1058,7 +1030,7 @@ public class PlayServiceImpl implements IPlayService {
eventResult -> {
// 对方返回错误
callback.run(InviteErrorCode.FAIL.getCode(), String.format("录像下载失败, 错误码: %s, %s", eventResult.statusCode, eventResult.msg), null);
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo);
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
}, eventResult ->{
@ -1081,7 +1053,7 @@ public class PlayServiceImpl implements IPlayService {
inviteStreamService.updateInviteInfo(inviteInfoForNew, 60*15L);
}
};
Hook hook = Hook.getInstance(HookType.on_record_mp4, "rtp", ssrcInfo.getStream(), mediaServer.getId());
Hook hook = Hook.getInstance(HookType.on_record_mp4, MediaApp.GB28181, ssrcInfo.getStream(), mediaServer.getId());
// 设置过期时间下载失败时自动处理订阅数据
hook.setExpireTime(System.currentTimeMillis() + 24 * 60 * 60 * 1000);
subscribe.addSubscribe(hook, hookEventForRecord);
@ -1089,7 +1061,7 @@ public class PlayServiceImpl implements IPlayService {
} catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 录像下载: {}", e.getMessage());
callback.run(InviteErrorCode.FAIL.getCode(),e.getMessage(), null);
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo);
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo);
}
@ -1101,7 +1073,7 @@ public class PlayServiceImpl implements IPlayService {
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, channel.getId(), stream);
if (inviteInfo == null) {
String app = "rtp";
String app = MediaApp.GB28181;
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
if (streamAuthorityInfo != null) {
List<CloudRecordItem> allList = cloudRecordService.getAllList(null, app, stream, null, null, null, streamAuthorityInfo.getCallId(), null);
@ -1143,7 +1115,7 @@ public class PlayServiceImpl implements IPlayService {
log.warn("[获取下载进度] 查询录像信息时发现节点不存在");
return null;
}
String app = "rtp";
String app = MediaApp.GB28181;
Long duration = mediaServerService.updateDownloadProcess(mediaServerItem, app, stream);
if (duration == null || duration == 0) {
inviteInfo.getStreamInfo().setProgress(0);
@ -1186,7 +1158,7 @@ public class PlayServiceImpl implements IPlayService {
public StreamInfo onPublishHandler(MediaServer mediaServerItem, MediaInfo mediaInfo, Device device, DeviceChannel channel) {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "rtp", mediaInfo.getStream(), mediaInfo, null);
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, MediaApp.GB28181, mediaInfo.getStream(), mediaInfo, null);
streamInfo.setDeviceId(device.getDeviceId());
streamInfo.setChannelId(channel.getId());
return streamInfo;
@ -1255,7 +1227,7 @@ public class PlayServiceImpl implements IPlayService {
if (broadcastMode == null) {
broadcastMode = true;
}
String app = broadcastMode?"broadcast":"talk";
String app = broadcastMode ? MediaApp.GB28181_BROADCAST : MediaApp.GB28181_TALK;
String stream = device.getDeviceId() + "_" + deviceChannel.getDeviceId();
AudioBroadcastResult audioBroadcastResult = new AudioBroadcastResult();
audioBroadcastResult.setApp(app);
@ -1557,7 +1529,7 @@ public class PlayServiceImpl implements IPlayService {
SendRtpInfo sendRtpInfo = sendRtpServerService.queryByChannelId(channel.getId(), device.getDeviceId());
if (sendRtpInfo != null) {
MediaServer mediaServer = mediaServerService.getOne(sendRtpInfo.getMediaServerId());
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, "rtp", sendRtpInfo.getReceiveStream());
Boolean streamReady = mediaServerService.isStreamReady(mediaServer, MediaApp.GB28181_TALK, sendRtpInfo.getReceiveStream());
if (streamReady) {
log.warn("[语音对讲] 进行中: {}", channel.getDeviceId());
event.call("语音对讲进行中");
@ -1567,9 +1539,7 @@ public class PlayServiceImpl implements IPlayService {
}
}
talk(mediaServerItem, device, channel, stream, (hookData) -> {
log.info("[语音对讲] 收到设备发来的流");
}, eventResult -> {
talk(mediaServerItem, device, channel, stream, eventResult -> {
log.warn("[语音对讲] 失败,{}/{}, 错误码 {} {}", device.getDeviceId(), channel.getDeviceId(), eventResult.statusCode, eventResult.msg);
event.call("失败,错误码 " + eventResult.statusCode + ", " + eventResult.msg);
}, () -> {
@ -1634,7 +1604,7 @@ public class PlayServiceImpl implements IPlayService {
String path = "snap";
// 请求截图
log.info("[请求截图]: " + fileName);
mediaServerService.getSnap(mediaServer, "rtp", inviteInfo.getStreamInfo().getStream(), 15, 1, path, fileName);
mediaServerService.getSnap(mediaServer, MediaApp.GB28181, inviteInfo.getStreamInfo().getStream(), 15, 1, path, fileName);
File snapFile = new File(path + File.separator + fileName);
if (snapFile.exists()) {
errorCallback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), snapFile.getAbsoluteFile());
@ -1676,7 +1646,7 @@ public class PlayServiceImpl implements IPlayService {
if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
try {
log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId());
cmder.streamByeCmd(device, channel.getDeviceId(), "rtp", inviteInfo.getStream(), null, null);
cmder.streamByeCmd(device, channel.getDeviceId(), MediaApp.GB28181, inviteInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.error("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "命令发送失败: " + e.getMessage());
@ -1687,7 +1657,7 @@ public class PlayServiceImpl implements IPlayService {
deviceChannelService.stopPlay(channel.getId());
}
if (inviteInfo.getStreamInfo() != null) {
receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getSsrcInfo());
receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), MediaApp.GB28181, stream);
}
}
}
@ -1709,7 +1679,7 @@ public class PlayServiceImpl implements IPlayService {
if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
try {
log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId());
cmder.streamByeCmd(device, channel.getDeviceId(), "rtp", inviteInfo.getStream(), null, null);
cmder.streamByeCmd(device, channel.getDeviceId(), MediaApp.GB28181, inviteInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.warn("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage());
}
@ -1719,7 +1689,7 @@ public class PlayServiceImpl implements IPlayService {
deviceChannelService.stopPlay(channel.getId());
}
if (inviteInfo.getStreamInfo() != null) {
receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getSsrcInfo());
receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), MediaApp.GB28181, inviteInfo.getStream());
}
}

View File

@ -143,7 +143,7 @@ public class SIPSender {
public CallIdHeader getNewCallIdHeader(String ip, String transport) {
if (ObjectUtils.isEmpty(transport)) {
return sipLayer.getUdpSipProvider().getNewCallId();
return sipLayer.getUdpSipProvider() != null ? sipLayer.getUdpSipProvider().getNewCallId() : sipLayer.getTcpSipProvider().getNewCallId();
}
SipProviderImpl sipProvider;
if (ObjectUtils.isEmpty(ip)) {
@ -155,7 +155,8 @@ public class SIPSender {
}
if (sipProvider == null) {
sipProvider = sipLayer.getUdpSipProvider();
sipProvider = transport.equalsIgnoreCase("TCP") ? sipLayer.getTcpSipProvider()
: sipLayer.getUdpSipProvider();
}
if (sipProvider != null) {

View File

@ -147,8 +147,7 @@ public interface ISIPCommanderForPlatform {
void streamByeCmd(Platform platform, CommonGBChannel channel, String app, String stream, String callId, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException, SsrcTransactionNotFoundException;
void broadcastInviteCmd(Platform platform, CommonGBChannel channel, String sourceId, MediaServer mediaServerItem,
SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent,
SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException;
SSRCInfo ssrcInfo, SipSubscribe.Event okEvent, SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException;
void broadcastResultCmd(Platform platform, CommonGBChannel deviceChannel, String sn, boolean result, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws InvalidArgumentException, SipException, ParseException;
}

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@ -279,7 +280,7 @@ public class SIPCommander implements ISIPCommander {
content.append("y=" + ssrcInfo.getSsrc() + "\r\n");//ssrc
// f字段:f= v/编码格式/分辨率/帧率/码率类型/码率大小a/编码格式/码率大小/采样率
// content.append("f=v/2/5/25/1/4000a/1/8/1" + "\r\n"); // 未发现支持此特性的设备
// content.append("f=v/2/6/25/1/4000a/6/8/1" + "\r\n"); // 未发现支持此特性的设备
Request request = headerProvider.createInviteRequest(device, channel.getDeviceId(), content.toString(), SipUtils.getNewViaTag(), SipUtils.getNewFromTag(), null, ssrcInfo.getSsrc(),sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()),device.getTransport()));
sipSender.transmitRequest(sipLayer.getLocalIp(device.getLocalIp()), request, (e -> {
@ -505,7 +506,7 @@ public class SIPCommander implements ISIPCommander {
}
log.info("[语音喊话] {} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), sendRtpItem.getPort());
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId());
Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.GB28181, stream, mediaServerItem.getId());
subscribe.addSubscribe(hook, (hookData) -> {
if (event != null) {
event.response(hookData);
@ -515,7 +516,7 @@ public class SIPCommander implements ISIPCommander {
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(sipLayer.getLocalIp(device.getLocalIp()), device.getTransport());
callIdHeader.setCallId(callId);
Hook publishHook = Hook.getInstance(HookType.on_publish, "rtp", stream, mediaServerItem.getId());
Hook publishHook = Hook.getInstance(HookType.on_publish, MediaApp.GB28181, stream, mediaServerItem.getId());
subscribe.addSubscribe(publishHook, (hookData) -> {
if (eventForPush != null) {
eventForPush.response(hookData);
@ -549,7 +550,7 @@ public class SIPCommander implements ISIPCommander {
// 这里为例避免一个通道的点播只有一个callID这个参数使用一个固定值
ResponseEvent responseEvent = (ResponseEvent) e.event;
SIPResponse response = (SIPResponse) responseEvent.getResponse();
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), "talk",sendRtpItem.getApp(), stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK);
SsrcTransaction ssrcTransaction = SsrcTransaction.buildForDevice(device.getDeviceId(), channel.getId(), MediaApp.GB28181_TALK,sendRtpItem.getApp(), stream, sendRtpItem.getSsrc(), mediaServerItem.getId(), response, InviteSessionType.TALK);
sessionManager.put(ssrcTransaction);
okEvent.response(e);
}, timeout);
@ -1388,7 +1389,7 @@ public class SIPCommander implements ISIPCommander {
@Override
public void playbackControlCmd(Device device, DeviceChannel channel, String stream, String content, SipSubscribe.Event errorEvent, SipSubscribe.Event okEvent) throws SipException, InvalidArgumentException, ParseException {
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream("rtp", stream);
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(MediaApp.GB28181, stream);
if (ssrcTransaction == null) {
log.info("[回放控制]未找到视频流信息,设备:{}, 流ID: {}", device.getDeviceId(), stream);
return;

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.cmd.impl;
import com.alibaba.fastjson2.JSON;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
@ -18,6 +19,7 @@ import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -60,6 +62,9 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private SipLayer sipLayer;
@ -641,7 +646,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
MediaServer mediaServerItem = mediaServerService.getOne(mediaServerId);
if (mediaServerItem != null) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpItem.getSsrc());
mediaServerService.closeRTPServer(mediaServerItem, sendRtpItem.getStream());
receiveRtpServerService.closeRTPServer(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream());
}
SIPRequest byeRequest = headerProviderPlatformProvider.createByeRequest(platform, sendRtpItem, channel);
if (byeRequest == null) {
@ -663,8 +668,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
throw new SsrcTransactionNotFoundException(platform.getServerGBId(), channel.getGbDeviceId(), callId, stream);
}
mediaServerService.releaseSsrc(ssrcTransaction.getMediaServerId(), ssrcTransaction.getSsrc());
mediaServerService.closeRTPServer(ssrcTransaction.getMediaServerId(), ssrcTransaction.getStream());
sessionManager.removeByStream(ssrcTransaction.getApp(), ssrcTransaction.getStream());
Request byteRequest = headerProviderPlatformProvider.createByteRequest(platform, channel.getGbDeviceId(), ssrcTransaction.getSipTransactionInfo());
@ -696,7 +699,7 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
@Override
public void broadcastInviteCmd(Platform platform, CommonGBChannel channel,String sourceId, MediaServer mediaServerItem,
SSRCInfo ssrcInfo, HookSubscribe.Event event, SipSubscribe.Event okEvent,
SSRCInfo ssrcInfo, SipSubscribe.Event okEvent,
SipSubscribe.Event errorEvent) throws ParseException, SipException, InvalidArgumentException {
String stream = ssrcInfo.getStream();
@ -705,13 +708,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
}
log.info("{} 分配的ZLM为: {} [{}:{}]", stream, mediaServerItem.getId(), mediaServerItem.getIp(), ssrcInfo.getPort());
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", stream, mediaServerItem.getId());
subscribe.addSubscribe(hook, (hookData) -> {
if (event != null) {
event.response(hookData);
subscribe.removeSubscribe(hook);
}
});
String sdpIp = mediaServerItem.getSdpIp();
StringBuffer content = new StringBuffer(200);
@ -752,7 +748,6 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
sipSender.transmitRequest(sipLayer.getLocalIp(platform.getDeviceIp()), request, (e -> {
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
mediaServerService.releaseSsrc(mediaServerItem.getId(), ssrcInfo.getSsrc());
subscribe.removeSubscribe(hook);
errorEvent.response(e);
}), e -> {
ResponseEvent responseEvent = (ResponseEvent) e.event;

View File

@ -13,6 +13,7 @@ import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.util.ObjectUtils;
import javax.sip.*;
@ -30,10 +31,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
/**
* @description:处理接收IPCamera发来的SIP协议请求消息
* @author: songww
* @date: 2020年5月3日 下午4:42:22
* @date: 2020年5月3日 下午4:42:22
*/
@Slf4j
public abstract class SIPRequestProcessorParent {
@ -77,6 +78,11 @@ public abstract class SIPRequestProcessorParent {
return responseAck(sipRequest, statusCode, null);
}
@Async("taskExecutor")
public void responseAckAsync(SIPRequest sipRequest, int statusCode) throws SipException, InvalidArgumentException, ParseException {
responseAck(sipRequest, statusCode, null);
}
public SIPResponse responseAck(SIPRequest sipRequest, int statusCode, String msg) throws SipException, InvalidArgumentException, ParseException {
return responseAck(sipRequest, statusCode, msg, null);
}

View File

@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorP
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -85,6 +86,9 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
@Autowired
private IRedisRpcService redisRpcService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Override
public void afterPropertiesSet() throws Exception {
@ -228,7 +232,7 @@ public class ByeRequestProcessor extends SIPRequestProcessorParent implements In
deviceChannelService.stopPlay(channel.getId());
inviteStreamService.removeInviteInfo(inviteInfo);
if (inviteInfo.getStreamInfo() != null) {
mediaServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStreamInfo().getStream());
receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getStreamInfo().getApp(), inviteInfo.getStreamInfo().getStream());
}
}
break;

View File

@ -65,7 +65,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
// 回复200 OK
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 心跳回复: {}", e.getMessage());
}

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.*;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
@ -98,7 +99,7 @@ public class MediaStatusNotifyMessageHandler extends SIPRequestProcessorParent i
playService.stop(inviteInfo);
}
// 去除监听流注销自动停止下载的监听
Hook hook = Hook.getInstance(HookType.on_media_arrival, "rtp", ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId());
Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.GB28181, ssrcTransaction.getStream(), ssrcTransaction.getMediaServerId());
subscribe.removeSubscribe(hook);
if (ssrcTransaction.getPlatformId() != null) {
// 如果级联播放需要给上级发送此通知 TODO 多个上级同时观看一个下级 可能存在停错的问题需要将点播CallId进行上下级绑定

View File

@ -3,14 +3,13 @@ package com.genersoft.iot.vmp.jt1078.service.impl;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.jt1078.bean.*;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
import com.genersoft.iot.vmp.jt1078.event.FtpUploadEvent;
import com.genersoft.iot.vmp.jt1078.proc.request.J1205;
import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService;
@ -26,11 +25,12 @@ import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.event.media.MediaNotFoundEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaSendRtpStoppedEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.RTPServerParam;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.MediaServerUtils;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -74,6 +74,9 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private DynamicTask dynamicTask;
@ -197,7 +200,6 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
private void play(JTDevice device, JTChannel channel, int type, CommonCallback<WVPResult<StreamInfo>> callback) {
String phoneNumber = device.getPhoneNumber();
int channelId = channel.getChannelId();
String app = "1078";
String stream = phoneNumber + "_" + channelId;
// 检查流是否已经存在存在则返回
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
@ -208,7 +210,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
MediaServer mediaServer = streamInfo.getMediaServer();
if (mediaServer != null) {
// 查询流是否存在不存在则删除缓存数据
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, app, streamInfo.getStream());
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, MediaApp.JT1078, streamInfo.getStream());
if (mediaInfo != null) {
log.info("[JT-点播] 点播已经存在,直接返回, phoneNumber {} channelId {}", phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
@ -233,61 +235,67 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
}
return;
}
// 设置hook监听
Hook hook = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId());
subscribe.addSubscribe(hook, (hookData) -> {
dynamicTask.stop(playKey);
log.info("[JT-点播] 点播成功, 手机号: {} 通道: {}", phoneNumber, channelId);
// TODO 发送9105 实时音视频传输状态通知 通知丢包率
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
if (errorCallback == null) {
continue;
}
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info));
}
subscribe.removeSubscribe(hook);
redisTemplate.opsForValue().set(playKey, info);
// 截图
String path = "snap";
String fileName = phoneNumber + "_" + channelId + ".jpg";
// 请求截图
log.info("[请求截图]: " + fileName);
mediaServerService.getSnap(mediaServer, app, stream, 15, 1, path, fileName);
});
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openJTTServer(mediaServer, stream, null, false, !channel.isHasAudio(), 1);
if (ssrcInfo == null) {
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaApp.JT1078);
rtpServerParam.setStreamId(stream);
rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(1); // 1 表示tcp被动
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
int port = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, hookData) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && hookData != null ) {
// hook响应
log.info("[JT-点播] 点播成功, 手机号: {} 通道: {}", phoneNumber, channelId);
// TODO 发送9105 实时音视频传输状态通知 通知丢包率
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
if (errorCallback == null) {
continue;
}
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info));
}
redisTemplate.opsForValue().set(playKey, info);
// 截图
String path = "snap";
String fileName = phoneNumber + "_" + channelId + ".jpg";
// 请求截图
log.info("[请求截图]: " + fileName);
mediaServerService.getSnap(mediaServer, MediaApp.JT1078, stream, 15, 1, path, fileName);
}else {
if (callback != null) {
callback.run(WVPResult.fail(code, msg));
}
log.info("[JT-点播] 超时, phoneNumber {} channelId {}", phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null));
}
stopPlay(phoneNumber, channelId);
}
});
if (port <= 0) {
stopPlay(phoneNumber, channelId);
return;
}
// 设置超时监听
dynamicTask.startDelay(playKey, () -> {
log.info("[JT-点播] 超时, phoneNumber {} channelId {}", phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null));
}
mediaServerService.closeJTTServer(mediaServer, stream, null);
subscribe.removeSubscribe(hook);
stopPlay(phoneNumber, channelId);
}, userSetting.getPlayTimeout());
log.info("[JT-点播] phoneNumber {} channelId {}IP: {}, 端口: {}", phoneNumber, channelId, mediaServer.getSdpIp(), ssrcInfo.getPort());
log.info("[JT-点播] phoneNumber {} channelId {}IP: {}, 端口: {}", phoneNumber, channelId, mediaServer.getSdpIp(), port);
J9101 j9101 = new J9101();
j9101.setChannel(channelId);
j9101.setIp(mediaServer.getSdpIp());
j9101.setRate(1);
j9101.setTcpPort(ssrcInfo.getPort());
j9101.setUdpPort(ssrcInfo.getPort());
j9101.setTcpPort(port);
j9101.setUdpPort(port);
j9101.setType(type);
jt1078Template.startLive(phoneNumber, j9101, 6);
}
public StreamInfo onPublishHandler(MediaServer mediaServerItem, HookData hookData, String phoneNumber, Integer channelId) {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, "1078", hookData.getStream(), hookData.getMediaInfo(), null);
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServerItem, MediaApp.JT1078, hookData.getStream(), hookData.getMediaInfo(), null);
streamInfo.setDeviceId(phoneNumber);
streamInfo.setChannelId(channelId);
return streamInfo;
@ -317,7 +325,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
// 删除缓存数据
if (streamInfo != null) {
// 关闭rtpServer
mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream(), null);
receiveRtpServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
redisTemplate.delete(playKey);
}
@ -425,12 +433,12 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playbackKey);
if (streamInfo != null) {
mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream(), null);
receiveRtpServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
// 清理数据
redisTemplate.delete(playbackKey);
}
String app = "1078";
String app = MediaApp.JT1078;
String stream = String.format("%s_%s_%s_%s", phoneNumber, channelId,
DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(startTime), DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(endTime));
MediaServer mediaServer;
@ -445,36 +453,41 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
}
return;
}
// 设置hook监听
Hook hookSubscribe = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId());
subscribe.addSubscribe(hookSubscribe, (hookData) -> {
dynamicTask.stop(playbackKey);
log.info("[JT-回放] 回放成功, logInfo {}", logInfo);
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
if (errorCallback == null) {
continue;
}
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info));
}
subscribe.removeSubscribe(hookSubscribe);
redisTemplate.opsForValue().set(playbackKey, info);
});
// 设置超时监听
dynamicTask.startDelay(playbackKey, () -> {
log.info("[JT-回放] 回放超时, logInfo {}", logInfo);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null));
}
mediaServerService.closeJTTServer(mediaServer, stream, null);
subscribe.removeSubscribe(hookSubscribe);
}, userSetting.getPlayTimeout());
// 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openJTTServer(mediaServer, stream, null, false, !channel.isHasAudio(), 1);
log.info("[JT-回放] logInfo {} 端口: {}", logInfo, ssrcInfo.getPort());
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaApp.JT1078);
rtpServerParam.setStreamId(stream);
rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(1); // 1 表示tcp被动
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
int port = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, hookData) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && hookData != null ) {
// hook 响应
log.info("[JT-回放] 回放成功, logInfo {}", logInfo);
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
if (errorCallback == null) {
continue;
}
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info));
}
redisTemplate.opsForValue().set(playbackKey, info);
}else {
log.info("[JT-回放] 回放超时, logInfo {}", logInfo);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null));
}
receiveRtpServerService.closeRTPServer(mediaServer, app, stream);
}
});
log.info("[JT-回放] logInfo {} 端口: {}", logInfo, port);
J9201 j9201 = new J9201();
j9201.setChannel(channelId);
j9201.setIp(mediaServer.getSdpIp());
@ -488,8 +501,8 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
j9201.setPlaybackSpeed(playbackSpeed);
}
j9201.setTcpPort(ssrcInfo.getPort());
j9201.setUdpPort(ssrcInfo.getPort());
j9201.setTcpPort(port);
j9201.setUdpPort(port);
j9201.setType(type);
j9201.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime));
j9201.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime));
@ -509,7 +522,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
// 删除缓存数据
if (streamInfo != null) {
// 关闭rtpServer
mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream(), null);
receiveRtpServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
}
// 清理回调
List<CommonCallback<WVPResult<StreamInfo>>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
@ -615,7 +628,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
sendRtpInfo.setReceiveStream(stream + "_talk");
// 设置hook监听
Hook hook = Hook.getInstance(HookType.on_media_arrival, "1078", sendRtpInfo.getReceiveStream(), mediaServer.getId());
Hook hook = Hook.getInstance(HookType.on_media_arrival, MediaApp.JT1078, sendRtpInfo.getReceiveStream(), mediaServer.getId());
subscribe.addSubscribe(hook, (hookData) -> {
log.info("[JT-对讲] 对讲连接建立, phoneNumber {} channelId {}", phoneNumber, channelId);
subscribe.removeSubscribe(hook);
@ -662,8 +675,8 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
// 删除缓存数据
if (streamInfo != null) {
redisTemplate.delete(playKey);
// 关闭rtpServer
mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream(), null);
// 关闭 rtpServer
receiveRtpServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
}
// 清理回调
List<CommonCallback<WVPResult<StreamInfo>>> generalCallbacks = inviteErrorCallbackMap.get(playKey);

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.media.abl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
@ -188,14 +189,14 @@ public class ABLHttpHookListener {
logger.info("[ABL HOOK] 码流不到达通知:{}->{}/{}", param.getMediaServerId(), param.getApp(), param.getStream());
try {
if ("rtp".equals(param.getApp())) {
if (MediaApp.GB28181.equals(param.getApp())) {
return HookResult.SUCCESS();
}
MediaRtpServerTimeoutEvent event = new MediaRtpServerTimeoutEvent(this);
MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
if (mediaServerItem != null) {
event.setMediaServer(mediaServerItem);
event.setApp("rtp");
event.setApp(MediaApp.GB28181);
applicationEventPublisher.publishEvent(event);
}
}catch (Exception e) {

View File

@ -5,10 +5,12 @@ import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.SipConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.TalkRtpInfo;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.media.abl.bean.ABLMedia;
import com.genersoft.iot.vmp.media.abl.bean.ABLResult;
@ -68,40 +70,40 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public int createRTPServer(MediaServer mediaServer, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
public int createRTPServer(MediaServer mediaServer, String app, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
Boolean recordSip = userSetting.getRecordSip();
return ablresTfulUtils.openRtpServer(mediaServer, "rtp", stream, 96, port, tcpMode, disableAudio?1:0, recordSip, false);
return ablresTfulUtils.openRtpServer(mediaServer, app, stream, 96, port, tcpMode, disableAudio?1:0, recordSip, false);
}
@Override
public void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
public void closeRtpServer(MediaServer mediaServer, String app, String stream, CommonCallback<Boolean> callback) {
if (mediaServer == null) {
return;
}
ABLResult result = ablresTfulUtils.closeStreams(mediaServer, "rtp", streamId);
ABLResult result = ablresTfulUtils.closeStreams(mediaServer, app, stream);
logger.info("关闭RTP Server " + result);
if (result.getCode() != 0) {
logger.error("[closeRtpServer] 失败: {}", result.getMemo());
}
}
@Override
public int createJTTServer(MediaServer mediaServer, String stream, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) {
Boolean recordSip = userSetting.getRecordSip();
return ablresTfulUtils.openRtpServer(mediaServer, "1078", stream, 96, port, tcpMode, disableAudio?1:0, recordSip, true);
}
@Override
public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
if (mediaServer == null) {
return;
}
ABLResult result = ablresTfulUtils.closeStreams(mediaServer, "1078", streamId);
logger.info("关闭JT-RTP Server " + result);
if (result.getCode() != 0) {
logger.error("[JT-closeRtpServer] 失败: {}", result.getMemo());
}
}
// @Override
// public int createJTTServer(MediaServer mediaServer, String stream, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) {
// Boolean recordSip = userSetting.getRecordSip();
// return ablresTfulUtils.openRtpServer(mediaServer, MediaApp.JT1078, stream, 96, port, tcpMode, disableAudio?1:0, recordSip, true);
// }
//
// @Override
// public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
// if (mediaServer == null) {
// return;
// }
// ABLResult result = ablresTfulUtils.closeStreams(mediaServer, MediaApp.JT1078, streamId);
// logger.info("关闭JT-RTP Server " + result);
// if (result.getCode() != 0) {
// logger.error("[JT-closeRtpServer] 失败: {}", result.getMemo());
// }
// }
@Override
public void closeStreams(MediaServer mediaServer, String app, String streamId) {
@ -112,7 +114,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc) {
public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String app, String streamId, String ssrc) {
return null;
}
@ -246,7 +248,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
streamInfoResult.setMediaInfo(mediaInfo);
if (!"broadcast".equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) {
if (!MediaApp.GB28181_BROADCAST.equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) {
String newStream = stream + "_" + mediaServer.getTranscodeSuffix();
mediaServer.setTranscodeSuffix(null);
StreamInfo transcodeStreamInfo = getStreamInfoByAppAndStream(mediaServer, app, newStream, null, addr, callId, isPlay);
@ -256,7 +258,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String stream) {
public Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String app, String stream) {
logger.warn("[abl-connectRtpServer] 未实现");
return null;
}
@ -386,7 +388,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) {
public Integer startSendRtpTalk(MediaServer mediaServer, TalkRtpInfo talkRtpInfo, Integer timeout) {
logger.warn("[abl-startSendRtpTalk] 未实现");
return 0;
}
@ -443,7 +445,7 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
@Override
public List<String> listRtpServer(MediaServer mediaServer) {
ABLResult ablResult = ablresTfulUtils.getMediaList(mediaServer, "rtp", null);
ABLResult ablResult = ablresTfulUtils.getMediaList(mediaServer, null, null);
if (ablResult.getCode() != 0) {
return null;
}

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.media.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.TalkRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.bean.RecordInfo;
@ -15,19 +16,19 @@ import java.util.List;
import java.util.Map;
public interface IMediaNodeServerService {
int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode);
int createRTPServer(MediaServer mediaServer, String app, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode);
void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback);
void closeRtpServer(MediaServer mediaServer, String app, String stream, CommonCallback<Boolean> callback);
int createJTTServer(MediaServer mediaServer, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode);
void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback);
// int createJTTServer(MediaServer mediaServer, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode);
//
// void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback);
void closeStreams(MediaServer mediaServer, String app, String stream);
Boolean updateRtpServerSSRC(MediaServer mediaServer, String stream, String ssrc);
Boolean updateRtpServerSSRC(MediaServer mediaServer, String app, String stream, String ssrc);
boolean checkNodeId(MediaServer mediaServer);
@ -43,7 +44,7 @@ public interface IMediaNodeServerService {
List<StreamInfo> getMediaList(MediaServer mediaServer, String app, String stream, String callId);
Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream);
Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String app, String stream);
void getSnap(MediaServer mediaServer, String app, String stream, int timeoutSec, int expireSec, String path, String fileName);
@ -67,7 +68,7 @@ public interface IMediaNodeServerService {
void startSendRtpStream(MediaServer mediaServer, SendRtpInfo sendRtpItem);
Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout);
Integer startSendRtpTalk(MediaServer mediaServer, TalkRtpInfo talkRtpInfo, Integer timeout);
Long updateDownloadProcess(MediaServer mediaServer, String app, String stream);

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.media.service;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.TalkRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.bean.RecordInfo;
@ -34,20 +35,11 @@ public interface IMediaServerService {
void updateVmServer(List<MediaServer> mediaServerItemList);
SSRCInfo openRTPServer(MediaServer mediaServerItem, String streamId, String presetSsrc, boolean ssrcCheck,
boolean isPlayback, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode);
void closeRTPServer(MediaServer mediaServerItem, String app, String streamId);
void closeRTPServer(MediaServer mediaServerItem, String streamId);
void closeRTPServer(MediaServer mediaServerItem, String app, String streamId, CommonCallback<Boolean> callback);
void closeRTPServer(MediaServer mediaServerItem, String streamId, CommonCallback<Boolean> callback);
SSRCInfo openJTTServer(MediaServer mediaServerItem, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode);
void closeJTTServer(MediaServer mediaServerItem, String streamId, CommonCallback<Boolean> callback);
Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc);
void closeRTPServer(String mediaServerId, String streamId);
Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String app, String streamId, String ssrc);
void clearRTPServer(MediaServer mediaServerItem);
@ -89,7 +81,7 @@ public interface IMediaServerService {
List<StreamInfo> getMediaList(MediaServer mediaInfo, String app, String stream, String callId);
Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String stream);
Boolean connectRtpServer(MediaServer mediaServerItem, String address, int port, String app, String stream);
void getSnap(MediaServer mediaServer, String app, String stream, int timeoutSec, int expireSec, String path, String fileName);
@ -148,7 +140,7 @@ public interface IMediaServerService {
Integer startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout);
Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout);
Integer startSendRtpTalk(MediaServer mediaServer, TalkRtpInfo talkRtpInfo, Integer timeout);
void startSendRtp(MediaServer mediaServer, SendRtpInfo sendRtpItem);
@ -162,7 +154,7 @@ public interface IMediaServerService {
StreamInfo getMediaByAppAndStream(String app, String stream);
int createRTPServer(MediaServer mediaServerItem, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode);
int createRTPServer(MediaServer mediaServerItem, String app, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode);
List<String> listRtpServer(MediaServer mediaServer);

View File

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.TalkRtpInfo;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
@ -24,7 +25,6 @@ import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.bean.DownloadFileInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.MediaServerLoad;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.storager.dao.MediaServerMapper;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
@ -163,49 +163,8 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
}
@Override
public SSRCInfo openRTPServer(MediaServer mediaServer, String streamId, String presetSsrc, boolean ssrcCheck,
boolean isPlayback, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
if (mediaServer == null || mediaServer.getId() == null) {
log.info("[openRTPServer] 失败, mediaServer == null || mediaServer.getId() == null");
return null;
}
// 获取mediaServer可用的ssrc
String ssrc;
if (presetSsrc != null) {
ssrc = presetSsrc;
}else {
if (isPlayback) {
ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId());
}else {
ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
}
}
if (streamId == null) {
streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
}
if (ssrcCheck && tcpMode > 0) {
// 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验
log.warn("[openRTPServer] 平台对接时下级可能自定义ssrc但是tcp模式zlm收流目前无法更新ssrc可能收流超时此时请使用udp收流或者关闭ssrc校验");
}
int rtpServerPort;
if (mediaServer.isRtpEnable()) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[openRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return null;
}
rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrcCheck ? Long.parseLong(ssrc) : 0, port, onlyAuto, disableAudio, reUsePort, tcpMode);
} else {
rtpServerPort = mediaServer.getRtpProxyPort();
}
return new SSRCInfo(rtpServerPort, ssrc, "rtp", streamId, null);
}
@Override
public int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) {
public int createRTPServer(MediaServer mediaServer, String app, String streamId, long ssrc, Integer port, boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) {
int rtpServerPort;
if (mediaServer.isRtpEnable()) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
@ -213,34 +172,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[openRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return 0;
}
rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, streamId, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode);
rtpServerPort = mediaNodeServerService.createRTPServer(mediaServer, app, streamId, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode);
} else {
rtpServerPort = mediaServer.getRtpProxyPort();
}
return rtpServerPort;
}
@Override
public SSRCInfo openJTTServer(MediaServer mediaServer, @NotNull String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) {
if (mediaServer == null || mediaServer.getId() == null) {
log.info("[openJTTServer] 失败, mediaServer == null || mediaServer.getId() == null");
return null;
}
int rtpServerPort;
if (mediaServer.isRtpEnable()) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[openJTTServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return null;
}
rtpServerPort = mediaNodeServerService.createJTTServer(mediaServer, streamId, port, disableVideo, disableAudio, tcpMode);
} else {
rtpServerPort = mediaServer.getJttProxyPort();
}
return new SSRCInfo(rtpServerPort, null, "1078", streamId, null);
}
@Override
public List<String> listRtpServer(MediaServer mediaServer) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
@ -252,7 +190,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public void closeRTPServer(MediaServer mediaServer, String streamId) {
public void closeRTPServer(MediaServer mediaServer, String app, String streamId) {
if (mediaServer == null) {
return;
}
@ -261,11 +199,11 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[closeRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return;
}
mediaNodeServerService.closeRtpServer(mediaServer, streamId, null);
mediaNodeServerService.closeRtpServer(mediaServer, app, streamId, null);
}
@Override
public void closeRTPServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
public void closeRTPServer(MediaServer mediaServer, String app, String streamId, CommonCallback<Boolean> callback) {
if (mediaServer == null) {
callback.run(false);
return;
@ -275,42 +213,11 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[closeRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return;
}
mediaNodeServerService.closeRtpServer(mediaServer, streamId, callback);
mediaNodeServerService.closeRtpServer(mediaServer, app, streamId, callback);
}
@Override
public void closeRTPServer(String mediaServerId, String streamId) {
MediaServer mediaServer = this.getOne(mediaServerId);
if (mediaServer == null) {
return;
}
if (mediaServer.isRtpEnable()) {
closeRTPServer(mediaServer, streamId);
}
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[closeRTPServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return;
}
mediaNodeServerService.closeStreams(mediaServer, "rtp", streamId);
}
@Override
public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
if (mediaServer == null) {
callback.run(false);
return;
}
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[closeJTTServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return;
}
mediaNodeServerService.closeJTTServer(mediaServer, streamId, callback);
}
@Override
public Boolean updateRtpServerSSRC(MediaServer mediaServer, String streamId, String ssrc) {
public Boolean updateRtpServerSSRC(MediaServer mediaServer, String app, String streamId, String ssrc) {
if (mediaServer == null) {
return false;
}
@ -319,7 +226,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
log.info("[updateRtpServerSSRC] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return false;
}
return mediaNodeServerService.updateRtpServerSSRC(mediaServer, streamId, ssrc);
return mediaNodeServerService.updateRtpServerSSRC(mediaServer, app, streamId, ssrc);
}
@Override
@ -717,13 +624,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream) {
public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String app, String stream) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[connectRtpServer] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
return false;
}
return mediaNodeServerService.connectRtpServer(mediaServer, address, port, stream);
return mediaNodeServerService.connectRtpServer(mediaServer, address, port, app, stream);
}
@Override
@ -897,15 +804,16 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) {
public Integer startSendRtpTalk(MediaServer mediaServer, TalkRtpInfo talkRtpInfo, Integer timeout) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[startSendRtpPassive] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
}
return mediaNodeServerService.startSendRtpTalk(mediaServer, sendRtpItem, timeout);
return mediaNodeServerService.startSendRtpTalk(mediaServer, talkRtpInfo, timeout);
}
@Override
public void startSendRtp(MediaServer mediaServer, SendRtpInfo sendRtpItem) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.media.zlm;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.MediaConfig;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@ -262,7 +263,7 @@ public class ZLMHttpHookListener {
log.info("[ZLM HOOK] rtp发送关闭{}->{}/{}", param.getMediaServerId(), param.getApp(), param.getStream());
// 查找对应的上级推流发送停止
if (!"rtp".equals(param.getApp())) {
if (!MediaApp.GB28181.equals(param.getApp())) {
return HookResult.SUCCESS();
}
try {
@ -293,7 +294,7 @@ public class ZLMHttpHookListener {
MediaServer mediaServerItem = mediaServerService.getOne(param.getMediaServerId());
if (mediaServerItem != null) {
event.setMediaServer(mediaServerItem);
event.setApp("rtp");
event.setApp(MediaApp.GB28181);
applicationEventPublisher.publishEvent(event);
}
}catch (Exception e) {

View File

@ -5,9 +5,11 @@ import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.CommonCallback;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.gb28181.bean.TalkRtpInfo;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.bean.RecordInfo;
@ -47,24 +49,24 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
private HookSubscribe subscribe;
@Override
public int createRTPServer(MediaServer mediaServer, String streamId, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
return zlmServerFactory.createRTPServer(mediaServer, "rtp", streamId, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode);
public int createRTPServer(MediaServer mediaServer, String app, String stream, long ssrc, Integer port, Boolean onlyAuto, Boolean disableAudio, Boolean reUsePort, Integer tcpMode) {
return zlmServerFactory.createRTPServer(mediaServer, app, stream, ssrc, port, onlyAuto, disableAudio, reUsePort, tcpMode);
}
@Override
public void closeRtpServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
zlmServerFactory.closeRtpServer(mediaServer, streamId, callback);
public void closeRtpServer(MediaServer mediaServer, String app, String stream, CommonCallback<Boolean> callback) {
zlmServerFactory.closeRtpServer(mediaServer, app, stream, callback);
}
@Override
public int createJTTServer(MediaServer mediaServer, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) {
return zlmServerFactory.createRTPServer(mediaServer, "1078", streamId, 0, port, disableVideo, disableAudio, false, tcpMode);
}
// @Override
// public int createJTTServer(MediaServer mediaServer, String streamId, Integer port, Boolean disableVideo, Boolean disableAudio, Integer tcpMode) {
// return zlmServerFactory.createRTPServer(mediaServer, "1078", streamId, 0, port, disableVideo, disableAudio, false, tcpMode);
// }
@Override
public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
zlmServerFactory.closeRtpServer(mediaServer, streamId, callback);
}
// @Override
// public void closeJTTServer(MediaServer mediaServer, String streamId, CommonCallback<Boolean> callback) {
// zlmServerFactory.closeRtpServer(mediaServer, streamId, callback);
// }
@Override
public void closeStreams(MediaServer mediaServer, String app, String stream) {
@ -72,8 +74,8 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public Boolean updateRtpServerSSRC(MediaServer mediaServer, String streamId, String ssrc) {
return zlmServerFactory.updateRtpServerSSRC(mediaServer, streamId, ssrc);
public Boolean updateRtpServerSSRC(MediaServer mediaServer, String app, String streamId, String ssrc) {
return zlmServerFactory.updateRtpServerSSRC(mediaServer, app, streamId, ssrc);
}
@Override
@ -208,8 +210,8 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String stream) {
ZLMResult<?> zlmResult = zlmresTfulUtils.connectRtpServer(mediaServer, address, port, stream);
public Boolean connectRtpServer(MediaServer mediaServer, String address, int port, String app, String stream) {
ZLMResult<?> zlmResult = zlmresTfulUtils.connectRtpServer(mediaServer, address, port, app, stream);
log.info("[TCP主动连接对方] 结果: {}", zlmResult);
return zlmResult.getCode() == 0;
}
@ -370,17 +372,17 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) {
public Integer startSendRtpTalk(MediaServer mediaServer, TalkRtpInfo talkRtpInfo, Integer timeout) {
Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__");
param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc());
param.put("pt", sendRtpItem.getPt());
param.put("type", sendRtpItem.isUsePs() ? "1" : "0");
param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
param.put("recv_stream_id", sendRtpItem.getReceiveStream());
param.put("enable_origin_recv_limit", "1");
param.put("app", talkRtpInfo.getApp());
param.put("stream", talkRtpInfo.getStream());
param.put("ssrc", talkRtpInfo.getSsrc());
param.put("pt", talkRtpInfo.getPt());
param.put("type", talkRtpInfo.getType());
param.put("only_audio", talkRtpInfo.getOnlyAudio());
param.put("recv_stream_id", talkRtpInfo.getReceiveStreamId());
param.put("enable_origin_recv_limit", talkRtpInfo.getEnableOriginReceiveLimit() != null && talkRtpInfo.getEnableOriginReceiveLimit() == 1 ? "1" : "0");
ZLMResult<?> zlmResult = zlmServerFactory.startSendRtpTalk(mediaServer, param, null);
if (zlmResult.getCode() != 0 ) {
log.error("启动监听TCP被动推流失败: {}, 参数:{}", zlmResult.getMsg(), JSON.toJSONString(param));
@ -679,7 +681,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
streamInfoResult.setMediaInfo(mediaInfo);
if (!"broadcast".equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) {
if (!MediaApp.GB28181_BROADCAST.equalsIgnoreCase(app) && !ObjectUtils.isEmpty(mediaServer.getTranscodeSuffix()) && !"null".equalsIgnoreCase(mediaServer.getTranscodeSuffix())) {
String newStream = stream + "_" + mediaServer.getTranscodeSuffix();
mediaServer.setTranscodeSuffix(null);
StreamInfo transcodeStreamInfo = getStreamInfoByAppAndStream(mediaServer, app, newStream, null, addr, callId, isPlay);

View File

@ -509,21 +509,25 @@ public class ZLMRESTfulUtils {
}
public ZLMResult<?> startSendRtpPassive(MediaServer mediaServer, Map<String, Object> param, ResultCallback callback) {
String response = sendPost(mediaServer, "startSendRtpPassive",param, (responseStr -> {
if (callback == null) {
return;
}
if (responseStr == null) {
callback.run(ZLMResult.getFailForMediaServer());
}else {
ZLMResult<?> zlmResult = JSON.parseObject(responseStr, ZLMResult.class);
if (zlmResult == null) {
RequestCallback requestCallback = null;
if (callback != null) {
requestCallback = (responseStr -> {
if (callback == null) {
return;
}
if (responseStr == null) {
callback.run(ZLMResult.getFailForMediaServer());
}else {
callback.run(zlmResult);
ZLMResult<?> zlmResult = JSON.parseObject(responseStr, ZLMResult.class);
if (zlmResult == null) {
callback.run(ZLMResult.getFailForMediaServer());
}else {
callback.run(zlmResult);
}
}
}
}));
});
}
String response = sendPost(mediaServer, "startSendRtpPassive",param, requestCallback);
if (response == null) {
return ZLMResult.getFailForMediaServer();
}else {
@ -694,10 +698,11 @@ public class ZLMRESTfulUtils {
}
}
public ZLMResult<?> connectRtpServer(MediaServer mediaServer, String dst_url, int dst_port, String stream_id) {
public ZLMResult<?> connectRtpServer(MediaServer mediaServer, String dst_url, int dst_port, String app, String stream_id) {
Map<String, Object> param = new HashMap<>(1);
param.put("dst_url", dst_url);
param.put("dst_port", dst_port);
param.put("app", app);
param.put("stream_id", stream_id);
String response = sendPost(mediaServer, "connectRtpServer", param, null);
if (response == null) {
@ -712,9 +717,10 @@ public class ZLMRESTfulUtils {
}
}
public ZLMResult<?> updateRtpServerSSRC(MediaServer mediaServer, String streamId, String ssrc) {
public ZLMResult<?> updateRtpServerSSRC(MediaServer mediaServer, String app, String streamId, String ssrc) {
Map<String, Object> param = new HashMap<>(1);
param.put("ssrc", ssrc);
param.put("app", app);
param.put("stream_id", streamId);
String response = sendPost(mediaServer, "updateRtpServerSSRC", param, null);

View File

@ -98,10 +98,11 @@ public class ZLMServerFactory {
return result;
}
public boolean closeRtpServer(MediaServer serverItem, String streamId) {
public boolean closeRtpServer(MediaServer serverItem, String app, String streamId) {
boolean result = false;
if (serverItem !=null){
Map<String, Object> param = new HashMap<>();
param.put("app", app);
param.put("stream_id", streamId);
ZLMResult<?> zlmResult = zlmresTfulUtils.closeRtpServer(serverItem, param);
if (zlmResult != null ) {
@ -118,7 +119,7 @@ public class ZLMServerFactory {
return result;
}
public void closeRtpServer(MediaServer serverItem, String streamId, CommonCallback<Boolean> callback) {
public void closeRtpServer(MediaServer serverItem, String app, String streamId, CommonCallback<Boolean> callback) {
if (serverItem == null) {
if (callback != null) {
callback.run(false);
@ -126,6 +127,7 @@ public class ZLMServerFactory {
return;
}
Map<String, Object> param = new HashMap<>();
param.put("app", app);
param.put("stream_id", streamId);
zlmresTfulUtils.closeRtpServer(serverItem, param, zlmResult -> {
if (zlmResult.getCode() == 0) {
@ -223,9 +225,9 @@ public class ZLMServerFactory {
return zlmResult;
}
public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String streamId, String ssrc) {
public Boolean updateRtpServerSSRC(MediaServer mediaServerItem, String app, String streamId, String ssrc) {
boolean result = false;
ZLMResult<?> zlmResult = zlmresTfulUtils.updateRtpServerSSRC(mediaServerItem, streamId, ssrc);
ZLMResult<?> zlmResult = zlmresTfulUtils.updateRtpServerSSRC(mediaServerItem, app, streamId, ssrc);
if (zlmResult.getCode() == 0) {
result= true;
log.info("[更新RTPServer] 成功");

View File

@ -1,13 +1,22 @@
package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.gb28181.bean.OpenRTPServerResult;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.RTPServerParam;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
public interface IReceiveRtpServerService {
SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<OpenRTPServerResult> callback);
void closeRTPServer(MediaServer mediaServer, SSRCInfo ssrcInfo);
SSRCInfo openGbRTPServer(MediaServer mediaServer, String streamId, String presetSSRC, int tcpMode,
boolean playback, boolean ssrcCheck, boolean onlyAuto, boolean disableAuto,
ErrorCallback<OpenRTPServerResult> callback);
int openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<HookData> callback);
void closeRTPServer(MediaServer mediaServer, String app, String stream);
void closeRTPServerByMediaServerId(String mediaServerId, String app, String stream);
}

View File

@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.Data;
@Data
@ -62,7 +61,7 @@ public class GPSMsgInfo {
gpsMsgInfo.setLat(mobilePosition.getLatitude());
gpsMsgInfo.setSpeed(mobilePosition.getSpeed());
gpsMsgInfo.setDirection(mobilePosition.getDirection());
gpsMsgInfo.setTime(DateUtil.yyyy_MM_dd_HH_mm_ssToISO8601(mobilePosition.getTime()));
gpsMsgInfo.setTime(mobilePosition.getTime());
return gpsMsgInfo;
}
}

View File

@ -2,15 +2,25 @@ package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Data
@Getter
@Setter
@NoArgsConstructor
public class RTPServerParam {
private MediaServer mediaServerItem;
/**
* 使用的流媒体
*/
private MediaServer mediaServer;
private String app;
private String streamId;
private String presetSsrc;
private boolean ssrcCheck;
private boolean playback;
/**
* 开启rtpServer时使用的ssrc开启rtpServer时会根据这个ssrc进行校验如果不填则不校验
*/
private Long ssrc;
private Integer port;
private boolean onlyAuto;
private boolean disableAudio;
@ -21,5 +31,16 @@ public class RTPServerParam {
*/
private Integer tcpMode;
public RTPServerParam(MediaServer mediaServer, String app, String streamId, Long ssrc, Integer port,
boolean onlyAuto, boolean disableAudio, boolean reUsePort, Integer tcpMode) {
this.mediaServer = mediaServer;
this.app = app;
this.streamId = streamId;
this.ssrc = ssrc;
this.port = port;
this.onlyAuto = onlyAuto;
this.disableAudio = disableAudio;
this.reUsePort = reUsePort;
this.tcpMode = tcpMode;
}
}

View File

@ -9,13 +9,12 @@ public class SSRCInfo {
private String ssrc;
private String app;
private String Stream;
private String timeOutTaskKey;
public SSRCInfo(int port, String ssrc, String app, String stream, String timeOutTaskKey) {
public SSRCInfo(int port, String ssrc, String app, String stream) {
this.port = port;
this.ssrc = ssrc;
this.app = app;
this.Stream = stream;
this.timeOutTaskKey = timeOutTaskKey;
}
}

View File

@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
@ -82,7 +83,7 @@ public class MediaServiceImpl implements IMediaService {
if (app == null || stream == null) {
return false;
}
if ("rtp".equals(app)) {
if (MediaApp.GB28181.equals(app)) {
return true;
}
StreamAuthorityInfo streamAuthorityInfo = redisCatchStorage.getStreamAuthorityInfo(app, stream);
@ -95,8 +96,8 @@ public class MediaServiceImpl implements IMediaService {
@Override
public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) {
// 推流鉴权的处理
if (!"rtp".equals(app) && !"1078".equals(app) ) {
if ("talk".equals(app) && stream.endsWith("_talk")) {
if (!MediaApp.GB28181.equals(app) && !MediaApp.JT1078.equals(app) ) {
if (MediaApp.GB28181_TALK.equals(app) && stream.endsWith("_talk")) {
ResultForOnPublish result = new ResultForOnPublish();
result.setEnable_mp4(false);
result.setEnable_audio(true);
@ -156,7 +157,7 @@ public class MediaServiceImpl implements IMediaService {
result.setEnable_audio(true);
// 国标流
if ("rtp".equals(app)) {
if (MediaApp.GB28181.equals(app)) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
@ -214,16 +215,16 @@ public class MediaServiceImpl implements IMediaService {
result.setEnable_audio(true);
}
}
} else if (app.equals("broadcast")) {
} else if (app.equals(MediaApp.GB28181_BROADCAST)) {
result.setEnable_audio(true);
result.setEnable_mp4(userSetting.getRecordSip());
} else if (app.equals("talk")) {
} else if (app.equals(MediaApp.GB28181_TALK)) {
result.setEnable_audio(true);
result.setEnable_mp4(userSetting.getRecordSip());
}else {
result.setEnable_mp4(userSetting.getRecordPushLive());
}
if (app.equalsIgnoreCase("rtp")) {
if (app.equalsIgnoreCase(MediaApp.GB28181)) {
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream;
OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey);
@ -243,7 +244,7 @@ public class MediaServiceImpl implements IMediaService {
return false;
}
// 国标类型的流
if ("rtp".equals(app)) {
if (MediaApp.GB28181.equals(app)) {
result = userSetting.getStreamOnDemand();
// 国标流 点播/录像回放/录像下载
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
@ -260,7 +261,7 @@ public class MediaServiceImpl implements IMediaService {
}
return result;
}
}else if ("1078".equals(app)) {
}else if (MediaApp.JT1078.equals(app)) {
// 判断是否是1078播放类型
JTMediaStreamType jtMediaStreamType = ijt1078Service.checkStreamFromJt(stream);
if (jtMediaStreamType != null) {
@ -273,7 +274,7 @@ public class MediaServiceImpl implements IMediaService {
}else {
return false;
}
}else if ("talk".equals(app) || "broadcast".equals(app)) {
}else if (MediaApp.GB28181_TALK.equals(app) || MediaApp.GB28181_BROADCAST.equals(app)) {
return false;
} else if ("mp4_record".equals(app)) {
return true;

View File

@ -1,5 +1,6 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.OpenRTPServerResult;
@ -7,6 +8,7 @@ import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
@ -23,12 +25,14 @@ import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.UUID;
import java.util.Objects;
@Slf4j
@Service
public class RtpServerServiceImpl implements IReceiveRtpServerService {
private final static String TIMEOUT_TASK_KEY_PREFIX = "RTP_SERVER_TIMEOUT_TASK";
@Autowired
private IMediaServerService mediaServerService;
@ -66,97 +70,122 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
}
@Override
public SSRCInfo openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<OpenRTPServerResult> callback) {
public SSRCInfo openGbRTPServer(MediaServer mediaServer, String streamId, String presetSSRC, int tcpMode,
boolean playback, boolean ssrcCheck, boolean onlyAuto, boolean disableAuto,
ErrorCallback<OpenRTPServerResult> callback) {
if (callback == null) {
log.warn("[开启RTP收流] 失败回调为NULL");
log.warn("[开启国标RTP收流] 失败回调为NULL");
return null;
}
if (rtpServerParam.getMediaServerItem() == null) {
log.warn("[开启RTP收流] 失败媒体节点为NULL");
if (mediaServer == null) {
log.warn("[开启国标RTP收流] 失败媒体节点为NULL");
return null;
}
// 获取mediaServer可用的ssrc
// 获取 mediaServer 可用的 ssrc
final String ssrc;
if (rtpServerParam.getPresetSsrc() != null) {
ssrc = rtpServerParam.getPresetSsrc();
if (presetSSRC != null) {
ssrc = presetSSRC;
}else {
if (rtpServerParam.isPlayback()) {
ssrc = ssrcFactory.getPlayBackSsrc(rtpServerParam.getMediaServerItem().getId());
if (playback) {
ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId());
}else {
ssrc = ssrcFactory.getPlaySsrc(rtpServerParam.getMediaServerItem().getId());
ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
}
}
final String streamId;
if (rtpServerParam.getStreamId() == null) {
if (streamId == null) {
streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
}else {
streamId = rtpServerParam.getStreamId();
}
if (rtpServerParam.isSsrcCheck() && rtpServerParam.getTcpMode() > 0) {
if (ssrcCheck && tcpMode > 0) {
// 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验
log.warn("[openRTPServer] 平台对接时下级可能自定义ssrc但是tcp模式zlm收流目前无法更新ssrc可能收流超时此时请使用udp收流或者关闭ssrc校验");
}
int rtpServerPort;
if (rtpServerParam.getMediaServerItem().isRtpEnable()) {
rtpServerPort = mediaServerService.createRTPServer(rtpServerParam.getMediaServerItem(), streamId,
rtpServerParam.isSsrcCheck() ? Long.parseLong(ssrc) : 0, rtpServerParam.getPort(), rtpServerParam.isOnlyAuto(),
rtpServerParam.isDisableAudio(), rtpServerParam.isReUsePort(), rtpServerParam.getTcpMode());
} else {
rtpServerPort = rtpServerParam.getMediaServerItem().getRtpProxyPort();
}
if (rtpServerPort == 0) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "开启RTPServer失败", null);
// 释放ssrc
if (rtpServerParam.getPresetSsrc() == null) {
ssrcFactory.releaseSsrc(rtpServerParam.getMediaServerItem().getId(), ssrc);
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaApp.GB28181, streamId);
RTPServerParam rtpServerParam = new RTPServerParam(mediaServer, MediaApp.GB28181, streamId, ssrcCheck ? Long.parseLong(ssrc): 0L, null, onlyAuto, disableAuto, false, tcpMode);
int rtpServerPort = openRTPServer(rtpServerParam, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
openRTPServerResult.setHookData(data);
openRTPServerResult.setSsrcInfo(ssrcInfo);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult);
} else {
// 释放ssrc
if (presetSSRC == null) {
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrc);
}
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
openRTPServerResult.setSsrcInfo(ssrcInfo);
callback.run(code, msg, openRTPServerResult);
}
return null;
}));
ssrcInfo.setPort(rtpServerPort);
return new SSRCInfo(rtpServerPort, ssrc, MediaApp.GB28181, streamId);
}
@Override
public int openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<HookData> callback) {
if (callback == null) {
log.warn("[开启RTP收流] 失败回调为NULL");
return -1;
}
if (rtpServerParam.getMediaServer() == null) {
log.warn("[开启RTP收流] 失败媒体节点为NULL");
return -1;
}
// 设置流超时的定时任务
String timeOutTaskKey = UUID.randomUUID().toString();
String timeOutTaskKey = String.format("%s_%s_%s_%s", TIMEOUT_TASK_KEY_PREFIX, rtpServerParam.getMediaServer().getId(), rtpServerParam.getApp(), rtpServerParam.getStreamId());
SSRCInfo ssrcInfo = new SSRCInfo(rtpServerPort, ssrc, "rtp", streamId, timeOutTaskKey);
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();
openRTPServerResult.setSsrcInfo(ssrcInfo);
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, ssrcInfo.getApp(), streamId, rtpServerParam.getMediaServerItem().getId());
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, rtpServerParam.getApp(), rtpServerParam.getStreamId(), rtpServerParam.getMediaServer().getId());
dynamicTask.startDelay(timeOutTaskKey, () -> {
// 收流超时
// 释放ssrc
if (rtpServerParam.getPresetSsrc() == null) {
ssrcFactory.releaseSsrc(rtpServerParam.getMediaServerItem().getId(), ssrc);
}
// 关闭收流端口
mediaServerService.closeRTPServer(rtpServerParam.getMediaServerItem(), streamId);
mediaServerService.closeRTPServer(rtpServerParam.getMediaServer(), rtpServerParam.getApp(), rtpServerParam.getStreamId());
subscribe.removeSubscribe(rtpHook);
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), openRTPServerResult);
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
}, userSetting.getPlayTimeout());
// 开启流到来的监听
subscribe.addSubscribe(rtpHook, (hookData) -> {
dynamicTask.stop(timeOutTaskKey);
// hook响应
openRTPServerResult.setHookData(hookData);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), openRTPServerResult);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), hookData);
subscribe.removeSubscribe(rtpHook);
});
return ssrcInfo;
int rtpServerPort;
if (rtpServerParam.getMediaServer().isRtpEnable()) {
rtpServerPort = mediaServerService.createRTPServer(rtpServerParam.getMediaServer(), rtpServerParam.getApp(), rtpServerParam.getStreamId(),
Objects.requireNonNullElse(rtpServerParam.getSsrc(), 0L), rtpServerParam.getPort(), rtpServerParam.isOnlyAuto(),
rtpServerParam.isDisableAudio(), rtpServerParam.isReUsePort(), rtpServerParam.getTcpMode());
} else {
rtpServerPort = rtpServerParam.getMediaServer().getRtpProxyPort();
}
if (rtpServerPort == 0) {
callback.run(InviteErrorCode.ERROR_FOR_RESOURCE_EXHAUSTION.getCode(), "开启RTPServer失败", null);
return -1;
}
return rtpServerPort;
}
@Override
public void closeRTPServer(MediaServer mediaServer, SSRCInfo ssrcInfo) {
public void closeRTPServer(MediaServer mediaServer, String app, String stream) {
if (mediaServer == null) {
return;
}
if (ssrcInfo.getTimeOutTaskKey() != null) {
dynamicTask.stop(ssrcInfo.getTimeOutTaskKey());
String timeOutTaskKey = String.format("%s_%s_%s_%s", TIMEOUT_TASK_KEY_PREFIX, mediaServer.getId(), app, stream);
if (dynamicTask.contains(timeOutTaskKey)) {
dynamicTask.stop(timeOutTaskKey);
}
if (ssrcInfo.getSsrc() != null) {
// 释放ssrc
ssrcFactory.releaseSsrc(mediaServer.getId(), ssrcInfo.getSsrc());
mediaServerService.closeRTPServer(mediaServer, app, stream);
}
@Override
public void closeRTPServerByMediaServerId(String mediaServerId, String app, String stream) {
MediaServer mediaServer = mediaServerService.getOne(mediaServerId);
if (mediaServer == null) {
return;
}
mediaServerService.closeRTPServer(mediaServer, ssrcInfo.getStream());
closeRTPServer(mediaServer, app, stream);
}
}

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.streamProxy.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
@ -103,7 +104,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Async
@EventListener
public void onApplicationEvent(MediaNotFoundEvent event) {
if ("rtp".equals(event.getApp())) {
if (MediaApp.isKeywords(event.getApp())) {
return;
}
// 拉流代理

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.streamPush.service.impl;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
@ -101,7 +102,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
updatePushStatus(streamPushInDb);
}
// 冗余数据自己系统中自用
if (!"broadcast".equals(event.getApp()) && !"talk".equals(event.getApp())) {
if (!MediaApp.GB28181_BROADCAST.equals(event.getApp()) && !MediaApp.GB28181_TALK.equals(event.getApp())) {
redisCatchStorage.addPushListItem(event.getApp(), event.getStream(), event.getMediaInfo());
}

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.vmanager.ps;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
@ -11,7 +12,10 @@ import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.RTPServerParam;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -48,6 +52,9 @@ public class PsController {
@Autowired
private ISendRtpServerService sendRtpServerService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private UserSetting userSetting;
@ -92,37 +99,44 @@ public class PsController {
}
}
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + callId + "_" + stream;
SSRCInfo ssrcInfo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "", false, false, null, false, false, false, tcpMode);
if (ssrcInfo.getPort() == 0) {
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaApp.GB28181);
rtpServerParam.setStreamId(stream);
rtpServerParam.setSsrc(ssrcInt);
rtpServerParam.setTcpMode(tcpMode);
int rtpServerPort = receiveRtpServerService.openRTPServer(rtpServerParam, ((code, msg, data) -> {
if (callBack == null) {
return;
}
if (code == InviteErrorCode.SUCCESS.getCode()) {
log.info("[第三方PS服务对接->开启收流和获取发流信息] 成功回调callId->{}, data->{}", callId, data);
// 将信息写入redis中以备后用
redisTemplate.delete(receiveKey);
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
OkHttpClient client = httpClientBuilder.build();
String url = callBack + "?callId=" + callId;
Request request = new Request.Builder().get().url(url).build();
try {
client.newCall(request).execute();
} catch (IOException e) {
log.error("[第三方PS服务对接->开启收流和获取发流信息] 成功回调 callId->{}, 发送回调失败", callId, e);
}
} else {
log.info("[第三方PS服务对接->开启收流和获取发流信息] 失败回调callId->{}, code->{}, msg->{}", callId, code, msg);
// 将信息写入redis中以备后用
redisTemplate.delete(receiveKey);
}
}));
if (rtpServerPort == 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
}
// 注册回调如果rtp收流超时则通过回调发送通知
if (callBack != null) {
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hook,
(hookData)->{
if (stream.equals(hookData.getStream())) {
log.info("[第三方PS服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
// 将信息写入redis中以备后用
redisTemplate.delete(receiveKey);
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
OkHttpClient client = httpClientBuilder.build();
String url = callBack + "?callId=" + callId;
Request request = new Request.Builder().get().url(url).build();
try {
client.newCall(request).execute();
} catch (IOException e) {
log.error("[第三方PS服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
}
hookSubscribe.removeSubscribe(hook);
}
});
}
OtherPsSendInfo otherPsSendInfo = new OtherPsSendInfo();
otherPsSendInfo.setReceiveIp(mediaServer.getSdpIp());
otherPsSendInfo.setReceivePort(ssrcInfo.getPort());
otherPsSendInfo.setReceivePort(rtpServerPort);
otherPsSendInfo.setCallId(callId);
otherPsSendInfo.setStream(stream);
@ -149,7 +163,7 @@ public class PsController {
public void closeRtpServer(String stream) {
log.info("[第三方PS服务对接->关闭收流] stream->{}", stream);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
mediaServerService.closeRTPServer(mediaServerItem, stream);
receiveRtpServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream);
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_*_" + stream;
List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
if (!scan.isEmpty()) {

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.vmanager.rtp;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
@ -11,7 +12,10 @@ import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.RTPServerParam;
import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
@ -43,6 +47,9 @@ public class RtpController {
@Autowired
private ISendRtpServerService sendRtpServerService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private HookSubscribe hookSubscribe;
@ -92,37 +99,52 @@ public class RtpController {
}
}
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + callId + "_" + stream;
SSRCInfo ssrcInfoForVideo = mediaServerService.openRTPServer(mediaServer, stream, ssrcInt + "",false,false, null, false, false, false, tcpMode);
SSRCInfo ssrcInfoForAudio = mediaServerService.openRTPServer(mediaServer, stream + "_a", ssrcInt + "", false, false, null, false,false,false, tcpMode);
if (ssrcInfoForVideo.getPort() == 0 || ssrcInfoForAudio.getPort() == 0) {
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaApp.GB28181);
rtpServerParam.setStreamId(stream);
rtpServerParam.setSsrc(ssrcInt);
rtpServerParam.setTcpMode(tcpMode);
int rtpServerPortForVideo = receiveRtpServerService.openRTPServer(rtpServerParam, ((code, msg, data) -> {
if (callBack == null) {
return;
}
if (code == InviteErrorCode.SUCCESS.getCode()) {
log.info("[开启收流和获取发流信息] 视频流收流成功callId->{}stream->{}", callId, stream);
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
OkHttpClient client = httpClientBuilder.build();
String url = callBack + "?callId=" + callId;
Request request = new Request.Builder().get().url(url).build();
try {
client.newCall(request).execute();
} catch (IOException e) {
log.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
}
}else {
log.info("[开启收流和获取发流信息] 视频流收流失败callId->{}stream->{}", callId, stream);
}
}));
rtpServerParam.setStreamId(stream + "_a");
int rtpServerPortForAudio = receiveRtpServerService.openRTPServer(rtpServerParam, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
log.info("[开启收流和获取发流信息] 音频流收流成功callId->{}stream->{}", callId, stream);
}else {
log.info("[开启收流和获取发流信息] 音频流收流失败callId->{}stream->{}", callId, stream);
}
}));
if (rtpServerPortForVideo == 0 || rtpServerPortForAudio == 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
}
// 注册回调如果rtp收流超时则通过回调发送通知
if (callBack != null) {
Hook hook = Hook.getInstance(HookType.on_rtp_server_timeout, "rtp", stream, mediaServer.getId());
// 订阅 zlm启动事件, 新的zlm也会从这里进入系统
hookSubscribe.addSubscribe(hook,
(hookData)->{
if (stream.equals(hookData.getStream())) {
log.info("[开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调", callId);
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
OkHttpClient client = httpClientBuilder.build();
String url = callBack + "?callId=" + callId;
Request request = new Request.Builder().get().url(url).build();
try {
client.newCall(request).execute();
} catch (IOException e) {
log.error("[第三方服务对接->开启收流和获取发流信息] 等待收流超时 callId->{}, 发送回调失败", callId, e);
}
hookSubscribe.removeSubscribe(hook);
}
});
}
String key = VideoManagerConstants.WVP_OTHER_SEND_RTP_INFO + userSetting.getServerId() + "_" + callId;
OtherRtpSendInfo otherRtpSendInfo = new OtherRtpSendInfo();
otherRtpSendInfo.setReceiveIp(mediaServer.getSdpIp());
otherRtpSendInfo.setReceivePortForVideo(ssrcInfoForVideo.getPort());
otherRtpSendInfo.setReceivePortForAudio(ssrcInfoForAudio.getPort());
otherRtpSendInfo.setReceivePortForVideo(rtpServerPortForVideo);
otherRtpSendInfo.setReceivePortForAudio(rtpServerPortForAudio);
otherRtpSendInfo.setCallId(callId);
otherRtpSendInfo.setStream(stream);
@ -152,8 +174,8 @@ public class RtpController {
public void closeRtpServer(String stream) {
log.info("[第三方服务对接->关闭收流] stream->{}", stream);
MediaServer mediaServerItem = mediaServerService.getDefaultMediaServer();
mediaServerService.closeRTPServer(mediaServerItem, stream);
mediaServerService.closeRTPServer(mediaServerItem, stream+ "_a");
receiveRtpServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream);
receiveRtpServerService.closeRTPServer(mediaServerItem, MediaApp.GB28181, stream+ "_a");
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_*_" + stream;
List<Object> scan = RedisUtil.scan(redisTemplate, receiveKey);
if (scan.size() > 0) {

View File

@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.SsrcTransactionNotFoundException;
import com.genersoft.iot.vmp.gb28181.bean.Device;
@ -246,7 +247,7 @@ public class ApiStreamController {
}
try {
cmder.streamByeCmd(device, code, "rtp", inviteInfo.getStream(), null, null);
cmder.streamByeCmd(device, code, MediaApp.GB28181, inviteInfo.getStream(), null, null);
} catch (InvalidArgumentException | ParseException | SipException | SsrcTransactionNotFoundException e) {
JSONObject result = new JSONObject();
result.put("error","发送BYE失败" + e.getMessage());

View File

@ -5,4 +5,4 @@ spring:
virtual:
enabled: true
profiles:
active: 274-dev
active: dev

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -89,7 +89,7 @@ export default {
container: this.$refs.container,
videoBuffer: 0,
isResize: true,
useMSE: true,
useMSE: false,
useWCS: false,
text: '',
// background: '',