Compare commits

...

4 Commits

Author SHA1 Message Date
阿斌
1701a51f8a
Pre Merge pull request !36 from 阿斌/N/A 2025-07-01 15:37:14 +00:00
lin
e194c027cb 推流列表新增推流地址生成 2025-07-01 23:36:10 +08:00
lin
f8e5e8f057 优化拉流代理播放机制 2025-07-01 17:07:10 +08:00
阿斌
da98101aac
update src/main/resources/civilCode.csv.
行政规划错误。江苏南通海门市,修改为海门区,浙江杭州删除下城区、江干区,新增钱塘区,临平区

Signed-off-by: 阿斌 <38912748@qq.com>
2024-12-15 08:58:42 +00:00
17 changed files with 383 additions and 205 deletions

View File

@ -7,19 +7,26 @@ import com.genersoft.iot.vmp.conf.security.SecurityUtils;
import com.genersoft.iot.vmp.conf.security.dto.LoginUser;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.StreamAuthorityInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletRequest;
import java.net.MalformedURLException;
import java.net.URL;
@Tag(name = "媒体流相关")
@ -52,11 +59,12 @@ public class MediaController {
@Parameter(name = "useSourceIpAsStreamIp", description = "是否使用请求IP作为返回的地址IP")
@GetMapping(value = "/stream_info_by_app_and_stream")
@ResponseBody
public StreamContent getStreamInfoByAppAndStream(HttpServletRequest request, @RequestParam String app,
@RequestParam String stream,
@RequestParam(required = false) String mediaServerId,
@RequestParam(required = false) String callId,
@RequestParam(required = false) Boolean useSourceIpAsStreamIp){
public DeferredResult<WVPResult<StreamContent>> getStreamInfoByAppAndStream(HttpServletRequest request, @RequestParam String app,
@RequestParam String stream,
@RequestParam(required = false) String mediaServerId,
@RequestParam(required = false) String callId,
@RequestParam(required = false) Boolean useSourceIpAsStreamIp){
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>();
boolean authority = false;
if (callId != null) {
// 权限校验
@ -75,9 +83,7 @@ public class MediaController {
authority = true;
}
}
StreamInfo streamInfo;
if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) {
String host = request.getHeader("Host");
String localAddr = host.split(":")[0];
@ -88,30 +94,37 @@ public class MediaController {
}
if (streamInfo != null){
return new StreamContent(streamInfo);
WVPResult<StreamContent> wvpResult = WVPResult.success();
wvpResult.setData(new StreamContent(streamInfo));
result.setResult(wvpResult);
}else {
ErrorCallback<StreamInfo> callback = (code, msg, streamInfoStoStart) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
WVPResult<StreamContent> wvpResult = WVPResult.success();
if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) {
String host;
try {
URL url=new URL(request.getRequestURL().toString());
host=url.getHost();
} catch (MalformedURLException e) {
host=request.getLocalAddr();
}
streamInfoStoStart.changeStreamIp(host);
}
if (!ObjectUtils.isEmpty(streamInfoStoStart.getMediaServer().getTranscodeSuffix())
&& !"null".equalsIgnoreCase(streamInfoStoStart.getMediaServer().getTranscodeSuffix())) {
streamInfoStoStart.setStream(streamInfoStoStart.getStream() + "_" + streamInfoStoStart.getMediaServer().getTranscodeSuffix());
}
wvpResult.setData(new StreamContent(streamInfoStoStart));
result.setResult(wvpResult);
}else {
result.setResult(WVPResult.fail(code, msg));
}
};
//获取流失败重启拉流后重试一次
streamProxyService.stopByAppAndStream(app,stream);
boolean start = streamProxyService.startByAppAndStream(app, stream);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("[线程休眠失败] {}", e.getMessage());
}
if (useSourceIpAsStreamIp != null && useSourceIpAsStreamIp) {
String host = request.getHeader("Host");
String localAddr = host.split(":")[0];
log.info("使用{}作为返回流的ip", localAddr);
streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, localAddr, authority);
}else {
streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(app, stream, mediaServerId, authority);
}
if (streamInfo != null){
return new StreamContent(streamInfo);
}else {
throw new ControllerException(ErrorCode.ERROR100);
}
streamProxyService.startByAppAndStream(app, stream, callback);
}
return result;
}
/**
* 获取推流播放地址

View File

@ -64,7 +64,7 @@ public interface IMediaNodeServerService {
Long updateDownloadProcess(MediaServer mediaServer, String app, String stream);
StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy);
void startProxy(MediaServer mediaServer, StreamProxy streamProxy);
void stopProxy(MediaServer mediaServer, String streamKey);

View File

@ -150,7 +150,7 @@ public interface IMediaServerService {
Long updateDownloadProcess(MediaServer mediaServerItem, String app, String stream);
StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy);
void startProxy(MediaServer mediaServer, StreamProxy streamProxy);
void stopProxy(MediaServer mediaServer, String streamKey);

View File

@ -952,13 +952,13 @@ public class MediaServerServiceImpl implements IMediaServerService {
}
@Override
public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
public void startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
IMediaNodeServerService mediaNodeServerService = nodeServerServiceMap.get(mediaServer.getType());
if (mediaNodeServerService == null) {
log.info("[startProxy] 失败, mediaServer的类型 {},未找到对应的实现类", mediaServer.getType());
throw new ControllerException(ErrorCode.ERROR100.getCode(), "未找到mediaServer对应的实现类");
}
return mediaNodeServerService.startProxy(mediaServer, streamProxy);
mediaNodeServerService.startProxy(mediaServer, streamProxy);
}
@Override

View File

@ -425,7 +425,7 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
}
@Override
public StreamInfo startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
public void startProxy(MediaServer mediaServer, StreamProxy streamProxy) {
String dstUrl;
if ("ffmpeg".equalsIgnoreCase(streamProxy.getType())) {
@ -463,10 +463,6 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
MediaInfo mediaInfo = getMediaInfo(mediaServer, streamProxy.getApp(), streamProxy.getStream());
if (mediaInfo != null) {
if (mediaInfo.getOriginUrl() != null && mediaInfo.getOriginUrl().equals(streamProxy.getSrcUrl())) {
log.info("[启动拉流代理] 已存在, 直接返回, app {}, stream: {}", mediaInfo.getApp(), streamProxy.getStream());
return getStreamInfoByAppAndStream(mediaServer, streamProxy.getApp(), streamProxy.getStream(), mediaInfo, null, true);
}
closeStreams(mediaServer, streamProxy.getApp(), streamProxy.getStream());
}
@ -490,15 +486,6 @@ public class ZLMMediaNodeServerService implements IMediaNodeServerService {
JSONObject data = jsonObject.getJSONObject("data");
if (data == null) {
throw new ControllerException(jsonObject.getInteger("code"), "代理结果异常: " + jsonObject);
}else {
streamProxy.setStreamKey(data.getString("key"));
// 由于此时流未注册手动拼装流信息
mediaInfo = new MediaInfo();
mediaInfo.setApp(streamProxy.getApp());
mediaInfo.setStream(streamProxy.getStream());
mediaInfo.setOriginType(4);
mediaInfo.setOriginTypeStr("pull");
return getStreamInfoByAppAndStream(mediaServer, streamProxy.getApp(), streamProxy.getStream(), mediaInfo, null, true);
}
}
}

View File

@ -28,7 +28,7 @@ public interface IRedisRpcPlayService {
void playPush(String serverId, Integer id, ErrorCallback<StreamInfo> callback);
StreamInfo playProxy(String serverId, int id);
void playProxy(String serverId, int id, ErrorCallback<StreamInfo> callback);
void stopProxy(String serverId, int id);

View File

@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.service.redisMsg.control;
import com.alibaba.fastjson2.JSONObject;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.redis.RedisRpcConfig;
import com.genersoft.iot.vmp.conf.redis.bean.RedisRpcMessage;
@ -63,10 +62,13 @@ public class RedisRpcStreamProxyController extends RpcController {
response.setBody("param error");
return response;
}
StreamInfo streamInfo = streamProxyPlayService.startProxy(streamProxy);
response.setStatusCode(ErrorCode.SUCCESS.getCode());
response.setBody(JSONObject.toJSONString(streamInfo));
return response;
streamProxyPlayService.startProxy(streamProxy, (code, msg, streamInfo) -> {
response.setStatusCode(code);
response.setBody(JSONObject.toJSONString(streamInfo));
sendResponse(response);
});
return null;
}
/**

View File

@ -212,13 +212,20 @@ public class RedisRpcPlayServiceImpl implements IRedisRpcPlayService {
}
@Override
public StreamInfo playProxy(String serverId, int id) {
public void playProxy(String serverId, int id, ErrorCallback<StreamInfo> callback) {
RedisRpcRequest request = buildRequest("streamProxy/play", id);
request.setToId(serverId);
RedisRpcResponse response = redisRpcConfig.request(request, userSetting.getPlayTimeout(), TimeUnit.SECONDS);
if (response != null && response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
return JSON.parseObject(response.getBody().toString(), StreamInfo.class);
if (response == null) {
callback.run(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg(), null);
}else {
if (response.getStatusCode() == ErrorCode.SUCCESS.getCode()) {
StreamInfo streamInfo = JSON.parseObject(response.getBody().toString(), StreamInfo.class);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}else {
callback.run(response.getStatusCode(), response.getBody().toString(), null);
}
}
return null;
}
@Override

View File

@ -7,12 +7,15 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.conf.security.JwtUtils;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.vmanager.bean.StreamContent;
import com.genersoft.iot.vmp.vmanager.bean.WVPResult;
import com.github.pagehelper.PageInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
@ -20,8 +23,10 @@ import io.swagger.v3.oas.annotations.security.SecurityRequirement;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.context.request.async.DeferredResult;
import javax.servlet.http.HttpServletRequest;
import java.net.MalformedURLException;
@ -89,7 +94,7 @@ public class StreamProxyController {
})
@PostMapping(value = "/save")
@ResponseBody
public StreamContent save(@RequestBody StreamProxyParam param){
public DeferredResult<WVPResult<StreamContent>> save(HttpServletRequest request, @RequestBody StreamProxyParam param){
log.info("添加代理: " + JSONObject.toJSONString(param));
if (ObjectUtils.isEmpty(param.getMediaServerId())) {
param.setMediaServerId("auto");
@ -97,18 +102,39 @@ public class StreamProxyController {
if (ObjectUtils.isEmpty(param.getType())) {
param.setType("default");
}
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
ErrorCallback<StreamInfo> callback = (code, msg, streamInfo) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
WVPResult<StreamContent> wvpResult = WVPResult.success();
if (streamInfo != null) {
if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo=streamInfo.clone();//深拷贝
String host;
try {
URL url=new URL(request.getRequestURL().toString());
host=url.getHost();
} catch (MalformedURLException e) {
host=request.getLocalAddr();
}
streamInfo.changeStreamIp(host);
}
if (!ObjectUtils.isEmpty(streamInfo.getMediaServer().getTranscodeSuffix())
&& !"null".equalsIgnoreCase(streamInfo.getMediaServer().getTranscodeSuffix())) {
streamInfo.setStream(streamInfo.getStream() + "_" + streamInfo.getMediaServer().getTranscodeSuffix());
}
wvpResult.setData(new StreamContent(streamInfo));
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
StreamInfo streamInfo = streamProxyService.save(param);
if (param.isEnable()) {
if (streamInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
result.setResult(wvpResult);
}else {
return new StreamContent(streamInfo);
result.setResult(WVPResult.fail(code, msg));
}
}else {
return null;
}
};
streamProxyService.save(param, callback);
return result;
}
@Operation(summary = "新增代理", security = @SecurityRequirement(name = JwtUtils.HEADER), parameters = {
@ -193,25 +219,46 @@ public class StreamProxyController {
@ResponseBody
@Operation(summary = "启用代理", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "id", description = "代理Id", required = true)
public StreamContent start(HttpServletRequest request, int id){
public DeferredResult<WVPResult<StreamContent>> start(HttpServletRequest request, int id){
log.info("播放代理: {}", id);
StreamInfo streamInfo = streamProxyPlayService.start(id, null, null);
if (streamInfo == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), ErrorCode.ERROR100.getMsg());
}else {
if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo=streamInfo.clone();//深拷贝
String host;
try {
URL url=new URL(request.getRequestURL().toString());
host=url.getHost();
} catch (MalformedURLException e) {
host=request.getLocalAddr();
StreamProxy streamProxy = streamProxyService.getStreamProxy(id);
Assert.notNull(streamProxy, "代理信息不存在");
DeferredResult<WVPResult<StreamContent>> result = new DeferredResult<>(userSetting.getPlayTimeout().longValue());
ErrorCallback<StreamInfo> callback = (code, msg, streamInfo) -> {
if (code == InviteErrorCode.SUCCESS.getCode()) {
WVPResult<StreamContent> wvpResult = WVPResult.success();
if (streamInfo != null) {
if (userSetting.getUseSourceIpAsStreamIp()) {
streamInfo=streamInfo.clone();//深拷贝
String host;
try {
URL url=new URL(request.getRequestURL().toString());
host=url.getHost();
} catch (MalformedURLException e) {
host=request.getLocalAddr();
}
streamInfo.changeStreamIp(host);
}
if (!ObjectUtils.isEmpty(streamInfo.getMediaServer().getTranscodeSuffix())
&& !"null".equalsIgnoreCase(streamInfo.getMediaServer().getTranscodeSuffix())) {
streamInfo.setStream(streamInfo.getStream() + "_" + streamInfo.getMediaServer().getTranscodeSuffix());
}
wvpResult.setData(new StreamContent(streamInfo));
}else {
wvpResult.setCode(code);
wvpResult.setMsg(msg);
}
streamInfo.changeStreamIp(host);
result.setResult(wvpResult);
}else {
result.setResult(WVPResult.fail(code, msg));
}
return new StreamContent(streamInfo);
}
};
streamProxyPlayService.start(id, null, callback);
return result;
}
@GetMapping(value = "/stop")

View File

@ -92,5 +92,5 @@ public interface StreamProxyMapper {
" SET pulling=#{pulling}, media_server_id = #{mediaServerId}, " +
" stream_key = #{streamKey} " +
" WHERE id=#{id}")
void addStream(StreamProxy streamProxy);
void updateStream(StreamProxy streamProxy);
}

View File

@ -4,13 +4,13 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import javax.validation.constraints.NotNull;
public interface IStreamProxyPlayService {
StreamInfo start(int id, Boolean record, ErrorCallback<StreamInfo> callback);
void start(int id, Boolean record, ErrorCallback<StreamInfo> callback);
void start(int id, ErrorCallback<StreamInfo> callback);
StreamInfo startProxy(StreamProxy streamProxy);
void startProxy(@NotNull StreamProxy streamProxy, ErrorCallback<StreamInfo> callback);
void stop(int id);

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.streamProxy.service;
import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam;
import com.genersoft.iot.vmp.vmanager.bean.ResourceBaseInfo;
@ -15,7 +16,7 @@ public interface IStreamProxyService {
* 保存视频代理
* @param param
*/
StreamInfo save(StreamProxyParam param);
void save(StreamProxyParam param, ErrorCallback<StreamInfo> callback);
/**
* 分页查询
@ -38,7 +39,7 @@ public interface IStreamProxyService {
* @param stream
* @return
*/
boolean startByAppAndStream(String app, String stream);
void startByAppAndStream(String app, String stream, ErrorCallback<StreamInfo> callback);
/**
* 停用用视频代理

View File

@ -4,12 +4,10 @@ import com.genersoft.iot.vmp.common.StreamInfo;
import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
import com.genersoft.iot.vmp.media.event.hook.Hook;
import com.genersoft.iot.vmp.media.event.hook.HookSubscribe;
import com.genersoft.iot.vmp.media.event.hook.HookType;
import com.genersoft.iot.vmp.media.event.media.MediaArrivalEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.bean.InviteErrorCode;
@ -20,16 +18,12 @@ import com.genersoft.iot.vmp.streamProxy.service.IStreamProxyPlayService;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import javax.sip.message.Response;
import javax.validation.constraints.NotNull;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* 视频代理业务
@ -56,107 +50,42 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
@Autowired
private IRedisRpcPlayService redisRpcPlayService;
private ConcurrentHashMap<Integer, ErrorCallback<StreamInfo>> callbackMap = new ConcurrentHashMap<>();
private ConcurrentHashMap<Integer, StreamInfo> streamInfoMap = new ConcurrentHashMap<>();
/**
* 流到来的处理
*/
@Async("taskExecutor")
@Transactional
@EventListener
public void onApplicationEvent(MediaArrivalEvent event) {
if ("rtsp".equals(event.getSchema())) {
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(event.getApp(), event.getStream());
if (streamProxy != null) {
ErrorCallback<StreamInfo> callback = callbackMap.remove(streamProxy.getId());
StreamInfo streamInfo = streamInfoMap.remove(streamProxy.getId());
if (callback != null && streamInfo != null) {
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
}
}
}
@Override
public void start(int id, ErrorCallback<StreamInfo> callback) {
StreamProxy streamProxy = streamProxyMapper.select(id);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
StreamInfo streamInfo = startProxy(streamProxy);
if (streamInfo == null) {
callback.run(Response.BUSY_HERE, "busy here", null);
return;
}
callbackMap.put(id, callback);
streamInfoMap.put(id, streamInfo);
MediaServer mediaServer = mediaServerService.getOne(streamProxy.getMediaServerId());
if (mediaServer != null) {
MediaInfo mediaInfo = mediaServerService.getMediaInfo(mediaServer, streamProxy.getApp(), streamProxy.getStream());
if (mediaInfo != null) {
callbackMap.remove(id);
streamInfoMap.remove(id);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
}
}
}
@Override
public StreamInfo start(int id, Boolean record, ErrorCallback<StreamInfo> callback) {
public void start(int id, Boolean record, ErrorCallback<StreamInfo> callback) {
log.info("[拉流代理] 开始拉流ID{}", id);
StreamProxy streamProxy = streamProxyMapper.select(id);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
log.info("[拉流代理] 类型: {} app{}, stream: {}, 流地址: {}", streamProxy.getType(), streamProxy.getApp(), streamProxy.getStream(), streamProxy.getSrcUrl());
if (record != null) {
streamProxy.setEnableMp4(record);
}
if (streamProxy.getMediaServerId() != null) {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), streamProxy.getMediaServerId(), false);
if (streamInfo != null) {
callbackMap.remove(id);
streamInfoMap.remove(id);
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
return streamInfo;
}
}
StreamInfo streamInfo = startProxy(streamProxy);
if (callback != null) {
// 设置流超时的定时任务
String timeOutTaskKey = UUID.randomUUID().toString();
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), streamInfo.getMediaServer().getId());
dynamicTask.startDelay(timeOutTaskKey, () -> {
log.info("[拉流代理] 收流超时ID{}", id);
// 收流超时
subscribe.removeSubscribe(rtpHook);
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), streamInfo);
}, userSetting.getPlayTimeout());
// 开启流到来的监听
subscribe.addSubscribe(rtpHook, (hookData) -> {
dynamicTask.stop(timeOutTaskKey);
// hook响应
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
subscribe.removeSubscribe(rtpHook);
});
}
return streamInfo;
startProxy(streamProxy, callback);
}
@Override
public StreamInfo startProxy(StreamProxy streamProxy){
public void startProxy(@NotNull StreamProxy streamProxy, ErrorCallback<StreamInfo> callback){
if (!streamProxy.isEnable()) {
return null;
callback.run(ErrorCode.ERROR100.getCode(), "代理未启用", null);
return;
}
if (streamProxy.getServerId() == null) {
streamProxy.setServerId(userSetting.getServerId());
}
if (!userSetting.getServerId().equals(streamProxy.getServerId())) {
return redisRpcPlayService.playProxy(streamProxy.getServerId(), streamProxy.getId());
log.info("[拉流代理] 由其他服务{}管理", streamProxy.getServerId());
redisRpcPlayService.playProxy(streamProxy.getServerId(), streamProxy.getId(), callback);
return;
}
if (streamProxy.getMediaServerId() != null) {
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStreamWithCheck(streamProxy.getApp(), streamProxy.getStream(), streamProxy.getMediaServerId(), null, false);
if (streamInfo != null) {
callback.run(ErrorCode.SUCCESS.getCode(), ErrorCode.SUCCESS.getMsg(), streamInfo);
return;
}
}
MediaServer mediaServer;
@ -169,12 +98,32 @@ public class StreamProxyPlayServiceImpl implements IStreamProxyPlayService {
if (mediaServer == null) {
throw new ControllerException(ErrorCode.ERROR100.getCode(), mediaServerId == null?"未找到可用的媒体节点":"未找到节点" + mediaServerId);
}
StreamInfo streamInfo = mediaServerService.startProxy(mediaServer, streamProxy);
// 设置流超时的定时任务
String timeOutTaskKey = UUID.randomUUID().toString();
Hook rtpHook = Hook.getInstance(HookType.on_media_arrival, streamProxy.getApp(), streamProxy.getStream(), mediaServer.getId());
dynamicTask.startDelay(timeOutTaskKey, () -> {
log.info("[拉流代理] 收流超时app{}stream: {}", streamProxy.getApp(), streamProxy.getStream());
// 收流超时
subscribe.removeSubscribe(rtpHook);
callback.run(InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getCode(), InviteErrorCode.ERROR_FOR_STREAM_TIMEOUT.getMsg(), null);
}, userSetting.getPlayTimeout());
// 开启流到来的监听
subscribe.addSubscribe(rtpHook, (hookData) -> {
log.info("[拉流代理] 收流成功app{}stream: {}", hookData.getApp(), hookData.getStream());
dynamicTask.stop(timeOutTaskKey);
StreamInfo streamInfo = mediaServerService.getStreamInfoByAppAndStream(mediaServer, hookData.getApp(), hookData.getStream(), hookData.getMediaInfo(), null);
// hook响应
callback.run(InviteErrorCode.SUCCESS.getCode(), InviteErrorCode.SUCCESS.getMsg(), streamInfo);
subscribe.removeSubscribe(rtpHook);
});
mediaServerService.startProxy(mediaServer, streamProxy);
if (mediaServerId == null || !mediaServerId.equals(mediaServer.getId())) {
streamProxy.setMediaServerId(mediaServer.getId());
streamProxyMapper.addStream(streamProxy);
streamProxyMapper.updateStream(streamProxy);
}
return streamInfo;
}
@Override

View File

@ -15,6 +15,7 @@ import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOfflineEvent;
import com.genersoft.iot.vmp.media.event.mediaServer.MediaServerOnlineEvent;
import com.genersoft.iot.vmp.media.service.IMediaServerService;
import com.genersoft.iot.vmp.media.zlm.dto.hook.OriginType;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxy;
import com.genersoft.iot.vmp.streamProxy.bean.StreamProxyParam;
@ -109,7 +110,9 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
// 拉流代理
StreamProxy streamProxyByAppAndStream = getStreamProxyByAppAndStream(event.getApp(), event.getStream());
if (streamProxyByAppAndStream != null && streamProxyByAppAndStream.isEnableDisableNoneReader()) {
startByAppAndStream(event.getApp(), event.getStream());
startByAppAndStream(event.getApp(), event.getStream(), ((code, msg, data) -> {
log.info("[拉流代理] 自动点播成功, app {} stream: {}", event.getApp(), event.getStream());
}));
}
}
@ -136,7 +139,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
@Transactional
public StreamInfo save(StreamProxyParam param) {
public void save(StreamProxyParam param, ErrorCallback<StreamInfo> callback) {
// 兼容旧接口
StreamProxy streamProxyInDb = getStreamProxyByAppAndStream(param.getApp(), param.getStream());
if (streamProxyInDb != null && streamProxyInDb.getPulling() != null && streamProxyInDb.getPulling()) {
@ -159,9 +162,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
}
if (param.isEnable()) {
return playService.startProxy(streamProxy);
} else {
return null;
playService.startProxy(streamProxy, callback);
}
}
@ -247,13 +248,12 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
@Override
public boolean startByAppAndStream(String app, String stream) {
public void startByAppAndStream(String app, String stream, ErrorCallback<StreamInfo> callback) {
StreamProxy streamProxy = streamProxyMapper.selectOneByAppAndStream(app, stream);
if (streamProxy == null) {
throw new ControllerException(ErrorCode.ERROR404.getCode(), "代理信息未找到");
}
StreamInfo streamInfo = playService.startProxy(streamProxy);
return streamInfo != null;
playService.startProxy(streamProxy, callback);
}
@Override
@ -406,7 +406,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
streamProxy.setPulling(status);
streamProxy.setMediaServerId(mediaServerId);
streamProxy.setUpdateTime(DateUtil.getNow());
streamProxyMapper.addStream(streamProxy);
streamProxyMapper.updateStream(streamProxy);
streamProxy.setGbStatus(status ? "ON" : "OFF");
if (streamProxy.getGbId() > 0) {

View File

@ -861,7 +861,7 @@
320623,如东县,3206
320681,启东市,3206
320682,如皋市,3206
320684,海门,3206
320684,海门,3206
320685,海安市,3206
3207,连云港市,32
320703,连云区,3207
@ -918,8 +918,6 @@
33,浙江省,
3301,杭州市,33
330102,上城区,3301
330103,下城区,3301
330104,江干区,3301
330105,拱墅区,3301
330106,西湖区,3301
330108,滨江区,3301
@ -927,6 +925,8 @@
330110,余杭区,3301
330111,富阳区,3301
330112,临安区,3301
330113,临平区,3301
330114,钱塘区,3301
330122,桐庐县,3301
330127,淳安县,3301
330182,建德市,3301

1 编号 名称 上级
861 320623 如东县 3206
862 320681 启东市 3206
863 320682 如皋市 3206
864 320684 海门市 海门区 3206
865 320685 海安市 3206
866 3207 连云港市 32
867 320703 连云区 3207
918 33 浙江省
919 3301 杭州市 33
920 330102 上城区 3301
330103 下城区 3301
330104 江干区 3301
921 330105 拱墅区 3301
922 330106 西湖区 3301
923 330108 滨江区 3301
925 330110 余杭区 3301
926 330111 富阳区 3301
927 330112 临安区 3301
928 330113 临平区 3301
929 330114 钱塘区 3301
930 330122 桐庐县 3301
931 330127 淳安县 3301
932 330182 建德市 3301

View File

@ -0,0 +1,163 @@
<template>
<div id="addUser" v-loading="isLoging">
<el-dialog
v-el-drag-dialog
title="构建推流地址"
width="40%"
top="2rem"
:close-on-click-modal="false"
:visible.sync="showDialog"
:destroy-on-close="true"
@close="close()"
>
<div id="shared" style="margin-right: 20px;">
<el-form ref="buildFrom" status-icon label-width="80px">
<el-form-item label="应用名" prop="app">
<el-input v-model="app" autocomplete="off" />
</el-form-item>
<el-form-item label="流ID" prop="stream">
<el-input v-model="stream" autocomplete="off" />
</el-form-item>
<el-form-item label="媒体节点" prop="mediaServerId">
<el-select v-model="mediaServer" placeholder="请选择" style="width: 100%">
<el-option
v-for="item in mediaServerList"
:key="item.id"
:label="item.id"
:value="item"
/>
</el-select>
</el-form-item>
<el-form-item label="地址" prop="url">
<div style="width: 100%" v-if="rtc" title="点击拷贝">
<el-tag size="medium" @click="copyUrl(rtc)">
<i class="el-icon-document-copy"/>
{{ rtc }}
</el-tag>
</div>
<div style="width: 100%" v-if="rtsp" title="点击拷贝">
<el-tag size="medium" @click="copyUrl(rtsp)">
<i class="el-icon-document-copy"/>
{{ rtsp }}
</el-tag>
</div>
<div style="width: 100%" v-if="rtmp" title="点击拷贝">
<el-tag size="medium" @click="copyUrl(rtmp)">
<i class="el-icon-document-copy"/>
{{ rtmp }}
</el-tag>
</div>
<div style="width: 100%" v-if="rtcs" title="点击拷贝">
<el-tag size="medium" @click="copyUrl(rtcs)">
<i class="el-icon-document-copy" />
{{ rtcs }}
</el-tag>
</div>
</el-form-item>
<el-form-item>
<div style="float: right;">
<el-button type="primary" @click="onSubmit">确认</el-button>
</div>
</el-form-item>
</el-form>
</div>
</el-dialog>
</div>
</template>
<script>
import elDragDialog from '@/directive/el-drag-dialog'
import crypto from "crypto";
export default {
name: 'BuildPushStreamUrl',
directives: { elDragDialog },
props: {},
data() {
return {
showDialog: false,
app: null,
stream: null,
mediaServer: null,
mediaServerList: [],
pushKey: null,
endCallback: null
}
},
computed: {
sign(){
if (!this.pushKey) {
return ''
}
return crypto.createHash('md5').update(this.pushKey, 'utf8').digest('hex')
},
rtsp(){
if (!this.mediaServer || !this.stream || !this.app) {
return ''
}
crypto.createHash('md5').update(this.pushKey, 'utf8').digest('hex')
return `rtsp://${this.mediaServer.streamIp}:${this.mediaServer.rtspPort}/${this.app}/${this.stream}?sign=${this.sign}`
},
rtmp(){
if (!this.mediaServer || !this.stream || !this.app) {
return ''
}
return `rtmp://${this.mediaServer.streamIp}:${this.mediaServer.rtmpPort}/${this.app}/${this.stream}?sign=${this.sign}`
},
rtc(){
if (!this.mediaServer || !this.stream || !this.app) {
return ''
}
return `http://${this.mediaServer.streamIp}:${this.mediaServer.httpPort}/index/api/webrtc?app=${this.app}&stream=${this.stream}&sign=${this.sign}`
},
rtcs(){
if (!this.mediaServer || !this.stream || !this.app) {
return ''
}
return `https://${this.mediaServer.streamIp}:${this.mediaServer.httpSSlPort}/index/api/webrtc?app=${this.app}&stream=${this.stream}&sign=${this.sign}`
}
},
created() {
this.initData()
},
methods: {
openDialog: function(callback) {
this.endCallback = callback
this.showDialog = true
},
onSubmit: function() {
},
close: function() {
this.showDialog = false
this.app = null
this.stream = null
this.mediaServer = null
this.endCallback = null
this.mediaServerList = []
},
initData: function() {
this.loading = true
this.$store.dispatch('server/getMediaServerList').then(data => {
this.mediaServerList = data
})
this.$store.dispatch('user/getUserInfo').then(data => {
this.pushKey = data.pushKey
})
},
copyUrl: function(dropdownItem) {
console.log(dropdownItem)
this.$copyText(dropdownItem).then((e) => {
this.$message.success({
showClose: true,
message: '成功拷贝到粘贴板'
})
}, (e) => {
})
},
}
}
</script>

View File

@ -45,24 +45,27 @@
<el-form-item>
<el-button icon="el-icon-plus" style="margin-right: 1rem;" type="primary" @click="addStream">添加
</el-button>
<el-button icon="el-icon-upload2" style="margin-right: 1rem;" @click="importChannel">
通道导入
</el-button>
<el-button icon="el-icon-download" style="margin-right: 1rem;">
<a
style="text-align: center; text-decoration: none"
href="/static/file/推流通道导入.zip"
download="推流通道导入.zip"
>下载模板</a>
</el-button>
<el-button-group>
<el-button icon="el-icon-upload2" @click="importChannel">
通道导入
</el-button>
<el-button icon="el-icon-download">
<a
style="text-align: center; text-decoration: none"
href="/static/file/推流通道导入.zip"
download="推流通道导入.zip"
>下载模板</a>
</el-button>
</el-button-group>
<el-button
icon="el-icon-delete"
style="margin-right: 1rem;"
style="margin-left: 1rem;"
:disabled="multipleSelection.length === 0"
type="danger"
@click="batchDel"
>移除
</el-button>
<el-button icon="el-icon-chicken" @click="buildPushStream">生成推流地址</el-button>
</el-form-item>
<el-form-item style="float: right;">
<el-button icon="el-icon-refresh-right" circle @click="refresh()" />
@ -135,6 +138,7 @@
<addStreamTOGB ref="addStreamTOGB" />
<importChannel ref="importChannel" />
<stream-push-edit v-if="streamPush" :stream-push="streamPush" :close-edit="closeEdit" style="height: calc(100vh - 90px);" />
<buildPushStreamUrl ref="buildPushStreamUrl" />
</div>
</template>
@ -143,6 +147,7 @@ import devicePlayer from '../dialog/devicePlayer.vue'
import addStreamTOGB from '../dialog/pushStreamEdit.vue'
import importChannel from '../dialog/importChannel.vue'
import StreamPushEdit from './edit.vue'
import buildPushStreamUrl from './buildPushStreamUrl.vue'
export default {
name: 'PushList',
@ -150,7 +155,8 @@ export default {
StreamPushEdit,
devicePlayer,
addStreamTOGB,
importChannel
importChannel,
buildPushStreamUrl
},
data() {
return {
@ -287,6 +293,9 @@ export default {
},
refresh: function() {
this.initData()
},
buildPushStream: function() {
this.$refs.buildPushStreamUrl.openDialog()
}
}
}