From 775376b327a16386b0bb634ae2fc1a24aa584a3d Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Fri, 17 Apr 2026 18:07:21 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=96=B0=E5=AE=9A=E4=B9=89=E7=A7=BB?= =?UTF-8?q?=E5=8A=A8=E4=BD=8D=E7=BD=AE=E4=BA=8B=E4=BB=B6=E5=A4=84=E7=90=86?= =?UTF-8?q?=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- .../gb28181/bean/DeviceMobilePosition.java | 88 +++++++++++++++++++ .../iot/vmp/gb28181/bean/MobilePosition.java | 40 +++------ .../vmp/gb28181/dao/DeviceChannelMapper.java | 61 +++++++++++++ ...nMapper.java => MobilePositionMapper.java} | 14 +-- .../iot/vmp/gb28181/event/EventPublisher.java | 7 +- .../mobilePosition/MobilePositionEvent.java | 2 +- .../service/IDeviceChannelService.java | 5 ++ .../service/IMobilePositionService.java | 4 + .../gb28181/service/ISourceOtherService.java | 7 ++ .../impl/DeviceChannelServiceImpl.java | 26 +++++- .../service/impl/GbChannelServiceImpl.java | 4 +- .../impl/MobilePositionServiceImpl.java | 84 ++++++++++++++++++ .../impl/SourceOtherServiceForGbImpl.java | 51 +++++++++++ .../request/impl/NotifyRequestForAlarm.java | 6 +- ...tifyRequestForMobilePositionProcessor.java | 17 ++-- .../notify/cmd/AlarmNotifyMessageHandler.java | 1 - .../MobilePositionNotifyMessageHandler.java | 38 +++----- .../MobilePositionResponseMessageHandler.java | 2 - .../iot/vmp/gb28181/utils/SipUtils.java | 21 +++++ .../impl/MobilePositionServiceImpl.java | 4 +- .../com/genersoft/iot/vmp/utils/DateUtil.java | 2 +- 22 files changed, 401 insertions(+), 85 deletions(-) create mode 100755 src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceMobilePosition.java rename src/main/java/com/genersoft/iot/vmp/gb28181/dao/{DeviceMobilePositionMapper.java => MobilePositionMapper.java} (79%) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/service/IMobilePositionService.java create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/MobilePositionServiceImpl.java diff --git a/README.md b/README.md index 6e992aee9..50928d306 100644 --- a/README.md +++ b/README.md @@ -192,7 +192,7 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git ## 付费社群 -> 付费社群即可以对作者提供支持,也可以为大家更加快速的解决问题,对星球内容不满意,三天之内退出支持自动退款。如果暂时无法加入,给项目点个星也是极大的鼓励。 +> 付费社群即可以对作者提供支持,也可以为大家更加快速的解决问题,还为正式加入星球的用户提供了微信群。对星球内容不满意,三天之内退出支持自动退款。如果暂时无法加入,给项目点个星也是极大的鼓励。 [知识星球](https://t.zsxq.com/0d8VAD3Dm)专栏列表: - [WVP 部署安全加固指南:新手必看,防范攻击与漏洞](https://articles.zsxq.com/id_tv8wz4uubx2n.html) diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceMobilePosition.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceMobilePosition.java new file mode 100755 index 000000000..ee672ae5c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceMobilePosition.java @@ -0,0 +1,88 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.utils.DateUtil; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.dom4j.Element; +import org.springframework.util.ObjectUtils; + +import java.util.ArrayList; +import java.util.List; + +import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; + +/** + * 国标设备移动位置 + */ + +@Slf4j +@Getter +@Setter +public class DeviceMobilePosition extends MobilePosition{ + + /** + * 通道数据库自增Id + */ + private String channelDeviceId; + + + private Device device; + + + public static List decode(Device device, Element rootElementAfterCharset) { + + List mobilePositions = new ArrayList<>(); + + DeviceMobilePosition mobilePosition = new DeviceMobilePosition(); + mobilePosition.setCreateTime(DateUtil.getNow()); + mobilePosition.setDevice(device); + + String channelId = getText(rootElementAfterCharset, "DeviceID"); + + mobilePosition.setChannelDeviceId(channelId); + String time = getText(rootElementAfterCharset, "Time"); + if (ObjectUtils.isEmpty(time)){ + mobilePosition.setTimestamp(System.currentTimeMillis()); + }else { + Long timestamp = SipUtils.parseTimeForTimestamp(time); + if(timestamp == null) { + log.warn("解析移动位置时间失败:{}, 使用当前时间", time); + mobilePosition.setTimestamp(System.currentTimeMillis()); + }else { + mobilePosition.setTimestamp(timestamp); + } + } + mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude"))); + mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude"))); + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) { + mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed"))); + } else { + mobilePosition.setSpeed(0.0); + } + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) { + mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction"))); + } else { + mobilePosition.setDirection(0.0); + } + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) { + mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude"))); + } else { + mobilePosition.setAltitude(0.0); + } + + mobilePositions.add(mobilePosition); + + return mobilePositions; + } + + @Override + public String toString() { + return "DeviceMobilePosition{" + + "channelDeviceId='" + channelDeviceId + '\'' + + ", deviceId='" + device.getDeviceId() + '\'' + + "} " + super.toString(); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java index 38d9552c5..94050e2cd 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java @@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; import com.genersoft.iot.vmp.utils.DateUtil; import lombok.Data; +import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; import org.springframework.util.ObjectUtils; @@ -18,15 +19,12 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; * @date: 2021年1月23日 */ +@Slf4j @Data public class MobilePosition { - /** - * 设备Id - */ - private String deviceId; /** - * 通道Id + * 通道数据库自增Id */ private Integer channelId; @@ -35,15 +33,10 @@ public class MobilePosition { */ private String channelDeviceId; - /** - * 设备名称 - */ - private String deviceName; - /** * 通知时间 */ - private String time; + private long timestamp; /** * 经度 @@ -70,34 +63,32 @@ public class MobilePosition { */ private double direction; - /** - * 位置信息上报来源(Mobile Position、GPS Alarm) - */ - private String reportSource; /** * 创建时间 */ private String createTime; - public static List decode(String deviceName, String deviceId, Element rootElementAfterCharset) { + public static List decode(Element rootElementAfterCharset) { List mobilePositions = new ArrayList<>(); MobilePosition mobilePosition = new MobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); - if (!ObjectUtils.isEmpty(deviceName)) { - mobilePosition.setDeviceName(deviceName); - } - mobilePosition.setDeviceId(deviceId); String channelId = getText(rootElementAfterCharset, "DeviceID"); mobilePosition.setChannelDeviceId(channelId); String time = getText(rootElementAfterCharset, "Time"); if (ObjectUtils.isEmpty(time)){ - mobilePosition.setTime(DateUtil.getNow()); + mobilePosition.setTimestamp(System.currentTimeMillis()); }else { - mobilePosition.setTime(SipUtils.parseTime(time)); + Long timestamp = SipUtils.parseTimeForTimestamp(time); + if(timestamp == null) { + log.warn("解析移动位置时间失败:{}, 使用当前时间", time); + mobilePosition.setTimestamp(System.currentTimeMillis()); + }else { + mobilePosition.setTimestamp(timestamp); + } } mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude"))); mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude"))); @@ -116,7 +107,6 @@ public class MobilePosition { } else { mobilePosition.setAltitude(0.0); } - mobilePosition.setReportSource("Mobile Position"); mobilePositions.add(mobilePosition); @@ -126,17 +116,13 @@ public class MobilePosition { @Override public String toString() { return "MobilePosition{" + - "deviceId='" + deviceId + '\'' + ", channelId=" + channelId + ", channelDeviceId='" + channelDeviceId + '\'' + - ", deviceName='" + deviceName + '\'' + - ", time='" + time + '\'' + ", longitude=" + longitude + ", latitude=" + latitude + ", altitude=" + altitude + ", speed=" + speed + ", direction=" + direction + - ", reportSource='" + reportSource + '\'' + ", createTime='" + createTime + '\'' + '}'; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java index b41cf183b..1e59c4154 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.dao; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition; import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce; import com.genersoft.iot.vmp.gb28181.dao.provider.DeviceChannelProvider; import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend; @@ -9,6 +10,7 @@ import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; import java.util.List; +import java.util.Map; /** * 用于存储设备通道信息 @@ -617,4 +619,63 @@ public interface DeviceChannelMapper { void offlineByDeviceIds(List deviceList); + @Select(value = {" "}) + @MapKey("deviceIdKey") + Map getAllForMobilePosition(@Param("deviceId") int deviceId, List mobilePositionList); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/MobilePositionMapper.java similarity index 79% rename from src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMobilePositionMapper.java rename to src/main/java/com/genersoft/iot/vmp/gb28181/dao/MobilePositionMapper.java index 3e09df8ce..fe28567e1 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMobilePositionMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/MobilePositionMapper.java @@ -10,14 +10,14 @@ import org.apache.ibatis.annotations.Select; import java.util.List; @Mapper -public interface DeviceMobilePositionMapper { +public interface MobilePositionMapper { - @Insert("INSERT INTO wvp_device_mobile_position (device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source,create_time)"+ + @Insert("INSERT INTO wvp_mobile_position (device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source,create_time)"+ "VALUES (#{deviceId}, #{channelId}, #{deviceName}, #{time}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{reportSource}, #{createTime})") int insertNewPosition(MobilePosition mobilePosition); @Select(value = {" "}) List queryPositionByDeviceIdAndTime(@Param("deviceId") String deviceId, @Param("channelId") String channelId, @Param("startTime") String startTime, @Param("endTime") String endTime); - @Select("SELECT * FROM wvp_device_mobile_position WHERE device_id = #{deviceId}" + + @Select("SELECT * FROM wvp_mobile_position WHERE device_id = #{deviceId}" + " ORDER BY time DESC LIMIT 1") MobilePosition queryLatestPositionByDevice(String deviceId); - @Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}") + @Delete("DELETE FROM wvp_mobile_position WHERE device_id = #{deviceId}") int clearMobilePositionsByDeviceId(String deviceId); @Insert("") void batchadd(List mobilePositions); + + void insertMobilePositions(List batchList); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index 8b6db0e63..4b9ed231a 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -1,7 +1,10 @@ package com.genersoft.iot.vmp.gb28181.event; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; +import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarmNotify; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.event.alarm.DeviceAlarmEvent; import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent; import com.genersoft.iot.vmp.gb28181.event.device.DeviceOfflineEvent; @@ -116,7 +119,7 @@ public class EventPublisher { applicationEventPublisher.publishEvent(event); } - public void mobilePositionListEventPublish(List mobilePositionList) { + public void mobilePositionsEventPublish(List mobilePositionList) { MobilePositionEvent event = new MobilePositionEvent(this); event.setMobilePositionList(mobilePositionList); applicationEventPublisher.publishEvent(event); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java index 064f769db..ffb70a7e6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java @@ -19,5 +19,5 @@ public class MobilePositionEvent extends ApplicationEvent { @Getter @Setter - private List mobilePositionList; + private List mobilePositionList; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java index 48f23a3f6..abec69a8d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java @@ -9,7 +9,9 @@ import com.github.pagehelper.PageInfo; import jakarta.validation.constraints.NotNull; import org.dom4j.Element; +import java.util.Collection; import java.util.List; +import java.util.Map; /** * 国标通道业务类 @@ -96,4 +98,7 @@ public interface IDeviceChannelService { void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback object); + Map getAllForMobilePosition(List mobilePositionList); + + void asyncBatchChannelPosition(Collection channels); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IMobilePositionService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IMobilePositionService.java new file mode 100644 index 000000000..4bb104df2 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IMobilePositionService.java @@ -0,0 +1,4 @@ +package com.genersoft.iot.vmp.gb28181.service; + +public interface IMobilePositionService { +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/ISourceOtherService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/ISourceOtherService.java index 43c84ef7a..9af336404 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/ISourceOtherService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/ISourceOtherService.java @@ -1,5 +1,9 @@ package com.genersoft.iot.vmp.gb28181.service; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; + +import java.util.List; + /** * 资源能力接入-其他 */ @@ -7,4 +11,7 @@ public interface ISourceOtherService { Boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema); + + Boolean addChannelIdForMobilePosition(List mobilePositionList); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java index e33ba2868..2a1ecbacc 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java @@ -9,7 +9,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; -import com.genersoft.iot.vmp.gb28181.dao.DeviceMobilePositionMapper; +import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent; @@ -19,7 +19,6 @@ import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.service.bean.Alarm; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -76,7 +75,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { private DeviceMapper deviceMapper; @Autowired - private DeviceMobilePositionMapper deviceMobilePositionMapper; + private MobilePositionMapper deviceMobilePositionMapper; @Autowired private UserSetting userSetting; @@ -659,4 +658,25 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { queryRecordInfo(device, deviceChannel, startTime, endTime, callback); } + + @Override + public Map getAllForMobilePosition(List mobilePositionList) { + return channelMapper.getAllForMobilePosition(mobilePositionList.get(0).getDevice().getId(), mobilePositionList); + } + + @Override + @Async + @Transactional + public void asyncBatchChannelPosition(Collection channels) { + // 批量更新通道位置信息 + int limitCount = 500; + List channelList = new ArrayList<>(channels); + if (!channelList.isEmpty()) { + for (int i = 0; i < channelList.size(); i += limitCount) { + int end = Math.min(i + limitCount, channelList.size()); + List batchList = channelList.subList(i, end); + channelMapper.batchUpdatePosition(batchList); + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index 628e5e3f6..dd446d87d 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -211,12 +211,10 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne if (newChannel.getGbLongitude() != null && !Objects.equals(oldChannel.getGbLongitude(), newChannel.getGbLongitude()) && newChannel.getGbLatitude() != null && !Objects.equals(oldChannel.getGbLatitude(), newChannel.getGbLatitude())) { MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setDeviceId(newChannel.getGbDeviceId()); mobilePosition.setChannelId(newChannel.getGbId()); mobilePosition.setChannelDeviceId(newChannel.getGbDeviceId()); - mobilePosition.setDeviceName(newChannel.getGbName()); mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setTime(DateUtil.getNow()); + mobilePosition.setTimestamp(System.currentTimeMillis()); mobilePosition.setLongitude(newChannel.getGbLongitude()); mobilePosition.setLatitude(newChannel.getGbLatitude()); eventPublisher.mobilePositionEventPublish(mobilePosition); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/MobilePositionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/MobilePositionServiceImpl.java new file mode 100644 index 000000000..f27092621 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/MobilePositionServiceImpl.java @@ -0,0 +1,84 @@ +package com.genersoft.iot.vmp.gb28181.service.impl; + +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper; +import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent; +import com.genersoft.iot.vmp.gb28181.service.IMobilePositionService; +import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; + +@Slf4j +@Service +@RequiredArgsConstructor +public class MobilePositionServiceImpl implements IMobilePositionService { + + + private final ConcurrentLinkedQueue mobilePositionQueue = new ConcurrentLinkedQueue<>(); + + private final Map sourceOtherServiceMap; + + private final MobilePositionMapper mobilePositionMapper; + + @PostConstruct + public void init() { + + } + + @Async + @EventListener + public void onApplicationEvent(MobilePositionEvent event) { + if (event.getMobilePositionList() == null || event.getMobilePositionList().isEmpty()) { + return; + } + for (ISourceOtherService sourceOtherService : sourceOtherServiceMap.values()) { + try { + Boolean addResult = sourceOtherService.addChannelIdForMobilePosition(event.getMobilePositionList()); + if (addResult != null && addResult) { + mobilePositionQueue.addAll(event.getMobilePositionList()); + } + }catch (Exception e) { + log.error("[移动位置事件] 处理移动位置事件失败", e); + } + } + } + + @Scheduled(fixedDelay = 500) + public void executeMobilePositionQueue() { + if (mobilePositionQueue.isEmpty()) { + return; + } + List handlerCatchDataList = new ArrayList<>(); + int size = mobilePositionQueue.size(); + for (int i = 0; i < size; i++) { + MobilePosition poll = mobilePositionQueue.poll(); + if (poll != null) { + handlerCatchDataList.add(poll); + } + } + if (handlerCatchDataList.isEmpty()) { + return; + } + // TODO 发送通知,方便国标级联转发给上级 + + + // 批量保存到数据库 + int batchSize = 1000; + for (int i = 0; i < handlerCatchDataList.size(); i += batchSize) { + int end = Math.min(i + batchSize, handlerCatchDataList.size()); + List batchList = handlerCatchDataList.subList(i, end); + mobilePositionMapper.insertMobilePositions(batchList); + } + } + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/SourceOtherServiceForGbImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/SourceOtherServiceForGbImpl.java index 5bd9f3a14..48772608f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/SourceOtherServiceForGbImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/SourceOtherServiceForGbImpl.java @@ -6,13 +6,22 @@ import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.enums.ChannelDataType; import com.genersoft.iot.vmp.common.enums.MediaStreamUtil; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService; +import com.genersoft.iot.vmp.utils.Coordtransform; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; @Slf4j @Service(ChannelDataType.OTHER_SERVICE + ChannelDataType.GB28181) @@ -52,4 +61,46 @@ public class SourceOtherServiceForGbImpl implements ISourceOtherService { } return userSetting.getStreamOnDemand(); } + + @Override + public Boolean addChannelIdForMobilePosition(List mobilePositionList) { + if (CollectionUtils.isEmpty(mobilePositionList)) { + return false; + } + if (!(mobilePositionList.get(0) instanceof DeviceMobilePosition)) { + return null; + } + List deviceMobilePositionList = mobilePositionList.stream() + .map(DeviceMobilePosition.class::cast) + .collect(Collectors.toList()); + + Map deviceChannelMap = deviceChannelService.getAllForMobilePosition(deviceMobilePositionList); + if (CollectionUtils.isEmpty(deviceChannelMap)) { + return false; + } + + // 查询通道表,为mobilePositionList赋值channelId + for (DeviceMobilePosition position : deviceMobilePositionList) { + if (position.getDevice() == null) { + continue; + } + String key = position.getDevice().getId() + "_" + position.getChannelDeviceId(); + DeviceChannel deviceChannel = deviceChannelMap.get(key); + Device device = position.getDevice(); + if (deviceChannel != null) { + position.setChannelId(deviceChannel.getId()); + + if (device.getGeoCoordSys().equalsIgnoreCase("GCJ02")) { + Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(position.getLongitude(), position.getLatitude()); + position.setLongitude(wgs84Position[0]); + position.setLatitude(wgs84Position[1]); + } + deviceChannel.setLongitude(position.getLongitude()); + deviceChannel.setLatitude(position.getLatitude()); + } + } + // 批量更新通道 + deviceChannelService.asyncBatchChannelPosition(deviceChannelMap.values()); + return true; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForAlarm.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForAlarm.java index c95b4d110..6f3f6a6d1 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForAlarm.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForAlarm.java @@ -109,16 +109,14 @@ public class NotifyRequestForAlarm extends SIPRequestProcessorParent { mobilePosition.setChannelId(deviceChannel.getId()); mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId()); mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(deviceAlarmNotify.getDeviceId()); - mobilePosition.setTime(deviceAlarmNotify.getAlarmTime()); + mobilePosition.setTimestamp(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(deviceAlarmNotify.getAlarmTime())); mobilePosition.setLongitude(deviceAlarmNotify.getLongitude()); mobilePosition.setLatitude(deviceAlarmNotify.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); // 更新device channel 的经纬度 deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); + deviceChannel.setGpsTime(deviceAlarmNotify.getAlarmTime()); deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index d27434434..48f53d96b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -95,8 +95,6 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor continue; } MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setDeviceName(device.getName()); mobilePosition.setCreateTime(DateUtil.getNow()); DeviceChannel deviceChannel = null; @@ -116,10 +114,16 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor break; case "Time": String timeVal = element.getStringValue(); - if (ObjectUtils.isEmpty(timeVal)) { - mobilePosition.setTime(DateUtil.getNow()); - } else { - mobilePosition.setTime(SipUtils.parseTime(timeVal)); + if (ObjectUtils.isEmpty(timeVal)){ + mobilePosition.setTimestamp(System.currentTimeMillis()); + }else { + Long timestamp = SipUtils.parseTimeForTimestamp(time); + if(timestamp == null) { + log.warn("解析移动位置时间失败:{}, 使用当前时间", time); + mobilePosition.setTimestamp(System.currentTimeMillis()); + }else { + mobilePosition.setTimestamp(timestamp); + } } break; case "Longitude": @@ -161,7 +165,6 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(), mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); - mobilePosition.setReportSource("Mobile Position"); mobilePositionService.add(mobilePosition); // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index 9dba07c36..f54c536d3 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -122,7 +122,6 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme mobilePosition.setTime(deviceAlarmNotify.getAlarmTime()); mobilePosition.setLongitude(deviceAlarmNotify.getLongitude()); mobilePosition.setLatitude(deviceAlarmNotify.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); // 更新device channel 的经纬度 deviceChannel.setLongitude(mobilePosition.getLongitude()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index 650582ede..c7b867b95 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -3,18 +3,14 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; -import com.genersoft.iot.vmp.service.IMobilePositionService; import gov.nist.javax.sip.message.SIPRequest; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; -import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -36,18 +32,10 @@ import java.util.concurrent.ConcurrentLinkedQueue; @RequiredArgsConstructor public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { - private final String cmdType = "MobilePosition"; - private final NotifyMessageHandler notifyMessageHandler; - private final IMobilePositionService mobilePositionService; - - private final IDeviceChannelService deviceChannelService; - private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - private final TaskExecutor taskExecutor; - private final EventPublisher eventPublisher; private final UserSetting userSetting; @@ -55,6 +43,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen @Override public void afterPropertiesSet() throws Exception { + String cmdType = "MobilePosition"; notifyMessageHandler.addHandler(cmdType, this); } @@ -80,7 +69,6 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen if (taskQueue.isEmpty()) { return; } - List handlerCatchDataList = new ArrayList<>(); int size = taskQueue.size(); for (int i = 0; i < size; i++) { @@ -92,7 +80,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen if (handlerCatchDataList.isEmpty()) { return; } - List mobilePositionList = new ArrayList<>(); + List mobilePositionList = new ArrayList<>(); for (HandlerCatchData take : handlerCatchDataList) { if (take == null) { continue; @@ -102,16 +90,16 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen Element rootElementAfterCharset = getRootElement(take.getEvt(), device.getCharset()); if (rootElementAfterCharset == null) { log.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId()); - List mobilePositions = MobilePosition.decode(device.getName(), device.getDeviceId(), rootElementAfterCharset); - for (MobilePosition mobilePosition : mobilePositions) { - try { - mobilePosition.setReportSource("Mobile Position"); - log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTime()); - mobilePositionList.add(mobilePosition); - }catch (Exception e) { - log.error("未处理的异常 ", e); - } + continue; + } + List mobilePositions = DeviceMobilePosition.decode(device, rootElementAfterCharset); + for (DeviceMobilePosition mobilePosition : mobilePositions) { + try { + log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", device.getDeviceId(), mobilePosition.getChannelDeviceId(), + mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTimestamp()); + mobilePositionList.add(mobilePosition); + }catch (Exception e) { + log.error("未处理的异常 ", e); } } }catch (Exception e) { @@ -122,7 +110,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 if (!mobilePositionList.isEmpty()) { try { - eventPublisher.mobilePositionListEventPublish(mobilePositionList); + eventPublisher.mobilePositionsEventPublish(mobilePositionList); }catch (Exception e) { log.error("[MobilePositionEvent] 发送失败: ", e); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java index bbfc57c70..e0c214a23 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java @@ -79,7 +79,6 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar if (!ObjectUtils.isEmpty(device.getName())) { mobilePosition.setDeviceName(device.getName()); } - mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setChannelId(deviceChannel.getId()); mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId()); //兼容ISO 8601格式时间 @@ -106,7 +105,6 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar } else { mobilePosition.setAltitude(0.0); } - mobilePosition.setReportSource("Mobile Position"); // 更新device channel 的经纬度 deviceChannel.setLongitude(mobilePosition.getLongitude()); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java index 90a7e2aac..269237b77 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java @@ -23,6 +23,7 @@ import javax.sip.header.UserAgentHeader; import javax.sip.message.Request; import java.text.ParseException; import java.time.LocalDateTime; +import java.time.ZoneId; import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.List; @@ -264,4 +265,24 @@ public class SipUtils { } return localDateTime.format(DateUtil.formatter); } + + public static Long parseTimeForTimestamp(String timeStr) { + if (ObjectUtils.isEmpty(timeStr)){ + return null; + } + LocalDateTime localDateTime; + try { + localDateTime = LocalDateTime.parse(timeStr); + }catch (DateTimeParseException e) { + try { + localDateTime = LocalDateTime.parse(timeStr, DateUtil.formatterISO8601); + }catch (DateTimeParseException e2) { + log.error("[格式化时间] 无法格式化时间: {}", timeStr); + return null; + } + } + // 返回毫秒值 + return localDateTime.atZone(ZoneId.of(DateUtil.zoneStr)).toInstant().toEpochMilli(); + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java index cfb5b0df6..3792cd898 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java @@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; -import com.genersoft.iot.vmp.gb28181.dao.DeviceMobilePositionMapper; +import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformMapper; import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; import com.genersoft.iot.vmp.service.IMobilePositionService; @@ -35,7 +35,7 @@ public class MobilePositionServiceImpl implements IMobilePositionService { private DeviceChannelMapper channelMapper; @Autowired - private DeviceMobilePositionMapper mobilePositionMapper; + private MobilePositionMapper mobilePositionMapper; @Autowired private UserSetting userSetting; diff --git a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java index 41a031062..2438ffc68 100755 --- a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java @@ -70,7 +70,7 @@ public class DateUtil { public static final DateTimeFormatter formatter1078 = DateTimeFormatter.ofPattern(PATTERN1078, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); public static final DateTimeFormatter formatter1078date = DateTimeFormatter.ofPattern(PATTERN1078Date, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); - public static String yyyy_MM_dd_HH_mm_ssToISO8601(@NotNull String formatTime) { + public static String yyyy_MM_dd_HH_mm_ssToISO8601(@NotNull String formatTime) { return formatterISO8601.format(formatter.parse(formatTime)); }