From 9909aa96552c54725ca1d3db0de3f10f22a6ce0e Mon Sep 17 00:00:00 2001 From: lin <648540858@qq.com> Date: Sat, 18 Apr 2026 23:31:47 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E7=A7=BB=E5=8A=A8=E4=BD=8D?= =?UTF-8?q?=E7=BD=AE=E7=9B=B8=E5=85=B3=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../iot/vmp/conf/ftpServer/UserManager.java | 2 +- .../iot/vmp/conf/redis/RedisRpcConfig.java | 2 +- .../vmp/conf/redis/RedisTemplateConfig.java | 4 +- .../iot/vmp/gb28181/bean/DeviceChannel.java | 2 + .../iot/vmp/gb28181/bean/ShareGBChannel.java | 17 ++ .../iot/vmp/gb28181/bean/SubscribeHolder.java | 32 +++- .../gb28181/controller/ChannelController.java | 2 +- .../controller/MobilePositionController.java | 34 ++-- .../vmp/gb28181/dao/DeviceChannelMapper.java | 8 +- .../vmp/gb28181/dao/MobilePositionMapper.java | 36 ++-- .../gb28181/dao/PlatformChannelMapper.java | 51 ++++++ .../MobilePositionEventLister.java | 75 -------- .../service/IMobilePositionService.java | 4 - .../service/IPlatformChannelService.java | 2 + .../service/impl/GbChannelServiceImpl.java | 2 +- .../service/impl/GroupServiceImpl.java | 2 +- .../service/impl/InviteStreamServiceImpl.java | 2 +- .../impl/MobilePositionServiceImpl.java | 84 --------- .../impl/PlatformChannelServiceImpl.java | 93 +++++++--- .../gb28181/session/CatalogDataManager.java | 2 +- .../session/SipInviteSessionManager.java | 2 +- .../deviceSubscribe/SubscribeTaskRunner.java | 2 +- .../PlatformStatusTaskRunner.java | 2 +- ...tifyRequestForMobilePositionProcessor.java | 153 ++++------------- .../notify/cmd/AlarmNotifyMessageHandler.java | 15 +- .../MobilePositionNotifyMessageHandler.java | 2 +- .../MobilePositionResponseMessageHandler.java | 160 ++++++++---------- .../cmd/RecordInfoResponseMessageHandler.java | 2 +- .../impl/SourceOtherServiceForJTImpl.java | 11 +- .../service/impl/jt1078PlayServiceImpl.java | 2 +- .../service/impl/jt1078ServiceImpl.java | 2 +- .../vmp/media/abl/ABLHttpHookListener.java | 2 +- .../service/impl/MediaServerServiceImpl.java | 2 +- .../vmp/service/IMobilePositionService.java | 8 +- .../iot/vmp/service/bean/GPSMsgInfo.java | 3 +- .../impl/MobilePositionServiceImpl.java | 156 +++++++---------- .../service/impl/RtpServerServiceImpl.java | 2 +- .../impl/SendRtpServerServiceImpl.java | 2 +- .../service/impl/UserApiKeyServiceImpl.java | 2 +- .../service/redisMsg/RedisGpsMsgListener.java | 11 +- .../RedisRpcChannelPlayController.java | 2 +- .../RedisRpcCloudRecordController.java | 2 +- .../control/RedisRpcDeviceController.java | 2 +- .../control/RedisRpcDevicePlayController.java | 2 +- .../control/RedisRpcGbDeviceController.java | 2 +- .../control/RedisRpcPlatformController.java | 2 +- .../RedisRpcStreamProxyController.java | 2 +- .../control/RedisRpcStreamPushController.java | 4 +- .../redisMsg/service/RedisRpcServiceImpl.java | 2 +- .../storager/impl/RedisCatchStorageImpl.java | 32 ++-- .../com/genersoft/iot/vmp/utils/JsonUtil.java | 4 +- .../iot/vmp/vmanager/TestController.java | 2 +- .../iot/vmp/vmanager/ps/PsController.java | 4 +- .../iot/vmp/vmanager/rtp/RtpController.java | 4 +- .../custom/service/CameraChannelService.java | 5 +- .../vmp/web/custom/service/SyServiceImpl.java | 2 +- 56 files changed, 438 insertions(+), 632 deletions(-) create mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/bean/ShareGBChannel.java delete mode 100755 src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/service/IMobilePositionService.java delete mode 100644 src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/MobilePositionServiceImpl.java diff --git a/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/UserManager.java b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/UserManager.java index da5c5ed69..31766e3f3 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/UserManager.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/ftpServer/UserManager.java @@ -21,7 +21,7 @@ public class UserManager implements org.apache.ftpserver.ftplet.UserManager { private static final String PREFIX = "VMP_FTP_USER_"; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Override diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java index 779fe9eda..45f98e923 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisRpcConfig.java @@ -38,7 +38,7 @@ public class RedisRpcConfig implements MessageListener { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; private ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java index 3bfee31ba..aac310a2a 100644 --- a/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java +++ b/src/main/java/com/genersoft/iot/vmp/conf/redis/RedisTemplateConfig.java @@ -13,8 +13,8 @@ import org.springframework.data.redis.serializer.StringRedisSerializer; public class RedisTemplateConfig { @Bean - public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { - RedisTemplate redisTemplate = new RedisTemplate<>(); + public RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory) { + RedisTemplate redisTemplate = new RedisTemplate<>(); // 使用fastJson序列化 GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer(); // value值的序列化采用fastJsonRedisSerializer diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java index 1ba09b540..02d2bdb1e 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceChannel.java @@ -191,6 +191,8 @@ public class DeviceChannel extends CommonGBChannel { @Schema(description = "通道类型, 默认0, 0: 普通通道,1 行政区划 2 业务分组/虚拟组织") private int channelType; + private String dbKey; + private Integer dataType = ChannelDataType.GB28181; public void setPtzType(int ptzType) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ShareGBChannel.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ShareGBChannel.java new file mode 100644 index 000000000..bd8d161ca --- /dev/null +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/ShareGBChannel.java @@ -0,0 +1,17 @@ +package com.genersoft.iot.vmp.gb28181.bean; + +import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@EqualsAndHashCode(callSuper = true) +@Data +@Schema(description = "国标共享通道") +public class ShareGBChannel extends CommonGBChannel{ + + @Schema(description = "平台ID") + private int platformId; + + +} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java index 0bcb9a695..e41caad6b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/bean/SubscribeHolder.java @@ -4,12 +4,15 @@ import com.genersoft.iot.vmp.conf.DynamicTask; import com.genersoft.iot.vmp.conf.UserSetting; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.time.Duration; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @author lin @@ -25,7 +28,7 @@ public class SubscribeHolder { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; private final String prefix = "VMP_SUBSCRIBE_OVERDUE"; @@ -106,15 +109,28 @@ public class SubscribeHolder { return result; } - public List getAllMobilePositionSubscribePlatform(List platformList) { + public Map getAllMobilePositionSubscribePlatform(List platformList) { if (platformList == null || platformList.isEmpty()) { - return new ArrayList<>(); + return new HashMap<>(); } - List result = new ArrayList<>(); - for (Platform platform : platformList) { - String key = String.format("%s:%s:%s", prefix, "mobilePosition", platform.getServerGBId()); - if (redisTemplate.hasKey(key)) { - result.add(platform.getServerGBId()); + Map result = new HashMap<>(); + + // 1. 先批量构建所有 key + List keys = platformList.stream() + .map(platform -> String.format("%s:%s:%s", prefix, "mobilePosition", platform.getServerGBId())) + .toList(); + + // 2. 批量查询 Redis 【关键:只发1次请求!】 + List results = redisTemplate.executePipelined((RedisCallback) connection -> { + for (String key : keys) { + // 注意:这里使用的是底层 connection 接口 + connection.keyCommands().exists(key.getBytes()); + } + return null; // 流水线模式下必须返回 null + }); + for (int i = 0; i < results.size(); i++) { + if (results.get(i) instanceof Boolean exists && exists) { + result.put(platformList.get(i).getId(), platformList.get(i)); } } return result; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/ChannelController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/ChannelController.java index 20a92a8d0..647a679e2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/ChannelController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/ChannelController.java @@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit; public class ChannelController { @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IGbChannelService channelService; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MobilePositionController.java b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MobilePositionController.java index 768983c97..74bacf083 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MobilePositionController.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/controller/MobilePositionController.java @@ -48,23 +48,14 @@ public class MobilePositionController { private IDeviceService deviceService; - /** - * 查询历史轨迹 - * @param deviceId 设备ID - * @param start 开始时间 - * @param end 结束时间 - * @return - */ @Operation(summary = "查询历史轨迹", security = @SecurityRequirement(name = JwtUtils.HEADER)) - @Parameter(name = "deviceId", description = "设备国标编号", required = true) - @Parameter(name = "channelId", description = "通道国标编号") + @Parameter(name = "channelId", description = "通道的数据库ID") @Parameter(name = "start", description = "开始时间") @Parameter(name = "end", description = "结束时间") @GetMapping("/history/{deviceId}") - public List positions(@PathVariable String deviceId, - @RequestParam(required = false) String channelId, - @RequestParam(required = false) String start, - @RequestParam(required = false) String end) { + public List positions( Integer channelId, + @RequestParam(required = false) String start, + @RequestParam(required = false) String end) { if (StringUtil.isEmpty(start)) { start = null; @@ -72,19 +63,14 @@ public class MobilePositionController { if (StringUtil.isEmpty(end)) { end = null; } - return mobilePositionService.queryMobilePositions(deviceId, channelId, start, end); + return mobilePositionService.queryMobilePositions(channelId, start, end); } - /** - * 查询设备最新位置 - * @param deviceId 设备ID - * @return - */ - @Operation(summary = "查询设备最新位置", security = @SecurityRequirement(name = JwtUtils.HEADER)) - @Parameter(name = "deviceId", description = "设备国标编号", required = true) - @GetMapping("/latest/{deviceId}") - public MobilePosition latestPosition(@PathVariable String deviceId) { - return mobilePositionService.queryLatestPosition(deviceId); + @Operation(summary = "查询通道最新位置", security = @SecurityRequirement(name = JwtUtils.HEADER)) + @Parameter(name = "channelId", description = "通道的数据库ID", required = true) + @GetMapping("/latest") + public MobilePosition latestPosition(Integer channelId) { + return mobilePositionService.queryLatestPosition(channelId); } /** diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java index 1e59c4154..01ce71c2b 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/DeviceChannelMapper.java @@ -621,7 +621,7 @@ public interface DeviceChannelMapper { @Select(value = {" "}) - @MapKey("deviceIdKey") + @MapKey("dbKey") Map getAllForMobilePosition(@Param("deviceId") int deviceId, List mobilePositionList); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/MobilePositionMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/MobilePositionMapper.java index fe28567e1..c3c51ed93 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/MobilePositionMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/MobilePositionMapper.java @@ -1,7 +1,6 @@ package com.genersoft.iot.vmp.gb28181.dao; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; -import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Insert; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; @@ -12,40 +11,33 @@ import java.util.List; @Mapper public interface MobilePositionMapper { - @Insert("INSERT INTO wvp_mobile_position (device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source,create_time)"+ - "VALUES (#{deviceId}, #{channelId}, #{deviceName}, #{time}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{reportSource}, #{createTime})") + @Insert("INSERT INTO wvp_mobile_position (channel_id, timestamp, longitude, latitude, altitude, speed, direction, create_time)"+ + "VALUES (#{channelId}, #{timestamp}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{createTime})") int insertNewPosition(MobilePosition mobilePosition); @Select(value = {" "}) - List queryPositionByDeviceIdAndTime(@Param("deviceId") String deviceId, @Param("channelId") String channelId, @Param("startTime") String startTime, @Param("endTime") String endTime); + List queryPositionByDeviceIdAndTime(@Param("channelId") Integer channelId, @Param("startTime") Long startTime, @Param("endTime") Long endTime); - @Select("SELECT * FROM wvp_mobile_position WHERE device_id = #{deviceId}" + - " ORDER BY time DESC LIMIT 1") - MobilePosition queryLatestPositionByDevice(String deviceId); - - @Delete("DELETE FROM wvp_mobile_position WHERE device_id = #{deviceId}") - int clearMobilePositionsByDeviceId(String deviceId); + @Select("SELECT * FROM wvp_mobile_position WHERE channel_id = #{channelId}" + + " ORDER BY timestamp DESC LIMIT 1") + MobilePosition queryLatestPosition(@Param("channelId") Integer channelId); @Insert("") - void batchadd(List mobilePositions); - - - void insertMobilePositions(List batchList); + void batchAdd(List mobilePositions); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java index 8d44f8df3..cb5145351 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/dao/PlatformChannelMapper.java @@ -548,4 +548,55 @@ public interface PlatformChannelMapper { " order by wcr.id DESC" + " ") Set queryShareRegion(Integer id); + + @Select("") + List queryShareChannelInPlatformsAndChannelIds(Collection platforms, Collection channelIds); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java b/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java deleted file mode 100755 index 7b06f07fc..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/event/subscribe/mobilePosition/MobilePositionEventLister.java +++ /dev/null @@ -1,75 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition; - -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel; -import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder; -import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo; -import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; -import com.genersoft.iot.vmp.gb28181.service.IPlatformService; -import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderForPlatform; -import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationListener; -import org.springframework.stereotype.Component; - -import javax.sip.InvalidArgumentException; -import javax.sip.SipException; -import java.text.ParseException; -import java.util.List; - -/** - * 移动位置通知消息转发 - */ -@Slf4j -@Component -public class MobilePositionEventLister implements ApplicationListener { - - @Autowired - private IPlatformService platformService; - - @Autowired - private IPlatformChannelService platformChannelService; - - @Autowired - private SIPCommanderForPlatform sipCommanderForPlatform; - - @Autowired - private SubscribeHolder subscribeHolder; - - @Autowired - private UserSetting userSetting; - - @Override - public void onApplicationEvent(MobilePositionEvent event) { - if (event.getMobilePosition().getChannelId() == 0) { - return; - } - List allPlatforms = platformService.queryAll(userSetting.getServerId()); - // 获取所用订阅 - List platforms = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms); - if (platforms.isEmpty()) { - return; - } - List platformsForGB = platformChannelService.queryPlatFormListByChannelDeviceId(event.getMobilePosition().getChannelId(), platforms); - - for (Platform platform : platformsForGB) { - if (log.isDebugEnabled()){ - log.debug("[向上级发送MobilePosition] 通道:{},平台:{}, 位置: {}:{}", event.getMobilePosition().getChannelId(), - platform.getServerGBId(), event.getMobilePosition().getLongitude(), event.getMobilePosition().getLatitude()); - } - SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId()); - try { - GPSMsgInfo gpsMsgInfo = GPSMsgInfo.getInstance(event.getMobilePosition()); - // 获取通道编号 - CommonGBChannel commonGBChannel = platformChannelService.queryChannelByPlatformIdAndChannelId(platform.getId(), event.getMobilePosition().getChannelId()); - sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, commonGBChannel, - subscribe); - } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | - IllegalAccessException e) { - log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); - } - } - } -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IMobilePositionService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IMobilePositionService.java deleted file mode 100644 index 4bb104df2..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IMobilePositionService.java +++ /dev/null @@ -1,4 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.service; - -public interface IMobilePositionService { -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java index 0151b63a5..f29668db2 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/IPlatformChannelService.java @@ -50,4 +50,6 @@ public interface IPlatformChannelService { void checkRegionRemove(List channelList, List regionList); List queryByPlatformBySharChannelId(String gbId); + + void notifyMobilePosition(List handlerCatchDataList); } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java index dd446d87d..006a39fd9 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GbChannelServiceImpl.java @@ -73,7 +73,7 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne private DynamicTask dynamicTask; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private VectorTileCatch vectorTileCatch; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java index f7342a7ff..6e1df3a3a 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/GroupServiceImpl.java @@ -43,7 +43,7 @@ public class GroupServiceImpl implements IGroupService { private EventPublisher eventPublisher; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Override public void add(Group group) { diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java index dbac486b5..759c29cf6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/InviteStreamServiceImpl.java @@ -33,7 +33,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService { private final Map>> inviteErrorCallbackMap = new ConcurrentHashMap<>(); @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private UserSetting userSetting; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/MobilePositionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/MobilePositionServiceImpl.java deleted file mode 100644 index f27092621..000000000 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/MobilePositionServiceImpl.java +++ /dev/null @@ -1,84 +0,0 @@ -package com.genersoft.iot.vmp.gb28181.service.impl; - -import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; -import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper; -import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent; -import com.genersoft.iot.vmp.gb28181.service.IMobilePositionService; -import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService; -import jakarta.annotation.PostConstruct; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.springframework.context.event.EventListener; -import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Service; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentLinkedQueue; - -@Slf4j -@Service -@RequiredArgsConstructor -public class MobilePositionServiceImpl implements IMobilePositionService { - - - private final ConcurrentLinkedQueue mobilePositionQueue = new ConcurrentLinkedQueue<>(); - - private final Map sourceOtherServiceMap; - - private final MobilePositionMapper mobilePositionMapper; - - @PostConstruct - public void init() { - - } - - @Async - @EventListener - public void onApplicationEvent(MobilePositionEvent event) { - if (event.getMobilePositionList() == null || event.getMobilePositionList().isEmpty()) { - return; - } - for (ISourceOtherService sourceOtherService : sourceOtherServiceMap.values()) { - try { - Boolean addResult = sourceOtherService.addChannelIdForMobilePosition(event.getMobilePositionList()); - if (addResult != null && addResult) { - mobilePositionQueue.addAll(event.getMobilePositionList()); - } - }catch (Exception e) { - log.error("[移动位置事件] 处理移动位置事件失败", e); - } - } - } - - @Scheduled(fixedDelay = 500) - public void executeMobilePositionQueue() { - if (mobilePositionQueue.isEmpty()) { - return; - } - List handlerCatchDataList = new ArrayList<>(); - int size = mobilePositionQueue.size(); - for (int i = 0; i < size; i++) { - MobilePosition poll = mobilePositionQueue.poll(); - if (poll != null) { - handlerCatchDataList.add(poll); - } - } - if (handlerCatchDataList.isEmpty()) { - return; - } - // TODO 发送通知,方便国标级联转发给上级 - - - // 批量保存到数据库 - int batchSize = 1000; - for (int i = 0; i < handlerCatchDataList.size(); i += batchSize) { - int end = Math.min(i + batchSize, handlerCatchDataList.size()); - List batchList = handlerCatchDataList.subList(i, end); - mobilePositionMapper.insertMobilePositions(batchList); - } - } - -} diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java index 390153a48..61e7e10db 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/service/impl/PlatformChannelServiceImpl.java @@ -10,11 +10,12 @@ import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent; import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent; import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform; +import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -24,6 +25,7 @@ import javax.sip.InvalidArgumentException; import javax.sip.SipException; import java.text.ParseException; import java.util.*; +import java.util.concurrent.Executors; import java.util.stream.Collectors; /** @@ -31,38 +33,29 @@ import java.util.stream.Collectors; */ @Slf4j @Service +@RequiredArgsConstructor public class PlatformChannelServiceImpl implements IPlatformChannelService { - @Autowired - private PlatformChannelMapper platformChannelMapper; + private final PlatformChannelMapper platformChannelMapper; - @Autowired - private EventPublisher eventPublisher; + private final EventPublisher eventPublisher; - @Autowired - private GroupMapper groupMapper; + private final GroupMapper groupMapper; + private final RegionMapper regionMapper; - @Autowired - private RegionMapper regionMapper; + private final CommonGBChannelMapper commonGBChannelMapper; - @Autowired - private CommonGBChannelMapper commonGBChannelMapper; + private final PlatformMapper platformMapper; - @Autowired - private PlatformMapper platformMapper; + private final ISIPCommanderForPlatform sipCommanderForPlatform; - @Autowired - private ISIPCommanderForPlatform sipCommanderFroPlatform; + private final SubscribeHolder subscribeHolder; - @Autowired - private SubscribeHolder subscribeHolder; + private final UserSetting userSetting; - @Autowired - private UserSetting userSetting; + private final IRedisRpcService redisRpcService; - @Autowired - private IRedisRpcService redisRpcService; // 监听通道信息变化 @EventListener @@ -116,7 +109,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { deviceChannel.setGbDeviceId(serverGbId); deviceChannelList.add(deviceChannel); try { - sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getMessageType().name(), platform, deviceChannelList, subscribeInfo, null); + sipCommanderForPlatform.sendNotifyForCatalogOther(event.getMessageType().name(), platform, deviceChannelList, subscribeInfo, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); @@ -146,7 +139,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { CommonGBChannel deviceChannel = channelMap.get(gbId); channelList.add(deviceChannel); try { - sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getMessageType().name(), platform, channelList, subscribeInfo, null); + sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(event.getMessageType().name(), platform, channelList, subscribeInfo, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); @@ -185,7 +178,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { if (!channels.isEmpty()) { log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceIds); try { - sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, channels, subscribe, null); + sipCommanderForPlatform.sendNotifyForCatalogOther(event.getType(), platform, channels, subscribe, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); @@ -205,7 +198,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { if (!deviceChannelList.isEmpty()) { log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceIds); try { - sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, deviceChannelList, subscribe, null); + sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, deviceChannelList, subscribe, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); @@ -646,7 +639,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { SubscribeInfo subscribeInfo = SubscribeInfo.buildSimulated(platform.getServerGBId(), platform.getServerIp()); try { - sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(CatalogEvent.ADD, platform, channelList, subscribeInfo, null); + sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(CatalogEvent.ADD, platform, channelList, subscribeInfo, null); } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | IllegalAccessException e) { log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); @@ -852,4 +845,52 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService { } return platformChannelMapper.queryPlatFormListByChannelList(ids); } + + @Override + public void notifyMobilePosition(List mobilePositionList) { + + List allPlatforms = platformMapper.queryServerIdsWithEnableAndServer(userSetting.getServerId()); + // 获取所用订阅 + Map platformMap = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms); + if (platformMap.isEmpty()) { + return; + } + + // 对mobilePositionList内部的channelId分类 + Map> channelIdMap = mobilePositionList.stream().collect(Collectors.groupingBy(MobilePosition::getChannelId)); + + List shareGBChannels = platformChannelMapper.queryShareChannelInPlatformsAndChannelIds(platformMap.values(), channelIdMap.keySet()); + try (var executor = Executors.newVirtualThreadPerTaskExecutor()) { + for (ShareGBChannel shareGBChannel : shareGBChannels) { + List mobilePositions = channelIdMap.get(shareGBChannel.getGbId()); + if (mobilePositions == null || mobilePositions.isEmpty()) { + continue; + } + executor.submit(() -> { + Platform platform = platformMap.get(shareGBChannel.getPlatformId()); + if (platform == null) { + log.info("[查询平台] 平台ID:{} 未查询到", shareGBChannel.getPlatformId()); + return; + } + SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId()); + if (subscribe == null) { + log.info("[查询订阅] 平台:{} 未查询到移动位置订阅", platform.getServerGBId()); + return; + } + for (MobilePosition mobilePosition : mobilePositions) { + try { + GPSMsgInfo gpsMsgInfo = GPSMsgInfo.getInstance(mobilePosition); + // 获取通道编号 + CommonGBChannel commonGBChannel = queryChannelByPlatformIdAndChannelId(platform.getId(), mobilePosition.getChannelId()); + sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, commonGBChannel, + subscribe); + } catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException | + IllegalAccessException e) { + log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage()); + } + } + }); + } + } + } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java index 88ac9d59c..02a53f93c 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/CatalogDataManager.java @@ -33,7 +33,7 @@ public class CatalogDataManager implements CommandLineRunner { private IGroupService groupService; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; private final Map dataMap = new ConcurrentHashMap<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SipInviteSessionManager.java b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SipInviteSessionManager.java index a4468d921..80716d173 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/session/SipInviteSessionManager.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/session/SipInviteSessionManager.java @@ -20,7 +20,7 @@ public class SipInviteSessionManager { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; /** * 添加一个点播/回放的事务信息 diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskRunner.java index 7e70935f5..dbfef68e3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/deviceSubscribe/SubscribeTaskRunner.java @@ -26,7 +26,7 @@ public class SubscribeTaskRunner{ private final DelayQueue delayQueue = new DelayQueue<>(); @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private UserSetting userSetting; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java index a043d8507..a8e290ae3 100644 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/task/platformStatus/PlatformStatusTaskRunner.java @@ -30,7 +30,7 @@ public class PlatformStatusTaskRunner { private final DelayQueue keepaliveTaskDelayQueue = new DelayQueue<>(); @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private UserSetting userSetting; diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java index 48f53d96b..4e816ab7e 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/NotifyRequestForMobilePositionProcessor.java @@ -2,25 +2,18 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; +import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition; import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; -import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.event.EventPublisher; -import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; -import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.service.IMobilePositionService; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; -import com.genersoft.iot.vmp.utils.DateUtil; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.dom4j.DocumentException; import org.dom4j.Element; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; import javax.sip.RequestEvent; import javax.sip.header.FromHeader; @@ -33,27 +26,18 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ @Slf4j @Component +@RequiredArgsConstructor public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessorParent { private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); - @Autowired - private UserSetting userSetting; + private final UserSetting userSetting; - @Autowired - private EventPublisher eventPublisher; + private final EventPublisher eventPublisher; - @Autowired - private IRedisCatchStorage redisCatchStorage; - - @Autowired - private IDeviceChannelService deviceChannelService; - - @Autowired - private IMobilePositionService mobilePositionService; + private final IRedisCatchStorage redisCatchStorage; public void process(RequestEvent evt) { - if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { log.error("[notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE,消息不做处理", userSetting.getMaxNotifyCountQueue()); return; @@ -61,127 +45,56 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor taskQueue.offer(new HandlerCatchData(evt, null, null)); } - @Scheduled(fixedDelay = 200) //每200毫秒执行一次 + @Scheduled(fixedDelay = 200) @Async public void executeTaskQueue() { if (taskQueue.isEmpty()) { return; } List handlerCatchDataList = new ArrayList<>(); - while (!taskQueue.isEmpty()) { - handlerCatchDataList.add(taskQueue.poll()); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + HandlerCatchData poll = taskQueue.poll(); + if (poll != null) { + handlerCatchDataList.add(poll); + } } if (handlerCatchDataList.isEmpty()) { return; } + List mobilePositionList = new ArrayList<>(); for (HandlerCatchData take : handlerCatchDataList) { - if (take == null) { - continue; - } RequestEvent evt = take.getEvt(); try { FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME); String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader); - long startTime = System.currentTimeMillis(); - // 回复 200 OK - Element rootElement = getRootElement(evt); - if (rootElement == null) { - log.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest()); - continue; - } Device device = redisCatchStorage.getDevice(deviceId); if (device == null) { - log.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId); + log.error("[notify-移动位置] 未获取到device, {}", deviceId); continue; } - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - - DeviceChannel deviceChannel = null; - List elements = rootElement.elements(); - readDocument: for (Element element : elements) { - switch (element.getName()){ - case "DeviceID": - String channelId = element.getStringValue(); - deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId); - if (deviceChannel != null) { - mobilePosition.setChannelId(deviceChannel.getId()); - mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId()); - }else { - log.error("[notify-移动位置] 未找到通道 {}/{}", device.getDeviceId(), channelId); - break readDocument; - } - break; - case "Time": - String timeVal = element.getStringValue(); - if (ObjectUtils.isEmpty(timeVal)){ - mobilePosition.setTimestamp(System.currentTimeMillis()); - }else { - Long timestamp = SipUtils.parseTimeForTimestamp(time); - if(timestamp == null) { - log.warn("解析移动位置时间失败:{}, 使用当前时间", time); - mobilePosition.setTimestamp(System.currentTimeMillis()); - }else { - mobilePosition.setTimestamp(timestamp); - } - } - break; - case "Longitude": - mobilePosition.setLongitude(Double.parseDouble(element.getStringValue())); - break; - case "Latitude": - mobilePosition.setLatitude(Double.parseDouble(element.getStringValue())); - break; - case "Speed": - String speedVal = element.getStringValue(); - if (NumericUtil.isDouble(speedVal)) { - mobilePosition.setSpeed(Double.parseDouble(speedVal)); - } else { - mobilePosition.setSpeed(0.0); - } - break; - case "Direction": - String directionVal = element.getStringValue(); - if (NumericUtil.isDouble(directionVal)) { - mobilePosition.setDirection(Double.parseDouble(directionVal)); - } else { - mobilePosition.setDirection(0.0); - } - break; - case "Altitude": - String altitudeVal = element.getStringValue(); - if (NumericUtil.isDouble(altitudeVal)) { - mobilePosition.setAltitude(Double.parseDouble(altitudeVal)); - } else { - mobilePosition.setAltitude(0.0); - } - break; - - } - } - if (deviceChannel == null) { + Element rootElement = getRootElement(evt, device.getCharset()); + if (rootElement == null) { + log.warn("[notify-移动位置] {}处理失败,未识别到信息体", deviceId); continue; } - - log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(), - mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime); - - mobilePositionService.add(mobilePosition); - // 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息 - try { - eventPublisher.mobilePositionEventPublish(mobilePosition); - }catch (Exception e) { - log.error("[MobilePositionEvent] 发送失败: ", e); + List mobilePositions = DeviceMobilePosition.decode(device, rootElement); + for (DeviceMobilePosition mobilePosition : mobilePositions) { + log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", device.getDeviceId(), mobilePosition.getChannelDeviceId(), + mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTimestamp()); + mobilePositionList.add(mobilePosition); } - } catch (DocumentException e) { - log.error("[收到移动位置订阅通知] 文档解析异常: \r\n{}", evt.getRequest(), e); - } catch ( Exception e) { - log.error("[收到移动位置订阅通知] 异常: ", e); + } catch (Exception e) { + log.warn("[notify-移动位置] 发现未处理的异常, \r\n{}", evt.getRequest()); + log.error("[notify-移动位置] 异常内容: ", e); + } + } + if (!mobilePositionList.isEmpty()) { + try { + eventPublisher.mobilePositionsEventPublish(mobilePositionList); + } catch (Exception e) { + log.error("[MobilePositionEvent] 发送失败: ", e); } } } -// @Scheduled(fixedRate = 10000) -// public void execute(){ -// logger.debug("[待处理Notify-移动位置订阅消息数量]: {}", taskQueue.size()); -// } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java index f54c536d3..a250b44f6 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/AlarmNotifyMessageHandler.java @@ -114,21 +114,16 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme if (deviceChannel == null) { log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), deviceAlarmNotify.getChannelId()); } else { - MobilePosition mobilePosition = new MobilePosition(); + DeviceMobilePosition mobilePosition = new DeviceMobilePosition(); mobilePosition.setCreateTime(DateUtil.getNow()); - mobilePosition.setDeviceId(device.getDeviceId()); mobilePosition.setChannelId(deviceChannel.getId()); mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId()); - mobilePosition.setTime(deviceAlarmNotify.getAlarmTime()); + mobilePosition.setTimestamp(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(deviceAlarmNotify.getAlarmTime())); mobilePosition.setLongitude(deviceAlarmNotify.getLongitude()); mobilePosition.setLatitude(deviceAlarmNotify.getLatitude()); - - // 更新device channel 的经纬度 - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); + mobilePosition.setDevice(device); + // 发送移动位置事件,后续会保存到数据库,并且发送给上级平台 + publisher.mobilePositionsEventPublish(List.of(mobilePosition)); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java index c7b867b95..0f345c2d9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/notify/cmd/MobilePositionNotifyMessageHandler.java @@ -54,7 +54,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen log.error("[message-notify-移动位置] 待处理消息队列已满 {},返回486 BUSY_HERE", userSetting.getMaxNotifyCountQueue()); return; } - taskQueue.offer(new HandlerCatchData(evt, null, null)); + taskQueue.offer(new HandlerCatchData(evt, device, rootElement)); // 回复200 OK try { responseAckAsync((SIPRequest) evt.getRequest(), Response.OK); diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java index e0c214a23..2256d2758 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/MobilePositionResponseMessageHandler.java @@ -1,34 +1,33 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd; +import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; -import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition; +import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData; import com.genersoft.iot.vmp.gb28181.bean.Platform; +import com.genersoft.iot.vmp.gb28181.event.EventPublisher; import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder; import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage; import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler; import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler; -import com.genersoft.iot.vmp.gb28181.utils.NumericUtil; -import com.genersoft.iot.vmp.gb28181.utils.SipUtils; -import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService; -import com.genersoft.iot.vmp.utils.DateUtil; import gov.nist.javax.sip.message.SIPRequest; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.dom4j.DocumentException; import org.dom4j.Element; import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; import javax.sip.InvalidArgumentException; import javax.sip.RequestEvent; import javax.sip.SipException; import javax.sip.message.Response; import java.text.ParseException; - -import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; /** * 移动设备位置数据查询回复 @@ -36,18 +35,20 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText; */ @Slf4j @Component +@RequiredArgsConstructor public class MobilePositionResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler { private final String cmdType = "MobilePosition"; - @Autowired - private ResponseMessageHandler responseMessageHandler; + private final ResponseMessageHandler responseMessageHandler; - @Autowired - private IDeviceChannelService deviceChannelService; + private final EventPublisher eventPublisher; - @Autowired - private DeferredResultHolder resultHolder; + private final UserSetting userSetting; + + private final DeferredResultHolder resultHolder; + + private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); @Override public void afterPropertiesSet() throws Exception { @@ -56,79 +57,66 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar @Override public void handForDevice(RequestEvent evt, Device device, Element rootElement) { - SIPRequest request = (SIPRequest) evt.getRequest(); - + if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) { + log.error("[移动设备位置查询回复] 待处理消息队列已满 {},丢弃消息", userSetting.getMaxNotifyCountQueue()); + return; + } + taskQueue.offer(new HandlerCatchData(evt, device, rootElement)); try { - rootElement = getRootElement(evt, device.getCharset()); - if (rootElement == null) { - log.warn("[ 移动设备位置数据查询回复 ] content cannot be null, {}", evt.getRequest()); - try { - responseAckAsync(request, Response.BAD_REQUEST); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 移动设备位置数据查询 BAD_REQUEST: {}", e.getMessage()); - } - return; + responseAckAsync((SIPRequest) evt.getRequest(), Response.OK); + } catch (SipException | InvalidArgumentException | ParseException e) { + log.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage()); + } + } + + @Scheduled(fixedDelay = 400) + @Async + public void executeTaskQueue() { + if (taskQueue.isEmpty()) { + return; + } + List handlerCatchDataList = new ArrayList<>(); + int size = taskQueue.size(); + for (int i = 0; i < size; i++) { + HandlerCatchData poll = taskQueue.poll(); + if (poll != null) { + handlerCatchDataList.add(poll); } - String channelId = getText(rootElement, "DeviceID"); - DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId); - if (deviceChannel == null) { - log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), channelId); - }else { - MobilePosition mobilePosition = new MobilePosition(); - mobilePosition.setCreateTime(DateUtil.getNow()); - if (!ObjectUtils.isEmpty(device.getName())) { - mobilePosition.setDeviceName(device.getName()); - } - mobilePosition.setChannelId(deviceChannel.getId()); - mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId()); - //兼容ISO 8601格式时间 - String time = getText(rootElement, "Time"); - if (ObjectUtils.isEmpty(time)){ - mobilePosition.setTime(DateUtil.getNow()); - }else { - mobilePosition.setTime(SipUtils.parseTime(time)); - } - mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude"))); - mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude"))); - if (NumericUtil.isDouble(getText(rootElement, "Speed"))) { - mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed"))); - } else { - mobilePosition.setSpeed(0.0); - } - if (NumericUtil.isDouble(getText(rootElement, "Direction"))) { - mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction"))); - } else { - mobilePosition.setDirection(0.0); - } - if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) { - mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude"))); - } else { - mobilePosition.setAltitude(0.0); - } - - // 更新device channel 的经纬度 - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - - deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition); - - String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId(); - RequestMessage msg = new RequestMessage(); - msg.setKey(key); - msg.setData(mobilePosition); - resultHolder.invokeAllResult(msg); - } - - //回复 200 OK + } + if (handlerCatchDataList.isEmpty()) { + return; + } + List mobilePositionList = new ArrayList<>(); + for (HandlerCatchData take : handlerCatchDataList) { + Device device = take.getDevice(); try { - responseAckAsync(request, Response.OK); - } catch (SipException | InvalidArgumentException | ParseException e) { - log.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage()); + Element rootElementAfterCharset = getRootElement(take.getEvt(), device.getCharset()); + if (rootElementAfterCharset == null) { + log.warn("[移动设备位置查询回复] {}处理失败,未识别到信息体", device.getDeviceId()); + continue; + } + List mobilePositions = DeviceMobilePosition.decode(device, rootElementAfterCharset); + for (DeviceMobilePosition mobilePosition : mobilePositions) { + log.info("[收到移动位置查询回复]:{}/{}->{}.{}, 时间: {}", device.getDeviceId(), mobilePosition.getChannelDeviceId(), + mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTimestamp()); + mobilePositionList.add(mobilePosition); + String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId(); + RequestMessage msg = new RequestMessage(); + msg.setKey(key); + msg.setData(mobilePosition); + resultHolder.invokeAllResult(msg); + } + } catch (Exception e) { + log.warn("[移动设备位置查询回复] 发现未处理的异常, \r\n{}", take.getEvt().getRequest()); + log.error("[移动设备位置查询回复] 异常内容: ", e); + } + } + if (!mobilePositionList.isEmpty()) { + try { + eventPublisher.mobilePositionsEventPublish(mobilePositionList); + } catch (Exception e) { + log.error("[MobilePositionEvent] 发送失败: ", e); } - - } catch (DocumentException e) { - log.error("未处理的异常 ", e); } } diff --git a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java index 4272bd9e3..73cb741b9 100755 --- a/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java +++ b/src/main/java/com/genersoft/iot/vmp/gb28181/transmit/event/request/impl/message/response/cmd/RecordInfoResponseMessageHandler.java @@ -49,7 +49,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent private ApplicationEventPublisher applicationEventPublisher; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; private Long recordInfoTtl = 1800L; diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/SourceOtherServiceForJTImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/SourceOtherServiceForJTImpl.java index 8774effed..86f23c5fe 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/SourceOtherServiceForJTImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/SourceOtherServiceForJTImpl.java @@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.jt1078.service.impl; import com.genersoft.iot.vmp.common.enums.ChannelDataType; import com.genersoft.iot.vmp.common.enums.MediaStreamUtil; import com.genersoft.iot.vmp.conf.UserSetting; +import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService; import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService; import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service; @@ -10,6 +11,8 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import java.util.List; + @Slf4j @Service(ChannelDataType.OTHER_SERVICE + ChannelDataType.JT_1078) @RequiredArgsConstructor @@ -17,8 +20,6 @@ public class SourceOtherServiceForJTImpl implements ISourceOtherService { private final UserSetting userSetting; - private final Ijt1078Service ijt1078Service; - private final Ijt1078PlayService jt1078PlayService; @Override @@ -43,4 +44,10 @@ public class SourceOtherServiceForJTImpl implements ISourceOtherService { } return false; } + + @Override + public Boolean addChannelIdForMobilePosition(List mobilePositionList) { + + return null; + } } diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java index 54f7d7e3f..9dbecb661 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078PlayServiceImpl.java @@ -65,7 +65,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService { private JT1078Template jt1078Template; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private HookSubscribe subscribe; diff --git a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java index 342f4f52b..7a77c0a02 100644 --- a/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/jt1078/service/impl/jt1078ServiceImpl.java @@ -66,7 +66,7 @@ public class jt1078ServiceImpl implements Ijt1078Service { private JT1078Template jt1078Template; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IGbChannelService channelService; diff --git a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java index f888d0b55..8b799b238 100644 --- a/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java +++ b/src/main/java/com/genersoft/iot/vmp/media/abl/ABLHttpHookListener.java @@ -80,7 +80,7 @@ public class ABLHttpHookListener { private SSRCFactory ssrcFactory; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private ApplicationEventPublisher applicationEventPublisher; diff --git a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java index 84fadd2df..0cc045ab5 100755 --- a/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/media/service/impl/MediaServerServiceImpl.java @@ -73,7 +73,7 @@ public class MediaServerServiceImpl implements IMediaServerService { private IInviteStreamService inviteStreamService; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private Map nodeServerServiceMap; diff --git a/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java b/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java index 1643ad7e5..06535f58d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java +++ b/src/main/java/com/genersoft/iot/vmp/service/IMobilePositionService.java @@ -8,14 +8,10 @@ import java.util.List; public interface IMobilePositionService { - void add(List mobilePositionList); - - void add(MobilePosition mobilePosition); - - List queryMobilePositions(String deviceId, String channelId, String startTime, String endTime); + List queryMobilePositions(Integer channelId, String startTime, String endTime); List queryEnablePlatformListWithAsMessageChannel(); - MobilePosition queryLatestPosition(String deviceId); + MobilePosition queryLatestPosition(Integer channelId); } diff --git a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java index a2e0e7276..11e02b722 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java +++ b/src/main/java/com/genersoft/iot/vmp/service/bean/GPSMsgInfo.java @@ -1,6 +1,7 @@ package com.genersoft.iot.vmp.service.bean; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; +import com.genersoft.iot.vmp.utils.DateUtil; import lombok.Data; @Data @@ -61,7 +62,7 @@ public class GPSMsgInfo { gpsMsgInfo.setLat(mobilePosition.getLatitude()); gpsMsgInfo.setSpeed(mobilePosition.getSpeed()); gpsMsgInfo.setDirection(mobilePosition.getDirection()); - gpsMsgInfo.setTime(mobilePosition.getTime()); + gpsMsgInfo.setTime(DateUtil.timestampMsTo_yyyy_MM_dd_HH_mm_ss(mobilePosition.getTimestamp())); return gpsMsgInfo; } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java index 3792cd898..198831d18 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/MobilePositionServiceImpl.java @@ -1,82 +1,55 @@ package com.genersoft.iot.vmp.service.impl; -import com.genersoft.iot.vmp.conf.UserSetting; -import com.genersoft.iot.vmp.gb28181.bean.Device; -import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel; import com.genersoft.iot.vmp.gb28181.bean.MobilePosition; import com.genersoft.iot.vmp.gb28181.bean.Platform; -import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; -import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper; import com.genersoft.iot.vmp.gb28181.dao.PlatformMapper; -import com.genersoft.iot.vmp.gb28181.utils.Coordtransform; +import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent; +import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService; +import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService; import com.genersoft.iot.vmp.service.IMobilePositionService; import com.genersoft.iot.vmp.utils.DateUtil; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentLinkedQueue; @Slf4j @Service +@RequiredArgsConstructor public class MobilePositionServiceImpl implements IMobilePositionService { - @Autowired - private DeviceMapper deviceMapper; + private final ConcurrentLinkedQueue mobilePositionQueue = new ConcurrentLinkedQueue<>(); - @Autowired - private DeviceChannelMapper channelMapper; + private final Map sourceOtherServiceMap; - @Autowired - private MobilePositionMapper mobilePositionMapper; - - @Autowired - private UserSetting userSetting; - - - @Autowired - private PlatformMapper platformMapper; - - @Autowired - private RedisTemplate redisTemplate; - - private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list"; - - @Override - public void add(MobilePosition mobilePosition) { - List list = new ArrayList<>(); - list.add(mobilePosition); - add(list); - } - - @Override - public void add(List mobilePositionList) { - redisTemplate.opsForList().leftPushAll(REDIS_MOBILE_POSITION_LIST, mobilePositionList); - } - - private List get(int length) { - Long size = redisTemplate.opsForList().size(REDIS_MOBILE_POSITION_LIST); - if (size == null || size == 0) { - return new ArrayList<>(); - } - return redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, Math.min(length, size)); - } + private final MobilePositionMapper mobilePositionMapper; + private final IPlatformChannelService platformChannelService; + private final PlatformMapper platformMapper; /** * 查询移动位置轨迹 */ @Override - public synchronized List queryMobilePositions(String deviceId, String channelId, String startTime, String endTime) { - return mobilePositionMapper.queryPositionByDeviceIdAndTime(deviceId, channelId, startTime, endTime); + public synchronized List queryMobilePositions(Integer channelId, String startTime, String endTime) { + Long startTimestamp = null; + Long endTimestamp = null; + if (startTime != null) { + startTimestamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(startTime); + } + if (endTime != null) { + endTimestamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(endTime); + } + return mobilePositionMapper.queryPositionByDeviceIdAndTime(channelId, startTimestamp, endTimestamp); } @Override @@ -88,57 +61,56 @@ public class MobilePositionServiceImpl implements IMobilePositionService { * 查询最新移动位置 */ @Override - public MobilePosition queryLatestPosition(String deviceId) { - return mobilePositionMapper.queryLatestPositionByDevice(deviceId); + public MobilePosition queryLatestPosition(Integer channelId) { + return mobilePositionMapper.queryLatestPosition(channelId); } - @Scheduled(fixedDelay = 1000) - @Transactional - public void executeTaskQueue() { - int countLimit = 3000; - List mobilePositions = get(countLimit); - if (mobilePositions == null || mobilePositions.isEmpty()) { + @Async + @EventListener + public void onApplicationEvent(MobilePositionEvent event) { + if (event.getMobilePositionList() == null || event.getMobilePositionList().isEmpty()) { return; } - if (userSetting.getSavePositionHistory()) { - mobilePositionMapper.batchadd(mobilePositions); - } - log.info("[移动位置订阅]更新通道位置: {}", mobilePositions.size()); - Map> updateChannelMap = new HashMap<>(); - for (MobilePosition mobilePosition : mobilePositions) { - DeviceChannel deviceChannel = new DeviceChannel(); - deviceChannel.setId(mobilePosition.getChannelId()); - deviceChannel.setDeviceId(mobilePosition.getDeviceId()); - deviceChannel.setLongitude(mobilePosition.getLongitude()); - deviceChannel.setLatitude(mobilePosition.getLatitude()); - deviceChannel.setGpsTime(mobilePosition.getTime()); - deviceChannel.setUpdateTime(DateUtil.getNow()); - if (mobilePosition.getLongitude() > 0 || mobilePosition.getLatitude() > 0) { - Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(mobilePosition.getLongitude(), mobilePosition.getLatitude()); - deviceChannel.setGbLongitude(wgs84Position[0]); - deviceChannel.setGbLatitude(wgs84Position[1]); + for (ISourceOtherService sourceOtherService : sourceOtherServiceMap.values()) { + try { + // 此时已经完成了通道ID的添加,以及坐标系的转换,后续只需要将数据保存到数据库即可 + Boolean addResult = sourceOtherService.addChannelIdForMobilePosition(event.getMobilePositionList()); + if (addResult != null && addResult) { + mobilePositionQueue.addAll(event.getMobilePositionList()); + } + }catch (Exception e) { + log.error("[移动位置事件] 处理移动位置事件失败", e); } - if (!updateChannelMap.containsKey(mobilePosition.getDeviceId())) { - updateChannelMap.put(mobilePosition.getDeviceId(), new HashMap<>()); - } - updateChannelMap.get(mobilePosition.getDeviceId()).put(mobilePosition.getChannelId(), deviceChannel); } - List deviceIds = new ArrayList<>(updateChannelMap.keySet()); - if (deviceIds.isEmpty()) { - log.info("[移动位置订阅]为查询到对应的设备,消息已经忽略"); + } + + @Scheduled(fixedDelay = 500) + public void executeMobilePositionQueue() { + if (mobilePositionQueue.isEmpty()) { return; } - List deviceList = deviceMapper.queryByDeviceIds(deviceIds); - for (Device device : deviceList) { - Map channelMap = updateChannelMap.get(device.getDeviceId()); - if (device.getGeoCoordSys().equalsIgnoreCase("GCJ02")) { - channelMap.values().forEach(channel -> { - Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(channel.getLongitude(), channel.getLatitude()); - channel.setGbLongitude(wgs84Position[0]); - channel.setGbLatitude(wgs84Position[1]); - }); + List handlerCatchDataList = new ArrayList<>(); + int size = mobilePositionQueue.size(); + for (int i = 0; i < size; i++) { + MobilePosition poll = mobilePositionQueue.poll(); + if (poll != null) { + handlerCatchDataList.add(poll); } - channelMapper.batchUpdatePosition(new ArrayList<>(channelMap.values())); + } + if (handlerCatchDataList.isEmpty()) { + return; + } + List mobilePositionList = handlerCatchDataList.stream().filter( + mobilePosition -> mobilePosition.getChannelId() != 0).toList(); + // 发送通知,方便国标级联转发给上级 + Thread.startVirtualThread(() -> platformChannelService.notifyMobilePosition(mobilePositionList)); + + // 批量保存到数据库 + int batchSize = 1000; + for (int i = 0; i < mobilePositionList.size(); i += batchSize) { + int end = Math.min(i + batchSize, mobilePositionList.size()); + List batchList = mobilePositionList.subList(i, end); + mobilePositionMapper.batchAdd(batchList); } } diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java index 13b5c5664..437a63fea 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/RtpServerServiceImpl.java @@ -53,7 +53,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService { private HookSubscribe subscribe; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; /** * 流到来的处理 diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java index 053ff9e69..3b8ec0c7b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/SendRtpServerServiceImpl.java @@ -27,7 +27,7 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Override diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/UserApiKeyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/UserApiKeyServiceImpl.java index 8c552b10f..3078fb72e 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/impl/UserApiKeyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/impl/UserApiKeyServiceImpl.java @@ -20,7 +20,7 @@ public class UserApiKeyServiceImpl implements IUserApiKeyService { UserApiKeyMapper userApiKeyMapper; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Override public int addApiKey(UserApiKey userApiKey) { diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java index 63fee30dd..d51484248 100755 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/RedisGpsMsgListener.java @@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.service.bean.GPSMsgInfo; import com.genersoft.iot.vmp.storager.IRedisCatchStorage; import com.genersoft.iot.vmp.streamPush.service.IStreamPushService; import com.genersoft.iot.vmp.utils.DateUtil; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jetbrains.annotations.NotNull; import org.springframework.beans.factory.annotation.Autowired; @@ -29,16 +30,12 @@ import java.util.stream.Collectors; */ @Slf4j @Component +@RequiredArgsConstructor public class RedisGpsMsgListener implements MessageListener { - @Autowired - private IRedisCatchStorage redisCatchStorage; + private final IRedisCatchStorage redisCatchStorage; - @Autowired - private IStreamPushService streamPushService; - - @Autowired - private IGbChannelService channelService; + private final IGbChannelService channelService; private final ConcurrentLinkedQueue taskQueue = new ConcurrentLinkedQueue<>(); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java index acd8e8136..8a7d3908c 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcChannelPlayController.java @@ -35,7 +35,7 @@ public class RedisRpcChannelPlayController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IGbChannelService channelService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcCloudRecordController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcCloudRecordController.java index ae6c5a171..0bcc0e5d1 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcCloudRecordController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcCloudRecordController.java @@ -26,7 +26,7 @@ public class RedisRpcCloudRecordController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private ICloudRecordService cloudRecordService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java index c4491905a..7bae7442d 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDeviceController.java @@ -31,7 +31,7 @@ public class RedisRpcDeviceController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IDeviceService deviceService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDevicePlayController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDevicePlayController.java index 2e83cc71f..66302efb5 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDevicePlayController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcDevicePlayController.java @@ -29,7 +29,7 @@ public class RedisRpcDevicePlayController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IDeviceService deviceService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcGbDeviceController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcGbDeviceController.java index f36d34a56..807542f98 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcGbDeviceController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcGbDeviceController.java @@ -25,7 +25,7 @@ public class RedisRpcGbDeviceController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IDeviceService deviceService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcPlatformController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcPlatformController.java index 9f7a9f8df..e9b323cc4 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcPlatformController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcPlatformController.java @@ -33,7 +33,7 @@ public class RedisRpcPlatformController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IPlatformService platformService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java index 4a688c3df..4180b1c7b 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamProxyController.java @@ -27,7 +27,7 @@ public class RedisRpcStreamProxyController extends RpcController { private UserSetting userSetting; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IStreamProxyPlayService streamProxyPlayService; diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java index 84f90d3b9..5c7927568 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/control/RedisRpcStreamPushController.java @@ -46,7 +46,7 @@ public class RedisRpcStreamPushController extends RpcController { private HookSubscribe hookSubscribe; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IStreamPushPlayService streamPushPlayService; @@ -102,7 +102,7 @@ public class RedisRpcStreamPushController extends RpcController { sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp()); sendRtpItem.setServerId(userSetting.getServerId()); - redisTemplate.opsForValue().set(sendRtpItem.getChannelId(), sendRtpItem); + redisTemplate.opsForValue().set(sendRtpItem.getChannelId() + "", sendRtpItem); RedisRpcResponse response = request.getResponse(); response.setBody(sendRtpItem.getChannelId()); response.setStatusCode(ErrorCode.SUCCESS.getCode()); diff --git a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java index bb040e128..7c8242bca 100644 --- a/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/service/redisMsg/service/RedisRpcServiceImpl.java @@ -47,7 +47,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService { private SSRCFactory ssrcFactory; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private IMediaServerService mediaServerService; diff --git a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java index 29c730428..a744f9382 100755 --- a/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/storager/impl/RedisCatchStorageImpl.java @@ -7,7 +7,6 @@ import com.genersoft.iot.vmp.common.SystemAllInfo; import com.genersoft.iot.vmp.common.VideoManagerConstants; import com.genersoft.iot.vmp.conf.UserSetting; import com.genersoft.iot.vmp.gb28181.bean.*; -import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper; import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper; import com.genersoft.iot.vmp.media.bean.MediaInfo; import com.genersoft.iot.vmp.media.bean.MediaServer; @@ -19,6 +18,7 @@ import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.utils.JsonUtil; import com.genersoft.iot.vmp.utils.SystemInfoUtils; import com.genersoft.iot.vmp.utils.redis.RedisUtil; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jspecify.annotations.NonNull; import org.springframework.beans.factory.annotation.Autowired; @@ -31,29 +31,21 @@ import org.springframework.stereotype.Component; import java.time.Duration; import java.util.*; -@SuppressWarnings("rawtypes") @Slf4j @Component +@RequiredArgsConstructor public class RedisCatchStorageImpl implements IRedisCatchStorage { - @Autowired - private DeviceChannelMapper deviceChannelMapper; + private final DeviceMapper deviceMapper; - @Autowired - private DeviceMapper deviceMapper; + private final UserSetting userSetting; - @Autowired - private UserSetting userSetting; + private final RedisTemplate redisTemplate; - @Autowired - private RedisTemplate redisTemplate; + private final RedisTemplate longRedisTemplate; - @Autowired - private RedisTemplate longRedisTemplate; - - @Autowired - private StringRedisTemplate stringRedisTemplate; + private final StringRedisTemplate stringRedisTemplate; @Override public List queryAllSendRTPServer() { @@ -126,7 +118,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type.toUpperCase() + "_*_*_" + mediaServerId; List streams = RedisUtil.scan(redisTemplate, key); for (Object stream : streams) { - redisTemplate.delete(stream); + redisTemplate.delete((String) stream); } } @@ -268,7 +260,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { MediaInfo result = null; List keys = RedisUtil.scan(redisTemplate, scanKey); - if (keys.size() > 0) { + if (!keys.isEmpty()) { String key = (String) keys.get(0); String mediaInfoJson = (String)redisTemplate.opsForValue().get(key); result = JSON.parseObject(mediaInfoJson, MediaInfo.class); @@ -283,7 +275,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { MediaInfo result = null; List keys = RedisUtil.scan(redisTemplate, scanKey); - if (keys.size() > 0) { + if (!keys.isEmpty()) { String key = (String) keys.get(0); String mediaInfoJson = (String)redisTemplate.opsForValue().get(key); result = JSON.parseObject(mediaInfoJson, MediaInfo.class); @@ -423,7 +415,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } msg.append(" ").append(online? "ON":"OFF"); log.info("[redis通知] 推送设备/通道状态-> {} ", msg); - // 使用 RedisTemplate 发送字符串消息会导致发送的消息多带了双引号 + // 使用 RedisTemplate 发送字符串消息会导致发送的消息多带了双引号 stringRedisTemplate.convertAndSend(key, msg.toString()); } @@ -439,7 +431,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage { } msg.append(" ").append(add? "ADD":"DELETE"); log.info("[redis通知] 推送通道-> {}", msg); - // 使用 RedisTemplate 发送字符串消息会导致发送的消息多带了双引号 + // 使用 RedisTemplate 发送字符串消息会导致发送的消息多带了双引号 stringRedisTemplate.convertAndSend(key, msg.toString()); } diff --git a/src/main/java/com/genersoft/iot/vmp/utils/JsonUtil.java b/src/main/java/com/genersoft/iot/vmp/utils/JsonUtil.java index b44af109a..3cd5f3d9d 100755 --- a/src/main/java/com/genersoft/iot/vmp/utils/JsonUtil.java +++ b/src/main/java/com/genersoft/iot/vmp/utils/JsonUtil.java @@ -24,7 +24,7 @@ public final class JsonUtil { * @param * @return result type */ - public static T redisJsonToObject(RedisTemplate redisTemplate, String key, Class clazz) { + public static T redisJsonToObject(RedisTemplate redisTemplate, String key, Class clazz) { Object jsonObject = redisTemplate.opsForValue().get(key); if (Objects.isNull(jsonObject)) { return null; @@ -32,7 +32,7 @@ public final class JsonUtil { return clazz.cast(jsonObject); } - public static T redisHashJsonToObject(RedisTemplate redisTemplate, String key, String objKey, Class clazz) { + public static T redisHashJsonToObject(RedisTemplate redisTemplate, String key, String objKey, Class clazz) { // if (key == null || objKey == null) { // return null; // } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java index 6df13ba79..4fddbf890 100644 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/TestController.java @@ -25,7 +25,7 @@ public class TestController { private HookSubscribe subscribe; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @GetMapping("/hook/list") public List all(){ diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java index 3d053fc57..d8e7da545 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/ps/PsController.java @@ -62,7 +62,7 @@ public class PsController { @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @GetMapping(value = "/receive/open") @@ -172,7 +172,7 @@ public class PsController { if (!scan.isEmpty()) { for (Object key : scan) { // 将信息写入redis中,以备后用 - redisTemplate.delete(key); + redisTemplate.delete((String) key); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java index d8f130a89..60db1ea16 100755 --- a/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java +++ b/src/main/java/com/genersoft/iot/vmp/vmanager/rtp/RtpController.java @@ -62,7 +62,7 @@ public class RtpController { private DynamicTask dynamicTask; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @GetMapping(value = "/receive/open") @@ -185,7 +185,7 @@ public class RtpController { if (scan.size() > 0) { for (Object key : scan) { // 将信息写入redis中,以备后用 - redisTemplate.delete(key); + redisTemplate.delete((String)key); } } } diff --git a/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java b/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java index 4bfc722ba..fdfa51283 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java +++ b/src/main/java/com/genersoft/iot/vmp/web/custom/service/CameraChannelService.java @@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.gb28181.service.IGbChannelControlService; import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService; import com.genersoft.iot.vmp.service.bean.ErrorCallback; import com.genersoft.iot.vmp.utils.Coordtransform; +import com.genersoft.iot.vmp.utils.DateUtil; import com.genersoft.iot.vmp.vmanager.bean.ErrorCode; import com.genersoft.iot.vmp.web.custom.bean.*; import com.genersoft.iot.vmp.web.custom.conf.SyTokenManager; @@ -51,7 +52,7 @@ public class CameraChannelService implements CommandLineRunner { private GroupMapper groupMapper; @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Autowired private RedisTemplate redisTemplateForString; @@ -334,7 +335,7 @@ public class CameraChannelService implements CommandLineRunner { // 发送redis消息 JSONObject jsonObject = new JSONObject(); - jsonObject.put("gpsDate", mobilePosition.getTime()); + jsonObject.put("gpsDate", DateUtil.timestampMsTo_yyyy_MM_dd_HH_mm_ss(mobilePosition.getTimestamp())); jsonObject.put("unicodeNo", member.getUnicodeNo()); jsonObject.put("memberNo", member.getNo()); jsonObject.put("unitNo", member.getUnitNo()); diff --git a/src/main/java/com/genersoft/iot/vmp/web/custom/service/SyServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/web/custom/service/SyServiceImpl.java index 095076849..71bff3026 100644 --- a/src/main/java/com/genersoft/iot/vmp/web/custom/service/SyServiceImpl.java +++ b/src/main/java/com/genersoft/iot/vmp/web/custom/service/SyServiceImpl.java @@ -23,7 +23,7 @@ import java.util.List; public class SyServiceImpl implements IMapService { @Autowired - private RedisTemplate redisTemplate; + private RedisTemplate redisTemplate; @Override public List getConfig() {