[1078] 支持语音对讲 中心广播模式

This commit is contained in:
lin 2025-07-28 17:50:03 +08:00
parent 4fb04776f2
commit 4c97022c78
22 changed files with 316 additions and 241 deletions

View File

@ -12,89 +12,48 @@ public class JTDevice {
private int id; private int id;
/**
* 省域ID
*/
@Schema(description = "省域ID") @Schema(description = "省域ID")
private String provinceId; private String provinceId;
/**
* 省域文字描述
*/
@Schema(description = "省域文字描述") @Schema(description = "省域文字描述")
private String provinceText; private String provinceText;
/**
* 市县域ID
*/
@Schema(description = "市县域ID") @Schema(description = "市县域ID")
private String cityId; private String cityId;
/**
* 市县域文字描述
*/
@Schema(description = "市县域文字描述") @Schema(description = "市县域文字描述")
private String cityText; private String cityText;
/**
* 制造商ID
*/
@Schema(description = "制造商ID") @Schema(description = "制造商ID")
private String makerId; private String makerId;
/**
* 终端型号
*/
@Schema(description = "终端型号") @Schema(description = "终端型号")
private String model; private String model;
/**
* 终端手机号
*/
@Schema(description = "终端手机号") @Schema(description = "终端手机号")
private String phoneNumber; private String phoneNumber;
/**
* 终端ID
*/
@Schema(description = "终端ID") @Schema(description = "终端ID")
private String terminalId; private String terminalId;
/**
* 车牌颜色
*/
@Schema(description = "车牌颜色") @Schema(description = "车牌颜色")
private int plateColor; private int plateColor;
/**
* 车牌
*/
@Schema(description = "车牌") @Schema(description = "车牌")
private String plateNo; private String plateNo;
/**
* 鉴权码
*/
@Schema(description = "鉴权码") @Schema(description = "鉴权码")
private String authenticationCode; private String authenticationCode;
/**
* 经度
*/
@Schema(description = "经度") @Schema(description = "经度")
private Double longitude; private Double longitude;
/**
* 纬度
*/
@Schema(description = "纬度") @Schema(description = "纬度")
private Double latitude; private Double latitude;
@Schema(description = "注册时间") @Schema(description = "注册时间")
private String registerTime; private String registerTime;
@Schema(description = "创建时间") @Schema(description = "创建时间")
private String createTime; private String createTime;
@ -104,6 +63,15 @@ public class JTDevice {
@Schema(description = "状态") @Schema(description = "状态")
private boolean status; private boolean status;
@Schema(description = "设备使用的媒体id, 默认为null")
private String mediaServerId;
@Schema(description = "地理坐标系, 目前支持 WGS84,GCJ02")
private String geoCoordSys;
@Schema(description = "收流IP")
private String sdpIp;
@Override @Override
public String toString() { public String toString() {
return "JTDevice{" + return "JTDevice{" +

View File

@ -5,14 +5,12 @@ import io.netty.buffer.Unpooled;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import java.util.Stack;
/** /**
* 音视频通道 * 音视频通道
*/ */
@Setter @Setter
@Getter @Getter
public class JTChanel implements JTDeviceSubConfig{ public class JTChanelConfig implements JTDeviceSubConfig{
/** /**
* 物理通道号 单独 * 物理通道号 单独
@ -47,8 +45,8 @@ public class JTChanel implements JTDeviceSubConfig{
return byteBuf; return byteBuf;
} }
public static JTChanel decode(ByteBuf byteBuf) { public static JTChanelConfig decode(ByteBuf byteBuf) {
JTChanel jtChanel = new JTChanel(); JTChanelConfig jtChanel = new JTChanelConfig();
jtChanel.setPhysicalChannelId(byteBuf.readUnsignedByte()); jtChanel.setPhysicalChannelId(byteBuf.readUnsignedByte());
jtChanel.setLogicChannelId(byteBuf.readUnsignedByte()); jtChanel.setLogicChannelId(byteBuf.readUnsignedByte());
jtChanel.setChannelType(byteBuf.readUnsignedByte()); jtChanel.setChannelType(byteBuf.readUnsignedByte());

View File

@ -30,7 +30,7 @@ public class JTChannelListParam implements JTDeviceSubConfig{
*/ */
private int videoCount; private int videoCount;
private List<JTChanel> chanelList; private List<JTChanelConfig> chanelList;
@Override @Override
@ -39,7 +39,7 @@ public class JTChannelListParam implements JTDeviceSubConfig{
byteBuf.writeByte(videoAndAudioCount); byteBuf.writeByte(videoAndAudioCount);
byteBuf.writeByte(audioCount); byteBuf.writeByte(audioCount);
byteBuf.writeByte(videoCount); byteBuf.writeByte(videoCount);
for (JTChanel jtChanel : chanelList) { for (JTChanelConfig jtChanel : chanelList) {
byteBuf.writeBytes(jtChanel.encode()); byteBuf.writeBytes(jtChanel.encode());
} }
return byteBuf; return byteBuf;
@ -51,9 +51,9 @@ public class JTChannelListParam implements JTDeviceSubConfig{
channelListParam.setAudioCount(byteBuf.readUnsignedByte()); channelListParam.setAudioCount(byteBuf.readUnsignedByte());
channelListParam.setVideoCount(byteBuf.readUnsignedByte()); channelListParam.setVideoCount(byteBuf.readUnsignedByte());
int total = channelListParam.getVideoAndAudioCount() + channelListParam.getVideoCount() + channelListParam.getAudioCount(); int total = channelListParam.getVideoAndAudioCount() + channelListParam.getVideoCount() + channelListParam.getAudioCount();
List<JTChanel> chanelList = new ArrayList<>(total); List<JTChanelConfig> chanelList = new ArrayList<>(total);
for (int i = 0; i < total; i++) { for (int i = 0; i < total; i++) {
chanelList.add(JTChanel.decode(byteBuf)); chanelList.add(JTChanelConfig.decode(byteBuf));
} }
channelListParam.setChanelList(chanelList); channelListParam.setChanelList(chanelList);
return channelListParam; return channelListParam;

View File

@ -135,39 +135,13 @@ public class JT1078Controller {
@Operation(summary = "JT-语音对讲", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Operation(summary = "JT-语音对讲", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "phoneNumber", description = "设备手机号", required = true) @Parameter(name = "phoneNumber", description = "设备手机号", required = true)
@Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true) @Parameter(name = "channelId", description = "通道国标编号, 一般为从1开始的数字", required = true)
@Parameter(name = "app", description = "推流应用名", required = true)
@Parameter(name = "stream", description = "推流ID", required = true)
@Parameter(name = "mediaServerId", description = "流媒体ID", required = true)
@Parameter(name = "onlySend", description = "是否只发送", required = false)
@GetMapping("/talk/start") @GetMapping("/talk/start")
public DeferredResult<WVPResult<StreamContent>> startTalk(HttpServletRequest request, public StreamContent startTalk(HttpServletRequest request,
@Parameter(required = true) String phoneNumber, @Parameter(required = true) String phoneNumber,
@Parameter(required = true) Integer channelId, @Parameter(required = true) Integer channelId) {
@Parameter(required = true) String app,
@Parameter(required = true) String stream,
@Parameter(required = true) String mediaServerId,
@Parameter(required = false) Boolean onlySend) {
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
result.onTimeout(()->{
log.info("[JT-语音对讲超时] phoneNumber{}, channelId{}, ", phoneNumber, channelId);
// 释放rtpserver
WVPResult<StreamContent> wvpResult = new WVPResult<>();
wvpResult.setCode(ErrorCode.ERROR100.getCode());
wvpResult.setMsg("超时");
result.setResult(wvpResult);
jt1078PlayService.stopPlay(phoneNumber, channelId);
});
jt1078PlayService.startTalk(phoneNumber, channelId, app, stream, mediaServerId, onlySend, wvpResult -> { StreamInfo streamInfo = jt1078PlayService.startTalk(phoneNumber, channelId);
WVPResult<StreamContent> wvpResultForFinish = new WVPResult<>();
wvpResultForFinish.setCode(wvpResult.getCode());
wvpResultForFinish.setMsg(wvpResult.getMsg());
if (wvpResult.getCode() == InviteErrorCode.SUCCESS.getCode()) {
if (wvpResult.getData() != null) {
StreamInfo streamInfo = wvpResult.getData();
if (userSetting.getUseSourceIpAsStreamIp()) { if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo = wvpResult.getData().clone();//深拷贝
String host; String host;
try { try {
URL url=new URL(request.getRequestURL().toString()); URL url=new URL(request.getRequestURL().toString());
@ -177,13 +151,8 @@ public class JT1078Controller {
} }
streamInfo.changeStreamIp(host); streamInfo.changeStreamIp(host);
} }
wvpResultForFinish.setData(new StreamContent(streamInfo)); streamInfo.setIp("localhost");
} return new StreamContent(streamInfo);
}
result.setResult(wvpResultForFinish);
});
return result;
} }
@Operation(summary = "JT-结束对讲", security = @SecurityRequirement(name = JwtUtils.HEADER)) @Operation(summary = "JT-结束对讲", security = @SecurityRequirement(name = JwtUtils.HEADER))

View File

@ -27,7 +27,7 @@ public interface Ijt1078PlayService {
void stopPlayback(String phoneNumber, Integer channelId); void stopPlayback(String phoneNumber, Integer channelId);
void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend, CommonCallback<WVPResult<StreamInfo>> callback); StreamInfo startTalk(String phoneNumber, Integer channelId);
void stopTalk(String phoneNumber, Integer channelId); void stopTalk(String phoneNumber, Integer channelId);

View File

@ -7,6 +7,8 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting; import com.genersoft.iot.vmp.conf.ftpServer.FtpSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo; import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
import com.genersoft.iot.vmp.jt1078.bean.*; import com.genersoft.iot.vmp.jt1078.bean.*;
import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template; import com.genersoft.iot.vmp.jt1078.cmd.JT1078Template;
@ -41,6 +43,7 @@ import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -51,6 +54,8 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j @Slf4j
public class jt1078PlayServiceImpl implements Ijt1078PlayService { public class jt1078PlayServiceImpl implements Ijt1078PlayService {
public static final String talkApp = "jt_talk";
@Autowired @Autowired
private ISendRtpServerService sendRtpServerService; private ISendRtpServerService sendRtpServerService;
@ -81,8 +86,28 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
@Async("taskExecutor") @Async("taskExecutor")
@EventListener @EventListener
public void onApplicationEvent(MediaArrivalEvent event) { public void onApplicationEvent(MediaArrivalEvent event) {
if (event.getApp().equals(talkApp) && event.getStream().endsWith("_talk")) {
// 收到对JT讲的流
if (event.getStream().indexOf("_") <= 0) {
log.info("[JT-对讲流到来] 流格式有误stream应该为jt_[phoneNumber]_[channelId]_talk");
return;
}
String[] streamArray = event.getStream().split("_");
if (streamArray.length != 4) {
log.info("[JT-对讲流到来] 流格式有误stream应该为jt_[phoneNumber]_[channelId]_talk");
return;
}
String phoneNumber = streamArray[1];
String channelId = streamArray[2];
JTDevice device = jt1078Service.getDevice(phoneNumber);
if (device == null) {
log.info("[JT-对讲流到来] 未找到设备{}", phoneNumber);
return;
}
sendTalk(device, Integer.valueOf(channelId), event.getMediaServer(), event.getApp(), event.getStream());
} }
}
/** /**
* 流离开的处理 * 流离开的处理
@ -191,7 +216,12 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
redisTemplate.delete(playKey); redisTemplate.delete(playKey);
} }
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); MediaServer mediaServer;
if (org.springframework.util.ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
} else {
mediaServer = mediaServerService.getOne(device.getMediaServerId());
}
if (mediaServer == null) { if (mediaServer == null) {
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) { for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo)); errorCallback.run(new WVPResult<>(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo));
@ -376,7 +406,12 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
String app = "1078"; String app = "1078";
String stream = String.format("%s_%s_%s_%s", phoneNumber, channelId, String stream = String.format("%s_%s_%s_%s", phoneNumber, channelId,
DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(startTime), DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(endTime)); DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(startTime), DateUtil.yyyy_MM_dd_HH_mm_ssToUrl(endTime));
MediaServer mediaServer = mediaServerService.getMediaServerForMinimumLoad(null); MediaServer mediaServer;
if (org.springframework.util.ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
} else {
mediaServer = mediaServerService.getOne(device.getMediaServerId());
}
if (mediaServer == null) { if (mediaServer == null) {
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) { for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo)); errorCallback.run(new WVPResult<>(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo));
@ -483,25 +518,6 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
playbackControl(phoneNumber, channelId, 2, null, null); playbackControl(phoneNumber, channelId, 2, null, null);
} }
private final Map<String, CommonCallback<WVPResult<String>>> fileUploadMap = new ConcurrentHashMap<>();
@EventListener
public void onApplicationEvent(FtpUploadEvent event) {
if (fileUploadMap.isEmpty()) {
return;
}
fileUploadMap.keySet().forEach(key -> {
if (!event.getFileName().contains(key)) {
return;
}
CommonCallback<WVPResult<String>> callback = fileUploadMap.get(key);
if (callback != null) {
callback.run(new WVPResult<>(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), event.getFileName()));
fileUploadMap.remove(key);
}
});
}
/** /**
* 监听发流停止 * 监听发流停止
*/ */
@ -525,90 +541,82 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
} }
} }
@Override @Override
public void startTalk(String phoneNumber, Integer channelId, String app, String stream, String mediaServerId, Boolean onlySend, public StreamInfo startTalk(String phoneNumber, Integer channelId) {
CommonCallback<WVPResult<StreamInfo>> callback) {
// 检查流是否已经存在存在则返回 // 检查流是否已经存在存在则返回
String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + phoneNumber + ":" + channelId; String playKey = VideoManagerConstants.INVITE_INFO_1078_TALK + phoneNumber + ":" + channelId;
List<CommonCallback<WVPResult<StreamInfo>>> errorCallbacks = inviteErrorCallbackMap.computeIfAbsent(playKey, k -> new ArrayList<>());
errorCallbacks.add(callback);
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
if (streamInfo != null) { if (streamInfo != null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), "对讲进行中"); throw new ControllerException(ErrorCode.ERROR100.getCode(), "对讲进行中");
} }
String receiveStream = "jt_" + phoneNumber + "_" + channelId + "_talk"; JTDevice device = jt1078Service.getDevice(phoneNumber);
MediaServer mediaServer = mediaServerService.getOne(mediaServerId); Assert.notNull(device, "部标设备不存在");
if (mediaServer == null) {
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) { String stream = "jt_" + phoneNumber + "_" + channelId + "_talk";
errorCallback.run(new WVPResult<>(InviteErrorCode.FAIL.getCode(), "未找到可用的媒体节点", streamInfo));
MediaServer mediaServer;
if (org.springframework.util.ObjectUtils.isEmpty(device.getMediaServerId()) || "auto".equals(device.getMediaServerId())) {
mediaServer = mediaServerService.getMediaServerForMinimumLoad(null);
} else {
mediaServer = mediaServerService.getOne(device.getMediaServerId());
} }
return;
// 检查待发送的流是否存在
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, talkApp, stream);
Assert.isNull(mediaInfo, "对讲已经存在");
return mediaServerService.getStreamInfoByAppAndStream(mediaServer, talkApp, stream, null, null, null, false);
} }
private void sendTalk(JTDevice device, Integer channelId, MediaServer mediaServer, String app, String stream) {
// 检查待发送的流是否存在 // 检查待发送的流是否存在
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, app, stream); MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, app, stream);
if (mediaInfo == null) { if (mediaInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), app + "/" + stream + "流不存在"); throw new ControllerException(ErrorCode.ERROR100.getCode(), app + "/" + stream + "流不存在");
} }
String phoneNumber = device.getPhoneNumber();
// 开启收流端口, zlm发送1078的rtp流需要将ssrc字段设置为 imei_channel格式 // 开启收流端口, zlm发送1078的rtp流需要将ssrc字段设置为 imei_channel格式
String ssrc = phoneNumber + "_" + channelId; String ssrc = device.getPhoneNumber() + "_" + channelId;
SendRtpInfo sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServer, null, null, ssrc, phoneNumber, app, stream, channelId, true, false); SendRtpInfo sendRtpInfo = sendRtpServerService.createSendRtpInfo(mediaServer, null, null, ssrc, phoneNumber, talkApp, stream, channelId, true, false);
sendRtpInfo.setTcpActive(true); sendRtpInfo.setTcpActive(true);
sendRtpInfo.setUsePs(false); sendRtpInfo.setUsePs(false);
sendRtpInfo.setOnlyAudio(true); sendRtpInfo.setOnlyAudio(true);
if (onlySend == null || !onlySend) { sendRtpInfo.setReceiveStream(stream + "_talk");
sendRtpInfo.setReceiveStream(receiveStream);
}
if (onlySend == null || !onlySend) {
// 设置hook监听
Hook hook = Hook.getInstance(HookType.on_media_arrival, "1078", receiveStream, mediaServer.getId());
subscribe.addSubscribe(hook, (hookData) -> {
dynamicTask.stop(playKey);
log.info("[JT-对讲] 对讲成功, phoneNumber {} channelId {}", phoneNumber, channelId);
StreamInfo info = onPublishHandler(mediaServer, hookData, phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) { // 设置hook监听
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), info)); Hook hook = Hook.getInstance(HookType.on_media_arrival, "1078", sendRtpInfo.getReceiveStream(), mediaServer.getId());
} subscribe.addSubscribe(hook, (hookData) -> {
log.info("[JT-对讲] 对讲连接建立, phoneNumber {} channelId {}", phoneNumber, channelId);
subscribe.removeSubscribe(hook); subscribe.removeSubscribe(hook);
redisTemplate.opsForValue().set(playKey, info);
// 存储发流信息 // 存储发流信息
sendRtpServerService.update(sendRtpInfo); sendRtpServerService.update(sendRtpInfo);
}); });
Hook hookForDeparture = Hook.getInstance(HookType.on_media_departure, "1078", receiveStream, mediaServer.getId()); Hook hookForDeparture = Hook.getInstance(HookType.on_media_departure, app, stream, mediaServer.getId());
subscribe.addSubscribe(hookForDeparture, (hookData) -> { subscribe.addSubscribe(hookForDeparture, (hookData) -> {
log.info("[JT-对讲] 对讲时源流注销, app: {}. stream: {}, phoneNumber {} channelId {}", app, stream, phoneNumber, channelId); log.info("[JT-对讲] 对讲时源流注销, app: {}. stream: {}, phoneNumber {} channelId {}", app, stream, phoneNumber, channelId);
stopTalk(phoneNumber, channelId); stopTalk(phoneNumber, channelId);
}); });
// 设置超时监听
dynamicTask.startDelay(playKey, () -> {
log.info("[JT-对讲] 超时, phoneNumber {} channelId {}", phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getCode(),
InviteErrorCode.ERROR_FOR_SIGNALLING_TIMEOUT.getMsg(), null));
}
}, userSetting.getPlayTimeout()); Integer localPort = mediaServerService.startSendRtpPassive(mediaServer, sendRtpInfo, userSetting.getPlayTimeout());
}
log.info("[JT-对讲] phoneNumber {} channelId {} 收发端口: {} app: {}, stream: {}", log.info("[JT-对讲] phoneNumber {} channelId {} 收发端口: {} app: {}, stream: {}",
phoneNumber, channelId, sendRtpInfo.getLocalPort(), app, stream); phoneNumber, channelId, localPort, app, stream);
J9101 j9101 = new J9101(); J9101 j9101 = new J9101();
j9101.setChannel(Integer.valueOf(channelId)); j9101.setChannel(channelId);
j9101.setIp(mediaServer.getSdpIp()); j9101.setIp(mediaServer.getSdpIp());
j9101.setRate(1); j9101.setRate(1);
j9101.setTcpPort(sendRtpInfo.getLocalPort()); j9101.setTcpPort(sendRtpInfo.getLocalPort());
j9101.setUdpPort(sendRtpInfo.getLocalPort()); j9101.setUdpPort(sendRtpInfo.getLocalPort());
j9101.setType(2); j9101.setType(4);
jt1078Template.startLive(phoneNumber, j9101, 6); jt1078Template.startLive(phoneNumber, j9101, 6);
if (onlySend != null && onlySend) {
log.info("[JT-对讲] 对讲成功, phoneNumber {} channelId {}", phoneNumber, channelId); log.info("[JT-对讲] 对讲消息下发成功, phoneNumber {} channelId {}", phoneNumber, channelId);
for (CommonCallback<WVPResult<StreamInfo>> errorCallback : errorCallbacks) {
errorCallback.run(new WVPResult<>(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), null));
}
// 存储发流信息 // 存储发流信息
sendRtpServerService.update(sendRtpInfo); // sendRtpServerService.update(sendRtpInfo);
}
} }
@Override @Override
@ -618,7 +626,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey); StreamInfo streamInfo = (StreamInfo) redisTemplate.opsForValue().get(playKey);
// 发送停止命令 // 发送停止命令
J9102 j9102 = new J9102(); J9102 j9102 = new J9102();
j9102.setChannel(Integer.valueOf(channelId)); j9102.setChannel(channelId);
j9102.setCommand(4); j9102.setCommand(4);
j9102.setCloseType(0); j9102.setCloseType(0);
j9102.setStreamType(1); j9102.setStreamType(1);

