优化移动位置相关逻辑

This commit is contained in:
lin 2026-04-18 23:31:47 +08:00
parent 775376b327
commit 9909aa9655
56 changed files with 438 additions and 632 deletions

View File

@ -21,7 +21,7 @@ public class UserManager implements org.apache.ftpserver.ftplet.UserManager {
private static final String PREFIX = "VMP_FTP_USER_";
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Override

View File

@ -38,7 +38,7 @@ public class RedisRpcConfig implements MessageListener {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();

View File

@ -13,8 +13,8 @@ import org.springframework.data.redis.serializer.StringRedisSerializer;
public class RedisTemplateConfig {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
// 使用fastJson序列化
GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
// value值的序列化采用fastJsonRedisSerializer

View File

@ -191,6 +191,8 @@ public class DeviceChannel extends CommonGBChannel {
@Schema(description = "通道类型, 默认0, 0 普通通道1 行政区划 2 业务分组/虚拟组织")
private int channelType;
private String dbKey;
private Integer dataType = ChannelDataType.GB28181;
public void setPtzType(int ptzType) {

View File

@ -0,0 +1,17 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
@EqualsAndHashCode(callSuper = true)
@Data
@Schema(description = "国标共享通道")
public class ShareGBChannel extends CommonGBChannel{
@Schema(description = "平台ID")
private int platformId;
}

View File

@ -4,12 +4,15 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author lin
@ -25,7 +28,7 @@ public class SubscribeHolder {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
private final String prefix = "VMP_SUBSCRIBE_OVERDUE";
@ -106,15 +109,28 @@ public class SubscribeHolder {
return result;
}
public List<String> getAllMobilePositionSubscribePlatform(List<Platform> platformList) {
public Map<Integer, Platform> getAllMobilePositionSubscribePlatform(List<Platform> platformList) {
if (platformList == null || platformList.isEmpty()) {
return new ArrayList<>();
return new HashMap<>();
}
List<String> result = new ArrayList<>();
for (Platform platform : platformList) {
String key = String.format("%s:%s:%s", prefix, "mobilePosition", platform.getServerGBId());
if (redisTemplate.hasKey(key)) {
result.add(platform.getServerGBId());
Map<Integer, Platform> result = new HashMap<>();
// 1. 先批量构建所有 key
List<String> keys = platformList.stream()
.map(platform -> String.format("%s:%s:%s", prefix, "mobilePosition", platform.getServerGBId()))
.toList();
// 2. 批量查询 Redis 关键只发1次请求
List<Object> results = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (String key : keys) {
// 注意这里使用的是底层 connection 接口
connection.keyCommands().exists(key.getBytes());
}
return null; // 流水线模式下必须返回 null
});
for (int i = 0; i < results.size(); i++) {
if (results.get(i) instanceof Boolean exists && exists) {
result.put(platformList.get(i).getId(), platformList.get(i));
}
}
return result;

View File

@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit;
public class ChannelController {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IGbChannelService channelService;

View File

@ -48,23 +48,14 @@ public class MobilePositionController {
private IDeviceService deviceService;
/**
* 查询历史轨迹
* @param deviceId 设备ID
* @param start 开始时间
* @param end 结束时间
* @return
*/
@Operation(summary = "查询历史轨迹", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号")
@Parameter(name = "channelId", description = "通道的数据库ID")
@Parameter(name = "start", description = "开始时间")
@Parameter(name = "end", description = "结束时间")
@GetMapping("/history/{deviceId}")
public List<MobilePosition> positions(@PathVariable String deviceId,
@RequestParam(required = false) String channelId,
@RequestParam(required = false) String start,
@RequestParam(required = false) String end) {
public List<MobilePosition> positions( Integer channelId,
@RequestParam(required = false) String start,
@RequestParam(required = false) String end) {
if (StringUtil.isEmpty(start)) {
start = null;
@ -72,19 +63,14 @@ public class MobilePositionController {
if (StringUtil.isEmpty(end)) {
end = null;
}
return mobilePositionService.queryMobilePositions(deviceId, channelId, start, end);
return mobilePositionService.queryMobilePositions(channelId, start, end);
}
/**
* 查询设备最新位置
* @param deviceId 设备ID
* @return
*/
@Operation(summary = "查询设备最新位置", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@GetMapping("/latest/{deviceId}")
public MobilePosition latestPosition(@PathVariable String deviceId) {
return mobilePositionService.queryLatestPosition(deviceId);
@Operation(summary = "查询通道最新位置", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "channelId", description = "通道的数据库ID", required = true)
@GetMapping("/latest")
public MobilePosition latestPosition(Integer channelId) {
return mobilePositionService.queryLatestPosition(channelId);
}
/**

View File

@ -621,7 +621,7 @@ public interface DeviceChannelMapper {
@Select(value = {" <script>" +
" SELECT " +
" CONCAT(data_device_id, '_', device_id) as deviceIdKey" +
" CONCAT(data_device_id, '_', device_id) as dbKey, \n" +
" id,\n" +
" data_device_id,\n" +
" create_time,\n" +
@ -672,10 +672,10 @@ public interface DeviceChannelMapper {
" where data_type = 1 " +
" and data_device_id = #{deviceId} " +
" and device_id in " +
"<foreach item='item' index='index' collection='channelIds' open='(' separator=',' close=')'>" +
" #{item} " +
"<foreach item='item' index='index' collection='mobilePositionList' open='(' separator=',' close=')'>" +
" #{item.channelDeviceId} " +
"</foreach>" +
"</script>"})
@MapKey("deviceIdKey")
@MapKey("dbKey")
Map<String, DeviceChannel> getAllForMobilePosition(@Param("deviceId") int deviceId, List<DeviceMobilePosition> mobilePositionList);
}

View File

@ -1,7 +1,6 @@
package com.genersoft.iot.vmp.gb28181.dao;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@ -12,40 +11,33 @@ import java.util.List;
@Mapper
public interface MobilePositionMapper {
@Insert("INSERT INTO wvp_mobile_position (device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source,create_time)"+
"VALUES (#{deviceId}, #{channelId}, #{deviceName}, #{time}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{reportSource}, #{createTime})")
@Insert("INSERT INTO wvp_mobile_position (channel_id, timestamp, longitude, latitude, altitude, speed, direction, create_time)"+
"VALUES (#{channelId}, #{timestamp}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{createTime})")
int insertNewPosition(MobilePosition mobilePosition);
@Select(value = {" <script>" +
"SELECT * FROM wvp_mobile_position" +
" WHERE device_id = #{deviceId}" +
"<if test=\"channelId != null\"> and channel_id = #{channelId}</if>" +
"<if test=\"startTime != null\"> AND time&gt;=#{startTime}</if>" +
"<if test=\"endTime != null\"> AND time&lt;=#{endTime}</if>" +
" WHERE channel_id = #{channelId}" +
"<if test=\"startTime != null\"> AND timestamp&gt;=#{startTime}</if>" +
"<if test=\"endTime != null\"> AND timestamp&lt;=#{endTime}</if>" +
" ORDER BY time ASC" +
" </script>"})
List<MobilePosition> queryPositionByDeviceIdAndTime(@Param("deviceId") String deviceId, @Param("channelId") String channelId, @Param("startTime") String startTime, @Param("endTime") String endTime);
List<MobilePosition> queryPositionByDeviceIdAndTime(@Param("channelId") Integer channelId, @Param("startTime") Long startTime, @Param("endTime") Long endTime);
@Select("SELECT * FROM wvp_mobile_position WHERE device_id = #{deviceId}" +
" ORDER BY time DESC LIMIT 1")
MobilePosition queryLatestPositionByDevice(String deviceId);
@Delete("DELETE FROM wvp_mobile_position WHERE device_id = #{deviceId}")
int clearMobilePositionsByDeviceId(String deviceId);
@Select("SELECT * FROM wvp_mobile_position WHERE channel_id = #{channelId}" +
" ORDER BY timestamp DESC LIMIT 1")
MobilePosition queryLatestPosition(@Param("channelId") Integer channelId);
@Insert("<script> " +
"<foreach collection='mobilePositions' index='index' item='item' separator=';'> " +
"insert into wvp_mobile_position " +
"(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
"(channel_id, timestamp,longitude,latitude,altitude,speed,direction," +
"create_time)"+
"values " +
"(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
"#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
"#{item.reportSource}, #{item.createTime}) " +
"( #{item.channelId}, #{item.timestamp}, #{item.longitude}, " +
" #{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
" #{item.createTime}) " +
"</foreach> " +
"</script>")
void batchadd(List<MobilePosition> mobilePositions);
void insertMobilePositions(List<MobilePosition> batchList);
void batchAdd(List<MobilePosition> mobilePositions);
}

View File

@ -548,4 +548,55 @@ public interface PlatformChannelMapper {
" order by wcr.id DESC" +
" </script>")
Set<Region> queryShareRegion(Integer id);
@Select("<script>" +
" select " +
" wpgc.platform_id as platform_id" +
" wdc.id as gb_id,\n" +
" wdc.data_type,\n" +
" wdc.data_device_id,\n" +
" wdc.create_time,\n" +
" wdc.update_time,\n" +
" coalesce(wpgc.custom_device_id, wdc.gb_device_id, wdc.device_id) as gb_device_id,\n" +
" coalesce(wpgc.custom_name, wdc.gb_name, wdc.name) as gb_name,\n" +
" coalesce(wpgc.custom_manufacturer, wdc.gb_manufacturer, wdc.manufacturer) as gb_manufacturer,\n" +
" coalesce(wpgc.custom_model, wdc.gb_model, wdc.model) as gb_model,\n" +
" coalesce(wpgc.custom_owner, wdc.gb_owner, wdc.owner) as gb_owner,\n" +
" coalesce(wpgc.custom_civil_code, wdc.gb_civil_code, wdc.civil_code) as gb_civil_code,\n" +
" coalesce(wpgc.custom_block, wdc.gb_block, wdc.block) as gb_block,\n" +
" coalesce(wpgc.custom_address, wdc.gb_address, wdc.address) as gb_address,\n" +
" coalesce(wpgc.custom_parental, wdc.gb_parental, wdc.parental) as gb_parental,\n" +
" coalesce(wpgc.custom_parent_id, wdc.gb_parent_id, wdc.parent_id) as gb_parent_id,\n" +
" coalesce(wpgc.custom_safety_way, wdc.gb_safety_way, wdc.safety_way) as gb_safety_way,\n" +
" coalesce(wpgc.custom_register_way, wdc.gb_register_way, wdc.register_way) as gb_register_way,\n" +
" coalesce(wpgc.custom_cert_num, wdc.gb_cert_num, wdc.cert_num) as gb_cert_num,\n" +
" coalesce(wpgc.custom_certifiable, wdc.gb_certifiable, wdc.certifiable) as gb_certifiable,\n" +
" coalesce(wpgc.custom_err_code, wdc.gb_err_code, wdc.err_code) as gb_err_code,\n" +
" coalesce(wpgc.custom_end_time, wdc.gb_end_time, wdc.end_time) as gb_end_time,\n" +
" coalesce(wpgc.custom_secrecy, wdc.gb_secrecy, wdc.secrecy) as gb_secrecy,\n" +
" coalesce(wpgc.custom_ip_address, wdc.gb_ip_address, wdc.ip_address) as gb_ip_address,\n" +
" coalesce(wpgc.custom_port, wdc.gb_port, wdc.port) as gb_port,\n" +
" coalesce(wpgc.custom_password, wdc.gb_password, wdc.password) as gb_password,\n" +
" coalesce(wpgc.custom_status, wdc.gb_status, wdc.status) as gb_status,\n" +
" coalesce(wpgc.custom_longitude, wdc.gb_longitude, wdc.longitude) as gb_longitude,\n" +
" coalesce(wpgc.custom_latitude, wdc.gb_latitude, wdc.latitude) as gb_latitude,\n" +
" coalesce(wpgc.custom_ptz_type, wdc.gb_ptz_type, wdc.ptz_type) as gb_ptz_type,\n" +
" coalesce(wpgc.custom_position_type, wdc.gb_position_type, wdc.position_type) as gb_position_type,\n" +
" coalesce(wpgc.custom_room_type, wdc.gb_room_type, wdc.room_type) as gb_room_type,\n" +
" coalesce(wpgc.custom_use_type, wdc.gb_use_type, wdc.use_type) as gb_use_type,\n" +
" coalesce(wpgc.custom_supply_light_type, wdc.gb_supply_light_type, wdc.supply_light_type) as gb_supply_light_type,\n" +
" coalesce(wpgc.custom_direction_type, wdc.gb_direction_type, wdc.direction_type) as gb_direction_type,\n" +
" coalesce(wpgc.custom_resolution, wdc.gb_resolution, wdc.resolution) as gb_resolution,\n" +
" coalesce(wpgc.custom_business_group_id, wdc.gb_business_group_id, wdc.business_group_id) as gb_business_group_id,\n" +
" coalesce(wpgc.custom_download_speed, wdc.gb_download_speed, wdc.download_speed) as gb_download_speed,\n" +
" coalesce(wpgc.custom_svc_space_support_mod, wdc.gb_svc_space_support_mod, wdc.svc_space_support_mod) as gb_svc_space_support_mod,\n" +
" coalesce(wpgc.custom_svc_time_support_mode, wdc.gb_svc_time_support_mode, wdc.svc_time_support_mode) as gb_svc_time_support_mode\n" +
" from wvp_device_channel wdc" +
" left join wvp_platform_channel wpgc on wdc.id = wpgc.device_channel_id" +
" where wdc.channel_type = 0 and wpgc.platform_id in" +
"<foreach collection='platforms' item='item' open='(' separator=',' close=')' > #{item.id} </foreach>" +
" and wdc.id in " +
"<foreach collection='channelIds' item='item' open='(' separator=',' close=')' > #{item} </foreach> " +
"</script>")
List<ShareGBChannel> queryShareChannelInPlatformsAndChannelIds(Collection<Platform> platforms, Collection<Integer> channelIds);
}

View File

@ -1,75 +0,0 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.List;
/**
* 移动位置通知消息转发
*/
@Slf4j
@Component
public class MobilePositionEventLister implements ApplicationListener<MobilePositionEvent> {
@Autowired
private IPlatformService platformService;
@Autowired
private IPlatformChannelService platformChannelService;
@Autowired
private SIPCommanderForPlatform sipCommanderForPlatform;
@Autowired
private SubscribeHolder subscribeHolder;
@Autowired
private UserSetting userSetting;
@Override
public void onApplicationEvent(MobilePositionEvent event) {
if (event.getMobilePosition().getChannelId() == 0) {
return;
}
List<Platform> allPlatforms = platformService.queryAll(userSetting.getServerId());
// 获取所用订阅
List<String> platforms = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms);
if (platforms.isEmpty()) {
return;
}
List<Platform> platformsForGB = platformChannelService.queryPlatFormListByChannelDeviceId(event.getMobilePosition().getChannelId(), platforms);
for (Platform platform : platformsForGB) {
if (log.isDebugEnabled()){
log.debug("[向上级发送MobilePosition] 通道:{},平台:{} 位置: {}:{}", event.getMobilePosition().getChannelId(),
platform.getServerGBId(), event.getMobilePosition().getLongitude(), event.getMobilePosition().getLatitude());
}
SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
try {
GPSMsgInfo gpsMsgInfo = GPSMsgInfo.getInstance(event.getMobilePosition());
// 获取通道编号
CommonGBChannel commonGBChannel = platformChannelService.queryChannelByPlatformIdAndChannelId(platform.getId(), event.getMobilePosition().getChannelId());
sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, commonGBChannel,
subscribe);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
}
}
}
}

View File

@ -1,4 +0,0 @@
package com.genersoft.iot.vmp.gb28181.service;
public interface IMobilePositionService {
}

View File

@ -50,4 +50,6 @@ public interface IPlatformChannelService {
void checkRegionRemove(List<CommonGBChannel> channelList, List<Region> regionList);
List<Platform> queryByPlatformBySharChannelId(String gbId);
void notifyMobilePosition(List<MobilePosition> handlerCatchDataList);
}

View File

@ -73,7 +73,7 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
private DynamicTask dynamicTask;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private VectorTileCatch vectorTileCatch;

View File

@ -43,7 +43,7 @@ public class GroupServiceImpl implements IGroupService {
private EventPublisher eventPublisher;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Override
public void add(Group group) {

View File

@ -33,7 +33,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
private final Map<String, List<ErrorCallback<StreamInfo>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserSetting userSetting;

View File

@ -1,84 +0,0 @@
package com.genersoft.iot.vmp.gb28181.service.impl;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper;
import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent;
import com.genersoft.iot.vmp.gb28181.service.IMobilePositionService;
import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
@Slf4j
@Service
@RequiredArgsConstructor
public class MobilePositionServiceImpl implements IMobilePositionService {
private final ConcurrentLinkedQueue<MobilePosition> mobilePositionQueue = new ConcurrentLinkedQueue<>();
private final Map<String, ISourceOtherService> sourceOtherServiceMap;
private final MobilePositionMapper mobilePositionMapper;
@PostConstruct
public void init() {
}
@Async
@EventListener
public void onApplicationEvent(MobilePositionEvent event) {
if (event.getMobilePositionList() == null || event.getMobilePositionList().isEmpty()) {
return;
}
for (ISourceOtherService sourceOtherService : sourceOtherServiceMap.values()) {
try {
Boolean addResult = sourceOtherService.addChannelIdForMobilePosition(event.getMobilePositionList());
if (addResult != null && addResult) {
mobilePositionQueue.addAll(event.getMobilePositionList());
}
}catch (Exception e) {
log.error("[移动位置事件] 处理移动位置事件失败", e);
}
}
}
@Scheduled(fixedDelay = 500)
public void executeMobilePositionQueue() {
if (mobilePositionQueue.isEmpty()) {
return;
}
List<MobilePosition> handlerCatchDataList = new ArrayList<>();
int size = mobilePositionQueue.size();
for (int i = 0; i < size; i++) {
MobilePosition poll = mobilePositionQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
// TODO 发送通知方便国标级联转发给上级
// 批量保存到数据库
int batchSize = 1000;
for (int i = 0; i < handlerCatchDataList.size(); i += batchSize) {
int end = Math.min(i + batchSize, handlerCatchDataList.size());
List<MobilePosition> batchList = handlerCatchDataList.subList(i, end);
mobilePositionMapper.insertMobilePositions(batchList);
}
}
}

View File

@ -10,11 +10,12 @@ import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -24,6 +25,7 @@ import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
@ -31,38 +33,29 @@ import java.util.stream.Collectors;
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class PlatformChannelServiceImpl implements IPlatformChannelService {
@Autowired
private PlatformChannelMapper platformChannelMapper;
private final PlatformChannelMapper platformChannelMapper;
@Autowired
private EventPublisher eventPublisher;
private final EventPublisher eventPublisher;
@Autowired
private GroupMapper groupMapper;
private final GroupMapper groupMapper;
private final RegionMapper regionMapper;
@Autowired
private RegionMapper regionMapper;
private final CommonGBChannelMapper commonGBChannelMapper;
@Autowired
private CommonGBChannelMapper commonGBChannelMapper;
private final PlatformMapper platformMapper;
@Autowired
private PlatformMapper platformMapper;
private final ISIPCommanderForPlatform sipCommanderForPlatform;
@Autowired
private ISIPCommanderForPlatform sipCommanderFroPlatform;
private final SubscribeHolder subscribeHolder;
@Autowired
private SubscribeHolder subscribeHolder;
private final UserSetting userSetting;
@Autowired
private UserSetting userSetting;
private final IRedisRpcService redisRpcService;
@Autowired
private IRedisRpcService redisRpcService;
// 监听通道信息变化
@EventListener
@ -116,7 +109,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
deviceChannel.setGbDeviceId(serverGbId);
deviceChannelList.add(deviceChannel);
try {
sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getMessageType().name(), platform, deviceChannelList, subscribeInfo, null);
sipCommanderForPlatform.sendNotifyForCatalogOther(event.getMessageType().name(), platform, deviceChannelList, subscribeInfo, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
@ -146,7 +139,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
CommonGBChannel deviceChannel = channelMap.get(gbId);
channelList.add(deviceChannel);
try {
sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getMessageType().name(), platform, channelList, subscribeInfo, null);
sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(event.getMessageType().name(), platform, channelList, subscribeInfo, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException |
SipException | IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
@ -185,7 +178,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
if (!channels.isEmpty()) {
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceIds);
try {
sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, channels, subscribe, null);
sipCommanderForPlatform.sendNotifyForCatalogOther(event.getType(), platform, channels, subscribe, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
@ -205,7 +198,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
if (!deviceChannelList.isEmpty()) {
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceIds);
try {
sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, deviceChannelList, subscribe, null);
sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, deviceChannelList, subscribe, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
@ -646,7 +639,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
SubscribeInfo subscribeInfo = SubscribeInfo.buildSimulated(platform.getServerGBId(), platform.getServerIp());
try {
sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(CatalogEvent.ADD, platform, channelList, subscribeInfo, null);
sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(CatalogEvent.ADD, platform, channelList, subscribeInfo, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException |
SipException | IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
@ -852,4 +845,52 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
}
return platformChannelMapper.queryPlatFormListByChannelList(ids);
}
@Override
public void notifyMobilePosition(List<MobilePosition> mobilePositionList) {
List<Platform> allPlatforms = platformMapper.queryServerIdsWithEnableAndServer(userSetting.getServerId());
// 获取所用订阅
Map<Integer, Platform> platformMap = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms);
if (platformMap.isEmpty()) {
return;
}
// 对mobilePositionList内部的channelId分类
Map<Integer, List<MobilePosition>> channelIdMap = mobilePositionList.stream().collect(Collectors.groupingBy(MobilePosition::getChannelId));
List<ShareGBChannel> shareGBChannels = platformChannelMapper.queryShareChannelInPlatformsAndChannelIds(platformMap.values(), channelIdMap.keySet());
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (ShareGBChannel shareGBChannel : shareGBChannels) {
List<MobilePosition> mobilePositions = channelIdMap.get(shareGBChannel.getGbId());
if (mobilePositions == null || mobilePositions.isEmpty()) {
continue;
}
executor.submit(() -> {
Platform platform = platformMap.get(shareGBChannel.getPlatformId());
if (platform == null) {
log.info("[查询平台] 平台ID{} 未查询到", shareGBChannel.getPlatformId());
return;
}
SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
if (subscribe == null) {
log.info("[查询订阅] 平台:{} 未查询到移动位置订阅", platform.getServerGBId());
return;
}
for (MobilePosition mobilePosition : mobilePositions) {
try {
GPSMsgInfo gpsMsgInfo = GPSMsgInfo.getInstance(mobilePosition);
// 获取通道编号
CommonGBChannel commonGBChannel = queryChannelByPlatformIdAndChannelId(platform.getId(), mobilePosition.getChannelId());
sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, commonGBChannel,
subscribe);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
}
}
});
}
}
}
}

View File

@ -33,7 +33,7 @@ public class CatalogDataManager implements CommandLineRunner {
private IGroupService groupService;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
private final Map<String, CatalogData> dataMap = new ConcurrentHashMap<>();

View File

@ -20,7 +20,7 @@ public class SipInviteSessionManager {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
/**
* 添加一个点播/回放的事务信息

View File

@ -26,7 +26,7 @@ public class SubscribeTaskRunner{
private final DelayQueue<SubscribeTask> delayQueue = new DelayQueue<>();
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserSetting userSetting;

View File

@ -30,7 +30,7 @@ public class PlatformStatusTaskRunner {
private final DelayQueue<PlatformKeepaliveTask> keepaliveTaskDelayQueue = new DelayQueue<>();
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserSetting userSetting;

View File

@ -2,25 +2,18 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
@ -33,27 +26,18 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessorParent {
private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Autowired
private UserSetting userSetting;
private final UserSetting userSetting;
@Autowired
private EventPublisher eventPublisher;
private final EventPublisher eventPublisher;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IDeviceChannelService deviceChannelService;
@Autowired
private IMobilePositionService mobilePositionService;
private final IRedisCatchStorage redisCatchStorage;
public void process(RequestEvent evt) {
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
log.error("[notify-移动位置] 待处理消息队列已满 {}返回486 BUSY_HERE消息不做处理", userSetting.getMaxNotifyCountQueue());
return;
@ -61,127 +45,56 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
taskQueue.offer(new HandlerCatchData(evt, null, null));
}
@Scheduled(fixedDelay = 200) //每200毫秒执行一次
@Scheduled(fixedDelay = 200)
@Async
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;
}
List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
while (!taskQueue.isEmpty()) {
handlerCatchDataList.add(taskQueue.poll());
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
HandlerCatchData poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
List<DeviceMobilePosition> mobilePositionList = new ArrayList<>();
for (HandlerCatchData take : handlerCatchDataList) {
if (take == null) {
continue;
}
RequestEvent evt = take.getEvt();
try {
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
long startTime = System.currentTimeMillis();
// 回复 200 OK
Element rootElement = getRootElement(evt);
if (rootElement == null) {
log.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
continue;
}
Device device = redisCatchStorage.getDevice(deviceId);
if (device == null) {
log.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId);
log.error("[notify-移动位置] 未获取到device, {}", deviceId);
continue;
}
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow());
DeviceChannel deviceChannel = null;
List<Element> elements = rootElement.elements();
readDocument: for (Element element : elements) {
switch (element.getName()){
case "DeviceID":
String channelId = element.getStringValue();
deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId);
if (deviceChannel != null) {
mobilePosition.setChannelId(deviceChannel.getId());
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
}else {
log.error("[notify-移动位置] 未找到通道 {}/{}", device.getDeviceId(), channelId);
break readDocument;
}
break;
case "Time":
String timeVal = element.getStringValue();
if (ObjectUtils.isEmpty(timeVal)){
mobilePosition.setTimestamp(System.currentTimeMillis());
}else {
Long timestamp = SipUtils.parseTimeForTimestamp(time);
if(timestamp == null) {
log.warn("解析移动位置时间失败:{} 使用当前时间", time);
mobilePosition.setTimestamp(System.currentTimeMillis());
}else {
mobilePosition.setTimestamp(timestamp);
}
}
break;
case "Longitude":
mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
break;
case "Latitude":
mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
break;
case "Speed":
String speedVal = element.getStringValue();
if (NumericUtil.isDouble(speedVal)) {
mobilePosition.setSpeed(Double.parseDouble(speedVal));
} else {
mobilePosition.setSpeed(0.0);
}
break;
case "Direction":
String directionVal = element.getStringValue();
if (NumericUtil.isDouble(directionVal)) {
mobilePosition.setDirection(Double.parseDouble(directionVal));
} else {
mobilePosition.setDirection(0.0);
}
break;
case "Altitude":
String altitudeVal = element.getStringValue();
if (NumericUtil.isDouble(altitudeVal)) {
mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
} else {
mobilePosition.setAltitude(0.0);
}
break;
}
}
if (deviceChannel == null) {
Element rootElement = getRootElement(evt, device.getCharset());
if (rootElement == null) {
log.warn("[notify-移动位置] {}处理失败,未识别到信息体", deviceId);
continue;
}
log.info("[收到移动位置订阅通知]{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
mobilePositionService.add(mobilePosition);
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
try {
eventPublisher.mobilePositionEventPublish(mobilePosition);
}catch (Exception e) {
log.error("[MobilePositionEvent] 发送失败: ", e);
List<DeviceMobilePosition> mobilePositions = DeviceMobilePosition.decode(device, rootElement);
for (DeviceMobilePosition mobilePosition : mobilePositions) {
log.info("[收到移动位置订阅通知]{}/{}->{}.{}, 时间: {}", device.getDeviceId(), mobilePosition.getChannelDeviceId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTimestamp());
mobilePositionList.add(mobilePosition);
}
} catch (DocumentException e) {
log.error("[收到移动位置订阅通知] 文档解析异常: \r\n{}", evt.getRequest(), e);
} catch ( Exception e) {
log.error("[收到移动位置订阅通知] 异常: ", e);
} catch (Exception e) {
log.warn("[notify-移动位置] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[notify-移动位置] 异常内容: ", e);
}
}
if (!mobilePositionList.isEmpty()) {
try {
eventPublisher.mobilePositionsEventPublish(mobilePositionList);
} catch (Exception e) {
log.error("[MobilePositionEvent] 发送失败: ", e);
}
}
}
// @Scheduled(fixedRate = 10000)
// public void execute(){
// logger.debug("[待处理Notify-移动位置订阅消息数量]: {}", taskQueue.size());
// }
}

View File

@ -114,21 +114,16 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
if (deviceChannel == null) {
log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), deviceAlarmNotify.getChannelId());
} else {
MobilePosition mobilePosition = new MobilePosition();
DeviceMobilePosition mobilePosition = new DeviceMobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow());
mobilePosition.setDeviceId(device.getDeviceId());
mobilePosition.setChannelId(deviceChannel.getId());
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
mobilePosition.setTime(deviceAlarmNotify.getAlarmTime());
mobilePosition.setTimestamp(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(deviceAlarmNotify.getAlarmTime()));
mobilePosition.setLongitude(deviceAlarmNotify.getLongitude());
mobilePosition.setLatitude(deviceAlarmNotify.getLatitude());
// 更新device channel 的经纬度
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
mobilePosition.setDevice(device);
// 发送移动位置事件后续会保存到数据库并且发送给上级平台
publisher.mobilePositionsEventPublish(List.of(mobilePosition));
}
}

View File

@ -54,7 +54,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
log.error("[message-notify-移动位置] 待处理消息队列已满 {}返回486 BUSY_HERE", userSetting.getMaxNotifyCountQueue());
return;
}
taskQueue.offer(new HandlerCatchData(evt, null, null));
taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
// 回复200 OK
try {
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);

View File

@ -1,34 +1,33 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.utils.DateUtil;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 移动设备位置数据查询回复
@ -36,18 +35,20 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MobilePositionResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
private final String cmdType = "MobilePosition";
@Autowired
private ResponseMessageHandler responseMessageHandler;
private final ResponseMessageHandler responseMessageHandler;
@Autowired
private IDeviceChannelService deviceChannelService;
private final EventPublisher eventPublisher;
@Autowired
private DeferredResultHolder resultHolder;
private final UserSetting userSetting;
private final DeferredResultHolder resultHolder;
private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Override
public void afterPropertiesSet() throws Exception {
@ -56,79 +57,66 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar
@Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
SIPRequest request = (SIPRequest) evt.getRequest();
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
log.error("[移动设备位置查询回复] 待处理消息队列已满 {},丢弃消息", userSetting.getMaxNotifyCountQueue());
return;
}
taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
try {
rootElement = getRootElement(evt, device.getCharset());
if (rootElement == null) {
log.warn("[ 移动设备位置数据查询回复 ] content cannot be null, {}", evt.getRequest());
try {
responseAckAsync(request, Response.BAD_REQUEST);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 移动设备位置数据查询 BAD_REQUEST: {}", e.getMessage());
}
return;
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage());
}
}
@Scheduled(fixedDelay = 400)
@Async
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;
}
List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
HandlerCatchData poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
String channelId = getText(rootElement, "DeviceID");
DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId);
if (deviceChannel == null) {
log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), channelId);
}else {
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow());
if (!ObjectUtils.isEmpty(device.getName())) {
mobilePosition.setDeviceName(device.getName());
}
mobilePosition.setChannelId(deviceChannel.getId());
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
//兼容ISO 8601格式时间
String time = getText(rootElement, "Time");
if (ObjectUtils.isEmpty(time)){
mobilePosition.setTime(DateUtil.getNow());
}else {
mobilePosition.setTime(SipUtils.parseTime(time));
}
mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude")));
mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude")));
if (NumericUtil.isDouble(getText(rootElement, "Speed"))) {
mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed")));
} else {
mobilePosition.setSpeed(0.0);
}
if (NumericUtil.isDouble(getText(rootElement, "Direction"))) {
mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction")));
} else {
mobilePosition.setDirection(0.0);
}
if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) {
mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude")));
} else {
mobilePosition.setAltitude(0.0);
}
// 更新device channel 的经纬度
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId();
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(mobilePosition);
resultHolder.invokeAllResult(msg);
}
//回复 200 OK
}
if (handlerCatchDataList.isEmpty()) {
return;
}
List<DeviceMobilePosition> mobilePositionList = new ArrayList<>();
for (HandlerCatchData take : handlerCatchDataList) {
Device device = take.getDevice();
try {
responseAckAsync(request, Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage());
Element rootElementAfterCharset = getRootElement(take.getEvt(), device.getCharset());
if (rootElementAfterCharset == null) {
log.warn("[移动设备位置查询回复] {}处理失败,未识别到信息体", device.getDeviceId());
continue;
}
List<DeviceMobilePosition> mobilePositions = DeviceMobilePosition.decode(device, rootElementAfterCharset);
for (DeviceMobilePosition mobilePosition : mobilePositions) {
log.info("[收到移动位置查询回复]{}/{}->{}.{}, 时间: {}", device.getDeviceId(), mobilePosition.getChannelDeviceId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTimestamp());
mobilePositionList.add(mobilePosition);
String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId();
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(mobilePosition);
resultHolder.invokeAllResult(msg);
}
} catch (Exception e) {
log.warn("[移动设备位置查询回复] 发现未处理的异常, \r\n{}", take.getEvt().getRequest());
log.error("[移动设备位置查询回复] 异常内容: ", e);
}
}
if (!mobilePositionList.isEmpty()) {
try {
eventPublisher.mobilePositionsEventPublish(mobilePositionList);
} catch (Exception e) {
log.error("[MobilePositionEvent] 发送失败: ", e);
}
} catch (DocumentException e) {
log.error("未处理的异常 ", e);
}
}

View File

@ -49,7 +49,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
private ApplicationEventPublisher applicationEventPublisher;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
private Long recordInfoTtl = 1800L;

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.jt1078.service.impl;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.common.enums.MediaStreamUtil;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
@ -10,6 +11,8 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service(ChannelDataType.OTHER_SERVICE + ChannelDataType.JT_1078)
@RequiredArgsConstructor
@ -17,8 +20,6 @@ public class SourceOtherServiceForJTImpl implements ISourceOtherService {
private final UserSetting userSetting;
private final Ijt1078Service ijt1078Service;
private final Ijt1078PlayService jt1078PlayService;
@Override
@ -43,4 +44,10 @@ public class SourceOtherServiceForJTImpl implements ISourceOtherService {
}
return false;
}
@Override
public Boolean addChannelIdForMobilePosition(List<? extends MobilePosition> mobilePositionList) {
return null;
}
}

View File

@ -65,7 +65,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
private JT1078Template jt1078Template;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private HookSubscribe subscribe;

View File

@ -66,7 +66,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
private JT1078Template jt1078Template;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IGbChannelService channelService;

View File

@ -80,7 +80,7 @@ public class ABLHttpHookListener {
private SSRCFactory ssrcFactory;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ApplicationEventPublisher applicationEventPublisher;

View File

@ -73,7 +73,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
private IInviteStreamService inviteStreamService;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private Map<String, IMediaNodeServerService> nodeServerServiceMap;

View File

@ -8,14 +8,10 @@ import java.util.List;
public interface IMobilePositionService {
void add(List<MobilePosition> mobilePositionList);
void add(MobilePosition mobilePosition);
List<MobilePosition> queryMobilePositions(String deviceId, String channelId, String startTime, String endTime);
List<MobilePosition> queryMobilePositions(Integer channelId, String startTime, String endTime);
List<Platform> queryEnablePlatformListWithAsMessageChannel();
MobilePosition queryLatestPosition(String deviceId);
MobilePosition queryLatestPosition(Integer channelId);
}

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.Data;
@Data
@ -61,7 +62,7 @@ public class GPSMsgInfo {
gpsMsgInfo.setLat(mobilePosition.getLatitude());
gpsMsgInfo.setSpeed(mobilePosition.getSpeed());
gpsMsgInfo.setDirection(mobilePosition.getDirection());
gpsMsgInfo.setTime(mobilePosition.getTime());
gpsMsgInfo.setTime(DateUtil.timestampMsTo_yyyy_MM_dd_HH_mm_ss(mobilePosition.getTimestamp()));
return gpsMsgInfo;
}
}

View File

@ -1,82 +1,55 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformMapper;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService;
import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
@Slf4j
@Service
@RequiredArgsConstructor
public class MobilePositionServiceImpl implements IMobilePositionService {
@Autowired
private DeviceMapper deviceMapper;
private final ConcurrentLinkedQueue<MobilePosition> mobilePositionQueue = new ConcurrentLinkedQueue<>();
@Autowired
private DeviceChannelMapper channelMapper;
private final Map<String, ISourceOtherService> sourceOtherServiceMap;
@Autowired
private MobilePositionMapper mobilePositionMapper;
@Autowired
private UserSetting userSetting;
@Autowired
private PlatformMapper platformMapper;
@Autowired
private RedisTemplate<String, MobilePosition> redisTemplate;
private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list";
@Override
public void add(MobilePosition mobilePosition) {
List<MobilePosition> list = new ArrayList<>();
list.add(mobilePosition);
add(list);
}
@Override
public void add(List<MobilePosition> mobilePositionList) {
redisTemplate.opsForList().leftPushAll(REDIS_MOBILE_POSITION_LIST, mobilePositionList);
}
private List<MobilePosition> get(int length) {
Long size = redisTemplate.opsForList().size(REDIS_MOBILE_POSITION_LIST);
if (size == null || size == 0) {
return new ArrayList<>();
}
return redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, Math.min(length, size));
}
private final MobilePositionMapper mobilePositionMapper;
private final IPlatformChannelService platformChannelService;
private final PlatformMapper platformMapper;
/**
* 查询移动位置轨迹
*/
@Override
public synchronized List<MobilePosition> queryMobilePositions(String deviceId, String channelId, String startTime, String endTime) {
return mobilePositionMapper.queryPositionByDeviceIdAndTime(deviceId, channelId, startTime, endTime);
public synchronized List<MobilePosition> queryMobilePositions(Integer channelId, String startTime, String endTime) {
Long startTimestamp = null;
Long endTimestamp = null;
if (startTime != null) {
startTimestamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(startTime);
}
if (endTime != null) {
endTimestamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(endTime);
}
return mobilePositionMapper.queryPositionByDeviceIdAndTime(channelId, startTimestamp, endTimestamp);
}
@Override
@ -88,57 +61,56 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
* 查询最新移动位置
*/
@Override
public MobilePosition queryLatestPosition(String deviceId) {
return mobilePositionMapper.queryLatestPositionByDevice(deviceId);
public MobilePosition queryLatestPosition(Integer channelId) {
return mobilePositionMapper.queryLatestPosition(channelId);
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void executeTaskQueue() {
int countLimit = 3000;
List<MobilePosition> mobilePositions = get(countLimit);
if (mobilePositions == null || mobilePositions.isEmpty()) {
@Async
@EventListener
public void onApplicationEvent(MobilePositionEvent event) {
if (event.getMobilePositionList() == null || event.getMobilePositionList().isEmpty()) {
return;
}
if (userSetting.getSavePositionHistory()) {
mobilePositionMapper.batchadd(mobilePositions);
}
log.info("[移动位置订阅]更新通道位置: {}", mobilePositions.size());
Map<String, Map<Integer, DeviceChannel>> updateChannelMap = new HashMap<>();
for (MobilePosition mobilePosition : mobilePositions) {
DeviceChannel deviceChannel = new DeviceChannel();
deviceChannel.setId(mobilePosition.getChannelId());
deviceChannel.setDeviceId(mobilePosition.getDeviceId());
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
deviceChannel.setUpdateTime(DateUtil.getNow());
if (mobilePosition.getLongitude() > 0 || mobilePosition.getLatitude() > 0) {
Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(mobilePosition.getLongitude(), mobilePosition.getLatitude());
deviceChannel.setGbLongitude(wgs84Position[0]);
deviceChannel.setGbLatitude(wgs84Position[1]);
for (ISourceOtherService sourceOtherService : sourceOtherServiceMap.values()) {
try {
// 此时已经完成了通道ID的添加以及坐标系的转换后续只需要将数据保存到数据库即可
Boolean addResult = sourceOtherService.addChannelIdForMobilePosition(event.getMobilePositionList());
if (addResult != null && addResult) {
mobilePositionQueue.addAll(event.getMobilePositionList());
}
}catch (Exception e) {
log.error("[移动位置事件] 处理移动位置事件失败", e);
}
if (!updateChannelMap.containsKey(mobilePosition.getDeviceId())) {
updateChannelMap.put(mobilePosition.getDeviceId(), new HashMap<>());
}
updateChannelMap.get(mobilePosition.getDeviceId()).put(mobilePosition.getChannelId(), deviceChannel);
}
List<String> deviceIds = new ArrayList<>(updateChannelMap.keySet());
if (deviceIds.isEmpty()) {
log.info("[移动位置订阅]为查询到对应的设备,消息已经忽略");
}
@Scheduled(fixedDelay = 500)
public void executeMobilePositionQueue() {
if (mobilePositionQueue.isEmpty()) {
return;
}
List<Device> deviceList = deviceMapper.queryByDeviceIds(deviceIds);
for (Device device : deviceList) {
Map<Integer, DeviceChannel> channelMap = updateChannelMap.get(device.getDeviceId());
if (device.getGeoCoordSys().equalsIgnoreCase("GCJ02")) {
channelMap.values().forEach(channel -> {
Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(channel.getLongitude(), channel.getLatitude());
channel.setGbLongitude(wgs84Position[0]);
channel.setGbLatitude(wgs84Position[1]);
});
List<MobilePosition> handlerCatchDataList = new ArrayList<>();
int size = mobilePositionQueue.size();
for (int i = 0; i < size; i++) {
MobilePosition poll = mobilePositionQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
channelMapper.batchUpdatePosition(new ArrayList<>(channelMap.values()));
}
if (handlerCatchDataList.isEmpty()) {
return;
}
List<MobilePosition> mobilePositionList = handlerCatchDataList.stream().filter(
mobilePosition -> mobilePosition.getChannelId() != 0).toList();
// 发送通知方便国标级联转发给上级
Thread.startVirtualThread(() -> platformChannelService.notifyMobilePosition(mobilePositionList));
// 批量保存到数据库
int batchSize = 1000;
for (int i = 0; i < mobilePositionList.size(); i += batchSize) {
int end = Math.min(i + batchSize, mobilePositionList.size());
List<MobilePosition> batchList = mobilePositionList.subList(i, end);
mobilePositionMapper.batchAdd(batchList);
}
}

View File

@ -53,7 +53,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
private HookSubscribe subscribe;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
/**
* 流到来的处理

View File

@ -27,7 +27,7 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Override

View File

@ -20,7 +20,7 @@ public class UserApiKeyServiceImpl implements IUserApiKeyService {
UserApiKeyMapper userApiKeyMapper;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Override
public int addApiKey(UserApiKey userApiKey) {

View File

@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
@ -29,16 +30,12 @@ import java.util.stream.Collectors;
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisGpsMsgListener implements MessageListener {
@Autowired
private IRedisCatchStorage redisCatchStorage;
private final IRedisCatchStorage redisCatchStorage;
@Autowired
private IStreamPushService streamPushService;
@Autowired
private IGbChannelService channelService;
private final IGbChannelService channelService;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();

View File

@ -35,7 +35,7 @@ public class RedisRpcChannelPlayController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IGbChannelService channelService;

View File

@ -26,7 +26,7 @@ public class RedisRpcCloudRecordController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ICloudRecordService cloudRecordService;

View File

@ -31,7 +31,7 @@ public class RedisRpcDeviceController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IDeviceService deviceService;

View File

@ -29,7 +29,7 @@ public class RedisRpcDevicePlayController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IDeviceService deviceService;

View File

@ -25,7 +25,7 @@ public class RedisRpcGbDeviceController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IDeviceService deviceService;

View File

@ -33,7 +33,7 @@ public class RedisRpcPlatformController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IPlatformService platformService;

View File

@ -27,7 +27,7 @@ public class RedisRpcStreamProxyController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IStreamProxyPlayService streamProxyPlayService;

View File

@ -46,7 +46,7 @@ public class RedisRpcStreamPushController extends RpcController {
private HookSubscribe hookSubscribe;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IStreamPushPlayService streamPushPlayService;
@ -102,7 +102,7 @@ public class RedisRpcStreamPushController extends RpcController {
sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp());
sendRtpItem.setServerId(userSetting.getServerId());
redisTemplate.opsForValue().set(sendRtpItem.getChannelId(), sendRtpItem);
redisTemplate.opsForValue().set(sendRtpItem.getChannelId() + "", sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setBody(sendRtpItem.getChannelId());
response.setStatusCode(ErrorCode.SUCCESS.getCode());

View File

@ -47,7 +47,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
private SSRCFactory ssrcFactory;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IMediaServerService mediaServerService;

View File

@ -7,7 +7,6 @@ import com.genersoft.iot.vmp.common.SystemAllInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@ -19,6 +18,7 @@ import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.JsonUtil;
import com.genersoft.iot.vmp.utils.SystemInfoUtils;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jspecify.annotations.NonNull;
import org.springframework.beans.factory.annotation.Autowired;
@ -31,29 +31,21 @@ import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.*;
@SuppressWarnings("rawtypes")
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Autowired
private DeviceChannelMapper deviceChannelMapper;
private final DeviceMapper deviceMapper;
@Autowired
private DeviceMapper deviceMapper;
private final UserSetting userSetting;
@Autowired
private UserSetting userSetting;
private final RedisTemplate<String, Object> redisTemplate;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private final RedisTemplate<String, Long> longRedisTemplate;
@Autowired
private RedisTemplate<String, Long> longRedisTemplate;
@Autowired
private StringRedisTemplate stringRedisTemplate;
private final StringRedisTemplate stringRedisTemplate;
@Override
public List<SendRtpInfo> queryAllSendRTPServer() {
@ -126,7 +118,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type.toUpperCase() + "_*_*_" + mediaServerId;
List<Object> streams = RedisUtil.scan(redisTemplate, key);
for (Object stream : streams) {
redisTemplate.delete(stream);
redisTemplate.delete((String) stream);
}
}
@ -268,7 +260,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
MediaInfo result = null;
List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
if (keys.size() > 0) {
if (!keys.isEmpty()) {
String key = (String) keys.get(0);
String mediaInfoJson = (String)redisTemplate.opsForValue().get(key);
result = JSON.parseObject(mediaInfoJson, MediaInfo.class);
@ -283,7 +275,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
MediaInfo result = null;
List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
if (keys.size() > 0) {
if (!keys.isEmpty()) {
String key = (String) keys.get(0);
String mediaInfoJson = (String)redisTemplate.opsForValue().get(key);
result = JSON.parseObject(mediaInfoJson, MediaInfo.class);
@ -423,7 +415,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
msg.append(" ").append(online? "ON":"OFF");
log.info("[redis通知] 推送设备/通道状态-> {} ", msg);
// 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号
// 使用 RedisTemplate<String, Object> 发送字符串消息会导致发送的消息多带了双引号
stringRedisTemplate.convertAndSend(key, msg.toString());
}
@ -439,7 +431,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
msg.append(" ").append(add? "ADD":"DELETE");
log.info("[redis通知] 推送通道-> {}", msg);
// 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号
// 使用 RedisTemplate<String, Object> 发送字符串消息会导致发送的消息多带了双引号
stringRedisTemplate.convertAndSend(key, msg.toString());
}

View File

@ -24,7 +24,7 @@ public final class JsonUtil {
* @param <T>
* @return result type
*/
public static <T> T redisJsonToObject(RedisTemplate<Object, Object> redisTemplate, String key, Class<T> clazz) {
public static <T> T redisJsonToObject(RedisTemplate<String, Object> redisTemplate, String key, Class<T> clazz) {
Object jsonObject = redisTemplate.opsForValue().get(key);
if (Objects.isNull(jsonObject)) {
return null;
@ -32,7 +32,7 @@ public final class JsonUtil {
return clazz.cast(jsonObject);
}
public static <T> T redisHashJsonToObject(RedisTemplate<Object, Object> redisTemplate, String key, String objKey, Class<T> clazz) {
public static <T> T redisHashJsonToObject(RedisTemplate<String, Object> redisTemplate, String key, String objKey, Class<T> clazz) {
// if (key == null || objKey == null) {
// return null;
// }

View File

@ -25,7 +25,7 @@ public class TestController {
private HookSubscribe subscribe;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@GetMapping("/hook/list")
public List<Hook> all(){

View File

@ -62,7 +62,7 @@ public class PsController {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@GetMapping(value = "/receive/open")
@ -172,7 +172,7 @@ public class PsController {
if (!scan.isEmpty()) {
for (Object key : scan) {
// 将信息写入redis中以备后用
redisTemplate.delete(key);
redisTemplate.delete((String) key);
}
}
}

View File

@ -62,7 +62,7 @@ public class RtpController {
private DynamicTask dynamicTask;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@GetMapping(value = "/receive/open")
@ -185,7 +185,7 @@ public class RtpController {
if (scan.size() > 0) {
for (Object key : scan) {
// 将信息写入redis中以备后用
redisTemplate.delete(key);
redisTemplate.delete((String)key);
}
}
}

View File

@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.gb28181.service.IGbChannelControlService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.utils.Coordtransform;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.web.custom.bean.*;
import com.genersoft.iot.vmp.web.custom.conf.SyTokenManager;
@ -51,7 +52,7 @@ public class CameraChannelService implements CommandLineRunner {
private GroupMapper groupMapper;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RedisTemplate<String, String> redisTemplateForString;
@ -334,7 +335,7 @@ public class CameraChannelService implements CommandLineRunner {
// 发送redis消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("gpsDate", mobilePosition.getTime());
jsonObject.put("gpsDate", DateUtil.timestampMsTo_yyyy_MM_dd_HH_mm_ss(mobilePosition.getTimestamp()));
jsonObject.put("unicodeNo", member.getUnicodeNo());
jsonObject.put("memberNo", member.getNo());
jsonObject.put("unitNo", member.getUnitNo());

View File

@ -23,7 +23,7 @@ import java.util.List;
public class SyServiceImpl implements IMapService {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Override
public List<MapConfig> getConfig() {