使用zset管理设备状态,以减少内存开销

This commit is contained in:
lin 2026-01-15 23:10:49 +08:00
parent 5373e6082c
commit d29bdec648
11 changed files with 230 additions and 183 deletions

View File

@ -1,12 +1,10 @@
package com.genersoft.iot.vmp.gb28181.event;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent;
import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
import com.genersoft.iot.vmp.gb28181.event.device.DeviceOfflineEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@ -120,4 +118,9 @@ public class EventPublisher {
}
public void deviceOfflineEventPublish(String deviceId) {
DeviceOfflineEvent event = new DeviceOfflineEvent(this);
event.setDeviceId(deviceId);
applicationEventPublisher.publishEvent(event);
}
}

View File

@ -0,0 +1,22 @@
package com.genersoft.iot.vmp.gb28181.event.device;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
import java.io.Serial;
@Getter
@Setter
public class DeviceOfflineEvent extends ApplicationEvent {
private String deviceId;
@Serial
private static final long serialVersionUID = 1L;
public DeviceOfflineEvent(Object source) {
super(source);
}
}

View File

@ -0,0 +1,22 @@
package com.genersoft.iot.vmp.gb28181.event.device;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
import java.io.Serial;
@Getter
@Setter
public class DeviceOnlineEvent extends ApplicationEvent {
private Device device;
@Serial
private static final long serialVersionUID = 1L;
public DeviceOnlineEvent(Object source) {
super(source);
}
}

View File

@ -12,12 +12,11 @@ import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
import com.genersoft.iot.vmp.gb28181.event.device.DeviceOfflineEvent;
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTask;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskInfo;
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask;
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo;
@ -43,7 +42,9 @@ import jakarta.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -127,10 +128,11 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
@Override
public void run(String... args) throws Exception {
// 清理数据库不存在但是redis中存在的数据
// 清理数据库不存在但是 redis 中存在的数据
List<Device> devicesInDb = getAll();
if (devicesInDb.isEmpty()) {
redisCatchStorage.removeAllDevice();
deviceStatusTaskRunner.clear();
}else {
List<Device> devicesInRedis = redisCatchStorage.getAllDevices();
if (!devicesInRedis.isEmpty()) {
@ -147,30 +149,17 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
// 重置cseq计数
// 重置 cseq 计数
redisCatchStorage.resetAllCSEQ();
// 处理设备状态
List<DeviceStatusTaskInfo> allTaskInfo = deviceStatusTaskRunner.getAllTaskInfo();
List<String> onlineDeviceIds = new ArrayList<>();
if (!allTaskInfo.isEmpty()) {
for (DeviceStatusTaskInfo taskInfo : allTaskInfo) {
Device device = getDeviceByDeviceId(taskInfo.getDeviceId());
if (device == null) {
deviceStatusTaskRunner.removeTask(taskInfo.getDeviceId());
continue;
}
// 恢复定时任务, TCP因为连接已经断开必须等待设备重新连接
DeviceStatusTask deviceStatusTask = DeviceStatusTask.getInstance(taskInfo.getDeviceId(),
taskInfo.getTransactionInfo(), taskInfo.getExpireTime() + 1000 + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(deviceStatusTask);
onlineDeviceIds.add(taskInfo.getDeviceId());
}
Set<String> allDeviceIds = deviceStatusTaskRunner.getAll();
if (!allDeviceIds.isEmpty()) {
// 除了记录的设备以外 其他设备全部离线
List<Device> onlineDevice = getAllOnlineDevice(userSetting.getServerId());
if (!onlineDevice.isEmpty()) {
List<Device> offlineDevices = new ArrayList<>();
for (Device device : onlineDevice) {
if (!onlineDeviceIds.contains(device.getDeviceId())) {
if (!allDeviceIds.contains(device.getDeviceId())) {
// 此设备需要离线
device.setOnLine(false);
// 清理离线设备的相关缓存
@ -203,7 +192,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
continue;
}
Device device = getDeviceByDeviceId(taskInfo.getDeviceId());
if (device == null || !device.isOnLine() || !onlineDeviceIds.contains(taskInfo.getDeviceId())) {
if (device == null || !device.isOnLine() || !allDeviceIds.contains(taskInfo.getDeviceId())) {
subscribeTaskRunner.removeSubscribe(taskInfo.getKey());
continue;
}
@ -244,6 +233,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
for (Device device : offlineDevices) {
device.setOnLine(false);
redisCatchStorage.updateDevice(device);
deviceStatusTaskRunner.removeTask(device.getDeviceId());
}
}
@ -254,7 +244,8 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device));
}
// 离线释放所有ssrc
deviceStatusTaskRunner.removeTask(device.getDeviceId());
// 离线释放所有 ssrc
List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId());
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
@ -283,9 +274,12 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
private void deviceStatusExpire(String deviceId, SipTransactionInfo transactionInfo) {
log.info("[设备状态] 到期, 编号: {}", deviceId);
offline(deviceId, "保活到期", true);
// 监听设备过期事件
@Async
@EventListener
public void onApplicationEvent(DeviceOfflineEvent event) {
log.info("[设备状态] 到期, 编号: {}", event.getDeviceId());
offline(event.getDeviceId(), "保活到期", true);
}
@Override
@ -363,7 +357,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
if (userSetting.getDeviceStatusNotify()) {
// 发送redis消息
// 发送 redis消息
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, true);
}
}else {
@ -375,20 +369,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
sync(device);
}
}
// 设备状态任务添加
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
if (sipTransactionInfo == null) {
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}else {
deviceStatusTaskRunner.removeTask(device.getDeviceId());
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(task);
}
}else {
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire);
deviceStatusTaskRunner.addTask(task);
}
deviceStatusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}
@Override
@ -467,25 +450,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
}
}
// 设备状态丢失检查
@Scheduled(fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
public void lostCheckForStatus(){
// 获取所有设备
List<Device> deviceList = redisCatchStorage.getAllDevices();
if (deviceList.isEmpty()) {
return;
}
for (Device device : deviceList) {
if (device == null || !device.isOnLine() || !userSetting.getServerId().equals(device.getServerId())) {
continue;
}
if (!deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
log.debug("[状态丢失] 执行设备离线, 编号: {},", device.getDeviceId());
offline(device.getDeviceId(), "", true);
}
}
}
private void catalogSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) {
log.info("[目录订阅] 到期, 编号: {}", deviceId);
Device device = getDeviceByDeviceId(deviceId);
@ -1003,9 +967,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
updateDevice(deviceInDb);
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}
deviceStatusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}
}