View File

@ -362,6 +362,12 @@ public class ABLMediaNodeServerService implements IMediaNodeServerService {
return 0; return 0;
} }
@Override
public Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) {
logger.warn("[abl-startSendRtpTalk] 未实现");
return 0;
}
@Override @Override
public void startSendRtpStream(MediaServer mediaServer, SendRtpInfo sendRtpItem) { public void startSendRtpStream(MediaServer mediaServer, SendRtpInfo sendRtpItem) {
logger.warn("[abl-startSendRtpStream] 未实现"); logger.warn("[abl-startSendRtpStream] 未实现");

View File

@ -66,6 +66,8 @@ public interface IMediaNodeServerService {
void startSendRtpStream(MediaServer mediaServer, SendRtpInfo sendRtpItem); void startSendRtpStream(MediaServer mediaServer, SendRtpInfo sendRtpItem);
Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout);
Long updateDownloadProcess(MediaServer mediaServer, String app, String stream); Long updateDownloadProcess(MediaServer mediaServer, String app, String stream);
void startProxy(MediaServer mediaServer, StreamProxy streamProxy); void startProxy(MediaServer mediaServer, StreamProxy streamProxy);

View File

@ -148,6 +148,8 @@ public interface IMediaServerService {
Integer startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout); Integer startSendRtpPassive(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout);
Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout);
void startSendRtp(MediaServer mediaServer, SendRtpInfo sendRtpItem); void startSendRtp(MediaServer mediaServer, SendRtpInfo sendRtpItem);
MediaServer getMediaServerByAppAndStream(String app, String stream); MediaServer getMediaServerByAppAndStream(String app, String stream);

View File

@ -936,6 +936,16 @@ public class MediaServerServiceImpl implements IMediaServerService {
return mediaNodeServerService.startSendRtpPassive(mediaServer, sendRtpItem, timeout); return mediaNodeServerService.startSendRtpPassive(mediaServer, sendRtpItem, timeout);
} }
@Override
public Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[startSendRtpPassive] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
}
return mediaNodeServerService.startSendRtpTalk(mediaServer, sendRtpItem, timeout);
}
@Override @Override
public void startSendRtp(MediaServer mediaServer, SendRtpInfo sendRtpItem) { public void startSendRtp(MediaServer mediaServer, SendRtpInfo sendRtpItem) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType()); IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());

