mirror of
https://gitee.com/pan648540858/wvp-GB28181-pro.git
synced 2026-05-27 15:37:50 +08:00
Compare commits
No commits in common. "7068898c7be59856e3ce9cf8f4db5def9d0041a1" and "5373e6082ca61158477c91d78f7208399f51c0b6" have entirely different histories.
7068898c7b
...
5373e6082c
@ -1,10 +1,12 @@
|
|||||||
package com.genersoft.iot.vmp.gb28181.event;
|
package com.genersoft.iot.vmp.gb28181.event;
|
||||||
|
|
||||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.*;
|
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.bean.DeviceAlarm;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.bean.MobilePosition;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent;
|
import com.genersoft.iot.vmp.gb28181.event.alarm.AlarmEvent;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
|
import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.device.DeviceOfflineEvent;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
|
import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent;
|
import com.genersoft.iot.vmp.gb28181.event.subscribe.mobilePosition.MobilePositionEvent;
|
||||||
import com.genersoft.iot.vmp.media.bean.MediaServer;
|
import com.genersoft.iot.vmp.media.bean.MediaServer;
|
||||||
@ -118,9 +120,4 @@ public class EventPublisher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void deviceOfflineEventPublish(String deviceId) {
|
|
||||||
DeviceOfflineEvent event = new DeviceOfflineEvent(this);
|
|
||||||
event.setDeviceId(deviceId);
|
|
||||||
applicationEventPublisher.publishEvent(event);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,22 +0,0 @@
|
|||||||
package com.genersoft.iot.vmp.gb28181.event.device;
|
|
||||||
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
|
||||||
import lombok.Getter;
|
|
||||||
import lombok.Setter;
|
|
||||||
import org.springframework.context.ApplicationEvent;
|
|
||||||
|
|
||||||
import java.io.Serial;
|
|
||||||
|
|
||||||
@Getter
|
|
||||||
@Setter
|
|
||||||
public class DeviceOfflineEvent extends ApplicationEvent {
|
|
||||||
|
|
||||||
private String deviceId;
|
|
||||||
|
|
||||||
@Serial
|
|
||||||
private static final long serialVersionUID = 1L;
|
|
||||||
|
|
||||||
public DeviceOfflineEvent(Object source) {
|
|
||||||
super(source);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -1,22 +0,0 @@
|
|||||||
package com.genersoft.iot.vmp.gb28181.event.device;
|
|
||||||
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
|
||||||
import lombok.Getter;
|
|
||||||
import lombok.Setter;
|
|
||||||
import org.springframework.context.ApplicationEvent;
|
|
||||||
|
|
||||||
import java.io.Serial;
|
|
||||||
|
|
||||||
@Getter
|
|
||||||
@Setter
|
|
||||||
public class DeviceOnlineEvent extends ApplicationEvent {
|
|
||||||
|
|
||||||
private Device device;
|
|
||||||
|
|
||||||
@Serial
|
|
||||||
private static final long serialVersionUID = 1L;
|
|
||||||
|
|
||||||
public DeviceOnlineEvent(Object source) {
|
|
||||||
super(source);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@ -12,11 +12,12 @@ import com.genersoft.iot.vmp.gb28181.dao.DeviceMapper;
|
|||||||
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
|
import com.genersoft.iot.vmp.gb28181.dao.PlatformChannelMapper;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
|
import com.genersoft.iot.vmp.gb28181.event.channel.ChannelEvent;
|
||||||
import com.genersoft.iot.vmp.gb28181.event.device.DeviceOfflineEvent;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
|
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
|
||||||
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
|
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
|
||||||
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
|
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
|
||||||
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
|
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTask;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskInfo;
|
||||||
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner;
|
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner;
|
||||||
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask;
|
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask;
|
||||||
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo;
|
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo;
|
||||||
@ -42,9 +43,7 @@ import jakarta.validation.constraints.NotNull;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.boot.CommandLineRunner;
|
import org.springframework.boot.CommandLineRunner;
|
||||||
import org.springframework.context.event.EventListener;
|
|
||||||
import org.springframework.core.annotation.Order;
|
import org.springframework.core.annotation.Order;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
@ -128,11 +127,10 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
@Override
|
@Override
|
||||||
public void run(String... args) throws Exception {
|
public void run(String... args) throws Exception {
|
||||||
|
|
||||||
// 清理数据库不存在但是 redis 中存在的数据
|
// 清理数据库不存在但是redis中存在的数据
|
||||||
List<Device> devicesInDb = getAll();
|
List<Device> devicesInDb = getAll();
|
||||||
if (devicesInDb.isEmpty()) {
|
if (devicesInDb.isEmpty()) {
|
||||||
redisCatchStorage.removeAllDevice();
|
redisCatchStorage.removeAllDevice();
|
||||||
deviceStatusTaskRunner.clear();
|
|
||||||
}else {
|
}else {
|
||||||
List<Device> devicesInRedis = redisCatchStorage.getAllDevices();
|
List<Device> devicesInRedis = redisCatchStorage.getAllDevices();
|
||||||
if (!devicesInRedis.isEmpty()) {
|
if (!devicesInRedis.isEmpty()) {
|
||||||
@ -149,17 +147,30 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 重置 cseq 计数
|
// 重置cseq计数
|
||||||
redisCatchStorage.resetAllCSEQ();
|
redisCatchStorage.resetAllCSEQ();
|
||||||
// 处理设备状态
|
// 处理设备状态
|
||||||
Set<String> allDeviceIds = deviceStatusTaskRunner.getAll();
|
List<DeviceStatusTaskInfo> allTaskInfo = deviceStatusTaskRunner.getAllTaskInfo();
|
||||||
if (!allDeviceIds.isEmpty()) {
|
List<String> onlineDeviceIds = new ArrayList<>();
|
||||||
|
if (!allTaskInfo.isEmpty()) {
|
||||||
|
for (DeviceStatusTaskInfo taskInfo : allTaskInfo) {
|
||||||
|
Device device = getDeviceByDeviceId(taskInfo.getDeviceId());
|
||||||
|
if (device == null) {
|
||||||
|
deviceStatusTaskRunner.removeTask(taskInfo.getDeviceId());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
// 恢复定时任务, TCP因为连接已经断开必须等待设备重新连接
|
||||||
|
DeviceStatusTask deviceStatusTask = DeviceStatusTask.getInstance(taskInfo.getDeviceId(),
|
||||||
|
taskInfo.getTransactionInfo(), taskInfo.getExpireTime() + 1000 + System.currentTimeMillis(), this::deviceStatusExpire);
|
||||||
|
deviceStatusTaskRunner.addTask(deviceStatusTask);
|
||||||
|
onlineDeviceIds.add(taskInfo.getDeviceId());
|
||||||
|
}
|
||||||
// 除了记录的设备以外, 其他设备全部离线
|
// 除了记录的设备以外, 其他设备全部离线
|
||||||
List<Device> onlineDevice = getAllOnlineDevice(userSetting.getServerId());
|
List<Device> onlineDevice = getAllOnlineDevice(userSetting.getServerId());
|
||||||
if (!onlineDevice.isEmpty()) {
|
if (!onlineDevice.isEmpty()) {
|
||||||
List<Device> offlineDevices = new ArrayList<>();
|
List<Device> offlineDevices = new ArrayList<>();
|
||||||
for (Device device : onlineDevice) {
|
for (Device device : onlineDevice) {
|
||||||
if (!allDeviceIds.contains(device.getDeviceId())) {
|
if (!onlineDeviceIds.contains(device.getDeviceId())) {
|
||||||
// 此设备需要离线
|
// 此设备需要离线
|
||||||
device.setOnLine(false);
|
device.setOnLine(false);
|
||||||
// 清理离线设备的相关缓存
|
// 清理离线设备的相关缓存
|
||||||
@ -192,7 +203,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Device device = getDeviceByDeviceId(taskInfo.getDeviceId());
|
Device device = getDeviceByDeviceId(taskInfo.getDeviceId());
|
||||||
if (device == null || !device.isOnLine() || !allDeviceIds.contains(taskInfo.getDeviceId())) {
|
if (device == null || !device.isOnLine() || !onlineDeviceIds.contains(taskInfo.getDeviceId())) {
|
||||||
subscribeTaskRunner.removeSubscribe(taskInfo.getKey());
|
subscribeTaskRunner.removeSubscribe(taskInfo.getKey());
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -233,7 +244,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
for (Device device : offlineDevices) {
|
for (Device device : offlineDevices) {
|
||||||
device.setOnLine(false);
|
device.setOnLine(false);
|
||||||
redisCatchStorage.updateDevice(device);
|
redisCatchStorage.updateDevice(device);
|
||||||
deviceStatusTaskRunner.removeTask(device.getDeviceId());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -244,8 +254,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
|
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
|
||||||
subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device));
|
subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device));
|
||||||
}
|
}
|
||||||
deviceStatusTaskRunner.removeTask(device.getDeviceId());
|
// 离线释放所有ssrc
|
||||||
// 离线释放所有 ssrc
|
|
||||||
List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId());
|
List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId());
|
||||||
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
|
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
|
||||||
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
|
for (SsrcTransaction ssrcTransaction : ssrcTransactions) {
|
||||||
@ -274,12 +283,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 监听设备过期事件
|
private void deviceStatusExpire(String deviceId, SipTransactionInfo transactionInfo) {
|
||||||
@Async
|
log.info("[设备状态] 到期, 编号: {}", deviceId);
|
||||||
@EventListener
|
offline(deviceId, "保活到期", true);
|
||||||
public void onApplicationEvent(DeviceOfflineEvent event) {
|
|
||||||
log.info("[设备状态] 到期, 编号: {}", event.getDeviceId());
|
|
||||||
offline(event.getDeviceId(), "保活到期", true);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -357,7 +363,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (userSetting.getDeviceStatusNotify()) {
|
if (userSetting.getDeviceStatusNotify()) {
|
||||||
// 发送 redis消息
|
// 发送redis消息
|
||||||
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, true);
|
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, true);
|
||||||
}
|
}
|
||||||
}else {
|
}else {
|
||||||
@ -369,9 +375,20 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
sync(device);
|
sync(device);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 设备状态任务添加
|
|
||||||
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
|
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
|
||||||
deviceStatusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
|
||||||
|
if (sipTransactionInfo == null) {
|
||||||
|
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
||||||
|
}else {
|
||||||
|
deviceStatusTaskRunner.removeTask(device.getDeviceId());
|
||||||
|
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire);
|
||||||
|
deviceStatusTaskRunner.addTask(task);
|
||||||
|
}
|
||||||
|
}else {
|
||||||
|
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime + System.currentTimeMillis(), this::deviceStatusExpire);
|
||||||
|
deviceStatusTaskRunner.addTask(task);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -450,6 +467,25 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 设备状态丢失检查
|
||||||
|
@Scheduled(fixedDelay = 30, timeUnit = TimeUnit.SECONDS)
|
||||||
|
public void lostCheckForStatus(){
|
||||||
|
// 获取所有设备
|
||||||
|
List<Device> deviceList = redisCatchStorage.getAllDevices();
|
||||||
|
if (deviceList.isEmpty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (Device device : deviceList) {
|
||||||
|
if (device == null || !device.isOnLine() || !userSetting.getServerId().equals(device.getServerId())) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
|
||||||
|
log.debug("[状态丢失] 执行设备离线, 编号: {},", device.getDeviceId());
|
||||||
|
offline(device.getDeviceId(), "", true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void catalogSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) {
|
private void catalogSubscribeExpire(String deviceId, SipTransactionInfo transactionInfo) {
|
||||||
log.info("[目录订阅] 到期, 编号: {}", deviceId);
|
log.info("[目录订阅] 到期, 编号: {}", deviceId);
|
||||||
Device device = getDeviceByDeviceId(deviceId);
|
Device device = getDeviceByDeviceId(deviceId);
|
||||||
@ -967,7 +1003,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
updateDevice(deviceInDb);
|
updateDevice(deviceInDb);
|
||||||
|
|
||||||
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
|
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
|
||||||
deviceStatusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
|
||||||
|
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,60 @@
|
|||||||
|
package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
|
||||||
|
|
||||||
|
import com.genersoft.iot.vmp.common.DeviceStatusCallback;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
|
||||||
|
import lombok.Data;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.jetbrains.annotations.NotNull;
|
||||||
|
|
||||||
|
import java.util.concurrent.Delayed;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Data
|
||||||
|
public class DeviceStatusTask implements Delayed {
|
||||||
|
|
||||||
|
private String deviceId;
|
||||||
|
|
||||||
|
private SipTransactionInfo transactionInfo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 超时时间(单位: 毫秒)
|
||||||
|
*/
|
||||||
|
private long delayTime;
|
||||||
|
|
||||||
|
private DeviceStatusCallback callback;
|
||||||
|
|
||||||
|
public static DeviceStatusTask getInstance(String deviceId, SipTransactionInfo transactionInfo, long delayTime, DeviceStatusCallback callback) {
|
||||||
|
DeviceStatusTask deviceStatusTask = new DeviceStatusTask();
|
||||||
|
deviceStatusTask.setDeviceId(deviceId);
|
||||||
|
deviceStatusTask.setTransactionInfo(transactionInfo);
|
||||||
|
deviceStatusTask.setDelayTime(delayTime);
|
||||||
|
deviceStatusTask.setCallback(callback);
|
||||||
|
return deviceStatusTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void expired() {
|
||||||
|
if (callback == null) {
|
||||||
|
log.info("[设备离线] 未找到过期处理回调, {}", deviceId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
callback.run(deviceId, transactionInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getDelay(@NotNull TimeUnit unit) {
|
||||||
|
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(@NotNull Delayed o) {
|
||||||
|
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
public DeviceStatusTaskInfo getInfo(){
|
||||||
|
DeviceStatusTaskInfo taskInfo = new DeviceStatusTaskInfo();
|
||||||
|
taskInfo.setTransactionInfo(transactionInfo);
|
||||||
|
taskInfo.setDeviceId(deviceId);
|
||||||
|
return taskInfo;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,17 @@
|
|||||||
|
package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
|
||||||
|
|
||||||
|
import com.genersoft.iot.vmp.gb28181.bean.SipTransactionInfo;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
@Data
|
||||||
|
public class DeviceStatusTaskInfo{
|
||||||
|
|
||||||
|
private String deviceId;
|
||||||
|
|
||||||
|
private SipTransactionInfo transactionInfo;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 过期时间,单位毫秒
|
||||||
|
*/
|
||||||
|
private long expireTime;
|
||||||
|
}
|
||||||
@ -1,9 +1,6 @@
|
|||||||
package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
|
package com.genersoft.iot.vmp.gb28181.task.deviceStatus;
|
||||||
|
|
||||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
|
||||||
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.ObjectUtils;
|
import org.apache.commons.lang3.ObjectUtils;
|
||||||
@ -15,7 +12,6 @@ import org.springframework.stereotype.Component;
|
|||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@Slf4j
|
@Slf4j
|
||||||
@ -23,68 +19,54 @@ import java.util.concurrent.TimeUnit;
|
|||||||
public class DeviceStatusTaskRunner {
|
public class DeviceStatusTaskRunner {
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private RedisTemplate<String, String> redisTemplate;
|
private RedisTemplate<Object, Object> redisTemplate;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private IRedisCatchStorage redisCatchStorage;
|
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private UserSetting userSetting;
|
private UserSetting userSetting;
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private EventPublisher eventPublisher;
|
|
||||||
|
|
||||||
private final String prefix = "VMP_DEVICE_EXPIRES";
|
private final String prefix = "VMP_DEVICE_EXPIRES";
|
||||||
|
private final String redisKey = String.format("%s_%s", prefix, userSetting.getServerId());
|
||||||
|
|
||||||
public String redisKey(){
|
// 状态过期检查
|
||||||
return String.format("%s_%s", prefix, userSetting.getServerId());
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 状态过期检查
|
|
||||||
*/
|
|
||||||
@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS)
|
@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS)
|
||||||
|
@Async
|
||||||
public void expirationCheck(){
|
public void expirationCheck(){
|
||||||
long now = System.currentTimeMillis();
|
|
||||||
// 获取已过期的 deviceId (Score 介于 0 到 现在之间)
|
|
||||||
Set<String> expiredIds = redisTemplate.opsForZSet().rangeByScore(redisKey(), 0, now);
|
|
||||||
|
|
||||||
if (expiredIds != null && !expiredIds.isEmpty()) {
|
|
||||||
redisTemplate.opsForZSet().remove(redisKey(), expiredIds.toArray());
|
|
||||||
|
|
||||||
// 使用 JDK 21 虚拟线程异步分发事件
|
|
||||||
for (String deviceId : expiredIds) {
|
|
||||||
Thread.startVirtualThread(() -> {
|
|
||||||
// 获取详情后删除缓存
|
|
||||||
Device device = redisCatchStorage.getDevice(deviceId);
|
|
||||||
// redisCatchStorage.removeDevice(deviceId);
|
|
||||||
// 发送 Spring 异步事件
|
|
||||||
eventPublisher.deviceOfflineEventPublish(deviceId);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addTask(String deviceId, long expireTime) {
|
public void addTask(String deviceId, long expireTime) {
|
||||||
redisTemplate.opsForZSet().add(redisKey(), deviceId, expireTime);
|
redisTemplate.opsForZSet().add(redisKey, deviceId, expireTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeTask(String deviceId) {
|
public void removeTask(String deviceId) {
|
||||||
redisTemplate.opsForZSet().remove(redisKey(), deviceId);
|
redisTemplate.opsForZSet().remove(redisKey, deviceId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean containsKey(String deviceId) {
|
public boolean containsKey(String deviceId) {
|
||||||
if (ObjectUtils.isEmpty(deviceId)) {
|
if (ObjectUtils.isEmpty(deviceId)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return redisTemplate.opsForZSet().score(redisKey(), deviceId) != null;
|
return redisTemplate.opsForZSet().score(redisKey, deviceId) != null;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void clear() {
|
public List<DeviceStatusTaskInfo> getAllTaskInfo(){
|
||||||
redisTemplate.opsForZSet().removeRangeByScore(redisKey(), 0, Long.MAX_VALUE);
|
String scanKey = String.format("%s_%s_*", prefix, userSetting.getServerId());
|
||||||
}
|
List<Object> values = RedisUtil.scan(redisTemplate, scanKey);
|
||||||
|
if (values.isEmpty()) {
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
List<DeviceStatusTaskInfo> result = new ArrayList<>();
|
||||||
|
for (Object value : values) {
|
||||||
|
String redisKey = (String)value;
|
||||||
|
DeviceStatusTaskInfo taskInfo = (DeviceStatusTaskInfo)redisTemplate.opsForValue().get(redisKey);
|
||||||
|
if (taskInfo == null) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Long expire = redisTemplate.getExpire(redisKey, TimeUnit.MILLISECONDS);
|
||||||
|
taskInfo.setExpireTime(expire);
|
||||||
|
result.add(taskInfo);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
|
||||||
public Set<String> getAll() {
|
|
||||||
return redisTemplate.opsForZSet().rangeByScore(redisKey(), 0, Long.MAX_VALUE);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -12,7 +12,6 @@ import gov.nist.javax.sip.message.SIPRequest;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.dom4j.Element;
|
import org.dom4j.Element;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
|
||||||
|
|
||||||
import javax.sip.InvalidArgumentException;
|
import javax.sip.InvalidArgumentException;
|
||||||
import javax.sip.RequestEvent;
|
import javax.sip.RequestEvent;
|
||||||
@ -75,8 +74,9 @@ public abstract class MessageHandlerAbstract extends SIPRequestProcessorParent i
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Async
|
|
||||||
public void handMessageEvent(Element element, Object data) {
|
public void handMessageEvent(Element element, Object data) {
|
||||||
|
|
||||||
String cmd = getText(element, "CmdType");
|
String cmd = getText(element, "CmdType");
|
||||||
String sn = getText(element, "SN");
|
String sn = getText(element, "SN");
|
||||||
MessageEvent<Object> subscribe = (MessageEvent<Object>)messageSubscribe.getSubscribe(cmd + sn);
|
MessageEvent<Object> subscribe = (MessageEvent<Object>)messageSubscribe.getSubscribe(cmd + sn);
|
||||||
|
|||||||
@ -28,7 +28,6 @@ import javax.sip.message.Response;
|
|||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 状态信息(心跳)报送
|
* 状态信息(心跳)报送
|
||||||
@ -83,7 +82,9 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
|
|||||||
if (device.isOnLine()) {
|
if (device.isOnLine()) {
|
||||||
taskQueue.add(device);
|
taskQueue.add(device);
|
||||||
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
|
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
|
||||||
statusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
if (statusTaskRunner.containsKey(device.getDeviceId())) {
|
||||||
|
statusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (userSetting.getGbDeviceOnline() == 1) {
|
if (userSetting.getGbDeviceOnline() == 1) {
|
||||||
// 对于已经离线的设备判断他的注册是否已经过期
|
// 对于已经离线的设备判断他的注册是否已经过期
|
||||||
@ -91,7 +92,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@Scheduled(fixedDelay = 1000, timeUnit = TimeUnit.MILLISECONDS)
|
@Scheduled(fixedDelay = 1000)
|
||||||
@Async
|
@Async
|
||||||
public void executeUpdateDeviceList() {
|
public void executeUpdateDeviceList() {
|
||||||
if (!taskQueue.isEmpty()) {
|
if (!taskQueue.isEmpty()) {
|
||||||
|
|||||||
@ -16,7 +16,6 @@ import org.dom4j.DocumentException;
|
|||||||
import org.dom4j.Element;
|
import org.dom4j.Element;
|
||||||
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
|
||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
@ -68,120 +67,17 @@ public class CatalogResponseMessageHandler extends SIPRequestProcessorParent imp
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handForDevice(RequestEvent evt, Device device, Element element) {
|
public void handForDevice(RequestEvent evt, Device device, Element element) {
|
||||||
|
taskQueue.offer(new HandlerCatchData(evt, device, element));
|
||||||
// 回复200 OK
|
// 回复200 OK
|
||||||
try {
|
try {
|
||||||
responseAck((SIPRequest) evt.getRequest(), Response.OK);
|
responseAck((SIPRequest) evt.getRequest(), Response.OK);
|
||||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||||
log.error("[命令发送失败] 目录查询回复: {}", e.getMessage());
|
log.error("[命令发送失败] 目录查询回复: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
int sn = 0;
|
|
||||||
// 全局异常捕获,保证下一条可以得到处理
|
|
||||||
try {
|
|
||||||
Element rootElement = null;
|
|
||||||
try {
|
|
||||||
rootElement = getRootElement(evt, device.getCharset());
|
|
||||||
} catch (DocumentException e) {
|
|
||||||
log.error("[xml解析] 失败: ", e);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (rootElement == null) {
|
|
||||||
log.warn("[ 收到通道 ] content cannot be null, {}", evt.getRequest());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
Element deviceListElement = rootElement.element("DeviceList");
|
|
||||||
Element sumNumElement = rootElement.element("SumNum");
|
|
||||||
Element snElement = rootElement.element("SN");
|
|
||||||
|
|
||||||
sn = Integer.parseInt(snElement.getText());
|
|
||||||
int sumNum = Integer.parseInt(sumNumElement.getText());
|
|
||||||
|
|
||||||
if (sumNum == 0) {
|
|
||||||
log.info("[收到通道]设备:{}的: 0个", device.getDeviceId());
|
|
||||||
// 数据已经完整接收
|
|
||||||
deviceChannelService.cleanChannelsForDevice(device.getId());
|
|
||||||
// 推送空数据,不然无法及时结束
|
|
||||||
catalogDataCatch.put(device.getDeviceId(), sn, 0, device,
|
|
||||||
Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
|
|
||||||
catalogDataCatch.setChannelSyncEnd(device.getDeviceId(), sn, null);
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
Iterator<Element> deviceListIterator = deviceListElement.elementIterator();
|
|
||||||
if (deviceListIterator != null) {
|
|
||||||
List<DeviceChannel> channelList = new ArrayList<>();
|
|
||||||
List<Region> regionList = new ArrayList<>();
|
|
||||||
List<Group> groupList = new ArrayList<>();
|
|
||||||
// 遍历DeviceList
|
|
||||||
while (deviceListIterator.hasNext()) {
|
|
||||||
Element itemDevice = deviceListIterator.next();
|
|
||||||
Element channelDeviceElement = itemDevice.element("DeviceID");
|
|
||||||
if (channelDeviceElement == null) {
|
|
||||||
// 总数减一, 避免最后总数不对 无法确定问题
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// 从xml解析内容到 DeviceChannel 对象
|
|
||||||
DeviceChannel channel = DeviceChannel.decode(itemDevice);
|
|
||||||
if (channel.getDeviceId() == null) {
|
|
||||||
log.info("[收到目录订阅]:但是解析失败 {}", new String(evt.getRequest().getRawContent()));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
channel.setDataDeviceId(device.getId());
|
|
||||||
if (channel.getParentId() != null && channel.getParentId().equals(sipConfig.getId())) {
|
|
||||||
channel.setParentId(null);
|
|
||||||
}
|
|
||||||
// 解析通道类型
|
|
||||||
if (channel.getDeviceId().length() <= 8) {
|
|
||||||
// 行政区划
|
|
||||||
Region region = Region.getInstance(channel);
|
|
||||||
regionList.add(region);
|
|
||||||
channel.setChannelType(1);
|
|
||||||
}else if (channel.getDeviceId().length() == 20){
|
|
||||||
// 业务分组/虚拟组织
|
|
||||||
Group group = Group.getInstance(channel);
|
|
||||||
if (group != null) {
|
|
||||||
channel.setParental(1);
|
|
||||||
channel.setChannelType(2);
|
|
||||||
groupList.add(group);
|
|
||||||
}
|
|
||||||
if (channel.getLongitude() != null && channel.getLatitude() != null && channel.getLongitude() > 0 && channel.getLatitude() > 0) {
|
|
||||||
Double[] wgs84Position = Coordtransform.GCJ02ToWGS84(channel.getLongitude(), channel.getLatitude());
|
|
||||||
channel.setGbLongitude(wgs84Position[0]);
|
|
||||||
channel.setGbLatitude(wgs84Position[1]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
channelList.add(channel);
|
|
||||||
}
|
|
||||||
|
|
||||||
catalogDataCatch.put(device.getDeviceId(), sn, sumNum, device,
|
|
||||||
channelList, regionList, groupList);
|
|
||||||
log.info("[收到通道]设备: {} -> {}个,{}/{}", device.getDeviceId(), channelList.size(), catalogDataCatch.size(device.getDeviceId(), sn), sumNum);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
log.warn("[收到通道] 发现未处理的异常, \r\n{}", evt.getRequest());
|
|
||||||
log.error("[收到通道] 异常内容: ", e);
|
|
||||||
} finally {
|
|
||||||
String deviceId = device.getDeviceId();
|
|
||||||
if (catalogDataCatch.size(deviceId, sn) > 0
|
|
||||||
&& catalogDataCatch.size(deviceId, sn) == catalogDataCatch.sumNum(deviceId, sn)) {
|
|
||||||
// 数据已经完整接收, 此时可能存在某个设备离线变上线的情况,但是考虑到性能,此处不做处理,
|
|
||||||
// 目前支持设备通道上线通知时和设备上线时向上级通知
|
|
||||||
int finalSn = sn;
|
|
||||||
Thread.startVirtualThread(() -> {
|
|
||||||
boolean resetChannelsResult = saveData(device, finalSn);
|
|
||||||
if (!resetChannelsResult) {
|
|
||||||
String errorMsg = "接收成功,写入失败,共" + catalogDataCatch.sumNum(deviceId, finalSn) + "条,已接收" + catalogDataCatch.getDeviceChannelList(device.getDeviceId(), finalSn).size() + "条";
|
|
||||||
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, errorMsg);
|
|
||||||
} else {
|
|
||||||
catalogDataCatch.setChannelSyncEnd(deviceId, finalSn, null);
|
|
||||||
}
|
|
||||||
}).start();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// @Scheduled(fixedDelay = 50)
|
@Scheduled(fixedDelay = 50)
|
||||||
// @Transactional
|
@Transactional
|
||||||
public void executeTaskQueue(){
|
public void executeTaskQueue(){
|
||||||
if (taskQueue.isEmpty()) {
|
if (taskQueue.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
|
|||||||
@ -51,7 +51,13 @@ public class DeviceStatusResponseMessageHandler extends SIPRequestProcessorParen
|
|||||||
} catch (SipException | InvalidArgumentException | ParseException e) {
|
} catch (SipException | InvalidArgumentException | ParseException e) {
|
||||||
log.error("[命令发送失败] 国标级联 设备状态应答回复200OK: {}", e.getMessage());
|
log.error("[命令发送失败] 国标级联 设备状态应答回复200OK: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
String text = element.elementText("Online");
|
Element onlineElement = element.element("Online");
|
||||||
|
JSONObject json = new JSONObject();
|
||||||
|
XmlUtil.node2Json(element, json);
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(json.toJSONString());
|
||||||
|
}
|
||||||
|
String text = onlineElement.getText();
|
||||||
responseMessageHandler.handMessageEvent(element, text);
|
responseMessageHandler.handMessageEvent(element, text);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -207,7 +207,8 @@ create table IF NOT EXISTS wvp_media_server
|
|||||||
record_path character varying(255) COMMENT '录像目录',
|
record_path character varying(255) COMMENT '录像目录',
|
||||||
record_day integer default 7 COMMENT '录像保留天数',
|
record_day integer default 7 COMMENT '录像保留天数',
|
||||||
transcode_suffix character varying(255) COMMENT '转码指令后缀',
|
transcode_suffix character varying(255) COMMENT '转码指令后缀',
|
||||||
server_id character varying(50) COMMENT '对应信令服务器ID'
|
server_id character varying(50) COMMENT '对应信令服务器ID',
|
||||||
|
constraint uk_media_server_unique_ip_http_port unique (ip, http_port, server_id)
|
||||||
);
|
);
|
||||||
|
|
||||||
-- 上级国标平台注册信息
|
-- 上级国标平台注册信息
|
||||||
|
|||||||
@ -355,6 +355,7 @@ create table IF NOT EXISTS wvp_media_server
|
|||||||
record_day integer default 7,
|
record_day integer default 7,
|
||||||
transcode_suffix character varying(255),
|
transcode_suffix character varying(255),
|
||||||
server_id character varying(50),
|
server_id character varying(50),
|
||||||
|
constraint uk_media_server_unique_ip_http_port unique (ip, http_port, server_id)
|
||||||
);
|
);
|
||||||
COMMENT ON TABLE wvp_media_server IS '媒体服务器(如 ZLM)节点信息';
|
COMMENT ON TABLE wvp_media_server IS '媒体服务器(如 ZLM)节点信息';
|
||||||
COMMENT ON COLUMN wvp_media_server.id IS '媒体服务器ID';
|
COMMENT ON COLUMN wvp_media_server.id IS '媒体服务器ID';
|
||||||
|
|||||||
@ -118,8 +118,6 @@ call wvp_20251027();
|
|||||||
DROP PROCEDURE wvp_20251027;
|
DROP PROCEDURE wvp_20251027;
|
||||||
DELIMITER ;
|
DELIMITER ;
|
||||||
|
|
||||||
drop index uk_media_server_unique_ip_http_port on wvp_media_server;
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -42,5 +42,3 @@ ALTER table wvp_device_channel ADD COLUMN IF NOT EXISTS enable_broadcast integer
|
|||||||
ALTER table wvp_device_channel ADD COLUMN IF NOT EXISTS map_level integer default 0;
|
ALTER table wvp_device_channel ADD COLUMN IF NOT EXISTS map_level integer default 0;
|
||||||
ALTER table wvp_common_group ADD COLUMN IF NOT EXISTS alias varchar(255) default null;
|
ALTER table wvp_common_group ADD COLUMN IF NOT EXISTS alias varchar(255) default null;
|
||||||
ALTER table wvp_stream_proxy DROP COLUMN IF EXISTS enable_remove_none_reader;
|
ALTER table wvp_stream_proxy DROP COLUMN IF EXISTS enable_remove_none_reader;
|
||||||
|
|
||||||
drop index uk_media_server_unique_ip_http_port on wvp_media_server;
|
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user