From 7a4d5e551d4e0b566ad87c36961f7b61c07453e5 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Wed, 14 Jan 2026 11:02:58 +0800 Subject: [PATCH 01/25] =?UTF-8?q?=E5=BC=80=E5=90=AF=E8=99=9A=E6=8B=9F?= =?UTF-8?q?=E7=BA=BF=E7=A8=8B=EF=BC=8C=E4=BF=AE=E6=94=B9=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E5=B7=B2=E4=BD=BF=E7=94=A8=E8=99=9A=E6=8B=9F=E7=BA=BF=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/ScheduleConfig.java | 40 ----------- .../iot/vmp/conf/ThreadPoolTaskConfig.java | 67 ------------------- .../iot/vmp/conf/redis/RedisRpcConfig.java | 6 +- .../subscribe/catalog/CatalogEventLister.java | 1 + .../impl/DeviceChannelServiceImpl.java | 2 +- .../service/impl/DeviceServiceImpl.java | 13 +++- .../service/impl/InviteStreamServiceImpl.java | 2 +- .../impl/PlatformChannelServiceImpl.java | 2 +- .../service/impl/PlatformServiceImpl.java | 4 +- .../gb28181/service/impl/PlayServiceImpl.java | 6 +- .../transmit/SIPProcessorObserver.java | 4 +- .../MobilePositionNotifyMessageHandler.java | 6 +- .../cmd/RecordInfoResponseMessageHandler.java | 5 -- .../service/impl/jt1078PlayServiceImpl.java | 6 +- .../service/impl/jt1078ServiceImpl.java | 8 +-- .../vmp/media/abl/ABLHttpHookListener.java | 6 -- .../media/abl/ABLMediaServerStatusManger.java | 8 +-- .../vmp/media/event/hook/HookSubscribe.java | 8 +-- .../MediaServerStatusEventListener.java | 4 +- .../service/impl/MediaServerServiceImpl.java | 8 +-- .../zlm/ZLMMediaServerStatusManager.java | 8 +-- .../service/impl/CloudRecordServiceImpl.java | 2 +- .../service/impl/RecordPlanServiceImpl.java | 2 +- .../service/impl/RtpServerServiceImpl.java | 4 +- .../RedisPushStreamResponseListener.java | 6 -- .../service/impl/StreamProxyServiceImpl.java | 10 +-- .../service/impl/StreamPushServiceImpl.java | 8 +-- .../genersoft/iot/vmp/utils/IpPortUtil.java | 47 +------------ src/main/resources/application.yml | 3 + 29 files changed, 69 insertions(+), 227 deletions(-) delete mode 100644 src/main/java/com/genersoft/iot/vmp/conf/ScheduleConfig.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ScheduleConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/ScheduleConfig.java deleted file mode 100644 index df88bcf3a..000000000 --- a/src/main/java/com/genersoft/iot/vmp/conf/ScheduleConfig.java +++ /dev/null @@ -1,40 +0,0 @@ -package com.genersoft.iot.vmp.conf; - -import org.apache.commons.lang3.concurrent.BasicThreadFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.annotation.SchedulingConfigurer; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; -import org.springframework.scheduling.config.ScheduledTaskRegistrar; - -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; - -import static com.genersoft.iot.vmp.conf.ThreadPoolTaskConfig.cpuNum; - -/** - * "@Scheduled"是Spring框架提供的一种定时任务执行机制,默认情况下它是单线程的,在同时执行多个定时任务时可能会出现阻塞和性能问题。 - * 为了解决这种单线程瓶颈问题,可以将定时任务的执行机制改为支持多线程 - */ -@Configuration -public class ScheduleConfig implements SchedulingConfigurer { - - /** - * 核心线程数(默认线程数) - */ - private static final int corePoolSize = Math.max(cpuNum, 20); - - /** - * 线程池名前缀 - */ - private static final String threadNamePrefix = "schedule"; - - @Override - public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { - ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(corePoolSize, - new BasicThreadFactory.Builder().namingPattern(threadNamePrefix).daemon(true).build(), - new ThreadPoolExecutor.CallerRunsPolicy()); - taskRegistrar.setScheduler(scheduledThreadPoolExecutor); - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java deleted file mode 100644 index a549a053a..000000000 --- a/src/main/java/com/genersoft/iot/vmp/conf/ThreadPoolTaskConfig.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.genersoft.iot.vmp.conf; - -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.core.annotation.Order; -import org.springframework.scheduling.annotation.EnableAsync; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; - -import java.util.concurrent.ThreadPoolExecutor; - -/** - * ThreadPoolTask 配置类 - * @author lin - */ -@Configuration -@Order(1) -@EnableAsync(proxyTargetClass = true) -public class ThreadPoolTaskConfig { - - public static final int cpuNum = Runtime.getRuntime().availableProcessors(); - - /** - * 默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务, - * 当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中; - * 当队列满了,就继续创建线程,当线程数量大于等于maxPoolSize后,开始使用拒绝策略拒绝 - */ - - /** - * 核心线程数(默认线程数) - */ - private static final int corePoolSize = Math.max(cpuNum * 2, 16); - /** - * 最大线程数 - */ - private static final int maxPoolSize = corePoolSize * 10; - /** - * 允许线程空闲时间(单位:默认为秒) - */ - private static final int keepAliveTime = 30; - - /** - * 缓冲队列大小 - */ - private static final int queueCapacity = 10000; - /** - * 线程池名前缀 - */ - private static final String threadNamePrefix = "async-"; - - - @Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名 - public ThreadPoolTaskExecutor taskExecutor() { - ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); - executor.setCorePoolSize(corePoolSize); - executor.setMaxPoolSize(maxPoolSize); - executor.setQueueCapacity(queueCapacity); - executor.setKeepAliveSeconds(keepAliveTime); - executor.setThreadNamePrefix(threadNamePrefix); - - // 线程池对拒绝任务的处理策略 - // CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务 - executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); - // 初始化 - executor.initialize(); - return executor; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java index e413d7240..779fe9eda 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -11,11 +11,10 @@ import com.genersoft.iot.vmp.service.redisMsg.dto.RpcController; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.core.task.TaskExecutor; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.lang.reflect.Method; @@ -43,9 +42,8 @@ public class RedisRpcConfig implements MessageListener { private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - @Qualifier("taskExecutor") @Autowired - private ThreadPoolTaskExecutor taskExecutor; + private TaskExecutor taskExecutor; private final static Map protocolHash = new HashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java index 2aef0ceab..9bcbb9b7c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/catalog/CatalogEventLister.java @@ -159,6 +159,7 @@ public class CatalogEventLister implements ApplicationListener { List channelList = new ArrayList<>(); CommonGBChannel deviceChannel = channelMap.get(gbId); channelList.add(deviceChannel); + try { sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, channelList, subscribeInfo, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java index 73e316dc5..f6e155e6d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java @@ -97,7 +97,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { /** * 监听录像查询结束事件 */ - @Async("taskExecutor") + @Async @org.springframework.context.event.EventListener public void onApplicationEvent(RecordInfoEndEvent event) { SynchronousQueue queue = topicSubscribers.get("record" + event.getRecordInfo().getSn()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index eb0f64318..691365161 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -229,7 +229,18 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { log.info("[更新多个离线设备信息] 参数为空"); return; } - deviceMapper.offlineByList(offlineDevices); + int limitCount = 300; + if (offlineDevices.size() > limitCount) { + for (int i = 0; i < offlineDevices.size(); i += limitCount) { + int toIndex = i + limitCount; + if (i + limitCount > offlineDevices.size()) { + toIndex = offlineDevices.size(); + } + deviceMapper.offlineByList(offlineDevices.subList(i, toIndex)); + } + }else { + deviceMapper.offlineByList(offlineDevices); + } for (Device device : offlineDevices) { device.setOnLine(false); redisCatchStorage.updateDevice(device); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java index c3c768072..4c0bb826f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java @@ -46,7 +46,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema()) && "rtp".equals(event.getApp())) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java index 71178d4df..390153a48 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java @@ -123,7 +123,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { } } }else { - log.info("[Catalog事件: {}] 未找到上级平台: {}", event.getMessageType(), serverGbId); + log.info("[Catalog事件: {}] 没有需要通知的上级平台: {}", event.getMessageType(), serverGbId); } } break; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java index a29866707..6798b0591 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformServiceImpl.java @@ -298,7 +298,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { List sendRtpItems = sendRtpServerService.queryByStream(event.getStream()); @@ -325,7 +325,7 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner /** * 发流停止 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaSendRtpStoppedEvent event) { List sendRtpItems = sendRtpServerService.queryByStream(event.getStream()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java index f564af05a..8e80d9fb2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlayServiceImpl.java @@ -129,7 +129,7 @@ public class PlayServiceImpl implements IPlayService { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaArrivalEvent event) { if ("broadcast".equals(event.getApp()) || "talk".equals(event.getApp())) { @@ -177,7 +177,7 @@ public class PlayServiceImpl implements IPlayService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { List sendRtpInfos = sendRtpServerService.queryByStream(event.getStream()); @@ -246,7 +246,7 @@ public class PlayServiceImpl implements IPlayService { /** * 流未找到的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaNotFoundEvent event) { if (!"rtp".equals(event.getApp())) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java index 3ac4be1bf..420bb094b 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/SIPProcessorObserver.java @@ -59,7 +59,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { * @param requestEvent RequestEvent事件 */ @Override - @Async("taskExecutor") + @Async public void processRequest(RequestEvent requestEvent) { String method = requestEvent.getRequest().getMethod(); ISIPRequestProcessor sipRequestProcessor = requestProcessorMap.get(method); @@ -77,7 +77,7 @@ public class SIPProcessorObserver implements ISIPProcessorObserver { * @param responseEvent responseEvent事件 */ @Override - @Async("taskExecutor") + @Async public void processResponse(ResponseEvent responseEvent) { SIPResponse response = (SIPResponse)responseEvent.getResponse(); int status = response.getStatusCode(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index ec65dc8d4..cc26f4adb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -14,8 +14,7 @@ import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -45,9 +44,8 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - @Qualifier("taskExecutor") @Autowired - private ThreadPoolTaskExecutor taskExecutor; + private TaskExecutor taskExecutor; @Override public void afterPropertiesSet() throws Exception { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 0ab97e87f..4272bd9e3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -5,11 +5,8 @@ import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.bean.RecordInfo; import com.genersoft.iot.vmp.gb28181.bean.RecordItem; -import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent; import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEvent; -import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; -import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; @@ -20,10 +17,8 @@ import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java index f9096780b..7a29640b8 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java @@ -83,7 +83,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaArrivalEvent event) { if (event.getApp().equals(talkApp) && event.getStream().endsWith("_talk")) { @@ -112,7 +112,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { @@ -121,7 +121,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { /** * 流未找到的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaNotFoundEvent event) { if (!userSetting.getAutoApplyPlay()) { diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index 6247af899..342f4f52b 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -119,7 +119,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { @@ -128,7 +128,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { @@ -137,7 +137,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { /** * 设备更新的通知 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(DeviceUpdateEvent event) { JTDevice device = event.getDevice(); @@ -163,7 +163,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { /** * 位置更新的通知 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(JTPositionEvent event) { if (event.getPhoneNumber() == null || event.getPositionInfo() == null diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java index 17a6c5680..07322899c 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java @@ -25,10 +25,8 @@ import jakarta.servlet.http.HttpServletRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.ApplicationEventPublisher; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; @@ -80,10 +78,6 @@ public class ABLHttpHookListener { @Autowired private SSRCFactory ssrcFactory; - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; - @Autowired private RedisTemplate redisTemplate; diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaServerStatusManger.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaServerStatusManger.java index 24d446511..a04552a99 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaServerStatusManger.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLMediaServerStatusManger.java @@ -59,7 +59,7 @@ public class ABLMediaServerStatusManger { private final String type = "abl"; - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaServerChangeEvent event) { if (event.getMediaServerItemList() == null @@ -77,7 +77,7 @@ public class ABLMediaServerStatusManger { execute(); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(HookAblServerStartEvent event) { if (event.getMediaServerItem() == null @@ -93,7 +93,7 @@ public class ABLMediaServerStatusManger { online(serverItem, null); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(HookAblServerKeepaliveEvent event) { if (event.getMediaServerItem() == null) { @@ -107,7 +107,7 @@ public class ABLMediaServerStatusManger { online(serverItem, null); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaServerDeleteEvent event) { if (event.getMediaServer() == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java index c26f3dca3..8e40578c7 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/hook/HookSubscribe.java @@ -32,7 +32,7 @@ public class HookSubscribe { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaArrivalEvent event) { if (event.getSchema() == null || "rtsp".equals(event.getSchema())) { @@ -44,7 +44,7 @@ public class HookSubscribe { /** * 流结束事件 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { if (event.getSchema() == null || "rtsp".equals(event.getSchema())) { @@ -55,7 +55,7 @@ public class HookSubscribe { /** * 推流鉴权事件 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaPublishEvent event) { sendNotify(HookType.on_publish, event); @@ -63,7 +63,7 @@ public class HookSubscribe { /** * 生成录像文件事件 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaRecordMp4Event event) { sendNotify(HookType.on_record_mp4, event); diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java index 2e5a6ea08..0b0bacf9c 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerStatusEventListener.java @@ -22,14 +22,14 @@ public class MediaServerStatusEventListener { @Autowired private IPlayService playService; - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaServerOnlineEvent event) { log.info("[媒体节点] 上线 ID:" + event.getMediaServer().getId()); playService.zlmServerOnline(event.getMediaServer()); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaServerOfflineEvent event) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index deb1044ce..7b6e0aa7d 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -87,7 +87,7 @@ public class MediaServerServiceImpl implements IMediaServerService { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { if ("rtsp".equals(event.getSchema())) { @@ -101,7 +101,7 @@ public class MediaServerServiceImpl implements IMediaServerService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { if ("rtsp".equals(event.getSchema())) { @@ -120,7 +120,7 @@ public class MediaServerServiceImpl implements IMediaServerService { /** * 流媒体节点上线 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaServerOnlineEvent event) { @@ -131,7 +131,7 @@ public class MediaServerServiceImpl implements IMediaServerService { /** * 流媒体节点离线 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaServerOfflineEvent event) { diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java index f1eb7d5b9..fd0d7d664 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java @@ -66,7 +66,7 @@ public class ZLMMediaServerStatusManager { private final String type = "zlm"; - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaServerChangeEvent event) { if (event.getMediaServerItemList() == null @@ -84,7 +84,7 @@ public class ZLMMediaServerStatusManager { } } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(HookZlmServerStartEvent event) { if (event.getMediaServer() == null @@ -96,7 +96,7 @@ public class ZLMMediaServerStatusManager { online(event.getMediaServer(), event.getConfig()); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(HookZlmServerKeepaliveEvent event) { if (event.getMediaServerItem() == null) { @@ -110,7 +110,7 @@ public class ZLMMediaServerStatusManager { online(mediaServer, null); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaServerDeleteEvent event) { if (event.getMediaServer() == null) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java index 4dc5df890..90c381564 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/CloudRecordServiceImpl.java @@ -116,7 +116,7 @@ public class CloudRecordServiceImpl implements ICloudRecordService { return new ArrayList<>(resultSet); } - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaRecordMp4Event event) { CloudRecordItem cloudRecordItem = CloudRecordItem.getInstance(event); diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java index 05c4d5fa2..503a7113a 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RecordPlanServiceImpl.java @@ -51,7 +51,7 @@ public class RecordPlanServiceImpl implements IRecordPlanService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { // 流断开,检查是否还处于录像状态, 如果是则继续录像 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java index 86099262d..f8b5b60d5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -50,7 +50,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { @@ -59,7 +59,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaDepartureEvent event) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java index f8a78dca7..decd87a52 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisPushStreamResponseListener.java @@ -3,12 +3,9 @@ package com.genersoft.iot.vmp.service.redisMsg; import com.alibaba.fastjson2.JSON; import com.genersoft.iot.vmp.service.bean.MessageForPushChannelResponse; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -30,9 +27,6 @@ public class RedisPushStreamResponseListener implements MessageListener { private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - @Qualifier("taskExecutor") - @Autowired - private ThreadPoolTaskExecutor taskExecutor; private final Map responseEvents = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java index 854a43a5c..ee98d9c9f 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamProxy/service/impl/StreamProxyServiceImpl.java @@ -76,7 +76,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @Transactional @org.springframework.context.event.EventListener public void onApplicationEvent(MediaArrivalEvent event) { @@ -88,7 +88,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaDepartureEvent event) { @@ -100,7 +100,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { /** * 流未找到的处理 */ - @Async("taskExecutor") + @Async @EventListener public void onApplicationEvent(MediaNotFoundEvent event) { if ("rtp".equals(event.getApp())) { @@ -118,7 +118,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { /** * 流媒体节点上线 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaServerOnlineEvent event) { @@ -128,7 +128,7 @@ public class StreamProxyServiceImpl implements IStreamProxyService { /** * 流媒体节点离线 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaServerOfflineEvent event) { diff --git a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java index 60c5555f8..4ea54800d 100755 --- a/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/streamPush/service/impl/StreamPushServiceImpl.java @@ -63,7 +63,7 @@ public class StreamPushServiceImpl implements IStreamPushService { /** * 流到来的处理 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaArrivalEvent event) { @@ -118,7 +118,7 @@ public class StreamPushServiceImpl implements IStreamPushService { /** * 流离开的处理 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaDepartureEvent event) { @@ -155,7 +155,7 @@ public class StreamPushServiceImpl implements IStreamPushService { /** * 流媒体节点上线 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaServerOnlineEvent event) { @@ -165,7 +165,7 @@ public class StreamPushServiceImpl implements IStreamPushService { /** * 流媒体节点离线 */ - @Async("taskExecutor") + @Async @EventListener @Transactional public void onApplicationEvent(MediaServerOfflineEvent event) { diff --git a/src/main/java/com/genersoft/iot/vmp/utils/IpPortUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/IpPortUtil.java index 2ce0a5568..d0e3f433e 100644 --- a/src/main/java/com/genersoft/iot/vmp/utils/IpPortUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/IpPortUtil.java @@ -42,49 +42,4 @@ public class IpPortUtil { throw new IllegalArgumentException("无效的IP地址: " + ip, e); } } - - // 测试用例 - public static void main(String[] args) { - // IPv4测试 - String ipv4 = "192.168.1.1"; - String port1 = "8080"; - System.out.println(concatenateIpAndPort(ipv4, port1)); // 输出: 192.168.1.1:8080 - - // IPv6测试 - String ipv6 = "2001:0db8:85a3:0000:0000:8a2e:0370:7334"; - String port2 = "80"; - System.out.println(concatenateIpAndPort(ipv6, port2)); // 输出: [2001:0db8:85a3:0000:0000:8a2e:0370:7334]:80 - - // 压缩格式IPv6测试 - String ipv6Compressed = "2001:db8::1"; - System.out.println(concatenateIpAndPort(ipv6Compressed, port2)); // 输出: [2001:db8::1]:80 - - // 无效IP测试 - try { - System.out.println(concatenateIpAndPort("invalid.ip", "1234")); - } catch (IllegalArgumentException e) { - System.out.println("捕获到预期异常: " + e.getMessage()); - } - - // 无效端口测试 - 非数字 - try { - System.out.println(concatenateIpAndPort(ipv4, "abc")); - } catch (IllegalArgumentException e) { - System.out.println("捕获到预期异常: " + e.getMessage()); - } - - // 无效端口测试 - 超出范围 - try { - System.out.println(concatenateIpAndPort(ipv4, "70000")); - } catch (IllegalArgumentException e) { - System.out.println("捕获到预期异常: " + e.getMessage()); - } - - // 空端口测试 - try { - System.out.println(concatenateIpAndPort(ipv4, "")); - } catch (IllegalArgumentException e) { - System.out.println("捕获到预期异常: " + e.getMessage()); - } - } -} \ No newline at end of file +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 67213e8a3..f6b7f7771 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,5 +1,8 @@ spring: application: name: wvp + threads: + virtual: + enabled: true profiles: active: 274-dev From 4b0cdd57182f3ec21b2269415b0a4b1ec8e213fd Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Wed, 14 Jan 2026 17:17:44 +0800 Subject: [PATCH 02/25] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C=E5=8F=AF=E6=8E=A5?= =?UTF-8?q?=E5=85=A5=E8=AE=BE=E5=A4=87=E6=95=B0=E9=87=8F=E6=9B=B4=E5=A4=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/dao/DeviceMapper.java | 10 ++ .../vmp/gb28181/service/IDeviceService.java | 3 + .../service/impl/DeviceServiceImpl.java | 25 +++- .../NotifyRequestForCatalogProcessor.java | 2 + ...tifyRequestForMobilePositionProcessor.java | 2 + .../impl/RegisterRequestProcessor.java | 8 +- .../cmd/KeepaliveNotifyMessageHandler.java | 111 +++++++----------- 7 files changed, 85 insertions(+), 76 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java index ded02979f..a45723cc7 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java @@ -460,6 +460,16 @@ public interface DeviceMapper { ""}) void batchUpdate(List devices); + @Update({""}) + void batchUpdateForKeepalive(List devices); + @Select(value = {" "}) + void offlineByDeviceIds(List deviceList); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/ChannelProvider.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/ChannelProvider.java index dd9ddaff2..8158f9e56 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/ChannelProvider.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/provider/ChannelProvider.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.gb28181.dao.provider; import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Group; import com.genersoft.iot.vmp.streamPush.bean.StreamPush; import com.genersoft.iot.vmp.web.custom.bean.CameraGroup; @@ -601,6 +602,28 @@ public class ChannelProvider { return sqlBuild.toString(); } + public String queryOnlineListsByGbDeviceIds(Map params ){ + StringBuilder sqlBuild = new StringBuilder(); + sqlBuild.append(BASE_SQL_TABLE_NAME); + sqlBuild.append(" where wdc.channel_type = 0 AND coalesce(wdc.gb_status, wdc.status) = 'ON' AND wdc.data_type = 1 "); + + List deviceList = (List)params.get("deviceList"); + if (deviceList != null && !deviceList.isEmpty()) { + sqlBuild.append(" AND data_device_id in ("); + boolean first = true; + for (Device device : deviceList) { + if (!first) { + sqlBuild.append(","); + } + sqlBuild.append("'" + device.getId() + "'"); + first = false; + } + sqlBuild.append(" )"); + } + + return sqlBuild.toString(); + } + public String queryListForSy(Map params ){ StringBuilder sqlBuild = new StringBuilder(); sqlBuild.append(BASE_SQL_FOR_CAMERA_DEVICE); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index cdcee7dd1..b54a403b4 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -118,9 +118,9 @@ public class EventPublisher { } - public void deviceOfflineEventPublish(String deviceId) { + public void deviceOfflineEventPublish(Set deviceIds) { DeviceOfflineEvent event = new DeviceOfflineEvent(this); - event.setDeviceId(deviceId); + event.setDeviceIds(deviceIds); applicationEventPublisher.publishEvent(event); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOfflineEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOfflineEvent.java index fcceafef5..e15aa43f3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOfflineEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOfflineEvent.java @@ -6,12 +6,13 @@ import lombok.Setter; import org.springframework.context.ApplicationEvent; import java.io.Serial; +import java.util.Set; @Getter @Setter public class DeviceOfflineEvent extends ApplicationEvent { - private String deviceId; + private Set deviceIds; @Serial private static final long serialVersionUID = 1L; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index b20b95ddd..7e8aec7e5 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -278,15 +278,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @Async @EventListener public void onApplicationEvent(DeviceOfflineEvent event) { - log.info("[设备状态] 到期, 编号: {}", event.getDeviceId()); - Device device = getDeviceByDeviceId(event.getDeviceId()); - Boolean deviceStatus = getDeviceStatus(device); - if (deviceStatus != null && deviceStatus) { - log.info("[设备离线] 主动探测发现设备在线,暂不处理 device:{}", event.getDeviceId()); - online(device); - return; - } - offline(device); + log.info("[设备状态] 到期, 编号: {}", event.getDeviceIds().toString()); + List deviceList = redisCatchStorage.getDeviceList(event.getDeviceIds()); + offline(deviceList); } @Override @@ -392,17 +386,44 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false); } if (isDevice(deviceId)) { - channelOfflineByDevice(device); + channelOfflineByDevice(List.of(device)); } } - private void channelOfflineByDevice(Device device) { + public void offline(List deviceList) { + if (deviceList == null || deviceList.isEmpty()) { + log.warn("[设备不存在]"); + return; + } + List realDeviceList = new ArrayList<>(); + for (Device device : deviceList) { + log.info("[设备离线] device:{}, 心跳间隔: {},心跳超时次数: {}, 上次心跳时间:{}, 上次注册时间: {}", device.getDeviceId(), + device.getHeartBeatInterval(), device.getHeartBeatCount(), device.getKeepaliveTime(), device.getRegisterTime()); + device.setOnLine(false); + cleanOfflineDevice(device); + if (isDevice(device.getDeviceId())) { + realDeviceList.add(device); + } + redisCatchStorage.updateDevice(device); + if (userSetting.getDeviceStatusNotify()) { + // 发送 redis 消息 + redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false); + } + } + deviceMapper.offlineByList(deviceList); + + if (!realDeviceList.isEmpty()) { + channelOfflineByDevice(realDeviceList); + } + } + + private void channelOfflineByDevice(List deviceList) { // 进行通道离线 - List channelList = commonGBChannelMapper.queryOnlineListsByGbDeviceId(device.getId()); + List channelList = commonGBChannelMapper.queryOnlineListsByGbDeviceIds(deviceList); if (channelList.isEmpty()) { return; } - deviceChannelMapper.offlineByDeviceId(device.getId()); + deviceChannelMapper.offlineByDeviceIds(deviceList); // 发送通道离线通知 eventPublisher.channelEventPublish(channelList, ChannelEvent.ChannelEventMessageType.OFF); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java index 1434ec289..291bcd534 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java @@ -48,17 +48,15 @@ public class DeviceStatusManager { if (expiredIds != null && !expiredIds.isEmpty()) { redisTemplate.opsForZSet().remove(redisKey(), expiredIds.toArray()); - // 使用 JDK 21 虚拟线程异步分发事件 - for (String deviceId : expiredIds) { - Thread.startVirtualThread(() -> { - // 获取详情后删除缓存 + Thread.startVirtualThread(() -> { + // 获取详情后删除缓存 // Device device = redisCatchStorage.getDevice(deviceId); // redisCatchStorage.removeDevice(deviceId); - // 发送 Spring 异步事件 - eventPublisher.deviceOfflineEventPublish(deviceId); - }); - } + // 发送 Spring 异步事件 + eventPublisher.deviceOfflineEventPublish(expiredIds); + }); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java index e7bdffa32..e26fdce39 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/IRedisCatchStorage.java @@ -12,6 +12,7 @@ import com.genersoft.iot.vmp.service.bean.MessageForPushChannel; import java.util.List; import java.util.Map; +import java.util.Set; public interface IRedisCatchStorage { @@ -79,6 +80,11 @@ public interface IRedisCatchStorage { */ Device getDevice(String deviceId); + /** + * 获取Device + */ + List getDeviceList(Set deviceIds); + void resetAllCSEQ(); void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo); diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 40c741acf..a967af64a 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -184,6 +184,17 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { return device; } + @Override + public List getDeviceList(Set deviceIds) { + String key = VideoManagerConstants.DEVICE_PREFIX; + List deviceList = new ArrayList<>(); + List objectList = redisTemplate.opsForHash().multiGet(key, Arrays.asList(deviceIds.toArray())); + for (Object object : objectList) { + deviceList.add((Device)object); + } + return deviceList; + } + @Override public void updateGpsMsgInfo(GPSMsgInfo gpsMsgInfo) { String key = VideoManagerConstants.WVP_STREAM_GPS_MSG_PREFIX + userSetting.getServerId(); From b2772a0a1bdc946fed24fd0eab626ed16b64d250 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Sun, 25 Jan 2026 09:13:11 +0800 Subject: [PATCH 11/25] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=90=AF=E5=8A=A8?= =?UTF-8?q?=E6=97=B6=E5=AA=92=E4=BD=93=E4=BF=A1=E6=81=AF=E5=86=99=E5=85=A5?= =?UTF-8?q?=E5=BC=82=E5=B8=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/service/impl/DeviceServiceImpl.java | 3 +++ .../vmp/media/event/mediaServer/MediaServerChangeEvent.java | 6 ++---- .../iot/vmp/media/service/impl/MediaServerServiceImpl.java | 1 + .../iot/vmp/media/zlm/ZLMMediaServerStatusManager.java | 4 +++- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 7e8aec7e5..24323cb5d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -397,6 +397,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } List realDeviceList = new ArrayList<>(); for (Device device : deviceList) { + if (device == null) { + continue; + } log.info("[设备离线] device:{}, 心跳间隔: {},心跳超时次数: {}, 上次心跳时间:{}, 上次注册时间: {}", device.getDeviceId(), device.getHeartBeatInterval(), device.getHeartBeatCount(), device.getKeepaliveTime(), device.getRegisterTime()); device.setOnLine(false); diff --git a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerChangeEvent.java b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerChangeEvent.java index ecbe332b3..df257c774 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerChangeEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/media/event/mediaServer/MediaServerChangeEvent.java @@ -1,12 +1,14 @@ package com.genersoft.iot.vmp.media.event.mediaServer; import com.genersoft.iot.vmp.media.bean.MediaServer; +import lombok.Getter; import org.springframework.context.ApplicationEvent; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +@Getter public class MediaServerChangeEvent extends ApplicationEvent { public MediaServerChangeEvent(Object source) { @@ -15,10 +17,6 @@ public class MediaServerChangeEvent extends ApplicationEvent { private List mediaServerItemList; - public List getMediaServerItemList() { - return mediaServerItemList; - } - public void setMediaServerItemList(List mediaServerItemList) { this.mediaServerItemList = mediaServerItemList; } diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 7b6e0aa7d..d3c1c3b64 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -33,6 +33,7 @@ import com.genersoft.iot.vmp.utils.redis.RedisUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.vmanager.bean.WVPResult; import jakarta.validation.constraints.NotNull; +import lombok.Synchronized; import lombok.extern.slf4j.Slf4j; import okhttp3.OkHttpClient; import okhttp3.Request; diff --git a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java index fd0d7d664..b0693c89d 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java +++ b/src/main/java/com/genersoft/iot/vmp/media/zlm/ZLMMediaServerStatusManager.java @@ -36,6 +36,7 @@ import java.util.concurrent.ConcurrentHashMap; @Component public class ZLMMediaServerStatusManager { + private final Map offlineZlmPrimaryMap = new ConcurrentHashMap<>(); private final Map offlineZlmsecondaryMap = new ConcurrentHashMap<>(); private final Map offlineZlmTimeMap = new ConcurrentHashMap<>(); @@ -80,7 +81,6 @@ public class ZLMMediaServerStatusManager { log.info("[ZLM-添加待上线节点] ID:{}", mediaServerItem.getId()); offlineZlmPrimaryMap.put(mediaServerItem.getId(), mediaServerItem); offlineZlmTimeMap.put(mediaServerItem.getId(), System.currentTimeMillis()); - execute(); } } @@ -183,6 +183,8 @@ public class ZLMMediaServerStatusManager { MediaServer mediaServerInDb = mediaServerService.getOne(mediaServer.getId()); if (mediaServerInDb == null || !mediaServerInDb.isStatus()) { log.info("[ZLM-连接成功] ID:{}, 地址: {}:{}", mediaServer.getId(), mediaServer.getIp(), mediaServer.getHttpPort()); + offlineZlmPrimaryMap.remove(mediaServer.getId()); + offlineZlmsecondaryMap.remove(mediaServer.getId()); if (config == null) { ZLMResult> mediaServerConfig = zlmresTfulUtils.getMediaServerConfig(mediaServer); List data = mediaServerConfig.getData(); From 2c774ae1552be508a23e7ff2a1c68928f268619f Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Sun, 25 Jan 2026 20:22:31 +0800 Subject: [PATCH 12/25] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E7=9A=84=E5=86=99=E5=85=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/genersoft/iot/vmp/gb28181/bean/Device.java | 14 ++++++++++++++ .../iot/vmp/gb28181/dao/DeviceMapper.java | 2 +- .../notify/cmd/KeepaliveNotifyMessageHandler.java | 8 ++++---- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java index 83a302ecc..75d52a624 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/Device.java @@ -216,4 +216,18 @@ public class Device { public boolean checkWgs84() { return geoCoordSys.equalsIgnoreCase("WGS84"); } + + public Integer getHeartBeatCount() { + if (heartBeatCount == null) { + return 3; + } + return heartBeatCount; + } + + public Integer getHeartBeatInterval() { + if (heartBeatCount == null) { + return 60; + } + return heartBeatInterval; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java index a45723cc7..a92028b7b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java @@ -465,7 +465,7 @@ public interface DeviceMapper { " UPDATE" + " wvp_device" + " SET keepalive_time=#{item.keepaliveTime}" + - " WHERE device_id=#{item.deviceId}"+ + " WHERE id=#{item.id}"+ "" + ""}) void batchUpdateForKeepalive(List devices); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 0cd348f90..7d746d87e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -26,6 +26,7 @@ import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; +import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -49,7 +50,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp private IDeviceService deviceService; @Autowired - private DeviceStatusManager statusTaskRunner; + private DeviceStatusManager deviceStatusManager; @Autowired private UserSetting userSetting; @@ -83,7 +84,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp if (device.isOnLine()) { taskQueue.add(device); long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; - statusTaskRunner.add(device.getDeviceId(), expiresTime + System.currentTimeMillis()); + deviceStatusManager.add(device.getDeviceId(), expiresTime + System.currentTimeMillis()); } else { if (userSetting.getGbDeviceOnline() == 1) { // 对于已经离线的设备判断他的注册是否已经过期 @@ -91,8 +92,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp } } } - @Scheduled(fixedDelay = 1000, timeUnit = TimeUnit.MILLISECONDS) - @Async + @Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS) public void executeUpdateDeviceList() { if (!taskQueue.isEmpty()) { deviceService.updateDeviceListForKeepalive(taskQueue.stream().toList()); From 31549bce0966d5c19f7b63201c4ec8a9d2c632e6 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Sun, 25 Jan 2026 21:59:42 +0800 Subject: [PATCH 13/25] =?UTF-8?q?=E5=8E=BB=E9=99=A4=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E5=85=A5=E5=BA=93=EF=BC=8C=E6=8F=90=E5=8D=87?= =?UTF-8?q?=E6=80=A7=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/common/VideoManagerConstants.java | 2 ++ .../iot/vmp/gb28181/dao/DeviceMapper.java | 33 ------------------- .../vmp/gb28181/service/IDeviceService.java | 3 -- .../service/impl/DeviceServiceImpl.java | 31 ++--------------- .../cmd/KeepaliveNotifyMessageHandler.java | 4 +-- .../iot/vmp/storager/IRedisCatchStorage.java | 3 ++ .../storager/impl/RedisCatchStorageImpl.java | 25 ++++++++++++++ web/src/views/device/list.vue | 2 -- 数据库/2.7.4/初始化-mysql-2.7.4.sql | 2 -- .../2.7.4/初始化-postgresql-kingbase-2.7.4.sql | 4 --- 数据库/2.7.4/更新-mysql-2.7.4.sql | 21 ++++++++++++ .../2.7.4/更新-postgresql-kingbase-2.7.4.sql | 3 ++ 12 files changed, 57 insertions(+), 76 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java index e89695aff..2cbe4a745 100644 --- a/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java +++ b/src/main/java/com/genersoft/iot/vmp/common/VideoManagerConstants.java @@ -19,6 +19,8 @@ public class VideoManagerConstants { public static final String ONLINE_MEDIA_SERVERS_PREFIX = "VMP_ONLINE_MEDIA_SERVERS:"; public static final String DEVICE_PREFIX = "VMP_DEVICE_INFO"; + public static final String DEVICE_KEEPALIVE_PREFIX = "DEVICE_KEEPALIVE:"; + public static final String DEVICE_REGISTER_PREFIX = "DEVICE_REGISTER:"; public static final String INVITE_PREFIX = "VMP_GB_INVITE_INFO"; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java index a92028b7b..a14528352 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMapper.java @@ -30,8 +30,6 @@ public interface DeviceMapper { "port," + "host_address," + "expires," + - "register_time," + - "keepalive_time," + "create_time," + "update_time," + "charset," + @@ -65,8 +63,6 @@ public interface DeviceMapper { "port," + "host_address," + "expires," + - "register_time," + - "keepalive_time," + "heart_beat_interval," + "heart_beat_count," + "position_capability," + @@ -98,8 +94,6 @@ public interface DeviceMapper { "#{port}," + "#{hostAddress}," + "#{expires}," + - "#{registerTime}," + - "#{keepaliveTime}," + "#{heartBeatInterval}," + "#{heartBeatCount}," + "#{positionCapability}," + @@ -133,8 +127,6 @@ public interface DeviceMapper { ", port=#{port}" + ", host_address=#{hostAddress}" + ", on_line=#{onLine}" + - ", register_time=#{registerTime}" + - ", keepalive_time=#{keepaliveTime}" + ", heart_beat_interval=#{heartBeatInterval}" + ", position_capability=#{positionCapability}" + ", heart_beat_count=#{heartBeatCount}" + @@ -166,8 +158,6 @@ public interface DeviceMapper { "port,"+ "host_address,"+ "expires,"+ - "register_time,"+ - "keepalive_time,"+ "create_time,"+ "update_time,"+ "charset,"+ @@ -208,8 +198,6 @@ public interface DeviceMapper { "port,"+ "host_address,"+ "expires,"+ - "register_time,"+ - "keepalive_time,"+ "create_time,"+ "update_time,"+ "charset,"+ @@ -242,8 +230,6 @@ public interface DeviceMapper { "port,"+ "host_address,"+ "expires,"+ - "register_time,"+ - "keepalive_time,"+ "create_time,"+ "update_time,"+ "charset,"+ @@ -277,8 +263,6 @@ public interface DeviceMapper { "port,"+ "host_address,"+ "expires,"+ - "register_time,"+ - "keepalive_time,"+ "create_time,"+ "update_time,"+ "charset,"+ @@ -356,8 +340,6 @@ public interface DeviceMapper { ",transport" + ",stream_mode" + ",on_line" + - ",register_time" + - ",keepalive_time" + ",ip" + ",create_time" + ",update_time" + @@ -444,8 +426,6 @@ public interface DeviceMapper { ", port=#{item.port}" + ", host_address=#{item.hostAddress}" + ", on_line=#{item.onLine}" + - ", register_time=#{item.registerTime}" + - ", keepalive_time=#{item.keepaliveTime}" + ", heart_beat_interval=#{item.heartBeatInterval}" + ", position_capability=#{item.positionCapability}" + ", heart_beat_count=#{item.heartBeatCount}" + @@ -460,17 +440,6 @@ public interface DeviceMapper { ""}) void batchUpdate(List devices); - @Update({""}) - void batchUpdateForKeepalive(List devices); - - @Select(value = {" From 4067dcf8d17f674b30c423a713b63ad8ed11ba73 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Tue, 27 Jan 2026 11:16:04 +0800 Subject: [PATCH 15/25] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E5=8E=86=E5=8F=B2=E8=AE=B0=E5=BD=95=E4=B8=8D=E5=85=A8=E7=9A=84?= =?UTF-8?q?BUG?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 1493bd9bd..25fb845f1 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -80,8 +80,8 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp device.setLocalIp(request.getLocalAddress().getHostAddress()); } device.setKeepaliveTimeStamp(System.currentTimeMillis()); + taskQueue.add(device); if (device.isOnLine()) { - taskQueue.add(device); long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; deviceStatusManager.add(device.getDeviceId(), expiresTime + System.currentTimeMillis()); } else { From 27b06a84d77d49d923032bb1872e6e0a2e6f2bdf Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Tue, 27 Jan 2026 12:31:57 +0800 Subject: [PATCH 16/25] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E5=92=8C=E6=B3=A8=E5=86=8C=E6=97=B6=E9=97=B4=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E7=9A=84=E5=8F=96=E5=80=BC=E5=92=8C=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/storager/impl/RedisCatchStorageImpl.java | 8 ++++---- web/src/views/device/timeStatistics.vue | 1 + 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 0e100c529..8e278dd8c 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -563,7 +563,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { for (Device device : deviceList) { operations.opsForList().rightPush(VideoManagerConstants.DEVICE_KEEPALIVE_PREFIX + device.getDeviceId(), device.getKeepaliveTimeStamp()); // 2. 截取列表,只保留最新 100 条 - operations.opsForList().trim((VideoManagerConstants.DEVICE_KEEPALIVE_PREFIX + device.getDeviceId()), -1000, -1); + operations.opsForList().trim((VideoManagerConstants.DEVICE_KEEPALIVE_PREFIX + device.getDeviceId()), -100, -1); } return true; } @@ -580,7 +580,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { if (count == null) { count = 20; } - return longRedisTemplate.opsForList().range(VideoManagerConstants.DEVICE_KEEPALIVE_PREFIX + deviceId, 0, count + 1); + return longRedisTemplate.opsForList().range(VideoManagerConstants.DEVICE_KEEPALIVE_PREFIX + deviceId, -count - 1, -1); } @@ -599,7 +599,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { for (Device device : deviceList) { operations.opsForList().rightPush(VideoManagerConstants.DEVICE_REGISTER_PREFIX + device.getDeviceId(), device.getRegisterTimeStamp()); // 2. 截取列表,只保留最新 100 条 - operations.opsForList().trim((VideoManagerConstants.DEVICE_REGISTER_PREFIX + device.getDeviceId()), -1000, -1); + operations.opsForList().trim((VideoManagerConstants.DEVICE_REGISTER_PREFIX + device.getDeviceId()), -100, -1); } return true; } @@ -615,6 +615,6 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { if (count == null) { count = 20; } - return longRedisTemplate.opsForList().range(VideoManagerConstants.DEVICE_REGISTER_PREFIX + deviceId, 0, count + 1); + return longRedisTemplate.opsForList().range(VideoManagerConstants.DEVICE_REGISTER_PREFIX + deviceId, -count - 1, -1); } } diff --git a/web/src/views/device/timeStatistics.vue b/web/src/views/device/timeStatistics.vue index 8d2c1d863..9495de311 100644 --- a/web/src/views/device/timeStatistics.vue +++ b/web/src/views/device/timeStatistics.vue @@ -30,6 +30,7 @@ v-if="viewMode === 'table'" :data="list" border + stripe size="mini" height="400px" style="width: 100%;" From 2bdced8b9cf552852fa7a881ef3b98273b989fe9 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Tue, 27 Jan 2026 14:21:41 +0800 Subject: [PATCH 17/25] =?UTF-8?q?=E4=B8=BA=E5=BF=83=E8=B7=B3=E5=8E=86?= =?UTF-8?q?=E5=8F=B2=E5=92=8C=E6=B3=A8=E5=86=8C=E5=8E=86=E5=8F=B2=E8=AE=BE?= =?UTF-8?q?=E7=BD=AE=E8=BF=87=E6=9C=9F=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../storager/impl/RedisCatchStorageImpl.java | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 8e278dd8c..29c730428 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -561,9 +561,14 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { public Boolean execute(@NonNull RedisOperations operations) { // 1. 批量添加心跳数据到列表尾部 for (Device device : deviceList) { - operations.opsForList().rightPush(VideoManagerConstants.DEVICE_KEEPALIVE_PREFIX + device.getDeviceId(), device.getKeepaliveTimeStamp()); + Duration duration = Duration.ofHours(1); + String key = VideoManagerConstants.DEVICE_KEEPALIVE_PREFIX + device.getDeviceId(); + operations.opsForList().rightPush(key, device.getKeepaliveTimeStamp()); // 2. 截取列表,只保留最新 100 条 - operations.opsForList().trim((VideoManagerConstants.DEVICE_KEEPALIVE_PREFIX + device.getDeviceId()), -100, -1); + operations.opsForList().trim(key, -100, -1); + + // 为整个列表 Key 设置过期时间(核心:覆盖式设置,每次更新心跳都重置过期时间) + operations.expire(key, duration); } return true; } @@ -597,9 +602,15 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { public Boolean execute(@NonNull RedisOperations operations) { // 1. 批量添加心跳数据到列表尾部 for (Device device : deviceList) { - operations.opsForList().rightPush(VideoManagerConstants.DEVICE_REGISTER_PREFIX + device.getDeviceId(), device.getRegisterTimeStamp()); + Duration duration = Duration.ofHours(3); + String key = VideoManagerConstants.DEVICE_REGISTER_PREFIX + device.getDeviceId(); + operations.opsForList().rightPush(key, device.getRegisterTimeStamp()); // 2. 截取列表,只保留最新 100 条 - operations.opsForList().trim((VideoManagerConstants.DEVICE_REGISTER_PREFIX + device.getDeviceId()), -100, -1); + operations.opsForList().trim(key, -100, -1); + + // 为整个列表 Key 设置过期时间(核心:覆盖式设置,每次更新心跳都重置过期时间) + operations.expire(key, duration); + } return true; } From 7f2db96ac1d2c5cfbdae077806d17999bbd333c2 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Tue, 27 Jan 2026 17:04:07 +0800 Subject: [PATCH 18/25] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=9B=BD=E6=A0=87?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E6=95=B0=E6=8D=AE=E5=BA=93=E7=8A=B6=E6=80=81?= =?UTF-8?q?=E6=A3=80=E6=9F=A5=E5=8A=9F=E8=83=BD=EF=BC=8C=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E7=8A=B6=E6=80=81=E5=A4=84=E7=90=86=E5=92=8C?= =?UTF-8?q?=E7=A7=BB=E5=8A=A8=E4=BD=8D=E7=BD=AE=E8=AE=A2=E9=98=85=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../genersoft/iot/vmp/conf/UserSetting.java | 7 +++++ .../service/impl/DeviceServiceImpl.java | 26 +++++++++++++++++++ .../deviceStatus/DeviceStatusManager.java | 3 --- ...tifyRequestForMobilePositionProcessor.java | 2 +- .../cmd/CatalogResponseMessageHandler.java | 2 +- web/src/views/device/list.vue | 2 +- 6 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java index 77beba74a..c4023cfc4 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/UserSetting.java @@ -219,4 +219,11 @@ public class UserSetting { */ private boolean useAliasForGroupSync = false; + /** + * 对于识别为设备的国标设备的,是否默认开启位置订阅 + */ + private boolean subscribeMobilePosition = false; + + + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index c94d0ceb0..c703aec61 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -150,6 +150,15 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { // 重置 cseq 计数 redisCatchStorage.resetAllCSEQ(); + // 处理设备状态 + dbStatusCheck(); + } + + /** + * 数据库状态检查, 每6小时检查一次 + */ + @Scheduled(fixedDelay = 6, initialDelay = 6, timeUnit = TimeUnit.HOURS) + public void dbStatusCheck(){ // 处理设备状态 Set allDeviceIds = deviceStatusManager.getAll(); if (!allDeviceIds.isEmpty()) { @@ -210,6 +219,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } } } + } private void offlineByIds(List offlineDevices) { @@ -318,6 +328,14 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } catch (InvalidArgumentException | SipException | ParseException e) { log.error("[命令发送失败] 查询设备信息: {}", e.getMessage()); } + // 上线添加订阅 + if (userSetting.isSubscribeMobilePosition() && isDevice(device.getDeviceId())) { + // 开启订阅 + device.setSubscribeCycleForMobilePosition(60); + device.setMobilePositionSubmissionInterval(5); + addMobilePositionSubscribe(device, null); + } + sync(device); }else { device.setServerId(userSetting.getServerId()); @@ -346,6 +364,13 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) { addMobilePositionSubscribe(device, null); + }else{ + if (userSetting.isSubscribeMobilePosition() && isDevice(device.getDeviceId())) { + // 开启订阅 + device.setSubscribeCycleForMobilePosition(60); + device.setMobilePositionSubmissionInterval(5); + addMobilePositionSubscribe(device, null); + } } if (userSetting.getDeviceStatusNotify()) { @@ -361,6 +386,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { sync(device); } } + // 设备状态任务添加 long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; deviceStatusManager.add(device.getDeviceId(), expiresTime + System.currentTimeMillis()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java index 291bcd534..7d74f14a0 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.gb28181.task.deviceStatus; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import lombok.extern.slf4j.Slf4j; @@ -13,7 +12,6 @@ import org.springframework.stereotype.Component; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.lang.Thread; @Slf4j @Component @@ -56,7 +54,6 @@ public class DeviceStatusManager { // 发送 Spring 异步事件 eventPublisher.deviceOfflineEventPublish(expiredIds); }); - } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 24dc3e56f..d27434434 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -159,7 +159,7 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor continue; } - log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelId(), + log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(), mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); mobilePosition.setReportSource("Mobile Position"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 75ce45dfd..d616e7da2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -174,7 +174,7 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } else { catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null); } - }).start(); + }); } } } diff --git a/web/src/views/device/list.vue b/web/src/views/device/list.vue index ac27d53b1..9a890aaf1 100755 --- a/web/src/views/device/list.vue +++ b/web/src/views/device/list.vue @@ -467,7 +467,7 @@ export default { }) }, getKeepaliveTimeStatistics: function(deviceId) { - this.$refs.timeStatistics.openDialog('心跳时间统计', 'device/getKeepaliveTimeStatistics', deviceId, 10) + this.$refs.timeStatistics.openDialog('心跳时间统计', 'device/getKeepaliveTimeStatistics', deviceId, 60) }, getRegisterTimeStatistics: function(deviceId) { this.$refs.timeStatistics.openDialog('注册时间统计', 'device/getRegisterTimeStatistics', deviceId, 10) From 7dea67b4723ff377c7132c00a270d0986ac479e3 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Tue, 27 Jan 2026 21:08:26 +0800 Subject: [PATCH 19/25] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C=E6=96=B0=E5=A2=9E?= =?UTF-8?q?commons-pool2=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 5 +++++ .../message/notify/cmd/KeepaliveNotifyMessageHandler.java | 3 ++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b2da8a723..8b072c6e5 100644 --- a/pom.xml +++ b/pom.xml @@ -406,6 +406,11 @@ 1.18.2 + + org.apache.commons + commons-pool2 + + org.springframework.boot spring-boot-starter-test diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 25fb845f1..7e2b0794b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -69,6 +69,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 心跳回复: {}", e.getMessage()); } + taskQueue.add(device); SIPRequest request = (SIPRequest) evt.getRequest(); RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress()); @@ -80,7 +81,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp device.setLocalIp(request.getLocalAddress().getHostAddress()); } device.setKeepaliveTimeStamp(System.currentTimeMillis()); - taskQueue.add(device); + if (device.isOnLine()) { long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; deviceStatusManager.add(device.getDeviceId(), expiresTime + System.currentTimeMillis()); From 83057d81d8b28b337b0b8bdf0f34aaaed075847a Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Tue, 27 Jan 2026 22:26:46 +0800 Subject: [PATCH 20/25] =?UTF-8?q?=E5=BF=83=E8=B7=B3=E5=8E=86=E5=8F=B2?= =?UTF-8?q?=E5=B1=95=E7=A4=BA=E4=B8=BA=E4=BB=8E=E6=96=B0=E5=88=B0=E6=97=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- web/src/views/device/timeStatistics.vue | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/web/src/views/device/timeStatistics.vue b/web/src/views/device/timeStatistics.vue index 9495de311..de110ee02 100644 --- a/web/src/views/device/timeStatistics.vue +++ b/web/src/views/device/timeStatistics.vue @@ -28,7 +28,7 @@ Date: Mon, 30 Mar 2026 11:24:23 +0800 Subject: [PATCH 21/25] =?UTF-8?q?=E7=AE=80=E5=8C=96=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E5=A4=84=E7=90=86=E7=9A=84=E5=BF=83=E8=B7=B3=E5=9B=9E=E5=A4=8D?= =?UTF-8?q?=E6=96=B9=E6=B3=95=E6=B3=A8=E8=A7=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../transmit/event/request/SIPRequestProcessorParent.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java index e15c70bf5..445662ec6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/SIPRequestProcessorParent.java @@ -78,7 +78,7 @@ public abstract class SIPRequestProcessorParent { return responseAck(sipRequest, statusCode, null); } - @Async("taskExecutor") + @Async public void responseAckAsync(SIPRequest sipRequest, int statusCode) throws SipException, InvalidArgumentException, ParseException { responseAck(sipRequest, statusCode, null); } From 2b090bbf23137e18a58ee0847bd7342ec1d5697c Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Mon, 30 Mar 2026 11:34:44 +0800 Subject: [PATCH 22/25] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=8A=A5=E8=AD=A6?= =?UTF-8?q?=E5=92=8C=E5=BF=83=E8=B7=B3=E9=80=9A=E7=9F=A5=E7=9A=84=E5=9B=9E?= =?UTF-8?q?=E5=A4=8D=E6=96=B9=E5=BC=8F=EF=BC=8C=E6=94=B9=E4=B8=BA=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../event/request/impl/NotifyRequestProcessor.java | 3 +-- .../notify/cmd/AlarmNotifyMessageHandler.java | 13 +++++++------ .../notify/cmd/KeepaliveNotifyMessageHandler.java | 5 +++-- .../cmd/MobilePositionNotifyMessageHandler.java | 4 +++- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java index 15184e2f9..f0d6e4190 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestProcessor.java @@ -66,11 +66,10 @@ public class NotifyRequestProcessor extends SIPRequestProcessorParent implements @Override public void process(RequestEvent evt) { try { - responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); + responseAckAsync((SIPRequest) evt.getRequest(), Response.OK); Element rootElement = getRootElement(evt); if (rootElement == null) { log.error("处理NOTIFY消息时未获取到消息体,{}", evt.getRequest()); - responseAck((SIPRequest) evt.getRequest(), Response.OK, null, null); return; } String cmd = XmlUtil.getText(rootElement, "CmdType"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index 5cb5ac9fa..a0c24645c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -78,6 +78,12 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme log.error("[Alarm] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); return; } + // 回复200 OK + try { + responseAckAsync((SIPRequest) evt.getRequest(), Response.OK); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 报警通知回复: {}", e.getMessage()); + } taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); } @@ -102,12 +108,7 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme continue; } RequestEvent evt = sipMsgInfo.getEvt(); - // 回复200 OK - try { - responseAck((SIPRequest) evt.getRequest(), Response.OK); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 报警通知回复: {}", e.getMessage()); - } + try { Device device = sipMsgInfo.getDevice(); Element deviceIdElement = sipMsgInfo.getRootElement().element("DeviceID"); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index be3986b74..ec4cabc65 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -103,10 +103,11 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp @Override public void handForPlatform(RequestEvent evt, Platform parentPlatform, Element element) { // 个别平台保活不回复200OK会判定离线 + // 回复200 OK try { - responseAck((SIPRequest) evt.getRequest(), Response.OK); + responseAckAsync((SIPRequest) evt.getRequest(), Response.OK); } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 心跳回复: {}", e.getMessage()); + log.error("[命令发送失败] 报警通知回复: {}", e.getMessage()); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index cc26f4adb..dcf86004d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -9,6 +9,7 @@ import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.utils.DateUtil; import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.dom4j.DocumentException; import org.dom4j.Element; @@ -32,6 +33,7 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; */ @Slf4j @Component +@RequiredArgsConstructor public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { private final String cmdType = "MobilePosition"; @@ -59,7 +61,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); // 回复200 OK try { - responseAck((SIPRequest) evt.getRequest(), Response.OK); + responseAckAsync((SIPRequest) evt.getRequest(), Response.OK); } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 移动位置通知回复: {}", e.getMessage()); } From 0dc297bf02b08e5ec6deb39b28f71f88baf47672 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Mon, 30 Mar 2026 14:28:52 +0800 Subject: [PATCH 23/25] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=97=B6=E9=97=B4?= =?UTF-8?q?=E7=BB=9F=E8=AE=A1=E5=88=97=E8=A1=A8=E5=A4=84=E7=90=86=E9=80=BB?= =?UTF-8?q?=E8=BE=91=E5=B9=B6=E5=A2=9E=E5=BC=BA=E5=BF=83=E8=B7=B3=E8=AE=B0?= =?UTF-8?q?=E5=BD=95=E6=9B=B4=E6=96=B0=E7=9A=84=E5=BC=82=E5=B8=B8=E5=A4=84?= =?UTF-8?q?=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../vmp/gb28181/service/impl/DeviceServiceImpl.java | 4 +++- .../notify/cmd/KeepaliveNotifyMessageHandler.java | 10 +++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index c9d3969b6..0bc2c8348 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -1307,7 +1307,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } timeStatisticsList.add(timeStatistics); } - if (timeStatisticsList.size() > count) { + // 第一个数据由于没有上一个时间戳,无法计算时间差,去掉 + timeStatisticsList.removeFirst(); + if (timeStatisticsList.size() - 1 > count) { timeStatisticsList = timeStatisticsList.subList(timeStatisticsList.size() - count, timeStatisticsList.size()); } return timeStatisticsList; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index ec4cabc65..97e7a4ccf 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -94,9 +94,13 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp } @Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS) public void executeUpdateDeviceList() { - if (!taskQueue.isEmpty()) { - redisCatchStorage.updateDeviceKeepaliveTimeStamp(taskQueue.stream().toList()); - taskQueue.clear(); + try { + if (!taskQueue.isEmpty()) { + redisCatchStorage.updateDeviceKeepaliveTimeStamp(taskQueue.stream().toList()); + taskQueue.clear(); + } + } catch (Exception e) { + log.error("[定时任务] 更新心跳记录 执行异常", e); } } From ede979dbf9a7872b0709ff2bdbac2e11167a3b71 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Mon, 30 Mar 2026 17:00:11 +0800 Subject: [PATCH 24/25] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=AE=9A=E6=97=B6?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E9=85=8D=E7=BD=AE=E7=B1=BB=E5=B9=B6=E8=AE=B0?= =?UTF-8?q?=E5=BD=95=E5=BF=83=E8=B7=B3=E6=9B=B4=E6=96=B0=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/SchedulingConfig.java | 20 +++++++++++++++++++ .../cmd/KeepaliveNotifyMessageHandler.java | 1 + 2 files changed, 21 insertions(+) create mode 100644 src/main/java/com/genersoft/iot/vmp/conf/SchedulingConfig.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/SchedulingConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/SchedulingConfig.java new file mode 100644 index 000000000..6896ca63a --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/conf/SchedulingConfig.java @@ -0,0 +1,20 @@ +package com.genersoft.iot.vmp.conf; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.TaskScheduler; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; + +@Configuration +public class SchedulingConfig { + + @Bean + public TaskScheduler taskScheduler() { + ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); + scheduler.setPoolSize(5); + scheduler.setThreadNamePrefix("scheduled-"); + scheduler.setVirtualThreads(true); // 必须在 initialize() 之前 + scheduler.initialize(); + return scheduler; + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index 97e7a4ccf..a801dc08c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -94,6 +94,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp } @Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS) public void executeUpdateDeviceList() { + log.info("[定时任务] 更新心跳记录,待处理设备数量: {}", taskQueue.size()); try { if (!taskQueue.isEmpty()) { redisCatchStorage.updateDeviceKeepaliveTimeStamp(taskQueue.stream().toList()); From 1a83d5a504b53cb508a685469887a53092dbfa4f Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Mon, 30 Mar 2026 17:01:05 +0800 Subject: [PATCH 25/25] =?UTF-8?q?=E5=B0=86=E5=BF=83=E8=B7=B3=E8=AE=B0?= =?UTF-8?q?=E5=BD=95=E6=9B=B4=E6=96=B0=E6=97=A5=E5=BF=97=E7=9A=84=E6=97=A5?= =?UTF-8?q?=E5=BF=97=E7=BA=A7=E5=88=AB=E4=BB=8Einfo=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E4=B8=BAdebug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index a801dc08c..e3c4feb55 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -94,7 +94,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp } @Scheduled(fixedDelay = 10, timeUnit = TimeUnit.SECONDS) public void executeUpdateDeviceList() { - log.info("[定时任务] 更新心跳记录,待处理设备数量: {}", taskQueue.size()); + log.debug("[定时任务] 更新心跳记录,待处理设备数量: {}", taskQueue.size()); try { if (!taskQueue.isEmpty()) { redisCatchStorage.updateDeviceKeepaliveTimeStamp(taskQueue.stream().toList());