View File

@ -425,6 +425,29 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
log.info("[推流结果]{} ,参数: {}",jsonObject, JSONObject.toJSONString(param)); log.info("[推流结果]{} ,参数: {}",jsonObject, JSONObject.toJSONString(param));
} }
@Override
public Integer startSendRtpTalk(MediaServer mediaServer, SendRtpInfo sendRtpItem, Integer timeout) {
Map<String, Object> param = new HashMap<>(12);
param.put("vhost","__defaultVhost__");
param.put("app", sendRtpItem.getApp());
param.put("stream", sendRtpItem.getStream());
param.put("ssrc", sendRtpItem.getSsrc());
param.put("pt", sendRtpItem.getPt());
param.put("type", sendRtpItem.isUsePs() ? "1" : "0");
param.put("only_audio", sendRtpItem.isOnlyAudio() ? "1" : "0");
param.put("recv_stream_id", sendRtpItem.getReceiveStream());
JSONObject jsonObject = zlmServerFactory.startSendRtpTalk(mediaServer, param, null);
if (jsonObject == null || jsonObject.getInteger("code") != 0 ) {
log.error("启动监听TCP被动推流失败: {}, 参数:{}", jsonObject.getString("msg"), JSON.toJSONString(param));
throw new ControllerException(jsonObject.getInteger("code"), jsonObject.getString("msg"));
}
log.info("调用ZLM-TCP被动推流接口, 结果: {}", jsonObject);
log.info("启动监听TCP被动推流成功[ {}/{} ]{}->{}:{}, " , sendRtpItem.getApp(), sendRtpItem.getStream(),
jsonObject.getString("local_port"), param.get("dst_url"), param.get("dst_port"));
return jsonObject.getInteger("local_port");
}
@Override @Override
public Long updateDownloadProcess(MediaServer mediaServer, String app, String stream) { public Long updateDownloadProcess(MediaServer mediaServer, String app, String stream) {
MediaInfo mediaInfo = getMediaInfo(mediaServer, app, stream); MediaInfo mediaInfo = getMediaInfo(mediaServer, app, stream);

View File

@ -331,6 +331,10 @@ public class ZLMRESTfulUtils {
return sendPost(mediaServerItem, "startSendRtpPassive",param, callback); return sendPost(mediaServerItem, "startSendRtpPassive",param, callback);
} }
public JSONObject startSendRtpTalk(MediaServer mediaServerItem, Map<String, Object> param, RequestCallback callback) {
return sendPost(mediaServerItem, "startSendRtpTalk",param, callback);
}
public JSONObject stopSendRtp(MediaServer mediaServerItem, Map<String, Object> param) { public JSONObject stopSendRtp(MediaServer mediaServerItem, Map<String, Object> param) {
return sendPost(mediaServerItem, "stopSendRtp",param, null); return sendPost(mediaServerItem, "stopSendRtp",param, null);
} }

View File

@ -166,6 +166,10 @@ public class ZLMServerFactory {
return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param, callback); return zlmresTfulUtils.startSendRtpPassive(mediaServerItem, param, callback);
} }
public JSONObject startSendRtpTalk(MediaServer mediaServer, Map<String, Object> param, ZLMRESTfulUtils.RequestCallback callback) {
return zlmresTfulUtils.startSendRtpTalk(mediaServer, param, callback);
}
/** /**
* 查询待转推的流是否就绪 * 查询待转推的流是否就绪
*/ */
@ -268,4 +272,6 @@ public class ZLMServerFactory {
param.put("ssrc", sendRtpItem.getSsrc()); param.put("ssrc", sendRtpItem.getSsrc());
return zlmresTfulUtils.stopSendRtp(mediaServerItem, param); return zlmresTfulUtils.stopSendRtp(mediaServerItem, param);
} }
} }

