优化国标RTP服务器接口,简化参数传递,调整关闭逻辑以提高代码可读性

This commit is contained in:
lin 2026-02-20 23:28:38 +08:00
parent 803c47a430
commit d65255deb4
4 changed files with 81 additions and 115 deletions

View File

@ -156,17 +156,13 @@ public class PlayServiceImpl implements IPlayService {
// 开启语音对讲通道 // 开启语音对讲通道
try { try {
audioBroadcastCmd(device, channel, event.getMediaServer(), audioBroadcastCmd(device, channel, event.getMediaServer(),
event.getApp(), event.getStream(), 60, false, (msg) -> { event.getApp(), event.getStream(), 60, false, (msg) -> log.info("[语音喊话] 通道建立成功, device: {}, channel: {}", deviceId, channelId));
log.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
});
} catch (InvalidArgumentException | ParseException | SipException e) { } catch (InvalidArgumentException | ParseException | SipException e) {
log.error("[命令发送失败] 语音对讲: {}", e.getMessage()); log.error("[命令发送失败] 语音对讲: {}", e.getMessage());
} }
}else if ("talk".equals(event.getApp())) { }else if ("talk".equals(event.getApp())) {
// 开启语音对讲通道 // 开启语音对讲通道
talkCmd(device, channel, event.getMediaServer(), event.getStream(), (msg) -> { talkCmd(device, channel, event.getMediaServer(), event.getStream(), (msg) -> log.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId));
log.info("[语音对讲] 通道建立成功, device: {}, channel: {}", deviceId, channelId);
});
} }
} }
} }
@ -378,33 +374,14 @@ public class PlayServiceImpl implements IPlayService {
} }
} }
// 获取mediaServer可用的ssrc
final String finalSsrc;
if (ssrc != null) {
finalSsrc = ssrc;
}else {
finalSsrc = ssrcFactory.getPlaySsrc(mediaServer.getId());
}
String streamId = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId()); String streamId = String.format("%s_%s", device.getDeviceId(), channel.getDeviceId());
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
RTPServerParam rtpServerParam = new RTPServerParam();
rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaApp.GB28181);
rtpServerParam.setStreamId(streamId);
if (device.isSsrcCheck()) {
rtpServerParam.setSsrc(ssrc);
}
rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(tcpMode);
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServer(mediaServer, streamId, ssrc, tcpMode, false,
int port = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> { device.isSsrcCheck(), false, !channel.isHasAudio(), (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) { if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应 // hook 响应
StreamInfo streamInfo = onPublishHandlerForPlay(result.getHookData().getMediaServer(), result.getHookData().getMediaInfo(), device, channel); StreamInfo streamInfo = onPublishHandlerForPlay(result.getHookData().getMediaServer(), result.getHookData().getMediaInfo(), device, channel);
if (streamInfo == null){ if (streamInfo == null){
if (callback != null) { if (callback != null) {
@ -476,7 +453,7 @@ public class PlayServiceImpl implements IPlayService {
InviteOKHandler(eventResult, ssrcInfo, mediaServer, device, channel, callback, inviteInfo, InviteSessionType.PLAY); InviteOKHandler(eventResult, ssrcInfo, mediaServer, device, channel, callback, inviteInfo, InviteSessionType.PLAY);
}, (event) -> { }, (event) -> {
log.info("[点播失败]{}:{} deviceId: {}, channelId:{}",event.statusCode, event.msg, device.getDeviceId(), channel.getDeviceId()); log.info("[点播失败]{}:{} deviceId: {}, channelId:{}",event.statusCode, event.msg, device.getDeviceId(), channel.getDeviceId());
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo); receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
if (callback != null) { if (callback != null) {
@ -489,7 +466,7 @@ public class PlayServiceImpl implements IPlayService {
}, userSetting.getPlayTimeout().longValue()); }, userSetting.getPlayTimeout().longValue());
} catch (InvalidArgumentException | SipException | ParseException e) { } catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 点播消息: {}", e.getMessage()); log.error("[命令发送失败] 点播消息: {}", e.getMessage());
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo); receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
if (callback != null) { if (callback != null) {
callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(), callback.run(InviteErrorCode.ERROR_FOR_SIP_SENDING_FAILED.getCode(),
@ -662,7 +639,7 @@ public class PlayServiceImpl implements IPlayService {
log.info("[TCP主动连接对方] 结果: {}" , result); log.info("[TCP主动连接对方] 结果: {}" , result);
if (!result) { if (!result) {
// 主动连接失败结束流程 清理数据 // 主动连接失败结束流程 清理数据
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
callback.run(InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getCode(), callback.run(InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getCode(),
InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getMsg(), null); InviteErrorCode.ERROR_FOR_TCP_ACTIVE_CONNECTION_REFUSED_ERROR.getMsg(), null);
@ -672,7 +649,7 @@ public class PlayServiceImpl implements IPlayService {
} }
} catch (SdpException e) { } catch (SdpException e) {
log.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channel.getDeviceId(), e); log.error("[TCP主动连接对方] deviceId: {}, channelId: {}, 解析200OK的SDP信息失败", device.getDeviceId(), channel.getDeviceId(), e);
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
@ -775,7 +752,7 @@ public class PlayServiceImpl implements IPlayService {
playBack(newMediaServerItem, device, channel, startTime, endTime, callback); playBack(newMediaServerItem, device, channel, startTime, endTime, callback);
} }
private void playBack(MediaServer mediaServerItem, private void playBack(MediaServer mediaServer,
Device device, DeviceChannel channel, String startTime, Device device, DeviceChannel channel, String startTime,
String endTime, ErrorCallback<StreamInfo> callback) { String endTime, ErrorCallback<StreamInfo> callback) {
@ -789,17 +766,8 @@ public class PlayServiceImpl implements IPlayService {
String stream = device.getDeviceId() + "_" + channel.getDeviceId() + "_" + startTimeStr + "_" + endTimeTimeStr; String stream = device.getDeviceId() + "_" + channel.getDeviceId() + "_" + startTimeStr + "_" + endTimeTimeStr;
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
RTPServerParam rtpServerParam = new RTPServerParam(); SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServer(mediaServer, stream, null, tcpMode, true,
rtpServerParam.setMediaServer(mediaServerItem); device.isSsrcCheck(), false, !channel.isHasAudio(), (code, msg, result) -> {
rtpServerParam.setApp(MediaApp.GB28181);
rtpServerParam.setStreamId(stream);
rtpServerParam.setSsrcCheck(device.isSsrcCheck());
rtpServerParam.setPlayback(true);
rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(tcpMode);
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) { if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应 // hook响应
StreamInfo streamInfo = onPublishHandlerForPlayback(result.getHookData().getMediaServer(), result.getHookData().getMediaInfo(), device, channel, startTime, endTime); StreamInfo streamInfo = onPublishHandlerForPlayback(result.getHookData().getMediaServer(), result.getHookData().getMediaInfo(), device, channel, startTime, endTime);
@ -845,16 +813,16 @@ public class PlayServiceImpl implements IPlayService {
device.getDeviceId(), channel.getGbDeviceId(), startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(), device.getDeviceId(), channel.getGbDeviceId(), startTime, endTime, ssrcInfo.getPort(), device.getStreamMode(),
ssrcInfo.getSsrc(), device.isSsrcCheck()); ssrcInfo.getSsrc(), device.isSsrcCheck());
// 初始化redis中的invite消息状态 // 初始化redis中的invite消息状态
InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServerItem.getId(), InviteInfo inviteInfo = InviteInfo.getInviteInfo(device.getDeviceId(), channel.getId(), ssrcInfo.getStream(), ssrcInfo, mediaServer.getId(),
mediaServerItem.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK, mediaServer.getSdpIp(), ssrcInfo.getPort(), device.getStreamMode(), InviteSessionType.PLAYBACK,
InviteSessionStatus.ready, userSetting.getRecordSip()); InviteSessionStatus.ready, userSetting.getRecordSip());
inviteStreamService.updateInviteInfo(inviteInfo); inviteStreamService.updateInviteInfo(inviteInfo);
try { try {
cmder.playbackStreamCmd(mediaServerItem, ssrcInfo, device, channel, startTime, endTime, cmder.playbackStreamCmd(mediaServer, ssrcInfo, device, channel, startTime, endTime,
eventResult -> { eventResult -> {
// 处理收到200ok后的TCP主动连接以及SSRC不一致的问题 // 处理收到200ok后的TCP主动连接以及SSRC不一致的问题
InviteOKHandler(eventResult, ssrcInfo, mediaServerItem, device, channel, InviteOKHandler(eventResult, ssrcInfo, mediaServer, device, channel,
callback, inviteInfo, InviteSessionType.PLAYBACK); callback, inviteInfo, InviteSessionType.PLAYBACK);
}, eventResult -> { }, eventResult -> {
log.info("[录像回放] 失败,{} {}", eventResult.statusCode, eventResult.msg); log.info("[录像回放] 失败,{} {}", eventResult.statusCode, eventResult.msg);
@ -862,7 +830,7 @@ public class PlayServiceImpl implements IPlayService {
callback.run(eventResult.statusCode, eventResult.msg, null); callback.run(eventResult.statusCode, eventResult.msg, null);
} }
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo); inviteStreamService.removeInviteInfo(inviteInfo);
}, userSetting.getPlayTimeout().longValue()); }, userSetting.getPlayTimeout().longValue());
@ -871,7 +839,7 @@ public class PlayServiceImpl implements IPlayService {
if (callback != null) { if (callback != null) {
callback.run(InviteErrorCode.FAIL.getCode(), e.getMessage(), null); callback.run(InviteErrorCode.FAIL.getCode(), e.getMessage(), null);
} }
receiveRtpServerService.closeRTPServer(mediaServerItem, ssrcInfo); receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo); inviteStreamService.removeInviteInfo(inviteInfo);
} }
@ -1000,16 +968,9 @@ public class PlayServiceImpl implements IPlayService {
} }
int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0); int tcpMode = device.getStreamMode().equals("TCP-ACTIVE")? 2: (device.getStreamMode().equals("TCP-PASSIVE")? 1:0);
// 录像下载不使用固定流地址固定流地址会导致如果开始时间与结束时间一致时文件错误的叠加在一起
RTPServerParam rtpServerParam = new RTPServerParam(); SSRCInfo ssrcInfo = receiveRtpServerService.openGbRTPServer(mediaServer, null, null, tcpMode, false,
rtpServerParam.setMediaServer(mediaServer); device.isSsrcCheck(), false, !channel.isHasAudio(), (code, msg, result) -> {
rtpServerParam.setSsrcCheck(device.isSsrcCheck());
rtpServerParam.setPlayback(true);
rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(tcpMode);
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) { if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) {
// hook响应 // hook响应
StreamInfo streamInfo = onPublishHandlerForDownload(mediaServer, result.getHookData().getMediaInfo(), device, channel, startTime, endTime); StreamInfo streamInfo = onPublishHandlerForDownload(mediaServer, result.getHookData().getMediaInfo(), device, channel, startTime, endTime);
@ -1070,7 +1031,7 @@ public class PlayServiceImpl implements IPlayService {
eventResult -> { eventResult -> {
// 对方返回错误 // 对方返回错误
callback.run(InviteErrorCode.FAIL.getCode(), String.format("录像下载失败, 错误码: %s, %s", eventResult.statusCode, eventResult.msg), null); callback.run(InviteErrorCode.FAIL.getCode(), String.format("录像下载失败, 错误码: %s, %s", eventResult.statusCode, eventResult.msg), null);
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo); receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo); inviteStreamService.removeInviteInfo(inviteInfo);
}, eventResult ->{ }, eventResult ->{
@ -1101,7 +1062,7 @@ public class PlayServiceImpl implements IPlayService {
} catch (InvalidArgumentException | SipException | ParseException e) { } catch (InvalidArgumentException | SipException | ParseException e) {
log.error("[命令发送失败] 录像下载: {}", e.getMessage()); log.error("[命令发送失败] 录像下载: {}", e.getMessage());
callback.run(InviteErrorCode.FAIL.getCode(),e.getMessage(), null); callback.run(InviteErrorCode.FAIL.getCode(),e.getMessage(), null);
receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo); receiveRtpServerService.closeRTPServer(mediaServer, ssrcInfo.getApp(), ssrcInfo.getStream());
sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream()); sessionManager.removeByStream(ssrcInfo.getApp(), ssrcInfo.getStream());
inviteStreamService.removeInviteInfo(inviteInfo); inviteStreamService.removeInviteInfo(inviteInfo);
} }
@ -1699,7 +1660,7 @@ public class PlayServiceImpl implements IPlayService {
deviceChannelService.stopPlay(channel.getId()); deviceChannelService.stopPlay(channel.getId());
} }
if (inviteInfo.getStreamInfo() != null) { if (inviteInfo.getStreamInfo() != null) {
receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getSsrcInfo()); receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), MediaApp.GB28181, stream);
} }
} }
} }
@ -1731,7 +1692,7 @@ public class PlayServiceImpl implements IPlayService {
deviceChannelService.stopPlay(channel.getId()); deviceChannelService.stopPlay(channel.getId());
} }
if (inviteInfo.getStreamInfo() != null) { if (inviteInfo.getStreamInfo() != null) {
receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), inviteInfo.getSsrcInfo()); receiveRtpServerService.closeRTPServer(inviteInfo.getStreamInfo().getMediaServer(), MediaApp.GB28181, inviteInfo.getStream());
} }
} }

