Merge branch 'dev/移动位置重构'

This commit is contained in:
lin 2026-05-15 10:39:35 +08:00
commit 562df5f739
72 changed files with 976 additions and 728 deletions

View File

@ -192,7 +192,7 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git
## 付费社群
<img src="doc/_media/shequ.png" width="50%" height="50%">
> 付费社群即可以对作者提供支持,也可以为大家更加快速的解决问题,对星球内容不满意,三天之内退出支持自动退款。如果暂时无法加入,给项目点个星也是极大的鼓励。
> 付费社群即可以对作者提供支持,也可以为大家更加快速的解决问题,还为正式加入星球的用户提供了微信群。对星球内容不满意,三天之内退出支持自动退款。如果暂时无法加入,给项目点个星也是极大的鼓励。
[知识星球](https://t.zsxq.com/0d8VAD3Dm)专栏列表:
- [WVP 部署安全加固指南:新手必看,防范攻击与漏洞](https://articles.zsxq.com/id_tv8wz4uubx2n.html)

View File

@ -21,7 +21,7 @@ public class UserManager implements org.apache.ftpserver.ftplet.UserManager {
private static final String PREFIX = "VMP_FTP_USER_";
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Override

View File

@ -38,7 +38,7 @@ public class RedisRpcConfig implements MessageListener {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
private ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();

View File

@ -13,8 +13,8 @@ import org.springframework.data.redis.serializer.StringRedisSerializer;
public class RedisTemplateConfig {
@Bean
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>();
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
// 使用fastJson序列化
GenericFastJsonRedisSerializer fastJsonRedisSerializer = new GenericFastJsonRedisSerializer();
// value值的序列化采用fastJsonRedisSerializer

View File

@ -191,6 +191,8 @@ public class DeviceChannel extends CommonGBChannel {
@Schema(description = "通道类型, 默认0, 0 普通通道1 行政区划 2 业务分组/虚拟组织")
private int channelType;
private String dbKey;
private Integer dataType = ChannelDataType.GB28181;
public void setPtzType(int ptzType) {

View File

@ -0,0 +1,88 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
* 国标设备移动位置
*/
@Slf4j
@Getter
@Setter
public class DeviceMobilePosition extends MobilePosition{
/**
* 通道数据库自增Id
*/
private String channelDeviceId;
private Device device;
public static List<DeviceMobilePosition> decode(Device device, Element rootElementAfterCharset) {
List<DeviceMobilePosition> mobilePositions = new ArrayList<>();
DeviceMobilePosition mobilePosition = new DeviceMobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow());
mobilePosition.setDevice(device);
String channelId = getText(rootElementAfterCharset, "DeviceID");
mobilePosition.setChannelDeviceId(channelId);
String time = getText(rootElementAfterCharset, "Time");
if (ObjectUtils.isEmpty(time)){
mobilePosition.setTimestamp(System.currentTimeMillis());
}else {
Long timestamp = SipUtils.parseTimeForTimestamp(time);
if(timestamp == null) {
log.warn("解析移动位置时间失败:{} 使用当前时间", time);
mobilePosition.setTimestamp(System.currentTimeMillis());
}else {
mobilePosition.setTimestamp(timestamp);
}
}
mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude")));
mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude")));
if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) {
mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed")));
} else {
mobilePosition.setSpeed(0.0);
}
if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) {
mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction")));
} else {
mobilePosition.setDirection(0.0);
}
if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) {
mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude")));
} else {
mobilePosition.setAltitude(0.0);
}
mobilePositions.add(mobilePosition);
return mobilePositions;
}
@Override
public String toString() {
return "DeviceMobilePosition{" +
"channelDeviceId='" + channelDeviceId + '\'' +
", deviceId='" + device.getDeviceId() + '\'' +
"} " + super.toString();
}
}

View File

@ -1,6 +1,17 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.List;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
* @description: 移动位置bean
@ -8,15 +19,12 @@ import lombok.Data;
* @date: 2021年1月23日
*/
@Slf4j
@Data
public class MobilePosition {
/**
* 设备Id
*/
private String deviceId;
/**
* 通道Id
* 通道数据库自增Id
*/
private Integer channelId;
@ -25,15 +33,10 @@ public class MobilePosition {
*/
private String channelDeviceId;
/**
* 设备名称
*/
private String deviceName;
/**
* 通知时间
*/
private String time;
private long timestamp;
/**
* 经度
@ -60,29 +63,66 @@ public class MobilePosition {
*/
private double direction;
/**
* 位置信息上报来源Mobile PositionGPS Alarm
*/
private String reportSource;
/**
* 创建时间
*/
private String createTime;
public static List<MobilePosition> decode(Element rootElementAfterCharset) {
List<MobilePosition> mobilePositions = new ArrayList<>();
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow());
String channelId = getText(rootElementAfterCharset, "DeviceID");
mobilePosition.setChannelDeviceId(channelId);
String time = getText(rootElementAfterCharset, "Time");
if (ObjectUtils.isEmpty(time)){
mobilePosition.setTimestamp(System.currentTimeMillis());
}else {
Long timestamp = SipUtils.parseTimeForTimestamp(time);
if(timestamp == null) {
log.warn("解析移动位置时间失败:{} 使用当前时间", time);
mobilePosition.setTimestamp(System.currentTimeMillis());
}else {
mobilePosition.setTimestamp(timestamp);
}
}
mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude")));
mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude")));
if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) {
mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed")));
} else {
mobilePosition.setSpeed(0.0);
}
if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) {
mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction")));
} else {
mobilePosition.setDirection(0.0);
}
if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) {
mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude")));
} else {
mobilePosition.setAltitude(0.0);
}
mobilePositions.add(mobilePosition);
return mobilePositions;
}
@Override
public String toString() {
return "MobilePosition{" +
"deviceId='" + deviceId + '\'' +
", channelId=" + channelId +
", channelDeviceId='" + channelDeviceId + '\'' +
", deviceName='" + deviceName + '\'' +
", time='" + time + '\'' +
", longitude=" + longitude +
", latitude=" + latitude +
", altitude=" + altitude +
", speed=" + speed +
", direction=" + direction +
", reportSource='" + reportSource + '\'' +
", createTime='" + createTime + '\'' +
'}';
}

View File

@ -0,0 +1,17 @@
package com.genersoft.iot.vmp.gb28181.bean;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import lombok.EqualsAndHashCode;
@EqualsAndHashCode(callSuper = true)
@Data
@Schema(description = "国标共享通道")
public class ShareGBChannel extends CommonGBChannel{
@Schema(description = "平台ID")
private int platformId;
}

View File

@ -4,12 +4,15 @@ import com.genersoft.iot.vmp.conf.DynamicTask;
import com.genersoft.iot.vmp.conf.UserSetting;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author lin
@ -25,7 +28,7 @@ public class SubscribeHolder {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
private final String prefix = "VMP_SUBSCRIBE_OVERDUE";
@ -106,15 +109,28 @@ public class SubscribeHolder {
return result;
}
public List<String> getAllMobilePositionSubscribePlatform(List<Platform> platformList) {
public Map<Integer, Platform> getAllMobilePositionSubscribePlatform(List<Platform> platformList) {
if (platformList == null || platformList.isEmpty()) {
return new ArrayList<>();
return new HashMap<>();
}
List<String> result = new ArrayList<>();
for (Platform platform : platformList) {
String key = String.format("%s:%s:%s", prefix, "mobilePosition", platform.getServerGBId());
if (redisTemplate.hasKey(key)) {
result.add(platform.getServerGBId());
Map<Integer, Platform> result = new HashMap<>();
// 1. 先批量构建所有 key
List<String> keys = platformList.stream()
.map(platform -> String.format("%s:%s:%s", prefix, "mobilePosition", platform.getServerGBId()))
.toList();
// 2. 批量查询 Redis 关键只发1次请求
List<Object> results = redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
for (String key : keys) {
// 注意这里使用的是底层 connection 接口
connection.keyCommands().exists(key.getBytes());
}
return null; // 流水线模式下必须返回 null
});
for (int i = 0; i < results.size(); i++) {
if (results.get(i) instanceof Boolean exists && exists) {
result.put(platformList.get(i).getId(), platformList.get(i));
}
}
return result;

View File

@ -48,7 +48,7 @@ import java.util.concurrent.TimeUnit;
public class ChannelController {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IGbChannelService channelService;

View File

@ -48,21 +48,12 @@ public class MobilePositionController {
private IDeviceService deviceService;
/**
* 查询历史轨迹
* @param deviceId 设备ID
* @param start 开始时间
* @param end 结束时间
* @return
*/
@Operation(summary = "查询历史轨迹", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@Parameter(name = "channelId", description = "通道国标编号")
@Parameter(name = "channelId", description = "通道的数据库ID")
@Parameter(name = "start", description = "开始时间")
@Parameter(name = "end", description = "结束时间")
@GetMapping("/history/{deviceId}")
public List<MobilePosition> positions(@PathVariable String deviceId,
@RequestParam(required = false) String channelId,
public List<MobilePosition> positions( Integer channelId,
@RequestParam(required = false) String start,
@RequestParam(required = false) String end) {
@ -72,19 +63,14 @@ public class MobilePositionController {
if (StringUtil.isEmpty(end)) {
end = null;
}
return mobilePositionService.queryMobilePositions(deviceId, channelId, start, end);
return mobilePositionService.queryMobilePositions(channelId, start, end);
}
/**
* 查询设备最新位置
* @param deviceId 设备ID
* @return
*/
@Operation(summary = "查询设备最新位置", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "deviceId", description = "设备国标编号", required = true)
@GetMapping("/latest/{deviceId}")
public MobilePosition latestPosition(@PathVariable String deviceId) {
return mobilePositionService.queryLatestPosition(deviceId);
@Operation(summary = "查询通道最新位置", security = @SecurityRequirement(name = JwtUtils.HEADER))
@Parameter(name = "channelId", description = "通道的数据库ID", required = true)
@GetMapping("/latest")
public MobilePosition latestPosition(Integer channelId) {
return mobilePositionService.queryLatestPosition(channelId);
}
/**

View File

@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.dao;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition;
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
import com.genersoft.iot.vmp.gb28181.dao.provider.DeviceChannelProvider;
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
@ -9,6 +10,7 @@ import org.apache.ibatis.annotations.*;
import org.springframework.stereotype.Repository;
import java.util.List;
import java.util.Map;
/**
* 用于存储设备通道信息
@ -617,4 +619,63 @@ public interface DeviceChannelMapper {
void offlineByDeviceIds(List<Device> deviceList);
@Select(value = {" <script>" +
" SELECT " +
" CONCAT(data_device_id, '_', device_id) as dbKey, \n" +
" id,\n" +
" data_device_id,\n" +
" create_time,\n" +
" update_time,\n" +
" sub_count,\n" +
" stream_id,\n" +
" has_audio,\n" +
" gps_time,\n" +
" stream_identification,\n" +
" channel_type,\n" +
" device_id,\n" +
" name,\n" +
" manufacturer,\n" +
" model,\n" +
" owner,\n" +
" civil_code,\n" +
" block,\n" +
" address,\n" +
" parental,\n" +
" parent_id,\n" +
" safety_way,\n" +
" register_way,\n" +
" cert_num,\n" +
" certifiable,\n" +
" err_code,\n" +
" end_time,\n" +
" secrecy,\n" +
" ip_address,\n" +
" port,\n" +
" password,\n" +
" status,\n" +
" longitude,\n" +
" latitude,\n" +
" gb_longitude,\n" +
" gb_latitude,\n" +
" ptz_type,\n" +
" position_type,\n" +
" room_type,\n" +
" use_type,\n" +
" supply_light_type,\n" +
" direction_type,\n" +
" resolution,\n" +
" business_group_id,\n" +
" download_speed,\n" +
" svc_space_support_mod,\n" +
" svc_time_support_mode\n" +
" from wvp_device_channel " +
" where data_type = 1 " +
" and data_device_id = #{deviceId} " +
" and device_id in " +
"<foreach item='item' index='index' collection='mobilePositionList' open='(' separator=',' close=')'>" +
" #{item.channelDeviceId} " +
"</foreach>" +
"</script>"})
@MapKey("dbKey")
Map<String, DeviceChannel> getAllForMobilePosition(@Param("deviceId") int deviceId, List<DeviceMobilePosition> mobilePositionList);
}

View File

@ -1,49 +0,0 @@
package com.genersoft.iot.vmp.gb28181.dao;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
@Mapper
public interface DeviceMobilePositionMapper {
@Insert("INSERT INTO wvp_device_mobile_position (device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source,create_time)"+
"VALUES (#{deviceId}, #{channelId}, #{deviceName}, #{time}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{reportSource}, #{createTime})")
int insertNewPosition(MobilePosition mobilePosition);
@Select(value = {" <script>" +
"SELECT * FROM wvp_device_mobile_position" +
" WHERE device_id = #{deviceId}" +
"<if test=\"channelId != null\"> and channel_id = #{channelId}</if>" +
"<if test=\"startTime != null\"> AND time&gt;=#{startTime}</if>" +
"<if test=\"endTime != null\"> AND time&lt;=#{endTime}</if>" +
" ORDER BY time ASC" +
" </script>"})
List<MobilePosition> queryPositionByDeviceIdAndTime(@Param("deviceId") String deviceId, @Param("channelId") String channelId, @Param("startTime") String startTime, @Param("endTime") String endTime);
@Select("SELECT * FROM wvp_device_mobile_position WHERE device_id = #{deviceId}" +
" ORDER BY time DESC LIMIT 1")
MobilePosition queryLatestPositionByDevice(String deviceId);
@Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}")
int clearMobilePositionsByDeviceId(String deviceId);
@Insert("<script> " +
"<foreach collection='mobilePositions' index='index' item='item' separator=';'> " +
"insert into wvp_device_mobile_position " +
"(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
"create_time)"+
"values " +
"(#{item.deviceId}, #{item.channelId}, #{item.deviceName}, #{item.time}, #{item.longitude}, " +
"#{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
"#{item.reportSource}, #{item.createTime}) " +
"</foreach> " +
"</script>")
void batchadd(List<MobilePosition> mobilePositions);
}

View File

@ -0,0 +1,43 @@
package com.genersoft.iot.vmp.gb28181.dao;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
@Mapper
public interface MobilePositionMapper {
@Insert("INSERT INTO wvp_mobile_position (channel_id, timestamp, longitude, latitude, altitude, speed, direction, create_time)"+
"VALUES (#{channelId}, #{timestamp}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{createTime})")
int insertNewPosition(MobilePosition mobilePosition);
@Select(value = {" <script>" +
"SELECT * FROM wvp_mobile_position" +
" WHERE channel_id = #{channelId}" +
"<if test=\"startTime != null\"> AND timestamp&gt;=#{startTime}</if>" +
"<if test=\"endTime != null\"> AND timestamp&lt;=#{endTime}</if>" +
" ORDER BY time ASC" +
" </script>"})
List<MobilePosition> queryPositionByDeviceIdAndTime(@Param("channelId") Integer channelId, @Param("startTime") Long startTime, @Param("endTime") Long endTime);
@Select("SELECT * FROM wvp_mobile_position WHERE channel_id = #{channelId}" +
" ORDER BY timestamp DESC LIMIT 1")
MobilePosition queryLatestPosition(@Param("channelId") Integer channelId);
@Insert("<script> " +
"<foreach collection='mobilePositions' index='index' item='item' separator=';'> " +
"insert into wvp_mobile_position " +
"(channel_id, timestamp,longitude,latitude,altitude,speed,direction," +
"create_time)"+
"values " +
"( #{item.channelId}, #{item.timestamp}, #{item.longitude}, " +
" #{item.latitude}, #{item.altitude}, #{item.speed},#{item.direction}," +
" #{item.createTime}) " +
"</foreach> " +
"</script>")
void batchAdd(List<MobilePosition> mobilePositions);
}

View File

@ -548,4 +548,55 @@ public interface PlatformChannelMapper {
" order by wcr.id DESC" +
" </script>")
Set<Region> queryShareRegion(Integer id);
@Select("<script>" +
" select " +
" wpgc.platform_id as platform_id" +
" wdc.id as gb_id,\n" +
" wdc.data_type,\n" +
" wdc.data_device_id,\n" +
" wdc.create_time,\n" +
" wdc.update_time,\n" +
" coalesce(wpgc.custom_device_id, wdc.gb_device_id, wdc.device_id) as gb_device_id,\n" +
" coalesce(wpgc.custom_name, wdc.gb_name, wdc.name) as gb_name,\n" +
" coalesce(wpgc.custom_manufacturer, wdc.gb_manufacturer, wdc.manufacturer) as gb_manufacturer,\n" +
" coalesce(wpgc.custom_model, wdc.gb_model, wdc.model) as gb_model,\n" +
" coalesce(wpgc.custom_owner, wdc.gb_owner, wdc.owner) as gb_owner,\n" +
" coalesce(wpgc.custom_civil_code, wdc.gb_civil_code, wdc.civil_code) as gb_civil_code,\n" +
" coalesce(wpgc.custom_block, wdc.gb_block, wdc.block) as gb_block,\n" +
" coalesce(wpgc.custom_address, wdc.gb_address, wdc.address) as gb_address,\n" +
" coalesce(wpgc.custom_parental, wdc.gb_parental, wdc.parental) as gb_parental,\n" +
" coalesce(wpgc.custom_parent_id, wdc.gb_parent_id, wdc.parent_id) as gb_parent_id,\n" +
" coalesce(wpgc.custom_safety_way, wdc.gb_safety_way, wdc.safety_way) as gb_safety_way,\n" +
" coalesce(wpgc.custom_register_way, wdc.gb_register_way, wdc.register_way) as gb_register_way,\n" +
" coalesce(wpgc.custom_cert_num, wdc.gb_cert_num, wdc.cert_num) as gb_cert_num,\n" +
" coalesce(wpgc.custom_certifiable, wdc.gb_certifiable, wdc.certifiable) as gb_certifiable,\n" +
" coalesce(wpgc.custom_err_code, wdc.gb_err_code, wdc.err_code) as gb_err_code,\n" +
" coalesce(wpgc.custom_end_time, wdc.gb_end_time, wdc.end_time) as gb_end_time,\n" +
" coalesce(wpgc.custom_secrecy, wdc.gb_secrecy, wdc.secrecy) as gb_secrecy,\n" +
" coalesce(wpgc.custom_ip_address, wdc.gb_ip_address, wdc.ip_address) as gb_ip_address,\n" +
" coalesce(wpgc.custom_port, wdc.gb_port, wdc.port) as gb_port,\n" +
" coalesce(wpgc.custom_password, wdc.gb_password, wdc.password) as gb_password,\n" +
" coalesce(wpgc.custom_status, wdc.gb_status, wdc.status) as gb_status,\n" +
" coalesce(wpgc.custom_longitude, wdc.gb_longitude, wdc.longitude) as gb_longitude,\n" +
" coalesce(wpgc.custom_latitude, wdc.gb_latitude, wdc.latitude) as gb_latitude,\n" +
" coalesce(wpgc.custom_ptz_type, wdc.gb_ptz_type, wdc.ptz_type) as gb_ptz_type,\n" +
" coalesce(wpgc.custom_position_type, wdc.gb_position_type, wdc.position_type) as gb_position_type,\n" +
" coalesce(wpgc.custom_room_type, wdc.gb_room_type, wdc.room_type) as gb_room_type,\n" +
" coalesce(wpgc.custom_use_type, wdc.gb_use_type, wdc.use_type) as gb_use_type,\n" +
" coalesce(wpgc.custom_supply_light_type, wdc.gb_supply_light_type, wdc.supply_light_type) as gb_supply_light_type,\n" +
" coalesce(wpgc.custom_direction_type, wdc.gb_direction_type, wdc.direction_type) as gb_direction_type,\n" +
" coalesce(wpgc.custom_resolution, wdc.gb_resolution, wdc.resolution) as gb_resolution,\n" +
" coalesce(wpgc.custom_business_group_id, wdc.gb_business_group_id, wdc.business_group_id) as gb_business_group_id,\n" +
" coalesce(wpgc.custom_download_speed, wdc.gb_download_speed, wdc.download_speed) as gb_download_speed,\n" +
" coalesce(wpgc.custom_svc_space_support_mod, wdc.gb_svc_space_support_mod, wdc.svc_space_support_mod) as gb_svc_space_support_mod,\n" +
" coalesce(wpgc.custom_svc_time_support_mode, wdc.gb_svc_time_support_mode, wdc.svc_time_support_mode) as gb_svc_time_support_mode\n" +
" from wvp_device_channel wdc" +
" left join wvp_platform_channel wpgc on wdc.id = wpgc.device_channel_id" +
" where wdc.channel_type = 0 and wpgc.platform_id in" +
"<foreach collection='platforms' item='item' open='(' separator=',' close=')' > #{item.id} </foreach>" +
" and wdc.id in " +
"<foreach collection='channelIds' item='item' open='(' separator=',' close=')' > #{item} </foreach> " +
"</script>")
List<ShareGBChannel> queryShareChannelInPlatformsAndChannelIds(Collection<Platform> platforms, Collection<Integer> channelIds);
}

View File

@ -1,7 +1,10 @@
package com.genersoft.iot.vmp.gb28181.event;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarmNotify;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.event.alarm.DeviceAlarmEvent;
import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
import com.genersoft.iot.vmp.gb28181.event.device.DeviceOfflineEvent;
@ -112,7 +115,13 @@ public class EventPublisher {
public void mobilePositionEventPublish(MobilePosition mobilePosition) {
MobilePositionEvent event = new MobilePositionEvent(this);
event.setMobilePosition(mobilePosition);
event.setMobilePositionList(List.of(mobilePosition));
applicationEventPublisher.publishEvent(event);
}
public void mobilePositionsEventPublish(List<? extends MobilePosition> mobilePositionList) {
MobilePositionEvent event = new MobilePositionEvent(this);
event.setMobilePositionList(mobilePositionList);
applicationEventPublisher.publishEvent(event);
}
@ -122,4 +131,6 @@ public class EventPublisher {
event.setDeviceIds(deviceIds);
applicationEventPublisher.publishEvent(event);
}
}

View File

@ -5,6 +5,8 @@ import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;
import java.util.List;
public class MobilePositionEvent extends ApplicationEvent {
public MobilePositionEvent(Object source) {
@ -13,5 +15,5 @@ public class MobilePositionEvent extends ApplicationEvent {
@Getter
@Setter
private MobilePosition mobilePosition;
private List<? extends MobilePosition> mobilePositionList;
}

View File

@ -1,75 +0,0 @@
package com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeHolder;
import com.genersoft.iot.vmp.gb28181.bean.SubscribeInfo;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.impl.SIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.List;
/**
* 移动位置通知消息转发
*/
@Slf4j
@Component
public class MobilePositionEventLister implements ApplicationListener<MobilePositionEvent> {
@Autowired
private IPlatformService platformService;
@Autowired
private IPlatformChannelService platformChannelService;
@Autowired
private SIPCommanderForPlatform sipCommanderForPlatform;
@Autowired
private SubscribeHolder subscribeHolder;
@Autowired
private UserSetting userSetting;
@Override
public void onApplicationEvent(MobilePositionEvent event) {
if (event.getMobilePosition().getChannelId() == 0) {
return;
}
List<Platform> allPlatforms = platformService.queryAll(userSetting.getServerId());
// 获取所用订阅
List<String> platforms = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms);
if (platforms.isEmpty()) {
return;
}
List<Platform> platformsForGB = platformChannelService.queryPlatFormListByChannelDeviceId(event.getMobilePosition().getChannelId(), platforms);
for (Platform platform : platformsForGB) {
if (log.isDebugEnabled()){
log.debug("[向上级发送MobilePosition] 通道:{},平台:{} 位置: {}:{}", event.getMobilePosition().getChannelId(),
platform.getServerGBId(), event.getMobilePosition().getLongitude(), event.getMobilePosition().getLatitude());
}
SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
try {
GPSMsgInfo gpsMsgInfo = GPSMsgInfo.getInstance(event.getMobilePosition());
// 获取通道编号
CommonGBChannel commonGBChannel = platformChannelService.queryChannelByPlatformIdAndChannelId(platform.getId(), event.getMobilePosition().getChannelId());
sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, commonGBChannel,
subscribe);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
}
}
}
}

View File

@ -9,7 +9,9 @@ import com.github.pagehelper.PageInfo;
import jakarta.validation.constraints.NotNull;
import org.dom4j.Element;
import java.util.Collection;
import java.util.List;
import java.util.Map;
/**
* 国标通道业务类
@ -96,4 +98,7 @@ public interface IDeviceChannelService {
void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback<RecordInfo> object);
Map<String, DeviceChannel> getAllForMobilePosition(List<DeviceMobilePosition> mobilePositionList);
void asyncBatchChannelPosition(Collection<DeviceChannel> channels);
}

View File

@ -50,4 +50,6 @@ public interface IPlatformChannelService {
void checkRegionRemove(List<CommonGBChannel> channelList, List<Region> regionList);
List<Platform> queryByPlatformBySharChannelId(String gbId);
void notifyMobilePosition(List<MobilePosition> handlerCatchDataList);
}

View File

@ -1,5 +1,9 @@
package com.genersoft.iot.vmp.gb28181.service;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import java.util.List;
/**
* 资源能力接入-其他
*/
@ -7,4 +11,7 @@ public interface ISourceOtherService {
Boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema);
Boolean addChannelIdForMobilePosition(List<? extends MobilePosition> mobilePositionList);
}

View File

@ -9,7 +9,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMobilePositionMapper;
import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent;
@ -19,7 +19,6 @@ import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.bean.Alarm;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
@ -34,6 +33,7 @@ import jakarta.validation.constraints.NotNull;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -75,7 +75,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
private DeviceMapper deviceMapper;
@Autowired
private DeviceMobilePositionMapper deviceMobilePositionMapper;
private MobilePositionMapper deviceMobilePositionMapper;
@Autowired
private UserSetting userSetting;
@ -99,7 +99,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
* 监听录像查询结束事件
*/
@Async
@org.springframework.context.event.EventListener
@EventListener
public void onApplicationEvent(RecordInfoEndEvent event) {
SynchronousQueue<RecordInfo> queue = topicSubscribers.get("record" + event.getRecordInfo().getSn());
if (queue != null) {
@ -107,8 +107,6 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
}
}
@Autowired
private ISIPCommander cmder;
@Override
@ -293,7 +291,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
deviceChannel.getName(), deviceChannel.getDeviceId());
String cmdString = getText(rootElement, type.getVal());
try {
cmder.fronEndCmd(device, deviceChannel.getDeviceId(), cmdString, errorResult->{
commander.fronEndCmd(device, deviceChannel.getDeviceId(), cmdString, errorResult->{
callback.run(errorResult.statusCode, errorResult.msg, null);
}, errorResult->{
callback.run(errorResult.statusCode, errorResult.msg, null);
@ -660,4 +658,25 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
queryRecordInfo(device, deviceChannel, startTime, endTime, callback);
}
@Override
public Map<String, DeviceChannel> getAllForMobilePosition(List<DeviceMobilePosition> mobilePositionList) {
return channelMapper.getAllForMobilePosition(mobilePositionList.get(0).getDevice().getId(), mobilePositionList);
}
@Override
@Async
@Transactional
public void asyncBatchChannelPosition(Collection<DeviceChannel> channels) {
// 批量更新通道位置信息
int limitCount = 500;
List<DeviceChannel> channelList = new ArrayList<>(channels);
if (!channelList.isEmpty()) {
for (int i = 0; i < channelList.size(); i += limitCount) {
int end = Math.min(i + limitCount, channelList.size());
List<DeviceChannel> batchList = channelList.subList(i, end);
channelMapper.batchUpdatePosition(batchList);
}
}
}
}

View File

@ -74,7 +74,7 @@ public class GbChannelServiceImpl implements IGbChannelService {
private DynamicTask dynamicTask;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private VectorTileCatch vectorTileCatch;
@ -212,12 +212,10 @@ public class GbChannelServiceImpl implements IGbChannelService {
if (newChannel.getGbLongitude() != null && !Objects.equals(oldChannel.getGbLongitude(), newChannel.getGbLongitude())
&& newChannel.getGbLatitude() != null && !Objects.equals(oldChannel.getGbLatitude(), newChannel.getGbLatitude())) {
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setDeviceId(newChannel.getGbDeviceId());
mobilePosition.setChannelId(newChannel.getGbId());
mobilePosition.setChannelDeviceId(newChannel.getGbDeviceId());
mobilePosition.setDeviceName(newChannel.getGbName());
mobilePosition.setCreateTime(DateUtil.getNow());
mobilePosition.setTime(DateUtil.getNow());
mobilePosition.setTimestamp(System.currentTimeMillis());
mobilePosition.setLongitude(newChannel.getGbLongitude());
mobilePosition.setLatitude(newChannel.getGbLatitude());
eventPublisher.mobilePositionEventPublish(mobilePosition);

View File

@ -43,7 +43,7 @@ public class GroupServiceImpl implements IGroupService {
private EventPublisher eventPublisher;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Override
public void add(Group group) {

View File

@ -33,7 +33,7 @@ public class InviteStreamServiceImpl implements IInviteStreamService {
private final Map<String, List<ErrorCallback<StreamInfo>>> inviteErrorCallbackMap = new ConcurrentHashMap<>();
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserSetting userSetting;

View File

@ -10,11 +10,12 @@ import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcService;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@ -24,6 +25,7 @@ import javax.sip.InvalidArgumentException;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
/**
@ -31,38 +33,29 @@ import java.util.stream.Collectors;
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class PlatformChannelServiceImpl implements IPlatformChannelService {
@Autowired
private PlatformChannelMapper platformChannelMapper;
private final PlatformChannelMapper platformChannelMapper;
@Autowired
private EventPublisher eventPublisher;
private final EventPublisher eventPublisher;
@Autowired
private GroupMapper groupMapper;
private final GroupMapper groupMapper;
private final RegionMapper regionMapper;
@Autowired
private RegionMapper regionMapper;
private final CommonGBChannelMapper commonGBChannelMapper;
@Autowired
private CommonGBChannelMapper commonGBChannelMapper;
private final PlatformMapper platformMapper;
@Autowired
private PlatformMapper platformMapper;
private final ISIPCommanderForPlatform sipCommanderForPlatform;
@Autowired
private ISIPCommanderForPlatform sipCommanderFroPlatform;
private final SubscribeHolder subscribeHolder;
@Autowired
private SubscribeHolder subscribeHolder;
private final UserSetting userSetting;
@Autowired
private UserSetting userSetting;
private final IRedisRpcService redisRpcService;
@Autowired
private IRedisRpcService redisRpcService;
// 监听通道信息变化
@EventListener
@ -116,7 +109,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
deviceChannel.setGbDeviceId(serverGbId);
deviceChannelList.add(deviceChannel);
try {
sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getMessageType().name(), platform, deviceChannelList, subscribeInfo, null);
sipCommanderForPlatform.sendNotifyForCatalogOther(event.getMessageType().name(), platform, deviceChannelList, subscribeInfo, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
@ -146,7 +139,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
CommonGBChannel deviceChannel = channelMap.get(gbId);
channelList.add(deviceChannel);
try {
sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getMessageType().name(), platform, channelList, subscribeInfo, null);
sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(event.getMessageType().name(), platform, channelList, subscribeInfo, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException |
SipException | IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
@ -185,7 +178,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
if (!channels.isEmpty()) {
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceIds);
try {
sipCommanderFroPlatform.sendNotifyForCatalogOther(event.getType(), platform, channels, subscribe, null);
sipCommanderForPlatform.sendNotifyForCatalogOther(event.getType(), platform, channels, subscribe, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
@ -205,7 +198,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
if (!deviceChannelList.isEmpty()) {
log.info("[Catalog事件: {}]平台:{},影响通道{}", event.getType(), platform.getServerGBId(), deviceIds);
try {
sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, deviceChannelList, subscribe, null);
sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(event.getType(), platform, deviceChannelList, subscribe, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
@ -646,7 +639,7 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
SubscribeInfo subscribeInfo = SubscribeInfo.buildSimulated(platform.getServerGBId(), platform.getServerIp());
try {
sipCommanderFroPlatform.sendNotifyForCatalogAddOrUpdate(CatalogEvent.ADD, platform, channelList, subscribeInfo, null);
sipCommanderForPlatform.sendNotifyForCatalogAddOrUpdate(CatalogEvent.ADD, platform, channelList, subscribeInfo, null);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException |
SipException | IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
@ -852,4 +845,52 @@ public class PlatformChannelServiceImpl implements IPlatformChannelService {
}
return platformChannelMapper.queryPlatFormListByChannelList(ids);
}
@Override
public void notifyMobilePosition(List<MobilePosition> mobilePositionList) {
List<Platform> allPlatforms = platformMapper.queryServerIdsWithEnableAndServer(userSetting.getServerId());
// 获取所用订阅
Map<Integer, Platform> platformMap = subscribeHolder.getAllMobilePositionSubscribePlatform(allPlatforms);
if (platformMap.isEmpty()) {
return;
}
// 对mobilePositionList内部的channelId分类
Map<Integer, List<MobilePosition>> channelIdMap = mobilePositionList.stream().collect(Collectors.groupingBy(MobilePosition::getChannelId));
List<ShareGBChannel> shareGBChannels = platformChannelMapper.queryShareChannelInPlatformsAndChannelIds(platformMap.values(), channelIdMap.keySet());
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (ShareGBChannel shareGBChannel : shareGBChannels) {
List<MobilePosition> mobilePositions = channelIdMap.get(shareGBChannel.getGbId());
if (mobilePositions == null || mobilePositions.isEmpty()) {
continue;
}
executor.submit(() -> {
Platform platform = platformMap.get(shareGBChannel.getPlatformId());
if (platform == null) {
log.info("[查询平台] 平台ID{} 未查询到", shareGBChannel.getPlatformId());
return;
}
SubscribeInfo subscribe = subscribeHolder.getMobilePositionSubscribe(platform.getServerGBId());
if (subscribe == null) {
log.info("[查询订阅] 平台:{} 未查询到移动位置订阅", platform.getServerGBId());
return;
}
for (MobilePosition mobilePosition : mobilePositions) {
try {
GPSMsgInfo gpsMsgInfo = GPSMsgInfo.getInstance(mobilePosition);
// 获取通道编号
CommonGBChannel commonGBChannel = queryChannelByPlatformIdAndChannelId(platform.getId(), mobilePosition.getChannelId());
sipCommanderForPlatform.sendNotifyMobilePosition(platform, gpsMsgInfo, commonGBChannel,
subscribe);
} catch (InvalidArgumentException | ParseException | NoSuchFieldException | SipException |
IllegalAccessException e) {
log.error("[命令发送失败] 国标级联 Catalog通知: {}", e.getMessage());
}
}
});
}
}
}
}

View File

@ -6,13 +6,22 @@ import com.genersoft.iot.vmp.common.InviteSessionType;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.common.enums.MediaStreamUtil;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService;
import com.genersoft.iot.vmp.utils.Coordtransform;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Slf4j
@Service(ChannelDataType.OTHER_SERVICE + ChannelDataType.GB28181)
@ -52,4 +61,46 @@ public class SourceOtherServiceForGbImpl implements ISourceOtherService {
}
return userSetting.getStreamOnDemand();
}
@Override
public Boolean addChannelIdForMobilePosition(List<? extends MobilePosition> mobilePositionList) {
if (CollectionUtils.isEmpty(mobilePositionList)) {
return false;
}
if (!(mobilePositionList.get(0) instanceof DeviceMobilePosition)) {
return null;
}
List<DeviceMobilePosition> deviceMobilePositionList = mobilePositionList.stream()
.map(DeviceMobilePosition.class::cast)
.collect(Collectors.toList());
Map<String, DeviceChannel> deviceChannelMap = deviceChannelService.getAllForMobilePosition(deviceMobilePositionList);
if (CollectionUtils.isEmpty(deviceChannelMap)) {
return false;
}
// 查询通道表为mobilePositionList赋值channelId
for (DeviceMobilePosition position : deviceMobilePositionList) {
if (position.getDevice() == null) {
continue;
}
String key = position.getDevice().getId() + "_" + position.getChannelDeviceId();
DeviceChannel deviceChannel = deviceChannelMap.get(key);
Device device = position.getDevice();
if (deviceChannel != null) {
position.setChannelId(deviceChannel.getId());
if (device.getGeoCoordSys().equalsIgnoreCase("GCJ02")) {
Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(position.getLongitude(), position.getLatitude());
position.setLongitude(wgs84Position[0]);
position.setLatitude(wgs84Position[1]);
}
deviceChannel.setLongitude(position.getLongitude());
deviceChannel.setLatitude(position.getLatitude());
}
}
// 批量更新通道
deviceChannelService.asyncBatchChannelPosition(deviceChannelMap.values());
return true;
}
}

View File

@ -34,7 +34,7 @@ public class CatalogDataManager{
private IGroupService groupService;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
private final Map<String, CatalogData> dataMap = new ConcurrentHashMap<>();

View File

@ -20,7 +20,7 @@ public class SipInviteSessionManager {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
/**
* 添加一个点播/回放的事务信息

View File

@ -26,7 +26,7 @@ public class SubscribeTaskRunner{
private final DelayQueue<SubscribeTask> delayQueue = new DelayQueue<>();
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserSetting userSetting;

View File

@ -30,7 +30,7 @@ public class PlatformStatusTaskRunner {
private final DelayQueue<PlatformKeepaliveTask> keepaliveTaskDelayQueue = new DelayQueue<>();
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private UserSetting userSetting;

View File

@ -109,16 +109,14 @@ public class NotifyRequestForAlarm extends SIPRequestProcessorParent {
mobilePosition.setChannelId(deviceChannel.getId());
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
mobilePosition.setCreateTime(DateUtil.getNow());
mobilePosition.setDeviceId(deviceAlarmNotify.getDeviceId());
mobilePosition.setTime(deviceAlarmNotify.getAlarmTime());
mobilePosition.setTimestamp(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(deviceAlarmNotify.getAlarmTime()));
mobilePosition.setLongitude(deviceAlarmNotify.getLongitude());
mobilePosition.setLatitude(deviceAlarmNotify.getLatitude());
mobilePosition.setReportSource("GPS Alarm");
// 更新device channel 的经纬度
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
deviceChannel.setGpsTime(deviceAlarmNotify.getAlarmTime());
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
}

View File

@ -2,25 +2,18 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.RequestEvent;
import javax.sip.header.FromHeader;
@ -33,27 +26,18 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessorParent {
private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Autowired
private UserSetting userSetting;
private final UserSetting userSetting;
@Autowired
private EventPublisher eventPublisher;
private final EventPublisher eventPublisher;
@Autowired
private IRedisCatchStorage redisCatchStorage;
@Autowired
private IDeviceChannelService deviceChannelService;
@Autowired
private IMobilePositionService mobilePositionService;
private final IRedisCatchStorage redisCatchStorage;
public void process(RequestEvent evt) {
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
log.error("[notify-移动位置] 待处理消息队列已满 {}返回486 BUSY_HERE消息不做处理", userSetting.getMaxNotifyCountQueue());
return;
@ -61,124 +45,56 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
taskQueue.offer(new HandlerCatchData(evt, null, null));
}
@Scheduled(fixedDelay = 200) //每200毫秒执行一次
@Scheduled(fixedDelay = 200)
@Async
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;
}
List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
while (!taskQueue.isEmpty()) {
handlerCatchDataList.add(taskQueue.poll());
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
HandlerCatchData poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
List<DeviceMobilePosition> mobilePositionList = new ArrayList<>();
for (HandlerCatchData take : handlerCatchDataList) {
if (take == null) {
continue;
}
RequestEvent evt = take.getEvt();
try {
FromHeader fromHeader = (FromHeader) evt.getRequest().getHeader(FromHeader.NAME);
String deviceId = SipUtils.getUserIdFromFromHeader(fromHeader);
long startTime = System.currentTimeMillis();
// 回复 200 OK
Element rootElement = getRootElement(evt);
if (rootElement == null) {
log.error("处理MobilePosition移动位置Notify时未获取到消息体,{}", evt.getRequest());
continue;
}
Device device = redisCatchStorage.getDevice(deviceId);
if (device == null) {
log.error("处理MobilePosition移动位置Notify时未获取到device,{}", deviceId);
log.error("[notify-移动位置] 未获取到device, {}", deviceId);
continue;
}
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setDeviceId(device.getDeviceId());
mobilePosition.setDeviceName(device.getName());
mobilePosition.setCreateTime(DateUtil.getNow());
DeviceChannel deviceChannel = null;
List<Element> elements = rootElement.elements();
readDocument: for (Element element : elements) {
switch (element.getName()){
case "DeviceID":
String channelId = element.getStringValue();
deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId);
if (deviceChannel != null) {
mobilePosition.setChannelId(deviceChannel.getId());
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
}else {
log.error("[notify-移动位置] 未找到通道 {}/{}", device.getDeviceId(), channelId);
break readDocument;
}
break;
case "Time":
String timeVal = element.getStringValue();
if (ObjectUtils.isEmpty(timeVal)) {
mobilePosition.setTime(DateUtil.getNow());
} else {
mobilePosition.setTime(SipUtils.parseTime(timeVal));
}
break;
case "Longitude":
mobilePosition.setLongitude(Double.parseDouble(element.getStringValue()));
break;
case "Latitude":
mobilePosition.setLatitude(Double.parseDouble(element.getStringValue()));
break;
case "Speed":
String speedVal = element.getStringValue();
if (NumericUtil.isDouble(speedVal)) {
mobilePosition.setSpeed(Double.parseDouble(speedVal));
} else {
mobilePosition.setSpeed(0.0);
}
break;
case "Direction":
String directionVal = element.getStringValue();
if (NumericUtil.isDouble(directionVal)) {
mobilePosition.setDirection(Double.parseDouble(directionVal));
} else {
mobilePosition.setDirection(0.0);
}
break;
case "Altitude":
String altitudeVal = element.getStringValue();
if (NumericUtil.isDouble(altitudeVal)) {
mobilePosition.setAltitude(Double.parseDouble(altitudeVal));
} else {
mobilePosition.setAltitude(0.0);
}
break;
}
}
if (deviceChannel == null) {
Element rootElement = getRootElement(evt, device.getCharset());
if (rootElement == null) {
log.warn("[notify-移动位置] {}处理失败,未识别到信息体", deviceId);
continue;
}
log.info("[收到移动位置订阅通知]{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
mobilePosition.setReportSource("Mobile Position");
mobilePositionService.add(mobilePosition);
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
List<DeviceMobilePosition> mobilePositions = DeviceMobilePosition.decode(device, rootElement);
for (DeviceMobilePosition mobilePosition : mobilePositions) {
log.info("[收到移动位置订阅通知]{}/{}->{}.{}, 时间: {}", device.getDeviceId(), mobilePosition.getChannelDeviceId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTimestamp());
mobilePositionList.add(mobilePosition);
}
} catch (Exception e) {
log.warn("[notify-移动位置] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("[notify-移动位置] 异常内容: ", e);
}
}
if (!mobilePositionList.isEmpty()) {
try {
eventPublisher.mobilePositionEventPublish(mobilePosition);
eventPublisher.mobilePositionsEventPublish(mobilePositionList);
} catch (Exception e) {
log.error("[MobilePositionEvent] 发送失败: ", e);
}
} catch (DocumentException e) {
log.error("[收到移动位置订阅通知] 文档解析异常: \r\n{}", evt.getRequest(), e);
} catch ( Exception e) {
log.error("[收到移动位置订阅通知] 异常: ", e);
}
}
}
// @Scheduled(fixedRate = 10000)
// public void execute(){
// logger.debug("[待处理Notify-移动位置订阅消息数量]: {}", taskQueue.size());
// }
}

View File

@ -114,22 +114,16 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
if (deviceChannel == null) {
log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), deviceAlarmNotify.getChannelId());
} else {
MobilePosition mobilePosition = new MobilePosition();
DeviceMobilePosition mobilePosition = new DeviceMobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow());
mobilePosition.setDeviceId(device.getDeviceId());
mobilePosition.setChannelId(deviceChannel.getId());
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
mobilePosition.setTime(deviceAlarmNotify.getAlarmTime());
mobilePosition.setTimestamp(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(deviceAlarmNotify.getAlarmTime()));
mobilePosition.setLongitude(deviceAlarmNotify.getLongitude());
mobilePosition.setLatitude(deviceAlarmNotify.getLatitude());
mobilePosition.setReportSource("GPS Alarm");
// 更新device channel 的经纬度
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
mobilePosition.setDevice(device);
// 发送移动位置事件后续会保存到数据库并且发送给上级平台
publisher.mobilePositionsEventPublish(List.of(mobilePosition));
}
}

View File

@ -1,33 +1,29 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.utils.DateUtil;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
/**
* 移动设备位置数据通知设备主动发起不需要上级订阅
*/
@ -36,101 +32,88 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
@RequiredArgsConstructor
public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
private final String cmdType = "MobilePosition";
private final NotifyMessageHandler notifyMessageHandler;
@Autowired
private NotifyMessageHandler notifyMessageHandler;
private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Autowired
private IDeviceChannelService deviceChannelService;
private final EventPublisher eventPublisher;
private ConcurrentLinkedQueue<SipMsgInfo> taskQueue = new ConcurrentLinkedQueue<>();
private final UserSetting userSetting;
@Autowired
private TaskExecutor taskExecutor;
@Override
public void afterPropertiesSet() throws Exception {
String cmdType = "MobilePosition";
notifyMessageHandler.addHandler(cmdType, this);
}
@Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
boolean isEmpty = taskQueue.isEmpty();
taskQueue.offer(new SipMsgInfo(evt, device, rootElement));
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
log.error("[message-notify-移动位置] 待处理消息队列已满 {}返回486 BUSY_HERE", userSetting.getMaxNotifyCountQueue());
return;
}
taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
// 回复200 OK
try {
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 移动位置通知回复: {}", e.getMessage());
}
if (isEmpty) {
taskExecutor.execute(() -> {
while (!taskQueue.isEmpty()) {
SipMsgInfo sipMsgInfo = taskQueue.poll();
}
@Scheduled(fixedDelay = 400) //每400毫秒执行一次
@Async
public void executeTaskQueue(){
if (taskQueue.isEmpty()) {
return;
}
List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
HandlerCatchData poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
List<DeviceMobilePosition> mobilePositionList = new ArrayList<>();
for (HandlerCatchData take : handlerCatchDataList) {
if (take == null) {
continue;
}
Device device = take.getDevice();
try {
Element rootElementAfterCharset = getRootElement(sipMsgInfo.getEvt(), sipMsgInfo.getDevice().getCharset());
Element rootElementAfterCharset = getRootElement(take.getEvt(), device.getCharset());
if (rootElementAfterCharset == null) {
log.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId());
continue;
}
String channelId = getText(rootElementAfterCharset, "DeviceID");
DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId);
if (deviceChannel == null) {
log.warn("[解析移动位置通知] 未找到通道:{}/{}", device.getDeviceId(), channelId);
continue;
}
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow());
if (!ObjectUtils.isEmpty(sipMsgInfo.getDevice().getName())) {
mobilePosition.setDeviceName(sipMsgInfo.getDevice().getName());
}
mobilePosition.setDeviceId(sipMsgInfo.getDevice().getDeviceId());
mobilePosition.setChannelId(deviceChannel.getId());
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
String time = getText(rootElementAfterCharset, "Time");
if (ObjectUtils.isEmpty(time)){
mobilePosition.setTime(DateUtil.getNow());
}else {
mobilePosition.setTime(SipUtils.parseTime(time));
}
mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude")));
mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude")));
if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) {
mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed")));
} else {
mobilePosition.setSpeed(0.0);
}
if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) {
mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction")));
} else {
mobilePosition.setDirection(0.0);
}
if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) {
mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude")));
} else {
mobilePosition.setAltitude(0.0);
}
mobilePosition.setReportSource("Mobile Position");
// 更新device channel 的经纬度
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
} catch (DocumentException e) {
log.error("未处理的异常 ", e);
List<DeviceMobilePosition> mobilePositions = DeviceMobilePosition.decode(device, rootElementAfterCharset);
for (DeviceMobilePosition mobilePosition : mobilePositions) {
try {
log.info("[收到移动位置订阅通知]{}/{}->{}.{}, 时间: {}", device.getDeviceId(), mobilePosition.getChannelDeviceId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTimestamp());
mobilePositionList.add(mobilePosition);
}catch (Exception e) {
log.warn("[移动位置通知] 发现未处理的异常, \r\n{}", evt.getRequest());
log.error("未处理的异常 ", e);
}
}
}catch (Exception e) {
log.warn("[移动位置通知] 发现未处理的异常, \r\n{}", take.getEvt().getRequest());
log.error("[移动位置通知] 异常内容: ", e);
}
}
});
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
if (!mobilePositionList.isEmpty()) {
try {
eventPublisher.mobilePositionsEventPublish(mobilePositionList);
}catch (Exception e) {
log.error("[MobilePositionEvent] 发送失败: ", e);
}
}
}

View File

@ -1,34 +1,33 @@
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.cmd;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.HandlerCatchData;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
import com.genersoft.iot.vmp.gb28181.transmit.callback.DeferredResultHolder;
import com.genersoft.iot.vmp.gb28181.transmit.callback.RequestMessage;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.response.ResponseMessageHandler;
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
import com.genersoft.iot.vmp.utils.DateUtil;
import gov.nist.javax.sip.message.SIPRequest;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import javax.sip.message.Response;
import java.text.ParseException;
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* 移动设备位置数据查询回复
@ -36,18 +35,20 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MobilePositionResponseMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
private final String cmdType = "MobilePosition";
@Autowired
private ResponseMessageHandler responseMessageHandler;
private final ResponseMessageHandler responseMessageHandler;
@Autowired
private IDeviceChannelService deviceChannelService;
private final EventPublisher eventPublisher;
@Autowired
private DeferredResultHolder resultHolder;
private final UserSetting userSetting;
private final DeferredResultHolder resultHolder;
private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
@Override
public void afterPropertiesSet() throws Exception {
@ -56,81 +57,66 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar
@Override
public void handForDevice(RequestEvent evt, Device device, Element rootElement) {
SIPRequest request = (SIPRequest) evt.getRequest();
try {
rootElement = getRootElement(evt, device.getCharset());
if (rootElement == null) {
log.warn("[ 移动设备位置数据查询回复 ] content cannot be null, {}", evt.getRequest());
try {
responseAckAsync(request, Response.BAD_REQUEST);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 移动设备位置数据查询 BAD_REQUEST: {}", e.getMessage());
}
if (taskQueue.size() >= userSetting.getMaxNotifyCountQueue()) {
log.error("[移动设备位置查询回复] 待处理消息队列已满 {},丢弃消息", userSetting.getMaxNotifyCountQueue());
return;
}
String channelId = getText(rootElement, "DeviceID");
DeviceChannel deviceChannel = deviceChannelService.getOne(device.getDeviceId(), channelId);
if (deviceChannel == null) {
log.warn("[解析报警消息] 未找到通道:{}/{}", device.getDeviceId(), channelId);
}else {
MobilePosition mobilePosition = new MobilePosition();
mobilePosition.setCreateTime(DateUtil.getNow());
if (!ObjectUtils.isEmpty(device.getName())) {
mobilePosition.setDeviceName(device.getName());
taskQueue.offer(new HandlerCatchData(evt, device, rootElement));
try {
responseAckAsync((SIPRequest) evt.getRequest(), Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage());
}
mobilePosition.setDeviceId(device.getDeviceId());
mobilePosition.setChannelId(deviceChannel.getId());
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
//兼容ISO 8601格式时间
String time = getText(rootElement, "Time");
if (ObjectUtils.isEmpty(time)){
mobilePosition.setTime(DateUtil.getNow());
}else {
mobilePosition.setTime(SipUtils.parseTime(time));
}
mobilePosition.setLongitude(Double.parseDouble(getText(rootElement, "Longitude")));
mobilePosition.setLatitude(Double.parseDouble(getText(rootElement, "Latitude")));
if (NumericUtil.isDouble(getText(rootElement, "Speed"))) {
mobilePosition.setSpeed(Double.parseDouble(getText(rootElement, "Speed")));
} else {
mobilePosition.setSpeed(0.0);
}
if (NumericUtil.isDouble(getText(rootElement, "Direction"))) {
mobilePosition.setDirection(Double.parseDouble(getText(rootElement, "Direction")));
} else {
mobilePosition.setDirection(0.0);
}
if (NumericUtil.isDouble(getText(rootElement, "Altitude"))) {
mobilePosition.setAltitude(Double.parseDouble(getText(rootElement, "Altitude")));
} else {
mobilePosition.setAltitude(0.0);
}
mobilePosition.setReportSource("Mobile Position");
// 更新device channel 的经纬度
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
@Scheduled(fixedDelay = 400)
@Async
public void executeTaskQueue() {
if (taskQueue.isEmpty()) {
return;
}
List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
int size = taskQueue.size();
for (int i = 0; i < size; i++) {
HandlerCatchData poll = taskQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
List<DeviceMobilePosition> mobilePositionList = new ArrayList<>();
for (HandlerCatchData take : handlerCatchDataList) {
Device device = take.getDevice();
try {
Element rootElementAfterCharset = getRootElement(take.getEvt(), device.getCharset());
if (rootElementAfterCharset == null) {
log.warn("[移动设备位置查询回复] {}处理失败,未识别到信息体", device.getDeviceId());
continue;
}
List<DeviceMobilePosition> mobilePositions = DeviceMobilePosition.decode(device, rootElementAfterCharset);
for (DeviceMobilePosition mobilePosition : mobilePositions) {
log.info("[收到移动位置查询回复]{}/{}->{}.{}, 时间: {}", device.getDeviceId(), mobilePosition.getChannelDeviceId(),
mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTimestamp());
mobilePositionList.add(mobilePosition);
String key = DeferredResultHolder.CALLBACK_CMD_MOBILE_POSITION + device.getDeviceId();
RequestMessage msg = new RequestMessage();
msg.setKey(key);
msg.setData(mobilePosition);
resultHolder.invokeAllResult(msg);
}
//回复 200 OK
try {
responseAckAsync(request, Response.OK);
} catch (SipException | InvalidArgumentException | ParseException e) {
log.error("[命令发送失败] 移动设备位置数据查询 200: {}", e.getMessage());
} catch (Exception e) {
log.warn("[移动设备位置查询回复] 发现未处理的异常, \r\n{}", take.getEvt().getRequest());
log.error("[移动设备位置查询回复] 异常内容: ", e);
}
}
if (!mobilePositionList.isEmpty()) {
try {
eventPublisher.mobilePositionsEventPublish(mobilePositionList);
} catch (Exception e) {
log.error("[MobilePositionEvent] 发送失败: ", e);
}
} catch (DocumentException e) {
log.error("未处理的异常 ", e);
}
}

View File

@ -49,7 +49,7 @@ public class RecordInfoResponseMessageHandler extends SIPRequestProcessorParent
private ApplicationEventPublisher applicationEventPublisher;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
private Long recordInfoTtl = 1800L;

View File

@ -23,6 +23,7 @@ import javax.sip.header.UserAgentHeader;
import javax.sip.message.Request;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
@ -264,4 +265,24 @@ public class SipUtils {
}
return localDateTime.format(DateUtil.formatter);
}
public static Long parseTimeForTimestamp(String timeStr) {
if (ObjectUtils.isEmpty(timeStr)){
return null;
}
LocalDateTime localDateTime;
try {
localDateTime = LocalDateTime.parse(timeStr);
}catch (DateTimeParseException e) {
try {
localDateTime = LocalDateTime.parse(timeStr, DateUtil.formatterISO8601);
}catch (DateTimeParseException e2) {
log.error("[格式化时间] 无法格式化时间: {}", timeStr);
return null;
}
}
// 返回毫秒值
return localDateTime.atZone(ZoneId.of(DateUtil.zoneStr)).toInstant().toEpochMilli();
}
}

View File

@ -3,6 +3,7 @@ package com.genersoft.iot.vmp.jt1078.service.impl;
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
import com.genersoft.iot.vmp.common.enums.MediaStreamUtil;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078PlayService;
import com.genersoft.iot.vmp.jt1078.service.Ijt1078Service;
@ -10,6 +11,8 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.util.List;
@Slf4j
@Service(ChannelDataType.OTHER_SERVICE + ChannelDataType.JT_1078)
@RequiredArgsConstructor
@ -17,8 +20,6 @@ public class SourceOtherServiceForJTImpl implements ISourceOtherService {
private final UserSetting userSetting;
private final Ijt1078Service ijt1078Service;
private final Ijt1078PlayService jt1078PlayService;
@Override
@ -43,4 +44,10 @@ public class SourceOtherServiceForJTImpl implements ISourceOtherService {
}
return false;
}
@Override
public Boolean addChannelIdForMobilePosition(List<? extends MobilePosition> mobilePositionList) {
return null;
}
}

View File

@ -65,7 +65,7 @@ public class jt1078PlayServiceImpl implements Ijt1078PlayService {
private JT1078Template jt1078Template;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private HookSubscribe subscribe;

View File

@ -66,7 +66,7 @@ public class jt1078ServiceImpl implements Ijt1078Service {
private JT1078Template jt1078Template;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IGbChannelService channelService;

View File

@ -80,7 +80,7 @@ public class ABLHttpHookListener {
private SSRCFactory ssrcFactory;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ApplicationEventPublisher applicationEventPublisher;

View File

@ -73,7 +73,7 @@ public class MediaServerServiceImpl implements IMediaServerService {
private IInviteStreamService inviteStreamService;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private Map<String, IMediaNodeServerService> nodeServerServiceMap;

View File

@ -8,14 +8,10 @@ import java.util.List;
public interface IMobilePositionService {
void add(List<MobilePosition> mobilePositionList);
void add(MobilePosition mobilePosition);
List<MobilePosition> queryMobilePositions(String deviceId, String channelId, String startTime, String endTime);
List<MobilePosition> queryMobilePositions(Integer channelId, String startTime, String endTime);
List<Platform> queryEnablePlatformListWithAsMessageChannel();
MobilePosition queryLatestPosition(String deviceId);
MobilePosition queryLatestPosition(Integer channelId);
}

View File

@ -1,6 +1,7 @@
package com.genersoft.iot.vmp.service.bean;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.Data;
@Data
@ -61,7 +62,7 @@ public class GPSMsgInfo {
gpsMsgInfo.setLat(mobilePosition.getLatitude());
gpsMsgInfo.setSpeed(mobilePosition.getSpeed());
gpsMsgInfo.setDirection(mobilePosition.getDirection());
gpsMsgInfo.setTime(mobilePosition.getTime());
gpsMsgInfo.setTime(DateUtil.timestampMsTo_yyyy_MM_dd_HH_mm_ss(mobilePosition.getTimestamp()));
return gpsMsgInfo;
}
}

View File

@ -1,82 +1,55 @@
package com.genersoft.iot.vmp.service.impl;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.Device;
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
import com.genersoft.iot.vmp.gb28181.bean.Platform;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMobilePositionMapper;
import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper;
import com.genersoft.iot.vmp.gb28181.dao.PlatformMapper;
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent;
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService;
import com.genersoft.iot.vmp.service.IMobilePositionService;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
@Slf4j
@Service
@RequiredArgsConstructor
public class MobilePositionServiceImpl implements IMobilePositionService {
@Autowired
private DeviceMapper deviceMapper;
private final ConcurrentLinkedQueue<MobilePosition> mobilePositionQueue = new ConcurrentLinkedQueue<>();
@Autowired
private DeviceChannelMapper channelMapper;
private final Map<String, ISourceOtherService> sourceOtherServiceMap;
@Autowired
private DeviceMobilePositionMapper mobilePositionMapper;
@Autowired
private UserSetting userSetting;
@Autowired
private PlatformMapper platformMapper;
@Autowired
private RedisTemplate<String, MobilePosition> redisTemplate;
private final String REDIS_MOBILE_POSITION_LIST = "redis_mobile_position_list";
@Override
public void add(MobilePosition mobilePosition) {
List<MobilePosition> list = new ArrayList<>();
list.add(mobilePosition);
add(list);
}
@Override
public void add(List<MobilePosition> mobilePositionList) {
redisTemplate.opsForList().leftPushAll(REDIS_MOBILE_POSITION_LIST, mobilePositionList);
}
private List<MobilePosition> get(int length) {
Long size = redisTemplate.opsForList().size(REDIS_MOBILE_POSITION_LIST);
if (size == null || size == 0) {
return new ArrayList<>();
}
return redisTemplate.opsForList().rightPop(REDIS_MOBILE_POSITION_LIST, Math.min(length, size));
}
private final MobilePositionMapper mobilePositionMapper;
private final IPlatformChannelService platformChannelService;
private final PlatformMapper platformMapper;
/**
* 查询移动位置轨迹
*/
@Override
public synchronized List<MobilePosition> queryMobilePositions(String deviceId, String channelId, String startTime, String endTime) {
return mobilePositionMapper.queryPositionByDeviceIdAndTime(deviceId, channelId, startTime, endTime);
public synchronized List<MobilePosition> queryMobilePositions(Integer channelId, String startTime, String endTime) {
Long startTimestamp = null;
Long endTimestamp = null;
if (startTime != null) {
startTimestamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(startTime);
}
if (endTime != null) {
endTimestamp = DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(endTime);
}
return mobilePositionMapper.queryPositionByDeviceIdAndTime(channelId, startTimestamp, endTimestamp);
}
@Override
@ -88,57 +61,60 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
* 查询最新移动位置
*/
@Override
public MobilePosition queryLatestPosition(String deviceId) {
return mobilePositionMapper.queryLatestPositionByDevice(deviceId);
public MobilePosition queryLatestPosition(Integer channelId) {
return mobilePositionMapper.queryLatestPosition(channelId);
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void executeTaskQueue() {
int countLimit = 3000;
List<MobilePosition> mobilePositions = get(countLimit);
if (mobilePositions == null || mobilePositions.isEmpty()) {
@Async
@EventListener
public void onApplicationEvent(MobilePositionEvent event) {
if (event.getMobilePositionList() == null || event.getMobilePositionList().isEmpty()) {
return;
}
if (userSetting.getSavePositionHistory()) {
mobilePositionMapper.batchadd(mobilePositions);
}
log.info("[移动位置订阅]更新通道位置: {}", mobilePositions.size());
Map<String, Map<Integer, DeviceChannel>> updateChannelMap = new HashMap<>();
for (MobilePosition mobilePosition : mobilePositions) {
DeviceChannel deviceChannel = new DeviceChannel();
deviceChannel.setId(mobilePosition.getChannelId());
deviceChannel.setDeviceId(mobilePosition.getDeviceId());
deviceChannel.setLongitude(mobilePosition.getLongitude());
deviceChannel.setLatitude(mobilePosition.getLatitude());
deviceChannel.setGpsTime(mobilePosition.getTime());
deviceChannel.setUpdateTime(DateUtil.getNow());
if (mobilePosition.getLongitude() > 0 || mobilePosition.getLatitude() > 0) {
Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(mobilePosition.getLongitude(), mobilePosition.getLatitude());
deviceChannel.setGbLongitude(wgs84Position[0]);
deviceChannel.setGbLatitude(wgs84Position[1]);
}
if (!updateChannelMap.containsKey(mobilePosition.getDeviceId())) {
updateChannelMap.put(mobilePosition.getDeviceId(), new HashMap<>());
}
updateChannelMap.get(mobilePosition.getDeviceId()).put(mobilePosition.getChannelId(), deviceChannel);
}
List<String> deviceIds = new ArrayList<>(updateChannelMap.keySet());
if (deviceIds.isEmpty()) {
log.info("[移动位置订阅]为查询到对应的设备,消息已经忽略");
if (event.getMobilePositionList().get(0).getChannelId() != null) {
mobilePositionQueue.addAll(event.getMobilePositionList());
return;
}
List<Device> deviceList = deviceMapper.queryByDeviceIds(deviceIds);
for (Device device : deviceList) {
Map<Integer, DeviceChannel> channelMap = updateChannelMap.get(device.getDeviceId());
if (device.getGeoCoordSys().equalsIgnoreCase("GCJ02")) {
channelMap.values().forEach(channel -> {
Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(channel.getLongitude(), channel.getLatitude());
channel.setGbLongitude(wgs84Position[0]);
channel.setGbLatitude(wgs84Position[1]);
});
for (ISourceOtherService sourceOtherService : sourceOtherServiceMap.values()) {
try {
// 此时已经完成了通道ID的添加以及坐标系的转换后续只需要将数据保存到数据库即可
Boolean addResult = sourceOtherService.addChannelIdForMobilePosition(event.getMobilePositionList());
if (addResult != null && addResult) {
mobilePositionQueue.addAll(event.getMobilePositionList());
}
channelMapper.batchUpdatePosition(new ArrayList<>(channelMap.values()));
}catch (Exception e) {
log.error("[移动位置事件] 处理移动位置事件失败", e);
}
}
}
@Scheduled(fixedDelay = 500)
public void executeMobilePositionQueue() {
if (mobilePositionQueue.isEmpty()) {
return;
}
List<MobilePosition> handlerCatchDataList = new ArrayList<>();
int size = mobilePositionQueue.size();
for (int i = 0; i < size; i++) {
MobilePosition poll = mobilePositionQueue.poll();
if (poll != null) {
handlerCatchDataList.add(poll);
}
}
if (handlerCatchDataList.isEmpty()) {
return;
}
List<MobilePosition> mobilePositionList = handlerCatchDataList.stream().filter(
mobilePosition -> mobilePosition.getChannelId() != 0).toList();
// 发送通知方便国标级联转发给上级
Thread.startVirtualThread(() -> platformChannelService.notifyMobilePosition(mobilePositionList));
// 批量保存到数据库
int batchSize = 1000;
for (int i = 0; i < mobilePositionList.size(); i += batchSize) {
int end = Math.min(i + batchSize, mobilePositionList.size());
List<MobilePosition> batchList = mobilePositionList.subList(i, end);
mobilePositionMapper.batchAdd(batchList);
}
}

View File

@ -53,7 +53,7 @@ public class RtpServerServiceImpl implements IReceiveRtpServerService {
private HookSubscribe subscribe;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
/**
* 流到来的处理

View File

@ -27,7 +27,7 @@ public class SendRtpServerServiceImpl implements ISendRtpServerService {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Override

View File

@ -20,7 +20,7 @@ public class UserApiKeyServiceImpl implements IUserApiKeyService {
UserApiKeyMapper userApiKeyMapper;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Override
public int addApiKey(UserApiKey userApiKey) {

View File

@ -6,6 +6,7 @@ import com.genersoft.iot.vmp.service.bean.GPSMsgInfo;
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
import com.genersoft.iot.vmp.streamPush.service.IStreamPushService;
import com.genersoft.iot.vmp.utils.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.beans.factory.annotation.Autowired;
@ -29,16 +30,12 @@ import java.util.stream.Collectors;
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisGpsMsgListener implements MessageListener {
@Autowired
private IRedisCatchStorage redisCatchStorage;
private final IRedisCatchStorage redisCatchStorage;
@Autowired
private IStreamPushService streamPushService;
@Autowired
private IGbChannelService channelService;
private final IGbChannelService channelService;
private final ConcurrentLinkedQueue<Message> taskQueue = new ConcurrentLinkedQueue<>();

View File

@ -35,7 +35,7 @@ public class RedisRpcChannelPlayController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IGbChannelService channelService;

View File

@ -26,7 +26,7 @@ public class RedisRpcCloudRecordController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private ICloudRecordService cloudRecordService;

View File

@ -31,7 +31,7 @@ public class RedisRpcDeviceController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IDeviceService deviceService;

View File

@ -29,7 +29,7 @@ public class RedisRpcDevicePlayController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IDeviceService deviceService;

View File

@ -25,7 +25,7 @@ public class RedisRpcGbDeviceController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IDeviceService deviceService;

View File

@ -33,7 +33,7 @@ public class RedisRpcPlatformController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IPlatformService platformService;

View File

@ -27,7 +27,7 @@ public class RedisRpcStreamProxyController extends RpcController {
private UserSetting userSetting;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IStreamProxyPlayService streamProxyPlayService;

View File

@ -46,7 +46,7 @@ public class RedisRpcStreamPushController extends RpcController {
private HookSubscribe hookSubscribe;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IStreamPushPlayService streamPushPlayService;
@ -102,7 +102,7 @@ public class RedisRpcStreamPushController extends RpcController {
sendRtpItem.setLocalIp(hookData.getMediaServer().getSdpIp());
sendRtpItem.setServerId(userSetting.getServerId());
redisTemplate.opsForValue().set(sendRtpItem.getChannelId(), sendRtpItem);
redisTemplate.opsForValue().set(sendRtpItem.getChannelId() + "", sendRtpItem);
RedisRpcResponse response = request.getResponse();
response.setBody(sendRtpItem.getChannelId());
response.setStatusCode(ErrorCode.SUCCESS.getCode());

View File

@ -47,7 +47,7 @@ public class RedisRpcServiceImpl implements IRedisRpcService {
private SSRCFactory ssrcFactory;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private IMediaServerService mediaServerService;

View File

@ -7,7 +7,6 @@ import com.genersoft.iot.vmp.common.SystemAllInfo;
import com.genersoft.iot.vmp.common.VideoManagerConstants;
import com.genersoft.iot.vmp.conf.UserSetting;
import com.genersoft.iot.vmp.gb28181.bean.*;
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
import com.genersoft.iot.vmp.media.bean.MediaInfo;
import com.genersoft.iot.vmp.media.bean.MediaServer;
@ -19,6 +18,7 @@ import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.utils.JsonUtil;
import com.genersoft.iot.vmp.utils.SystemInfoUtils;
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jspecify.annotations.NonNull;
import org.springframework.beans.factory.annotation.Autowired;
@ -31,29 +31,21 @@ import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.*;
@SuppressWarnings("rawtypes")
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisCatchStorageImpl implements IRedisCatchStorage {
@Autowired
private DeviceChannelMapper deviceChannelMapper;
private final DeviceMapper deviceMapper;
@Autowired
private DeviceMapper deviceMapper;
private final UserSetting userSetting;
@Autowired
private UserSetting userSetting;
private final RedisTemplate<String, Object> redisTemplate;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private final RedisTemplate<String, Long> longRedisTemplate;
@Autowired
private RedisTemplate<String, Long> longRedisTemplate;
@Autowired
private StringRedisTemplate stringRedisTemplate;
private final StringRedisTemplate stringRedisTemplate;
@Override
public List<SendRtpInfo> queryAllSendRTPServer() {
@ -126,7 +118,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
String key = VideoManagerConstants.WVP_SERVER_STREAM_PREFIX + userSetting.getServerId() + "_" + type.toUpperCase() + "_*_*_" + mediaServerId;
List<Object> streams = RedisUtil.scan(redisTemplate, key);
for (Object stream : streams) {
redisTemplate.delete(stream);
redisTemplate.delete((String) stream);
}
}
@ -268,7 +260,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
MediaInfo result = null;
List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
if (keys.size() > 0) {
if (!keys.isEmpty()) {
String key = (String) keys.get(0);
String mediaInfoJson = (String)redisTemplate.opsForValue().get(key);
result = JSON.parseObject(mediaInfoJson, MediaInfo.class);
@ -283,7 +275,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
MediaInfo result = null;
List<Object> keys = RedisUtil.scan(redisTemplate, scanKey);
if (keys.size() > 0) {
if (!keys.isEmpty()) {
String key = (String) keys.get(0);
String mediaInfoJson = (String)redisTemplate.opsForValue().get(key);
result = JSON.parseObject(mediaInfoJson, MediaInfo.class);
@ -423,7 +415,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
msg.append(" ").append(online? "ON":"OFF");
log.info("[redis通知] 推送设备/通道状态-> {} ", msg);
// 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号
// 使用 RedisTemplate<String, Object> 发送字符串消息会导致发送的消息多带了双引号
stringRedisTemplate.convertAndSend(key, msg.toString());
}
@ -439,7 +431,7 @@ public class RedisCatchStorageImpl implements IRedisCatchStorage {
}
msg.append(" ").append(add? "ADD":"DELETE");
log.info("[redis通知] 推送通道-> {}", msg);
// 使用 RedisTemplate<Object, Object> 发送字符串消息会导致发送的消息多带了双引号
// 使用 RedisTemplate<String, Object> 发送字符串消息会导致发送的消息多带了双引号
stringRedisTemplate.convertAndSend(key, msg.toString());
}

View File

@ -24,7 +24,7 @@ public final class JsonUtil {
* @param <T>
* @return result type
*/
public static <T> T redisJsonToObject(RedisTemplate<Object, Object> redisTemplate, String key, Class<T> clazz) {
public static <T> T redisJsonToObject(RedisTemplate<String, Object> redisTemplate, String key, Class<T> clazz) {
Object jsonObject = redisTemplate.opsForValue().get(key);
if (Objects.isNull(jsonObject)) {
return null;
@ -32,7 +32,7 @@ public final class JsonUtil {
return clazz.cast(jsonObject);
}
public static <T> T redisHashJsonToObject(RedisTemplate<Object, Object> redisTemplate, String key, String objKey, Class<T> clazz) {
public static <T> T redisHashJsonToObject(RedisTemplate<String, Object> redisTemplate, String key, String objKey, Class<T> clazz) {
// if (key == null || objKey == null) {
// return null;
// }

View File

@ -25,7 +25,7 @@ public class TestController {
private HookSubscribe subscribe;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@GetMapping("/hook/list")
public List<Hook> all(){

View File

@ -62,7 +62,7 @@ public class PsController {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@GetMapping(value = "/receive/open")
@ -172,7 +172,7 @@ public class PsController {
if (!scan.isEmpty()) {
for (Object key : scan) {
// 将信息写入redis中以备后用
redisTemplate.delete(key);
redisTemplate.delete((String) key);
}
}
}

View File

@ -62,7 +62,7 @@ public class RtpController {
private DynamicTask dynamicTask;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@GetMapping(value = "/receive/open")
@ -185,7 +185,7 @@ public class RtpController {
if (scan.size() > 0) {
for (Object key : scan) {
// 将信息写入redis中以备后用
redisTemplate.delete(key);
redisTemplate.delete((String)key);
}
}
}

View File

@ -17,6 +17,7 @@ import com.genersoft.iot.vmp.gb28181.service.IGbChannelControlService;
import com.genersoft.iot.vmp.gb28181.service.IGbChannelPlayService;
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
import com.genersoft.iot.vmp.utils.Coordtransform;
import com.genersoft.iot.vmp.utils.DateUtil;
import com.genersoft.iot.vmp.vmanager.bean.ErrorCode;
import com.genersoft.iot.vmp.web.custom.bean.*;
import com.genersoft.iot.vmp.web.custom.conf.SyTokenManager;
@ -29,9 +30,11 @@ import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.Executors;
@Slf4j
@Service
@ -51,7 +54,7 @@ public class CameraChannelService implements CommandLineRunner {
private GroupMapper groupMapper;
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private RedisTemplate<String, String> redisTemplateForString;
@ -321,10 +324,13 @@ public class CameraChannelService implements CommandLineRunner {
}
// 监听GPS消息如果是移动设备则发送redis消息
@Async
@EventListener
public void onApplicationEvent(MobilePositionEvent event) {
MobilePosition mobilePosition = event.getMobilePosition();
List<? extends MobilePosition> mobilePositionList = event.getMobilePositionList();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (MobilePosition mobilePosition : mobilePositionList) {
executor.submit(() -> {
// 从redis补充信息
SYMember member = getMember(mobilePosition.getChannelDeviceId());
if (member == null) {
@ -334,7 +340,7 @@ public class CameraChannelService implements CommandLineRunner {
// 发送redis消息
JSONObject jsonObject = new JSONObject();
jsonObject.put("gpsDate", mobilePosition.getTime());
jsonObject.put("gpsDate", DateUtil.timestampMsTo_yyyy_MM_dd_HH_mm_ss(mobilePosition.getTimestamp()));
jsonObject.put("unicodeNo", member.getUnicodeNo());
jsonObject.put("memberNo", member.getNo());
jsonObject.put("unitNo", member.getUnitNo());
@ -347,6 +353,12 @@ public class CameraChannelService implements CommandLineRunner {
jsonObject.put("gbDeviceId", mobilePosition.getChannelDeviceId());
log.info("[SY-redis发送通知-移动设备位置信息] 发送 {}: {}", REDIS_GPS_MESSAGE, jsonObject.toString());
redisTemplateForString.convertAndSend(REDIS_GPS_MESSAGE, jsonObject.toString());
});
}
}
}
public SYMember getMember(String deviceId) {

View File

@ -23,7 +23,7 @@ import java.util.List;
public class SyServiceImpl implements IMapService {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
private RedisTemplate<String, Object> redisTemplate;
@Override
public List<MapConfig> getConfig() {

View File

@ -487,12 +487,15 @@ export default {
},
nodeClickHandler: function(data, node, tree) {
console.log(data)
console.log(data.nextData.length)
console.log(this.treeLimit)
if (data && data.nextData && data.nextData.length > 0) {
const parentNode = node.parent
let nextData = data.nextData
if (nextData.length > this.treeLimit) {
let subData = nextData.splice(0, this.treeLimit)
console.log(subData)
subData.push({
treeId: '---',
deviceId: '---',
@ -506,7 +509,6 @@ export default {
for (let item of subData) {
this.$refs.veTree.append(item, parentNode)
}
} else {
this.$refs.veTree.remove(data, parentNode)
for (let item of nextData) {

View File

@ -57,20 +57,17 @@ create table IF NOT EXISTS wvp_device_alarm
);
-- 存储移动位置订阅上报的数据
drop table IF EXISTS wvp_device_mobile_position;
create table IF NOT EXISTS wvp_device_mobile_position
drop table IF EXISTS wvp_mobile_position;
create table IF NOT EXISTS wvp_mobile_position
(
id serial primary key COMMENT '主键ID',
device_id character varying(50) not null COMMENT '设备ID',
channel_id character varying(50) not null COMMENT '通道ID',
device_name character varying(255) COMMENT '设备名称',
time character varying(50) COMMENT '上报时间',
timestamp BIGINT COMMENT '上报时间',
longitude double precision COMMENT '经度',
latitude double precision COMMENT '纬度',
altitude double precision COMMENT '海拔',
speed double precision COMMENT '速度',
direction double precision COMMENT '方向角',
report_source character varying(50) COMMENT '上报来源',
create_time character varying(50) COMMENT '入库时间'
);

View File

@ -102,34 +102,28 @@ COMMENT ON COLUMN wvp_device_alarm.alarm_type IS '报警类型';
COMMENT ON COLUMN wvp_device_alarm.create_time IS '数据入库时间';
drop table IF EXISTS wvp_device_mobile_position;
create table IF NOT EXISTS wvp_device_mobile_position
drop table IF EXISTS wvp_mobile_position;
create table IF NOT EXISTS wvp_mobile_position
(
id serial primary key,
device_id character varying(50) not null,
channel_id character varying(50) not null,
device_name character varying(255),
time character varying(50),
timestamp int8 varying(50),
longitude double precision,
latitude double precision,
altitude double precision,
speed double precision,
direction double precision,
report_source character varying(50),
create_time character varying(50)
);
COMMENT ON TABLE wvp_device_mobile_position IS '存储移动位置订阅上报的数据';
COMMENT ON COLUMN wvp_device_mobile_position.id IS '主键ID';
COMMENT ON COLUMN wvp_device_mobile_position.device_id IS '设备ID';
COMMENT ON COLUMN wvp_device_mobile_position.channel_id IS '通道ID';
COMMENT ON COLUMN wvp_device_mobile_position.device_name IS '设备名称';
COMMENT ON COLUMN wvp_device_mobile_position.time IS '上报时间';
COMMENT ON COLUMN wvp_device_mobile_position.timestamp IS '上报时间';
COMMENT ON COLUMN wvp_device_mobile_position.longitude IS '经度';
COMMENT ON COLUMN wvp_device_mobile_position.latitude IS '纬度';
COMMENT ON COLUMN wvp_device_mobile_position.altitude IS '海拔';
COMMENT ON COLUMN wvp_device_mobile_position.speed IS '速度';
COMMENT ON COLUMN wvp_device_mobile_position.direction IS '方向角';
COMMENT ON COLUMN wvp_device_mobile_position.report_source IS '上报来源';
COMMENT ON COLUMN wvp_device_mobile_position.create_time IS '入库时间';

View File

@ -156,6 +156,51 @@ create table IF NOT EXISTS wvp_alarm (
);
/*
* 20260417 wvp_device_mobile_position从专属国标的位置记录表
*/
DELIMITER // -- 重定义分隔符避免分号冲突
CREATE PROCEDURE `wvp_20260417`()
BEGIN
IF NOT EXISTS (SELECT column_name FROM information_schema.columns
WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_mobile_position' and column_name = 'timestamp')
THEN
ALTER TABLE wvp_device_mobile_position ADD timestamp BIGINT COMMENT '上报时间';
END IF;
IF EXISTS (SELECT column_name FROM information_schema.columns
WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_mobile_position' and column_name = 'time')
THEN
UPDATE wvp_device_mobile_position SET timestamp = UNIX_TIMESTAMP(time) * 1000;
ALTER TABLE wvp_device_mobile_position DROP time;
END IF;
IF EXISTS (SELECT column_name FROM information_schema.columns
WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_mobile_position' and column_name = 'device_id')
THEN
ALTER TABLE wvp_device_mobile_position DROP device_id;
END IF;
IF EXISTS (SELECT column_name FROM information_schema.columns
WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_mobile_position' and column_name = 'device_name')
THEN
ALTER TABLE wvp_device_mobile_position DROP device_name;
END IF;
IF EXISTS (SELECT column_name FROM information_schema.columns
WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_mobile_position' and column_name = 'report_source')
THEN
ALTER TABLE wvp_device_mobile_position DROP report_source;
END IF;
-- 修改表名
IF EXISTS (SELECT table_name FROM information_schema.tables
WHERE TABLE_SCHEMA = (SELECT DATABASE()) and table_name = 'wvp_device_mobile_position')
THEN
ALTER TABLE wvp_device_mobile_position RENAME TO wvp_mobile_position;
END IF;
END; //
call wvp_20260417();
DROP PROCEDURE wvp_20260417;
DELIMITER ;

View File

@ -69,3 +69,18 @@ COMMENT ON COLUMN wvp_alarm.longitude IS '报警附带的经度';
COMMENT ON COLUMN wvp_alarm.latitude IS '报警附带的纬度';
COMMENT ON COLUMN wvp_alarm.alarm_type IS '报警类别';
COMMENT ON COLUMN wvp_alarm.alarm_time IS '报警时间';
/*
* 20260417 wvp_device_mobile_position从专属国标的位置记录表
*/
ALTER TABLE wvp_device_mobile_position ADD COLUMN IF NOT EXISTS timestamp int8;
UPDATE wvp_device_mobile_position SET timestamp = EXTRACT(EPOCH FROM time::timestamp) * 1000;
ALTER TABLE wvp_device_mobile_position DROP COLUMN IF EXISTS time;
ALTER TABLE wvp_device_mobile_position DROP COLUMN IF EXISTS device_id;
ALTER TABLE wvp_device_mobile_position DROP COLUMN IF EXISTS device_name;
ALTER TABLE wvp_device_mobile_position DROP COLUMN IF EXISTS report_source;
-- 修改表名
ALTER TABLE wvp_device_mobile_position RENAME TO wvp_mobile_position;
COMMENT ON COLUMN wvp_mobile_position.timestamp IS '上报时间';