View File

@ -106,6 +106,12 @@ public class MediaServiceImpl implements IMediaService {
result.setEnable_audio(true); result.setEnable_audio(true);
return result; return result;
} }
if ("jt_talk".equals(app) && stream.endsWith("_talk")) {
ResultForOnPublish result = new ResultForOnPublish();
result.setEnable_mp4(false);
result.setEnable_audio(true);
return result;
}
StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream); StreamProxy streamProxyItem = streamProxyService.getStreamProxyByAppAndStream(app, stream);
if (streamProxyItem != null) { if (streamProxyItem != null) {
ResultForOnPublish result = new ResultForOnPublish(); ResultForOnPublish result = new ResultForOnPublish();

View File

@ -340,5 +340,25 @@ export function shooting(data) {
data: data data: data
}) })
} }
export function startTalk({ phoneNumber, channelId }) {
return request({
method: 'get',
url: '/api/jt1078/talk/start',
params: {
phoneNumber: phoneNumber,
channelId: channelId
}
})
}
export function stopTalk({ phoneNumber, channelId }) {
return request({
method: 'get',
url: '/api/jt1078/talk/stop',
params: {
phoneNumber: phoneNumber,
channelId: channelId
}
})
}

View File

@ -23,9 +23,9 @@ import {
reset, reset,
sendTextMessage, sendTextMessage,
setConfig, setPhoneBook, shooting, setConfig, setPhoneBook, shooting,
startPlayback, startPlayback, startTalk,
stopPlay, stopPlay,
stopPlayback, stopPlayback, stopTalk,
telephoneCallback, telephoneCallback,
update, update,
updateChannel, updateChannel,
@ -372,6 +372,26 @@ const actions = {
reject(error) reject(error)
}) })
}) })
},
startTalk({ commit }, param) {
return new Promise((resolve, reject) => {
startTalk(param).then(response => {
const { data } = response
resolve(data)
}).catch(error => {
reject(error)
})
})
},
stopTalk({ commit }, param) {
return new Promise((resolve, reject) => {
stopTalk(param).then(response => {
const { data } = response
resolve(data)
}).catch(error => {
reject(error)
})
})
} }
} }

