From 8e9e75997aeade2d3c286c28ab1087e3385c9306 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Mon, 19 Jan 2026 21:47:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=B3=A8=E5=86=8C=E5=8F=82?= =?UTF-8?q?=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 1 + .../genersoft/iot/vmp/VManageBootstrap.java | 2 + .../vmp/gb28181/service/IDeviceService.java | 5 +- .../service/impl/DeviceServiceImpl.java | 63 ++++++++----------- .../gb28181/session/CatalogDataManager.java | 3 +- ...skRunner.java => DeviceStatusManager.java} | 19 +++--- .../impl/RegisterRequestProcessor.java | 16 +++-- .../cmd/KeepaliveNotifyMessageHandler.java | 10 +-- .../cmd/CatalogResponseMessageHandler.java | 1 + 9 files changed, 59 insertions(+), 61 deletions(-) rename src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/{DeviceStatusTaskRunner.java => DeviceStatusManager.java} (82%) diff --git a/pom.xml b/pom.xml index 5f7d0cbf7..b2da8a723 100644 --- a/pom.xml +++ b/pom.xml @@ -60,6 +60,7 @@ ${project.build.directory}/asciidoc/html ${project.build.directory}/asciidoc/pdf + 21 21 21 diff --git a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java index a1d3a527d..f99070f4e 100644 --- a/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java +++ b/src/main/java/com/genersoft/iot/vmp/VManageBootstrap.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.jt1078.util.ClassUtil; import com.genersoft.iot.vmp.utils.GitUtil; import com.genersoft.iot.vmp.utils.SpringBeanFactory; @@ -43,6 +44,7 @@ public class VManageBootstrap extends SpringBootServletInitializer { log.info("构建时间: {}", gitUtil.getBuildDate()); log.info("GIT信息: 分支: {}, ID: {}, 时间: {}", gitUtil.getBranch(), gitUtil.getCommitIdShort(), gitUtil.getCommitTime()); } + } // 项目重启 public static void restart() { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java index 296651698..0208451e6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceService.java @@ -20,13 +20,12 @@ public interface IDeviceService { * 设备上线 * @param device 设备信息 */ - void online(Device device, SipTransactionInfo sipTransactionInfo); + void online(Device device); /** * 设备下线 - * @param deviceId 设备编号 */ - void offline(String deviceId, String reason, boolean check); + void offline(Device device); /** * 添加目录订阅 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 162342e60..b20b95ddd 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -17,7 +17,7 @@ import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; -import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner; +import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusManager; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskRunner; @@ -119,7 +119,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { private SubscribeTaskRunner subscribeTaskRunner; @Autowired - private DeviceStatusTaskRunner deviceStatusTaskRunner; + private DeviceStatusManager deviceStatusManager; private Device getDeviceByDeviceIdFromDb(String deviceId) { return deviceMapper.getDeviceByDeviceId(deviceId); @@ -132,7 +132,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { List devicesInDb = getAll(); if (devicesInDb.isEmpty()) { redisCatchStorage.removeAllDevice(); - deviceStatusTaskRunner.clear(); + deviceStatusManager.clear(); }else { List devicesInRedis = redisCatchStorage.getAllDevices(); if (!devicesInRedis.isEmpty()) { @@ -152,7 +152,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { // 重置 cseq 计数 redisCatchStorage.resetAllCSEQ(); // 处理设备状态 - Set allDeviceIds = deviceStatusTaskRunner.getAll(); + Set allDeviceIds = deviceStatusManager.getAll(); if (!allDeviceIds.isEmpty()) { // 除了记录的设备以外, 其他设备全部离线 List onlineDevice = getAllOnlineDevice(userSetting.getServerId()); @@ -233,7 +233,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { for (Device device : offlineDevices) { device.setOnLine(false); redisCatchStorage.updateDevice(device); - deviceStatusTaskRunner.removeTask(device.getDeviceId()); + deviceStatusManager.remove(device.getDeviceId()); } } @@ -244,7 +244,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) { subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device)); } - deviceStatusTaskRunner.removeTask(device.getDeviceId()); + deviceStatusManager.remove(device.getDeviceId()); // 离线释放所有 ssrc List ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId()); if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) { @@ -279,11 +279,18 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @EventListener public void onApplicationEvent(DeviceOfflineEvent event) { log.info("[设备状态] 到期, 编号: {}", event.getDeviceId()); - offline(event.getDeviceId(), "保活到期", true); + Device device = getDeviceByDeviceId(event.getDeviceId()); + Boolean deviceStatus = getDeviceStatus(device); + if (deviceStatus != null && deviceStatus) { + log.info("[设备离线] 主动探测发现设备在线,暂不处理 device:{}", event.getDeviceId()); + online(device); + return; + } + offline(device); } @Override - public void online(Device device, SipTransactionInfo sipTransactionInfo) { + public void online(Device device) { log.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort()); Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId()); Device deviceInDb = getDeviceByDeviceIdFromDb(device.getDeviceId()); @@ -301,13 +308,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { device.setHeartBeatInterval(60); device.setPositionCapability(0); } - if (sipTransactionInfo != null) { - device.setSipTransactionInfo(sipTransactionInfo); - }else { - if (deviceInRedis != null) { - device.setSipTransactionInfo(deviceInRedis.getSipTransactionInfo()); - } - } // 第一次上线 或则设备之前是离线状态--进行通道同步和设备信息查询 if (deviceInDb == null) { @@ -371,35 +371,24 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } // 设备状态任务添加 long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; - deviceStatusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis()); + deviceStatusManager.add(device.getDeviceId(), expiresTime + System.currentTimeMillis()); } @Override - @Transactional - public void offline(String deviceId, String reason, boolean check) { - Device device = getDeviceByDeviceIdFromDb(deviceId); + public void offline(Device device) { if (device == null) { - log.warn("[设备不存在] device:{}", deviceId); + log.warn("[设备不存在]"); return; } - - // 主动查询设备状态, 没有HostAddress无法发送请求,可能是手动添加的设备 - if (check && device.getHostAddress() != null) { - Boolean deviceStatus = getDeviceStatus(device); - if (deviceStatus != null && deviceStatus) { - log.info("[设备离线] 主动探测发现设备在线,暂不处理 device:{}", deviceId); - online(device, null); - return; - } - } - log.info("[设备离线] {}, device:{}, 心跳间隔: {},心跳超时次数: {}, 上次心跳时间:{}, 上次注册时间: {}", reason, deviceId, + String deviceId = device.getDeviceId(); + log.info("[设备离线] device:{}, 心跳间隔: {},心跳超时次数: {}, 上次心跳时间:{}, 上次注册时间: {}", deviceId, device.getHeartBeatInterval(), device.getHeartBeatCount(), device.getKeepaliveTime(), device.getRegisterTime()); device.setOnLine(false); cleanOfflineDevice(device); redisCatchStorage.updateDevice(device); deviceMapper.update(device); if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 + // 发送 redis 消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false); } if (isDevice(deviceId)) { @@ -818,8 +807,8 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) { removeMobilePositionSubscribe(device, null); } - if (deviceStatusTaskRunner.containsKey(deviceId)) { - deviceStatusTaskRunner.removeTask(deviceId); + if (deviceStatusManager.contains(deviceId)) { + deviceStatusManager.remove(deviceId); } List commonGBChannels = commonGBChannelMapper.queryByDataTypeAndDeviceIds(1, List.of(device.getId())); @@ -967,7 +956,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { updateDevice(deviceInDb); long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; - deviceStatusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis()); + deviceStatusManager.add(device.getDeviceId(), expiresTime + System.currentTimeMillis()); } } @@ -1196,9 +1185,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { try { sipCommander.deviceStatusQuery(device, (code, msg, data) -> { if ("ONLINE".equalsIgnoreCase(data.trim())) { - online(device, null); + online(device); }else { - offline(device.getDeviceId(), "设备状态查询结果:" + data.trim(), true); + offline(device); } if (callback != null) { callback.run(code, msg, data); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java index 409ea5040..88ac9d59c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java @@ -215,7 +215,8 @@ public class CatalogDataManager implements CommandLineRunner { redisTemplate.delete(key); } - @Scheduled(fixedDelay = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 + //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 + @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS) private void timerTask(){ if (dataMap.isEmpty()) { return; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java similarity index 82% rename from src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java index aa776fee2..1434ec289 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusManager.java @@ -4,23 +4,20 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.utils.redis.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import java.util.ArrayList; -import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.lang.Thread; @Slf4j @Component -public class DeviceStatusTaskRunner { +public class DeviceStatusManager { @Autowired private RedisTemplate redisTemplate; @@ -41,9 +38,9 @@ public class DeviceStatusTaskRunner { } /** - * 状态过期检查 + * 状态过期检查, 每秒检查一次, 系统启动10秒后开始检查 */ - @Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS) + @Scheduled(fixedDelay = 1, initialDelay = 10, timeUnit = TimeUnit.SECONDS) public void expirationCheck(){ long now = System.currentTimeMillis(); // 获取已过期的 deviceId (Score 介于 0 到 现在之间) @@ -56,7 +53,7 @@ public class DeviceStatusTaskRunner { for (String deviceId : expiredIds) { Thread.startVirtualThread(() -> { // 获取详情后删除缓存 - Device device = redisCatchStorage.getDevice(deviceId); +// Device device = redisCatchStorage.getDevice(deviceId); // redisCatchStorage.removeDevice(deviceId); // 发送 Spring 异步事件 eventPublisher.deviceOfflineEventPublish(deviceId); @@ -65,15 +62,15 @@ public class DeviceStatusTaskRunner { } } - public void addTask(String deviceId, long expireTime) { + public void add(String deviceId, long expireTime) { redisTemplate.opsForZSet().add(redisKey(), deviceId, expireTime); } - public void removeTask(String deviceId) { + public void remove(String deviceId) { redisTemplate.opsForZSet().remove(redisKey(), deviceId); } - public boolean containsKey(String deviceId) { + public boolean contains(String deviceId) { if (ObjectUtils.isEmpty(deviceId)) { return false; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java index c329837f4..db6615aeb 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/RegisterRequestProcessor.java @@ -23,6 +23,7 @@ import gov.nist.javax.sip.message.SIPResponse; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -38,6 +39,9 @@ import java.security.NoSuchAlgorithmException; import java.text.ParseException; import java.util.Calendar; import java.util.Locale; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * SIP命令类型: REGISTER请求 @@ -63,6 +67,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen @Autowired private UserSetting userSetting; + @Override public void afterPropertiesSet() throws Exception { // 添加消息处理的订阅 @@ -115,9 +120,9 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen device.setTransport("TCP".equalsIgnoreCase(transport) ? "TCP" : "UDP"); sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), registerOkResponse); device.setRegisterTime(DateUtil.getNow()); - deviceService.online(device, null); + deviceService.online(device); } else { - deviceService.offline(deviceId, "主动注销", false); + deviceService.offline(device); } return; }else { @@ -125,6 +130,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen if (!ObjectUtils.isEmpty(device.getPassword()) || !ObjectUtils.isEmpty(sipConfig.getPassword())) { password = (!ObjectUtils.isEmpty(device.getPassword())) ? device.getPassword() : sipConfig.getPassword(); } + // 如果设置了一个无密码的设备,那么这里就会自动跳动,后续会直接注册成功 } }else { if (ObjectUtils.isEmpty(sipConfig.getPassword())) { @@ -225,10 +231,11 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen log.info("[注册成功] deviceId: {}->{}", deviceId, requestAddress); device.setRegisterTime(DateUtil.getNow()); SipTransactionInfo sipTransactionInfo = new SipTransactionInfo((SIPResponse) response); - deviceService.online(device, sipTransactionInfo); + device.setSipTransactionInfo(sipTransactionInfo); + deviceService.online(device); } else { log.info("[注销成功] deviceId: {}->{}", deviceId, requestAddress); - deviceService.offline(deviceId, "主动注销", false); + deviceService.offline(device); } } catch (SipException | NoSuchAlgorithmException | ParseException e) { log.error("未处理的异常 ", e); @@ -256,4 +263,5 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen return response; } + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index dddf242d9..0cd348f90 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -5,7 +5,7 @@ import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; -import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner; +import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusManager; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; @@ -49,7 +49,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp private IDeviceService deviceService; @Autowired - private DeviceStatusTaskRunner statusTaskRunner; + private DeviceStatusManager statusTaskRunner; @Autowired private UserSetting userSetting; @@ -71,7 +71,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress()); if (device.getIp() == null || !device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) { - log.info("[收到心跳] 地址变化, {}({}), {}:{}->{}", device.getName(), device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort(), request.getLocalAddress().getHostAddress()); + log.info("[收到心跳] 地址变化, {}({}), {}:{}->{}:{}", device.getName(), device.getDeviceId(), device.getIp(), device.getPort(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort()); device.setPort(remoteAddressInfo.getPort()); device.setHostAddress(IpPortUtil.concatenateIpAndPort(remoteAddressInfo.getIp(), String.valueOf(remoteAddressInfo.getPort()))); device.setIp(remoteAddressInfo.getIp()); @@ -83,11 +83,11 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp if (device.isOnLine()) { taskQueue.add(device); long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; - statusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis()); + statusTaskRunner.add(device.getDeviceId(), expiresTime + System.currentTimeMillis()); } else { if (userSetting.getGbDeviceOnline() == 1) { // 对于已经离线的设备判断他的注册是否已经过期 - deviceService.online(device, null); + deviceService.online(device); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 57a255092..17b375358 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -31,6 +31,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; +import java.lang.Thread; /** * 目录查询的回复