调整流处理逻辑以支持单端口收流

This commit is contained in:
lin 2026-04-15 00:31:49 +08:00
parent 3e162ea358
commit 1e5ae10571
20 changed files with 196 additions and 239 deletions

View File

@ -35,16 +35,10 @@ public class InviteInfo {
private Long createTime; private Long createTime;
private Boolean record;
private String startTime;
private String endTime;
public static InviteInfo getInviteInfo(String deviceId, Integer channelId, String stream, SSRCInfo ssrcInfo, String mediaServerId, public static InviteInfo getInviteInfo(String deviceId, Integer channelId, String stream, SSRCInfo ssrcInfo, String mediaServerId,
String receiveIp, Integer receivePort, String streamMode, String receiveIp, Integer receivePort, String streamMode,
InviteSessionType type, InviteSessionStatus status, Boolean record) { InviteSessionType type, InviteSessionStatus status) {
InviteInfo inviteInfo = new InviteInfo(); InviteInfo inviteInfo = new InviteInfo();
inviteInfo.setDeviceId(deviceId); inviteInfo.setDeviceId(deviceId);
inviteInfo.setChannelId(channelId); inviteInfo.setChannelId(channelId);
@ -56,7 +50,6 @@ public class InviteInfo {
inviteInfo.setType(type); inviteInfo.setType(type);
inviteInfo.setStatus(status); inviteInfo.setStatus(status);
inviteInfo.setMediaServerId(mediaServerId); inviteInfo.setMediaServerId(mediaServerId);
inviteInfo.setRecord(record);
return inviteInfo; return inviteInfo;
} }

View File

@ -1,9 +1,11 @@
package com.genersoft.iot.vmp.common.enums; package com.genersoft.iot.vmp.common.enums;
public class MediaStreamUtil { public class MediaStreamUtil {
public final static String LOAD_MP4_APP = "mp4_record";
public final static String RTP_APP = "rtp"; public final static String RTP_APP = "rtp";
public final static String RTP_STREAM_REST_PREFIX = "s"; public final static String RTP_STREAM_REST_PREFIX = "s";
public final static String GB28181_TALK = "talk"; public final static String GB28181_TALK = "talk";
public final static String GB28181_BROADCAST = "broadcast"; public final static String GB28181_BROADCAST = "broadcast";
@ -24,11 +26,11 @@ public class MediaStreamUtil {
return GB28181_TALK.equals(app); return GB28181_TALK.equals(app);
} }
public static boolean isBroadcast(String app, String streamId) { public static boolean isBroadcast(String app, String streamId) {
return GB28181_BROADCAST.equals(app); return GB28181_BROADCAST.equals(app);
} }
public static boolean isJT1078(String app, String streamId) { public static boolean isJT1078(String app, String streamId) {
return RTP_APP.equals(app) || streamId.startsWith(JT1078_STREAM_PREFIX); return RTP_APP.equals(app) || streamId.startsWith(JT1078_STREAM_PREFIX);
} }

View File

@ -73,9 +73,7 @@ public class DeviceControl {
@Parameter(name = "guardCmd", description = "命令, 可选值SetGuard布防ResetGuard撤防", required = true) @Parameter(name = "guardCmd", description = "命令, 可选值SetGuard布防ResetGuard撤防", required = true)
@GetMapping("/guard") @GetMapping("/guard")
public DeferredResult<WVPResult<String>> guardApi(String deviceId, String guardCmd) { public DeferredResult<WVPResult<String>> guardApi(String deviceId, String guardCmd) {
if (log.isDebugEnabled()) { log.info("[布防/撤防] API调用, deviceId: {}, guardCmd: {}", deviceId, guardCmd);
log.debug("布防/撤防API调用");
}
Device device = deviceService.getDeviceByDeviceId(deviceId); Device device = deviceService.getDeviceByDeviceId(deviceId);
Assert.notNull(device, "设备不存在"); Assert.notNull(device, "设备不存在");
DeferredResult<WVPResult<String>> result = new DeferredResult<>(); DeferredResult<WVPResult<String>> result = new DeferredResult<>();
@ -109,9 +107,7 @@ public class DeviceControl {
public DeferredResult<WVPResult<String>> resetAlarm(String deviceId, String channelId, public DeferredResult<WVPResult<String>> resetAlarm(String deviceId, String channelId,
@RequestParam(required = false) String alarmMethod, @RequestParam(required = false) String alarmMethod,
@RequestParam(required = false) String alarmType) { @RequestParam(required = false) String alarmType) {
if (log.isDebugEnabled()) { log.info("[报警复位] deviceId: {}, channelId: {}, alarmMethod: {}, alarmType: {}", deviceId, channelId, alarmMethod, alarmType);
log.debug("报警复位API调用");
}
Device device = deviceService.getDeviceByDeviceId(deviceId); Device device = deviceService.getDeviceByDeviceId(deviceId);
Assert.notNull(device, "设备不存在"); Assert.notNull(device, "设备不存在");
DeferredResult<WVPResult<String>> result = new DeferredResult<>(); DeferredResult<WVPResult<String>> result = new DeferredResult<>();

View File

@ -67,9 +67,7 @@ public class GBRecordController {
@GetMapping("/query/{deviceId}/{channelId}") @GetMapping("/query/{deviceId}/{channelId}")
public DeferredResult<WVPResult<RecordInfo>> recordinfo(@PathVariable String deviceId, @PathVariable String channelId, String startTime, String endTime){ public DeferredResult<WVPResult<RecordInfo>> recordinfo(@PathVariable String deviceId, @PathVariable String channelId, String startTime, String endTime){
if (log.isDebugEnabled()) {
log.debug(String.format("录像信息查询 API调用deviceId%s startTime%s endTime%s",deviceId, startTime, endTime));
}
DeferredResult<WVPResult<RecordInfo>> result = new DeferredResult<>(Long.valueOf(userSetting.getRecordInfoTimeout()), TimeUnit.MILLISECONDS); DeferredResult<WVPResult<RecordInfo>> result = new DeferredResult<>(Long.valueOf(userSetting.getRecordInfoTimeout()), TimeUnit.MILLISECONDS);
if (!DateUtil.verification(startTime, DateUtil.formatter)){ if (!DateUtil.verification(startTime, DateUtil.formatter)){
throw new ControllerException(ErrorCode.ERROR100.getCode(), "startTime格式为" + DateUtil.PATTERN); throw new ControllerException(ErrorCode.ERROR100.getCode(), "startTime格式为" + DateUtil.PATTERN);
@ -113,9 +111,7 @@ public class GBRecordController {
public DeferredResult<WVPResult<StreamContent>> download(HttpServletRequest request, @PathVariable String deviceId, @PathVariable String channelId, public DeferredResult<WVPResult<StreamContent>> download(HttpServletRequest request, @PathVariable String deviceId, @PathVariable String channelId,
String startTime, String endTime, String downloadSpeed) { String startTime, String endTime, String downloadSpeed) {
if (log.isDebugEnabled()) { log.info("[开始历史媒体下载] deviceId: {}, channelId: {}, startTime: {}, endTime: {}, downloadSpeed: {}", deviceId, channelId, startTime, endTime, downloadSpeed);
log.debug(String.format("历史媒体下载 API调用deviceId%schannelId%sdownloadSpeed%s", deviceId, channelId, downloadSpeed));
}
String uuid = UUID.randomUUID().toString(); String uuid = UUID.randomUUID().toString();
String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId; String key = DeferredResultHolder.CALLBACK_CMD_DOWNLOAD + deviceId + channelId;

View File

@ -80,9 +80,7 @@ public class PlaybackController {
public DeferredResult<WVPResult<StreamContent>> start(HttpServletRequest request, @PathVariable String deviceId, @PathVariable String channelId, public DeferredResult<WVPResult<StreamContent>> start(HttpServletRequest request, @PathVariable String deviceId, @PathVariable String channelId,
String startTime, String endTime) { String startTime, String endTime) {
if (log.isDebugEnabled()) { log.info("[录像回放] deviceId: {}, channelId: {}, startTime: {}, endTime: {}", deviceId, channelId, startTime, endTime);
log.debug(String.format("设备回放 API调用deviceId%s channelId%s", deviceId, channelId));
}
String uuid = UUID.randomUUID().toString(); String uuid = UUID.randomUUID().toString();
String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId; String key = DeferredResultHolder.CALLBACK_CMD_PLAYBACK + deviceId + channelId;

View File

@ -609,6 +609,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
@Override @Override
public void queryRecordInfo(Device device, DeviceChannel channel, String startTime, String endTime, ErrorCallback<RecordInfo> callback) { public void queryRecordInfo(Device device, DeviceChannel channel, String startTime, String endTime, ErrorCallback<RecordInfo> callback) {
log.info("录像查询 API调用deviceId{}channelId{}startTime{}endTime{}", device.getDeviceId(), channel.getDeviceId(), startTime, endTime);
if (!userSetting.getServerId().equals(device.getServerId())){ if (!userSetting.getServerId().equals(device.getServerId())){
redisRpcPlayService.queryRecordInfo(device.getServerId(), channel.getId(), startTime, endTime, callback); redisRpcPlayService.queryRecordInfo(device.getServerId(), channel.getId(), startTime, endTime, callback);
return; return;

View File

@ -650,7 +650,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
// 初始化redis中的invite消息状态 // 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(platform.getServerGBId(), channel.getGbId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getId(), InviteInfo inviteInfo = InviteInfo.getInviteInfo(platform.getServerGBId(), channel.getGbId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getId(),
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST, mediaServerItem.getSdpIp(), ssrcInfo.getPort(), userSetting.getBroadcastForPlatform(), InviteSessionType.BROADCAST,
InviteSessionStatus.ready, userSetting.getRecordSip()); InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo); inviteStreamService.updateInviteInfo(inviteInfo);
commanderForPlatform.broadcastInviteCmd(platform, channel,sourceId, mediaServerItem, ssrcInfo, event -> { commanderForPlatform.broadcastInviteCmd(platform, channel,sourceId, mediaServerItem, ssrcInfo, event -> {
inviteOKHandler(event, ssrcInfo, false, mediaServerItem, platform, channel, inviteOKHandler(event, ssrcInfo, false, mediaServerItem, platform, channel,

View File

@ -376,7 +376,9 @@ public class PlayServiceImpl implements IPlayService {
String streamId = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId()); String streamId = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId());
SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServerForPlay(mediaServer, device, channel, ssrc, (code, msg, result) -> { SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServerForPlay(mediaServer, device, channel, ssrc,
record != null ? record : userSetting.getRecordSip(),
(code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) { if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook 响应 // hook 响应
@ -436,13 +438,7 @@ public class PlayServiceImpl implements IPlayService {
// 初始化redis中的invite消息状态 // 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServer.getId(), InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServer.getId(),
mediaServer.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY, mediaServer.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAY,
InviteSessionStatus.ready, userSetting.getRecordSip()); InviteSessionStatus.ready);
if (record != null) {
inviteInfo.setRecord(record);
}else {
inviteInfo.setRecord(userSetting.getRecordSip());
}
inviteStreamService.updateInviteInfo(inviteInfo); inviteStreamService.updateInviteInfo(inviteInfo);
try { try {
@ -542,6 +538,9 @@ public class PlayServiceImpl implements IPlayService {
return; return;
} }
sendRtpInfo.setPort(localPort); sendRtpInfo.setPort(localPort);
// 增加鉴权信息
receiveRtpServerService.addAuthenticateInfoForGb28181Talk(mediaServerItem, sendRtpInfo.getStream());
}catch (ControllerException e) { }catch (ControllerException e) {
mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease()); mediaServerService.releaseSsrc(mediaServerItem.getId(), sendRtpInfo.getSsrcToRelease());
log.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channel.getDeviceId()); log.info("[语音对讲]失败 deviceId: {}, channelId: {}", device.getDeviceId(), channel.getDeviceId());
@ -760,16 +759,9 @@ public class PlayServiceImpl implements IPlayService {
Device device, DeviceChannel channel, String startTime, Device device, DeviceChannel channel, String startTime,
String endTime, ErrorCallback<StreamInfo> callback) { String endTime, ErrorCallback<StreamInfo> callback) {
String startTimeStr = startTime.replace("-", "") String stream = receiveRtpServerService.getPlaybackStream(device, channel, startTime, endTime);
.replace(":", "")
.replace(" ", "");
String endTimeTimeStr = endTime.replace("-", "")
.replace(":", "")
.replace(" ", "");
String stream = device.getDeviceId() + "_" + channel.getDeviceId() + "_" + startTimeStr + "_" + endTimeTimeStr; SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServerForPlayback(mediaServer, device, channel, startTime, endTime, (code, msg, result) -> {
SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServerForPlayback(mediaServer, device, channel, startTimeStr, endTimeTimeStr, (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) { if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应 // hook响应
StreamInfo streamInfo = onPublishHandlerForPlayback(result.getHookData().getMediaServer(), result.getHookData().getMediaInfo(), device, channel, startTime, endTime); StreamInfo streamInfo = onPublishHandlerForPlayback(result.getHookData().getMediaServer(), result.getHookData().getMediaInfo(), device, channel, startTime, endTime);
@ -812,12 +804,12 @@ public class PlayServiceImpl implements IPlayService {
} }
log.info("[录像回放] deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}", log.info("[录像回放] deviceId: {}, channelId: {}, 开始时间: {}, 结束时间: {}, 收流端口:{}, 收流模式:{}, SSRC: {}, SSRC校验{}",
device.getDeviceId(), channel.getGbDeviceId(), startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(), device.getDeviceId(), channel.getDeviceId(), startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
ssrcInfo.getSsrc(), device.isSsrcCheck()); ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态 // 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServer.getId(), InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServer.getId(),
mediaServer.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK, mediaServer.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
InviteSessionStatus.ready, userSetting.getRecordSip()); InviteSessionStatus.ready);
inviteStreamService.updateInviteInfo(inviteInfo); inviteStreamService.updateInviteInfo(inviteInfo);
try { try {
@ -1005,7 +997,7 @@ public class PlayServiceImpl implements IPlayService {
return; return;
} }
SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServerForDownload(mediaServer, device, channel, (code, msg, result) -> { SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServerForDownload(mediaServer, device, channel, startTime, endTime, (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) { if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应 // hook响应
StreamInfo streamInfo = onPublishHandlerForDownload(mediaServer, result.getHookData().getMediaInfo(), device, channel, startTime, endTime); StreamInfo streamInfo = onPublishHandlerForDownload(mediaServer, result.getHookData().getMediaInfo(), device, channel, startTime, endTime);
@ -1056,10 +1048,7 @@ public class PlayServiceImpl implements IPlayService {
// 初始化redis中的invite消息状态 // 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServer.getId(), InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServer.getId(),
mediaServer.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD, mediaServer.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.DOWNLOAD,
InviteSessionStatus.ready, true); InviteSessionStatus.ready);
inviteInfo.setStartTime(startTime);
inviteInfo.setEndTime(endTime);
inviteStreamService.updateInviteInfo(inviteInfo); inviteStreamService.updateInviteInfo(inviteInfo);
try { try {
cmder.downloadStreamCmd(mediaServer, ssrcInfo, device, channel, startTime, endTime, downloadSpeed, cmder.downloadStreamCmd(mediaServer, ssrcInfo, device, channel, startTime, endTime, downloadSpeed,
@ -1722,6 +1711,7 @@ public class PlayServiceImpl implements IPlayService {
if (!userSetting.getServerId().equals(device.getServerId())) { if (!userSetting.getServerId().equals(device.getServerId())) {
redisRpcPlayService.stop(device.getServerId(), type, channel.getId(), stream); redisRpcPlayService.stop(device.getServerId(), type, channel.getId(), stream);
}else { }else {
log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId());
InviteInfo inviteInfo = inviteStreamService.getInviteInfo(type, channel.getId(), stream); InviteInfo inviteInfo = inviteStreamService.getInviteInfo(type, channel.getId(), stream);
if (inviteInfo == null) { if (inviteInfo == null) {
if (type == InviteSessionType.PLAY) { if (type == InviteSessionType.PLAY) {
@ -1732,7 +1722,7 @@ public class PlayServiceImpl implements IPlayService {
inviteStreamService.removeInviteInfo(inviteInfo); inviteStreamService.removeInviteInfo(inviteInfo);
if (InviteSessionStatus.ok == inviteInfo.getStatus()) { if (InviteSessionStatus.ok == inviteInfo.getStatus()) {
try { try {
log.info("[停止点播/回放/下载] {}/{}", device.getDeviceId(), channel.getDeviceId()); log.info("[停止点播/回放/下载] 成功 {}/{}", device.getDeviceId(), channel.getDeviceId());
cmder.streamByeCmd(device, channel.getDeviceId(), MediaStreamUtil.RTP_APP, inviteInfo.getStream(), null, null); cmder.streamByeCmd(device, channel.getDeviceId(), MediaStreamUtil.RTP_APP, inviteInfo.getStream(), null, null);
} catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) { } catch (InvalidArgumentException | SipException | ParseException | SsrcTransactionNotFoundException e) {
log.error("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage()); log.error("[命令发送失败] 停止点播/回放/下载, 发送BYE: {}", e.getMessage());

View File

@ -15,4 +15,6 @@ public class JT1078Config {
private Integer port; private Integer port;
private String password; private String password;
private Boolean record = false;
} }

View File

@ -10,6 +10,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.jt1078.bean.*; import com.genersoft.iot.vmp.jt1078.bean.*;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
import com.genersoft.iot.vmp.jt1078.config.JT1078Config;
import com.genersoft.iot.vmp.jt1078.proc.request.J1205; import com.genersoft.iot.vmp.jt1078.proc.request.J1205;
import com.genersoft.iot.vmp.jt1078.proc.response.*; import com.genersoft.iot.vmp.jt1078.proc.response.*;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService; import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService;
@ -36,6 +37,7 @@ import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
@ -80,6 +82,9 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
@Autowired @Autowired
private UserSetting userSetting; private UserSetting userSetting;
@Autowired
private JT1078Config jt1078Config;
/** /**
* 流到来的处理 * 流到来的处理
*/ */
@ -195,7 +200,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
private void play(JTDevice device, JTChannel channel, int type, CommonCallback<WVPResult<StreamInfo>> callback) { private void play(JTDevice device, JTChannel channel, int type, CommonCallback<WVPResult<StreamInfo>> callback) {
String phoneNumber = device.getPhoneNumber(); String phoneNumber = device.getPhoneNumber();
int channelId = channel.getChannelId(); int channelId = channel.getChannelId();
String stream = MediaStreamUtil.getJTPlayStreamId(phoneNumber, channelId); String finalStream = MediaStreamUtil.getJTPlayStreamId(phoneNumber, channelId);
// 检查流是否已经存在存在则返回 // 检查流是否已经存在存在则返回
String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId; String playKey = VideoManagerConstants.INVITE_INFO_1078_PLAY + phoneNumber + ":" + channelId;
List<CommonCallback<WVPResult<StreamInfo>>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>()); List<CommonCallback<WVPResult<StreamInfo>>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
@ -230,11 +235,23 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
} }
return; return;
} }
String streamId;
String streamReplace = null;
if (mediaServer.isRtpEnable()) {
log.info("[JT-点播] 媒体服务器支持rtp开启rtp点播 phoneNumber {} channelId {}", phoneNumber, channelId);
streamId = finalStream;
}else {
String phone = StringUtils.leftPad(device.getPhoneNumber(), 12, '0');
streamId = String.format("%s_%s", phone, channelId);
streamReplace = finalStream;
}
// 开启收流端口 // 开启收流端口
RTPServerParam rtpServerParam = new RTPServerParam(); RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServer(mediaServer); rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaStreamUtil.RTP_APP); rtpServerParam.setApp(MediaStreamUtil.RTP_APP);
rtpServerParam.setStreamId(stream); rtpServerParam.setStreamId(finalStream);
rtpServerParam.setPort(0); rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(1); // 1 表示tcp被动 rtpServerParam.setTcpMode(1); // 1 表示tcp被动
rtpServerParam.setOnlyAuto(false); rtpServerParam.setOnlyAuto(false);
@ -260,7 +277,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
String fileName = phoneNumber + "_" + channelId + ".jpg"; String fileName = phoneNumber + "_" + channelId + ".jpg";
// 请求截图 // 请求截图
log.info("[请求截图]: {}", fileName); log.info("[请求截图]: {}", fileName);
mediaServerService.getSnap(mediaServer, MediaStreamUtil.RTP_APP, stream, 15, 1, path, fileName); mediaServerService.getSnap(mediaServer, MediaStreamUtil.RTP_APP, finalStream, 15, 1, path, fileName);
}else { }else {
if (callback != null) { if (callback != null) {
callback.run(WVPResult.fail(code, msg)); callback.run(WVPResult.fail(code, msg));
@ -277,6 +294,8 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
stopPlay(phoneNumber, channelId); stopPlay(phoneNumber, channelId);
return; return;
} }
// 补充鉴权参数
receiveRtpServerService.addAuthenticateInfo(streamId, streamReplace, !channel.isHasAudio(), jt1078Config.getRecord(), null);
log.info("[JT-点播] phoneNumber {} channelId {}IP: {}, 端口: {}", phoneNumber, channelId, mediaServer.getSdpIp(), port); log.info("[JT-点播] phoneNumber {} channelId {}IP: {}, 端口: {}", phoneNumber, channelId, mediaServer.getSdpIp(), port);
J9101 j9101 = new J9101(); J9101 j9101 = new J9101();
@ -434,7 +453,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
} }
String app = MediaStreamUtil.RTP_APP; String app = MediaStreamUtil.RTP_APP;
String stream = MediaStreamUtil.getJTPlaybackStreamId(phoneNumber, channelId, String finalStream = MediaStreamUtil.getJTPlaybackStreamId(phoneNumber, channelId,
DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(startTime), DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(endTime)); DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(startTime), DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(endTime));
MediaServer mediaServer; MediaServer mediaServer;
if (org.springframework.util.ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) { if (org.springframework.util.ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
@ -448,12 +467,22 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
} }
return; return;
} }
String streamId;
String streamReplace = null;
if (mediaServer.isRtpEnable()) {
log.info("[JT-点播] 媒体服务器支持rtp开启rtp点播 phoneNumber {} channelId {}", phoneNumber, channelId);
streamId = finalStream;
}else {
String phone = StringUtils.leftPad(device.getPhoneNumber(), 12, '0');
streamId = String.format("%s_%s", phone, channelId);
streamReplace = finalStream;
}
// 开启收流端口 // 开启收流端口
RTPServerParam rtpServerParam = new RTPServerParam(); RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServer(mediaServer); rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaStreamUtil.RTP_APP); rtpServerParam.setApp(MediaStreamUtil.RTP_APP);
rtpServerParam.setStreamId(stream); rtpServerParam.setStreamId(finalStream);
rtpServerParam.setPort(0); rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(1); // 1 表示tcp被动 rtpServerParam.setTcpMode(1); // 1 表示tcp被动
rtpServerParam.setOnlyAuto(false); rtpServerParam.setOnlyAuto(false);
@ -479,10 +508,14 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(), errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null)); InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null));
} }
receiveRtpServerService.closeRTPServer(mediaServer, app, stream); receiveRtpServerService.closeRTPServer(mediaServer, app, finalStream);
} }
}); });
log.info("[JT-回放] logInfo {} 端口: {}", logInfo, port); log.info("[JT-回放] logInfo {} 端口: {}", logInfo, port);
// 补充鉴权参数
receiveRtpServerService.addAuthenticateInfo(streamId, streamReplace, !channel.isHasAudio(), jt1078Config.getRecord(), null);
J9201 j9201 = new J9201(); J9201 j9201 = new J9201();
j9201.setChannel(channelId); j9201.setChannel(channelId);
j9201.setIp(mediaServer.getSdpIp()); j9201.setIp(mediaServer.getSdpIp());

View File

@ -1,59 +1,16 @@
package com.genersoft.iot.vmp.media.bean; package com.genersoft.iot.vmp.media.bean;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class ResultForOnPublish { public class ResultForOnPublish {
private boolean enable_audio; private Boolean enable_audio;
private boolean enable_mp4; private Boolean enable_mp4;
private int mp4_max_second; private Integer mp4_max_second;
private String mp4_save_path; private String mp4_save_path;
private String stream_replace; private String stream_replace;
private Integer modify_stamp; private Integer modify_stamp;
public boolean isEnable_audio() {
return enable_audio;
}
public void setEnable_audio(boolean enable_audio) {
this.enable_audio = enable_audio;
}
public boolean isEnable_mp4() {
return enable_mp4;
}
public void setEnable_mp4(boolean enable_mp4) {
this.enable_mp4 = enable_mp4;
}
public int getMp4_max_second() {
return mp4_max_second;
}
public void setMp4_max_second(int mp4_max_second) {
this.mp4_max_second = mp4_max_second;
}
public String getMp4_save_path() {
return mp4_save_path;
}
public void setMp4_save_path(String mp4_save_path) {
this.mp4_save_path = mp4_save_path;
}
public String getStream_replace() {
return stream_replace;
}
public void setStream_replace(String stream_replace) {
this.stream_replace = stream_replace;
}
public Integer getModify_stamp() {
return modify_stamp;
}
public void setModify_stamp(Integer modify_stamp) {
this.modify_stamp = modify_stamp;
}
} }

View File

@ -516,7 +516,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
@Override @Override
public void loadMP4File(MediaServer mediaServer, String app, String stream, String filePath, String fileName, ErrorCallback<StreamInfo> callback) { public void loadMP4File(MediaServer mediaServer, String app, String stream, String filePath, String fileName, ErrorCallback<StreamInfo> callback) {
String buildApp = "mp4_record"; String buildApp = MediaStreamUtil.LOAD_MP4_APP;
String buildStream = app + "_" + stream + "_" + fileName + "_" + RandomStringUtils.randomAlphabetic(6).toLowerCase(); String buildStream = app + "_" + stream + "_" + fileName + "_" + RandomStringUtils.randomAlphabetic(6).toLowerCase();
Hook hook = Hook.getInstance(HookType.on_media_arrival, buildApp, buildStream, mediaServer.getServerId()); Hook hook = Hook.getInstance(HookType.on_media_arrival, buildApp, buildStream, mediaServer.getServerId());
@ -539,7 +539,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
@Override @Override
public void loadMP4FileForDate(MediaServer mediaServer, String app, String stream, String date, String dateDir, ErrorCallback<StreamInfo> callback) { public void loadMP4FileForDate(MediaServer mediaServer, String app, String stream, String date, String dateDir, ErrorCallback<StreamInfo> callback) {
String buildApp = "mp4_record"; String buildApp = MediaStreamUtil.LOAD_MP4_APP;
String buildStream = app + "_" + stream + "_" + date; String buildStream = app + "_" + stream + "_" + date;
MediaInfo mediaInfo = getMediaInfo(mediaServer, buildApp, buildStream); MediaInfo mediaInfo = getMediaInfo(mediaServer, buildApp, buildStream);
if (mediaInfo != null) { if (mediaInfo != null) {

View File

@ -211,6 +211,16 @@ public class ZLMMediaServerStatusManager {
String key = "zlm-keepalive-" + mediaServer.getId(); String key = "zlm-keepalive-" + mediaServer.getId();
dynamicTask.startDelay(key, ()->{ dynamicTask.startDelay(key, ()->{
log.warn("[ZLM-心跳超时] ID{}", mediaServer.getId()); log.warn("[ZLM-心跳超时] ID{}", mediaServer.getId());
// 主动探测一次避免因短暂网络抖动误判离线
ZLMResult<List<JSONObject>> probeResult = zlmresTfulUtils.getMediaServerConfig(mediaServer);
if (probeResult != null && probeResult.getData() != null && !probeResult.getData().isEmpty()) {
log.info("[ZLM-心跳超时] 主动探测成功,服务仍在线,重置心跳计时器 ID{}", mediaServer.getId());
ZLMServerConfig zlmServerConfig = JSON.parseObject(JSON.toJSONString(probeResult.getData().get(0)), ZLMServerConfig.class);
initPort(mediaServer, zlmServerConfig);
online(mediaServer, zlmServerConfig);
return;
}
log.warn("[ZLM-心跳超时] 主动探测失败,确认离线 ID{}", mediaServer.getId());
mediaServer.setStatus(false); mediaServer.setStatus(false);
offlineZlmPrimaryMap.put(mediaServer.getId(), mediaServer); offlineZlmPrimaryMap.put(mediaServer.getId(), mediaServer);
offlineZlmTimeMap.put(mediaServer.getId(), System.currentTimeMillis()); offlineZlmTimeMap.put(mediaServer.getId(), System.currentTimeMillis());

View File

@ -8,9 +8,9 @@ import lombok.Setter;
@Getter @Getter
public class HookResultForOnPublish extends HookResult{ public class HookResultForOnPublish extends HookResult{
private boolean enable_audio; private Boolean enable_audio;
private boolean enable_mp4; private Boolean enable_mp4;
private int mp4_max_second; private Integer mp4_max_second;
private String mp4_save_path; private String mp4_save_path;
private String stream_replace; private String stream_replace;
private Integer modify_stamp; private Integer modify_stamp;
@ -24,8 +24,8 @@ public class HookResultForOnPublish extends HookResult{
public static HookResultForOnPublish getInstance(ResultForOnPublish resultForOnPublish){ public static HookResultForOnPublish getInstance(ResultForOnPublish resultForOnPublish){
HookResultForOnPublish successResult = new HookResultForOnPublish(0, "success"); HookResultForOnPublish successResult = new HookResultForOnPublish(0, "success");
successResult.setEnable_audio(resultForOnPublish.isEnable_audio()); successResult.setEnable_audio(resultForOnPublish.getEnable_audio());
successResult.setEnable_mp4(resultForOnPublish.isEnable_mp4()); successResult.setEnable_mp4(resultForOnPublish.getEnable_mp4());
successResult.setModify_stamp(resultForOnPublish.getModify_stamp()); successResult.setModify_stamp(resultForOnPublish.getModify_stamp());
successResult.setStream_replace(resultForOnPublish.getStream_replace()); successResult.setStream_replace(resultForOnPublish.getStream_replace());
successResult.setMp4_max_second(resultForOnPublish.getMp4_max_second()); successResult.setMp4_max_second(resultForOnPublish.getMp4_max_second());

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.service;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
import com.genersoft.iot.vmp.media.event.hook.HookData; import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.RTPServerParam; import com.genersoft.iot.vmp.service.bean.RTPServerParam;
@ -14,13 +15,15 @@ public interface IReceiveRtpServerService {
ErrorCallback<OpenRTPServerResult> callback); ErrorCallback<OpenRTPServerResult> callback);
SSRCInfo openGbRTPServerForPlay(MediaServer mediaServer, Device device, DeviceChannel channel, SSRCInfo openGbRTPServerForPlay(MediaServer mediaServer, Device device, DeviceChannel channel,
String presetSSRC, ErrorCallback<OpenRTPServerResult> callback); String presetSSRC, boolean record, ErrorCallback<OpenRTPServerResult> callback);
SSRCInfo openGbRTPServerForPlayback(MediaServer mediaServer, Device device, DeviceChannel channel, SSRCInfo openGbRTPServerForPlayback(MediaServer mediaServer, Device device, DeviceChannel channel,
String startTime, String endTime, ErrorCallback<OpenRTPServerResult> callback); String startTime, String endTime, ErrorCallback<OpenRTPServerResult> callback);
SSRCInfo openGbRTPServerForDownload(MediaServer mediaServer, Device device, DeviceChannel channel, SSRCInfo openGbRTPServerForDownload(MediaServer mediaServer, Device device, DeviceChannel channel,
ErrorCallback<OpenRTPServerResult> callback); String startTime, String endTime, ErrorCallback<OpenRTPServerResult> callback);
String getPlaybackStream(Device device, DeviceChannel channel, String startTime, String endTime);
SSRCInfo openGbRTPServerForBroadcast(MediaServer mediaServer, Platform platform, CommonGBChannel channel, SSRCInfo openGbRTPServerForBroadcast(MediaServer mediaServer, Platform platform, CommonGBChannel channel,
ErrorCallback<OpenRTPServerResult> callback); ErrorCallback<OpenRTPServerResult> callback);
@ -30,4 +33,10 @@ public interface IReceiveRtpServerService {
void closeRTPServer(MediaServer mediaServer, String app, String stream); void closeRTPServer(MediaServer mediaServer, String app, String stream);
void closeRTPServerByMediaServerId(String mediaServerId, String app, String stream); void closeRTPServerByMediaServerId(String mediaServerId, String app, String stream);
void addAuthenticateInfoForGb28181Talk(MediaServer mediaServer, String streamId);
void addAuthenticateInfo(String streamId, String streamReplace, Boolean enableAudio, Boolean enableMp4, Integer mp4MaxSecond);
ResultForOnPublish getAuthenticateInfo(String streamId);
} }

View File

@ -18,10 +18,7 @@ import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.bean.ResultForOnPublish; import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo; import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.IMediaService; import com.genersoft.iot.vmp.service.*;
import com.genersoft.iot.vmp.service.IRecordPlanService;
import com.genersoft.iot.vmp.service.ISendRtpServerService;
import com.genersoft.iot.vmp.service.IUserService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy; import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService; import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
@ -72,7 +69,7 @@ public class MediaServiceImpl implements IMediaService {
private Ijt1078PlayService jt1078PlayService; private Ijt1078PlayService jt1078PlayService;
@Autowired @Autowired
private ISendRtpServerService sendRtpServerService; private IReceiveRtpServerService receiveRtpServerService;
@Autowired @Autowired
@ -95,23 +92,24 @@ public class MediaServiceImpl implements IMediaService {
@Override @Override
public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) { public ResultForOnPublish authenticatePublish(MediaServer mediaServer, String app, String stream, String params) {
// 推流鉴权的处理
if (!MediaStreamUtil.RTP_APP.equals(app)) { if (MediaStreamUtil.RTP_APP.equals(app)) {
if (MediaStreamUtil.GB28181_TALK.equals(app) || MediaStreamUtil.JT_TALK.equals(app)) { return receiveRtpServerService.getAuthenticateInfo(stream);
ResultForOnPublish result = new ResultForOnPublish(); }else {
ResultForOnPublish result = new ResultForOnPublish();
// app RTP_APP 的流 如果是国标对讲或者广播则默认获取声音并且不录制 其他的流先查询是否有代理配置如果没有代理配置再进行鉴权
if (MediaStreamUtil.GB28181_TALK.equals(app) || MediaStreamUtil.GB28181_BROADCAST.equals(app) || MediaStreamUtil.JT_TALK.equals(app)) {
result.setEnable_mp4(false); result.setEnable_mp4(false);
result.setEnable_audio(true); result.setEnable_audio(true);
return result; return result;
} }
if ("mp4_record".equals(app) ) { if (MediaStreamUtil.LOAD_MP4_APP.equals(app) ) {
ResultForOnPublish result = new ResultForOnPublish();
result.setEnable_mp4(false); result.setEnable_mp4(false);
result.setEnable_audio(true); result.setEnable_audio(true);
return result; return result;
} }
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream); StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
if (streamProxyItem != null) { if (streamProxyItem != null) {
ResultForOnPublish result = new ResultForOnPublish();
result.setEnable_audio(streamProxyItem.isEnableAudio()); result.setEnable_audio(streamProxyItem.isEnableAudio());
result.setEnable_mp4(streamProxyItem.isEnableMp4()); result.setEnable_mp4(streamProxyItem.isEnableMp4());
return result; return result;
@ -144,90 +142,9 @@ public class MediaServiceImpl implements IMediaService {
// 鉴权通过 // 鉴权通过
redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo); redisCatchStorage.updateStreamAuthorityInfo(app, stream, streamAuthorityInfo);
} }
}
ResultForOnPublish result = new ResultForOnPublish();
result.setEnable_audio(true);
// RTP SERVER 收流
if (MediaStreamUtil.isGB28181(app, stream)) {
InviteInfo inviteInfo = inviteStreamService.getInviteInfoByStream(null, stream);
if (inviteInfo != null) {
result.setEnable_mp4(inviteInfo.getRecord());
}else {
result.setEnable_mp4(userSetting.getRecordSip());
}
// 单端口模式下修改流 ID
if (!mediaServer.isRtpEnable() && inviteInfo == null) {
String ssrc = String.format("%010d", Long.parseLong(stream, 16));
inviteInfo = inviteStreamService.getInviteInfoBySSRC(ssrc);
if (inviteInfo != null) {
result.setStream_replace(inviteInfo.getStream());
log.info("[HOOK]推流鉴权 stream: {} 替换为 {}", stream, inviteInfo.getStream());
stream = inviteInfo.getStream();
}
}
// 设置音频信息及录制信息
SsrcTransaction ssrcTransaction = sessionManager.getSsrcTransactionByStream(app, stream);
if (ssrcTransaction != null ) {
// 为录制国标模拟一个鉴权信息, 方便后续写入录像文件时使用
StreamAuthorityInfo streamAuthorityInfo = StreamAuthorityInfo.getInstanceByHook(app, stream, mediaServer.getId());
streamAuthorityInfo.setApp(app);
streamAuthorityInfo.setStream(ssrcTransaction.getStream());
streamAuthorityInfo.setCallId(ssrcTransaction.getSipTransactionInfo().getCallId());
redisCatchStorage.updateStreamAuthorityInfo(app, ssrcTransaction.getStream(), streamAuthorityInfo);
String deviceId = ssrcTransaction.getDeviceId();
Integer channelId = ssrcTransaction.getChannelId();
DeviceChannel deviceChannel = deviceChannelService.getOneForSourceById(channelId);
if (deviceChannel != null) {
result.setEnable_audio(deviceChannel.isHasAudio());
}
// 如果是录像下载就设置视频间隔十秒
if (ssrcTransaction.getType() == InviteSessionType.DOWNLOAD) {
// 获取录像的总时长然后设置为这个视频的时长
InviteInfo inviteInfoForDownload = inviteStreamService.getInviteInfo(InviteSessionType.DOWNLOAD, channelId, stream);
if (inviteInfoForDownload != null) {
String startTime = inviteInfoForDownload.getStartTime();
String endTime = inviteInfoForDownload.getEndTime();
long difference = DateUtil.getDifference(startTime, endTime) / 1000;
result.setMp4_max_second((int) difference);
result.setEnable_mp4(true);
// 设置为2保证得到的mp4的时长是正常的
result.setModify_stamp(2);
}
}
// 如果是talk对讲则默认获取声音
if (ssrcTransaction.getType() == InviteSessionType.TALK) {
result.setEnable_audio(true);
}
}
} else if (app.equals(MediaStreamUtil.GB28181_BROADCAST)) {
result.setEnable_audio(true);
result.setEnable_mp4(userSetting.getRecordSip());
} else if (app.equals(MediaStreamUtil.GB28181_TALK)) {
result.setEnable_audio(true);
result.setEnable_mp4(userSetting.getRecordSip());
}else {
result.setEnable_mp4(userSetting.getRecordPushLive()); result.setEnable_mp4(userSetting.getRecordPushLive());
return result;
} }
if (app.equalsIgnoreCase(MediaStreamUtil.RTP_APP)) {
String receiveKey = VideoManagerConstants.WVP_OTHER_RECEIVE_RTP_INFO + userSetting.getServerId() + "_" + stream;
OtherRtpSendInfo otherRtpSendInfo = (OtherRtpSendInfo) redisTemplate.opsForValue().get(receiveKey);
String receiveKeyForPS = VideoManagerConstants.WVP_OTHER_RECEIVE_PS_INFO + userSetting.getServerId() + "_" + stream;
OtherPsSendInfo otherPsSendInfo = (OtherPsSendInfo) redisTemplate.opsForValue().get(receiveKeyForPS);
if (otherRtpSendInfo != null || otherPsSendInfo != null) {
result.setEnable_mp4(true);
}
}
return result;
} }
@Override @Override
@ -269,7 +186,7 @@ public class MediaServiceImpl implements IMediaService {
} }
}else if (MediaStreamUtil.GB28181_TALK.equals(app) || MediaStreamUtil.GB28181_BROADCAST.equals(app)) { }else if (MediaStreamUtil.GB28181_TALK.equals(app) || MediaStreamUtil.GB28181_BROADCAST.equals(app)) {
return false; return false;
} else if ("mp4_record".equals(app)) { } else if (MediaStreamUtil.LOAD_MP4_APP.equals(app)) {
return true; return true;
} else { } else {
// 非国标流 推流/拉流代理 // 非国标流 推流/拉流代理

View File

@ -7,6 +7,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.bean.ResultForOnPublish;
import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookData; import com.genersoft.iot.vmp.media.event.hook.HookData;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe; import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
@ -14,12 +15,12 @@ import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent; import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent; import com.genersoft.iot.vmp.media.event.media.MediaDepartureEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService; import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.hook.HookResultForOnPublish;
import com.genersoft.iot.vmp.service.IReceiveRtpServerService; import com.genersoft.iot.vmp.service.IReceiveRtpServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode; import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.service.bean.RTPServerParam; import com.genersoft.iot.vmp.service.bean.RTPServerParam;
import com.genersoft.iot.vmp.service.bean.SSRCInfo; import com.genersoft.iot.vmp.service.bean.SSRCInfo;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener; import org.springframework.context.event.EventListener;
@ -132,7 +133,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
@Override @Override
public SSRCInfo openGbRTPServerForPlay(MediaServer mediaServer, Device device, DeviceChannel channel, public SSRCInfo openGbRTPServerForPlay(MediaServer mediaServer, Device device, DeviceChannel channel,
String presetSSRC, ErrorCallback<OpenRTPServerResult> callback) { String presetSSRC, boolean record, ErrorCallback<OpenRTPServerResult> callback) {
if (callback == null) { if (callback == null) {
log.warn("[开启国标点播RTP收流] 失败回调为NULL"); log.warn("[开启国标点播RTP收流] 失败回调为NULL");
return null; return null;
@ -142,9 +143,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
return null; return null;
} }
String streamId = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId());
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
// 获取 mediaServer 可用的 ssrc // 获取 mediaServer 可用的 ssrc
final String ssrc; final String ssrc;
if (presetSSRC != null) { if (presetSSRC != null) {
@ -152,6 +150,18 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
}else { }else {
ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId()); ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
} }
String streamId;
String streamReplace = null;
if (mediaServer.isRtpEnable()) {
streamId = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId());
}else {
streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
streamReplace = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId());
}
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
if (device.isSsrcCheck() && tcpMode > 0) { if (device.isSsrcCheck() && tcpMode > 0) {
// 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验 // 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验
log.warn("[开启国标点播RTP收流] 平台对接时下级可能自定义ssrc但是tcp模式zlm收流目前无法更新ssrc可能收流超时此时请使用udp收流或者关闭ssrc校验"); log.warn("[开启国标点播RTP收流] 平台对接时下级可能自定义ssrc但是tcp模式zlm收流目前无法更新ssrc可能收流超时此时请使用udp收流或者关闭ssrc校验");
@ -159,17 +169,18 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L; Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L;
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId); SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamReplace != null ? streamReplace : streamId);
if (presetSSRC == null) { if (presetSSRC == null) {
ssrcInfo.setAllocatedSsrc(ssrc); ssrcInfo.setAllocatedSsrc(ssrc);
} }
openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback); openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback);
addAuthenticateInfo(streamId, streamReplace, !channel.isHasAudio(), record, null);
return ssrcInfo; return ssrcInfo;
} }
@Override @Override
public SSRCInfo openGbRTPServerForPlayback(MediaServer mediaServer, Device device, DeviceChannel channel, public SSRCInfo openGbRTPServerForPlayback(MediaServer mediaServer, Device device, DeviceChannel channel,
String startTimeStr, String endTimeTimeStr, ErrorCallback<OpenRTPServerResult> callback) { String startTime, String endTime, ErrorCallback<OpenRTPServerResult> callback) {
if (callback == null) { if (callback == null) {
log.warn("[开启国标回放RTP收流] 失败回调为NULL"); log.warn("[开启国标回放RTP收流] 失败回调为NULL");
return null; return null;
@ -179,11 +190,19 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
return null; return null;
} }
String streamId = device.getDeviceId() + "_" + channel.getDeviceId() + "_" + startTimeStr + "_" + endTimeTimeStr;
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
// 获取 mediaServer 可用的 ssrc // 获取 mediaServer 可用的 ssrc
final String ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId()); String ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId());
String streamId;
String streamReplace = null;
if (mediaServer.isRtpEnable()) {
streamId = getPlaybackStream(device, channel, startTime, endTime);
}else {
streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
streamReplace = getPlaybackStream(device, channel, startTime, endTime);
}
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
if (device.isSsrcCheck() && tcpMode > 0) { if (device.isSsrcCheck() && tcpMode > 0) {
// 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验 // 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验
@ -192,15 +211,28 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L; Long checkSsrc = device.isSsrcCheck() ? Long.parseLong(ssrc) : 0L;
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId); SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamReplace != null ? streamReplace : streamId);
ssrcInfo.setAllocatedSsrc(ssrc); ssrcInfo.setAllocatedSsrc(ssrc);
openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback); openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback);
addAuthenticateInfo(streamId, streamReplace, !channel.isHasAudio(), false,null);
return ssrcInfo; return ssrcInfo;
} }
@Override
public String getPlaybackStream(Device device, DeviceChannel channel, String startTime, String endTime) {
String startTimeStr = startTime.replace("-", "")
.replace(":", "")
.replace(" ", "");
String endTimeTimeStr = endTime.replace("-", "")
.replace(":", "")
.replace(" ", "");
return device.getDeviceId() + "_" + channel.getDeviceId() + "_" + startTimeStr + "_" + endTimeTimeStr;
}
@Override @Override
public SSRCInfo openGbRTPServerForDownload(MediaServer mediaServer, Device device, DeviceChannel channel, public SSRCInfo openGbRTPServerForDownload(MediaServer mediaServer, Device device, DeviceChannel channel,
ErrorCallback<OpenRTPServerResult> callback) { String startTime, String endTime, ErrorCallback<OpenRTPServerResult> callback) {
if (callback == null) { if (callback == null) {
log.warn("[开启国标录像下载RTP收流] 失败回调为NULL"); log.warn("[开启国标录像下载RTP收流] 失败回调为NULL");
return null; return null;
@ -213,8 +245,8 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
// 获取 mediaServer 可用的 ssrc // 获取 mediaServer 可用的 ssrc
String ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId()); String ssrc = ssrcFactory.getPlayBackSsrc(mediaServer.getId());
String streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();; String streamId = String.format("%08x", Long.parseLong(ssrc)).toUpperCase();
if (device.isSsrcCheck() && tcpMode > 0) { if (device.isSsrcCheck() && tcpMode > 0) {
// 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验 // 目前zlm不支持 tcp模式更新ssrc暂时关闭ssrc校验
log.warn("[开启国标录像下载RTP收流] 平台对接时下级可能自定义ssrc但是tcp模式zlm收流目前无法更新ssrc可能收流超时此时请使用udp收流或者关闭ssrc校验"); log.warn("[开启国标录像下载RTP收流] 平台对接时下级可能自定义ssrc但是tcp模式zlm收流目前无法更新ssrc可能收流超时此时请使用udp收流或者关闭ssrc校验");
@ -225,6 +257,10 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId); SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId);
ssrcInfo.setAllocatedSsrc(ssrc); ssrcInfo.setAllocatedSsrc(ssrc);
openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback); openRtpServer(mediaServer, ssrcInfo, checkSsrc, !channel.isHasAudio(), false, tcpMode, callback);
long difference = DateUtil.getDifference(startTime, endTime) / 1000;
addAuthenticateInfo(streamId, null, !channel.isHasAudio(), true, (int) difference);
return ssrcInfo; return ssrcInfo;
} }
@ -257,7 +293,6 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
// 获取 mediaServer 可用的 ssrc // 获取 mediaServer 可用的 ssrc
String ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId()); String ssrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId); SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaStreamUtil.RTP_APP, streamId);
ssrcInfo.setAllocatedSsrc(ssrc); ssrcInfo.setAllocatedSsrc(ssrc);
openRtpServer(mediaServer, ssrcInfo, 0L, false, true, tcpMode, callback); openRtpServer(mediaServer, ssrcInfo, 0L, false, true, tcpMode, callback);
@ -354,29 +389,38 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
closeRTPServer(mediaServer, app, stream); closeRTPServer(mediaServer, app, stream);
} }
private void addAuthenticateInfoForGb28181(MediaServer mediaServer, String streamId, Boolean enableMp4, Boolean enableAudio, String mapSavePath, Integer modifyStamp) { @Override
public void addAuthenticateInfoForGb28181Talk(MediaServer mediaServer, String streamId) {
String streamReplace = null; String streamReplace = null;
if (!mediaServer.isRtpEnable() ) { if (!mediaServer.isRtpEnable() ) {
streamReplace = streamId; streamReplace = streamId;
} }
addAuthenticateInfo(streamId, streamReplace, enableAudio, enableMp4, mapSavePath, modifyStamp); addAuthenticateInfo(streamId, streamReplace, true, false, null);
} }
private void addAuthenticateInfo(String streamId, String streamReplace, Boolean enableAudio, Boolean enableMp4, String mapSavePath, Integer modifyStamp) { @Override
HookResultForOnPublish hookResultForOnPublish = new HookResultForOnPublish(); public void addAuthenticateInfo(String streamId, String streamReplace, Boolean enableAudio, Boolean enableMp4, Integer mp4MaxSecond) {
hookResultForOnPublish.setCode(0); ResultForOnPublish hookResultForOnPublish = new ResultForOnPublish();
hookResultForOnPublish.setMsg("success");
hookResultForOnPublish.setStream_replace(streamReplace); hookResultForOnPublish.setStream_replace(streamReplace);
hookResultForOnPublish.setEnable_audio(enableAudio); hookResultForOnPublish.setEnable_audio(enableAudio);
hookResultForOnPublish.setEnable_mp4(enableMp4); hookResultForOnPublish.setEnable_mp4(enableMp4);
hookResultForOnPublish.setMp4_save_path(mapSavePath); hookResultForOnPublish.setMp4_max_second(mp4MaxSecond);
hookResultForOnPublish.setModify_stamp(modifyStamp);
String key = String.format("%s:%s", VideoManagerConstants.RTP_AUTHENTICATE, streamId); String key = String.format("%s:%s", VideoManagerConstants.RTP_AUTHENTICATE, streamId);
// 存储认证信息过期时间为60秒 过期则无法通过认证 // 存储认证信息过期时间为60秒 过期则无法通过认证
redisTemplate.opsForValue().set(key, hookResultForOnPublish); redisTemplate.opsForValue().set(key, hookResultForOnPublish);
redisTemplate.expire(key, 60, TimeUnit.SECONDS); redisTemplate.expire(key, 60, TimeUnit.SECONDS);
} }
@Override
public ResultForOnPublish getAuthenticateInfo(String streamId) {
String key = String.format("%s:%s", VideoManagerConstants.RTP_AUTHENTICATE, streamId);
Object obj = redisTemplate.opsForValue().get(key);
if (obj instanceof ResultForOnPublish) {
return (ResultForOnPublish) obj;
}
return null;
}
} }

View File

@ -133,6 +133,10 @@ public class PsController {
if (rtpServerPort == 0) { if (rtpServerPort == 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
} }
// 补充鉴权参数
receiveRtpServerService.addAuthenticateInfo(stream, null, false, false, null);
OtherPsSendInfo otherPsSendInfo = new OtherPsSendInfo(); OtherPsSendInfo otherPsSendInfo = new OtherPsSendInfo();
otherPsSendInfo.setReceiveIp(mediaServer.getSdpIp()); otherPsSendInfo.setReceiveIp(mediaServer.getSdpIp());
otherPsSendInfo.setReceivePort(rtpServerPort); otherPsSendInfo.setReceivePort(rtpServerPort);

View File

@ -126,7 +126,8 @@ public class RtpController {
log.info("[开启收流和获取发流信息] 视频流收流失败callId->{}stream->{}", callId, stream); log.info("[开启收流和获取发流信息] 视频流收流失败callId->{}stream->{}", callId, stream);
} }
})); }));
// 补充鉴权参数
receiveRtpServerService.addAuthenticateInfo(stream, null, false, false, null);
rtpServerParam.setStreamId(stream + "_a"); rtpServerParam.setStreamId(stream + "_a");
int rtpServerPortForAudio = receiveRtpServerService.openCommonRTPServer(rtpServerParam, ((code, msg, data) -> { int rtpServerPortForAudio = receiveRtpServerService.openCommonRTPServer(rtpServerParam, ((code, msg, data) -> {
@ -136,6 +137,10 @@ public class RtpController {
log.info("[开启收流和获取发流信息] 音频流收流失败callId->{}stream->{}", callId, stream); log.info("[开启收流和获取发流信息] 音频流收流失败callId->{}stream->{}", callId, stream);
} }
})); }));
// 补充鉴权参数
receiveRtpServerService.addAuthenticateInfo(rtpServerParam.getStreamId(), null, true, false, null);
if (rtpServerPortForVideo == 0 || rtpServerPortForAudio == 0) { if (rtpServerPortForVideo == 0 || rtpServerPortForAudio == 0) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "获取端口失败");
} }

View File

@ -157,7 +157,7 @@ export function startPlayback(params) {
const { phoneNumber, channelId, startTime, endTime, type, rate, playbackType, playbackSpeed } = params const { phoneNumber, channelId, startTime, endTime, type, rate, playbackType, playbackSpeed } = params
return request({ return request({
method: 'get', method: 'get',
url: '/api/jt1078/playback/start/', url: '/api/jt1078/playback/start',
params: { params: {
phoneNumber: phoneNumber, phoneNumber: phoneNumber,
channelId: channelId, channelId: channelId,