View File

@ -619,7 +619,7 @@ export default {
// Key // Key
this.$store.dispatch('user/getUserInfo') this.$store.dispatch('user/getUserInfo')
.then((data) => { .then((data) => {
if (data == null) { if (data === null) {
this.broadcastStatus = -1 this.broadcastStatus = -1
return return
} }

View File

@ -327,7 +327,7 @@
<!-- <el-radio :label="4">中心广播</el-radio>--> <!-- <el-radio :label="4">中心广播</el-radio>-->
<!-- </el-radio-group>--> <!-- </el-radio-group>-->
<!-- </div>--> <!-- </div>-->
<div class="trank" style="text-align: center;"> <div class="trank" style="text-align: center; width: 100%;">
<el-button <el-button
:type="getBroadcastStatus()" :type="getBroadcastStatus()"
:disabled="broadcastStatus === -2" :disabled="broadcastStatus === -2"
@ -580,24 +580,19 @@ export default {
// //
this.broadcastStatus = 0 this.broadcastStatus = 0
// //
this.$axios({ this.$store.dispatch('jtDevice/startTalk', {
method: 'get', phoneNumber: this.deviceId,
url: '/api/play/broadcast/' + this.deviceId + '/' + this.channelId + '?timeout=30&broadcastMode=' + this.broadcastMode channelId: this.channelId
}).then((res) => { }).then(data => {
if (res.data.code === 0) { const streamInfo = data
const streamInfo = res.data.data.streamInfo
if (document.location.protocol.includes('https')) { if (document.location.protocol.includes('https')) {
this.startBroadcast(streamInfo.rtcs) this.startBroadcast(streamInfo.rtcs)
} else { } else {
this.startBroadcast(streamInfo.rtc) this.startBroadcast(streamInfo.rtc)
} }
} else { }).catch(error => {
this.$message({ this.$message.error(error)
showClose: true, this.broadcastStatus = -1
message: res.data.msg,
type: 'error'
})
}
}) })
} else if (this.broadcastStatus === 1) { } else if (this.broadcastStatus === 1) {
this.broadcastStatus = -1 this.broadcastStatus = -1
@ -608,7 +603,7 @@ export default {
// Key // Key
this.$store.dispatch('user/getUserInfo') this.$store.dispatch('user/getUserInfo')
.then((data) => { .then((data) => {
if (data == null) { if (data === null) {
this.broadcastStatus = -1 this.broadcastStatus = -1
return return
} }
@ -686,7 +681,10 @@ export default {
stopBroadcast() { stopBroadcast() {
this.broadcastRtc.close() this.broadcastRtc.close()
this.broadcastStatus = -1 this.broadcastStatus = -1
this.$store.dispatch('play/broadcastStop', [this.deviceId, this.channelId]) this.$store.dispatch('jtDevice/stopTalk', {
phoneNumber: this.deviceId,
channelId: this.channelId
})
} }
} }
} }

View File

@ -464,3 +464,41 @@ create table IF NOT EXISTS wvp_record_plan_item
update_time character varying(50) update_time character varying(50)
); );
drop table IF EXISTS wvp_jt_terminal;
create table IF NOT EXISTS wvp_jt_terminal (
id serial primary key,
phone_number character varying(50),
terminal_id character varying(50),
province_id character varying(50),
province_text character varying(100),
city_id character varying(50),
city_text character varying(100),
maker_id character varying(50),
model character varying(50),
plate_color character varying(50),
plate_no character varying(50),
authentication_code character varying(255),
longitude double precision,
latitude double precision,
status bool default false,
register_time character varying(50) default null,
update_time character varying(50) not null,
create_time character varying(50) not null,
geo_coord_sys character varying(50),
media_server_id character varying(50) default 'auto',
sdp_ip character varying(50),
constraint uk_jt_device_id_device_id unique (id, phone_number)
);
drop table IF EXISTS wvp_jt_channel;
create table IF NOT EXISTS wvp_jt_channel (
id serial primary key,
terminal_db_id integer,
channel_id integer,
has_audio bool default false,
name character varying(255),
update_time character varying(50) not null,
create_time character varying(50) not null,
constraint uk_jt_device_id_device_id unique (terminal_db_id, channel_id)
);

View File

@ -465,3 +465,40 @@ create table IF NOT EXISTS wvp_record_plan_item
update_time character varying(50) update_time character varying(50)
); );
drop table IF EXISTS wvp_jt_terminal;
create table IF NOT EXISTS wvp_jt_terminal (
id serial primary key,
phone_number character varying(50),
terminal_id character varying(50),
province_id character varying(50),
province_text character varying(100),
city_id character varying(50),
city_text character varying(100),
maker_id character varying(50),
model character varying(50),
plate_color character varying(50),
plate_no character varying(50),
authentication_code character varying(255),
longitude double precision,
latitude double precision,
status bool default false,
register_time character varying(50) default null,
update_time character varying(50) not null,
create_time character varying(50) not null,
geo_coord_sys character varying(50),
media_server_id character varying(50) default 'auto',
sdp_ip character varying(50),
constraint uk_jt_device_id_device_id unique (id, phone_number)
);
drop table IF EXISTS wvp_jt_channel;
create table IF NOT EXISTS wvp_jt_channel (
id serial primary key,
terminal_db_id integer,
channel_id integer,
has_audio bool default false,
name character varying(255),
update_time character varying(50) not null,
create_time character varying(50) not null,
constraint uk_jt_device_id_device_id unique (terminal_db_id, channel_id)
);

View File

@ -18,6 +18,9 @@ create table IF NOT EXISTS wvp_jt_terminal (
register_time character varying(50) default null, register_time character varying(50) default null,
update_time character varying(50) not null, update_time character varying(50) not null,
create_time character varying(50) not null, create_time character varying(50) not null,
geo_coord_sys character varying(50),
media_server_id character varying(50) default 'auto',
sdp_ip character varying(50),
constraint uk_jt_device_id_device_id unique (id, phone_number) constraint uk_jt_device_id_device_id unique (id, phone_number)
); );
@ -30,52 +33,6 @@ create table IF NOT EXISTS wvp_jt_channel (
name character varying(255), name character varying(255),
update_time character varying(50) not null, update_time character varying(50) not null,
create_time character varying(50) not null, create_time character varying(50) not null,
gb_device_id character varying(255),
gb_name character varying(255),
gb_manufacturer character varying(255),
gb_model character varying(255),
gb_civil_code character varying(8),
gb_block character varying(255),
gb_address character varying(255),
gb_parental bool default false,
gb_parent_id character varying(255),
gb_register_way integer default 1,
gb_security_level_code character varying(255),
gb_secrecy integer default 0,
gb_ip_address character varying(255),
gb_port integer,
gb_password character varying(255),
gb_status bool default false,
gb_longitude double precision,
gb_latitude double precision,
gb_business_group_id character varying(255),
gb_ptz_type integer,
gb_photoelectric_imaging_type integer,
gb_capture_position_type integer,
gb_room_type integer,
gb_supply_light_type integer default 1,
gb_direction_type integer,
gb_resolution character varying(255),
gb_stream_number_list character varying(255),
gb_download_speed character varying(255),
gb_svc_space_support_mode integer,
gb_svc_time_support_mode integer,
gb_ssvc_ratio_support_list character varying(255),
gb_mobile_device_type integer,
gb_horizontal_field_angle double precision,
gb_vertical_field_angle double precision,
gb_max_view_distance double precision,
gb_grassroots_code character varying(255),
gb_point_type integer,
gb_point_common_name character varying(255),
gb_mac character varying(255),
gb_function_type character varying(255),
gb_encode_type character varying(255),
gb_install_time character varying(255),
gb_management_unit character varying(255),
gb_contact_info character varying(255),
gb_record_save_days character varying(255),
gb_industrial_classification character varying(255),
constraint uk_jt_device_id_device_id unique (terminal_db_id, channel_id) constraint uk_jt_device_id_device_id unique (terminal_db_id, channel_id)
); );

View File

@ -18,6 +18,9 @@ create table IF NOT EXISTS wvp_jt_terminal (
register_time character varying(50) default null, register_time character varying(50) default null,
update_time character varying(50) not null, update_time character varying(50) not null,
create_time character varying(50) not null, create_time character varying(50) not null,
geo_coord_sys character varying(50),
media_server_id character varying(50) default 'auto',
sdp_ip character varying(50),
constraint uk_jt_device_id_device_id unique (id, phone_number) constraint uk_jt_device_id_device_id unique (id, phone_number)
); );
drop table IF EXISTS wvp_jt_channel; drop table IF EXISTS wvp_jt_channel;