From 46e9d56c24308493cd26571f56638258f6913918 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Mon, 19 Jan 2026 09:17:54 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=9B=AE=E5=BD=95=E6=8E=A5?= =?UTF-8?q?=E6=94=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../gb28181/session/CatalogDataManager.java | 2 +- .../cmd/CatalogResponseMessageHandler.java | 127 ------------------ 2 files changed, 1 insertion(+), 128 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java index 409ea5040..b6981ff91 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java @@ -215,7 +215,7 @@ public class CatalogDataManager implements CommandLineRunner { redisTemplate.delete(key); } - @Scheduled(fixedDelay = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 + @Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时 private void timerTask(){ if (dataMap.isEmpty()) { return; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java index 57a255092..fc3771ee6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/CatalogResponseMessageHandler.java @@ -16,8 +16,6 @@ import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -180,131 +178,6 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp } } -// @Scheduled(fixedDelay = 50) -// @Transactional - public void executeTaskQueue(){ - if (taskQueue.isEmpty()) { - return; - } - List handlerCatchDataList = new ArrayList<>(); - int size = taskQueue.size(); - for (int i = 0; i < size; i++) { - HandlerCatchData poll = taskQueue.poll(); - if (poll != null) { - handlerCatchDataList.add(poll); - } - } - if (handlerCatchDataList.isEmpty()) { - return; - } - for (HandlerCatchData take : handlerCatchDataList) { - if (take == null) { - continue; - } - RequestEvent evt = take.getEvt(); - int sn = 0; - // 全局异常捕获,保证下一条可以得到处理 - try { - Element rootElement = null; - try { - rootElement = getRootElement(take.getEvt(), take.getDevice().getCharset()); - } catch (DocumentException e) { - log.error("[xml解析] 失败: ", e); - continue; - } - if (rootElement == null) { - log.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest()); - continue; - } - Element deviceListElement = rootElement.element("DeviceList"); - Element sumNumElement = rootElement.element("SumNum"); - Element snElement = rootElement.element("SN"); - - sn = Integer.parseInt(snElement.getText()); - int sumNum = Integer.parseInt(sumNumElement.getText()); - - if (sumNum == 0) { - log.info("[收到通道]设备:{}的: 0个", take.getDevice().getDeviceId()); - // 数据已经完整接收 - deviceChannelService.cleanChannelsForDevice(take.getDevice().getId()); - // 推送空数据,不然无法及时结束 - catalogDataCatch.put(take.getDevice().getDeviceId(), sn, 0, take.getDevice(), - Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); - catalogDataCatch.setChannelSyncEnd(take.getDevice().getDeviceId(), sn, null); - return; - } else { - Iterator deviceListIterator = deviceListElement.elementIterator(); - if (deviceListIterator != null) { - List channelList = new ArrayList<>(); - List regionList = new ArrayList<>(); - List groupList = new ArrayList<>(); - // 遍历DeviceList - while (deviceListIterator.hasNext()) { - Element itemDevice = deviceListIterator.next(); - Element channelDeviceElement = itemDevice.element("DeviceID"); - if (channelDeviceElement == null) { - // 总数减一, 避免最后总数不对 无法确定问题 - continue; - } - // 从xml解析内容到 DeviceChannel 对象 - DeviceChannel channel = DeviceChannel.decode(itemDevice); - if (channel.getDeviceId() == null) { - log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent())); - continue; - } - channel.setDataDeviceId(take.getDevice().getId()); - if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) { - channel.setParentId(null); - } - // 解析通道类型 - if (channel.getDeviceId().length() <= 8) { - // 行政区划 - Region region = Region.getInstance(channel); - regionList.add(region); - channel.setChannelType(1); - }else if (channel.getDeviceId().length() == 20){ - // 业务分组/虚拟组织 - Group group = Group.getInstance(channel); - if (group != null) { - channel.setParental(1); - channel.setChannelType(2); - groupList.add(group); - } - if (channel.getLongitude() != null && channel.getLatitude() != null && channel.getLongitude() > 0 && channel.getLatitude() > 0) { - Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(channel.getLongitude(), channel.getLatitude()); - channel.setGbLongitude(wgs84Position[0]); - channel.setGbLatitude(wgs84Position[1]); - } - } - channelList.add(channel); - } - - catalogDataCatch.put(take.getDevice().getDeviceId(), sn, sumNum, take.getDevice(), - channelList, regionList, groupList); - log.info("[收到通道]设备: {} -> {}个,{}/{}", take.getDevice().getDeviceId(), channelList.size(), catalogDataCatch.size(take.getDevice().getDeviceId(), sn), sumNum); - } - } - } catch (Exception e) { - log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest()); - log.error("[收到通道] 异常内容: ", e); - } finally { - String deviceId = take.getDevice().getDeviceId(); - if (catalogDataCatch.size(deviceId, sn) > 0 - && catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) { - // 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理, - // 目前支持设备通道上线通知时和设备上线时向上级通知 - boolean resetChannelsResult = saveData(take.getDevice(), sn); - if (!resetChannelsResult) { - String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, sn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(take.getDevice().getDeviceId(), sn).size() + "条"; - catalogDataCatch.setChannelSyncEnd(deviceId, sn, errorMsg); - } else { - catalogDataCatch.setChannelSyncEnd(deviceId, sn, null); - } - } - } - } - } - @Transactional public boolean saveData(Device device, int sn) {