mirror of
https://gitee.com/pan648540858/wvp-GB28181-pro.git
synced 2026-05-21 04:47:49 +08:00
开启虚拟线程,修改代码已使用虚拟线程
This commit is contained in:
parent
f396f5f29e
commit
7a4d5e551d
@ -1,40 +0,0 @@
|
||||
package com.genersoft.iot.vmp.conf;
|
||||
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.annotation.SchedulingConfigurer;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
|
||||
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import static com.genersoft.iot.vmp.conf.ThreadPoolTaskConfig.cpuNum;
|
||||
|
||||
/**
|
||||
* "@Scheduled"是Spring框架提供的一种定时任务执行机制,默认情况下它是单线程的,在同时执行多个定时任务时可能会出现阻塞和性能问题。
|
||||
* 为了解决这种单线程瓶颈问题,可以将定时任务的执行机制改为支持多线程
|
||||
*/
|
||||
@Configuration
|
||||
public class ScheduleConfig implements SchedulingConfigurer {
|
||||
|
||||
/**
|
||||
* 核心线程数(默认线程数)
|
||||
*/
|
||||
private static final int corePoolSize = Math.max(cpuNum, 20);
|
||||
|
||||
/**
|
||||
* 线程池名前缀
|
||||
*/
|
||||
private static final String threadNamePrefix = "schedule";
|
||||
|
||||
@Override
|
||||
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
|
||||
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize,
|
||||
new BasicThreadFactory.Builder().namingPattern(threadNamePrefix).daemon(true).build(),
|
||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
taskRegistrar.setScheduler(scheduledThreadPoolExecutor);
|
||||
}
|
||||
}
|
||||
@ -1,67 +0,0 @@
|
||||
package com.genersoft.iot.vmp.conf;
|
||||
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.annotation.Order;
|
||||
import org.springframework.scheduling.annotation.EnableAsync;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* ThreadPoolTask 配置类
|
||||
* @author lin
|
||||
*/
|
||||
@Configuration
|
||||
@Order(1)
|
||||
@EnableAsync(proxyTargetClass = true)
|
||||
public class ThreadPoolTaskConfig {
|
||||
|
||||
public static final int cpuNum = Runtime.getRuntime().availableProcessors();
|
||||
|
||||
/**
|
||||
* 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,
|
||||
* 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
|
||||
* 当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝
|
||||
*/
|
||||
|
||||
/**
|
||||
* 核心线程数(默认线程数)
|
||||
*/
|
||||
private static final int corePoolSize = Math.max(cpuNum * 2, 16);
|
||||
/**
|
||||
* 最大线程数
|
||||
*/
|
||||
private static final int maxPoolSize = corePoolSize * 10;
|
||||
/**
|
||||
* 允许线程空闲时间(单位:默认为秒)
|
||||
*/
|
||||
private static final int keepAliveTime = 30;
|
||||
|
||||
/**
|
||||
* 缓冲队列大小
|
||||
*/
|
||||
private static final int queueCapacity = 10000;
|
||||
/**
|
||||
* 线程池名前缀
|
||||
*/
|
||||
private static final String threadNamePrefix = "async-";
|
||||
|
||||
|
||||
@Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
|
||||
public ThreadPoolTaskExecutor taskExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(corePoolSize);
|
||||
executor.setMaxPoolSize(maxPoolSize);
|
||||
executor.setQueueCapacity(queueCapacity);
|
||||
executor.setKeepAliveSeconds(keepAliveTime);
|
||||
executor.setThreadNamePrefix(threadNamePrefix);
|
||||
|
||||
// 线程池对拒绝任务的处理策略
|
||||
// CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
|
||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
// 初始化
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
}
|
||||
@ -11,11 +11,10 @@ import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController;
|
||||
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
@ -43,9 +42,8 @@ public class RedisRpcConfig implements MessageListener {
|
||||
|
||||
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Qualifier("taskExecutor")
|
||||
@Autowired
|
||||
private ThreadPoolTaskExecutor taskExecutor;
|
||||
private TaskExecutor taskExecutor;
|
||||
|
||||
private final static Map<String, RedisRpcClassHandler> protocolHash = new HashMap<>();
|
||||
|
||||
|
||||
@ -159,6 +159,7 @@ public class CatalogEventLister implements ApplicationListener<CatalogEvent> {
|
||||
List<CommonGBChannel> channelList = new ArrayList<>();
|
||||
CommonGBChannel deviceChannel = channelMap.get(gbId);
|
||||
channelList.add(deviceChannel);
|
||||
|
||||
try {
|
||||
sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, channelList, subscribeInfo, null);
|
||||
} catch (InvalidArgumentException | ParseException | NoSuchFieldException |
|
||||
|
||||
@ -97,7 +97,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
|
||||
/**
|
||||
* 监听录像查询结束事件
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@org.springframework.context.event.EventListener
|
||||
public void onApplicationEvent(RecordInfoEndEvent event) {
|
||||
SynchronousQueue<RecordInfo> queue = topicSubscribers.get("record" + event.getRecordInfo().getSn());
|
||||
|
||||
@ -229,7 +229,18 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
log.info("[更新多个离线设备信息] 参数为空");
|
||||
return;
|
||||
}
|
||||
deviceMapper.offlineByList(offlineDevices);
|
||||
int limitCount = 300;
|
||||
if (offlineDevices.size() > limitCount) {
|
||||
for (int i = 0; i < offlineDevices.size(); i += limitCount) {
|
||||
int toIndex = i + limitCount;
|
||||
if (i + limitCount > offlineDevices.size()) {
|
||||
toIndex = offlineDevices.size();
|
||||
}
|
||||
deviceMapper.offlineByList(offlineDevices.subList(i, toIndex));
|
||||
}
|
||||
}else {
|
||||
deviceMapper.offlineByList(offlineDevices);
|
||||
}
|
||||
for (Device device : offlineDevices) {
|
||||
device.setOnLine(false);
|
||||
redisCatchStorage.updateDevice(device);
|
||||
|
||||
@ -46,7 +46,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
|
||||
/**
|
||||
* 流离开的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) {
|
||||
|
||||
@ -123,7 +123,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
|
||||
}
|
||||
}
|
||||
}else {
|
||||
log.info("[Catalog事件: {}] 未找到上级平台: {}", event.getMessageType(), serverGbId);
|
||||
log.info("[Catalog事件: {}] 没有需要通知的上级平台: {}", event.getMessageType(), serverGbId);
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
||||
@ -298,7 +298,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
||||
/**
|
||||
* 流离开的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
List<SendRtpInfo> sendRtpItems = sendRtpServerService.queryByStream(event.getStream());
|
||||
@ -325,7 +325,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
||||
/**
|
||||
* 发流停止
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaSendRtpStoppedEvent event) {
|
||||
List<SendRtpInfo> sendRtpItems = sendRtpServerService.queryByStream(event.getStream());
|
||||
|
||||
@ -129,7 +129,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
/**
|
||||
* 流到来的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaArrivalEvent event) {
|
||||
if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) {
|
||||
@ -177,7 +177,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
/**
|
||||
* 流离开的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
List<SendRtpInfo> sendRtpInfos = sendRtpServerService.queryByStream(event.getStream());
|
||||
@ -246,7 +246,7 @@ public class PlayServiceImpl implements IPlayService {
|
||||
/**
|
||||
* 流未找到的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaNotFoundEvent event) {
|
||||
if (!"rtp".equals(event.getApp())) {
|
||||
|
||||
@ -59,7 +59,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
|
||||
* @param requestEvent RequestEvent事件
|
||||
*/
|
||||
@Override
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
public void processRequest(RequestEvent requestEvent) {
|
||||
String method = requestEvent.getRequest().getMethod();
|
||||
ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method);
|
||||
@ -77,7 +77,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver {
|
||||
* @param responseEvent responseEvent事件
|
||||
*/
|
||||
@Override
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
public void processResponse(ResponseEvent responseEvent) {
|
||||
SIPResponse response = (SIPResponse)responseEvent.getResponse();
|
||||
int status = response.getStatusCode();
|
||||
|
||||
@ -14,8 +14,7 @@ import org.dom4j.DocumentException;
|
||||
import org.dom4j.Element;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
@ -45,9 +44,8 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
|
||||
|
||||
private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Qualifier("taskExecutor")
|
||||
@Autowired
|
||||
private ThreadPoolTaskExecutor taskExecutor;
|
||||
private TaskExecutor taskExecutor;
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
|
||||
@ -5,11 +5,8 @@ import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.RecordInfo;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.RecordItem;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent;
|
||||
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEvent;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
|
||||
@ -20,10 +17,8 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.dom4j.Element;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
|
||||
@ -83,7 +83,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
|
||||
/**
|
||||
* 流到来的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaArrivalEvent event) {
|
||||
if (event.getApp().equals(talkApp) && event.getStream().endsWith("_talk")) {
|
||||
@ -112,7 +112,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
|
||||
/**
|
||||
* 流离开的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
|
||||
@ -121,7 +121,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
|
||||
/**
|
||||
* 流未找到的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaNotFoundEvent event) {
|
||||
if (!userSetting.getAutoApplyPlay()) {
|
||||
|
||||
@ -119,7 +119,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
|
||||
/**
|
||||
* 流到来的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@org.springframework.context.event.EventListener
|
||||
public void onApplicationEvent(MediaArrivalEvent event) {
|
||||
|
||||
@ -128,7 +128,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
|
||||
/**
|
||||
* 流离开的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
|
||||
@ -137,7 +137,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
|
||||
/**
|
||||
* 设备更新的通知
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(DeviceUpdateEvent event) {
|
||||
JTDevice device = event.getDevice();
|
||||
@ -163,7 +163,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
|
||||
/**
|
||||
* 位置更新的通知
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(JTPositionEvent event) {
|
||||
if (event.getPhoneNumber() == null || event.getPositionInfo() == null
|
||||
|
||||
@ -25,10 +25,8 @@ import jakarta.servlet.http.HttpServletRequest;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.ApplicationEventPublisher;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
@ -80,10 +78,6 @@ public class ABLHttpHookListener {
|
||||
@Autowired
|
||||
private SSRCFactory ssrcFactory;
|
||||
|
||||
@Qualifier("taskExecutor")
|
||||
@Autowired
|
||||
private ThreadPoolTaskExecutor taskExecutor;
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<Object, Object> redisTemplate;
|
||||
|
||||
|
||||
@ -59,7 +59,7 @@ public class ABLMediaServerStatusManger {
|
||||
|
||||
private final String type = "abl";
|
||||
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaServerChangeEvent event) {
|
||||
if (event.getMediaServerItemList() == null
|
||||
@ -77,7 +77,7 @@ public class ABLMediaServerStatusManger {
|
||||
execute();
|
||||
}
|
||||
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(HookAblServerStartEvent event) {
|
||||
if (event.getMediaServerItem() == null
|
||||
@ -93,7 +93,7 @@ public class ABLMediaServerStatusManger {
|
||||
online(serverItem, null);
|
||||
}
|
||||
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(HookAblServerKeepaliveEvent event) {
|
||||
if (event.getMediaServerItem() == null) {
|
||||
@ -107,7 +107,7 @@ public class ABLMediaServerStatusManger {
|
||||
online(serverItem, null);
|
||||
}
|
||||
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaServerDeleteEvent event) {
|
||||
if (event.getMediaServer() == null) {
|
||||
|
||||
@ -32,7 +32,7 @@ public class HookSubscribe {
|
||||
/**
|
||||
* 流到来的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaArrivalEvent event) {
|
||||
if (event.getSchema() == null || "rtsp".equals(event.getSchema())) {
|
||||
@ -44,7 +44,7 @@ public class HookSubscribe {
|
||||
/**
|
||||
* 流结束事件
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
if (event.getSchema() == null || "rtsp".equals(event.getSchema())) {
|
||||
@ -55,7 +55,7 @@ public class HookSubscribe {
|
||||
/**
|
||||
* 推流鉴权事件
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaPublishEvent event) {
|
||||
sendNotify(HookType.on_publish, event);
|
||||
@ -63,7 +63,7 @@ public class HookSubscribe {
|
||||
/**
|
||||
* 生成录像文件事件
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaRecordMp4Event event) {
|
||||
sendNotify(HookType.on_record_mp4, event);
|
||||
|
||||
@ -22,14 +22,14 @@ public class MediaServerStatusEventListener {
|
||||
@Autowired
|
||||
private IPlayService playService;
|
||||
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaServerOnlineEvent event) {
|
||||
log.info("[媒体节点] 上线 ID:" + event.getMediaServer().getId());
|
||||
playService.zlmServerOnline(event.getMediaServer());
|
||||
}
|
||||
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaServerOfflineEvent event) {
|
||||
|
||||
|
||||
@ -87,7 +87,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
|
||||
/**
|
||||
* 流到来的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@org.springframework.context.event.EventListener
|
||||
public void onApplicationEvent(MediaArrivalEvent event) {
|
||||
if ("rtsp".equals(event.getSchema())) {
|
||||
@ -101,7 +101,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
|
||||
/**
|
||||
* 流离开的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
if ("rtsp".equals(event.getSchema())) {
|
||||
@ -120,7 +120,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
|
||||
/**
|
||||
* 流媒体节点上线
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaServerOnlineEvent event) {
|
||||
@ -131,7 +131,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
|
||||
/**
|
||||
* 流媒体节点离线
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaServerOfflineEvent event) {
|
||||
|
||||
@ -66,7 +66,7 @@ public class ZLMMediaServerStatusManager {
|
||||
|
||||
private final String type = "zlm";
|
||||
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaServerChangeEvent event) {
|
||||
if (event.getMediaServerItemList() == null
|
||||
@ -84,7 +84,7 @@ public class ZLMMediaServerStatusManager {
|
||||
}
|
||||
}
|
||||
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(HookZlmServerStartEvent event) {
|
||||
if (event.getMediaServer() == null
|
||||
@ -96,7 +96,7 @@ public class ZLMMediaServerStatusManager {
|
||||
online(event.getMediaServer(), event.getConfig());
|
||||
}
|
||||
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(HookZlmServerKeepaliveEvent event) {
|
||||
if (event.getMediaServerItem() == null) {
|
||||
@ -110,7 +110,7 @@ public class ZLMMediaServerStatusManager {
|
||||
online(mediaServer, null);
|
||||
}
|
||||
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaServerDeleteEvent event) {
|
||||
if (event.getMediaServer() == null) {
|
||||
|
||||
@ -116,7 +116,7 @@ public class CloudRecordServiceImpl implements ICloudRecordService {
|
||||
return new ArrayList<>(resultSet);
|
||||
}
|
||||
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaRecordMp4Event event) {
|
||||
CloudRecordItem cloudRecordItem = CloudRecordItem.getInstance(event);
|
||||
|
||||
@ -51,7 +51,7 @@ public class RecordPlanServiceImpl implements IRecordPlanService {
|
||||
/**
|
||||
* 流离开的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
// 流断开,检查是否还处于录像状态, 如果是则继续录像
|
||||
|
||||
@ -50,7 +50,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
|
||||
/**
|
||||
* 流到来的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@org.springframework.context.event.EventListener
|
||||
public void onApplicationEvent(MediaArrivalEvent event) {
|
||||
|
||||
@ -59,7 +59,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
|
||||
/**
|
||||
* 流离开的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
|
||||
|
||||
@ -3,12 +3,9 @@ package com.genersoft.iot.vmp.service.redisMsg;
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.data.redis.connection.Message;
|
||||
import org.springframework.data.redis.connection.MessageListener;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
@ -30,9 +27,6 @@ public class RedisPushStreamResponseListener implements MessageListener {
|
||||
|
||||
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
@Qualifier("taskExecutor")
|
||||
@Autowired
|
||||
private ThreadPoolTaskExecutor taskExecutor;
|
||||
|
||||
private final Map<String, PushStreamResponseEvent> responseEvents = new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
@ -76,7 +76,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
/**
|
||||
* 流到来的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@Transactional
|
||||
@org.springframework.context.event.EventListener
|
||||
public void onApplicationEvent(MediaArrivalEvent event) {
|
||||
@ -88,7 +88,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
/**
|
||||
* 流离开的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
@ -100,7 +100,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
/**
|
||||
* 流未找到的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MediaNotFoundEvent event) {
|
||||
if ("rtp".equals(event.getApp())) {
|
||||
@ -118,7 +118,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
/**
|
||||
* 流媒体节点上线
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaServerOnlineEvent event) {
|
||||
@ -128,7 +128,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService {
|
||||
/**
|
||||
* 流媒体节点离线
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaServerOfflineEvent event) {
|
||||
|
||||
@ -63,7 +63,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
/**
|
||||
* 流到来的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaArrivalEvent event) {
|
||||
@ -118,7 +118,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
/**
|
||||
* 流离开的处理
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaDepartureEvent event) {
|
||||
@ -155,7 +155,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
/**
|
||||
* 流媒体节点上线
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaServerOnlineEvent event) {
|
||||
@ -165,7 +165,7 @@ public class StreamPushServiceImpl implements IStreamPushService {
|
||||
/**
|
||||
* 流媒体节点离线
|
||||
*/
|
||||
@Async("taskExecutor")
|
||||
@Async
|
||||
@EventListener
|
||||
@Transactional
|
||||
public void onApplicationEvent(MediaServerOfflineEvent event) {
|
||||
|
||||
@ -42,49 +42,4 @@ public class IpPortUtil {
|
||||
throw new IllegalArgumentException("无效的IP地址: " + ip, e);
|
||||
}
|
||||
}
|
||||
|
||||
// 测试用例
|
||||
public static void main(String[] args) {
|
||||
// IPv4测试
|
||||
String ipv4 = "192.168.1.1";
|
||||
String port1 = "8080";
|
||||
System.out.println(concatenateIpAndPort(ipv4, port1)); // 输出: 192.168.1.1:8080
|
||||
|
||||
// IPv6测试
|
||||
String ipv6 = "2001:0db8:85a3:0000:0000:8a2e:0370:7334";
|
||||
String port2 = "80";
|
||||
System.out.println(concatenateIpAndPort(ipv6, port2)); // 输出: [2001:0db8:85a3:0000:0000:8a2e:0370:7334]:80
|
||||
|
||||
// 压缩格式IPv6测试
|
||||
String ipv6Compressed = "2001:db8::1";
|
||||
System.out.println(concatenateIpAndPort(ipv6Compressed, port2)); // 输出: [2001:db8::1]:80
|
||||
|
||||
// 无效IP测试
|
||||
try {
|
||||
System.out.println(concatenateIpAndPort("invalid.ip", "1234"));
|
||||
} catch (IllegalArgumentException e) {
|
||||
System.out.println("捕获到预期异常: " + e.getMessage());
|
||||
}
|
||||
|
||||
// 无效端口测试 - 非数字
|
||||
try {
|
||||
System.out.println(concatenateIpAndPort(ipv4, "abc"));
|
||||
} catch (IllegalArgumentException e) {
|
||||
System.out.println("捕获到预期异常: " + e.getMessage());
|
||||
}
|
||||
|
||||
// 无效端口测试 - 超出范围
|
||||
try {
|
||||
System.out.println(concatenateIpAndPort(ipv4, "70000"));
|
||||
} catch (IllegalArgumentException e) {
|
||||
System.out.println("捕获到预期异常: " + e.getMessage());
|
||||
}
|
||||
|
||||
// 空端口测试
|
||||
try {
|
||||
System.out.println(concatenateIpAndPort(ipv4, ""));
|
||||
} catch (IllegalArgumentException e) {
|
||||
System.out.println("捕获到预期异常: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,5 +1,8 @@
|
||||
spring:
|
||||
application:
|
||||
name: wvp
|
||||
threads:
|
||||
virtual:
|
||||
enabled: true
|
||||
profiles:
|
||||
active: 274-dev
|
||||
|
||||
Loading…
Reference in New Issue
Block a user