From fb06fa85f189ce91acd97da78224863b270e5082 Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Thu, 16 Apr 2026 22:56:09 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E7=A7=BB=E5=8A=A8=E4=BD=8D?= =?UTF-8?q?=E7=BD=AE=E9=80=9A=E7=9F=A5=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=8C=E4=BC=98=E5=8C=96=E4=BB=BB=E5=8A=A1=E9=98=9F=E5=88=97?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E6=96=B9=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/gb28181/event/EventPublisher.java | 8 ++ .../mobilePosition/MobilePositionEvent.java | 6 + .../impl/DeviceChannelServiceImpl.java | 7 +- .../MobilePositionNotifyMessageHandler.java | 117 ++++++++++-------- 4 files changed, 82 insertions(+), 56 deletions(-) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index f78c5970c..8b6db0e63 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -116,10 +116,18 @@ public class EventPublisher { applicationEventPublisher.publishEvent(event); } + public void mobilePositionListEventPublish(List mobilePositionList) { + MobilePositionEvent event = new MobilePositionEvent(this); + event.setMobilePositionList(mobilePositionList); + applicationEventPublisher.publishEvent(event); + } + public void deviceOfflineEventPublish(Set deviceIds) { DeviceOfflineEvent event = new DeviceOfflineEvent(this); event.setDeviceIds(deviceIds); applicationEventPublisher.publishEvent(event); } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java index f6a4ad759..064f769db 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java @@ -5,6 +5,8 @@ import lombok.Getter; import lombok.Setter; import org.springframework.context.ApplicationEvent; +import java.util.List; + public class MobilePositionEvent extends ApplicationEvent { public MobilePositionEvent(Object source) { @@ -14,4 +16,8 @@ public class MobilePositionEvent extends ApplicationEvent { @Getter @Setter private MobilePosition mobilePosition; + + @Getter + @Setter + private List mobilePositionList; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java index 4b9e7214f..e33ba2868 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java @@ -34,6 +34,7 @@ import jakarta.validation.constraints.NotNull; import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -99,7 +100,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { * 监听录像查询结束事件 */ @Async - @org.springframework.context.event.EventListener + @EventListener public void onApplicationEvent(RecordInfoEndEvent event) { SynchronousQueue queue = topicSubscribers.get("record" + event.getRecordInfo().getSn()); if (queue != null) { @@ -107,8 +108,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } } - @Autowired - private ISIPCommander cmder; @Override @@ -293,7 +292,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { deviceChannel.getName(), deviceChannel.getDeviceId()); String cmdString = getText(rootElement, type.getVal()); try { - cmder.fronEndCmd(device, deviceChannel.getDeviceId(), cmdString, errorResult->{ + commander.fronEndCmd(device, deviceChannel.getDeviceId(), cmdString, errorResult->{ callback.run(errorResult.statusCode, errorResult.msg, null); }, errorResult->{ callback.run(errorResult.statusCode, errorResult.msg, null); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index 452c6537b..650582ede 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -1,5 +1,6 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; @@ -14,6 +15,8 @@ import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.sip.InvalidArgumentException; @@ -21,6 +24,7 @@ import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -40,79 +44,88 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen private final IDeviceChannelService deviceChannelService; - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); private final TaskExecutor taskExecutor; private final EventPublisher eventPublisher; + private final UserSetting userSetting; + + @Override public void afterPropertiesSet() throws Exception { notifyMessageHandler.addHandler(cmdType, this); } + @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + log.error("[message-notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE", userSetting.getMaxNotifyCountQueue()); + return; + } + taskQueue.offer(new HandlerCatchData(evt, null, null)); // 回复200 OK try { responseAckAsync((SIPRequest) evt.getRequest(), Response.OK); } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 移动位置通知回复: {}", e.getMessage()); } - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - SipMsgInfo sipMsgInfo = taskQueue.poll(); - try { - Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset()); - if (rootElementAfterCharset == null) { - log.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId()); - continue; + + } + @Scheduled(fixedDelay = 400) //每400毫秒执行一次 + @Async + public void executeTaskQueue(){ + if (taskQueue.isEmpty()) { + return; + } + + List handlerCatchDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + HandlerCatchData poll = taskQueue.poll(); + if (poll != null) { + handlerCatchDataList.add(poll); + } + } + if (handlerCatchDataList.isEmpty()) { + return; + } + List mobilePositionList = new ArrayList<>(); + for (HandlerCatchData take : handlerCatchDataList) { + if (take == null) { + continue; + } + Device device = take.getDevice(); + try { + Element rootElementAfterCharset = getRootElement(take.getEvt(), device.getCharset()); + if (rootElementAfterCharset == null) { + log.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId()); + List mobilePositions = MobilePosition.decode(device.getName(), device.getDeviceId(), rootElementAfterCharset); + for (MobilePosition mobilePosition : mobilePositions) { + try { + mobilePosition.setReportSource("Mobile Position"); + log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(), + mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTime()); + mobilePositionList.add(mobilePosition); + }catch (Exception e) { + log.error("未处理的异常 ", e); } - - List mobilePositionList = MobilePosition.decode(sipMsgInfo.getDevice().getName(), sipMsgInfo.getDevice().getDeviceId(), rootElementAfterCharset); - mobilePositionList.forEach(mobilePosition -> { - try { - // 更新device channel 的经纬度 - DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), mobilePosition.getChannelDeviceId()); - if (deviceChannel == null) { - log.warn("[解析移动位置通知] 未找到通道:{}/{}", device.getDeviceId(), mobilePosition.getChannelDeviceId()); - return; - } - mobilePosition.setChannelId(deviceChannel.getId()); - mobilePosition.setReportSource("Mobile Position"); - - log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTime()); - - mobilePositionService.add(mobilePosition); - // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 - try { - eventPublisher.mobilePositionEventPublish(mobilePosition); - }catch (Exception e) { - log.error("[MobilePositionEvent] 发送失败: ", e); - } - - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - }catch (Exception e) { - log.error("未处理的异常 ", e); - } - }); - - } catch (DocumentException e) { - log.error("未处理的异常 ", e); - } catch (Exception e) { - log.warn("[移动位置通知] 发现未处理的异常, \r\n{}", evt.getRequest()); - log.error("[移动位置通知] 异常内容: ", e); } } - }); + }catch (Exception e) { + log.warn("[移动位置通知] 发现未处理的异常, \r\n{}", take.getEvt().getRequest()); + log.error("[移动位置通知] 异常内容: ", e); + } + } + // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 + if (!mobilePositionList.isEmpty()) { + try { + eventPublisher.mobilePositionListEventPublish(mobilePositionList); + }catch (Exception e) { + log.error("[MobilePositionEvent] 发送失败: ", e); + } } }