diff --git a/README.md b/README.md index 6e992aee9..50928d306 100644 --- a/README.md +++ b/README.md @@ -192,7 +192,7 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git ## 付费社群 -> 付费社群即可以对作者提供支持,也可以为大家更加快速的解决问题,对星球内容不满意,三天之内退出支持自动退款。如果暂时无法加入,给项目点个星也是极大的鼓励。 +> 付费社群即可以对作者提供支持,也可以为大家更加快速的解决问题,还为正式加入星球的用户提供了微信群。对星球内容不满意,三天之内退出支持自动退款。如果暂时无法加入,给项目点个星也是极大的鼓励。 [知识星球](https://t.zsxq.com/0d8VAD3Dm)专栏列表: - [WVP 部署安全加固指南:新手必看,防范攻击与漏洞](https://articles.zsxq.com/id_tv8wz4uubx2n.html) diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/UserManager.java b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/UserManager.java index da5c5ed69..31766e3f3 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/UserManager.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/UserManager.java @@ -21,7 +21,7 @@ public class UserManager implements org.apache.ftpserver.ftplet.UserManager { private static final String PREFIX = "VMP_FTP_USER_"; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Override diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java index 779fe9eda..45f98e923 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -38,7 +38,7 @@ public class RedisRpcConfig implements MessageListener { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java index 3bfee31ba..aac310a2a 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java @@ -13,8 +13,8 @@ import org.springframework.data.redis.serializer.StringRedisSerializer; public class RedisTemplateConfig { @Bean - public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { - RedisTemplate redisTemplate = new RedisTemplate<>(); + public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { + RedisTemplate redisTemplate = new RedisTemplate<>(); // 使用fastJson序列化 GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer(); // value值的序列化采用fastJsonRedisSerializer diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java index 1ba09b540..02d2bdb1e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java @@ -191,6 +191,8 @@ public class DeviceChannel extends CommonGBChannel { @Schema(description = "通道类型, 默认0, 0: 普通通道,1 行政区划 2 业务分组/虚拟组织") private int channelType; + private String dbKey; + private Integer dataType = ChannelDataType.GB28181; public void setPtzType(int ptzType) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceMobilePosition.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceMobilePosition.java new file mode 100755 index 000000000..ee672ae5c --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceMobilePosition.java @@ -0,0 +1,88 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.utils.DateUtil; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.dom4j.Element; +import org.springframework.util.ObjectUtils; + +import java.util.ArrayList; +import java.util.List; + +import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; + +/** + * 国标设备移动位置 + */ + +@Slf4j +@Getter +@Setter +public class DeviceMobilePosition extends MobilePosition{ + + /** + * 通道数据库自增Id + */ + private String channelDeviceId; + + + private Device device; + + + public static List decode(Device device, Element rootElementAfterCharset) { + + List mobilePositions = new ArrayList<>(); + + DeviceMobilePosition mobilePosition = new DeviceMobilePosition(); + mobilePosition.setCreateTime(DateUtil.getNow()); + mobilePosition.setDevice(device); + + String channelId = getText(rootElementAfterCharset, "DeviceID"); + + mobilePosition.setChannelDeviceId(channelId); + String time = getText(rootElementAfterCharset, "Time"); + if (ObjectUtils.isEmpty(time)){ + mobilePosition.setTimestamp(System.currentTimeMillis()); + }else { + Long timestamp = SipUtils.parseTimeForTimestamp(time); + if(timestamp == null) { + log.warn("解析移动位置时间失败:{}, 使用当前时间", time); + mobilePosition.setTimestamp(System.currentTimeMillis()); + }else { + mobilePosition.setTimestamp(timestamp); + } + } + mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude"))); + mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude"))); + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) { + mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed"))); + } else { + mobilePosition.setSpeed(0.0); + } + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) { + mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction"))); + } else { + mobilePosition.setDirection(0.0); + } + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) { + mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude"))); + } else { + mobilePosition.setAltitude(0.0); + } + + mobilePositions.add(mobilePosition); + + return mobilePositions; + } + + @Override + public String toString() { + return "DeviceMobilePosition{" + + "channelDeviceId='" + channelDeviceId + '\'' + + ", deviceId='" + device.getDeviceId() + '\'' + + "} " + super.toString(); + } +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java index c739f6b2e..94050e2cd 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/MobilePosition.java @@ -1,6 +1,17 @@ package com.genersoft.iot.vmp.gb28181.bean; +import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; +import com.genersoft.iot.vmp.gb28181.utils.SipUtils; +import com.genersoft.iot.vmp.utils.DateUtil; import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.dom4j.Element; +import org.springframework.util.ObjectUtils; + +import java.util.ArrayList; +import java.util.List; + +import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; /** * @description: 移动位置bean @@ -8,15 +19,12 @@ import lombok.Data; * @date: 2021年1月23日 */ +@Slf4j @Data public class MobilePosition { - /** - * 设备Id - */ - private String deviceId; /** - * 通道Id + * 通道数据库自增Id */ private Integer channelId; @@ -25,15 +33,10 @@ public class MobilePosition { */ private String channelDeviceId; - /** - * 设备名称 - */ - private String deviceName; - /** * 通知时间 */ - private String time; + private long timestamp; /** * 经度 @@ -60,29 +63,66 @@ public class MobilePosition { */ private double direction; - /** - * 位置信息上报来源(Mobile Position、GPS Alarm) - */ - private String reportSource; /** * 创建时间 */ private String createTime; + public static List decode(Element rootElementAfterCharset) { + + List mobilePositions = new ArrayList<>(); + + MobilePosition mobilePosition = new MobilePosition(); + mobilePosition.setCreateTime(DateUtil.getNow()); + + String channelId = getText(rootElementAfterCharset, "DeviceID"); + + mobilePosition.setChannelDeviceId(channelId); + String time = getText(rootElementAfterCharset, "Time"); + if (ObjectUtils.isEmpty(time)){ + mobilePosition.setTimestamp(System.currentTimeMillis()); + }else { + Long timestamp = SipUtils.parseTimeForTimestamp(time); + if(timestamp == null) { + log.warn("解析移动位置时间失败:{}, 使用当前时间", time); + mobilePosition.setTimestamp(System.currentTimeMillis()); + }else { + mobilePosition.setTimestamp(timestamp); + } + } + mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude"))); + mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude"))); + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) { + mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed"))); + } else { + mobilePosition.setSpeed(0.0); + } + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) { + mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction"))); + } else { + mobilePosition.setDirection(0.0); + } + if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) { + mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude"))); + } else { + mobilePosition.setAltitude(0.0); + } + + mobilePositions.add(mobilePosition); + + return mobilePositions; + } + @Override public String toString() { return "MobilePosition{" + - "deviceId='" + deviceId + '\'' + ", channelId=" + channelId + ", channelDeviceId='" + channelDeviceId + '\'' + - ", deviceName='" + deviceName + '\'' + - ", time='" + time + '\'' + ", longitude=" + longitude + ", latitude=" + latitude + ", altitude=" + altitude + ", speed=" + speed + ", direction=" + direction + - ", reportSource='" + reportSource + '\'' + ", createTime='" + createTime + '\'' + '}'; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ShareGBChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ShareGBChannel.java new file mode 100644 index 000000000..bd8d161ca --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ShareGBChannel.java @@ -0,0 +1,17 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode(callSuper = true) +@Data +@Schema(description = "国标共享通道") +public class ShareGBChannel extends CommonGBChannel{ + + @Schema(description = "平台ID") + private int platformId; + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index 0bcb9a695..e41caad6b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -4,12 +4,15 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @author lin @@ -25,7 +28,7 @@ public class SubscribeHolder { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; private final String prefix = "VMP_SUBSCRIBE_OVERDUE"; @@ -106,15 +109,28 @@ public class SubscribeHolder { return result; } - public List getAllMobilePositionSubscribePlatform(List platformList) { + public Map getAllMobilePositionSubscribePlatform(List platformList) { if (platformList == null || platformList.isEmpty()) { - return new ArrayList<>(); + return new HashMap<>(); } - List result = new ArrayList<>(); - for (Platform platform : platformList) { - String key = String.format("%s:%s:%s", prefix, "mobilePosition", platform.getServerGBId()); - if (redisTemplate.hasKey(key)) { - result.add(platform.getServerGBId()); + Map result = new HashMap<>(); + + // 1. 先批量构建所有 key + List keys = platformList.stream() + .map(platform -> String.format("%s:%s:%s", prefix, "mobilePosition", platform.getServerGBId())) + .toList(); + + // 2. 批量查询 Redis 【关键:只发1次请求!】 + List results = redisTemplate.executePipelined((RedisCallback) connection -> { + for (String key : keys) { + // 注意:这里使用的是底层 connection 接口 + connection.keyCommands().exists(key.getBytes()); + } + return null; // 流水线模式下必须返回 null + }); + for (int i = 0; i < results.size(); i++) { + if (results.get(i) instanceof Boolean exists && exists) { + result.put(platformList.get(i).getId(), platformList.get(i)); } } return result; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/ChannelController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/ChannelController.java index 20a92a8d0..647a679e2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/ChannelController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/ChannelController.java @@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit; public class ChannelController { @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IGbChannelService channelService; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MobilePositionController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MobilePositionController.java index 768983c97..74bacf083 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MobilePositionController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MobilePositionController.java @@ -48,23 +48,14 @@ public class MobilePositionController { private IDeviceService deviceService; - /** - * 查询历史轨迹 - * @param deviceId 设备ID - * @param start 开始时间 - * @param end 结束时间 - * @return - */ @Operation(summary = "查询历史轨迹", security = @SecurityRequirement(name = JwtUtils.HEADER)) - @Parameter(name = "deviceId", description = "设备国标编号", required = true) - @Parameter(name = "channelId", description = "通道国标编号") + @Parameter(name = "channelId", description = "通道的数据库ID") @Parameter(name = "start", description = "开始时间") @Parameter(name = "end", description = "结束时间") @GetMapping("/history/{deviceId}") - public List positions(@PathVariable String deviceId, - @RequestParam(required = false) String channelId, - @RequestParam(required = false) String start, - @RequestParam(required = false) String end) { + public List positions( Integer channelId, + @RequestParam(required = false) String start, + @RequestParam(required = false) String end) { if (StringUtil.isEmpty(start)) { start = null; @@ -72,19 +63,14 @@ public class MobilePositionController { if (StringUtil.isEmpty(end)) { end = null; } - return mobilePositionService.queryMobilePositions(deviceId, channelId, start, end); + return mobilePositionService.queryMobilePositions(channelId, start, end); } - /** - * 查询设备最新位置 - * @param deviceId 设备ID - * @return - */ - @Operation(summary = "查询设备最新位置", security = @SecurityRequirement(name = JwtUtils.HEADER)) - @Parameter(name = "deviceId", description = "设备国标编号", required = true) - @GetMapping("/latest/{deviceId}") - public MobilePosition latestPosition(@PathVariable String deviceId) { - return mobilePositionService.queryLatestPosition(deviceId); + @Operation(summary = "查询通道最新位置", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "channelId", description = "通道的数据库ID", required = true) + @GetMapping("/latest") + public MobilePosition latestPosition(Integer channelId) { + return mobilePositionService.queryLatestPosition(channelId); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java index 3dd6df65b..d63b30cbe 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java @@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.dao; import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition; import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce; import com.genersoft.iot.vmp.gb28181.dao.provider.DeviceChannelProvider; import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend; @@ -9,6 +10,7 @@ import org.apache.ibatis.annotations.*; import org.springframework.stereotype.Repository; import java.util.List; +import java.util.Map; /** * 用于存储设备通道信息 @@ -617,4 +619,63 @@ public interface DeviceChannelMapper { void offlineByDeviceIds(List deviceList); + @Select(value = {" "}) + @MapKey("dbKey") + Map getAllForMobilePosition(@Param("deviceId") int deviceId, List mobilePositionList); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMobilePositionMapper.java deleted file mode 100755 index 3e09df8ce..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceMobilePositionMapper.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.dao; - -import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; -import org.apache.ibatis.annotations.Delete; -import org.apache.ibatis.annotations.Insert; -import org.apache.ibatis.annotations.Mapper; -import org.apache.ibatis.annotations.Param; -import org.apache.ibatis.annotations.Select; - -import java.util.List; - -@Mapper -public interface DeviceMobilePositionMapper { - - @Insert("INSERT INTO wvp_device_mobile_position (device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source,create_time)"+ - "VALUES (#{deviceId}, #{channelId}, #{deviceName}, #{time}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{reportSource}, #{createTime})") - int insertNewPosition(MobilePosition mobilePosition); - - @Select(value = {" "}) - List queryPositionByDeviceIdAndTime(@Param("deviceId") String deviceId, @Param("channelId") String channelId, @Param("startTime") String startTime, @Param("endTime") String endTime); - - @Select("SELECT * FROM wvp_device_mobile_position WHERE device_id = #{deviceId}" + - " ORDER BY time DESC LIMIT 1") - MobilePosition queryLatestPositionByDevice(String deviceId); - - @Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}") - int clearMobilePositionsByDeviceId(String deviceId); - - @Insert("") - void batchadd(List mobilePositions); - -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/MobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/MobilePositionMapper.java new file mode 100755 index 000000000..c3c51ed93 --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/MobilePositionMapper.java @@ -0,0 +1,43 @@ +package com.genersoft.iot.vmp.gb28181.dao; + +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; + +import java.util.List; + +@Mapper +public interface MobilePositionMapper { + + @Insert("INSERT INTO wvp_mobile_position (channel_id, timestamp, longitude, latitude, altitude, speed, direction, create_time)"+ + "VALUES (#{channelId}, #{timestamp}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{createTime})") + int insertNewPosition(MobilePosition mobilePosition); + + @Select(value = {" "}) + List queryPositionByDeviceIdAndTime(@Param("channelId") Integer channelId, @Param("startTime") Long startTime, @Param("endTime") Long endTime); + + @Select("SELECT * FROM wvp_mobile_position WHERE channel_id = #{channelId}" + + " ORDER BY timestamp DESC LIMIT 1") + MobilePosition queryLatestPosition(@Param("channelId") Integer channelId); + + @Insert("") + void batchAdd(List mobilePositions); +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java index 8d44f8df3..cb5145351 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java @@ -548,4 +548,55 @@ public interface PlatformChannelMapper { " order by wcr.id DESC" + " ") Set queryShareRegion(Integer id); + + @Select("") + List queryShareChannelInPlatformsAndChannelIds(Collection platforms, Collection channelIds); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java index f78c5970c..b84872af9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/EventPublisher.java @@ -1,7 +1,10 @@ package com.genersoft.iot.vmp.gb28181.event; import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.*; +import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; +import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarmNotify; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.gb28181.bean.Platform; import com.genersoft.iot.vmp.gb28181.event.alarm.DeviceAlarmEvent; import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent; import com.genersoft.iot.vmp.gb28181.event.device.DeviceOfflineEvent; @@ -112,7 +115,13 @@ public class EventPublisher { public void mobilePositionEventPublish(MobilePosition mobilePosition) { MobilePositionEvent event = new MobilePositionEvent(this); - event.setMobilePosition(mobilePosition); + event.setMobilePositionList(List.of(mobilePosition)); + applicationEventPublisher.publishEvent(event); + } + + public void mobilePositionsEventPublish(List mobilePositionList) { + MobilePositionEvent event = new MobilePositionEvent(this); + event.setMobilePositionList(mobilePositionList); applicationEventPublisher.publishEvent(event); } @@ -122,4 +131,6 @@ public class EventPublisher { event.setDeviceIds(deviceIds); applicationEventPublisher.publishEvent(event); } + + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java index f6a4ad759..33972b24f 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEvent.java @@ -5,6 +5,8 @@ import lombok.Getter; import lombok.Setter; import org.springframework.context.ApplicationEvent; +import java.util.List; + public class MobilePositionEvent extends ApplicationEvent { public MobilePositionEvent(Object source) { @@ -13,5 +15,5 @@ public class MobilePositionEvent extends ApplicationEvent { @Getter @Setter - private MobilePosition mobilePosition; + private List mobilePositionList; } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java deleted file mode 100755 index 7b06f07fc..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition; - -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; -import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; -import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; -import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; -import com.genersoft.iot.vmp.gb28181.service.IPlatformService; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderForPlatform; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationListener; -import org.springframework.stereotype.Component; - -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; -import java.util.List; - -/** - * 移动位置通知消息转发 - */ -@Slf4j -@Component -public class MobilePositionEventLister implements ApplicationListener { - - @Autowired - private IPlatformService platformService; - - @Autowired - private IPlatformChannelService platformChannelService; - - @Autowired - private SIPCommanderForPlatform sipCommanderForPlatform; - - @Autowired - private SubscribeHolder subscribeHolder; - - @Autowired - private UserSetting userSetting; - - @Override - public void onApplicationEvent(MobilePositionEvent event) { - if (event.getMobilePosition().getChannelId() == 0) { - return; - } - List allPlatforms = platformService.queryAll(userSetting.getServerId()); - // 获取所用订阅 - List platforms = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms); - if (platforms.isEmpty()) { - return; - } - List platformsForGB = platformChannelService.queryPlatFormListByChannelDeviceId(event.getMobilePosition().getChannelId(), platforms); - - for (Platform platform : platformsForGB) { - if (log.isDebugEnabled()){ - log.debug("[向上级发送MobilePosition] 通道:{},平台:{}, 位置: {}:{}", event.getMobilePosition().getChannelId(), - platform.getServerGBId(), event.getMobilePosition().getLongitude(), event.getMobilePosition().getLatitude()); - } - SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId()); - try { - GPSMsgInfo gpsMsgInfo = GPSMsgInfo.getInstance(event.getMobilePosition()); - // 获取通道编号 - CommonGBChannel commonGBChannel = platformChannelService.queryChannelByPlatformIdAndChannelId(platform.getId(), event.getMobilePosition().getChannelId()); - sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, commonGBChannel, - subscribe); - } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | - IllegalAccessException e) { - log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); - } - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java index 48f23a3f6..abec69a8d 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IDeviceChannelService.java @@ -9,7 +9,9 @@ import com.github.pagehelper.PageInfo; import jakarta.validation.constraints.NotNull; import org.dom4j.Element; +import java.util.Collection; import java.util.List; +import java.util.Map; /** * 国标通道业务类 @@ -96,4 +98,7 @@ public interface IDeviceChannelService { void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback object); + Map getAllForMobilePosition(List mobilePositionList); + + void asyncBatchChannelPosition(Collection channels); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java index 0151b63a5..f29668db2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java @@ -50,4 +50,6 @@ public interface IPlatformChannelService { void checkRegionRemove(List channelList, List regionList); List queryByPlatformBySharChannelId(String gbId); + + void notifyMobilePosition(List handlerCatchDataList); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/ISourceOtherService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/ISourceOtherService.java index 43c84ef7a..9af336404 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/ISourceOtherService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/ISourceOtherService.java @@ -1,5 +1,9 @@ package com.genersoft.iot.vmp.gb28181.service; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; + +import java.util.List; + /** * 资源能力接入-其他 */ @@ -7,4 +11,7 @@ public interface ISourceOtherService { Boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema); + + Boolean addChannelIdForMobilePosition(List mobilePositionList); + } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java index 4b9e7214f..2a1ecbacc 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/DeviceChannelServiceImpl.java @@ -9,7 +9,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException; import com.genersoft.iot.vmp.gb28181.bean.*; import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; -import com.genersoft.iot.vmp.gb28181.dao.DeviceMobilePositionMapper; +import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent; @@ -19,7 +19,6 @@ import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.service.bean.Alarm; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; @@ -34,6 +33,7 @@ import jakarta.validation.constraints.NotNull; import lombok.extern.slf4j.Slf4j; import org.dom4j.Element; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -75,7 +75,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { private DeviceMapper deviceMapper; @Autowired - private DeviceMobilePositionMapper deviceMobilePositionMapper; + private MobilePositionMapper deviceMobilePositionMapper; @Autowired private UserSetting userSetting; @@ -99,7 +99,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { * 监听录像查询结束事件 */ @Async - @org.springframework.context.event.EventListener + @EventListener public void onApplicationEvent(RecordInfoEndEvent event) { SynchronousQueue queue = topicSubscribers.get("record" + event.getRecordInfo().getSn()); if (queue != null) { @@ -107,8 +107,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { } } - @Autowired - private ISIPCommander cmder; @Override @@ -293,7 +291,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { deviceChannel.getName(), deviceChannel.getDeviceId()); String cmdString = getText(rootElement, type.getVal()); try { - cmder.fronEndCmd(device, deviceChannel.getDeviceId(), cmdString, errorResult->{ + commander.fronEndCmd(device, deviceChannel.getDeviceId(), cmdString, errorResult->{ callback.run(errorResult.statusCode, errorResult.msg, null); }, errorResult->{ callback.run(errorResult.statusCode, errorResult.msg, null); @@ -660,4 +658,25 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService { queryRecordInfo(device, deviceChannel, startTime, endTime, callback); } + + @Override + public Map getAllForMobilePosition(List mobilePositionList) { + return channelMapper.getAllForMobilePosition(mobilePositionList.get(0).getDevice().getId(), mobilePositionList); + } + + @Override + @Async + @Transactional + public void asyncBatchChannelPosition(Collection channels) { + // 批量更新通道位置信息 + int limitCount = 500; + List channelList = new ArrayList<>(channels); + if (!channelList.isEmpty()) { + for (int i = 0; i < channelList.size(); i += limitCount) { + int end = Math.min(i + limitCount, channelList.size()); + List batchList = channelList.subList(i, end); + channelMapper.batchUpdatePosition(batchList); + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index c3a09d782..6a2ec5847 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -74,7 +74,7 @@ public class GbChannelServiceImpl implements IGbChannelService { private DynamicTask dynamicTask; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private VectorTileCatch vectorTileCatch; @@ -212,12 +212,10 @@ public class GbChannelServiceImpl implements IGbChannelService { if (newChannel.getGbLongitude() != null && !Objects.equals(oldChannel.getGbLongitude(), newChannel.getGbLongitude()) && newChannel.getGbLatitude() != null && !Objects.equals(oldChannel.getGbLatitude(), newChannel.getGbLatitude())) { MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setDeviceId(newChannel.getGbDeviceId()); mobilePosition.setChannelId(newChannel.getGbId()); mobilePosition.setChannelDeviceId(newChannel.getGbDeviceId()); - mobilePosition.setDeviceName(newChannel.getGbName()); mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setTime(DateUtil.getNow()); + mobilePosition.setTimestamp(System.currentTimeMillis()); mobilePosition.setLongitude(newChannel.getGbLongitude()); mobilePosition.setLatitude(newChannel.getGbLatitude()); eventPublisher.mobilePositionEventPublish(mobilePosition); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java index f7342a7ff..6e1df3a3a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java @@ -43,7 +43,7 @@ public class GroupServiceImpl implements IGroupService { private EventPublisher eventPublisher; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Override public void add(Group group) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java index dbac486b5..759c29cf6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java @@ -33,7 +33,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { private final Map>> inviteErrorCallbackMap = new ConcurrentHashMap<>(); @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private UserSetting userSetting; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java index 390153a48..61e7e10db 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java @@ -10,11 +10,12 @@ import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -24,6 +25,7 @@ import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; import java.util.*; +import java.util.concurrent.Executors; import java.util.stream.Collectors; /** @@ -31,38 +33,29 @@ import java.util.stream.Collectors; */ @Slf4j @Service +@RequiredArgsConstructor public class PlatformChannelServiceImpl implements IPlatformChannelService { - @Autowired - private PlatformChannelMapper platformChannelMapper; + private final PlatformChannelMapper platformChannelMapper; - @Autowired - private EventPublisher eventPublisher; + private final EventPublisher eventPublisher; - @Autowired - private GroupMapper groupMapper; + private final GroupMapper groupMapper; + private final RegionMapper regionMapper; - @Autowired - private RegionMapper regionMapper; + private final CommonGBChannelMapper commonGBChannelMapper; - @Autowired - private CommonGBChannelMapper commonGBChannelMapper; + private final PlatformMapper platformMapper; - @Autowired - private PlatformMapper platformMapper; + private final ISIPCommanderForPlatform sipCommanderForPlatform; - @Autowired - private ISIPCommanderForPlatform sipCommanderFroPlatform; + private final SubscribeHolder subscribeHolder; - @Autowired - private SubscribeHolder subscribeHolder; + private final UserSetting userSetting; - @Autowired - private UserSetting userSetting; + private final IRedisRpcService redisRpcService; - @Autowired - private IRedisRpcService redisRpcService; // 监听通道信息变化 @EventListener @@ -116,7 +109,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { deviceChannel.setGbDeviceId(serverGbId); deviceChannelList.add(deviceChannel); try { - sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getMessageType().name(), platform, deviceChannelList, subscribeInfo, null); + sipCommanderForPlatform.sendNotifyForCatalogOther(event.getMessageType().name(), platform, deviceChannelList, subscribeInfo, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); @@ -146,7 +139,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { CommonGBChannel deviceChannel = channelMap.get(gbId); channelList.add(deviceChannel); try { - sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getMessageType().name(), platform, channelList, subscribeInfo, null); + sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(event.getMessageType().name(), platform, channelList, subscribeInfo, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); @@ -185,7 +178,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { if (!channels.isEmpty()) { log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceIds); try { - sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, channels, subscribe, null); + sipCommanderForPlatform.sendNotifyForCatalogOther(event.getType(), platform, channels, subscribe, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); @@ -205,7 +198,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { if (!deviceChannelList.isEmpty()) { log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceIds); try { - sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, deviceChannelList, subscribe, null); + sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, deviceChannelList, subscribe, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); @@ -646,7 +639,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { SubscribeInfo subscribeInfo = SubscribeInfo.buildSimulated(platform.getServerGBId(), platform.getServerIp()); try { - sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(CatalogEvent.ADD, platform, channelList, subscribeInfo, null); + sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(CatalogEvent.ADD, platform, channelList, subscribeInfo, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); @@ -852,4 +845,52 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { } return platformChannelMapper.queryPlatFormListByChannelList(ids); } + + @Override + public void notifyMobilePosition(List mobilePositionList) { + + List allPlatforms = platformMapper.queryServerIdsWithEnableAndServer(userSetting.getServerId()); + // 获取所用订阅 + Map platformMap = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms); + if (platformMap.isEmpty()) { + return; + } + + // 对mobilePositionList内部的channelId分类 + Map> channelIdMap = mobilePositionList.stream().collect(Collectors.groupingBy(MobilePosition::getChannelId)); + + List shareGBChannels = platformChannelMapper.queryShareChannelInPlatformsAndChannelIds(platformMap.values(), channelIdMap.keySet()); + try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { + for (ShareGBChannel shareGBChannel : shareGBChannels) { + List mobilePositions = channelIdMap.get(shareGBChannel.getGbId()); + if (mobilePositions == null || mobilePositions.isEmpty()) { + continue; + } + executor.submit(() -> { + Platform platform = platformMap.get(shareGBChannel.getPlatformId()); + if (platform == null) { + log.info("[查询平台] 平台ID:{} 未查询到", shareGBChannel.getPlatformId()); + return; + } + SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId()); + if (subscribe == null) { + log.info("[查询订阅] 平台:{} 未查询到移动位置订阅", platform.getServerGBId()); + return; + } + for (MobilePosition mobilePosition : mobilePositions) { + try { + GPSMsgInfo gpsMsgInfo = GPSMsgInfo.getInstance(mobilePosition); + // 获取通道编号 + CommonGBChannel commonGBChannel = queryChannelByPlatformIdAndChannelId(platform.getId(), mobilePosition.getChannelId()); + sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, commonGBChannel, + subscribe); + } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | + IllegalAccessException e) { + log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); + } + } + }); + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/SourceOtherServiceForGbImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/SourceOtherServiceForGbImpl.java index 5bd9f3a14..48772608f 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/SourceOtherServiceForGbImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/SourceOtherServiceForGbImpl.java @@ -6,13 +6,22 @@ import com.genersoft.iot.vmp.common.InviteSessionType; import com.genersoft.iot.vmp.common.enums.ChannelDataType; import com.genersoft.iot.vmp.common.enums.MediaStreamUtil; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.Device; import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService; import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService; +import com.genersoft.iot.vmp.utils.Coordtransform; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.springframework.util.CollectionUtils; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; @Slf4j @Service(ChannelDataType.OTHER_SERVICE + ChannelDataType.GB28181) @@ -52,4 +61,46 @@ public class SourceOtherServiceForGbImpl implements ISourceOtherService { } return userSetting.getStreamOnDemand(); } + + @Override + public Boolean addChannelIdForMobilePosition(List mobilePositionList) { + if (CollectionUtils.isEmpty(mobilePositionList)) { + return false; + } + if (!(mobilePositionList.get(0) instanceof DeviceMobilePosition)) { + return null; + } + List deviceMobilePositionList = mobilePositionList.stream() + .map(DeviceMobilePosition.class::cast) + .collect(Collectors.toList()); + + Map deviceChannelMap = deviceChannelService.getAllForMobilePosition(deviceMobilePositionList); + if (CollectionUtils.isEmpty(deviceChannelMap)) { + return false; + } + + // 查询通道表,为mobilePositionList赋值channelId + for (DeviceMobilePosition position : deviceMobilePositionList) { + if (position.getDevice() == null) { + continue; + } + String key = position.getDevice().getId() + "_" + position.getChannelDeviceId(); + DeviceChannel deviceChannel = deviceChannelMap.get(key); + Device device = position.getDevice(); + if (deviceChannel != null) { + position.setChannelId(deviceChannel.getId()); + + if (device.getGeoCoordSys().equalsIgnoreCase("GCJ02")) { + Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(position.getLongitude(), position.getLatitude()); + position.setLongitude(wgs84Position[0]); + position.setLatitude(wgs84Position[1]); + } + deviceChannel.setLongitude(position.getLongitude()); + deviceChannel.setLatitude(position.getLatitude()); + } + } + // 批量更新通道 + deviceChannelService.asyncBatchChannelPosition(deviceChannelMap.values()); + return true; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java index a3b78d65a..08d3e8903 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java @@ -34,7 +34,7 @@ public class CatalogDataManager{ private IGroupService groupService; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; private final Map dataMap = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SipInviteSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SipInviteSessionManager.java index a4468d921..80716d173 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SipInviteSessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SipInviteSessionManager.java @@ -20,7 +20,7 @@ public class SipInviteSessionManager { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; /** * 添加一个点播/回放的事务信息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskRunner.java index 7e70935f5..dbfef68e3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskRunner.java @@ -26,7 +26,7 @@ public class SubscribeTaskRunner{ private final DelayQueue delayQueue = new DelayQueue<>(); @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private UserSetting userSetting; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java index a043d8507..a8e290ae3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java @@ -30,7 +30,7 @@ public class PlatformStatusTaskRunner { private final DelayQueue keepaliveTaskDelayQueue = new DelayQueue<>(); @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private UserSetting userSetting; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForAlarm.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForAlarm.java index c95b4d110..6f3f6a6d1 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForAlarm.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForAlarm.java @@ -109,16 +109,14 @@ public class NotifyRequestForAlarm extends SIPRequestProcessorParent { mobilePosition.setChannelId(deviceChannel.getId()); mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId()); mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(deviceAlarmNotify.getDeviceId()); - mobilePosition.setTime(deviceAlarmNotify.getAlarmTime()); + mobilePosition.setTimestamp(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(deviceAlarmNotify.getAlarmTime())); mobilePosition.setLongitude(deviceAlarmNotify.getLongitude()); mobilePosition.setLatitude(deviceAlarmNotify.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); // 更新device channel 的经纬度 deviceChannel.setLongitude(mobilePosition.getLongitude()); deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); + deviceChannel.setGpsTime(deviceAlarmNotify.getAlarmTime()); deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index d27434434..4e816ab7e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -2,25 +2,18 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition; import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; -import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; -import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.service.IMobilePositionService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.utils.DateUtil; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.dom4j.DocumentException; import org.dom4j.Element; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; @@ -33,27 +26,18 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ @Slf4j @Component +@RequiredArgsConstructor public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessorParent { private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - @Autowired - private UserSetting userSetting; + private final UserSetting userSetting; - @Autowired - private EventPublisher eventPublisher; + private final EventPublisher eventPublisher; - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private IDeviceChannelService deviceChannelService; - - @Autowired - private IMobilePositionService mobilePositionService; + private final IRedisCatchStorage redisCatchStorage; public void process(RequestEvent evt) { - if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { log.error("[notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); return; @@ -61,124 +45,56 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor taskQueue.offer(new HandlerCatchData(evt, null, null)); } - @Scheduled(fixedDelay = 200) //每200毫秒执行一次 + @Scheduled(fixedDelay = 200) @Async public void executeTaskQueue() { if (taskQueue.isEmpty()) { return; } List handlerCatchDataList = new ArrayList<>(); - while (!taskQueue.isEmpty()) { - handlerCatchDataList.add(taskQueue.poll()); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + HandlerCatchData poll = taskQueue.poll(); + if (poll != null) { + handlerCatchDataList.add(poll); + } } if (handlerCatchDataList.isEmpty()) { return; } + List mobilePositionList = new ArrayList<>(); for (HandlerCatchData take : handlerCatchDataList) { - if (take == null) { - continue; - } RequestEvent evt = take.getEvt(); try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - long startTime = System.currentTimeMillis(); - // 回复 200 OK - Element rootElement = getRootElement(evt); - if (rootElement == null) { - log.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); - continue; - } Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { - log.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId); + log.error("[notify-移动位置] 未获取到device, {}", deviceId); continue; } - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setDeviceName(device.getName()); - mobilePosition.setCreateTime(DateUtil.getNow()); - - DeviceChannel deviceChannel = null; - List elements = rootElement.elements(); - readDocument: for (Element element : elements) { - switch (element.getName()){ - case "DeviceID": - String channelId = element.getStringValue(); - deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId); - if (deviceChannel != null) { - mobilePosition.setChannelId(deviceChannel.getId()); - mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId()); - }else { - log.error("[notify-移动位置] 未找到通道 {}/{}", device.getDeviceId(), channelId); - break readDocument; - } - break; - case "Time": - String timeVal = element.getStringValue(); - if (ObjectUtils.isEmpty(timeVal)) { - mobilePosition.setTime(DateUtil.getNow()); - } else { - mobilePosition.setTime(SipUtils.parseTime(timeVal)); - } - break; - case "Longitude": - mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); - break; - case "Latitude": - mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); - break; - case "Speed": - String speedVal = element.getStringValue(); - if (NumericUtil.isDouble(speedVal)) { - mobilePosition.setSpeed(Double.parseDouble(speedVal)); - } else { - mobilePosition.setSpeed(0.0); - } - break; - case "Direction": - String directionVal = element.getStringValue(); - if (NumericUtil.isDouble(directionVal)) { - mobilePosition.setDirection(Double.parseDouble(directionVal)); - } else { - mobilePosition.setDirection(0.0); - } - break; - case "Altitude": - String altitudeVal = element.getStringValue(); - if (NumericUtil.isDouble(altitudeVal)) { - mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); - } else { - mobilePosition.setAltitude(0.0); - } - break; - - } - } - if (deviceChannel == null) { + Element rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + log.warn("[notify-移动位置] {}处理失败,未识别到信息体", deviceId); continue; } - - log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); - mobilePosition.setReportSource("Mobile Position"); - - mobilePositionService.add(mobilePosition); - // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 - try { - eventPublisher.mobilePositionEventPublish(mobilePosition); - }catch (Exception e) { - log.error("[MobilePositionEvent] 发送失败: ", e); + List mobilePositions = DeviceMobilePosition.decode(device, rootElement); + for (DeviceMobilePosition mobilePosition : mobilePositions) { + log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", device.getDeviceId(), mobilePosition.getChannelDeviceId(), + mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTimestamp()); + mobilePositionList.add(mobilePosition); } - } catch (DocumentException e) { - log.error("[收到移动位置订阅通知] 文档解析异常: \r\n{}", evt.getRequest(), e); - } catch ( Exception e) { - log.error("[收到移动位置订阅通知] 异常: ", e); + } catch (Exception e) { + log.warn("[notify-移动位置] 发现未处理的异常, \r\n{}", evt.getRequest()); + log.error("[notify-移动位置] 异常内容: ", e); + } + } + if (!mobilePositionList.isEmpty()) { + try { + eventPublisher.mobilePositionsEventPublish(mobilePositionList); + } catch (Exception e) { + log.error("[MobilePositionEvent] 发送失败: ", e); } } } -// @Scheduled(fixedRate = 10000) -// public void execute(){ -// logger.debug("[待处理Notify-移动位置订阅消息数量]: {}", taskQueue.size()); -// } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index 9dba07c36..a250b44f6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -114,22 +114,16 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme if (deviceChannel == null) { log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), deviceAlarmNotify.getChannelId()); } else { - MobilePosition mobilePosition = new MobilePosition(); + DeviceMobilePosition mobilePosition = new DeviceMobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setChannelId(deviceChannel.getId()); mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId()); - mobilePosition.setTime(deviceAlarmNotify.getAlarmTime()); + mobilePosition.setTimestamp(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(deviceAlarmNotify.getAlarmTime())); mobilePosition.setLongitude(deviceAlarmNotify.getLongitude()); mobilePosition.setLatitude(deviceAlarmNotify.getLatitude()); - mobilePosition.setReportSource("GPS Alarm"); - - // 更新device channel 的经纬度 - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + mobilePosition.setDevice(device); + // 发送移动位置事件,后续会保存到数据库,并且发送给上级平台 + publisher.mobilePositionsEventPublish(List.of(mobilePosition)); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index dcf86004d..0f345c2d9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -1,33 +1,29 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; -import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.utils.DateUtil; import gov.nist.javax.sip.message.SIPRequest; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; -import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; - /** * 移动设备位置数据通知,设备主动发起,不需要上级订阅 */ @@ -36,101 +32,88 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; @RequiredArgsConstructor public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { - private final String cmdType = "MobilePosition"; + private final NotifyMessageHandler notifyMessageHandler; - @Autowired - private NotifyMessageHandler notifyMessageHandler; + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - @Autowired - private IDeviceChannelService deviceChannelService; + private final EventPublisher eventPublisher; - private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); + private final UserSetting userSetting; - @Autowired - private TaskExecutor taskExecutor; @Override public void afterPropertiesSet() throws Exception { + String cmdType = "MobilePosition"; notifyMessageHandler.addHandler(cmdType, this); } + @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - - boolean isEmpty = taskQueue.isEmpty(); - taskQueue.offer(new SipMsgInfo(evt, device, rootElement)); + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + log.error("[message-notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE", userSetting.getMaxNotifyCountQueue()); + return; + } + taskQueue.offer(new HandlerCatchData(evt, device, rootElement)); // 回复200 OK try { responseAckAsync((SIPRequest) evt.getRequest(), Response.OK); } catch (SipException | InvalidArgumentException | ParseException e) { log.error("[命令发送失败] 移动位置通知回复: {}", e.getMessage()); } - if (isEmpty) { - taskExecutor.execute(() -> { - while (!taskQueue.isEmpty()) { - SipMsgInfo sipMsgInfo = taskQueue.poll(); + + } + @Scheduled(fixedDelay = 400) //每400毫秒执行一次 + @Async + public void executeTaskQueue(){ + if (taskQueue.isEmpty()) { + return; + } + List handlerCatchDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + HandlerCatchData poll = taskQueue.poll(); + if (poll != null) { + handlerCatchDataList.add(poll); + } + } + if (handlerCatchDataList.isEmpty()) { + return; + } + List mobilePositionList = new ArrayList<>(); + for (HandlerCatchData take : handlerCatchDataList) { + if (take == null) { + continue; + } + Device device = take.getDevice(); + try { + Element rootElementAfterCharset = getRootElement(take.getEvt(), device.getCharset()); + if (rootElementAfterCharset == null) { + log.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId()); + continue; + } + List mobilePositions = DeviceMobilePosition.decode(device, rootElementAfterCharset); + for (DeviceMobilePosition mobilePosition : mobilePositions) { try { - Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset()); - if (rootElementAfterCharset == null) { - log.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId()); - continue; - } - String channelId = getText(rootElementAfterCharset, "DeviceID"); - DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId); - if (deviceChannel == null) { - log.warn("[解析移动位置通知] 未找到通道:{}/{}", device.getDeviceId(), channelId); - continue; - } - - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - if (!ObjectUtils.isEmpty(sipMsgInfo.getDevice().getName())) { - mobilePosition.setDeviceName(sipMsgInfo.getDevice().getName()); - } - mobilePosition.setDeviceId(sipMsgInfo.getDevice().getDeviceId()); - - mobilePosition.setChannelId(deviceChannel.getId()); - mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId()); - String time = getText(rootElementAfterCharset, "Time"); - if (ObjectUtils.isEmpty(time)){ - mobilePosition.setTime(DateUtil.getNow()); - }else { - mobilePosition.setTime(SipUtils.parseTime(time)); - } - mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude"))); - mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude"))); - if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) { - mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed"))); - } else { - mobilePosition.setSpeed(0.0); - } - if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) { - mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction"))); - } else { - mobilePosition.setDirection(0.0); - } - if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) { - mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude"))); - } else { - mobilePosition.setAltitude(0.0); - } - mobilePosition.setReportSource("Mobile Position"); - - // 更新device channel 的经纬度 - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - - } catch (DocumentException e) { + log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", device.getDeviceId(), mobilePosition.getChannelDeviceId(), + mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTimestamp()); + mobilePositionList.add(mobilePosition); + }catch (Exception e) { log.error("未处理的异常 ", e); - } catch (Exception e) { - log.warn("[移动位置通知] 发现未处理的异常, \r\n{}", evt.getRequest()); - log.error("[移动位置通知] 异常内容: ", e); } } - }); + }catch (Exception e) { + log.warn("[移动位置通知] 发现未处理的异常, \r\n{}", take.getEvt().getRequest()); + log.error("[移动位置通知] 异常内容: ", e); + } + } + // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 + if (!mobilePositionList.isEmpty()) { + try { + eventPublisher.mobilePositionsEventPublish(mobilePositionList); + }catch (Exception e) { + log.error("[MobilePositionEvent] 发送失败: ", e); + } } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java index bbfc57c70..2256d2758 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java @@ -1,34 +1,33 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.gb28181.bean.Platform; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; -import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; -import com.genersoft.iot.vmp.utils.DateUtil; import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; - -import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; /** * 移动设备位置数据查询回复 @@ -36,18 +35,20 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; */ @Slf4j @Component +@RequiredArgsConstructor public class MobilePositionResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { private final String cmdType = "MobilePosition"; - @Autowired - private ResponseMessageHandler responseMessageHandler; + private final ResponseMessageHandler responseMessageHandler; - @Autowired - private IDeviceChannelService deviceChannelService; + private final EventPublisher eventPublisher; - @Autowired - private DeferredResultHolder resultHolder; + private final UserSetting userSetting; + + private final DeferredResultHolder resultHolder; + + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Override public void afterPropertiesSet() throws Exception { @@ -56,81 +57,66 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - SIPRequest request = (SIPRequest) evt.getRequest(); - + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + log.error("[移动设备位置查询回复] 待处理消息队列已满 {},丢弃消息", userSetting.getMaxNotifyCountQueue()); + return; + } + taskQueue.offer(new HandlerCatchData(evt, device, rootElement)); try { - rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - log.warn("[ 移动设备位置数据查询回复 ] content cannot be null, {}", evt.getRequest()); - try { - responseAckAsync(request, Response.BAD_REQUEST); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 移动设备位置数据查询 BAD_REQUEST: {}", e.getMessage()); - } - return; + responseAckAsync((SIPRequest) evt.getRequest(), Response.OK); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage()); + } + } + + @Scheduled(fixedDelay = 400) + @Async + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + List handlerCatchDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + HandlerCatchData poll = taskQueue.poll(); + if (poll != null) { + handlerCatchDataList.add(poll); } - String channelId = getText(rootElement, "DeviceID"); - DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId); - if (deviceChannel == null) { - log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), channelId); - }else { - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - mobilePosition.setDeviceId(device.getDeviceId()); - mobilePosition.setChannelId(deviceChannel.getId()); - mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId()); - //兼容ISO 8601格式时间 - String time = getText(rootElement, "Time"); - if (ObjectUtils.isEmpty(time)){ - mobilePosition.setTime(DateUtil.getNow()); - }else { - mobilePosition.setTime(SipUtils.parseTime(time)); - } - mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude"))); - mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude"))); - if (NumericUtil.isDouble(getText(rootElement, "Speed"))) { - mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed"))); - } else { - mobilePosition.setSpeed(0.0); - } - if (NumericUtil.isDouble(getText(rootElement, "Direction"))) { - mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction"))); - } else { - mobilePosition.setDirection(0.0); - } - if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) { - mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude"))); - } else { - mobilePosition.setAltitude(0.0); - } - mobilePosition.setReportSource("Mobile Position"); - - // 更新device channel 的经纬度 - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - - String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId(); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(mobilePosition); - resultHolder.invokeAllResult(msg); - } - - //回复 200 OK + } + if (handlerCatchDataList.isEmpty()) { + return; + } + List mobilePositionList = new ArrayList<>(); + for (HandlerCatchData take : handlerCatchDataList) { + Device device = take.getDevice(); try { - responseAckAsync(request, Response.OK); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage()); + Element rootElementAfterCharset = getRootElement(take.getEvt(), device.getCharset()); + if (rootElementAfterCharset == null) { + log.warn("[移动设备位置查询回复] {}处理失败,未识别到信息体", device.getDeviceId()); + continue; + } + List mobilePositions = DeviceMobilePosition.decode(device, rootElementAfterCharset); + for (DeviceMobilePosition mobilePosition : mobilePositions) { + log.info("[收到移动位置查询回复]:{}/{}->{}.{}, 时间: {}", device.getDeviceId(), mobilePosition.getChannelDeviceId(), + mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTimestamp()); + mobilePositionList.add(mobilePosition); + String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId(); + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(mobilePosition); + resultHolder.invokeAllResult(msg); + } + } catch (Exception e) { + log.warn("[移动设备位置查询回复] 发现未处理的异常, \r\n{}", take.getEvt().getRequest()); + log.error("[移动设备位置查询回复] 异常内容: ", e); + } + } + if (!mobilePositionList.isEmpty()) { + try { + eventPublisher.mobilePositionsEventPublish(mobilePositionList); + } catch (Exception e) { + log.error("[MobilePositionEvent] 发送失败: ", e); } - - } catch (DocumentException e) { - log.error("未处理的异常 ", e); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 4272bd9e3..73cb741b9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -49,7 +49,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent private ApplicationEventPublisher applicationEventPublisher; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; private Long recordInfoTtl = 1800L; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java index 90a7e2aac..269237b77 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/utils/SipUtils.java @@ -23,6 +23,7 @@ import javax.sip.header.UserAgentHeader; import javax.sip.message.Request; import java.text.ParseException; import java.time.LocalDateTime; +import java.time.ZoneId; import java.time.format.DateTimeParseException; import java.util.ArrayList; import java.util.List; @@ -264,4 +265,24 @@ public class SipUtils { } return localDateTime.format(DateUtil.formatter); } + + public static Long parseTimeForTimestamp(String timeStr) { + if (ObjectUtils.isEmpty(timeStr)){ + return null; + } + LocalDateTime localDateTime; + try { + localDateTime = LocalDateTime.parse(timeStr); + }catch (DateTimeParseException e) { + try { + localDateTime = LocalDateTime.parse(timeStr, DateUtil.formatterISO8601); + }catch (DateTimeParseException e2) { + log.error("[格式化时间] 无法格式化时间: {}", timeStr); + return null; + } + } + // 返回毫秒值 + return localDateTime.atZone(ZoneId.of(DateUtil.zoneStr)).toInstant().toEpochMilli(); + + } } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/SourceOtherServiceForJTImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/SourceOtherServiceForJTImpl.java index 8774effed..86f23c5fe 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/SourceOtherServiceForJTImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/SourceOtherServiceForJTImpl.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.jt1078.service.impl; import com.genersoft.iot.vmp.common.enums.ChannelDataType; import com.genersoft.iot.vmp.common.enums.MediaStreamUtil; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService; import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService; import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service; @@ -10,6 +11,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import java.util.List; + @Slf4j @Service(ChannelDataType.OTHER_SERVICE + ChannelDataType.JT_1078) @RequiredArgsConstructor @@ -17,8 +20,6 @@ public class SourceOtherServiceForJTImpl implements ISourceOtherService { private final UserSetting userSetting; - private final Ijt1078Service ijt1078Service; - private final Ijt1078PlayService jt1078PlayService; @Override @@ -43,4 +44,10 @@ public class SourceOtherServiceForJTImpl implements ISourceOtherService { } return false; } + + @Override + public Boolean addChannelIdForMobilePosition(List mobilePositionList) { + + return null; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java index c579e655c..98329c7d1 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java @@ -65,7 +65,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { private JT1078Template jt1078Template; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private HookSubscribe subscribe; diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index 298cc9ad7..c4ed19df1 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -66,7 +66,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { private JT1078Template jt1078Template; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IGbChannelService channelService; diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java index f888d0b55..8b799b238 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java @@ -80,7 +80,7 @@ public class ABLHttpHookListener { private SSRCFactory ssrcFactory; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private ApplicationEventPublisher applicationEventPublisher; diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 84fadd2df..0cc045ab5 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -73,7 +73,7 @@ public class MediaServerServiceImpl implements IMediaServerService { private IInviteStreamService inviteStreamService; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private Map nodeServerServiceMap; diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java b/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java index 1643ad7e5..06535f58d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java @@ -8,14 +8,10 @@ import java.util.List; public interface IMobilePositionService { - void add(List mobilePositionList); - - void add(MobilePosition mobilePosition); - - List queryMobilePositions(String deviceId, String channelId, String startTime, String endTime); + List queryMobilePositions(Integer channelId, String startTime, String endTime); List queryEnablePlatformListWithAsMessageChannel(); - MobilePosition queryLatestPosition(String deviceId); + MobilePosition queryLatestPosition(Integer channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java index a2e0e7276..11e02b722 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.bean; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.utils.DateUtil; import lombok.Data; @Data @@ -61,7 +62,7 @@ public class GPSMsgInfo { gpsMsgInfo.setLat(mobilePosition.getLatitude()); gpsMsgInfo.setSpeed(mobilePosition.getSpeed()); gpsMsgInfo.setDirection(mobilePosition.getDirection()); - gpsMsgInfo.setTime(mobilePosition.getTime()); + gpsMsgInfo.setTime(DateUtil.timestampMsTo_yyyy_MM_dd_HH_mm_ss(mobilePosition.getTimestamp())); return gpsMsgInfo; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java index cfb5b0df6..f4867629f 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java @@ -1,82 +1,55 @@ package com.genersoft.iot.vmp.service.impl; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; -import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; -import com.genersoft.iot.vmp.gb28181.dao.DeviceMobilePositionMapper; +import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformMapper; -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; +import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent; +import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; +import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService; import com.genersoft.iot.vmp.service.IMobilePositionService; import com.genersoft.iot.vmp.utils.DateUtil; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; @Slf4j @Service +@RequiredArgsConstructor public class MobilePositionServiceImpl implements IMobilePositionService { - @Autowired - private DeviceMapper deviceMapper; + private final ConcurrentLinkedQueue mobilePositionQueue = new ConcurrentLinkedQueue<>(); - @Autowired - private DeviceChannelMapper channelMapper; + private final Map sourceOtherServiceMap; - @Autowired - private DeviceMobilePositionMapper mobilePositionMapper; - - @Autowired - private UserSetting userSetting; - - - @Autowired - private PlatformMapper platformMapper; - - @Autowired - private RedisTemplate redisTemplate; - - private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list"; - - @Override - public void add(MobilePosition mobilePosition) { - List list = new ArrayList<>(); - list.add(mobilePosition); - add(list); - } - - @Override - public void add(List mobilePositionList) { - redisTemplate.opsForList().leftPushAll(REDIS_MOBILE_POSITION_LIST, mobilePositionList); - } - - private List get(int length) { - Long size = redisTemplate.opsForList().size(REDIS_MOBILE_POSITION_LIST); - if (size == null || size == 0) { - return new ArrayList<>(); - } - return redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, Math.min(length, size)); - } + private final MobilePositionMapper mobilePositionMapper; + private final IPlatformChannelService platformChannelService; + private final PlatformMapper platformMapper; /** * 查询移动位置轨迹 */ @Override - public synchronized List queryMobilePositions(String deviceId, String channelId, String startTime, String endTime) { - return mobilePositionMapper.queryPositionByDeviceIdAndTime(deviceId, channelId, startTime, endTime); + public synchronized List queryMobilePositions(Integer channelId, String startTime, String endTime) { + Long startTimestamp = null; + Long endTimestamp = null; + if (startTime != null) { + startTimestamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(startTime); + } + if (endTime != null) { + endTimestamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(endTime); + } + return mobilePositionMapper.queryPositionByDeviceIdAndTime(channelId, startTimestamp, endTimestamp); } @Override @@ -88,57 +61,60 @@ public class MobilePositionServiceImpl implements IMobilePositionService { * 查询最新移动位置 */ @Override - public MobilePosition queryLatestPosition(String deviceId) { - return mobilePositionMapper.queryLatestPositionByDevice(deviceId); + public MobilePosition queryLatestPosition(Integer channelId) { + return mobilePositionMapper.queryLatestPosition(channelId); } - @Scheduled(fixedDelay = 1000) - @Transactional - public void executeTaskQueue() { - int countLimit = 3000; - List mobilePositions = get(countLimit); - if (mobilePositions == null || mobilePositions.isEmpty()) { + @Async + @EventListener + public void onApplicationEvent(MobilePositionEvent event) { + if (event.getMobilePositionList() == null || event.getMobilePositionList().isEmpty()) { return; } - if (userSetting.getSavePositionHistory()) { - mobilePositionMapper.batchadd(mobilePositions); - } - log.info("[移动位置订阅]更新通道位置: {}", mobilePositions.size()); - Map> updateChannelMap = new HashMap<>(); - for (MobilePosition mobilePosition : mobilePositions) { - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setId(mobilePosition.getChannelId()); - deviceChannel.setDeviceId(mobilePosition.getDeviceId()); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannel.setUpdateTime(DateUtil.getNow()); - if (mobilePosition.getLongitude() > 0 || mobilePosition.getLatitude() > 0) { - Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(mobilePosition.getLongitude(), mobilePosition.getLatitude()); - deviceChannel.setGbLongitude(wgs84Position[0]); - deviceChannel.setGbLatitude(wgs84Position[1]); - } - if (!updateChannelMap.containsKey(mobilePosition.getDeviceId())) { - updateChannelMap.put(mobilePosition.getDeviceId(), new HashMap<>()); - } - updateChannelMap.get(mobilePosition.getDeviceId()).put(mobilePosition.getChannelId(), deviceChannel); - } - List deviceIds = new ArrayList<>(updateChannelMap.keySet()); - if (deviceIds.isEmpty()) { - log.info("[移动位置订阅]为查询到对应的设备,消息已经忽略"); + if (event.getMobilePositionList().get(0).getChannelId() != null) { + mobilePositionQueue.addAll(event.getMobilePositionList()); return; } - List deviceList = deviceMapper.queryByDeviceIds(deviceIds); - for (Device device : deviceList) { - Map channelMap = updateChannelMap.get(device.getDeviceId()); - if (device.getGeoCoordSys().equalsIgnoreCase("GCJ02")) { - channelMap.values().forEach(channel -> { - Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(channel.getLongitude(), channel.getLatitude()); - channel.setGbLongitude(wgs84Position[0]); - channel.setGbLatitude(wgs84Position[1]); - }); + for (ISourceOtherService sourceOtherService : sourceOtherServiceMap.values()) { + try { + // 此时已经完成了通道ID的添加,以及坐标系的转换,后续只需要将数据保存到数据库即可 + Boolean addResult = sourceOtherService.addChannelIdForMobilePosition(event.getMobilePositionList()); + if (addResult != null && addResult) { + mobilePositionQueue.addAll(event.getMobilePositionList()); + } + }catch (Exception e) { + log.error("[移动位置事件] 处理移动位置事件失败", e); } - channelMapper.batchUpdatePosition(new ArrayList<>(channelMap.values())); + } + } + + @Scheduled(fixedDelay = 500) + public void executeMobilePositionQueue() { + if (mobilePositionQueue.isEmpty()) { + return; + } + List handlerCatchDataList = new ArrayList<>(); + int size = mobilePositionQueue.size(); + for (int i = 0; i < size; i++) { + MobilePosition poll = mobilePositionQueue.poll(); + if (poll != null) { + handlerCatchDataList.add(poll); + } + } + if (handlerCatchDataList.isEmpty()) { + return; + } + List mobilePositionList = handlerCatchDataList.stream().filter( + mobilePosition -> mobilePosition.getChannelId() != 0).toList(); + // 发送通知,方便国标级联转发给上级 + Thread.startVirtualThread(() -> platformChannelService.notifyMobilePosition(mobilePositionList)); + + // 批量保存到数据库 + int batchSize = 1000; + for (int i = 0; i < mobilePositionList.size(); i += batchSize) { + int end = Math.min(i + batchSize, mobilePositionList.size()); + List batchList = mobilePositionList.subList(i, end); + mobilePositionMapper.batchAdd(batchList); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java index f14c70c84..3f3b8d840 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -53,7 +53,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { private HookSubscribe subscribe; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; /** * 流到来的处理 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java index 053ff9e69..3b8ec0c7b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java @@ -27,7 +27,7 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/UserApiKeyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/UserApiKeyServiceImpl.java index 8c552b10f..3078fb72e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/UserApiKeyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/UserApiKeyServiceImpl.java @@ -20,7 +20,7 @@ public class UserApiKeyServiceImpl implements IUserApiKeyService { UserApiKeyMapper userApiKeyMapper; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Override public int addApiKey(UserApiKey userApiKey) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java index 63fee30dd..d51484248 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.utils.DateUtil; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; @@ -29,16 +30,12 @@ import java.util.stream.Collectors; */ @Slf4j @Component +@RequiredArgsConstructor public class RedisGpsMsgListener implements MessageListener { - @Autowired - private IRedisCatchStorage redisCatchStorage; + private final IRedisCatchStorage redisCatchStorage; - @Autowired - private IStreamPushService streamPushService; - - @Autowired - private IGbChannelService channelService; + private final IGbChannelService channelService; private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java index acd8e8136..8a7d3908c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java @@ -35,7 +35,7 @@ public class RedisRpcChannelPlayController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IGbChannelService channelService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcCloudRecordController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcCloudRecordController.java index ae6c5a171..0bcc0e5d1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcCloudRecordController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcCloudRecordController.java @@ -26,7 +26,7 @@ public class RedisRpcCloudRecordController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private ICloudRecordService cloudRecordService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java index c4491905a..7bae7442d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java @@ -31,7 +31,7 @@ public class RedisRpcDeviceController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IDeviceService deviceService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDevicePlayController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDevicePlayController.java index 2e83cc71f..66302efb5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDevicePlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDevicePlayController.java @@ -29,7 +29,7 @@ public class RedisRpcDevicePlayController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IDeviceService deviceService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcGbDeviceController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcGbDeviceController.java index f36d34a56..807542f98 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcGbDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcGbDeviceController.java @@ -25,7 +25,7 @@ public class RedisRpcGbDeviceController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IDeviceService deviceService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcPlatformController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcPlatformController.java index 9f7a9f8df..e9b323cc4 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcPlatformController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcPlatformController.java @@ -33,7 +33,7 @@ public class RedisRpcPlatformController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IPlatformService platformService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java index 4a688c3df..4180b1c7b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java @@ -27,7 +27,7 @@ public class RedisRpcStreamProxyController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IStreamProxyPlayService streamProxyPlayService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java index 84f90d3b9..5c7927568 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java @@ -46,7 +46,7 @@ public class RedisRpcStreamPushController extends RpcController { private HookSubscribe hookSubscribe; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IStreamPushPlayService streamPushPlayService; @@ -102,7 +102,7 @@ public class RedisRpcStreamPushController extends RpcController { sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); - redisTemplate.opsForValue().set(sendRtpItem.getChannelId(), sendRtpItem); + redisTemplate.opsForValue().set(sendRtpItem.getChannelId() + "", sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setBody(sendRtpItem.getChannelId()); response.setStatusCode(ErrorCode.SUCCESS.getCode()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index bb040e128..7c8242bca 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -47,7 +47,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { private SSRCFactory ssrcFactory; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IMediaServerService mediaServerService; diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 29c730428..a744f9382 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -7,7 +7,6 @@ import com.genersoft.iot.vmp.common.SystemAllInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; @@ -19,6 +18,7 @@ import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.JsonUtil; import com.genersoft.iot.vmp.utils.SystemInfoUtils; import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jspecify.annotations.NonNull; import org.springframework.beans.factory.annotation.Autowired; @@ -31,29 +31,21 @@ import org.springframework.stereotype.Component; import java.time.Duration; import java.util.*; -@SuppressWarnings("rawtypes") @Slf4j @Component +@RequiredArgsConstructor public class RedisCatchStorageImpl implements IRedisCatchStorage { - @Autowired - private DeviceChannelMapper deviceChannelMapper; + private final DeviceMapper deviceMapper; - @Autowired - private DeviceMapper deviceMapper; + private final UserSetting userSetting; - @Autowired - private UserSetting userSetting; + private final RedisTemplate redisTemplate; - @Autowired - private RedisTemplate redisTemplate; + private final RedisTemplate longRedisTemplate; - @Autowired - private RedisTemplate longRedisTemplate; - - @Autowired - private StringRedisTemplate stringRedisTemplate; + private final StringRedisTemplate stringRedisTemplate; @Override public List queryAllSendRTPServer() { @@ -126,7 +118,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type.toUpperCase() + "_*_*_" + mediaServerId; List streams = RedisUtil.scan(redisTemplate, key); for (Object stream : streams) { - redisTemplate.delete(stream); + redisTemplate.delete((String) stream); } } @@ -268,7 +260,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { MediaInfo result = null; List keys = RedisUtil.scan(redisTemplate, scanKey); - if (keys.size() > 0) { + if (!keys.isEmpty()) { String key = (String) keys.get(0); String mediaInfoJson = (String)redisTemplate.opsForValue().get(key); result = JSON.parseObject(mediaInfoJson, MediaInfo.class); @@ -283,7 +275,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { MediaInfo result = null; List keys = RedisUtil.scan(redisTemplate, scanKey); - if (keys.size() > 0) { + if (!keys.isEmpty()) { String key = (String) keys.get(0); String mediaInfoJson = (String)redisTemplate.opsForValue().get(key); result = JSON.parseObject(mediaInfoJson, MediaInfo.class); @@ -423,7 +415,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } msg.append(" ").append(online? "ON":"OFF"); log.info("[redis通知] 推送设备/通道状态-> {} ", msg); - // 使用 RedisTemplate 发送字符串消息会导致发送的消息多带了双引号 + // 使用 RedisTemplate 发送字符串消息会导致发送的消息多带了双引号 stringRedisTemplate.convertAndSend(key, msg.toString()); } @@ -439,7 +431,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } msg.append(" ").append(add? "ADD":"DELETE"); log.info("[redis通知] 推送通道-> {}", msg); - // 使用 RedisTemplate 发送字符串消息会导致发送的消息多带了双引号 + // 使用 RedisTemplate 发送字符串消息会导致发送的消息多带了双引号 stringRedisTemplate.convertAndSend(key, msg.toString()); } diff --git a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java index 41a031062..2438ffc68 100755 --- a/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/DateUtil.java @@ -70,7 +70,7 @@ public class DateUtil { public static final DateTimeFormatter formatter1078 = DateTimeFormatter.ofPattern(PATTERN1078, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); public static final DateTimeFormatter formatter1078date = DateTimeFormatter.ofPattern(PATTERN1078Date, Locale.getDefault()).withZone(ZoneId.of(zoneStr)); - public static String yyyy_MM_dd_HH_mm_ssToISO8601(@NotNull String formatTime) { + public static String yyyy_MM_dd_HH_mm_ssToISO8601(@NotNull String formatTime) { return formatterISO8601.format(formatter.parse(formatTime)); } diff --git a/src/main/java/com/genersoft/iot/vmp/utils/JsonUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/JsonUtil.java index b44af109a..3cd5f3d9d 100755 --- a/src/main/java/com/genersoft/iot/vmp/utils/JsonUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/JsonUtil.java @@ -24,7 +24,7 @@ public final class JsonUtil { * @param * @return result type */ - public static T redisJsonToObject(RedisTemplate redisTemplate, String key, Class clazz) { + public static T redisJsonToObject(RedisTemplate redisTemplate, String key, Class clazz) { Object jsonObject = redisTemplate.opsForValue().get(key); if (Objects.isNull(jsonObject)) { return null; @@ -32,7 +32,7 @@ public final class JsonUtil { return clazz.cast(jsonObject); } - public static T redisHashJsonToObject(RedisTemplate redisTemplate, String key, String objKey, Class clazz) { + public static T redisHashJsonToObject(RedisTemplate redisTemplate, String key, String objKey, Class clazz) { // if (key == null || objKey == null) { // return null; // } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java index 6df13ba79..4fddbf890 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java @@ -25,7 +25,7 @@ public class TestController { private HookSubscribe subscribe; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @GetMapping("/hook/list") public List all(){ diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java index 3d053fc57..d8e7da545 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java @@ -62,7 +62,7 @@ public class PsController { @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @GetMapping(value = "/receive/open") @@ -172,7 +172,7 @@ public class PsController { if (!scan.isEmpty()) { for (Object key : scan) { // 将信息写入redis中,以备后用 - redisTemplate.delete(key); + redisTemplate.delete((String) key); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index d8f130a89..60db1ea16 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -62,7 +62,7 @@ public class RtpController { private DynamicTask dynamicTask; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @GetMapping(value = "/receive/open") @@ -185,7 +185,7 @@ public class RtpController { if (scan.size() > 0) { for (Object key : scan) { // 将信息写入redis中,以备后用 - redisTemplate.delete(key); + redisTemplate.delete((String)key); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java b/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java index 4bfc722ba..4956778b5 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java @@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.gb28181.service.IGbChannelControlService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.utils.Coordtransform; +import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.web.custom.bean.*; import com.genersoft.iot.vmp.web.custom.conf.SyTokenManager; @@ -29,9 +30,11 @@ import org.springframework.boot.CommandLineRunner; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.event.EventListener; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.util.*; +import java.util.concurrent.Executors; @Slf4j @Service @@ -51,7 +54,7 @@ public class CameraChannelService implements CommandLineRunner { private GroupMapper groupMapper; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private RedisTemplate redisTemplateForString; @@ -321,32 +324,41 @@ public class CameraChannelService implements CommandLineRunner { } // 监听GPS消息,如果是移动设备则发送redis消息 + @Async @EventListener public void onApplicationEvent(MobilePositionEvent event) { - MobilePosition mobilePosition = event.getMobilePosition(); + List mobilePositionList = event.getMobilePositionList(); + try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { + for (MobilePosition mobilePosition : mobilePositionList) { + executor.submit(() -> { + // 从redis补充信息 + SYMember member = getMember(mobilePosition.getChannelDeviceId()); + if (member == null) { + log.info("[SY-redis发送通知-移动设备位置信息] 缓存未获取 {}", mobilePosition.toString()); + return; + } - // 从redis补充信息 - SYMember member = getMember(mobilePosition.getChannelDeviceId()); - if (member == null) { - log.info("[SY-redis发送通知-移动设备位置信息] 缓存未获取 {}", mobilePosition.toString()); - return; + // 发送redis消息 + JSONObject jsonObject = new JSONObject(); + jsonObject.put("gpsDate", DateUtil.timestampMsTo_yyyy_MM_dd_HH_mm_ss(mobilePosition.getTimestamp())); + jsonObject.put("unicodeNo", member.getUnicodeNo()); + jsonObject.put("memberNo", member.getNo()); + jsonObject.put("unitNo", member.getUnitNo()); + jsonObject.put("longitude", mobilePosition.getLongitude()); + jsonObject.put("latitude", mobilePosition.getLatitude()); + jsonObject.put("altitude", mobilePosition.getAltitude()); + jsonObject.put("direction", mobilePosition.getDirection()); + jsonObject.put("speed", mobilePosition.getSpeed()); + jsonObject.put("blockId", member.getBlockId()); + jsonObject.put("gbDeviceId", mobilePosition.getChannelDeviceId()); + log.info("[SY-redis发送通知-移动设备位置信息] 发送 {}: {}", REDIS_GPS_MESSAGE, jsonObject.toString()); + redisTemplateForString.convertAndSend(REDIS_GPS_MESSAGE, jsonObject.toString()); + }); + } } - // 发送redis消息 - JSONObject jsonObject = new JSONObject(); - jsonObject.put("gpsDate", mobilePosition.getTime()); - jsonObject.put("unicodeNo", member.getUnicodeNo()); - jsonObject.put("memberNo", member.getNo()); - jsonObject.put("unitNo", member.getUnitNo()); - jsonObject.put("longitude", mobilePosition.getLongitude()); - jsonObject.put("latitude", mobilePosition.getLatitude()); - jsonObject.put("altitude", mobilePosition.getAltitude()); - jsonObject.put("direction", mobilePosition.getDirection()); - jsonObject.put("speed", mobilePosition.getSpeed()); - jsonObject.put("blockId", member.getBlockId()); - jsonObject.put("gbDeviceId", mobilePosition.getChannelDeviceId()); - log.info("[SY-redis发送通知-移动设备位置信息] 发送 {}: {}", REDIS_GPS_MESSAGE, jsonObject.toString()); - redisTemplateForString.convertAndSend(REDIS_GPS_MESSAGE, jsonObject.toString()); + + } public SYMember getMember(String deviceId) { diff --git a/src/main/java/com/genersoft/iot/vmp/web/custom/service/SyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/web/custom/service/SyServiceImpl.java index 095076849..71bff3026 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/custom/service/SyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/web/custom/service/SyServiceImpl.java @@ -23,7 +23,7 @@ import java.util.List; public class SyServiceImpl implements IMapService { @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Override public List getConfig() { diff --git a/web/src/views/common/GroupTree.vue b/web/src/views/common/GroupTree.vue index 1541b7b2c..f78f44546 100755 --- a/web/src/views/common/GroupTree.vue +++ b/web/src/views/common/GroupTree.vue @@ -487,12 +487,15 @@ export default { }, nodeClickHandler: function(data, node, tree) { console.log(data) - + console.log(data.nextData.length) + console.log(this.treeLimit) if (data && data.nextData && data.nextData.length > 0) { + const parentNode = node.parent let nextData = data.nextData if (nextData.length > this.treeLimit) { let subData = nextData.splice(0, this.treeLimit) + console.log(subData) subData.push({ treeId: '---', deviceId: '---', @@ -506,7 +509,6 @@ export default { for (let item of subData) { this.$refs.veTree.append(item, parentNode) } - } else { this.$refs.veTree.remove(data, parentNode) for (let item of nextData) { diff --git a/数据库/2.7.4/初始化-mysql-2.7.4.sql b/数据库/2.7.4/初始化-mysql-2.7.4.sql index 4c5c3273f..bfce3678c 100644 --- a/数据库/2.7.4/初始化-mysql-2.7.4.sql +++ b/数据库/2.7.4/初始化-mysql-2.7.4.sql @@ -57,20 +57,17 @@ create table IF NOT EXISTS wvp_device_alarm ); -- 存储移动位置订阅上报的数据 -drop table IF EXISTS wvp_device_mobile_position; -create table IF NOT EXISTS wvp_device_mobile_position +drop table IF EXISTS wvp_mobile_position; +create table IF NOT EXISTS wvp_mobile_position ( id serial primary key COMMENT '主键ID', - device_id character varying(50) not null COMMENT '设备ID', channel_id character varying(50) not null COMMENT '通道ID', - device_name character varying(255) COMMENT '设备名称', - time character varying(50) COMMENT '上报时间', + timestamp BIGINT COMMENT '上报时间', longitude double precision COMMENT '经度', latitude double precision COMMENT '纬度', altitude double precision COMMENT '海拔', speed double precision COMMENT '速度', direction double precision COMMENT '方向角', - report_source character varying(50) COMMENT '上报来源', create_time character varying(50) COMMENT '入库时间' ); diff --git a/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql b/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql index 2289daee2..b34b04d9a 100644 --- a/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql +++ b/数据库/2.7.4/初始化-postgresql-kingbase-2.7.4.sql @@ -102,34 +102,28 @@ COMMENT ON COLUMN wvp_device_alarm.alarm_type IS '报警类型'; COMMENT ON COLUMN wvp_device_alarm.create_time IS '数据入库时间'; -drop table IF EXISTS wvp_device_mobile_position; -create table IF NOT EXISTS wvp_device_mobile_position +drop table IF EXISTS wvp_mobile_position; +create table IF NOT EXISTS wvp_mobile_position ( id serial primary key, - device_id character varying(50) not null, channel_id character varying(50) not null, - device_name character varying(255), - time character varying(50), + timestamp int8 varying(50), longitude double precision, latitude double precision, altitude double precision, speed double precision, direction double precision, - report_source character varying(50), create_time character varying(50) ); COMMENT ON TABLE wvp_device_mobile_position IS '存储移动位置订阅上报的数据'; COMMENT ON COLUMN wvp_device_mobile_position.id IS '主键ID'; -COMMENT ON COLUMN wvp_device_mobile_position.device_id IS '设备ID'; COMMENT ON COLUMN wvp_device_mobile_position.channel_id IS '通道ID'; -COMMENT ON COLUMN wvp_device_mobile_position.device_name IS '设备名称'; -COMMENT ON COLUMN wvp_device_mobile_position.time IS '上报时间'; +COMMENT ON COLUMN wvp_device_mobile_position.timestamp IS '上报时间'; COMMENT ON COLUMN wvp_device_mobile_position.longitude IS '经度'; COMMENT ON COLUMN wvp_device_mobile_position.latitude IS '纬度'; COMMENT ON COLUMN wvp_device_mobile_position.altitude IS '海拔'; COMMENT ON COLUMN wvp_device_mobile_position.speed IS '速度'; COMMENT ON COLUMN wvp_device_mobile_position.direction IS '方向角'; -COMMENT ON COLUMN wvp_device_mobile_position.report_source IS '上报来源'; COMMENT ON COLUMN wvp_device_mobile_position.create_time IS '入库时间'; diff --git a/数据库/2.7.4/更新-mysql-2.7.4.sql b/数据库/2.7.4/更新-mysql-2.7.4.sql index 2ad09bc1c..7b80a5f92 100644 --- a/数据库/2.7.4/更新-mysql-2.7.4.sql +++ b/数据库/2.7.4/更新-mysql-2.7.4.sql @@ -156,6 +156,51 @@ create table IF NOT EXISTS wvp_alarm ( ); +/* +* 20260417 将wvp_device_mobile_position从专属国标的位置记录表,改为通用通道共用的位置记录表 +*/ +DELIMITER // -- 重定义分隔符避免分号冲突 +CREATE PROCEDURE `wvp_20260417`() +BEGIN + IF NOT EXISTS (SELECT column_name FROM information_schema.columns + WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_mobile_position' and column_name = 'timestamp') + THEN + ALTER TABLE wvp_device_mobile_position ADD timestamp BIGINT COMMENT '上报时间'; +END IF; +IF EXISTS (SELECT column_name FROM information_schema.columns + WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_mobile_position' and column_name = 'time') + THEN + UPDATE wvp_device_mobile_position SET timestamp = UNIX_TIMESTAMP(time) * 1000; + ALTER TABLE wvp_device_mobile_position DROP time; +END IF; +IF EXISTS (SELECT column_name FROM information_schema.columns + WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_mobile_position' and column_name = 'device_id') + THEN + ALTER TABLE wvp_device_mobile_position DROP device_id; +END IF; +IF EXISTS (SELECT column_name FROM information_schema.columns + WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_mobile_position' and column_name = 'device_name') + THEN + ALTER TABLE wvp_device_mobile_position DROP device_name; +END IF; +IF EXISTS (SELECT column_name FROM information_schema.columns + WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_mobile_position' and column_name = 'report_source') + THEN +ALTER TABLE wvp_device_mobile_position DROP report_source; +END IF; +-- 修改表名 +IF EXISTS (SELECT table_name FROM information_schema.tables + WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_mobile_position') + THEN +ALTER TABLE wvp_device_mobile_position RENAME TO wvp_mobile_position; +END IF; + +END; // +call wvp_20260417(); +DROP PROCEDURE wvp_20260417; +DELIMITER ; + + diff --git a/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql b/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql index 31904ec08..9e4f80878 100644 --- a/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql +++ b/数据库/2.7.4/更新-postgresql-kingbase-2.7.4.sql @@ -69,3 +69,18 @@ COMMENT ON COLUMN wvp_alarm.longitude IS '报警附带的经度'; COMMENT ON COLUMN wvp_alarm.latitude IS '报警附带的纬度'; COMMENT ON COLUMN wvp_alarm.alarm_type IS '报警类别'; COMMENT ON COLUMN wvp_alarm.alarm_time IS '报警时间'; + + +/* +* 20260417 将 wvp_device_mobile_position从专属国标的位置记录表,改为通用通道共用的位置记录表 +*/ +ALTER TABLE wvp_device_mobile_position ADD COLUMN IF NOT EXISTS timestamp int8; +UPDATE wvp_device_mobile_position SET timestamp = EXTRACT(EPOCH FROM time::timestamp) * 1000; +ALTER TABLE wvp_device_mobile_position DROP COLUMN IF EXISTS time; +ALTER TABLE wvp_device_mobile_position DROP COLUMN IF EXISTS device_id; +ALTER TABLE wvp_device_mobile_position DROP COLUMN IF EXISTS device_name; +ALTER TABLE wvp_device_mobile_position DROP COLUMN IF EXISTS report_source; + +-- 修改表名 +ALTER TABLE wvp_device_mobile_position RENAME TO wvp_mobile_position; +COMMENT ON COLUMN wvp_mobile_position.timestamp IS '上报时间';