From 7a4d5e551d4e0b566ad87c36961f7b61c07453e5 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Wed, 14 Jan 2026 11:02:58 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=80=E5=90=AF=E8=99=9A=E6=8B=9F=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=EF=BC=8C=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81=E5=B7=B2?= =?UTF-8?q?=E4=BD=BF=E7=94=A8=E8=99=9A=E6=8B=9F=E7=BA=BF=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/ScheduleConfig.java | 40 ----------- .../iot/vmp/conf/ThreadPoolTaskConfig.java | 67 ------------------- .../iot/vmp/conf/redis/RedisRpcConfig.java | 6 +- .../subscribe/catalog/CatalogEventLister.java | 1 + .../impl/DeviceChannelServiceImpl.java | 2 +- .../service/impl/DeviceServiceImpl.java | 13 +++- .../service/impl/InviteStreamServiceImpl.java | 2 +- .../impl/PlatformChannelServiceImpl.java | 2 +- .../service/impl/PlatformServiceImpl.java | 4 +- .../gb28181/service/impl/PlayServiceImpl.java | 6 +- .../transmit/SIPProcessorObserver.java | 4 +- .../MobilePositionNotifyMessageHandler.java | 6 +- .../cmd/RecordInfoResponseMessageHandler.java | 5 -- .../service/impl/jt1078PlayServiceImpl.java | 6 +- .../service/impl/jt1078ServiceImpl.java | 8 +-- .../vmp/media/abl/ABLHttpHookListener.java | 6 -- .../media/abl/ABLMediaServerStatusManger.java | 8 +-- .../vmp/media/event/hook/HookSubscribe.java | 8 +-- .../MediaServerStatusEventListener.java | 4 +- .../service/impl/MediaServerServiceImpl.java | 8 +-- .../zlm/ZLMMediaServerStatusManager.java | 8 +-- .../service/impl/CloudRecordServiceImpl.java | 2 +- .../service/impl/RecordPlanServiceImpl.java | 2 +- .../service/impl/RtpServerServiceImpl.java | 4 +- .../RedisPushStreamResponseListener.java | 6 -- .../service/impl/StreamProxyServiceImpl.java | 10 +-- .../service/impl/StreamPushServiceImpl.java | 8 +-- .../genersoft/iot/vmp/utils/IpPortUtil.java | 47 +------------ src/main/resources/application.yml | 3 + 29 files changed, 69 insertions(+), 227 deletions(-) delete mode 100644 src/main/java/com/genersoft/iot/vmp/conf/ScheduleConfig.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ScheduleConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/ScheduleConfig.java deleted file mode 100644 index df88bcf3a..000000000 --- a/src/main/java/com/genersoft/iot/vmp/conf/ScheduleConfig.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.genersoft.iot.vmp.conf; - -import org.apache.commons.lang3.concurrent.BasicThreadFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.annotation.SchedulingConfigurer; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.scheduling.config.ScheduledTaskRegistrar; - -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; - -import static com.genersoft.iot.vmp.conf.ThreadPoolTaskConfig.cpuNum; - -/** - * "@Scheduled"是Spring框架提供的一种定时任务执行机制,默认情况下它是单线程的,在同时执行多个定时任务时可能会出现阻塞和性能问题。 - * 为了解决这种单线程瓶颈问题,可以将定时任务的执行机制改为支持多线程 - */ -@Configuration -public class ScheduleConfig implements SchedulingConfigurer { - - /** - * 核心线程数(默认线程数) - */ - private static final int corePoolSize = Math.max(cpuNum, 20); - - /** - * 线程池名前缀 - */ - private static final String threadNamePrefix = "schedule"; - - @Override - public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { - ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize, - new BasicThreadFactory.Builder().namingPattern(threadNamePrefix).daemon(true).build(), - new ThreadPoolExecutor.CallerRunsPolicy()); - taskRegistrar.setScheduler(scheduledThreadPoolExecutor); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java deleted file mode 100644 index a549a053a..000000000 --- a/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.genersoft.iot.vmp.conf; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.core.annotation.Order; -import org.springframework.scheduling.annotation.EnableAsync; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -import java.util.concurrent.ThreadPoolExecutor; - -/** - * ThreadPoolTask 配置类 - * @author lin - */ -@Configuration -@Order(1) -@EnableAsync(proxyTargetClass = true) -public class ThreadPoolTaskConfig { - - public static final int cpuNum = Runtime.getRuntime().availableProcessors(); - - /** - * 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务, - * 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中; - * 当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝 - */ - - /** - * 核心线程数(默认线程数) - */ - private static final int corePoolSize = Math.max(cpuNum * 2, 16); - /** - * 最大线程数 - */ - private static final int maxPoolSize = corePoolSize * 10; - /** - * 允许线程空闲时间(单位:默认为秒) - */ - private static final int keepAliveTime = 30; - - /** - * 缓冲队列大小 - */ - private static final int queueCapacity = 10000; - /** - * 线程池名前缀 - */ - private static final String threadNamePrefix = "async-"; - - - @Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名 - public ThreadPoolTaskExecutor taskExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(corePoolSize); - executor.setMaxPoolSize(maxPoolSize); - executor.setQueueCapacity(queueCapacity); - executor.setKeepAliveSeconds(keepAliveTime); - executor.setThreadNamePrefix(threadNamePrefix); - - // 线程池对拒绝任务的处理策略 - // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务 - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); - // 初始化 - executor.initialize(); - return executor; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java index e413d7240..779fe9eda 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -11,11 +11,10 @@ import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.core.task.TaskExecutor; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.lang.reflect.Method; @@ -43,9 +42,8 @@ public class RedisRpcConfig implements MessageListener { private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - @Qualifier("taskExecutor") @Autowired - private ThreadPoolTaskExecutor taskExecutor; + private TaskExecutor taskExecutor; private final static Map protocolHash = new HashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 2aef0ceab..9bcbb9b7c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -159,6 +159,7 @@ public class CatalogEventLister implements ApplicationListener { List channelList = new ArrayList<>(); CommonGBChannel deviceChannel = channelMap.get(gbId); channelList.add(deviceChannel); + try { sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, channelList, subscribeInfo, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java index 73e316dc5..f6e155e6d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java @@ -97,7 +97,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { /** * 监听录像查询结束事件 */ - @Async("taskExecutor") + @Async @org.springframework.context.event.EventListener public void onApplicationEvent(RecordInfoEndEvent event) { SynchronousQueue queue = topicSubscribers.get("record" + event.getRecordInfo().getSn()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index eb0f64318..691365161 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -229,7 +229,18 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { log.info("[更新多个离线设备信息] 参数为空"); return; } - deviceMapper.offlineByList(offlineDevices); + int limitCount = 300; + if (offlineDevices.size() > limitCount) { + for (int i = 0; i < offlineDevices.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > offlineDevices.size()) { + toIndex = offlineDevices.size(); + } + deviceMapper.offlineByList(offlineDevices.subList(i, toIndex)); + } + }else { + deviceMapper.offlineByList(offlineDevices); + } for (Device device : offlineDevices) { device.setOnLine(false); redisCatchStorage.updateDevice(device); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java index c3c768072..4c0bb826f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java @@ -46,7 +46,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java index 71178d4df..390153a48 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java @@ -123,7 +123,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { } } }else { - log.info("[Catalog事件: {}] 未找到上级平台: {}", event.getMessageType(), serverGbId); + log.info("[Catalog事件: {}] 没有需要通知的上级平台: {}", event.getMessageType(), serverGbId); } } break; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index a29866707..6798b0591 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -298,7 +298,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { List sendRtpItems = sendRtpServerService.queryByStream(event.getStream()); @@ -325,7 +325,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner /** * 发流停止 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaSendRtpStoppedEvent event) { List sendRtpItems = sendRtpServerService.queryByStream(event.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index f564af05a..8e80d9fb2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -129,7 +129,7 @@ public class PlayServiceImpl implements IPlayService { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaArrivalEvent event) { if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) { @@ -177,7 +177,7 @@ public class PlayServiceImpl implements IPlayService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { List sendRtpInfos = sendRtpServerService.queryByStream(event.getStream()); @@ -246,7 +246,7 @@ public class PlayServiceImpl implements IPlayService { /** * 流未找到的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaNotFoundEvent event) { if (!"rtp".equals(event.getApp())) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index 3ac4be1bf..420bb094b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -59,7 +59,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { * @param requestEvent RequestEvent事件 */ @Override - @Async("taskExecutor") + @Async public void processRequest(RequestEvent requestEvent) { String method = requestEvent.getRequest().getMethod(); ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method); @@ -77,7 +77,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { * @param responseEvent responseEvent事件 */ @Override - @Async("taskExecutor") + @Async public void processResponse(ResponseEvent responseEvent) { SIPResponse response = (SIPResponse)responseEvent.getResponse(); int status = response.getStatusCode(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index ec65dc8d4..cc26f4adb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -14,8 +14,7 @@ import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -45,9 +44,8 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - @Qualifier("taskExecutor") @Autowired - private ThreadPoolTaskExecutor taskExecutor; + private TaskExecutor taskExecutor; @Override public void afterPropertiesSet() throws Exception { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 0ab97e87f..4272bd9e3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -5,11 +5,8 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.RecordItem; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent; import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEvent; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; @@ -20,10 +17,8 @@ import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java index f9096780b..7a29640b8 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java @@ -83,7 +83,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaArrivalEvent event) { if (event.getApp().equals(talkApp) && event.getStream().endsWith("_talk")) { @@ -112,7 +112,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { @@ -121,7 +121,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { /** * 流未找到的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaNotFoundEvent event) { if (!userSetting.getAutoApplyPlay()) { diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index 6247af899..342f4f52b 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -119,7 +119,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { @@ -128,7 +128,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { @@ -137,7 +137,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { /** * 设备更新的通知 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(DeviceUpdateEvent event) { JTDevice device = event.getDevice(); @@ -163,7 +163,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { /** * 位置更新的通知 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(JTPositionEvent event) { if (event.getPhoneNumber() == null || event.getPositionInfo() == null diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java index 17a6c5680..07322899c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java @@ -25,10 +25,8 @@ import jakarta.servlet.http.HttpServletRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; @@ -80,10 +78,6 @@ public class ABLHttpHookListener { @Autowired private SSRCFactory ssrcFactory; - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - @Autowired private RedisTemplate redisTemplate; diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaServerStatusManger.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaServerStatusManger.java index 24d446511..a04552a99 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaServerStatusManger.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaServerStatusManger.java @@ -59,7 +59,7 @@ public class ABLMediaServerStatusManger { private final String type = "abl"; - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaServerChangeEvent event) { if (event.getMediaServerItemList() == null @@ -77,7 +77,7 @@ public class ABLMediaServerStatusManger { execute(); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(HookAblServerStartEvent event) { if (event.getMediaServerItem() == null @@ -93,7 +93,7 @@ public class ABLMediaServerStatusManger { online(serverItem, null); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(HookAblServerKeepaliveEvent event) { if (event.getMediaServerItem() == null) { @@ -107,7 +107,7 @@ public class ABLMediaServerStatusManger { online(serverItem, null); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaServerDeleteEvent event) { if (event.getMediaServer() == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java index c26f3dca3..8e40578c7 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java @@ -32,7 +32,7 @@ public class HookSubscribe { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaArrivalEvent event) { if (event.getSchema() == null || "rtsp".equals(event.getSchema())) { @@ -44,7 +44,7 @@ public class HookSubscribe { /** * 流结束事件 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { if (event.getSchema() == null || "rtsp".equals(event.getSchema())) { @@ -55,7 +55,7 @@ public class HookSubscribe { /** * 推流鉴权事件 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaPublishEvent event) { sendNotify(HookType.on_publish, event); @@ -63,7 +63,7 @@ public class HookSubscribe { /** * 生成录像文件事件 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaRecordMp4Event event) { sendNotify(HookType.on_record_mp4, event); diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java index 2e5a6ea08..0b0bacf9c 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java @@ -22,14 +22,14 @@ public class MediaServerStatusEventListener { @Autowired private IPlayService playService; - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaServerOnlineEvent event) { log.info("[媒体节点] 上线 ID:" + event.getMediaServer().getId()); playService.zlmServerOnline(event.getMediaServer()); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaServerOfflineEvent event) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index deb1044ce..7b6e0aa7d 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -87,7 +87,7 @@ public class MediaServerServiceImpl implements IMediaServerService { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { if ("rtsp".equals(event.getSchema())) { @@ -101,7 +101,7 @@ public class MediaServerServiceImpl implements IMediaServerService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema())) { @@ -120,7 +120,7 @@ public class MediaServerServiceImpl implements IMediaServerService { /** * 流媒体节点上线 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaServerOnlineEvent event) { @@ -131,7 +131,7 @@ public class MediaServerServiceImpl implements IMediaServerService { /** * 流媒体节点离线 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaServerOfflineEvent event) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java index f1eb7d5b9..fd0d7d664 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java @@ -66,7 +66,7 @@ public class ZLMMediaServerStatusManager { private final String type = "zlm"; - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaServerChangeEvent event) { if (event.getMediaServerItemList() == null @@ -84,7 +84,7 @@ public class ZLMMediaServerStatusManager { } } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(HookZlmServerStartEvent event) { if (event.getMediaServer() == null @@ -96,7 +96,7 @@ public class ZLMMediaServerStatusManager { online(event.getMediaServer(), event.getConfig()); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(HookZlmServerKeepaliveEvent event) { if (event.getMediaServerItem() == null) { @@ -110,7 +110,7 @@ public class ZLMMediaServerStatusManager { online(mediaServer, null); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaServerDeleteEvent event) { if (event.getMediaServer() == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java index 4dc5df890..90c381564 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java @@ -116,7 +116,7 @@ public class CloudRecordServiceImpl implements ICloudRecordService { return new ArrayList<>(resultSet); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaRecordMp4Event event) { CloudRecordItem cloudRecordItem = CloudRecordItem.getInstance(event); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java index 05c4d5fa2..503a7113a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java @@ -51,7 +51,7 @@ public class RecordPlanServiceImpl implements IRecordPlanService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { // 流断开,检查是否还处于录像状态, 如果是则继续录像 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java index 86099262d..f8b5b60d5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -50,7 +50,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { @@ -59,7 +59,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java index f8a78dca7..decd87a52 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java @@ -3,12 +3,9 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -30,9 +27,6 @@ public class RedisPushStreamResponseListener implements MessageListener { private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; private final Map responseEvents = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index 854a43a5c..ee98d9c9f 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -76,7 +76,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @Transactional @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { @@ -88,7 +88,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaDepartureEvent event) { @@ -100,7 +100,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { /** * 流未找到的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaNotFoundEvent event) { if ("rtp".equals(event.getApp())) { @@ -118,7 +118,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { /** * 流媒体节点上线 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaServerOnlineEvent event) { @@ -128,7 +128,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { /** * 流媒体节点离线 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaServerOfflineEvent event) { diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java index 60c5555f8..4ea54800d 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java @@ -63,7 +63,7 @@ public class StreamPushServiceImpl implements IStreamPushService { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaArrivalEvent event) { @@ -118,7 +118,7 @@ public class StreamPushServiceImpl implements IStreamPushService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaDepartureEvent event) { @@ -155,7 +155,7 @@ public class StreamPushServiceImpl implements IStreamPushService { /** * 流媒体节点上线 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaServerOnlineEvent event) { @@ -165,7 +165,7 @@ public class StreamPushServiceImpl implements IStreamPushService { /** * 流媒体节点离线 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaServerOfflineEvent event) { diff --git a/src/main/java/com/genersoft/iot/vmp/utils/IpPortUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/IpPortUtil.java index 2ce0a5568..d0e3f433e 100644 --- a/src/main/java/com/genersoft/iot/vmp/utils/IpPortUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/IpPortUtil.java @@ -42,49 +42,4 @@ public class IpPortUtil { throw new IllegalArgumentException("无效的IP地址: " + ip, e); } } - - // 测试用例 - public static void main(String[] args) { - // IPv4测试 - String ipv4 = "192.168.1.1"; - String port1 = "8080"; - System.out.println(concatenateIpAndPort(ipv4, port1)); // 输出: 192.168.1.1:8080 - - // IPv6测试 - String ipv6 = "2001:0db8:85a3:0000:0000:8a2e:0370:7334"; - String port2 = "80"; - System.out.println(concatenateIpAndPort(ipv6, port2)); // 输出: [2001:0db8:85a3:0000:0000:8a2e:0370:7334]:80 - - // 压缩格式IPv6测试 - String ipv6Compressed = "2001:db8::1"; - System.out.println(concatenateIpAndPort(ipv6Compressed, port2)); // 输出: [2001:db8::1]:80 - - // 无效IP测试 - try { - System.out.println(concatenateIpAndPort("invalid.ip", "1234")); - } catch (IllegalArgumentException e) { - System.out.println("捕获到预期异常: " + e.getMessage()); - } - - // 无效端口测试 - 非数字 - try { - System.out.println(concatenateIpAndPort(ipv4, "abc")); - } catch (IllegalArgumentException e) { - System.out.println("捕获到预期异常: " + e.getMessage()); - } - - // 无效端口测试 - 超出范围 - try { - System.out.println(concatenateIpAndPort(ipv4, "70000")); - } catch (IllegalArgumentException e) { - System.out.println("捕获到预期异常: " + e.getMessage()); - } - - // 空端口测试 - try { - System.out.println(concatenateIpAndPort(ipv4, "")); - } catch (IllegalArgumentException e) { - System.out.println("捕获到预期异常: " + e.getMessage()); - } - } -} \ No newline at end of file +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 67213e8a3..f6b7f7771 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,5 +1,8 @@ spring: application: name: wvp + threads: + virtual: + enabled: true profiles: active: 274-dev