无人观看流程处理与接入设备类型解耦

This commit is contained in:
lin 2026-04-15 19:12:38 +08:00
parent f961d6fc8d
commit a1441016f7
11 changed files with 174 additions and 112 deletions

View File

@ -15,6 +15,7 @@ public class ChannelDataType {
public final static String PLAYBACK_SERVICE = "sourceChannelPlaybackService";
public final static String DOWNLOAD_SERVICE = "sourceChannelDownloadService";
public final static String PTZ_SERVICE = "sourceChannelPTZService";
public final static String OTHER_SERVICE = "sourceChannelOtherService";
public static String getDateTypeDesc(Integer dataType) {

View File

@ -31,7 +31,7 @@ public class MediaStreamUtil {
}
public static boolean isJT1078(String app, String streamId) {
return RTP_APP.equals(app) || streamId.startsWith(JT1078_STREAM_PREFIX);
return RTP_APP.equals(app) && streamId.startsWith(JT1078_STREAM_PREFIX);
}
public static String getJTPlayStreamId(String phoneNumber, int channelId) {
@ -39,11 +39,11 @@ public class MediaStreamUtil {
}
public static boolean isJT1078Play(String app, String stream) {
return RTP_APP.equals(app) || stream.startsWith(JT1078_STREAM_PLAY_PREFIX);
return RTP_APP.equals(app) && stream.startsWith(JT1078_STREAM_PLAY_PREFIX + "_");
}
public static boolean isJT1078Playback(String app, String stream) {
return RTP_APP.equals(app) || stream.startsWith(JT1078_STREAM_PLAYBACK_PREFIX);
return RTP_APP.equals(app) && stream.startsWith(JT1078_STREAM_PLAYBACK_PREFIX + "_");
}
public static boolean isJT1078Talk(String app, String stream) {
@ -61,4 +61,16 @@ public class MediaStreamUtil {
public static String getJTTalkReceiveStreamId(String phoneNumber, Integer channelId) {
return getJTTalkStreamId(phoneNumber, channelId) + "_receive";
}
public static String[] getJT1078StreamInfo(String app, String stream) {
if (!isJT1078(app, stream)) {
return null;
}
String[] streamInfoArray = stream.split("_");
if (streamInfoArray.length > 5) {
return new String[]{streamInfoArray[3], streamInfoArray[4], streamInfoArray[5], streamInfoArray[6]};
}else {
return new String[]{streamInfoArray[3], streamInfoArray[4]};
}
}
}

View File

@ -106,7 +106,6 @@ public class JwtUtils implements InitializingBean {
if (jwkFile == null || jwkFile.trim().isEmpty()) {
log.warn("[API AUTH] JWK文件路径未配置使用默认配置路径./config/jwk.json");
jwkFile = "config" + File.separator + "jwk.json"; // 默认外部路径
return createAndPersistDefaultRsaKey(jwkFile);
}
// 尝试读取JWK文件自动处理classpath/本地文件用try-with-resources自动关流无泄露

View File

@ -0,0 +1,10 @@
package com.genersoft.iot.vmp.gb28181.service;
/**
* 资源能力接入-其他
*/
public interface ISourceOtherService {
Boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema);
}

View File

@ -431,8 +431,9 @@ public class PlayServiceImpl implements IPlayService {
null);
return null;
}
log.info("[点播开始] 设备编号: {}, 通道编号: {}, 收流端口: {}, 流ID{}, 收流模式:{}, SSRC: {}, SSRC校验{}",
device.getDeviceId(), channel.getDeviceId(), ssrcInfo.getPort(), ssrcInfo.getStream(), channel.getStreamIdentification(),
String sdpIp = !ObjectUtils.isEmpty(device.getSdpIp()) ? device.getSdpIp() : mediaServer.getSdpIp();
log.info("[点播开始] 设备编号: {}, 通道编号: {}, 收流地址: {}:{}, 流ID{}, 收流模式:{}, SSRC: {}, SSRC校验{}",
device.getDeviceId(), channel.getDeviceId(), sdpIp, ssrcInfo.getPort(), ssrcInfo.getStream(), device.getStreamMode(),
ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态

View File

@ -0,0 +1,55 @@
package com.genersoft.iot.vmp.gb28181.service.impl;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.common.enums.MediaStreamUtil;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service(ChannelDataType.OTHER_SERVICE + ChannelDataType.GB28181)
@RequiredArgsConstructor
public class SourceOtherServiceGorGbImpl implements ISourceOtherService {
private final IInviteStreamService inviteStreamService;
private final IDeviceChannelService deviceChannelService;
private final UserSetting userSetting;
@Override
public Boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema) {
if (MediaStreamUtil.GB28181_TALK.equals(app) || MediaStreamUtil.GB28181_BROADCAST.equals(app)) {
// 国标对讲/广播流 直接关闭
return false;
}
if (!MediaStreamUtil.isGB28181(app, stream)) {
return null;
}
// 国标流 点播/录像回放/录像下载
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
if (inviteInfo == null) {
return null;
}
if (inviteInfo.getStatus() == InviteSessionStatus.ok) {
// 录像下载
if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
return false;
}
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(inviteInfo.getChannelId());
if (deviceChannel == null) {
return false;
}
}
return userSetting.getStreamOnDemand();
}
}

View File

@ -215,12 +215,8 @@ public class SIPCommander implements ISIPCommander {
if (device == null) {
return;
}
String sdpIp;
if (!ObjectUtils.isEmpty(device.getSdpIp())) {
sdpIp = device.getSdpIp();
}else {
sdpIp = mediaServerItem.getSdpIp();
}
String sdpIp = !ObjectUtils.isEmpty(device.getSdpIp()) ? device.getSdpIp() : mediaServerItem.getSdpIp();
StringBuffer content = new StringBuffer(200);
content.append("v=0\r\n");
content.append("o=" + device.getDeviceId() + " 0 0 IN IP4 " + sdpIp + "\r\n");

View File

@ -0,0 +1,46 @@
package com.genersoft.iot.vmp.jt1078.service.impl;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.common.enums.MediaStreamUtil;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@Slf4j
@Service(ChannelDataType.OTHER_SERVICE + ChannelDataType.JT_1078)
@RequiredArgsConstructor
public class SourceOtherServiceGorJTImpl implements ISourceOtherService {
private final UserSetting userSetting;
private final Ijt1078Service ijt1078Service;
private final Ijt1078PlayService jt1078PlayService;
@Override
public Boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema) {
if (!MediaStreamUtil.isJT1078(app, stream)) {
return null;
}
if (userSetting.getStreamOnDemand()) {
String[] streamParamArray = MediaStreamUtil.getJT1078StreamInfo(app, stream);
if (streamParamArray == null || streamParamArray.length < 2) {
return true;
}
String phoneNumber = streamParamArray[0];
Integer channelId = Integer.parseInt(streamParamArray[1]);
// 判断是否是1078播放类型
if (MediaStreamUtil.isJT1078Play(app, stream)) {
jt1078PlayService.stopPlay(phoneNumber, channelId);
} else if (MediaStreamUtil.isJT1078Playback(app, stream)) {
jt1078PlayService.stopPlayback(phoneNumber, channelId);
}
return true;
}
return false;
}
}

View File

@ -1,18 +1,9 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.InviteInfo;
import com.genersoft.iot.vmp.common.InviteSessionStatus;
import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.enums.MediaStreamUtil;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.jt1078.bean.JTMediaStreamType;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
@ -25,53 +16,31 @@ import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.utils.MediaServerUtils;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Map;
@Slf4j
@Service
@RequiredArgsConstructor
public class MediaServiceImpl implements IMediaService {
@Autowired
private IRedisCatchStorage redisCatchStorage;
private final IRedisCatchStorage redisCatchStorage;
@Autowired
private IStreamProxyService streamProxyService;
private final IStreamProxyService streamProxyService;
@Autowired
private UserSetting userSetting;
private final UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private final IUserService userService;
@Autowired
private IUserService userService;
private final IReceiveRtpServerService receiveRtpServerService;
@Autowired
private IInviteStreamService inviteStreamService;
private final IRecordPlanService recordPlanService;
@Autowired
private IDeviceChannelService deviceChannelService;
private final Map<String, ISourceOtherService> sourceOtherServiceMap;
@Autowired
private SipInviteSessionManager sessionManager;
@Autowired
private Ijt1078Service ijt1078Service;
@Autowired
private Ijt1078PlayService jt1078PlayService;
@Autowired
private IReceiveRtpServerService receiveRtpServerService;
@Autowired
private IRecordPlanService recordPlanService;
@Override
public boolean authenticatePlay(String app, String stream, String callId) {
@ -147,69 +116,39 @@ public class MediaServiceImpl implements IMediaService {
@Override
public boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema) {
boolean result = false;
if (recordPlanService.recording(app, stream) != null) {
return false;
}
// 国标类型的流
switch (app) {
case MediaStreamUtil.RTP_APP -> {
result = userSetting.getStreamOnDemand();
if (MediaStreamUtil.isGB28181(app, stream)) {
// 国标流 点播/录像回放/录像下载
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
if (inviteInfo != null) {
if (inviteInfo.getStatus() == InviteSessionStatus.ok) {
// 录像下载
if (inviteInfo.getType() == InviteSessionType.DOWNLOAD) {
return false;
}
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(inviteInfo.getChannelId());
if (deviceChannel == null) {
return false;
}
}
}
if (MediaStreamUtil.LOAD_MP4_APP.equals(app)) {
// mp4点播流 无人观看不关闭
return true;
}
for (ISourceOtherService sourceOtherService : sourceOtherServiceMap.values()) {
try {
Boolean result = sourceOtherService.closeStreamOnNoneReader(mediaServerId, app, stream, schema);
if (result != null) {
return result;
} else if (MediaStreamUtil.isJT1078(app, stream)) {
// 判断是否是1078播放类型
JTMediaStreamType jtMediaStreamType = ijt1078Service.checkStreamFromJt(stream);
if (jtMediaStreamType != null) {
String[] streamParamArray = stream.split("_");
if (jtMediaStreamType.equals(JTMediaStreamType.PLAY)) {
jt1078PlayService.stopPlay(streamParamArray[0], Integer.parseInt(streamParamArray[1]));
} else if (jtMediaStreamType.equals(JTMediaStreamType.PLAYBACK)) {
jt1078PlayService.stopPlayback(streamParamArray[0], Integer.parseInt(streamParamArray[1]));
}
}
return false;
}
}
case MediaStreamUtil.GB28181_TALK, MediaStreamUtil.GB28181_BROADCAST -> {
return false;
}
case MediaStreamUtil.LOAD_MP4_APP -> {
return true;
}
case null, default -> {
// 非国标流 推流/拉流代理
// 拉流代理
StreamProxy streamProxy = streamProxyService.getStreamProxyByAppAndStream(app, stream);
if (streamProxy != null) {
if (streamProxy.isEnableDisableNoneReader()) {
// 无人观看停用
// 修改数据
streamProxyService.stopByAppAndStream(app, stream);
return true;
} else {
// 无人观看不做处理
return false;
}
} else {
return false;
}
}catch (Exception e) {
log.error("调用其他服务关闭无人观看流失败, app={}, stream={}, schema={}", app, stream, schema, e);
}
}
return result;
// 拉流代理
StreamProxy streamProxy = streamProxyService.getStreamProxyByAppAndStream(app, stream);
if (streamProxy != null) {
if (streamProxy.isEnableDisableNoneReader()) {
// 无人观看停用
// 修改数据
streamProxyService.stopByAppAndStream(app, stream);
return true;
} else {
// 无人观看不做处理
return false;
}
} else {
return userSetting.getStreamOnDemand();
}
}
}

View File

@ -377,7 +377,10 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
if (dynamicTask.contains(timeOutTaskKey)) {
dynamicTask.stop(timeOutTaskKey);
}
mediaServerService.closeRTPServer(mediaServer, app, stream);
if (mediaServer.isRtpEnable()) {
mediaServerService.closeRTPServer(mediaServer, app, stream);
}
mediaServerService.closeStreams(mediaServer, app, stream);
}
@Override

View File

@ -204,7 +204,7 @@ export function stopPlayback(params) {
const { phoneNumber, channelId, streamId } = params
return request({
method: 'get',
url: '/api/jt1078/playback/stop/',
url: '/api/jt1078/playback/stop',
params: {
phoneNumber: phoneNumber,
channelId: channelId,