From d29bdec64856404cebba28a0f8cdeb8a4a1f3a7b Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Thu, 15 Jan 2026 23:10:49 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BD=BF=E7=94=A8zset=E7=AE=A1=E7=90=86?= =?UTF-8?q?=E8=AE=BE=E5=A4=87=E7=8A=B6=E6=80=81=EF=BC=8C=E4=BB=A5=E5=87=8F?= =?UTF-8?q?=E5=B0=91=E5=86=85=E5=AD=98=E5=BC=80=E9=94=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/event/EventPublisher.java | 11 +- .../event/device/DeviceOfflineEvent.java | 22 ++++ .../event/device/DeviceOnlineEvent.java | 22 ++++ .../service/impl/DeviceServiceImpl.java | 84 ++++--------- .../task/deviceStatus/DeviceStatusTask.java | 60 ---------- .../deviceStatus/DeviceStatusTaskInfo.java | 17 --- .../deviceStatus/DeviceStatusTaskRunner.java | 68 +++++++---- .../impl/message/MessageHandlerAbstract.java | 4 +- .../cmd/KeepaliveNotifyMessageHandler.java | 7 +- .../cmd/CatalogResponseMessageHandler.java | 110 +++++++++++++++++- .../DeviceStatusResponseMessageHandler.java | 8 +- 11 files changed, 230 insertions(+), 183 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOfflineEvent.java create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOnlineEvent.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskInfo.java diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 2e05e2587..cdcee7dd1 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -1,12 +1,10 @@ package com.genersoft.iot.vmp.gb28181.event; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; -import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm; -import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; -import com.genersoft.iot.vmp.gb28181.bean.Platform; +import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent; import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent; +import com.genersoft.iot.vmp.gb28181.event.device.DeviceOfflineEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent; import com.genersoft.iot.vmp.media.bean.MediaServer; @@ -120,4 +118,9 @@ public class EventPublisher { } + public void deviceOfflineEventPublish(String deviceId) { + DeviceOfflineEvent event = new DeviceOfflineEvent(this); + event.setDeviceId(deviceId); + applicationEventPublisher.publishEvent(event); + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOfflineEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOfflineEvent.java new file mode 100644 index 000000000..fcceafef5 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOfflineEvent.java @@ -0,0 +1,22 @@ +package com.genersoft.iot.vmp.gb28181.event.device; + +import com.genersoft.iot.vmp.gb28181.bean.Device; +import lombok.Getter; +import lombok.Setter; +import org.springframework.context.ApplicationEvent; + +import java.io.Serial; + +@Getter +@Setter +public class DeviceOfflineEvent extends ApplicationEvent { + + private String deviceId; + + @Serial + private static final long serialVersionUID = 1L; + + public DeviceOfflineEvent(Object source) { + super(source); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOnlineEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOnlineEvent.java new file mode 100644 index 000000000..2a9c703cd --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/device/DeviceOnlineEvent.java @@ -0,0 +1,22 @@ +package com.genersoft.iot.vmp.gb28181.event.device; + +import com.genersoft.iot.vmp.gb28181.bean.Device; +import lombok.Getter; +import lombok.Setter; +import org.springframework.context.ApplicationEvent; + +import java.io.Serial; + +@Getter +@Setter +public class DeviceOnlineEvent extends ApplicationEvent { + + private Device device; + + @Serial + private static final long serialVersionUID = 1L; + + public DeviceOnlineEvent(Object source) { + super(source); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java index 24012609b..162342e60 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceServiceImpl.java @@ -12,12 +12,11 @@ import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent; +import com.genersoft.iot.vmp.gb28181.event.device.DeviceOfflineEvent; import com.genersoft.iot.vmp.gb28181.service.IDeviceService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager; import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager; -import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTask; -import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskInfo; import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask; import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo; @@ -43,7 +42,9 @@ import jakarta.validation.constraints.NotNull; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; +import org.springframework.context.event.EventListener; import org.springframework.core.annotation.Order; +import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -127,10 +128,11 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { @Override public void run(String... args) throws Exception { - // 清理数据库不存在但是redis中存在的数据 + // 清理数据库不存在但是 redis 中存在的数据 List devicesInDb = getAll(); if (devicesInDb.isEmpty()) { redisCatchStorage.removeAllDevice(); + deviceStatusTaskRunner.clear(); }else { List devicesInRedis = redisCatchStorage.getAllDevices(); if (!devicesInRedis.isEmpty()) { @@ -147,30 +149,17 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } } - // 重置cseq计数 + // 重置 cseq 计数 redisCatchStorage.resetAllCSEQ(); // 处理设备状态 - List allTaskInfo = deviceStatusTaskRunner.getAllTaskInfo(); - List onlineDeviceIds = new ArrayList<>(); - if (!allTaskInfo.isEmpty()) { - for (DeviceStatusTaskInfo taskInfo : allTaskInfo) { - Device device = getDeviceByDeviceId(taskInfo.getDeviceId()); - if (device == null) { - deviceStatusTaskRunner.removeTask(taskInfo.getDeviceId()); - continue; - } - // 恢复定时任务, TCP因为连接已经断开必须等待设备重新连接 - DeviceStatusTask deviceStatusTask = DeviceStatusTask.getInstance(taskInfo.getDeviceId(), - taskInfo.getTransactionInfo(), taskInfo.getExpireTime() + 1000 + System.currentTimeMillis(), this::deviceStatusExpire); - deviceStatusTaskRunner.addTask(deviceStatusTask); - onlineDeviceIds.add(taskInfo.getDeviceId()); - } + Set allDeviceIds = deviceStatusTaskRunner.getAll(); + if (!allDeviceIds.isEmpty()) { // 除了记录的设备以外, 其他设备全部离线 List onlineDevice = getAllOnlineDevice(userSetting.getServerId()); if (!onlineDevice.isEmpty()) { List offlineDevices = new ArrayList<>(); for (Device device : onlineDevice) { - if (!onlineDeviceIds.contains(device.getDeviceId())) { + if (!allDeviceIds.contains(device.getDeviceId())) { // 此设备需要离线 device.setOnLine(false); // 清理离线设备的相关缓存 @@ -203,7 +192,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { continue; } Device device = getDeviceByDeviceId(taskInfo.getDeviceId()); - if (device == null || !device.isOnLine() || !onlineDeviceIds.contains(taskInfo.getDeviceId())) { + if (device == null || !device.isOnLine() || !allDeviceIds.contains(taskInfo.getDeviceId())) { subscribeTaskRunner.removeSubscribe(taskInfo.getKey()); continue; } @@ -244,6 +233,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { for (Device device : offlineDevices) { device.setOnLine(false); redisCatchStorage.updateDevice(device); + deviceStatusTaskRunner.removeTask(device.getDeviceId()); } } @@ -254,7 +244,8 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) { subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device)); } - // 离线释放所有ssrc + deviceStatusTaskRunner.removeTask(device.getDeviceId()); + // 离线释放所有 ssrc List ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId()); if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) { for (SsrcTransaction ssrcTransaction : ssrcTransactions) { @@ -283,9 +274,12 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } } - private void deviceStatusExpire(String deviceId, SipTransactionInfo transactionInfo) { - log.info("[设备状态] 到期, 编号: {}", deviceId); - offline(deviceId, "保活到期", true); + // 监听设备过期事件 + @Async + @EventListener + public void onApplicationEvent(DeviceOfflineEvent event) { + log.info("[设备状态] 到期, 编号: {}", event.getDeviceId()); + offline(event.getDeviceId(), "保活到期", true); } @Override @@ -363,7 +357,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } if (userSetting.getDeviceStatusNotify()) { - // 发送redis消息 + // 发送 redis消息 redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, true); } }else { @@ -375,20 +369,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { sync(device); } } + // 设备状态任务添加 long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; - if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) { - if (sipTransactionInfo == null) { - deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis()); - }else { - deviceStatusTaskRunner.removeTask(device.getDeviceId()); - DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire); - deviceStatusTaskRunner.addTask(task); - } - }else { - DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire); - deviceStatusTaskRunner.addTask(task); - } - + deviceStatusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis()); } @Override @@ -467,25 +450,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { } } - // 设备状态丢失检查 - @Scheduled(fixedDelay = 30, timeUnit = TimeUnit.SECONDS) - public void lostCheckForStatus(){ - // 获取所有设备 - List deviceList = redisCatchStorage.getAllDevices(); - if (deviceList.isEmpty()) { - return; - } - for (Device device : deviceList) { - if (device == null || !device.isOnLine() || !userSetting.getServerId().equals(device.getServerId())) { - continue; - } - if (!deviceStatusTaskRunner.containsKey(device.getDeviceId())) { - log.debug("[状态丢失] 执行设备离线, 编号: {},", device.getDeviceId()); - offline(device.getDeviceId(), "", true); - } - } - } - private void catalogSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) { log.info("[目录订阅] 到期, 编号: {}", deviceId); Device device = getDeviceByDeviceId(deviceId); @@ -1003,9 +967,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner { updateDevice(deviceInDb); long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; - if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) { - deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis()); - } + deviceStatusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis()); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java deleted file mode 100644 index 0e754358b..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTask.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.task.deviceStatus; - -import com.genersoft.iot.vmp.common.DeviceStatusCallback; -import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; -import lombok.Data; -import lombok.extern.slf4j.Slf4j; -import org.jetbrains.annotations.NotNull; - -import java.util.concurrent.Delayed; -import java.util.concurrent.TimeUnit; - -@Slf4j -@Data -public class DeviceStatusTask implements Delayed { - - private String deviceId; - - private SipTransactionInfo transactionInfo; - - /** - * 超时时间(单位: 毫秒) - */ - private long delayTime; - - private DeviceStatusCallback callback; - - public static DeviceStatusTask getInstance(String deviceId, SipTransactionInfo transactionInfo, long delayTime, DeviceStatusCallback callback) { - DeviceStatusTask deviceStatusTask = new DeviceStatusTask(); - deviceStatusTask.setDeviceId(deviceId); - deviceStatusTask.setTransactionInfo(transactionInfo); - deviceStatusTask.setDelayTime(delayTime); - deviceStatusTask.setCallback(callback); - return deviceStatusTask; - } - - public void expired() { - if (callback == null) { - log.info("[设备离线] 未找到过期处理回调, {}", deviceId); - return; - } - callback.run(deviceId, transactionInfo); - } - - @Override - public long getDelay(@NotNull TimeUnit unit) { - return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); - } - - @Override - public int compareTo(@NotNull Delayed o) { - return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); - } - - public DeviceStatusTaskInfo getInfo(){ - DeviceStatusTaskInfo taskInfo = new DeviceStatusTaskInfo(); - taskInfo.setTransactionInfo(transactionInfo); - taskInfo.setDeviceId(deviceId); - return taskInfo; - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskInfo.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskInfo.java deleted file mode 100644 index af2aa7512..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskInfo.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.task.deviceStatus; - -import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo; -import lombok.Data; - -@Data -public class DeviceStatusTaskInfo{ - - private String deviceId; - - private SipTransactionInfo transactionInfo; - - /** - * 过期时间,单位毫秒 - */ - private long expireTime; -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java index a99c894dd..aa776fee2 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceStatus/DeviceStatusTaskRunner.java @@ -1,6 +1,9 @@ package com.genersoft.iot.vmp.gb28181.task.deviceStatus; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.Device; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; +import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.utils.redis.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ObjectUtils; @@ -12,6 +15,7 @@ import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.concurrent.TimeUnit; @Slf4j @@ -19,54 +23,68 @@ import java.util.concurrent.TimeUnit; public class DeviceStatusTaskRunner { @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; + + @Autowired + private IRedisCatchStorage redisCatchStorage; @Autowired private UserSetting userSetting; + @Autowired + private EventPublisher eventPublisher; + private final String prefix = "VMP_DEVICE_EXPIRES"; - private final String redisKey = String.format("%s_%s", prefix, userSetting.getServerId()); - // 状态过期检查 + public String redisKey(){ + return String.format("%s_%s", prefix, userSetting.getServerId()); + } + + /** + * 状态过期检查 + */ @Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS) - @Async public void expirationCheck(){ + long now = System.currentTimeMillis(); + // 获取已过期的 deviceId (Score 介于 0 到 现在之间) + Set expiredIds = redisTemplate.opsForZSet().rangeByScore(redisKey(), 0, now); + if (expiredIds != null && !expiredIds.isEmpty()) { + redisTemplate.opsForZSet().remove(redisKey(), expiredIds.toArray()); + + // 使用 JDK 21 虚拟线程异步分发事件 + for (String deviceId : expiredIds) { + Thread.startVirtualThread(() -> { + // 获取详情后删除缓存 + Device device = redisCatchStorage.getDevice(deviceId); +// redisCatchStorage.removeDevice(deviceId); + // 发送 Spring 异步事件 + eventPublisher.deviceOfflineEventPublish(deviceId); + }); + } + } } public void addTask(String deviceId, long expireTime) { - redisTemplate.opsForZSet().add(redisKey, deviceId, expireTime); + redisTemplate.opsForZSet().add(redisKey(), deviceId, expireTime); } public void removeTask(String deviceId) { - redisTemplate.opsForZSet().remove(redisKey, deviceId); + redisTemplate.opsForZSet().remove(redisKey(), deviceId); } public boolean containsKey(String deviceId) { if (ObjectUtils.isEmpty(deviceId)) { return false; } - return redisTemplate.opsForZSet().score(redisKey, deviceId) != null; + return redisTemplate.opsForZSet().score(redisKey(), deviceId) != null; } - public List getAllTaskInfo(){ - String scanKey = String.format("%s_%s_*", prefix, userSetting.getServerId()); - List values = RedisUtil.scan(redisTemplate, scanKey); - if (values.isEmpty()) { - return new ArrayList<>(); - } - List result = new ArrayList<>(); - for (Object value : values) { - String redisKey = (String)value; - DeviceStatusTaskInfo taskInfo = (DeviceStatusTaskInfo)redisTemplate.opsForValue().get(redisKey); - if (taskInfo == null) { - continue; - } - Long expire = redisTemplate.getExpire(redisKey, TimeUnit.MILLISECONDS); - taskInfo.setExpireTime(expire); - result.add(taskInfo); - } - return result; + public void clear() { + redisTemplate.opsForZSet().removeRangeByScore(redisKey(), 0, Long.MAX_VALUE); + } + public Set getAll() { + return redisTemplate.opsForZSet().rangeByScore(redisKey(), 0, Long.MAX_VALUE); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java index 7a743d14d..b1dc99a66 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/MessageHandlerAbstract.java @@ -12,6 +12,7 @@ import gov.nist.javax.sip.message.SIPRequest; import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; @@ -74,9 +75,8 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i } } - + @Async public void handMessageEvent(Element element, Object data) { - String cmd = getText(element, "CmdType"); String sn = getText(element, "SN"); MessageEvent subscribe = (MessageEvent)messageSubscribe.getSubscribe(cmd + sn); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java index ce7dbeb36..dddf242d9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/KeepaliveNotifyMessageHandler.java @@ -28,6 +28,7 @@ import javax.sip.message.Response; import java.text.ParseException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; /** * 状态信息(心跳)报送 @@ -82,9 +83,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp if (device.isOnLine()) { taskQueue.add(device); long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L; - if (statusTaskRunner.containsKey(device.getDeviceId())) { - statusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis()); - } + statusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis()); } else { if (userSetting.getGbDeviceOnline() == 1) { // 对于已经离线的设备判断他的注册是否已经过期 @@ -92,7 +91,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp } } } - @Scheduled(fixedDelay = 1000) + @Scheduled(fixedDelay = 1000, timeUnit = TimeUnit.MILLISECONDS) @Async public void executeUpdateDeviceList() { if (!taskQueue.isEmpty()) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 738430a80..57a255092 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -16,6 +16,7 @@ import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -67,17 +68,120 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp @Override public void handForDevice(RequestEvent evt, Device device, Element element) { - taskQueue.offer(new HandlerCatchData(evt, device, element)); // 回复200 OK try { responseAck((SIPRequest) evt.getRequest(), Response.OK); } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 目录查询回复: {}", e.getMessage()); } + + int sn = 0; + // 全局异常捕获,保证下一条可以得到处理 + try { + Element rootElement = null; + try { + rootElement = getRootElement(evt, device.getCharset()); + } catch (DocumentException e) { + log.error("[xml解析] 失败: ", e); + return; + } + if (rootElement == null) { + log.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest()); + return; + } + Element deviceListElement = rootElement.element("DeviceList"); + Element sumNumElement = rootElement.element("SumNum"); + Element snElement = rootElement.element("SN"); + + sn = Integer.parseInt(snElement.getText()); + int sumNum = Integer.parseInt(sumNumElement.getText()); + + if (sumNum == 0) { + log.info("[收到通道]设备:{}的: 0个", device.getDeviceId()); + // 数据已经完整接收 + deviceChannelService.cleanChannelsForDevice(device.getId()); + // 推送空数据,不然无法及时结束 + catalogDataCatch.put(device.getDeviceId(), sn, 0, device, + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); + catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), sn, null); + return; + } else { + Iterator deviceListIterator = deviceListElement.elementIterator(); + if (deviceListIterator != null) { + List channelList = new ArrayList<>(); + List regionList = new ArrayList<>(); + List groupList = new ArrayList<>(); + // 遍历DeviceList + while (deviceListIterator.hasNext()) { + Element itemDevice = deviceListIterator.next(); + Element channelDeviceElement = itemDevice.element("DeviceID"); + if (channelDeviceElement == null) { + // 总数减一, 避免最后总数不对 无法确定问题 + continue; + } + // 从xml解析内容到 DeviceChannel 对象 + DeviceChannel channel = DeviceChannel.decode(itemDevice); + if (channel.getDeviceId() == null) { + log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); + continue; + } + channel.setDataDeviceId(device.getId()); + if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { + channel.setParentId(null); + } + // 解析通道类型 + if (channel.getDeviceId().length() <= 8) { + // 行政区划 + Region region = Region.getInstance(channel); + regionList.add(region); + channel.setChannelType(1); + }else if (channel.getDeviceId().length() == 20){ + // 业务分组/虚拟组织 + Group group = Group.getInstance(channel); + if (group != null) { + channel.setParental(1); + channel.setChannelType(2); + groupList.add(group); + } + if (channel.getLongitude() != null && channel.getLatitude() != null && channel.getLongitude() > 0 && channel.getLatitude() > 0) { + Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(channel.getLongitude(), channel.getLatitude()); + channel.setGbLongitude(wgs84Position[0]); + channel.setGbLatitude(wgs84Position[1]); + } + } + channelList.add(channel); + } + + catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device, + channelList, regionList, groupList); + log.info("[收到通道]设备: {} -> {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.size(device.getDeviceId(), sn), sumNum); + } + } + } catch (Exception e) { + log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest()); + log.error("[收到通道] 异常内容: ", e); + } finally { + String deviceId = device.getDeviceId(); + if (catalogDataCatch.size(deviceId, sn) > 0 + && catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) { + // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, + // 目前支持设备通道上线通知时和设备上线时向上级通知 + int finalSn = sn; + Thread.startVirtualThread(() -> { + boolean resetChannelsResult = saveData(device, finalSn); + if (!resetChannelsResult) { + String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, finalSn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(device.getDeviceId(), finalSn).size() + "条"; + catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, errorMsg); + } else { + catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null); + } + }).start(); + } + } } - @Scheduled(fixedDelay = 50) - @Transactional +// @Scheduled(fixedDelay = 50) +// @Transactional public void executeTaskQueue(){ if (taskQueue.isEmpty()) { return; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java index a4dc8aef5..028304360 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/DeviceStatusResponseMessageHandler.java @@ -51,13 +51,7 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 国标级联 设备状态应答回复200OK: {}", e.getMessage()); } - Element onlineElement = element.element("Online"); - JSONObject json = new JSONObject(); - XmlUtil.node2Json(element, json); - if (log.isDebugEnabled()) { - log.debug(json.toJSONString()); - } - String text = onlineElement.getText(); + String text = element.elementText("Online"); responseMessageHandler.handMessageEvent(element, text); }