mirror of
https://gitee.com/pan648540858/wvp-GB28181-pro.git
synced 2026-05-22 05:17:49 +08:00
重新定义移动位置事件处理流程
This commit is contained in:
parent
0893d45c20
commit
775376b327
@ -192,7 +192,7 @@ https://gitee.com/pan648540858/wvp-GB28181-pro.git
|
||||
## 付费社群
|
||||
<img src="doc/_media/shequ.png" width="50%" height="50%">
|
||||
|
||||
> 付费社群即可以对作者提供支持,也可以为大家更加快速的解决问题,对星球内容不满意,三天之内退出支持自动退款。如果暂时无法加入,给项目点个星也是极大的鼓励。
|
||||
> 付费社群即可以对作者提供支持,也可以为大家更加快速的解决问题,还为正式加入星球的用户提供了微信群。对星球内容不满意,三天之内退出支持自动退款。如果暂时无法加入,给项目点个星也是极大的鼓励。
|
||||
|
||||
[知识星球](https://t.zsxq.com/0d8VAD3Dm)专栏列表:
|
||||
- [WVP 部署安全加固指南:新手必看,防范攻击与漏洞](https://articles.zsxq.com/id_tv8wz4uubx2n.html)
|
||||
|
||||
88
src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceMobilePosition.java
Executable file
88
src/main/java/com/genersoft/iot/vmp/gb28181/bean/DeviceMobilePosition.java
Executable file
@ -0,0 +1,88 @@
|
||||
package com.genersoft.iot.vmp.gb28181.bean;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
|
||||
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dom4j.Element;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
|
||||
|
||||
/**
|
||||
* 国标设备移动位置
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@Getter
|
||||
@Setter
|
||||
public class DeviceMobilePosition extends MobilePosition{
|
||||
|
||||
/**
|
||||
* 通道数据库自增Id
|
||||
*/
|
||||
private String channelDeviceId;
|
||||
|
||||
|
||||
private Device device;
|
||||
|
||||
|
||||
public static List<DeviceMobilePosition> decode(Device device, Element rootElementAfterCharset) {
|
||||
|
||||
List<DeviceMobilePosition> mobilePositions = new ArrayList<>();
|
||||
|
||||
DeviceMobilePosition mobilePosition = new DeviceMobilePosition();
|
||||
mobilePosition.setCreateTime(DateUtil.getNow());
|
||||
mobilePosition.setDevice(device);
|
||||
|
||||
String channelId = getText(rootElementAfterCharset, "DeviceID");
|
||||
|
||||
mobilePosition.setChannelDeviceId(channelId);
|
||||
String time = getText(rootElementAfterCharset, "Time");
|
||||
if (ObjectUtils.isEmpty(time)){
|
||||
mobilePosition.setTimestamp(System.currentTimeMillis());
|
||||
}else {
|
||||
Long timestamp = SipUtils.parseTimeForTimestamp(time);
|
||||
if(timestamp == null) {
|
||||
log.warn("解析移动位置时间失败:{}, 使用当前时间", time);
|
||||
mobilePosition.setTimestamp(System.currentTimeMillis());
|
||||
}else {
|
||||
mobilePosition.setTimestamp(timestamp);
|
||||
}
|
||||
}
|
||||
mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude")));
|
||||
mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude")));
|
||||
if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Speed"))) {
|
||||
mobilePosition.setSpeed(Double.parseDouble(getText(rootElementAfterCharset, "Speed")));
|
||||
} else {
|
||||
mobilePosition.setSpeed(0.0);
|
||||
}
|
||||
if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Direction"))) {
|
||||
mobilePosition.setDirection(Double.parseDouble(getText(rootElementAfterCharset, "Direction")));
|
||||
} else {
|
||||
mobilePosition.setDirection(0.0);
|
||||
}
|
||||
if (NumericUtil.isDouble(getText(rootElementAfterCharset, "Altitude"))) {
|
||||
mobilePosition.setAltitude(Double.parseDouble(getText(rootElementAfterCharset, "Altitude")));
|
||||
} else {
|
||||
mobilePosition.setAltitude(0.0);
|
||||
}
|
||||
|
||||
mobilePositions.add(mobilePosition);
|
||||
|
||||
return mobilePositions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "DeviceMobilePosition{" +
|
||||
"channelDeviceId='" + channelDeviceId + '\'' +
|
||||
", deviceId='" + device.getDeviceId() + '\'' +
|
||||
"} " + super.toString();
|
||||
}
|
||||
}
|
||||
@ -4,6 +4,7 @@ import com.genersoft.iot.vmp.gb28181.utils.NumericUtil;
|
||||
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
|
||||
import com.genersoft.iot.vmp.utils.DateUtil;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dom4j.Element;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
@ -18,15 +19,12 @@ import static com.genersoft.iot.vmp.gb28181.utils.XmlUtil.getText;
|
||||
* @date: 2021年1月23日
|
||||
*/
|
||||
|
||||
@Slf4j
|
||||
@Data
|
||||
public class MobilePosition {
|
||||
/**
|
||||
* 设备Id
|
||||
*/
|
||||
private String deviceId;
|
||||
|
||||
/**
|
||||
* 通道Id
|
||||
* 通道数据库自增Id
|
||||
*/
|
||||
private Integer channelId;
|
||||
|
||||
@ -35,15 +33,10 @@ public class MobilePosition {
|
||||
*/
|
||||
private String channelDeviceId;
|
||||
|
||||
/**
|
||||
* 设备名称
|
||||
*/
|
||||
private String deviceName;
|
||||
|
||||
/**
|
||||
* 通知时间
|
||||
*/
|
||||
private String time;
|
||||
private long timestamp;
|
||||
|
||||
/**
|
||||
* 经度
|
||||
@ -70,34 +63,32 @@ public class MobilePosition {
|
||||
*/
|
||||
private double direction;
|
||||
|
||||
/**
|
||||
* 位置信息上报来源(Mobile Position、GPS Alarm)
|
||||
*/
|
||||
private String reportSource;
|
||||
/**
|
||||
* 创建时间
|
||||
*/
|
||||
private String createTime;
|
||||
|
||||
public static List<MobilePosition> decode(String deviceName, String deviceId, Element rootElementAfterCharset) {
|
||||
public static List<MobilePosition> decode(Element rootElementAfterCharset) {
|
||||
|
||||
List<MobilePosition> mobilePositions = new ArrayList<>();
|
||||
|
||||
MobilePosition mobilePosition = new MobilePosition();
|
||||
mobilePosition.setCreateTime(DateUtil.getNow());
|
||||
if (!ObjectUtils.isEmpty(deviceName)) {
|
||||
mobilePosition.setDeviceName(deviceName);
|
||||
}
|
||||
mobilePosition.setDeviceId(deviceId);
|
||||
|
||||
String channelId = getText(rootElementAfterCharset, "DeviceID");
|
||||
|
||||
mobilePosition.setChannelDeviceId(channelId);
|
||||
String time = getText(rootElementAfterCharset, "Time");
|
||||
if (ObjectUtils.isEmpty(time)){
|
||||
mobilePosition.setTime(DateUtil.getNow());
|
||||
mobilePosition.setTimestamp(System.currentTimeMillis());
|
||||
}else {
|
||||
mobilePosition.setTime(SipUtils.parseTime(time));
|
||||
Long timestamp = SipUtils.parseTimeForTimestamp(time);
|
||||
if(timestamp == null) {
|
||||
log.warn("解析移动位置时间失败:{}, 使用当前时间", time);
|
||||
mobilePosition.setTimestamp(System.currentTimeMillis());
|
||||
}else {
|
||||
mobilePosition.setTimestamp(timestamp);
|
||||
}
|
||||
}
|
||||
mobilePosition.setLongitude(Double.parseDouble(getText(rootElementAfterCharset, "Longitude")));
|
||||
mobilePosition.setLatitude(Double.parseDouble(getText(rootElementAfterCharset, "Latitude")));
|
||||
@ -116,7 +107,6 @@ public class MobilePosition {
|
||||
} else {
|
||||
mobilePosition.setAltitude(0.0);
|
||||
}
|
||||
mobilePosition.setReportSource("Mobile Position");
|
||||
|
||||
mobilePositions.add(mobilePosition);
|
||||
|
||||
@ -126,17 +116,13 @@ public class MobilePosition {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "MobilePosition{" +
|
||||
"deviceId='" + deviceId + '\'' +
|
||||
", channelId=" + channelId +
|
||||
", channelDeviceId='" + channelDeviceId + '\'' +
|
||||
", deviceName='" + deviceName + '\'' +
|
||||
", time='" + time + '\'' +
|
||||
", longitude=" + longitude +
|
||||
", latitude=" + latitude +
|
||||
", altitude=" + altitude +
|
||||
", speed=" + speed +
|
||||
", direction=" + direction +
|
||||
", reportSource='" + reportSource + '\'' +
|
||||
", createTime='" + createTime + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@ package com.genersoft.iot.vmp.gb28181.dao;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition;
|
||||
import com.genersoft.iot.vmp.gb28181.controller.bean.ChannelReduce;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.provider.DeviceChannelProvider;
|
||||
import com.genersoft.iot.vmp.web.gb28181.dto.DeviceChannelExtend;
|
||||
@ -9,6 +10,7 @@ import org.apache.ibatis.annotations.*;
|
||||
import org.springframework.stereotype.Repository;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 用于存储设备通道信息
|
||||
@ -617,4 +619,63 @@ public interface DeviceChannelMapper {
|
||||
void offlineByDeviceIds(List<Device> deviceList);
|
||||
|
||||
|
||||
@Select(value = {" <script>" +
|
||||
" SELECT " +
|
||||
" CONCAT(data_device_id, '_', device_id) as deviceIdKey" +
|
||||
" id,\n" +
|
||||
" data_device_id,\n" +
|
||||
" create_time,\n" +
|
||||
" update_time,\n" +
|
||||
" sub_count,\n" +
|
||||
" stream_id,\n" +
|
||||
" has_audio,\n" +
|
||||
" gps_time,\n" +
|
||||
" stream_identification,\n" +
|
||||
" channel_type,\n" +
|
||||
" device_id,\n" +
|
||||
" name,\n" +
|
||||
" manufacturer,\n" +
|
||||
" model,\n" +
|
||||
" owner,\n" +
|
||||
" civil_code,\n" +
|
||||
" block,\n" +
|
||||
" address,\n" +
|
||||
" parental,\n" +
|
||||
" parent_id,\n" +
|
||||
" safety_way,\n" +
|
||||
" register_way,\n" +
|
||||
" cert_num,\n" +
|
||||
" certifiable,\n" +
|
||||
" err_code,\n" +
|
||||
" end_time,\n" +
|
||||
" secrecy,\n" +
|
||||
" ip_address,\n" +
|
||||
" port,\n" +
|
||||
" password,\n" +
|
||||
" status,\n" +
|
||||
" longitude,\n" +
|
||||
" latitude,\n" +
|
||||
" gb_longitude,\n" +
|
||||
" gb_latitude,\n" +
|
||||
" ptz_type,\n" +
|
||||
" position_type,\n" +
|
||||
" room_type,\n" +
|
||||
" use_type,\n" +
|
||||
" supply_light_type,\n" +
|
||||
" direction_type,\n" +
|
||||
" resolution,\n" +
|
||||
" business_group_id,\n" +
|
||||
" download_speed,\n" +
|
||||
" svc_space_support_mod,\n" +
|
||||
" svc_time_support_mode\n" +
|
||||
" from wvp_device_channel " +
|
||||
" where data_type = 1 " +
|
||||
" and data_device_id = #{deviceId} " +
|
||||
" and device_id in " +
|
||||
"<foreach item='item' index='index' collection='channelIds' open='(' separator=',' close=')'>" +
|
||||
" #{item} " +
|
||||
"</foreach>" +
|
||||
"</script>"})
|
||||
@MapKey("deviceIdKey")
|
||||
Map<String, DeviceChannel> getAllForMobilePosition(@Param("deviceId") int deviceId, List<DeviceMobilePosition> mobilePositionList);
|
||||
}
|
||||
|
||||
@ -10,14 +10,14 @@ import org.apache.ibatis.annotations.Select;
|
||||
import java.util.List;
|
||||
|
||||
@Mapper
|
||||
public interface DeviceMobilePositionMapper {
|
||||
public interface MobilePositionMapper {
|
||||
|
||||
@Insert("INSERT INTO wvp_device_mobile_position (device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source,create_time)"+
|
||||
@Insert("INSERT INTO wvp_mobile_position (device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source,create_time)"+
|
||||
"VALUES (#{deviceId}, #{channelId}, #{deviceName}, #{time}, #{longitude}, #{latitude}, #{altitude}, #{speed}, #{direction}, #{reportSource}, #{createTime})")
|
||||
int insertNewPosition(MobilePosition mobilePosition);
|
||||
|
||||
@Select(value = {" <script>" +
|
||||
"SELECT * FROM wvp_device_mobile_position" +
|
||||
"SELECT * FROM wvp_mobile_position" +
|
||||
" WHERE device_id = #{deviceId}" +
|
||||
"<if test=\"channelId != null\"> and channel_id = #{channelId}</if>" +
|
||||
"<if test=\"startTime != null\"> AND time>=#{startTime}</if>" +
|
||||
@ -26,16 +26,16 @@ public interface DeviceMobilePositionMapper {
|
||||
" </script>"})
|
||||
List<MobilePosition> queryPositionByDeviceIdAndTime(@Param("deviceId") String deviceId, @Param("channelId") String channelId, @Param("startTime") String startTime, @Param("endTime") String endTime);
|
||||
|
||||
@Select("SELECT * FROM wvp_device_mobile_position WHERE device_id = #{deviceId}" +
|
||||
@Select("SELECT * FROM wvp_mobile_position WHERE device_id = #{deviceId}" +
|
||||
" ORDER BY time DESC LIMIT 1")
|
||||
MobilePosition queryLatestPositionByDevice(String deviceId);
|
||||
|
||||
@Delete("DELETE FROM wvp_device_mobile_position WHERE device_id = #{deviceId}")
|
||||
@Delete("DELETE FROM wvp_mobile_position WHERE device_id = #{deviceId}")
|
||||
int clearMobilePositionsByDeviceId(String deviceId);
|
||||
|
||||
@Insert("<script> " +
|
||||
"<foreach collection='mobilePositions' index='index' item='item' separator=';'> " +
|
||||
"insert into wvp_device_mobile_position " +
|
||||
"insert into wvp_mobile_position " +
|
||||
"(device_id,channel_id, device_name,time,longitude,latitude,altitude,speed,direction,report_source," +
|
||||
"create_time)"+
|
||||
"values " +
|
||||
@ -46,4 +46,6 @@ public interface DeviceMobilePositionMapper {
|
||||
"</script>")
|
||||
void batchadd(List<MobilePosition> mobilePositions);
|
||||
|
||||
|
||||
void insertMobilePositions(List<MobilePosition> batchList);
|
||||
}
|
||||
@ -1,7 +1,10 @@
|
||||
package com.genersoft.iot.vmp.gb28181.event;
|
||||
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarmNotify;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
||||
import com.genersoft.iot.vmp.gb28181.event.alarm.DeviceAlarmEvent;
|
||||
import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
|
||||
import com.genersoft.iot.vmp.gb28181.event.device.DeviceOfflineEvent;
|
||||
@ -116,7 +119,7 @@ public class EventPublisher {
|
||||
applicationEventPublisher.publishEvent(event);
|
||||
}
|
||||
|
||||
public void mobilePositionListEventPublish(List<MobilePosition> mobilePositionList) {
|
||||
public void mobilePositionsEventPublish(List<? extends MobilePosition> mobilePositionList) {
|
||||
MobilePositionEvent event = new MobilePositionEvent(this);
|
||||
event.setMobilePositionList(mobilePositionList);
|
||||
applicationEventPublisher.publishEvent(event);
|
||||
|
||||
@ -19,5 +19,5 @@ public class MobilePositionEvent extends ApplicationEvent {
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
private List<MobilePosition> mobilePositionList;
|
||||
private List<? extends MobilePosition> mobilePositionList;
|
||||
}
|
||||
|
||||
@ -9,7 +9,9 @@ import com.github.pagehelper.PageInfo;
|
||||
import jakarta.validation.constraints.NotNull;
|
||||
import org.dom4j.Element;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* 国标通道业务类
|
||||
@ -96,4 +98,7 @@ public interface IDeviceChannelService {
|
||||
|
||||
void queryRecordInfo(CommonGBChannel channel, String startTime, String endTime, ErrorCallback<RecordInfo> object);
|
||||
|
||||
Map<String, DeviceChannel> getAllForMobilePosition(List<DeviceMobilePosition> mobilePositionList);
|
||||
|
||||
void asyncBatchChannelPosition(Collection<DeviceChannel> channels);
|
||||
}
|
||||
|
||||
@ -0,0 +1,4 @@
|
||||
package com.genersoft.iot.vmp.gb28181.service;
|
||||
|
||||
public interface IMobilePositionService {
|
||||
}
|
||||
@ -1,5 +1,9 @@
|
||||
package com.genersoft.iot.vmp.gb28181.service;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* 资源能力接入-其他
|
||||
*/
|
||||
@ -7,4 +11,7 @@ public interface ISourceOtherService {
|
||||
|
||||
|
||||
Boolean closeStreamOnNoneReader(String mediaServerId, String app, String stream, String schema);
|
||||
|
||||
Boolean addChannelIdForMobilePosition(List<? extends MobilePosition> mobilePositionList);
|
||||
|
||||
}
|
||||
|
||||
@ -9,7 +9,7 @@ import com.genersoft.iot.vmp.conf.exception.ControllerException;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.DeviceMobilePositionMapper;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.gb28181.event.record.RecordInfoEndEvent;
|
||||
@ -19,7 +19,6 @@ import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IPlatformChannelService;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommander;
|
||||
import com.genersoft.iot.vmp.gb28181.utils.SipUtils;
|
||||
import com.genersoft.iot.vmp.service.bean.Alarm;
|
||||
import com.genersoft.iot.vmp.service.bean.ErrorCallback;
|
||||
import com.genersoft.iot.vmp.service.redisMsg.IRedisRpcPlayService;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
@ -76,7 +75,7 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
|
||||
private DeviceMapper deviceMapper;
|
||||
|
||||
@Autowired
|
||||
private DeviceMobilePositionMapper deviceMobilePositionMapper;
|
||||
private MobilePositionMapper deviceMobilePositionMapper;
|
||||
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
@ -659,4 +658,25 @@ public class DeviceChannelServiceImpl implements IDeviceChannelService {
|
||||
queryRecordInfo(device, deviceChannel, startTime, endTime, callback);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, DeviceChannel> getAllForMobilePosition(List<DeviceMobilePosition> mobilePositionList) {
|
||||
return channelMapper.getAllForMobilePosition(mobilePositionList.get(0).getDevice().getId(), mobilePositionList);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Async
|
||||
@Transactional
|
||||
public void asyncBatchChannelPosition(Collection<DeviceChannel> channels) {
|
||||
// 批量更新通道位置信息
|
||||
int limitCount = 500;
|
||||
List<DeviceChannel> channelList = new ArrayList<>(channels);
|
||||
if (!channelList.isEmpty()) {
|
||||
for (int i = 0; i < channelList.size(); i += limitCount) {
|
||||
int end = Math.min(i + limitCount, channelList.size());
|
||||
List<DeviceChannel> batchList = channelList.subList(i, end);
|
||||
channelMapper.batchUpdatePosition(batchList);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -211,12 +211,10 @@ public class GbChannelServiceImpl implements IGbChannelService, CommandLineRunne
|
||||
if (newChannel.getGbLongitude() != null && !Objects.equals(oldChannel.getGbLongitude(), newChannel.getGbLongitude())
|
||||
&& newChannel.getGbLatitude() != null && !Objects.equals(oldChannel.getGbLatitude(), newChannel.getGbLatitude())) {
|
||||
MobilePosition mobilePosition = new MobilePosition();
|
||||
mobilePosition.setDeviceId(newChannel.getGbDeviceId());
|
||||
mobilePosition.setChannelId(newChannel.getGbId());
|
||||
mobilePosition.setChannelDeviceId(newChannel.getGbDeviceId());
|
||||
mobilePosition.setDeviceName(newChannel.getGbName());
|
||||
mobilePosition.setCreateTime(DateUtil.getNow());
|
||||
mobilePosition.setTime(DateUtil.getNow());
|
||||
mobilePosition.setTimestamp(System.currentTimeMillis());
|
||||
mobilePosition.setLongitude(newChannel.getGbLongitude());
|
||||
mobilePosition.setLatitude(newChannel.getGbLatitude());
|
||||
eventPublisher.mobilePositionEventPublish(mobilePosition);
|
||||
|
||||
@ -0,0 +1,84 @@
|
||||
package com.genersoft.iot.vmp.gb28181.service.impl;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper;
|
||||
import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IMobilePositionService;
|
||||
import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
|
||||
@Slf4j
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class MobilePositionServiceImpl implements IMobilePositionService {
|
||||
|
||||
|
||||
private final ConcurrentLinkedQueue<MobilePosition> mobilePositionQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
private final Map<String, ISourceOtherService> sourceOtherServiceMap;
|
||||
|
||||
private final MobilePositionMapper mobilePositionMapper;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
|
||||
}
|
||||
|
||||
@Async
|
||||
@EventListener
|
||||
public void onApplicationEvent(MobilePositionEvent event) {
|
||||
if (event.getMobilePositionList() == null || event.getMobilePositionList().isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (ISourceOtherService sourceOtherService : sourceOtherServiceMap.values()) {
|
||||
try {
|
||||
Boolean addResult = sourceOtherService.addChannelIdForMobilePosition(event.getMobilePositionList());
|
||||
if (addResult != null && addResult) {
|
||||
mobilePositionQueue.addAll(event.getMobilePositionList());
|
||||
}
|
||||
}catch (Exception e) {
|
||||
log.error("[移动位置事件] 处理移动位置事件失败", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = 500)
|
||||
public void executeMobilePositionQueue() {
|
||||
if (mobilePositionQueue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
List<MobilePosition> handlerCatchDataList = new ArrayList<>();
|
||||
int size = mobilePositionQueue.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
MobilePosition poll = mobilePositionQueue.poll();
|
||||
if (poll != null) {
|
||||
handlerCatchDataList.add(poll);
|
||||
}
|
||||
}
|
||||
if (handlerCatchDataList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
// TODO 发送通知,方便国标级联转发给上级
|
||||
|
||||
|
||||
// 批量保存到数据库
|
||||
int batchSize = 1000;
|
||||
for (int i = 0; i < handlerCatchDataList.size(); i += batchSize) {
|
||||
int end = Math.min(i + batchSize, handlerCatchDataList.size());
|
||||
List<MobilePosition> batchList = handlerCatchDataList.subList(i, end);
|
||||
mobilePositionMapper.insertMobilePositions(batchList);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@ -6,13 +6,22 @@ import com.genersoft.iot.vmp.common.InviteSessionType;
|
||||
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
|
||||
import com.genersoft.iot.vmp.common.enums.MediaStreamUtil;
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.DeviceChannel;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.DeviceMobilePosition;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
|
||||
import com.genersoft.iot.vmp.gb28181.service.ISourceOtherService;
|
||||
import com.genersoft.iot.vmp.utils.Coordtransform;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Slf4j
|
||||
@Service(ChannelDataType.OTHER_SERVICE + ChannelDataType.GB28181)
|
||||
@ -52,4 +61,46 @@ public class SourceOtherServiceForGbImpl implements ISourceOtherService {
|
||||
}
|
||||
return userSetting.getStreamOnDemand();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Boolean addChannelIdForMobilePosition(List<? extends MobilePosition> mobilePositionList) {
|
||||
if (CollectionUtils.isEmpty(mobilePositionList)) {
|
||||
return false;
|
||||
}
|
||||
if (!(mobilePositionList.get(0) instanceof DeviceMobilePosition)) {
|
||||
return null;
|
||||
}
|
||||
List<DeviceMobilePosition> deviceMobilePositionList = mobilePositionList.stream()
|
||||
.map(DeviceMobilePosition.class::cast)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
Map<String, DeviceChannel> deviceChannelMap = deviceChannelService.getAllForMobilePosition(deviceMobilePositionList);
|
||||
if (CollectionUtils.isEmpty(deviceChannelMap)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// 查询通道表,为mobilePositionList赋值channelId
|
||||
for (DeviceMobilePosition position : deviceMobilePositionList) {
|
||||
if (position.getDevice() == null) {
|
||||
continue;
|
||||
}
|
||||
String key = position.getDevice().getId() + "_" + position.getChannelDeviceId();
|
||||
DeviceChannel deviceChannel = deviceChannelMap.get(key);
|
||||
Device device = position.getDevice();
|
||||
if (deviceChannel != null) {
|
||||
position.setChannelId(deviceChannel.getId());
|
||||
|
||||
if (device.getGeoCoordSys().equalsIgnoreCase("GCJ02")) {
|
||||
Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(position.getLongitude(), position.getLatitude());
|
||||
position.setLongitude(wgs84Position[0]);
|
||||
position.setLatitude(wgs84Position[1]);
|
||||
}
|
||||
deviceChannel.setLongitude(position.getLongitude());
|
||||
deviceChannel.setLatitude(position.getLatitude());
|
||||
}
|
||||
}
|
||||
// 批量更新通道
|
||||
deviceChannelService.asyncBatchChannelPosition(deviceChannelMap.values());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -109,16 +109,14 @@ public class NotifyRequestForAlarm extends SIPRequestProcessorParent {
|
||||
mobilePosition.setChannelId(deviceChannel.getId());
|
||||
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
|
||||
mobilePosition.setCreateTime(DateUtil.getNow());
|
||||
mobilePosition.setDeviceId(deviceAlarmNotify.getDeviceId());
|
||||
mobilePosition.setTime(deviceAlarmNotify.getAlarmTime());
|
||||
mobilePosition.setTimestamp(DateUtil.yyyy_MM_dd_HH_mm_ssToTimestampMs(deviceAlarmNotify.getAlarmTime()));
|
||||
mobilePosition.setLongitude(deviceAlarmNotify.getLongitude());
|
||||
mobilePosition.setLatitude(deviceAlarmNotify.getLatitude());
|
||||
mobilePosition.setReportSource("GPS Alarm");
|
||||
|
||||
// 更新device channel 的经纬度
|
||||
deviceChannel.setLongitude(mobilePosition.getLongitude());
|
||||
deviceChannel.setLatitude(mobilePosition.getLatitude());
|
||||
deviceChannel.setGpsTime(mobilePosition.getTime());
|
||||
deviceChannel.setGpsTime(deviceAlarmNotify.getAlarmTime());
|
||||
|
||||
deviceChannelService.updateChannelGPS(device, deviceChannel, mobilePosition);
|
||||
}
|
||||
|
||||
@ -95,8 +95,6 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
|
||||
continue;
|
||||
}
|
||||
MobilePosition mobilePosition = new MobilePosition();
|
||||
mobilePosition.setDeviceId(device.getDeviceId());
|
||||
mobilePosition.setDeviceName(device.getName());
|
||||
mobilePosition.setCreateTime(DateUtil.getNow());
|
||||
|
||||
DeviceChannel deviceChannel = null;
|
||||
@ -116,10 +114,16 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
|
||||
break;
|
||||
case "Time":
|
||||
String timeVal = element.getStringValue();
|
||||
if (ObjectUtils.isEmpty(timeVal)) {
|
||||
mobilePosition.setTime(DateUtil.getNow());
|
||||
} else {
|
||||
mobilePosition.setTime(SipUtils.parseTime(timeVal));
|
||||
if (ObjectUtils.isEmpty(timeVal)){
|
||||
mobilePosition.setTimestamp(System.currentTimeMillis());
|
||||
}else {
|
||||
Long timestamp = SipUtils.parseTimeForTimestamp(time);
|
||||
if(timestamp == null) {
|
||||
log.warn("解析移动位置时间失败:{}, 使用当前时间", time);
|
||||
mobilePosition.setTimestamp(System.currentTimeMillis());
|
||||
}else {
|
||||
mobilePosition.setTimestamp(timestamp);
|
||||
}
|
||||
}
|
||||
break;
|
||||
case "Longitude":
|
||||
@ -161,7 +165,6 @@ public class NotifyRequestForMobilePositionProcessor extends SIPRequestProcessor
|
||||
|
||||
log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(),
|
||||
mobilePosition.getLongitude(), mobilePosition.getLatitude(), System.currentTimeMillis() - startTime);
|
||||
mobilePosition.setReportSource("Mobile Position");
|
||||
|
||||
mobilePositionService.add(mobilePosition);
|
||||
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
|
||||
|
||||
@ -122,7 +122,6 @@ public class AlarmNotifyMessageHandler extends SIPRequestProcessorParent impleme
|
||||
mobilePosition.setTime(deviceAlarmNotify.getAlarmTime());
|
||||
mobilePosition.setLongitude(deviceAlarmNotify.getLongitude());
|
||||
mobilePosition.setLatitude(deviceAlarmNotify.getLatitude());
|
||||
mobilePosition.setReportSource("GPS Alarm");
|
||||
|
||||
// 更新device channel 的经纬度
|
||||
deviceChannel.setLongitude(mobilePosition.getLongitude());
|
||||
|
||||
@ -3,18 +3,14 @@ package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify
|
||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IDeviceChannelService;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
|
||||
import com.genersoft.iot.vmp.service.IMobilePositionService;
|
||||
import gov.nist.javax.sip.message.SIPRequest;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dom4j.DocumentException;
|
||||
import org.dom4j.Element;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.core.task.TaskExecutor;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
@ -36,18 +32,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
@RequiredArgsConstructor
|
||||
public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParent implements InitializingBean, IMessageHandler {
|
||||
|
||||
private final String cmdType = "MobilePosition";
|
||||
|
||||
private final NotifyMessageHandler notifyMessageHandler;
|
||||
|
||||
private final IMobilePositionService mobilePositionService;
|
||||
|
||||
private final IDeviceChannelService deviceChannelService;
|
||||
|
||||
private final ConcurrentLinkedQueue<HandlerCatchData> taskQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
private final TaskExecutor taskExecutor;
|
||||
|
||||
private final EventPublisher eventPublisher;
|
||||
|
||||
private final UserSetting userSetting;
|
||||
@ -55,6 +43,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
String cmdType = "MobilePosition";
|
||||
notifyMessageHandler.addHandler(cmdType, this);
|
||||
}
|
||||
|
||||
@ -80,7 +69,6 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
|
||||
if (taskQueue.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<HandlerCatchData> handlerCatchDataList = new ArrayList<>();
|
||||
int size = taskQueue.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
@ -92,7 +80,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
|
||||
if (handlerCatchDataList.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
List<MobilePosition> mobilePositionList = new ArrayList<>();
|
||||
List<DeviceMobilePosition> mobilePositionList = new ArrayList<>();
|
||||
for (HandlerCatchData take : handlerCatchDataList) {
|
||||
if (take == null) {
|
||||
continue;
|
||||
@ -102,16 +90,16 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
|
||||
Element rootElementAfterCharset = getRootElement(take.getEvt(), device.getCharset());
|
||||
if (rootElementAfterCharset == null) {
|
||||
log.warn("[移动位置通知] {}处理失败,未识别到信息体", device.getDeviceId());
|
||||
List<MobilePosition> mobilePositions = MobilePosition.decode(device.getName(), device.getDeviceId(), rootElementAfterCharset);
|
||||
for (MobilePosition mobilePosition : mobilePositions) {
|
||||
try {
|
||||
mobilePosition.setReportSource("Mobile Position");
|
||||
log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", mobilePosition.getDeviceId(), mobilePosition.getChannelDeviceId(),
|
||||
mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTime());
|
||||
mobilePositionList.add(mobilePosition);
|
||||
}catch (Exception e) {
|
||||
log.error("未处理的异常 ", e);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
List<DeviceMobilePosition> mobilePositions = DeviceMobilePosition.decode(device, rootElementAfterCharset);
|
||||
for (DeviceMobilePosition mobilePosition : mobilePositions) {
|
||||
try {
|
||||
log.info("[收到移动位置订阅通知]:{}/{}->{}.{}, 时间: {}", device.getDeviceId(), mobilePosition.getChannelDeviceId(),
|
||||
mobilePosition.getLongitude(), mobilePosition.getLatitude(), mobilePosition.getTimestamp());
|
||||
mobilePositionList.add(mobilePosition);
|
||||
}catch (Exception e) {
|
||||
log.error("未处理的异常 ", e);
|
||||
}
|
||||
}
|
||||
}catch (Exception e) {
|
||||
@ -122,7 +110,7 @@ public class MobilePositionNotifyMessageHandler extends SIPRequestProcessorParen
|
||||
// 向关联了该通道并且开启移动位置订阅的上级平台发送移动位置订阅消息
|
||||
if (!mobilePositionList.isEmpty()) {
|
||||
try {
|
||||
eventPublisher.mobilePositionListEventPublish(mobilePositionList);
|
||||
eventPublisher.mobilePositionsEventPublish(mobilePositionList);
|
||||
}catch (Exception e) {
|
||||
log.error("[MobilePositionEvent] 发送失败: ", e);
|
||||
}
|
||||
|
||||
@ -79,7 +79,6 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar
|
||||
if (!ObjectUtils.isEmpty(device.getName())) {
|
||||
mobilePosition.setDeviceName(device.getName());
|
||||
}
|
||||
mobilePosition.setDeviceId(device.getDeviceId());
|
||||
mobilePosition.setChannelId(deviceChannel.getId());
|
||||
mobilePosition.setChannelDeviceId(deviceChannel.getDeviceId());
|
||||
//兼容ISO 8601格式时间
|
||||
@ -106,7 +105,6 @@ public class MobilePositionResponseMessageHandler extends SIPRequestProcessorPar
|
||||
} else {
|
||||
mobilePosition.setAltitude(0.0);
|
||||
}
|
||||
mobilePosition.setReportSource("Mobile Position");
|
||||
|
||||
// 更新device channel 的经纬度
|
||||
deviceChannel.setLongitude(mobilePosition.getLongitude());
|
||||
|
||||
@ -23,6 +23,7 @@ import javax.sip.header.UserAgentHeader;
|
||||
import javax.sip.message.Request;
|
||||
import java.text.ParseException;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeParseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@ -264,4 +265,24 @@ public class SipUtils {
|
||||
}
|
||||
return localDateTime.format(DateUtil.formatter);
|
||||
}
|
||||
|
||||
public static Long parseTimeForTimestamp(String timeStr) {
|
||||
if (ObjectUtils.isEmpty(timeStr)){
|
||||
return null;
|
||||
}
|
||||
LocalDateTime localDateTime;
|
||||
try {
|
||||
localDateTime = LocalDateTime.parse(timeStr);
|
||||
}catch (DateTimeParseException e) {
|
||||
try {
|
||||
localDateTime = LocalDateTime.parse(timeStr, DateUtil.formatterISO8601);
|
||||
}catch (DateTimeParseException e2) {
|
||||
log.error("[格式化时间] 无法格式化时间: {}", timeStr);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
// 返回毫秒值
|
||||
return localDateTime.atZone(ZoneId.of(DateUtil.zoneStr)).toInstant().toEpochMilli();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -7,7 +7,7 @@ import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.DeviceChannelMapper;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.DeviceMobilePositionMapper;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.MobilePositionMapper;
|
||||
import com.genersoft.iot.vmp.gb28181.dao.PlatformMapper;
|
||||
import com.genersoft.iot.vmp.gb28181.utils.Coordtransform;
|
||||
import com.genersoft.iot.vmp.service.IMobilePositionService;
|
||||
@ -35,7 +35,7 @@ public class MobilePositionServiceImpl implements IMobilePositionService {
|
||||
private DeviceChannelMapper channelMapper;
|
||||
|
||||
@Autowired
|
||||
private DeviceMobilePositionMapper mobilePositionMapper;
|
||||
private MobilePositionMapper mobilePositionMapper;
|
||||
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
|
||||
@ -70,7 +70,7 @@ public class DateUtil {
|
||||
public static final DateTimeFormatter formatter1078 = DateTimeFormatter.ofPattern(PATTERN1078, Locale.getDefault()).withZone(ZoneId.of(zoneStr));
|
||||
public static final DateTimeFormatter formatter1078date = DateTimeFormatter.ofPattern(PATTERN1078Date, Locale.getDefault()).withZone(ZoneId.of(zoneStr));
|
||||
|
||||
public static String yyyy_MM_dd_HH_mm_ssToISO8601(@NotNull String formatTime) {
|
||||
public static String yyyy_MM_dd_HH_mm_ssToISO8601(@NotNull String formatTime) {
|
||||
return formatterISO8601.format(formatter.parse(formatTime));
|
||||
}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user