View File

@ -240,20 +240,18 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
rtpServerParam.setMediaServer(mediaServer); rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaApp.JT1078); rtpServerParam.setApp(MediaApp.JT1078);
rtpServerParam.setStreamId(stream); rtpServerParam.setStreamId(stream);
rtpServerParam.setSsrcCheck(false);
rtpServerParam.setPlayback(false);
rtpServerParam.setPort(0); rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(1); // 1 表示tcp被动 rtpServerParam.setTcpMode(1); // 1 表示tcp被动
rtpServerParam.setOnlyAuto(false); rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio()); rtpServerParam.setDisableAudio(!channel.isHasAudio());
SSRCInfo ssrcInfo = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, result) -> { int port = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, hookData) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && result != null && result.getHookData() != null) { if (code == InviteErrorCode.SUCCESS.getCode() && hookData != null ) {
// hook响应 // hook响应
log.info("[JT-点播] 点播成功, 手机号: {} 通道: {}", phoneNumber, channelId); log.info("[JT-点播] 点播成功, 手机号: {} 通道: {}", phoneNumber, channelId);
// TODO 发送9105 实时音视频传输状态通知 通知丢包率 // TODO 发送9105 实时音视频传输状态通知 通知丢包率
StreamInfo info = onPublishHandler(mediaServer, result.getHookData(), phoneNumber, channelId); StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) { for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
if (errorCallback == null) { if (errorCallback == null) {
@ -280,18 +278,18 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
stopPlay(phoneNumber, channelId); stopPlay(phoneNumber, channelId);
} }
}); });
if (ssrcInfo == null) { if (port <= 0) {
stopPlay(phoneNumber, channelId); stopPlay(phoneNumber, channelId);
return; return;
} }
log.info("[JT-点播] phoneNumber {} channelId {}IP: {}, 端口: {}", phoneNumber, channelId, mediaServer.getSdpIp(), ssrcInfo.getPort()); log.info("[JT-点播] phoneNumber {} channelId {}IP: {}, 端口: {}", phoneNumber, channelId, mediaServer.getSdpIp(), port);
J9101 j9101 = new J9101(); J9101 j9101 = new J9101();
j9101.setChannel(channelId); j9101.setChannel(channelId);
j9101.setIp(mediaServer.getSdpIp()); j9101.setIp(mediaServer.getSdpIp());
j9101.setRate(1); j9101.setRate(1);
j9101.setTcpPort(ssrcInfo.getPort()); j9101.setTcpPort(port);
j9101.setUdpPort(ssrcInfo.getPort()); j9101.setUdpPort(port);
j9101.setType(type); j9101.setType(type);
jt1078Template.startLive(phoneNumber, j9101, 6); jt1078Template.startLive(phoneNumber, j9101, 6);
} }
@ -327,7 +325,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
// 删除缓存数据 // 删除缓存数据
if (streamInfo != null) { if (streamInfo != null) {
// 关闭rtpServer // 关闭rtpServer
receiveRtpServerService.closeRTPServer(streamInfo.getMediaServer(), new SSRCInfo(streamInfo.getApp(), streamInfo.getStream())); receiveRtpServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
redisTemplate.delete(playKey); redisTemplate.delete(playKey);
} }
@ -435,7 +433,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playbackKey); StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playbackKey);
if (streamInfo != null) { if (streamInfo != null) {
mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream()); receiveRtpServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
// 清理数据 // 清理数据
redisTemplate.delete(playbackKey); redisTemplate.delete(playbackKey);
} }
@ -455,36 +453,41 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
} }
return; return;
} }
// 设置hook监听
Hook hookSubscribe = Hook.getInstance(HookType.on_media_arrival, app, stream, mediaServer.getId());
subscribe.addSubscribe(hookSubscribe, (hookData) -> {
dynamicTask.stop(playbackKey);
log.info("[JT-回放] 回放成功, logInfo {}", logInfo);
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
if (errorCallback == null) {
continue;
}
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info));
}
subscribe.removeSubscribe(hookSubscribe);
redisTemplate.opsForValue().set(playbackKey, info);
});
// 设置超时监听
dynamicTask.startDelay(playbackKey, () -> {
log.info("[JT-回放] 回放超时, logInfo {}", logInfo);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null));
}
mediaServerService.closeJTTServer(mediaServer, stream, null);
subscribe.removeSubscribe(hookSubscribe);
}, userSetting.getPlayTimeout());
// 开启收流端口 // 开启收流端口
SSRCInfo ssrcInfo = mediaServerService.openJTTServer(mediaServer, stream, null, false, !channel.isHasAudio(), 1); RTPServerParam rtpServerParam = new RTPServerParam();
log.info("[JT-回放] logInfo {} 端口: {}", logInfo, ssrcInfo.getPort()); rtpServerParam.setMediaServer(mediaServer);
rtpServerParam.setApp(MediaApp.JT1078);
rtpServerParam.setStreamId(stream);
rtpServerParam.setPort(0);
rtpServerParam.setTcpMode(1); // 1 表示tcp被动
rtpServerParam.setOnlyAuto(false);
rtpServerParam.setDisableAudio(!channel.isHasAudio());
int port = receiveRtpServerService.openRTPServer(rtpServerParam, (code, msg, hookData) -> {
if (code == InviteErrorCode.SUCCESS.getCode() && hookData != null ) {
// hook 响应
log.info("[JT-回放] 回放成功, logInfo {}", logInfo);
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
if (errorCallback == null) {
continue;
}
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info));
}
redisTemplate.opsForValue().set(playbackKey, info);
}else {
log.info("[JT-回放] 回放超时, logInfo {}", logInfo);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null));
}
receiveRtpServerService.closeRTPServer(mediaServer, app, stream);
}
});
log.info("[JT-回放] logInfo {} 端口: {}", logInfo, port);
J9201 j9201 = new J9201(); J9201 j9201 = new J9201();
j9201.setChannel(channelId); j9201.setChannel(channelId);
j9201.setIp(mediaServer.getSdpIp()); j9201.setIp(mediaServer.getSdpIp());
@ -498,8 +501,8 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
j9201.setPlaybackSpeed(playbackSpeed); j9201.setPlaybackSpeed(playbackSpeed);
} }
j9201.setTcpPort(ssrcInfo.getPort()); j9201.setTcpPort(port);
j9201.setUdpPort(ssrcInfo.getPort()); j9201.setUdpPort(port);
j9201.setType(type); j9201.setType(type);
j9201.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime)); j9201.setStartTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(startTime));
j9201.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime)); j9201.setEndTime(DateUtil.yyyy_MM_dd_HH_mm_ssTo1078(endTime));
@ -519,7 +522,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
// 删除缓存数据 // 删除缓存数据
if (streamInfo != null) { if (streamInfo != null) {
// 关闭rtpServer // 关闭rtpServer
mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream(), null); receiveRtpServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
} }
// 清理回调 // 清理回调
List<CommonCallback<WVPResult<StreamInfo>>> generalCallbacks = inviteErrorCallbackMap.get(playKey); List<CommonCallback<WVPResult<StreamInfo>>> generalCallbacks = inviteErrorCallbackMap.get(playKey);
@ -672,8 +675,8 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
// 删除缓存数据 // 删除缓存数据
if (streamInfo != null) { if (streamInfo != null) {
redisTemplate.delete(playKey); redisTemplate.delete(playKey);
// 关闭rtpServer // 关闭 rtpServer
mediaServerService.closeJTTServer(streamInfo.getMediaServer(), streamInfo.getStream(), null); receiveRtpServerService.closeRTPServer(streamInfo.getMediaServer(), streamInfo.getApp(), streamInfo.getStream());
} }
// 清理回调 // 清理回调
List<CommonCallback<WVPResult<StreamInfo>>> generalCallbacks = inviteErrorCallbackMap.get(playKey); List<CommonCallback<WVPResult<StreamInfo>>> generalCallbacks = inviteErrorCallbackMap.get(playKey);

