mirror of
https://gitee.com/pan648540858/wvp-GB28181-pro.git
synced 2026-05-18 19:37:50 +08:00
优化注册参数
This commit is contained in:
parent
7068898c7b
commit
8e9e75997a
1
pom.xml
1
pom.xml
@ -60,6 +60,7 @@
|
||||
<asciidoctor.html.output.directory>${project.build.directory}/asciidoc/html</asciidoctor.html.output.directory>
|
||||
<asciidoctor.pdf.output.directory>${project.build.directory}/asciidoc/pdf</asciidoctor.pdf.output.directory>
|
||||
|
||||
<java.version>21</java.version>
|
||||
<maven.compiler.source>21</maven.compiler.source>
|
||||
<maven.compiler.target>21</maven.compiler.target>
|
||||
</properties>
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
package com.genersoft.iot.vmp;
|
||||
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.jt1078.util.ClassUtil;
|
||||
import com.genersoft.iot.vmp.utils.GitUtil;
|
||||
import com.genersoft.iot.vmp.utils.SpringBeanFactory;
|
||||
@ -43,6 +44,7 @@ public class VManageBootstrap extends SpringBootServletInitializer {
|
||||
log.info("构建时间: {}", gitUtil.getBuildDate());
|
||||
log.info("GIT信息: 分支: {}, ID: {}, 时间: {}", gitUtil.getBranch(), gitUtil.getCommitIdShort(), gitUtil.getCommitTime());
|
||||
}
|
||||
|
||||
}
|
||||
// 项目重启
|
||||
public static void restart() {
|
||||
|
||||
@ -20,13 +20,12 @@ public interface IDeviceService {
|
||||
* 设备上线
|
||||
* @param device 设备信息
|
||||
*/
|
||||
void online(Device device, SipTransactionInfo sipTransactionInfo);
|
||||
void online(Device device);
|
||||
|
||||
/**
|
||||
* 设备下线
|
||||
* @param deviceId 设备编号
|
||||
*/
|
||||
void offline(String deviceId, String reason, boolean check);
|
||||
void offline(Device device);
|
||||
|
||||
/**
|
||||
* 添加目录订阅
|
||||
|
||||
@ -17,7 +17,7 @@ import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IInviteStreamService;
|
||||
import com.genersoft.iot.vmp.gb28181.session.AudioBroadcastManager;
|
||||
import com.genersoft.iot.vmp.gb28181.session.SipInviteSessionManager;
|
||||
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner;
|
||||
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusManager;
|
||||
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTask;
|
||||
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskInfo;
|
||||
import com.genersoft.iot.vmp.gb28181.task.deviceSubscribe.SubscribeTaskRunner;
|
||||
@ -119,7 +119,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
private SubscribeTaskRunner subscribeTaskRunner;
|
||||
|
||||
@Autowired
|
||||
private DeviceStatusTaskRunner deviceStatusTaskRunner;
|
||||
private DeviceStatusManager deviceStatusManager;
|
||||
|
||||
private Device getDeviceByDeviceIdFromDb(String deviceId) {
|
||||
return deviceMapper.getDeviceByDeviceId(deviceId);
|
||||
@ -132,7 +132,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
List<Device> devicesInDb = getAll();
|
||||
if (devicesInDb.isEmpty()) {
|
||||
redisCatchStorage.removeAllDevice();
|
||||
deviceStatusTaskRunner.clear();
|
||||
deviceStatusManager.clear();
|
||||
}else {
|
||||
List<Device> devicesInRedis = redisCatchStorage.getAllDevices();
|
||||
if (!devicesInRedis.isEmpty()) {
|
||||
@ -152,7 +152,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
// 重置 cseq 计数
|
||||
redisCatchStorage.resetAllCSEQ();
|
||||
// 处理设备状态
|
||||
Set<String> allDeviceIds = deviceStatusTaskRunner.getAll();
|
||||
Set<String> allDeviceIds = deviceStatusManager.getAll();
|
||||
if (!allDeviceIds.isEmpty()) {
|
||||
// 除了记录的设备以外, 其他设备全部离线
|
||||
List<Device> onlineDevice = getAllOnlineDevice(userSetting.getServerId());
|
||||
@ -233,7 +233,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
for (Device device : offlineDevices) {
|
||||
device.setOnLine(false);
|
||||
redisCatchStorage.updateDevice(device);
|
||||
deviceStatusTaskRunner.removeTask(device.getDeviceId());
|
||||
deviceStatusManager.remove(device.getDeviceId());
|
||||
}
|
||||
}
|
||||
|
||||
@ -244,7 +244,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
|
||||
subscribeTaskRunner.removeSubscribe(SubscribeTaskForMobilPosition.getKey(device));
|
||||
}
|
||||
deviceStatusTaskRunner.removeTask(device.getDeviceId());
|
||||
deviceStatusManager.remove(device.getDeviceId());
|
||||
// 离线释放所有 ssrc
|
||||
List<SsrcTransaction> ssrcTransactions = sessionManager.getSsrcTransactionByDeviceId(device.getDeviceId());
|
||||
if (ssrcTransactions != null && !ssrcTransactions.isEmpty()) {
|
||||
@ -279,11 +279,18 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
@EventListener
|
||||
public void onApplicationEvent(DeviceOfflineEvent event) {
|
||||
log.info("[设备状态] 到期, 编号: {}", event.getDeviceId());
|
||||
offline(event.getDeviceId(), "保活到期", true);
|
||||
Device device = getDeviceByDeviceId(event.getDeviceId());
|
||||
Boolean deviceStatus = getDeviceStatus(device);
|
||||
if (deviceStatus != null && deviceStatus) {
|
||||
log.info("[设备离线] 主动探测发现设备在线,暂不处理 device:{}", event.getDeviceId());
|
||||
online(device);
|
||||
return;
|
||||
}
|
||||
offline(device);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void online(Device device, SipTransactionInfo sipTransactionInfo) {
|
||||
public void online(Device device) {
|
||||
log.info("[设备上线] deviceId:{}->{}:{}", device.getDeviceId(), device.getIp(), device.getPort());
|
||||
Device deviceInRedis = redisCatchStorage.getDevice(device.getDeviceId());
|
||||
Device deviceInDb = getDeviceByDeviceIdFromDb(device.getDeviceId());
|
||||
@ -301,13 +308,6 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
device.setHeartBeatInterval(60);
|
||||
device.setPositionCapability(0);
|
||||
}
|
||||
if (sipTransactionInfo != null) {
|
||||
device.setSipTransactionInfo(sipTransactionInfo);
|
||||
}else {
|
||||
if (deviceInRedis != null) {
|
||||
device.setSipTransactionInfo(deviceInRedis.getSipTransactionInfo());
|
||||
}
|
||||
}
|
||||
|
||||
// 第一次上线 或则设备之前是离线状态--进行通道同步和设备信息查询
|
||||
if (deviceInDb == null) {
|
||||
@ -371,35 +371,24 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
}
|
||||
// 设备状态任务添加
|
||||
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
|
||||
deviceStatusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
||||
deviceStatusManager.add(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional
|
||||
public void offline(String deviceId, String reason, boolean check) {
|
||||
Device device = getDeviceByDeviceIdFromDb(deviceId);
|
||||
public void offline(Device device) {
|
||||
if (device == null) {
|
||||
log.warn("[设备不存在] device:{}", deviceId);
|
||||
log.warn("[设备不存在]");
|
||||
return;
|
||||
}
|
||||
|
||||
// 主动查询设备状态, 没有HostAddress无法发送请求,可能是手动添加的设备
|
||||
if (check && device.getHostAddress() != null) {
|
||||
Boolean deviceStatus = getDeviceStatus(device);
|
||||
if (deviceStatus != null && deviceStatus) {
|
||||
log.info("[设备离线] 主动探测发现设备在线,暂不处理 device:{}", deviceId);
|
||||
online(device, null);
|
||||
return;
|
||||
}
|
||||
}
|
||||
log.info("[设备离线] {}, device:{}, 心跳间隔: {},心跳超时次数: {}, 上次心跳时间:{}, 上次注册时间: {}", reason, deviceId,
|
||||
String deviceId = device.getDeviceId();
|
||||
log.info("[设备离线] device:{}, 心跳间隔: {},心跳超时次数: {}, 上次心跳时间:{}, 上次注册时间: {}", deviceId,
|
||||
device.getHeartBeatInterval(), device.getHeartBeatCount(), device.getKeepaliveTime(), device.getRegisterTime());
|
||||
device.setOnLine(false);
|
||||
cleanOfflineDevice(device);
|
||||
redisCatchStorage.updateDevice(device);
|
||||
deviceMapper.update(device);
|
||||
if (userSetting.getDeviceStatusNotify()) {
|
||||
// 发送redis消息
|
||||
// 发送 redis 消息
|
||||
redisCatchStorage.sendDeviceOrChannelStatus(device.getDeviceId(), null, false);
|
||||
}
|
||||
if (isDevice(deviceId)) {
|
||||
@ -818,8 +807,8 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
if (subscribeTaskRunner.containsKey(SubscribeTaskForMobilPosition.getKey(device))) {
|
||||
removeMobilePositionSubscribe(device, null);
|
||||
}
|
||||
if (deviceStatusTaskRunner.containsKey(deviceId)) {
|
||||
deviceStatusTaskRunner.removeTask(deviceId);
|
||||
if (deviceStatusManager.contains(deviceId)) {
|
||||
deviceStatusManager.remove(deviceId);
|
||||
}
|
||||
List<CommonGBChannel> commonGBChannels = commonGBChannelMapper.queryByDataTypeAndDeviceIds(1, List.of(device.getId()));
|
||||
|
||||
@ -967,7 +956,7 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
updateDevice(deviceInDb);
|
||||
|
||||
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
|
||||
deviceStatusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
||||
deviceStatusManager.add(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
||||
}
|
||||
}
|
||||
|
||||
@ -1196,9 +1185,9 @@ public class DeviceServiceImpl implements IDeviceService, CommandLineRunner {
|
||||
try {
|
||||
sipCommander.deviceStatusQuery(device, (code, msg, data) -> {
|
||||
if ("ONLINE".equalsIgnoreCase(data.trim())) {
|
||||
online(device, null);
|
||||
online(device);
|
||||
}else {
|
||||
offline(device.getDeviceId(), "设备状态查询结果:" + data.trim(), true);
|
||||
offline(device);
|
||||
}
|
||||
if (callback != null) {
|
||||
callback.run(code, msg, data);
|
||||
|
||||
@ -215,7 +215,8 @@ public class CatalogDataManager implements CommandLineRunner {
|
||||
redisTemplate.delete(key);
|
||||
}
|
||||
|
||||
@Scheduled(fixedDelay = 5 * 1000) //每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
|
||||
//每5秒执行一次, 发现数据5秒未更新则移除数据并认为数据接收超时
|
||||
@Scheduled(fixedDelay = 5, timeUnit = TimeUnit.SECONDS)
|
||||
private void timerTask(){
|
||||
if (dataMap.isEmpty()) {
|
||||
return;
|
||||
|
||||
@ -4,23 +4,20 @@ import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
|
||||
import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
|
||||
import com.genersoft.iot.vmp.utils.redis.RedisUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.ObjectUtils;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.lang.Thread;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class DeviceStatusTaskRunner {
|
||||
public class DeviceStatusManager {
|
||||
|
||||
@Autowired
|
||||
private RedisTemplate<String, String> redisTemplate;
|
||||
@ -41,9 +38,9 @@ public class DeviceStatusTaskRunner {
|
||||
}
|
||||
|
||||
/**
|
||||
* 状态过期检查
|
||||
* 状态过期检查, 每秒检查一次, 系统启动10秒后开始检查
|
||||
*/
|
||||
@Scheduled(fixedDelay = 1, timeUnit = TimeUnit.SECONDS)
|
||||
@Scheduled(fixedDelay = 1, initialDelay = 10, timeUnit = TimeUnit.SECONDS)
|
||||
public void expirationCheck(){
|
||||
long now = System.currentTimeMillis();
|
||||
// 获取已过期的 deviceId (Score 介于 0 到 现在之间)
|
||||
@ -56,7 +53,7 @@ public class DeviceStatusTaskRunner {
|
||||
for (String deviceId : expiredIds) {
|
||||
Thread.startVirtualThread(() -> {
|
||||
// 获取详情后删除缓存
|
||||
Device device = redisCatchStorage.getDevice(deviceId);
|
||||
// Device device = redisCatchStorage.getDevice(deviceId);
|
||||
// redisCatchStorage.removeDevice(deviceId);
|
||||
// 发送 Spring 异步事件
|
||||
eventPublisher.deviceOfflineEventPublish(deviceId);
|
||||
@ -65,15 +62,15 @@ public class DeviceStatusTaskRunner {
|
||||
}
|
||||
}
|
||||
|
||||
public void addTask(String deviceId, long expireTime) {
|
||||
public void add(String deviceId, long expireTime) {
|
||||
redisTemplate.opsForZSet().add(redisKey(), deviceId, expireTime);
|
||||
}
|
||||
|
||||
public void removeTask(String deviceId) {
|
||||
public void remove(String deviceId) {
|
||||
redisTemplate.opsForZSet().remove(redisKey(), deviceId);
|
||||
}
|
||||
|
||||
public boolean containsKey(String deviceId) {
|
||||
public boolean contains(String deviceId) {
|
||||
if (ObjectUtils.isEmpty(deviceId)) {
|
||||
return false;
|
||||
}
|
||||
@ -23,6 +23,7 @@ import gov.nist.javax.sip.message.SIPResponse;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.InitializingBean;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.ObjectUtils;
|
||||
|
||||
@ -38,6 +39,9 @@ import java.security.NoSuchAlgorithmException;
|
||||
import java.text.ParseException;
|
||||
import java.util.Calendar;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* SIP命令类型: REGISTER请求
|
||||
@ -63,6 +67,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
|
||||
|
||||
@Override
|
||||
public void afterPropertiesSet() throws Exception {
|
||||
// 添加消息处理的订阅
|
||||
@ -115,9 +120,9 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
|
||||
device.setTransport("TCP".equalsIgnoreCase(transport) ? "TCP" : "UDP");
|
||||
sipSender.transmitRequest(request.getLocalAddress().getHostAddress(), registerOkResponse);
|
||||
device.setRegisterTime(DateUtil.getNow());
|
||||
deviceService.online(device, null);
|
||||
deviceService.online(device);
|
||||
} else {
|
||||
deviceService.offline(deviceId, "主动注销", false);
|
||||
deviceService.offline(device);
|
||||
}
|
||||
return;
|
||||
}else {
|
||||
@ -125,6 +130,7 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
|
||||
if (!ObjectUtils.isEmpty(device.getPassword()) || !ObjectUtils.isEmpty(sipConfig.getPassword())) {
|
||||
password = (!ObjectUtils.isEmpty(device.getPassword())) ? device.getPassword() : sipConfig.getPassword();
|
||||
}
|
||||
// 如果设置了一个无密码的设备,那么这里就会自动跳动,后续会直接注册成功
|
||||
}
|
||||
}else {
|
||||
if (ObjectUtils.isEmpty(sipConfig.getPassword())) {
|
||||
@ -225,10 +231,11 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
|
||||
log.info("[注册成功] deviceId: {}->{}", deviceId, requestAddress);
|
||||
device.setRegisterTime(DateUtil.getNow());
|
||||
SipTransactionInfo sipTransactionInfo = new SipTransactionInfo((SIPResponse) response);
|
||||
deviceService.online(device, sipTransactionInfo);
|
||||
device.setSipTransactionInfo(sipTransactionInfo);
|
||||
deviceService.online(device);
|
||||
} else {
|
||||
log.info("[注销成功] deviceId: {}->{}", deviceId, requestAddress);
|
||||
deviceService.offline(deviceId, "主动注销", false);
|
||||
deviceService.offline(device);
|
||||
}
|
||||
} catch (SipException | NoSuchAlgorithmException | ParseException e) {
|
||||
log.error("未处理的异常 ", e);
|
||||
@ -256,4 +263,5 @@ public class RegisterRequestProcessor extends SIPRequestProcessorParent implemen
|
||||
return response;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -5,7 +5,7 @@ import com.genersoft.iot.vmp.conf.UserSetting;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Device;
|
||||
import com.genersoft.iot.vmp.gb28181.bean.Platform;
|
||||
import com.genersoft.iot.vmp.gb28181.service.IDeviceService;
|
||||
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusTaskRunner;
|
||||
import com.genersoft.iot.vmp.gb28181.task.deviceStatus.DeviceStatusManager;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.SIPRequestProcessorParent;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.IMessageHandler;
|
||||
import com.genersoft.iot.vmp.gb28181.transmit.event.request.impl.message.notify.NotifyMessageHandler;
|
||||
@ -49,7 +49,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
|
||||
private IDeviceService deviceService;
|
||||
|
||||
@Autowired
|
||||
private DeviceStatusTaskRunner statusTaskRunner;
|
||||
private DeviceStatusManager statusTaskRunner;
|
||||
|
||||
@Autowired
|
||||
private UserSetting userSetting;
|
||||
@ -71,7 +71,7 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
|
||||
|
||||
RemoteAddressInfo remoteAddressInfo = SipUtils.getRemoteAddressFromRequest(request, userSetting.getSipUseSourceIpAsRemoteAddress());
|
||||
if (device.getIp() == null || !device.getIp().equalsIgnoreCase(remoteAddressInfo.getIp()) || device.getPort() != remoteAddressInfo.getPort()) {
|
||||
log.info("[收到心跳] 地址变化, {}({}), {}:{}->{}", device.getName(), device.getDeviceId(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort(), request.getLocalAddress().getHostAddress());
|
||||
log.info("[收到心跳] 地址变化, {}({}), {}:{}->{}:{}", device.getName(), device.getDeviceId(), device.getIp(), device.getPort(), remoteAddressInfo.getIp(), remoteAddressInfo.getPort());
|
||||
device.setPort(remoteAddressInfo.getPort());
|
||||
device.setHostAddress(IpPortUtil.concatenateIpAndPort(remoteAddressInfo.getIp(), String.valueOf(remoteAddressInfo.getPort())));
|
||||
device.setIp(remoteAddressInfo.getIp());
|
||||
@ -83,11 +83,11 @@ public class KeepaliveNotifyMessageHandler extends SIPRequestProcessorParent imp
|
||||
if (device.isOnLine()) {
|
||||
taskQueue.add(device);
|
||||
long expiresTime = Math.min(device.getExpires(), device.getHeartBeatInterval() * device.getHeartBeatCount()) * 1000L;
|
||||
statusTaskRunner.addTask(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
||||
statusTaskRunner.add(device.getDeviceId(), expiresTime + System.currentTimeMillis());
|
||||
} else {
|
||||
if (userSetting.getGbDeviceOnline() == 1) {
|
||||
// 对于已经离线的设备判断他的注册是否已经过期
|
||||
deviceService.online(device, null);
|
||||
deviceService.online(device);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -31,6 +31,7 @@ import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.lang.Thread;
|
||||
|
||||
/**
|
||||
* 目录查询的回复
|
||||
|
||||
Loading…
Reference in New Issue
Block a user