View File

@ -1,60 +0,0 @@
package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
import com.genersoft.iot.vmp.common.DeviceStatusCallback;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Slf4j
@Data
public class DeviceStatusTask implements Delayed {
private String deviceId;
private SipTransactionInfo transactionInfo;
/**
* 超时时间(单位 毫秒)
*/
private long delayTime;
private DeviceStatusCallback callback;
public static DeviceStatusTask getInstance(String deviceId, SipTransactionInfo transactionInfo, long delayTime, DeviceStatusCallback callback) {
DeviceStatusTask deviceStatusTask = new DeviceStatusTask();
deviceStatusTask.setDeviceId(deviceId);
deviceStatusTask.setTransactionInfo(transactionInfo);
deviceStatusTask.setDelayTime(delayTime);
deviceStatusTask.setCallback(callback);
return deviceStatusTask;
}
public void expired() {
if (callback == null) {
log.info("[设备离线] 未找到过期处理回调, {}", deviceId);
return;
}
callback.run(deviceId, transactionInfo);
}
@Override
public long getDelay(@NotNull TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(@NotNull Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
public DeviceStatusTaskInfo getInfo(){
DeviceStatusTaskInfo taskInfo = new DeviceStatusTaskInfo();
taskInfo.setTransactionInfo(transactionInfo);
taskInfo.setDeviceId(deviceId);
return taskInfo;
}
}

View File

@ -1,17 +0,0 @@
package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
import lombok.Data;
@Data
public class DeviceStatusTaskInfo{
private String deviceId;
private SipTransactionInfo transactionInfo;
/**
* 过期时间,单位毫秒
*/
private long expireTime;
}

View File

@ -1,6 +1,9 @@
package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
@ -12,6 +15,7 @@ import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@Slf4j
@ -19,54 +23,68 @@ import java.util.concurrent.TimeUnit;
public class DeviceStatusTaskRunner {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, String> redisTemplate;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private UserSetting userSetting;
@Autowired
private EventPublisher eventPublisher;
private final String prefix = "VMP_DEVICE_EXPIRES";
private final String redisKey = String.format("%s_%s", prefix, userSetting.getServerId());
// 状态过期检查
public String redisKey(){
return String.format("%s_%s", prefix, userSetting.getServerId());
}
/**
* 状态过期检查
*/
@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS)
@Async
public void expirationCheck(){
long now = System.currentTimeMillis();
// 获取已过期的 deviceId (Score 介于 0 现在之间)
Set<String> expiredIds = redisTemplate.opsForZSet().rangeByScore(redisKey(), 0, now);
if (expiredIds != null && !expiredIds.isEmpty()) {
redisTemplate.opsForZSet().remove(redisKey(), expiredIds.toArray());
// 使用 JDK 21 虚拟线程异步分发事件
for (String deviceId : expiredIds) {
Thread.startVirtualThread(() -> {
// 获取详情后删除缓存
Device device = redisCatchStorage.getDevice(deviceId);
// redisCatchStorage.removeDevice(deviceId);
// 发送 Spring 异步事件
eventPublisher.deviceOfflineEventPublish(deviceId);
});
}
}
}
public void addTask(String deviceId, long expireTime) {
redisTemplate.opsForZSet().add(redisKey, deviceId, expireTime);
redisTemplate.opsForZSet().add(redisKey(), deviceId, expireTime);
}
public void removeTask(String deviceId) {
redisTemplate.opsForZSet().remove(redisKey, deviceId);
redisTemplate.opsForZSet().remove(redisKey(), deviceId);
}
public boolean containsKey(String deviceId) {
if (ObjectUtils.isEmpty(deviceId)) {
return false;
}
return redisTemplate.opsForZSet().score(redisKey, deviceId) != null;
return redisTemplate.opsForZSet().score(redisKey(), deviceId) != null;
}
public List<DeviceStatusTaskInfo> getAllTaskInfo(){
String scanKey = String.format("%s_%s_*", prefix, userSetting.getServerId());
List<Object> values = RedisUtil.scan(redisTemplate, scanKey);
if (values.isEmpty()) {
return new ArrayList<>();
}
List<DeviceStatusTaskInfo> result = new ArrayList<>();
for (Object value : values) {
String redisKey = (String)value;
DeviceStatusTaskInfo taskInfo = (DeviceStatusTaskInfo)redisTemplate.opsForValue().get(redisKey);
if (taskInfo == null) {
continue;
}
Long expire = redisTemplate.getExpire(redisKey, TimeUnit.MILLISECONDS);
taskInfo.setExpireTime(expire);
result.add(taskInfo);
}
return result;
public void clear() {
redisTemplate.opsForZSet().removeRangeByScore(redisKey(), 0, Long.MAX_VALUE);
}
public Set<String> getAll() {
return redisTemplate.opsForZSet().rangeByScore(redisKey(), 0, Long.MAX_VALUE);
}
}

View File

@ -12,6 +12,7 @@ import gov.nist.javax.sip.message.SIPRequest;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
@ -74,9 +75,8 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
}
}
@Async
public void handMessageEvent(Element element, Object data) {
String cmd = getText(element, "CmdType");
String sn = getText(element, "SN");
MessageEvent<Object> subscribe = (MessageEvent<Object>)messageSubscribe.getSubscribe(cmd + sn);

View File

@ -28,6 +28,7 @@ import javax.sip.message.Response;
import java.text.ParseException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
/**
* 状态信息(心跳)报送
@ -82,9 +83,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
if (device.isOnLine()) {
taskQueue.add(device);
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
if (statusTaskRunner.containsKey(device.getDeviceId())) {
statusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
}
statusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis());
} else {
if (userSetting.getGbDeviceOnline() == 1) {
// 对于已经离线的设备判断他的注册是否已经过期
@ -92,7 +91,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
}
}
}
@Scheduled(fixedDelay = 1000)
@Scheduled(fixedDelay = 1000, timeUnit = TimeUnit.MILLISECONDS)
@Async
public void executeUpdateDeviceList() {
if (!taskQueue.isEmpty()) {

View File

@ -16,6 +16,7 @@ import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@ -67,17 +68,120 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
@Override
public void handForDevice(RequestEvent evt, Device device, Element element) {
taskQueue.offer(new HandlerCatchData(evt, device, element));
// 回复200 OK
try {
responseAck((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 目录查询回复: {}", e.getMessage());
}
int sn = 0;
// 全局异常捕获保证下一条可以得到处理
try {
Element rootElement = null;
try {
rootElement = getRootElement(evt, device.getCharset());
} catch (DocumentException e) {
log.error("[xml解析] 失败: ", e);
return;
}
if (rootElement == null) {
log.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest());
return;
}
Element deviceListElement = rootElement.element("DeviceList");
Element sumNumElement = rootElement.element("SumNum");
Element snElement = rootElement.element("SN");
sn = Integer.parseInt(snElement.getText());
int sumNum = Integer.parseInt(sumNumElement.getText());
if (sumNum == 0) {
log.info("[收到通道]设备:{}的: 0个", device.getDeviceId());
// 数据已经完整接收
deviceChannelService.cleanChannelsForDevice(device.getId());
// 推送空数据不然无法及时结束
catalogDataCatch.put(device.getDeviceId(), sn, 0, device,
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), sn, null);
return;
} else {
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
if (deviceListIterator != null) {
List<DeviceChannel> channelList = new ArrayList<>();
List<Region> regionList = new ArrayList<>();
List<Group> groupList = new ArrayList<>();
// 遍历DeviceList
while (deviceListIterator.hasNext()) {
Element itemDevice = deviceListIterator.next();
Element channelDeviceElement = itemDevice.element("DeviceID");
if (channelDeviceElement == null) {
// 总数减一 避免最后总数不对 无法确定问题
continue;
}
// 从xml解析内容到 DeviceChannel 对象
DeviceChannel channel = DeviceChannel.decode(itemDevice);
if (channel.getDeviceId() == null) {
log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
continue;
}
channel.setDataDeviceId(device.getId());
if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
channel.setParentId(null);
}
// 解析通道类型
if (channel.getDeviceId().length() <= 8) {
// 行政区划
Region region = Region.getInstance(channel);
regionList.add(region);
channel.setChannelType(1);
}else if (channel.getDeviceId().length() == 20){
// 业务分组/虚拟组织
Group group = Group.getInstance(channel);
if (group != null) {
channel.setParental(1);
channel.setChannelType(2);
groupList.add(group);
}
if (channel.getLongitude() != null && channel.getLatitude() != null && channel.getLongitude() > 0 && channel.getLatitude() > 0) {
Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(channel.getLongitude(), channel.getLatitude());
channel.setGbLongitude(wgs84Position[0]);
channel.setGbLatitude(wgs84Position[1]);
}
}
channelList.add(channel);
}
catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device,
channelList, regionList, groupList);
log.info("[收到通道]设备: {} -> {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.size(device.getDeviceId(), sn), sumNum);
}
}
} catch (Exception e) {
log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[收到通道] 异常内容: ", e);
} finally {
String deviceId = device.getDeviceId();
if (catalogDataCatch.size(deviceId, sn) > 0
&& catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) {
// 数据已经完整接收 此时可能存在某个设备离线变上线的情况但是考虑到性能此处不做处理
// 目前支持设备通道上线通知时和设备上线时向上级通知
int finalSn = sn;
Thread.startVirtualThread(() -> {
boolean resetChannelsResult = saveData(device, finalSn);
if (!resetChannelsResult) {
String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, finalSn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(device.getDeviceId(), finalSn).size() + "";
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, errorMsg);
} else {
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null);
}
}).start();
}
}
}
@Scheduled(fixedDelay = 50)
@Transactional
// @Scheduled(fixedDelay = 50)
// @Transactional
public void executeTaskQueue(){
if (taskQueue.isEmpty()) {
return;

View File

@ -51,13 +51,7 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 国标级联 设备状态应答回复200OK: {}", e.getMessage());
}
Element onlineElement = element.element("Online");
JSONObject json = new JSONObject();
XmlUtil.node2Json(element, json);
if (log.isDebugEnabled()) {
log.debug(json.toJSONString());
}
String text = onlineElement.getText();
String text = element.elementText("Online");
responseMessageHandler.handMessageEvent(element, text);
}