View File

@ -10,6 +10,10 @@ import com.genersoft.iot.vmp.service.bean.SSRCInfo;
public interface IReceiveRtpServerService { public interface IReceiveRtpServerService {
SSRCInfo openGbRTPServer(MediaServer mediaServer, String streamId, String presetSSRC, int tcpMode,
boolean playback, boolean ssrcCheck, boolean onlyAuto, boolean disableAuto,
ErrorCallback<OpenRTPServerResult> callback);
int openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<HookData> callback); int openRTPServer(RTPServerParam rtpServerParam, ErrorCallback<HookData> callback);
void closeRTPServer(MediaServer mediaServer, String app, String stream); void closeRTPServer(MediaServer mediaServer, String app, String stream);

View File

@ -1,13 +1,11 @@
package com.genersoft.iot.vmp.service.impl; package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.common.enums.MediaApp; import com.genersoft.iot.vmp.common.enums.MediaApp;
import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.OpenRTPServerResult; import com.genersoft.iot.vmp.gb28181.bean.OpenRTPServerResult;
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory; import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook; import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookData; import com.genersoft.iot.vmp.media.event.hook.HookData;
@ -28,7 +26,6 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.Objects; import java.util.Objects;
import java.util.UUID;
@Slf4j @Slf4j
@Service @Service
@ -72,9 +69,10 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
} }
@Override
public SSRCInfo openGbRTPServer(MediaServer mediaServer, String streamId, String presetSSRC, int tcpMode, public SSRCInfo openGbRTPServer(MediaServer mediaServer, String streamId, String presetSSRC, int tcpMode,
boolean playback, boolean ssrcCheck, boolean onlyAuto, boolean disableAuto, boolean reUsePort, boolean playback, boolean ssrcCheck, boolean onlyAuto, boolean disableAuto,
ErrorCallback<OpenRTPServerResult> callback ) { ErrorCallback<OpenRTPServerResult> callback) {
if (callback == null) { if (callback == null) {
log.warn("[开启国标RTP收流] 失败回调为NULL"); log.warn("[开启国标RTP收流] 失败回调为NULL");
return null; return null;
@ -84,7 +82,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
return null; return null;
} }
// 获取mediaServer可用的ssrc // 获取 mediaServer 可用的 ssrc
final String ssrc; final String ssrc;
if (presetSSRC != null) { if (presetSSRC != null) {
ssrc = presetSSRC; ssrc = presetSSRC;
@ -104,7 +102,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
} }
SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaApp.GB28181, streamId); SSRCInfo ssrcInfo = new SSRCInfo(0, ssrc, MediaApp.GB28181, streamId);
RTPServerParam rtpServerParam = new RTPServerParam(mediaServer, MediaApp.GB28181, streamId, ssrcCheck ? Long.parseLong(ssrc): 0L, null, onlyAuto, disableAuto, reUsePort, tcpMode); RTPServerParam rtpServerParam = new RTPServerParam(mediaServer, MediaApp.GB28181, streamId, ssrcCheck ? Long.parseLong(ssrc): 0L, null, onlyAuto, disableAuto, false, tcpMode);
int rtpServerPort = openRTPServer(rtpServerParam, ((code, msg, data) -> { int rtpServerPort = openRTPServer(rtpServerParam, ((code, msg, data) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) { if (code == InviteErrorCode.SUCCESS.getCode()) {
OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult(); OpenRTPServerResult openRTPServerResult = new OpenRTPServerResult();