mirror of
https://gitee.com/pan648540858/wvp-GB28181-pro.git
synced 2026-05-23 21:47:49 +08:00
完善设备有效期到期机制
This commit is contained in:
parent
58e82ec185
commit
578b54ed09
@ -46,7 +46,6 @@ public class VideoManagerConstants {
|
|||||||
public static final String SYSTEM_INFO_DISK_PREFIX = "VMP_SYSTEM_INFO_DISK_";
|
public static final String SYSTEM_INFO_DISK_PREFIX = "VMP_SYSTEM_INFO_DISK_";
|
||||||
public static final String BROADCAST_WAITE_INVITE = "task_broadcast_waite_invite_";
|
public static final String BROADCAST_WAITE_INVITE = "task_broadcast_waite_invite_";
|
||||||
|
|
||||||
public static final String REGISTER_EXPIRE_TASK_KEY_PREFIX = "VMP_device_register_expire_";
|
|
||||||
public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
|
public static final String PUSH_STREAM_LIST = "VMP_PUSH_STREAM_LIST_";
|
||||||
public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
|
public static final String WAITE_SEND_PUSH_STREAM = "VMP_WAITE_SEND_PUSH_STREAM:";
|
||||||
public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
|
public static final String START_SEND_PUSH_STREAM = "VMP_START_SEND_PUSH_STREAM:";
|
||||||
|
|||||||
@ -2,7 +2,6 @@ package com.genersoft.iot.vmp.gb28181.service.impl;
|
|||||||
|
|
||||||
import com.alibaba.fastjson2.JSON;
|
import com.alibaba.fastjson2.JSON;
|
||||||
import com.genersoft.iot.vmp.common.CommonCallback;
|
import com.genersoft.iot.vmp.common.CommonCallback;
|
||||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
|
||||||
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
|
import com.genersoft.iot.vmp.common.enums.ChannelDataType;
|
||||||
import com.genersoft.iot.vmp.conf.DynamicTask;
|
import com.genersoft.iot.vmp.conf.DynamicTask;
|
||||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
@ -53,9 +52,7 @@ import javax.sip.SipException;
|
|||||||
import javax.validation.constraints.NotNull;
|
import javax.validation.constraints.NotNull;
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.ArrayList;
|
import java.util.*;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Objects;
|
|
||||||
import java.util.concurrent.SynchronousQueue;
|
import java.util.concurrent.SynchronousQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
@ -124,6 +121,30 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(String... args) throws Exception {
|
public void run(String... args) throws Exception {
|
||||||
|
|
||||||
|
// 清理数据库不存在但是redis中存在的数据
|
||||||
|
List<Device> devicesInDb = getAll();
|
||||||
|
if (devicesInDb.isEmpty()) {
|
||||||
|
redisCatchStorage.removeAllDevice();
|
||||||
|
}else {
|
||||||
|
List<Device> devicesInRedis = redisCatchStorage.getAllDevices();
|
||||||
|
if (!devicesInRedis.isEmpty()) {
|
||||||
|
Map<String, Device> deviceMapInDb = new HashMap<>();
|
||||||
|
devicesInDb.parallelStream().forEach(device -> {
|
||||||
|
deviceMapInDb.put(device.getDeviceId(), device);
|
||||||
|
});
|
||||||
|
devicesInRedis.parallelStream().forEach(device -> {
|
||||||
|
if (deviceMapInDb.get(device.getDeviceId()) == null
|
||||||
|
&& userSetting.getServerId().equals(device.getServerId())) {
|
||||||
|
redisCatchStorage.removeDevice(device.getDeviceId());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 重置cseq计数
|
||||||
|
redisCatchStorage.resetAllCSEQ();
|
||||||
|
// 处理设备状态
|
||||||
List<DeviceStatusTaskInfo> allTaskInfo = deviceStatusTaskRunner.getAllTaskInfo();
|
List<DeviceStatusTaskInfo> allTaskInfo = deviceStatusTaskRunner.getAllTaskInfo();
|
||||||
List<String> onlineDeviceIds = new ArrayList<>();
|
List<String> onlineDeviceIds = new ArrayList<>();
|
||||||
if (!allTaskInfo.isEmpty()) {
|
if (!allTaskInfo.isEmpty()) {
|
||||||
@ -157,6 +178,16 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
offlineByIds(offlineDevices);
|
offlineByIds(offlineDevices);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}else {
|
||||||
|
// 所有设备全部离线
|
||||||
|
List<Device> onlineDevice = getAllOnlineDevice(userSetting.getServerId());
|
||||||
|
for (Device device : onlineDevice) {
|
||||||
|
// 此设备需要离线
|
||||||
|
device.setOnLine(false);
|
||||||
|
// 清理离线设备的相关缓存
|
||||||
|
cleanOfflineDevice(device);
|
||||||
|
}
|
||||||
|
offlineByIds(onlineDevice);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理订阅任务
|
// 处理订阅任务
|
||||||
@ -240,7 +271,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
|
|
||||||
private void deviceStatusExpire(String deviceId, SipTransactionInfo transactionInfo) {
|
private void deviceStatusExpire(String deviceId, SipTransactionInfo transactionInfo) {
|
||||||
log.info("[设备状态] 到期, 编号: {}", deviceId);
|
log.info("[设备状态] 到期, 编号: {}", deviceId);
|
||||||
offline(deviceId, "");
|
offline(deviceId, "保活到期");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -289,7 +320,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
sync(device);
|
sync(device);
|
||||||
}else {
|
}else {
|
||||||
device.setServerId(userSetting.getServerId());
|
device.setServerId(userSetting.getServerId());
|
||||||
if(!device.isOnLine()){
|
if(!deviceInDb.isOnLine()){
|
||||||
device.setOnLine(true);
|
device.setOnLine(true);
|
||||||
device.setCreateTime(now);
|
device.setCreateTime(now);
|
||||||
deviceMapper.update(device);
|
deviceMapper.update(device);
|
||||||
@ -312,6 +343,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
|
if (device.getSubscribeCycleForMobilePosition() > 0 && !subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
|
||||||
addMobilePositionSubscribe(device, null);
|
addMobilePositionSubscribe(device, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (userSetting.getDeviceStatusNotify()) {
|
if (userSetting.getDeviceStatusNotify()) {
|
||||||
// 发送redis消息
|
// 发送redis消息
|
||||||
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, true);
|
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, true);
|
||||||
@ -326,12 +358,19 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
sync(device);
|
sync(device);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
|
||||||
// 刷新过期任务
|
if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
|
||||||
String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
|
if (sipTransactionInfo == null) {
|
||||||
// 如果第一次注册那么必须在60 * 3时间内收到一个心跳,否则设备离线
|
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), System.currentTimeMillis() + expiresTime);
|
||||||
dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId(), "三次心跳超时"),
|
}else {
|
||||||
device.getHeartBeatInterval() * 1000 * device.getHeartBeatCount());
|
deviceStatusTaskRunner.removeTask(device.getDeviceId());
|
||||||
|
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime, this::deviceStatusExpire);
|
||||||
|
deviceStatusTaskRunner.addTask(task);
|
||||||
|
}
|
||||||
|
}else {
|
||||||
|
DeviceStatusTask task = DeviceStatusTask.getInstance(device.getDeviceId(), sipTransactionInfo, expiresTime, this::deviceStatusExpire);
|
||||||
|
deviceStatusTaskRunner.addTask(task);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -388,7 +427,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
log.info("[目录订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId);
|
log.info("[目录订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (device.getSubscribeCycleForCatalog() > 0) {
|
if (device.isOnLine() && device.getSubscribeCycleForCatalog() > 0) {
|
||||||
addCatalogSubscribe(device, transactionInfo);
|
addCatalogSubscribe(device, transactionInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -400,7 +439,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
log.info("[移动位置订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId);
|
log.info("[移动位置订阅] 到期, 编号: {}, 设备不存在, 忽略", deviceId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (device.getSubscribeCycleForMobilePosition() > 0) {
|
if (device.isOnLine() && device.getSubscribeCycleForMobilePosition() > 0) {
|
||||||
addMobilePositionSubscribe(device, transactionInfo);
|
addMobilePositionSubscribe(device, transactionInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -440,9 +479,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean removeCatalogSubscribe(@NotNull Device device, CommonCallback<Boolean> callback) {
|
public boolean removeCatalogSubscribe(@NotNull Device device, CommonCallback<Boolean> callback) {
|
||||||
log.info("[移除目录订阅]: {}", device.getDeviceId());
|
|
||||||
String key = SubscribeTaskForCatalog.getKey(device);
|
String key = SubscribeTaskForCatalog.getKey(device);
|
||||||
if (subscribeTaskRunner.containsKey(key)) {
|
if (subscribeTaskRunner.containsKey(key)) {
|
||||||
|
log.info("[移除目录订阅]: {}", device.getDeviceId());
|
||||||
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
|
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
|
||||||
if (transactionInfo == null) {
|
if (transactionInfo == null) {
|
||||||
log.warn("[移除目录订阅] 未找到事务信息,{}", device.getDeviceId());
|
log.warn("[移除目录订阅] 未找到事务信息,{}", device.getDeviceId());
|
||||||
@ -500,9 +539,10 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
|
public boolean removeMobilePositionSubscribe(Device device, CommonCallback<Boolean> callback) {
|
||||||
log.info("[移除移动位置订阅]: {}", device.getDeviceId());
|
|
||||||
String key = SubscribeTaskForMobilPosition.getKey(device);
|
String key = SubscribeTaskForMobilPosition.getKey(device);
|
||||||
if (subscribeTaskRunner.containsKey(key)) {
|
if (subscribeTaskRunner.containsKey(key)) {
|
||||||
|
log.info("[移除移动位置订阅]: {}", device.getDeviceId());
|
||||||
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
|
SipTransactionInfo transactionInfo = subscribeTaskRunner.getTransactionInfo(key);
|
||||||
if (transactionInfo == null) {
|
if (transactionInfo == null) {
|
||||||
log.warn("[移除移动位置订阅] 未找到事务信息,{}", device.getDeviceId());
|
log.warn("[移除移动位置订阅] 未找到事务信息,{}", device.getDeviceId());
|
||||||
@ -684,9 +724,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
|
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
|
||||||
removeMobilePositionSubscribe(device, null);
|
removeMobilePositionSubscribe(device, null);
|
||||||
}
|
}
|
||||||
// 停止状态检测
|
if (deviceStatusTaskRunner.containsKey(deviceId)) {
|
||||||
String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
|
deviceStatusTaskRunner.removeTask(deviceId);
|
||||||
dynamicTask.stop(registerExpireTaskKey);
|
}
|
||||||
platformChannelMapper.delChannelForDeviceId(deviceId);
|
platformChannelMapper.delChannelForDeviceId(deviceId);
|
||||||
deviceChannelMapper.cleanChannelsByDeviceId(device.getId());
|
deviceChannelMapper.cleanChannelsByDeviceId(device.getId());
|
||||||
deviceMapper.del(deviceId);
|
deviceMapper.del(deviceId);
|
||||||
@ -738,7 +778,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
public void subscribeCatalog(int id, int cycle) {
|
public void subscribeCatalog(int id, int cycle) {
|
||||||
Device device = deviceMapper.query(id);
|
Device device = deviceMapper.query(id);
|
||||||
Assert.notNull(device, "未找到设备");
|
Assert.notNull(device, "未找到设备");
|
||||||
|
Assert.isTrue(device.isOnLine(), "设备已离线");
|
||||||
if (device.getSubscribeCycleForCatalog() == cycle) {
|
if (device.getSubscribeCycleForCatalog() == cycle) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -769,6 +809,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
public void subscribeMobilePosition(int id, int cycle, int interval) {
|
public void subscribeMobilePosition(int id, int cycle, int interval) {
|
||||||
Device device = deviceMapper.query(id);
|
Device device = deviceMapper.query(id);
|
||||||
Assert.notNull(device, "未找到设备");
|
Assert.notNull(device, "未找到设备");
|
||||||
|
Assert.isTrue(device.isOnLine(), "设备已离线");
|
||||||
|
|
||||||
if (device.getSubscribeCycleForMobilePosition() == cycle) {
|
if (device.getSubscribeCycleForMobilePosition() == cycle) {
|
||||||
return;
|
return;
|
||||||
@ -806,15 +847,16 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
|||||||
}
|
}
|
||||||
if (!Objects.equals(deviceInDb.getHeartBeatCount(), device.getHeartBeatCount())
|
if (!Objects.equals(deviceInDb.getHeartBeatCount(), device.getHeartBeatCount())
|
||||||
|| !Objects.equals(deviceInDb.getHeartBeatInterval(), device.getHeartBeatInterval())) {
|
|| !Objects.equals(deviceInDb.getHeartBeatInterval(), device.getHeartBeatInterval())) {
|
||||||
// 刷新过期任务
|
|
||||||
String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
|
|
||||||
// 如果第一次注册那么必须在60 * 3时间内收到一个心跳,否则设备离线
|
|
||||||
dynamicTask.startDelay(registerExpireTaskKey, ()-> offline(device.getDeviceId(), "三次心跳超时"),
|
|
||||||
device.getHeartBeatInterval() * 1000 * device.getHeartBeatCount());
|
|
||||||
deviceInDb.setHeartBeatCount(device.getHeartBeatCount());
|
deviceInDb.setHeartBeatCount(device.getHeartBeatCount());
|
||||||
deviceInDb.setHeartBeatInterval(device.getHeartBeatInterval());
|
deviceInDb.setHeartBeatInterval(device.getHeartBeatInterval());
|
||||||
deviceInDb.setPositionCapability(device.getPositionCapability());
|
deviceInDb.setPositionCapability(device.getPositionCapability());
|
||||||
updateDevice(deviceInDb);
|
updateDevice(deviceInDb);
|
||||||
|
|
||||||
|
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
|
||||||
|
if (deviceStatusTaskRunner.containsKey(device.getDeviceId())) {
|
||||||
|
deviceStatusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -113,6 +113,35 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run(String... args) throws Exception {
|
public void run(String... args) throws Exception {
|
||||||
|
|
||||||
|
// 查找国标推流
|
||||||
|
List<SendRtpInfo> sendRtpItems = redisCatchStorage.queryAllSendRTPServer();
|
||||||
|
if (!sendRtpItems.isEmpty()) {
|
||||||
|
for (SendRtpInfo sendRtpItem : sendRtpItems) {
|
||||||
|
MediaServer mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
||||||
|
CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
|
||||||
|
if (channel == null){
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
sendRtpServerService.delete(sendRtpItem);
|
||||||
|
if (mediaServerItem != null) {
|
||||||
|
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
|
||||||
|
boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
|
||||||
|
if (stopResult) {
|
||||||
|
Platform platform = queryPlatformByServerGBId(sendRtpItem.getTargetId());
|
||||||
|
|
||||||
|
if (platform != null) {
|
||||||
|
try {
|
||||||
|
commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel);
|
||||||
|
} catch (InvalidArgumentException | ParseException | SipException e) {
|
||||||
|
log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 启动时 如果存在未过期的注册平台,则发送注销
|
// 启动时 如果存在未过期的注册平台,则发送注销
|
||||||
List<PlatformRegisterTaskInfo> registerTaskInfoList = statusTaskRunner.getAllRegisterTaskInfo();
|
List<PlatformRegisterTaskInfo> registerTaskInfoList = statusTaskRunner.getAllRegisterTaskInfo();
|
||||||
if (registerTaskInfoList.isEmpty()) {
|
if (registerTaskInfoList.isEmpty()) {
|
||||||
@ -415,7 +444,6 @@ public class PlatformServiceImpl implements IPlatformService, CommandLineRunner
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void keepaliveExpire(String platformServerId, int failCount) {
|
private void keepaliveExpire(String platformServerId, int failCount) {
|
||||||
log.info("[国标级联] 心跳到期, 上级平台编号: {}", platformServerId);
|
|
||||||
Platform platform = queryPlatformByServerGBId(platformServerId);
|
Platform platform = queryPlatformByServerGBId(platformServerId);
|
||||||
if (platform == null || !platform.isEnable()) {
|
if (platform == null || !platform.isEnable()) {
|
||||||
log.info("[国标级联] 心跳到期, 上级平台编号: {}, 平台不存在或者未启用, 忽略", platformServerId);
|
log.info("[国标级联] 心跳到期, 上级平台编号: {}, 平台不存在或者未启用, 忽略", platformServerId);
|
||||||
|
|||||||
@ -1,132 +0,0 @@
|
|||||||
package com.genersoft.iot.vmp.gb28181.task;
|
|
||||||
|
|
||||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.CommonGBChannel;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.SendRtpInfo;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.service.IGbChannelService;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.service.IPlatformService;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.session.SSRCFactory;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.cmd.ISIPCommanderForPlatform;
|
|
||||||
import com.genersoft.iot.vmp.media.bean.MediaServer;
|
|
||||||
import com.genersoft.iot.vmp.media.service.IMediaServerService;
|
|
||||||
import com.genersoft.iot.vmp.service.ISendRtpServerService;
|
|
||||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
|
||||||
import lombok.extern.slf4j.Slf4j;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.boot.CommandLineRunner;
|
|
||||||
import org.springframework.core.annotation.Order;
|
|
||||||
import org.springframework.stereotype.Component;
|
|
||||||
|
|
||||||
import javax.sip.InvalidArgumentException;
|
|
||||||
import javax.sip.SipException;
|
|
||||||
import java.text.ParseException;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* 系统启动时控制设备
|
|
||||||
* @author lin
|
|
||||||
*/
|
|
||||||
@Slf4j
|
|
||||||
@Component
|
|
||||||
@Order(value=14)
|
|
||||||
public class SipRunner implements CommandLineRunner {
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private IRedisCatchStorage redisCatchStorage;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private SSRCFactory ssrcFactory;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private IDeviceService deviceService;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private IMediaServerService mediaServerService;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private IPlatformService platformService;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private IGbChannelService channelService;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private ISIPCommanderForPlatform commanderForPlatform;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private ISendRtpServerService sendRtpServerService;
|
|
||||||
|
|
||||||
@Autowired
|
|
||||||
private UserSetting userSetting;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run(String... args) throws Exception {
|
|
||||||
List<Device> deviceList = deviceService.getAllOnlineDevice(userSetting.getServerId());
|
|
||||||
|
|
||||||
for (Device device : deviceList) {
|
|
||||||
if (deviceService.expire(device)){
|
|
||||||
deviceService.offline(device.getDeviceId(), "注册已过期");
|
|
||||||
}else {
|
|
||||||
deviceService.online(device, null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// 重置cseq计数
|
|
||||||
redisCatchStorage.resetAllCSEQ();
|
|
||||||
// 清理redis
|
|
||||||
// 清理数据库不存在但是redis中存在的数据
|
|
||||||
List<Device> devicesInDb = deviceService.getAll();
|
|
||||||
if (devicesInDb.isEmpty()) {
|
|
||||||
redisCatchStorage.removeAllDevice();
|
|
||||||
}else {
|
|
||||||
List<Device> devicesInRedis = redisCatchStorage.getAllDevices();
|
|
||||||
if (!devicesInRedis.isEmpty()) {
|
|
||||||
Map<String, Device> deviceMapInDb = new HashMap<>();
|
|
||||||
devicesInDb.parallelStream().forEach(device -> {
|
|
||||||
deviceMapInDb.put(device.getDeviceId(), device);
|
|
||||||
});
|
|
||||||
devicesInRedis.parallelStream().forEach(device -> {
|
|
||||||
if (deviceMapInDb.get(device.getDeviceId()) == null
|
|
||||||
&& userSetting.getServerId().equals(device.getServerId())) {
|
|
||||||
redisCatchStorage.removeDevice(device.getDeviceId());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// 查找国标推流
|
|
||||||
List<SendRtpInfo> sendRtpItems = redisCatchStorage.queryAllSendRTPServer();
|
|
||||||
if (!sendRtpItems.isEmpty()) {
|
|
||||||
for (SendRtpInfo sendRtpItem : sendRtpItems) {
|
|
||||||
MediaServer mediaServerItem = mediaServerService.getOne(sendRtpItem.getMediaServerId());
|
|
||||||
CommonGBChannel channel = channelService.getOne(sendRtpItem.getChannelId());
|
|
||||||
if (channel == null){
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
sendRtpServerService.delete(sendRtpItem);
|
|
||||||
if (mediaServerItem != null) {
|
|
||||||
ssrcFactory.releaseSsrc(sendRtpItem.getMediaServerId(), sendRtpItem.getSsrc());
|
|
||||||
boolean stopResult = mediaServerService.initStopSendRtp(mediaServerItem, sendRtpItem.getApp(), sendRtpItem.getStream(), sendRtpItem.getSsrc());
|
|
||||||
if (stopResult) {
|
|
||||||
Platform platform = platformService.queryPlatformByServerGBId(sendRtpItem.getTargetId());
|
|
||||||
|
|
||||||
if (platform != null) {
|
|
||||||
try {
|
|
||||||
commanderForPlatform.streamByeCmd(platform, sendRtpItem, channel);
|
|
||||||
} catch (InvalidArgumentException | ParseException | SipException e) {
|
|
||||||
log.error("[命令发送失败] 国标级联 发送BYE: {}", e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
}
|
|
||||||
@ -136,16 +136,17 @@ public class SIPCommanderForPlatform implements ISIPCommanderForPlatform {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String keepalive(Platform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws SipException, InvalidArgumentException, ParseException {
|
public String keepalive(Platform parentPlatform, SipSubscribe.Event errorEvent , SipSubscribe.Event okEvent) throws SipException, InvalidArgumentException, ParseException {
|
||||||
String characterSet = parentPlatform.getCharacterSet();
|
log.info("[国标级联] 发送心跳, 上级平台编号: {}", parentPlatform.getServerGBId());
|
||||||
StringBuffer keepaliveXml = new StringBuffer(200);
|
String characterSet = parentPlatform.getCharacterSet();
|
||||||
keepaliveXml.append("<?xml version=\"1.0\" encoding=\"")
|
StringBuffer keepaliveXml = new StringBuffer(200);
|
||||||
.append(characterSet).append("\"?>\r\n")
|
keepaliveXml.append("<?xml version=\"1.0\" encoding=\"")
|
||||||
.append("<Notify>\r\n")
|
.append(characterSet).append("\"?>\r\n")
|
||||||
.append("<CmdType>Keepalive</CmdType>\r\n")
|
.append("<Notify>\r\n")
|
||||||
.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n")
|
.append("<CmdType>Keepalive</CmdType>\r\n")
|
||||||
.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n")
|
.append("<SN>" + (int)((Math.random()*9+1)*100000) + "</SN>\r\n")
|
||||||
.append("<Status>OK</Status>\r\n")
|
.append("<DeviceID>" + parentPlatform.getDeviceGBId() + "</DeviceID>\r\n")
|
||||||
.append("</Notify>\r\n");
|
.append("<Status>OK</Status>\r\n")
|
||||||
|
.append("</Notify>\r\n");
|
||||||
|
|
||||||
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport());
|
CallIdHeader callIdHeader = sipSender.getNewCallIdHeader(parentPlatform.getDeviceIp(),parentPlatform.getTransport());
|
||||||
|
|
||||||
|
|||||||
@ -113,8 +113,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
|
|||||||
device.setTransport("TCP".equalsIgnoreCase(transport) ? "TCP" : "UDP");
|
device.setTransport("TCP".equalsIgnoreCase(transport) ? "TCP" : "UDP");
|
||||||
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), registerOkResponse);
|
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), registerOkResponse);
|
||||||
device.setRegisterTime(DateUtil.getNow());
|
device.setRegisterTime(DateUtil.getNow());
|
||||||
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo((SIPResponse) registerOkResponse);
|
deviceService.online(device, null);
|
||||||
deviceService.online(device, sipTransactionInfo);
|
|
||||||
} else {
|
} else {
|
||||||
deviceService.offline(deviceId, "主动注销");
|
deviceService.offline(deviceId, "主动注销");
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,13 +1,13 @@
|
|||||||
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
|
package com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.cmd;
|
||||||
|
|
||||||
import com.genersoft.iot.vmp.common.VideoManagerConstants;
|
import com.genersoft.iot.vmp.common.RemoteAddressInfo;
|
||||||
import com.genersoft.iot.vmp.conf.DynamicTask;
|
import com.genersoft.iot.vmp.conf.DynamicTask;
|
||||||
import com.genersoft.iot.vmp.conf.UserSetting;
|
import com.genersoft.iot.vmp.conf.UserSetting;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
||||||
import com.genersoft.iot.vmp.common.RemoteAddressInfo;
|
|
||||||
import com.genersoft.iot.vmp.gb28181.bean.SipMsgInfo;
|
import com.genersoft.iot.vmp.gb28181.bean.SipMsgInfo;
|
||||||
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
|
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
|
||||||
|
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner;
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
|
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
|
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
|
||||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
|
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
|
||||||
@ -48,6 +48,9 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
|
|||||||
@Autowired
|
@Autowired
|
||||||
private IDeviceService deviceService;
|
private IDeviceService deviceService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private DeviceStatusTaskRunner statusTaskRunner;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
private UserSetting userSetting;
|
private UserSetting userSetting;
|
||||||
|
|
||||||
@ -111,18 +114,16 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
|
|||||||
|
|
||||||
if (device.isOnLine()) {
|
if (device.isOnLine()) {
|
||||||
deviceService.updateDevice(device);
|
deviceService.updateDevice(device);
|
||||||
|
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
|
||||||
|
if (statusTaskRunner.containsKey(device.getDeviceId())) {
|
||||||
|
statusTaskRunner.updateDelay(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (userSetting.getGbDeviceOnline() == 1) {
|
if (userSetting.getGbDeviceOnline() == 1) {
|
||||||
// 对于已经离线的设备判断他的注册是否已经过期
|
// 对于已经离线的设备判断他的注册是否已经过期
|
||||||
deviceService.online(device, null);
|
deviceService.online(device, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 刷新过期任务
|
|
||||||
String registerExpireTaskKey = VideoManagerConstants.REGISTER_EXPIRE_TASK_KEY_PREFIX + device.getDeviceId();
|
|
||||||
// 如果三次心跳失败,则设置设备离线
|
|
||||||
dynamicTask.startDelay(registerExpireTaskKey, () -> deviceService.offline(device.getDeviceId(), "三次心跳超时"),
|
|
||||||
device.getHeartBeatInterval() * 1000 * device.getHeartBeatCount());
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,9 +1,9 @@
|
|||||||
// sidebar
|
// sidebar
|
||||||
$menuText:#bfcbd9;
|
$menuText:#bfcbd9;
|
||||||
$menuActiveText:#409EFF;
|
$menuActiveText:#409EFF;
|
||||||
$subMenuActiveText: #f7ff17; //https://github.com/ElemeFE/element/issues/12951
|
$subMenuActiveText: #f4f4f5; //https://github.com/ElemeFE/element/issues/12951
|
||||||
|
|
||||||
$menuBg: #1f61cd;
|
$menuBg: #304156;
|
||||||
$menuHover:#263445;
|
$menuHover:#263445;
|
||||||
|
|
||||||
$subMenuBg:#1f2d3d;
|
$subMenuBg:#1f2d3d;
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user